app/vmselect: improve error message when the request cannot be started because too many concurrent requests are already being executed

This commit is contained in:
Aliaksandr Valialkin 2023-01-06 18:19:05 -08:00
parent be896ddfd4
commit cd705b0f69
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1

View file

@ -113,7 +113,7 @@ func main() {
netstorage.InitTmpBlocksDir("") netstorage.InitTmpBlocksDir("")
promql.InitRollupResultCache("") promql.InitRollupResultCache("")
} }
concurrencyCh = make(chan struct{}, *maxConcurrentRequests) concurrencyLimitCh = make(chan struct{}, *maxConcurrentRequests)
initVMAlertProxy() initVMAlertProxy()
var vmselectapiServer *vmselectapi.Server var vmselectapiServer *vmselectapi.Server
if *clusternativeListenAddr != "" { if *clusternativeListenAddr != "" {
@ -159,17 +159,17 @@ func main() {
logger.Infof("the vmselect has been stopped") logger.Infof("the vmselect has been stopped")
} }
var concurrencyCh chan struct{} var concurrencyLimitCh chan struct{}
var ( var (
concurrencyLimitReached = metrics.NewCounter(`vm_concurrent_select_limit_reached_total`) concurrencyLimitReached = metrics.NewCounter(`vm_concurrent_select_limit_reached_total`)
concurrencyLimitTimeout = metrics.NewCounter(`vm_concurrent_select_limit_timeout_total`) concurrencyLimitTimeout = metrics.NewCounter(`vm_concurrent_select_limit_timeout_total`)
_ = metrics.NewGauge(`vm_concurrent_select_capacity`, func() float64 { _ = metrics.NewGauge(`vm_concurrent_select_capacity`, func() float64 {
return float64(cap(concurrencyCh)) return float64(cap(concurrencyLimitCh))
}) })
_ = metrics.NewGauge(`vm_concurrent_select_current`, func() float64 { _ = metrics.NewGauge(`vm_concurrent_select_current`, func() float64 {
return float64(len(concurrencyCh)) return float64(len(concurrencyLimitCh))
}) })
) )
@ -191,8 +191,8 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
// Limit the number of concurrent queries. // Limit the number of concurrent queries.
select { select {
case concurrencyCh <- struct{}{}: case concurrencyLimitCh <- struct{}{}:
defer func() { <-concurrencyCh }() defer func() { <-concurrencyLimitCh }()
default: default:
// Sleep for a while until giving up. This should resolve short bursts in requests. // Sleep for a while until giving up. This should resolve short bursts in requests.
concurrencyLimitReached.Inc() concurrencyLimitReached.Inc()
@ -202,18 +202,18 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
} }
t := timerpool.Get(d) t := timerpool.Get(d)
select { select {
case concurrencyCh <- struct{}{}: case concurrencyLimitCh <- struct{}{}:
qt.Printf("wait in queue because -search.maxConcurrentRequests=%d concurrent requests are executed", *maxConcurrentRequests)
timerpool.Put(t) timerpool.Put(t)
defer func() { <-concurrencyCh }() qt.Printf("wait in queue because -search.maxConcurrentRequests=%d concurrent requests are executed", *maxConcurrentRequests)
defer func() { <-concurrencyLimitCh }()
case <-t.C: case <-t.C:
timerpool.Put(t) timerpool.Put(t)
concurrencyLimitTimeout.Inc() concurrencyLimitTimeout.Inc()
err := &httpserver.ErrorWithStatusCode{ err := &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf("cannot handle more than %d concurrent search requests during %s; possible solutions: "+ Err: fmt.Errorf("couldn't start executing the request in %.3fs, since -search.maxConcurrentRequests=%d concurrent requests "+
"increase `-search.maxQueueDuration`; increase `-search.maxQueryDuration`; increase `-search.maxConcurrentRequests`; "+ "are already executed. Possible solutions: to reduce query load; to add more compute resources to the server; "+
"increase server capacity", "to increase -search.maxQueueDuration; to increase -search.maxQueryDuration; to increase -search.maxConcurrentRequests",
*maxConcurrentRequests, d), d.Seconds(), *maxConcurrentRequests),
StatusCode: http.StatusServiceUnavailable, StatusCode: http.StatusServiceUnavailable,
} }
httpserver.Errorf(w, r, "%s", err) httpserver.Errorf(w, r, "%s", err)