From 56b952c4567229862cd8d520ca876202d65fc008 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 29 May 2019 12:35:47 +0300 Subject: [PATCH] app/vminsert: add `-maxConcurrentInserts` command-line flag for limiting the number of concurrent inserts --- .../concurrencylimiter/concurrencylimiter.go | 26 ++++++++++++++----- app/vminsert/main.go | 2 ++ 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/app/vminsert/concurrencylimiter/concurrencylimiter.go b/app/vminsert/concurrencylimiter/concurrencylimiter.go index 82895171a..42a6af168 100644 --- a/app/vminsert/concurrencylimiter/concurrencylimiter.go +++ b/app/vminsert/concurrencylimiter/concurrencylimiter.go @@ -1,27 +1,36 @@ package concurrencylimiter import ( + "flag" "fmt" "runtime" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" + "github.com/VictoriaMetrics/metrics" ) +var maxConcurrentInserts = flag.Int("maxConcurrentInserts", runtime.GOMAXPROCS(-1)*4, "The maximum number of concurrent inserts") + var ( - // ch is the channel for limiting concurrent inserts. - // Put an item into it before performing an insert and remove - // the item after the insert is complete. - ch = make(chan struct{}, runtime.GOMAXPROCS(-1)*2) + // ch is the channel for limiting concurrent calls to Do. + ch chan struct{} // waitDuration is the amount of time to wait until at least a single - // concurrent insert out of cap(Ch) inserts is complete. + // concurrent Do call out of cap(ch) inserts is complete. waitDuration = time.Second * 30 ) +// Init initializes concurrencylimiter. +// +// Init must be called after flag.Parse call. +func Init() { + ch = make(chan struct{}, *maxConcurrentInserts) +} + // Do calls f with the limited concurrency. func Do(f func() error) error { - // Limit the number of conurrent inserts in order to prevent from excess + // Limit the number of conurrent f calls in order to prevent from excess // memory usage and CPU trashing. t := timerpool.Get(waitDuration) select { @@ -32,6 +41,9 @@ func Do(f func() error) error { return err case <-t.C: timerpool.Put(t) - return fmt.Errorf("the server is overloaded with %d concurrent inserts; either increase the number of CPUs or reduce the load", cap(ch)) + concurrencyLimitErrors.Inc() + return fmt.Errorf("the server is overloaded with %d concurrent inserts; either -maxConcurrentInserts or reduce the load", cap(ch)) } } + +var concurrencyLimitErrors = metrics.NewCounter(`vm_concurrency_limit_errors_total`) diff --git a/app/vminsert/main.go b/app/vminsert/main.go index eff463b81..24863ff44 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -6,6 +6,7 @@ import ( "net/http" "strings" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/concurrencylimiter" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/graphite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/influx" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdb" @@ -22,6 +23,7 @@ var ( // Init initializes vminsert. func Init() { + concurrencylimiter.Init() if len(*graphiteListenAddr) > 0 { go graphite.Serve(*graphiteListenAddr) }