lib/timerpool: use timer pool in concurrency limiters

This should reduce the number of memory allocations in highly loaded system
This commit is contained in:
Aliaksandr Valialkin 2019-05-28 17:17:19 +03:00
parent 0eae39daa7
commit e307a4d92c
4 changed files with 51 additions and 6 deletions

View file

@ -4,6 +4,8 @@ import (
"fmt" "fmt"
"runtime" "runtime"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
) )
var ( var (
@ -21,14 +23,15 @@ var (
func Do(f func() error) error { func Do(f func() error) error {
// Limit the number of conurrent inserts in order to prevent from excess // Limit the number of conurrent inserts in order to prevent from excess
// memory usage and CPU trashing. // memory usage and CPU trashing.
t := time.NewTimer(waitDuration) t := timerpool.Get(waitDuration)
select { select {
case ch <- struct{}{}: case ch <- struct{}{}:
t.Stop() timerpool.Put(t)
err := f() err := f()
<-ch <-ch
return err return err
case <-t.C: case <-t.C:
timerpool.Put(t)
return fmt.Errorf("the server is overloaded with %d concurrent inserts; either increase the number of CPUs or reduce the load", cap(ch)) return fmt.Errorf("the server is overloaded with %d concurrent inserts; either increase the number of CPUs or reduce the load", cap(ch))
} }
} }

View file

@ -14,6 +14,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -43,12 +44,13 @@ func Stop() {
func RequestHandler(w http.ResponseWriter, r *http.Request) bool { func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
// Limit the number of concurrent queries. // Limit the number of concurrent queries.
// Sleep for a while until giving up. This should resolve short bursts in requests. // Sleep for a while until giving up. This should resolve short bursts in requests.
t := time.NewTimer(*maxQueueDuration) t := timerpool.Get(*maxQueueDuration)
select { select {
case concurrencyCh <- struct{}{}: case concurrencyCh <- struct{}{}:
t.Stop() timerpool.Put(t)
defer func() { <-concurrencyCh }() defer func() { <-concurrencyCh }()
case <-t.C: case <-t.C:
timerpool.Put(t)
httpserver.Errorf(w, "cannot handle more than %d concurrent requests", cap(concurrencyCh)) httpserver.Errorf(w, "cannot handle more than %d concurrent requests", cap(concurrencyCh))
return true return true
} }

View file

@ -18,6 +18,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/fastcache" "github.com/VictoriaMetrics/fastcache"
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
) )
@ -553,12 +554,13 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
// Limit the number of concurrent goroutines that may add rows to the storage. // Limit the number of concurrent goroutines that may add rows to the storage.
// This should prevent from out of memory errors and CPU trashing when too many // This should prevent from out of memory errors and CPU trashing when too many
// goroutines call AddRows. // goroutines call AddRows.
t := time.NewTimer(addRowsTimeout) t := timerpool.Get(addRowsTimeout)
select { select {
case addRowsConcurrencyCh <- struct{}{}: case addRowsConcurrencyCh <- struct{}{}:
t.Stop() timerpool.Put(t)
defer func() { <-addRowsConcurrencyCh }() defer func() { <-addRowsConcurrencyCh }()
case <-t.C: case <-t.C:
timerpool.Put(t)
return fmt.Errorf("Cannot add %d rows to storage in %s, since it is overloaded with %d concurrent writers. Add more CPUs or reduce load", return fmt.Errorf("Cannot add %d rows to storage in %s, since it is overloaded with %d concurrent writers. Add more CPUs or reduce load",
len(mrs), addRowsTimeout, cap(addRowsConcurrencyCh)) len(mrs), addRowsTimeout, cap(addRowsConcurrencyCh))
} }

View file

@ -0,0 +1,38 @@
package timerpool
import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// Get returns a timer for the given duration d from the pool.
//
// Return back the timer to the pool with Put.
func Get(d time.Duration) *time.Timer {
if v := timerPool.Get(); v != nil {
t := v.(*time.Timer)
if t.Reset(d) {
logger.Panicf("BUG: active timer trapped to the pool!")
}
return t
}
return time.NewTimer(d)
}
// Put returns t to the pool.
//
// t cannot be accessed after returning to the pool.
func Put(t *time.Timer) {
if !t.Stop() {
// Drain t.C if it wasn't obtained by the caller yet.
select {
case <-t.C:
default:
}
}
timerPool.Put(t)
}
var timerPool sync.Pool