From 7ce052b32d29b037dea620f59fdfc4ef4b895701 Mon Sep 17 00:00:00 2001 From: Roman Khavronenko Date: Mon, 20 May 2024 14:03:28 +0200 Subject: [PATCH] lib/streamaggr: skip empty aggregators (#6307) Prevent excessive resource usage when the stream aggregation config file contains no matchers by not pushing data into the Aggregators object. Before this change, a lot of extra work was performed for no reason. Signed-off-by: hagen1778 --- app/vmagent/remotewrite/remotewrite.go | 4 ++-- app/vminsert/common/insert_ctx.go | 2 +- app/vminsert/common/streamaggr.go | 2 +- docs/CHANGELOG.md | 1 + lib/streamaggr/streamaggr.go | 11 +++++++++++ 5 files changed, 16 insertions(+), 4 deletions(-) diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index 3107310c51..a04361ffbd 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -494,7 +494,7 @@ func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, forceDropSamplesOnF } sortLabelsIfNeeded(tssBlock) tssBlock = limitSeriesCardinality(tssBlock) - if sas != nil { + if sas.IsEnabled() { matchIdxs := matchIdxsPool.Get() matchIdxs.B = sas.Push(tssBlock, matchIdxs.B) if !*streamAggrGlobalKeepInput { @@ -901,7 +901,7 @@ func (rwctx *remoteWriteCtx) TryPush(tss []prompbmarshal.TimeSeries, forceDropSa // Apply stream aggregation or deduplication if they are configured sas := rwctx.sas.Load() - if sas != nil { + if sas.IsEnabled() { matchIdxs := matchIdxsPool.Get() matchIdxs.B = sas.Push(tss, matchIdxs.B) if !rwctx.streamAggrKeepInput { diff --git a/app/vminsert/common/insert_ctx.go b/app/vminsert/common/insert_ctx.go index e5ce6d3ea4..f7a06960b5 100644 --- a/app/vminsert/common/insert_ctx.go +++ b/app/vminsert/common/insert_ctx.go @@ -140,7 +140,7 @@ func (ctx *InsertCtx) ApplyRelabeling() { // FlushBufs flushes buffered rows to the underlying storage. 
func (ctx *InsertCtx) FlushBufs() error { sas := sasGlobal.Load() - if (sas != nil || deduplicator != nil) && !ctx.skipStreamAggr { + if (sas.IsEnabled() || deduplicator != nil) && !ctx.skipStreamAggr { matchIdxs := matchIdxsPool.Get() matchIdxs.B = ctx.streamAggrCtx.push(ctx.mrs, matchIdxs.B) if !*streamAggrKeepInput { diff --git a/app/vminsert/common/streamaggr.go b/app/vminsert/common/streamaggr.go index 8d2be61059..39b4478c91 100644 --- a/app/vminsert/common/streamaggr.go +++ b/app/vminsert/common/streamaggr.go @@ -242,7 +242,7 @@ func (ctx *streamAggrCtx) push(mrs []storage.MetricRow, matchIdxs []byte) []byte tss = tss[tssLen:] sas := sasGlobal.Load() - if sas != nil { + if sas.IsEnabled() { matchIdxs = sas.Push(tss, matchIdxs) } else if deduplicator != nil { matchIdxs = bytesutil.ResizeNoCopyMayOverallocate(matchIdxs, len(tss)) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 7340b8fd77..ab2791aa9d 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -53,6 +53,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent/): prevent potential panic during [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) if more than one `--remoteWrite.streamAggr.dedupInterval` is configured. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6205). * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent/): skip empty data blocks before sending to the remote write destination. Thanks to @viperstars for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6241). * BUGFIX: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): set correct suffix `_prometheus` for aggregation outputs [increase_prometheus](https://docs.victoriametrics.com/stream-aggregation/#increase_prometheus) and [total_prometheus](https://docs.victoriametrics.com/stream-aggregation/#total_prometheus). 
Before, outputs `total` and `total_prometheus` or `increase` and `increase_prometheus` had the same suffix. +* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): prevent excessive resource usage when the stream aggregation config file is empty. * BUGFIX: properly estimate the needed memory for query execution if it has the format [`aggr_func`](https://docs.victoriametrics.com/metricsql/#aggregate-functions)([`rollup_func[d]`](https://docs.victoriametrics.com/metricsql/#rollup-functions) (for example, `sum(rate(request_duration_seconds_bucket[5m]))`). This should allow performing aggregations over bigger number of time series when VictoriaMetrics runs in environments with small amounts of available memory. The issue has been introduced in [this commit](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/5138eaeea0791caa34bcfab410e0ca9cd253cd8f) in [v1.83.0](https://docs.victoriametrics.com/changelog_2022/#v1830). * BUGFIX: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): correctly apply `-inmemoryDataFlushInterval` when it's set to minimum supported value 1s. * BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth/): properly release memory used for metrics during config reload. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6247). diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index d3e2daa874..6657b2e045 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -294,6 +294,17 @@ func newAggregatorsFromData(data []byte, pushFunc PushFunc, opts *Options) (*Agg }, nil } +// IsEnabled returns true if Aggregators has at least one configured aggregator +func (a *Aggregators) IsEnabled() bool { + if a == nil { + return false + } + if len(a.as) == 0 { + return false + } + return true +} + // MustStop stops a. 
func (a *Aggregators) MustStop() { if a == nil {