mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-21 15:45:01 +00:00
lib/writeconcurrencylimiter: moved the error generation from incConcurrency() to the caller place
This commit is contained in:
parent
7fb02f536a
commit
3864357772
1 changed files with 12 additions and 11 deletions
|
@ -62,7 +62,14 @@ var readerPool sync.Pool
|
||||||
func (r *Reader) Read(p []byte) (int, error) {
|
func (r *Reader) Read(p []byte) (int, error) {
|
||||||
n, err := r.r.Read(p)
|
n, err := r.r.Read(p)
|
||||||
if !r.increasedConcurrency {
|
if !r.increasedConcurrency {
|
||||||
if err := incConcurrency(); err != nil {
|
if !incConcurrency() {
|
||||||
|
err = &httpserver.ErrorWithStatusCode{
|
||||||
|
Err: fmt.Errorf("cannot process insert request for %.3f seconds because %d concurrent insert requests are already executed. "+
|
||||||
|
"Possible solutions: to reduce workload; to increase compute resources at the server; "+
|
||||||
|
"to increase -insert.maxQueueDuration; to increase -maxConcurrentInserts",
|
||||||
|
maxQueueDuration.Seconds(), *maxConcurrentInserts),
|
||||||
|
StatusCode: http.StatusServiceUnavailable,
|
||||||
|
}
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
r.increasedConcurrency = true
|
r.increasedConcurrency = true
|
||||||
|
@ -87,12 +94,12 @@ var (
|
||||||
concurrencyLimitChOnce sync.Once
|
concurrencyLimitChOnce sync.Once
|
||||||
)
|
)
|
||||||
|
|
||||||
func incConcurrency() error {
|
func incConcurrency() bool {
|
||||||
concurrencyLimitChOnce.Do(initConcurrencyLimitCh)
|
concurrencyLimitChOnce.Do(initConcurrencyLimitCh)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case concurrencyLimitCh <- struct{}{}:
|
case concurrencyLimitCh <- struct{}{}:
|
||||||
return nil
|
return true
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -101,17 +108,11 @@ func incConcurrency() error {
|
||||||
select {
|
select {
|
||||||
case concurrencyLimitCh <- struct{}{}:
|
case concurrencyLimitCh <- struct{}{}:
|
||||||
timerpool.Put(t)
|
timerpool.Put(t)
|
||||||
return nil
|
return true
|
||||||
case <-t.C:
|
case <-t.C:
|
||||||
timerpool.Put(t)
|
timerpool.Put(t)
|
||||||
concurrencyLimitTimeout.Inc()
|
concurrencyLimitTimeout.Inc()
|
||||||
return &httpserver.ErrorWithStatusCode{
|
return false
|
||||||
Err: fmt.Errorf("cannot process insert request for %.3f seconds because %d concurrent insert requests are already executed. "+
|
|
||||||
"Possible solutions: to reduce workload; to increase compute resources at the server; "+
|
|
||||||
"to increase -insert.maxQueueDuration; to increase -maxConcurrentInserts",
|
|
||||||
maxQueueDuration.Seconds(), *maxConcurrentInserts),
|
|
||||||
StatusCode: http.StatusServiceUnavailable,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue