diff --git a/lib/writeconcurrencylimiter/concurrencylimiter.go b/lib/writeconcurrencylimiter/concurrencylimiter.go index a54672d9b..a280ca058 100644 --- a/lib/writeconcurrencylimiter/concurrencylimiter.go +++ b/lib/writeconcurrencylimiter/concurrencylimiter.go @@ -62,7 +62,14 @@ var readerPool sync.Pool func (r *Reader) Read(p []byte) (int, error) { n, err := r.r.Read(p) if !r.increasedConcurrency { - if err := incConcurrency(); err != nil { + if !incConcurrency() { + err = &httpserver.ErrorWithStatusCode{ + Err: fmt.Errorf("cannot process insert request for %.3f seconds because %d concurrent insert requests are already executed. "+ + "Possible solutions: to reduce workload; to increase compute resources at the server; "+ + "to increase -insert.maxQueueDuration; to increase -maxConcurrentInserts", + maxQueueDuration.Seconds(), *maxConcurrentInserts), + StatusCode: http.StatusServiceUnavailable, + } return 0, err } r.increasedConcurrency = true @@ -87,12 +94,12 @@ var ( concurrencyLimitChOnce sync.Once ) -func incConcurrency() error { +func incConcurrency() bool { concurrencyLimitChOnce.Do(initConcurrencyLimitCh) select { case concurrencyLimitCh <- struct{}{}: - return nil + return true default: } @@ -101,17 +108,11 @@ func incConcurrency() error { select { case concurrencyLimitCh <- struct{}{}: timerpool.Put(t) - return nil + return true case <-t.C: timerpool.Put(t) concurrencyLimitTimeout.Inc() - return &httpserver.ErrorWithStatusCode{ - Err: fmt.Errorf("cannot process insert request for %.3f seconds because %d concurrent insert requests are already executed. "+ - "Possible solutions: to reduce workload; to increase compute resources at the server; "+ - "to increase -insert.maxQueueDuration; to increase -maxConcurrentInserts", - maxQueueDuration.Seconds(), *maxConcurrentInserts), - StatusCode: http.StatusServiceUnavailable, - } + return false } }