From e307a4d92c94b2985cefba24c44b7fdf9a452cf6 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 28 May 2019 17:17:19 +0300 Subject: [PATCH] lib/timerpool: use timer pool in concurrency limiters This should reduce the number of memory allocations in highly loaded system --- .../concurrencylimiter/concurrencylimiter.go | 7 +++- app/vmselect/main.go | 6 ++- lib/storage/storage.go | 6 ++- lib/timerpool/timerpool.go | 38 +++++++++++++++++++ 4 files changed, 51 insertions(+), 6 deletions(-) create mode 100644 lib/timerpool/timerpool.go diff --git a/app/vminsert/concurrencylimiter/concurrencylimiter.go b/app/vminsert/concurrencylimiter/concurrencylimiter.go index f65b32a6e..82895171a 100644 --- a/app/vminsert/concurrencylimiter/concurrencylimiter.go +++ b/app/vminsert/concurrencylimiter/concurrencylimiter.go @@ -4,6 +4,8 @@ import ( "fmt" "runtime" "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" ) var ( @@ -21,14 +23,15 @@ var ( func Do(f func() error) error { // Limit the number of conurrent inserts in order to prevent from excess // memory usage and CPU trashing. - t := time.NewTimer(waitDuration) + t := timerpool.Get(waitDuration) select { case ch <- struct{}{}: - t.Stop() + timerpool.Put(t) err := f() <-ch return err case <-t.C: + timerpool.Put(t) return fmt.Errorf("the server is overloaded with %d concurrent inserts; either increase the number of CPUs or reduce the load", cap(ch)) } } diff --git a/app/vmselect/main.go b/app/vmselect/main.go index 315eb9c79..aede06d22 100644 --- a/app/vmselect/main.go +++ b/app/vmselect/main.go @@ -14,6 +14,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" "github.com/VictoriaMetrics/metrics" ) @@ -43,12 +44,13 @@ func Stop() { func RequestHandler(w http.ResponseWriter, r *http.Request) bool { // Limit the number of concurrent queries. // Sleep for a while until giving up. This should resolve short bursts in requests. - t := time.NewTimer(*maxQueueDuration) + t := timerpool.Get(*maxQueueDuration) select { case concurrencyCh <- struct{}{}: - t.Stop() + timerpool.Put(t) defer func() { <-concurrencyCh }() case <-t.C: + timerpool.Put(t) httpserver.Errorf(w, "cannot handle more than %d concurrent requests", cap(concurrencyCh)) return true } diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 394191fb6..8da439cd8 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -18,6 +18,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" "github.com/VictoriaMetrics/fastcache" "golang.org/x/sys/unix" ) @@ -553,12 +554,13 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error { // Limit the number of concurrent goroutines that may add rows to the storage. // This should prevent from out of memory errors and CPU trashing when too many // goroutines call AddRows. - t := time.NewTimer(addRowsTimeout) + t := timerpool.Get(addRowsTimeout) select { case addRowsConcurrencyCh <- struct{}{}: - t.Stop() + timerpool.Put(t) defer func() { <-addRowsConcurrencyCh }() case <-t.C: + timerpool.Put(t) return fmt.Errorf("Cannot add %d rows to storage in %s, since it is overloaded with %d concurrent writers. Add more CPUs or reduce load", len(mrs), addRowsTimeout, cap(addRowsConcurrencyCh)) } diff --git a/lib/timerpool/timerpool.go b/lib/timerpool/timerpool.go new file mode 100644 index 000000000..746f458e0 --- /dev/null +++ b/lib/timerpool/timerpool.go @@ -0,0 +1,38 @@ +package timerpool + +import ( + "sync" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" +) + +// Get returns a timer for the given duration d from the pool. +// +// Return back the timer to the pool with Put. +func Get(d time.Duration) *time.Timer { + if v := timerPool.Get(); v != nil { + t := v.(*time.Timer) + if t.Reset(d) { + logger.Panicf("BUG: active timer trapped to the pool!") + } + return t + } + return time.NewTimer(d) +} + +// Put returns t to the pool. +// +// t cannot be accessed after returning to the pool. +func Put(t *time.Timer) { + if !t.Stop() { + // Drain t.C if it wasn't obtained by the caller yet. + select { + case <-t.C: + default: + } + } + timerPool.Put(t) +} + +var timerPool sync.Pool