diff --git a/app/vminsert/concurrencylimiter/concurrencylimiter.go b/app/vminsert/concurrencylimiter/concurrencylimiter.go
index ced37cc5f5..be0b7a54fa 100644
--- a/app/vminsert/concurrencylimiter/concurrencylimiter.go
+++ b/app/vminsert/concurrencylimiter/concurrencylimiter.go
@@ -32,6 +32,17 @@ func Init() {
 func Do(f func() error) error {
 	// Limit the number of conurrent f calls in order to prevent from excess
 	// memory usage and CPU trashing.
+	select {
+	case ch <- struct{}{}:
+		err := f()
+		<-ch
+		return err
+	default:
+	}
+
+	// All the workers are busy.
+	// Sleep for up to waitDuration.
+	concurrencyLimitReached.Inc()
 	t := timerpool.Get(waitDuration)
 	select {
 	case ch <- struct{}{}:
@@ -41,9 +52,19 @@ func Do(f func() error) error {
 		return err
 	case <-t.C:
 		timerpool.Put(t)
-		concurrencyLimitErrors.Inc()
+		concurrencyLimitTimeout.Inc()
 		return fmt.Errorf("the server is overloaded with %d concurrent inserts; either increase -maxConcurrentInserts or reduce the load", cap(ch))
 	}
 }
 
-var concurrencyLimitErrors = metrics.NewCounter(`vm_concurrency_limit_errors_total`)
+var (
+	concurrencyLimitReached = metrics.NewCounter(`vm_concurrent_insert_limit_reached_total`)
+	concurrencyLimitTimeout = metrics.NewCounter(`vm_concurrent_insert_limit_timeout_total`)
+
+	_ = metrics.NewGauge(`vm_concurrent_insert_capacity`, func() float64 {
+		return float64(cap(ch))
+	})
+	_ = metrics.NewGauge(`vm_concurrent_insert_current`, func() float64 {
+		return float64(len(ch))
+	})
+)
diff --git a/app/vmselect/main.go b/app/vmselect/main.go
index 6629841c7f..5100af632b 100644
--- a/app/vmselect/main.go
+++ b/app/vmselect/main.go
@@ -80,18 +80,37 @@ func main() {
 
 var concurrencyCh chan struct{}
 
+var (
+	concurrencyLimitReached = metrics.NewCounter(`vm_concurrent_select_limit_reached_total`)
+	concurrencyLimitTimeout = metrics.NewCounter(`vm_concurrent_select_limit_timeout_total`)
+
+	_ = metrics.NewGauge(`vm_concurrent_select_capacity`, func() float64 {
+		return float64(cap(concurrencyCh))
+	})
+	_ = metrics.NewGauge(`vm_concurrent_select_current`, func() float64 {
+		return float64(len(concurrencyCh))
+	})
+)
+
 func requestHandler(w http.ResponseWriter, r *http.Request) bool {
 	// Limit the number of concurrent queries.
-	// Sleep for a while until giving up. This should resolve short bursts in requests.
-	t := timerpool.Get(*maxQueueDuration)
 	select {
 	case concurrencyCh <- struct{}{}:
-		timerpool.Put(t)
 		defer func() { <-concurrencyCh }()
-	case <-t.C:
-		timerpool.Put(t)
-		httpserver.Errorf(w, "cannot handle more than %d concurrent requests", cap(concurrencyCh))
-		return true
+	default:
+		// Sleep for a while until giving up. This should resolve short bursts in requests.
+		concurrencyLimitReached.Inc()
+		t := timerpool.Get(*maxQueueDuration)
+		select {
+		case concurrencyCh <- struct{}{}:
+			timerpool.Put(t)
+			defer func() { <-concurrencyCh }()
+		case <-t.C:
+			timerpool.Put(t)
+			concurrencyLimitTimeout.Inc()
+			httpserver.Errorf(w, "cannot handle more than %d concurrent requests", cap(concurrencyCh))
+			return true
+		}
 	}
 
 	path := r.URL.Path
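Both hunks apply the same pattern: try to acquire a slot from a buffered channel without blocking, and only fall back to a timer-bounded wait (tracked by the new `*_limit_reached_total` and `*_limit_timeout_total` counters) when every slot is taken. Below is a minimal, self-contained sketch of that pattern; the `Limiter` type, its field names, and the use of plain `time.NewTimer` and `sync/atomic` counters in place of VictoriaMetrics' `timerpool` and `metrics` packages are illustrative assumptions, not the project's actual API.

```go
// Sketch of the fast-path / timed-wait concurrency limiter used in the diff above.
// The Limiter type and its counters are illustrative only; the real code uses
// package-level channels, the metrics package and a pooled timer.
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type Limiter struct {
	ch           chan struct{} // capacity == allowed concurrency
	waitDuration time.Duration

	limitReached uint64 // incremented when the fast path fails
	limitTimeout uint64 // incremented when the timed wait also fails
}

func NewLimiter(maxConcurrent int, waitDuration time.Duration) *Limiter {
	return &Limiter{
		ch:           make(chan struct{}, maxConcurrent),
		waitDuration: waitDuration,
	}
}

// Do runs f if a concurrency slot is available, otherwise waits up to
// waitDuration for a slot before giving up with an error.
func (l *Limiter) Do(f func() error) error {
	// Fast path: grab a slot without blocking and without touching a timer.
	select {
	case l.ch <- struct{}{}:
		defer func() { <-l.ch }()
		return f()
	default:
	}

	// Slow path: all slots are busy; queue for up to waitDuration.
	atomic.AddUint64(&l.limitReached, 1)
	t := time.NewTimer(l.waitDuration)
	defer t.Stop()
	select {
	case l.ch <- struct{}{}:
		defer func() { <-l.ch }()
		return f()
	case <-t.C:
		atomic.AddUint64(&l.limitTimeout, 1)
		return fmt.Errorf("overloaded with %d concurrent calls; timed out after %s", cap(l.ch), l.waitDuration)
	}
}

func main() {
	l := NewLimiter(2, 100*time.Millisecond)
	err := l.Do(func() error {
		time.Sleep(10 * time.Millisecond)
		return nil
	})
	fmt.Println("err:", err)
}
```

The practical effect of the change is that the common, uncontended case no longer touches the timer pool at all, and the split counters distinguish requests that merely had to queue from requests that were rejected after waitDuration/maxQueueDuration, while the capacity/current gauges expose how close the channel is to saturation.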