lib/streamaggr: skip empty aggregators (#6307)

Prevent excessive resource usage when the stream aggregation config file
contains no matchers by preventing data from being pushed into the Aggregators object.
Before this change a lot of extra work was invoked without reason.

Signed-off-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
Roman Khavronenko 2024-05-20 14:03:28 +02:00 committed by GitHub
parent 7dc18bf67a
commit 7ce052b32d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 16 additions and 4 deletions

View file

@ -494,7 +494,7 @@ func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, forceDropSamplesOnF
}
sortLabelsIfNeeded(tssBlock)
tssBlock = limitSeriesCardinality(tssBlock)
if sas != nil {
if sas.IsEnabled() {
matchIdxs := matchIdxsPool.Get()
matchIdxs.B = sas.Push(tssBlock, matchIdxs.B)
if !*streamAggrGlobalKeepInput {
@ -901,7 +901,7 @@ func (rwctx *remoteWriteCtx) TryPush(tss []prompbmarshal.TimeSeries, forceDropSa
// Apply stream aggregation or deduplication if they are configured
sas := rwctx.sas.Load()
if sas != nil {
if sas.IsEnabled() {
matchIdxs := matchIdxsPool.Get()
matchIdxs.B = sas.Push(tss, matchIdxs.B)
if !rwctx.streamAggrKeepInput {

View file

@ -140,7 +140,7 @@ func (ctx *InsertCtx) ApplyRelabeling() {
// FlushBufs flushes buffered rows to the underlying storage.
func (ctx *InsertCtx) FlushBufs() error {
sas := sasGlobal.Load()
if (sas != nil || deduplicator != nil) && !ctx.skipStreamAggr {
if (sas.IsEnabled() || deduplicator != nil) && !ctx.skipStreamAggr {
matchIdxs := matchIdxsPool.Get()
matchIdxs.B = ctx.streamAggrCtx.push(ctx.mrs, matchIdxs.B)
if !*streamAggrKeepInput {

View file

@ -242,7 +242,7 @@ func (ctx *streamAggrCtx) push(mrs []storage.MetricRow, matchIdxs []byte) []byte
tss = tss[tssLen:]
sas := sasGlobal.Load()
if sas != nil {
if sas.IsEnabled() {
matchIdxs = sas.Push(tss, matchIdxs)
} else if deduplicator != nil {
matchIdxs = bytesutil.ResizeNoCopyMayOverallocate(matchIdxs, len(tss))

View file

@ -53,6 +53,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent/): prevent potential panic during [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) if more than one `--remoteWrite.streamAggr.dedupInterval` is configured. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6205).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent/): skip empty data blocks before sending to the remote write destination. Thanks to @viperstars for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6241).
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): set correct suffix `<output>_prometheus` for aggregation outputs [increase_prometheus](https://docs.victoriametrics.com/stream-aggregation/#increase_prometheus) and [total_prometheus](https://docs.victoriametrics.com/stream-aggregation/#total_prometheus). Before, outputs `total` and `total_prometheus` or `increase` and `increase_prometheus` had the same suffix.
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): prevent excessive resource usage when the stream aggregation config file is empty.
* BUGFIX: properly estimate the needed memory for query execution if it has the format [`aggr_func`](https://docs.victoriametrics.com/metricsql/#aggregate-functions)([`rollup_func[d]`](https://docs.victoriametrics.com/metricsql/#rollup-functions) (for example, `sum(rate(request_duration_seconds_bucket[5m]))`). This should allow performing aggregations over bigger number of time series when VictoriaMetrics runs in environments with small amounts of available memory. The issue has been introduced in [this commit](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/5138eaeea0791caa34bcfab410e0ca9cd253cd8f) in [v1.83.0](https://docs.victoriametrics.com/changelog_2022/#v1830).
* BUGFIX: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): correctly apply `-inmemoryDataFlushInterval` when it's set to minimum supported value 1s.
* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth/): properly release memory used for metrics during config reload. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6247).

View file

@ -294,6 +294,17 @@ func newAggregatorsFromData(data []byte, pushFunc PushFunc, opts *Options) (*Agg
}, nil
}
// IsEnabled reports whether a holds at least one configured aggregator.
//
// It is safe to call on a nil receiver; false is returned in that case.
func (a *Aggregators) IsEnabled() bool {
	return a != nil && len(a.as) > 0
}
// MustStop stops a.
func (a *Aggregators) MustStop() {
if a == nil {