app/vminsert: add -maxConcurrentInserts command-line flag for limiting the number of concurrent inserts

This commit is contained in:
Aliaksandr Valialkin 2019-05-29 12:35:47 +03:00
parent 61bad1e07e
commit 56b952c456
2 changed files with 21 additions and 7 deletions

View file

@ -1,27 +1,36 @@
package concurrencylimiter package concurrencylimiter
import ( import (
"flag"
"fmt" "fmt"
"runtime" "runtime"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/metrics"
) )
var maxConcurrentInserts = flag.Int("maxConcurrentInserts", runtime.GOMAXPROCS(-1)*4, "The maximum number of concurrent inserts")
var ( var (
// ch is the channel for limiting concurrent inserts. // ch is the channel for limiting concurrent calls to Do.
// Put an item into it before performing an insert and remove ch chan struct{}
// the item after the insert is complete.
ch = make(chan struct{}, runtime.GOMAXPROCS(-1)*2)
// waitDuration is the amount of time to wait until at least a single // waitDuration is the amount of time to wait until at least a single
// concurrent insert out of cap(Ch) inserts is complete. // concurrent Do call out of cap(ch) inserts is complete.
waitDuration = time.Second * 30 waitDuration = time.Second * 30
) )
// Init initializes concurrencylimiter.
//
// Init must be called after flag.Parse call.
func Init() {
ch = make(chan struct{}, *maxConcurrentInserts)
}
// Do calls f with the limited concurrency. // Do calls f with the limited concurrency.
func Do(f func() error) error { func Do(f func() error) error {
// Limit the number of conurrent inserts in order to prevent from excess // Limit the number of conurrent f calls in order to prevent from excess
// memory usage and CPU trashing. // memory usage and CPU trashing.
t := timerpool.Get(waitDuration) t := timerpool.Get(waitDuration)
select { select {
@ -32,6 +41,9 @@ func Do(f func() error) error {
return err return err
case <-t.C: case <-t.C:
timerpool.Put(t) timerpool.Put(t)
return fmt.Errorf("the server is overloaded with %d concurrent inserts; either increase the number of CPUs or reduce the load", cap(ch)) concurrencyLimitErrors.Inc()
return fmt.Errorf("the server is overloaded with %d concurrent inserts; either -maxConcurrentInserts or reduce the load", cap(ch))
} }
} }
var concurrencyLimitErrors = metrics.NewCounter(`vm_concurrency_limit_errors_total`)

View file

@ -6,6 +6,7 @@ import (
"net/http" "net/http"
"strings" "strings"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/concurrencylimiter"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/graphite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/influx" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdb" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdb"
@ -22,6 +23,7 @@ var (
// Init initializes vminsert. // Init initializes vminsert.
func Init() { func Init() {
concurrencylimiter.Init()
if len(*graphiteListenAddr) > 0 { if len(*graphiteListenAddr) > 0 {
go graphite.Serve(*graphiteListenAddr) go graphite.Serve(*graphiteListenAddr)
} }