From 3b1e3a03e091a6597c467c8032d78dfbdc12428a Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 22 Sep 2020 01:21:20 +0300 Subject: [PATCH] app/vmselect: make sure the request doesn't wait in pending queue more than the configured timeout Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/711 --- app/vmselect/main.go | 15 +++++++++++---- app/vmselect/searchutils/searchutils.go | 13 +++++++++++++ 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/app/vmselect/main.go b/app/vmselect/main.go index 1c6cb940a..305149a58 100644 --- a/app/vmselect/main.go +++ b/app/vmselect/main.go @@ -12,6 +12,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/graphite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/prometheus" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -23,7 +24,8 @@ var ( deleteAuthKey = flag.String("deleteAuthKey", "", "authKey for metrics' deletion via /api/v1/admin/tsdb/delete_series") maxConcurrentRequests = flag.Int("search.maxConcurrentRequests", getDefaultMaxConcurrentRequests(), "The maximum number of concurrent search requests. "+ "It shouldn't be high, since a single request can saturate all the CPU cores. 
See also -search.maxQueueDuration") - maxQueueDuration = flag.Duration("search.maxQueueDuration", 10*time.Second, "The maximum time the request waits for execution when -search.maxConcurrentRequests limit is reached") + maxQueueDuration = flag.Duration("search.maxQueueDuration", 10*time.Second, "The maximum time the request waits for execution when -search.maxConcurrentRequests "+ + "limit is reached; see also -search.maxQueryDuration") resetCacheAuthKey = flag.String("search.resetCacheAuthKey", "", "Optional authKey for resetting rollup cache via /internal/resetRollupResultCache call") ) @@ -77,7 +79,11 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { default: // Sleep for a while until giving up. This should resolve short bursts in requests. concurrencyLimitReached.Inc() - t := timerpool.Get(*maxQueueDuration) + d := searchutils.GetMaxQueryDuration(r) + if d > *maxQueueDuration { + d = *maxQueueDuration + } + t := timerpool.Get(d) select { case concurrencyCh <- struct{}{}: timerpool.Put(t) @@ -87,8 +93,9 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { concurrencyLimitTimeout.Inc() err := &httpserver.ErrorWithStatusCode{ Err: fmt.Errorf("cannot handle more than %d concurrent search requests during %s; possible solutions: "+ - "increase `-search.maxQueueDuration`, increase `-search.maxConcurrentRequests`, increase server capacity", - *maxConcurrentRequests, *maxQueueDuration), + "increase `-search.maxQueueDuration`; increase `-search.maxQueryDuration`; increase `-search.maxConcurrentRequests`; "+ + "increase server capacity", + *maxConcurrentRequests, d), StatusCode: http.StatusServiceUnavailable, } httpserver.Errorf(w, r, "%s", err) diff --git a/app/vmselect/searchutils/searchutils.go b/app/vmselect/searchutils/searchutils.go index 19e453306..85e0a4688 100644 --- a/app/vmselect/searchutils/searchutils.go +++ b/app/vmselect/searchutils/searchutils.go @@ -104,6 +104,19 @@ func GetDuration(r *http.Request, argKey string, 
defaultValue int64) (int64, err const maxDurationMsecs = 100 * 365 * 24 * 3600 * 1000 +// GetMaxQueryDuration returns the maximum duration for query from r. +func GetMaxQueryDuration(r *http.Request) time.Duration { + dms, err := GetDuration(r, "timeout", 0) + if err != nil { + dms = 0 + } + d := time.Duration(dms) * time.Millisecond + if d <= 0 || d > *maxQueryDuration { + d = *maxQueryDuration + } + return d +} + // GetDeadlineForQuery returns deadline for the given query r. func GetDeadlineForQuery(r *http.Request, startTime time.Time) Deadline { dMax := maxQueryDuration.Milliseconds()