From 6369c88a68ad8f0f95443696f2c653929edf7903 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin <valyala@victoriametrics.com>
Date: Thu, 23 Feb 2023 18:40:31 -0800
Subject: [PATCH] app/vmselect: add -search.logQueryMemoryUsage command-line
 flag for logging queries, which take big amounts of memory

Thanks to @michal-kralik for initial attempts for this feature:

- https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3651
- https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3715

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3553
---
 app/vmselect/main.go                  |  7 ++++---
 app/vmselect/prometheus/prometheus.go |  6 ++++++
 app/vmselect/promql/eval.go           | 26 +++++++++++++++++++++-----
 docs/CHANGELOG.md                     |  1 +
 4 files changed, 32 insertions(+), 8 deletions(-)

diff --git a/app/vmselect/main.go b/app/vmselect/main.go
index dc4226bc3..da58b9210 100644
--- a/app/vmselect/main.go
+++ b/app/vmselect/main.go
@@ -50,9 +50,10 @@ var (
 	minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "Leave only the last sample in every time series per each discrete interval "+
 		"equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication for details")
 	resetCacheAuthKey    = flag.String("search.resetCacheAuthKey", "", "Optional authKey for resetting rollup cache via /internal/resetRollupResultCache call")
-	logSlowQueryDuration = flag.Duration("search.logSlowQueryDuration", 5*time.Second, "Log queries with execution time exceeding this value. Zero disables slow query logging")
-	vmalertProxyURL      = flag.String("vmalert.proxyURL", "", "Optional URL for proxying requests to vmalert. For example, if -vmalert.proxyURL=http://vmalert:8880 , then alerting API requests such as /api/v1/rules from Grafana will be proxied to http://vmalert:8880/api/v1/rules")
-	storageNodes         = flagutil.NewArrayString("storageNode", "Comma-separated addresses of vmstorage nodes; usage: -storageNode=vmstorage-host1,...,vmstorage-hostN . "+
+	logSlowQueryDuration = flag.Duration("search.logSlowQueryDuration", 5*time.Second, "Log queries with execution time exceeding this value. Zero disables slow query logging. "+
+		"See also -search.logQueryMemoryUsage")
+	vmalertProxyURL = flag.String("vmalert.proxyURL", "", "Optional URL for proxying requests to vmalert. For example, if -vmalert.proxyURL=http://vmalert:8880 , then alerting API requests such as /api/v1/rules from Grafana will be proxied to http://vmalert:8880/api/v1/rules")
+	storageNodes    = flagutil.NewArrayString("storageNode", "Comma-separated addresses of vmstorage nodes; usage: -storageNode=vmstorage-host1,...,vmstorage-hostN . "+
 		"Enterprise version of VictoriaMetrics supports automatic discovery of vmstorage addresses via dns+srv records. For example, -storageNode=dns+srv:vmstorage.addrs . "+
 		"See https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#automatic-vmstorage-discovery")
 
diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go
index c79aac33f..2d6f9910f 100644
--- a/app/vmselect/prometheus/prometheus.go
+++ b/app/vmselect/prometheus/prometheus.go
@@ -855,6 +855,9 @@ func QueryHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Token, w
 		LookbackDelta:       lookbackDelta,
 		RoundDigits:         getRoundDigits(r),
 		EnforcedTagFilterss: etfs,
+		GetRequestURI: func() string {
+			return httpserver.GetRequestURI(r)
+		},
 
 		DenyPartialResponse: searchutils.GetDenyPartialResponse(r),
 	}
@@ -959,6 +962,9 @@ func queryRangeHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Tok
 		LookbackDelta:       lookbackDelta,
 		RoundDigits:         getRoundDigits(r),
 		EnforcedTagFilterss: etfs,
+		GetRequestURI: func() string {
+			return httpserver.GetRequestURI(r)
+		},
 
 		DenyPartialResponse: searchutils.GetDenyPartialResponse(r),
 	}
diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go
index 58c586e18..98b53089c 100644
--- a/app/vmselect/promql/eval.go
+++ b/app/vmselect/promql/eval.go
@@ -31,7 +31,11 @@ var (
 		"See https://valyala.medium.com/prometheus-subqueries-in-victoriametrics-9b1492b720b3")
 	maxMemoryPerQuery = flagutil.NewBytes("search.maxMemoryPerQuery", 0, "The maximum amounts of memory a single query may consume. "+
 		"Queries requiring more memory are rejected. The total memory limit for concurrently executed queries can be estimated "+
-		"as -search.maxMemoryPerQuery multiplied by -search.maxConcurrentRequests")
+		"as -search.maxMemoryPerQuery multiplied by -search.maxConcurrentRequests . "+
+		"See also -search.logQueryMemoryUsage")
+	logQueryMemoryUsage = flagutil.NewBytes("search.logQueryMemoryUsage", 0, "Log queries, which require more memory than specified by this flag. "+
+		"This may help detecting and optimizing heavy queries. Query logging is disabled by default. "+
+		"See also -search.logSlowQueryDuration and -search.maxMemoryPerQuery")
 	noStaleMarkers = flag.Bool("search.noStaleMarkers", false, "Set this flag to true if the database doesn't contain Prometheus stale markers, "+
 		"so there is no need in spending additional CPU time on its handling. Staleness markers may exist only in data obtained from Prometheus scrape targets")
 )
@@ -125,6 +129,10 @@ type EvalConfig struct {
 	// EnforcedTagFilterss may contain additional label filters to use in the query.
 	EnforcedTagFilterss [][]storage.TagFilter
 
+	// The callback, which returns the request URI during logging.
+	// The request URI isn't stored here because its' construction may take non-trivial amounts of CPU.
+	GetRequestURI func() string
+
 	// Whether to deny partial response.
 	DenyPartialResponse bool
 
@@ -149,6 +157,7 @@ func copyEvalConfig(src *EvalConfig) *EvalConfig {
 	ec.LookbackDelta = src.LookbackDelta
 	ec.RoundDigits = src.RoundDigits
 	ec.EnforcedTagFilterss = src.EnforcedTagFilterss
+	ec.GetRequestURI = src.GetRequestURI
 	ec.DenyPartialResponse = src.DenyPartialResponse
 	ec.IsPartialResponse = src.IsPartialResponse
 
@@ -1099,26 +1108,33 @@ func evalRollupFuncWithMetricExpr(qt *querytracer.Tracer, ec *EvalConfig, funcNa
 	}
 	rollupPoints := mulNoOverflow(pointsPerTimeseries, int64(timeseriesLen*len(rcs)))
 	rollupMemorySize = sumNoOverflow(mulNoOverflow(int64(rssLen), 1000), mulNoOverflow(rollupPoints, 16))
+	if maxMemory := int64(logQueryMemoryUsage.N); maxMemory > 0 && rollupMemorySize > maxMemory {
+		requestURI := ec.GetRequestURI()
+		logger.Warnf("remoteAddr=%s, requestURI=%s: the %s requires %d bytes of memory for processing; "+
+			"logging this query, since it exceeds the -search.logQueryMemoryUsage=%d; "+
+			"the query selects %d time series and generates %d points across all the time series; try reducing the number of selected time series",
+			ec.QuotedRemoteAddr, requestURI, expr.AppendString(nil), rollupMemorySize, maxMemory, timeseriesLen*len(rcs), rollupPoints)
+	}
 	if maxMemory := int64(maxMemoryPerQuery.N); maxMemory > 0 && rollupMemorySize > maxMemory {
 		rss.Cancel()
 		return nil, &UserReadableError{
-			Err: fmt.Errorf("not enough memory for processing %d data points across %d time series with %d points in each time series "+
+			Err: fmt.Errorf("not enough memory for processing %s, which returns %d data points across %d time series with %d points in each time series "+
 				"according to -search.maxMemoryPerQuery=%d; requested memory: %d bytes; "+
 				"possible solutions are: reducing the number of matching time series; increasing `step` query arg (step=%gs); "+
 				"increasing -search.maxMemoryPerQuery",
-				rollupPoints, timeseriesLen*len(rcs), pointsPerTimeseries, maxMemory, rollupMemorySize, float64(ec.Step)/1e3),
+				expr.AppendString(nil), rollupPoints, timeseriesLen*len(rcs), pointsPerTimeseries, maxMemory, rollupMemorySize, float64(ec.Step)/1e3),
 		}
 	}
 	rml := getRollupMemoryLimiter()
 	if !rml.Get(uint64(rollupMemorySize)) {
 		rss.Cancel()
 		return nil, &UserReadableError{
-			Err: fmt.Errorf("not enough memory for processing %d data points across %d time series with %d points in each time series; "+
+			Err: fmt.Errorf("not enough memory for processing %s, which returns %d data points across %d time series with %d points in each time series; "+
 				"total available memory for concurrent requests: %d bytes; "+
 				"requested memory: %d bytes; "+
 				"possible solutions are: reducing the number of matching time series; increasing `step` query arg (step=%gs); "+
 				"switching to node with more RAM; increasing -memory.allowedPercent",
-				rollupPoints, timeseriesLen*len(rcs), pointsPerTimeseries, rml.MaxSize, uint64(rollupMemorySize), float64(ec.Step)/1e3),
+				expr.AppendString(nil), rollupPoints, timeseriesLen*len(rcs), pointsPerTimeseries, rml.MaxSize, uint64(rollupMemorySize), float64(ec.Step)/1e3),
 		}
 	}
 	defer rml.Put(uint64(rollupMemorySize))
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 1949f040c..9bbb361cf 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -33,6 +33,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
 * FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add `range_trim_outliers(k, q)` function for dropping outliers located farther than `k*range_mad(q)` from the `range_median(q)`. This should help removing outliers during query time at [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3759).
 * FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add `range_trim_zscore(z, q)` function for dropping outliers located farther than `z*range_stddev(q)` from `range_avg(q)`. This should help removing outliers during query time at [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3759).
 * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): show `median` instead of `avg` in graph tooltip and line legend, since `median` is more tolerant against spikes. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3706).
+* FEATURE: add `-search.logQueryMemoryUsage` command-line flag for logging queries, which need more memory than specified by this command-line flag. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3553). Thanks to @michal-kralik for the idea and the intial implementation.
 * FEATURE: allow setting zero value for `-search.latencyOffset` command-line flag. This may be needed in [some cases](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2061#issuecomment-1299109836). Previously the minimum supported value for `-search.latencyOffset` command-line flag was `1s`.
 
 * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): immediately cancel in-flight scrape requests during configuration reload when [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode) is disabled. Previously `vmagent` could wait for long time until all the in-flight requests are completed before reloading the configuration. This could significantly slow down configuration reload. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3747).