VictoriaMetrics/app/vminsert/concurrencylimiter/concurrencylimiter.go

50 lines
1.3 KiB
Go
Raw Normal View History

2019-05-22 21:16:55 +00:00
package concurrencylimiter
import (
"flag"
2019-05-22 21:16:55 +00:00
"fmt"
"runtime"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/metrics"
2019-05-22 21:16:55 +00:00
)
var maxConcurrentInserts = flag.Int("maxConcurrentInserts", runtime.GOMAXPROCS(-1)*4, "The maximum number of concurrent inserts")
2019-05-22 21:16:55 +00:00
var (
// ch is the channel for limiting concurrent calls to Do.
ch chan struct{}
2019-05-22 21:16:55 +00:00
// waitDuration is the amount of time to wait until at least a single
// concurrent Do call out of cap(ch) inserts is complete.
2019-05-22 21:16:55 +00:00
waitDuration = time.Second * 30
)
// Init initializes concurrencylimiter.
//
// Init must be called after flag.Parse call.
func Init() {
ch = make(chan struct{}, *maxConcurrentInserts)
}
2019-05-22 21:16:55 +00:00
// Do calls f with the limited concurrency.
func Do(f func() error) error {
// Limit the number of conurrent f calls in order to prevent from excess
2019-05-22 21:16:55 +00:00
// memory usage and CPU trashing.
t := timerpool.Get(waitDuration)
2019-05-22 21:16:55 +00:00
select {
case ch <- struct{}{}:
timerpool.Put(t)
2019-05-22 21:16:55 +00:00
err := f()
<-ch
return err
case <-t.C:
timerpool.Put(t)
concurrencyLimitErrors.Inc()
return fmt.Errorf("the server is overloaded with %d concurrent inserts; either -maxConcurrentInserts or reduce the load", cap(ch))
2019-05-22 21:16:55 +00:00
}
}
var concurrencyLimitErrors = metrics.NewCounter(`vm_concurrency_limit_errors_total`)