From 01a2859f43e31a2271f33a5c32c19e78b83af474 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 22 Jul 2023 16:41:43 -0700 Subject: [PATCH] lib/streamaggr: skip de-duplication for series, which do not match the configured aggregation rules Previously all the incoming samples were de-duplicated, even if their series doesn't match aggregation rule filters. This could result in increased CPU usage. Now the de-duplication isn't applied to samples for series, which do not match aggregation rule filters. Such samples are just ignored. --- lib/streamaggr/streamaggr.go | 46 +++++++++++++++++++++++------------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 03ca4f5ed5..16f763a1ca 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -449,7 +449,7 @@ func (a *aggregator) dedupFlush() { skipAggrSuffix: true, } a.dedupAggr.appendSeriesForFlush(ctx) - a.push(ctx.tss) + a.push(ctx.tss, false) } func (a *aggregator) flush() { @@ -500,7 +500,7 @@ func (a *aggregator) MustStop() { // Push pushes tss to a. func (a *aggregator) Push(tss []prompbmarshal.TimeSeries) { if a.dedupAggr == nil { - a.push(tss) + a.push(tss, true) return } @@ -509,25 +509,11 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries) { pushSample := a.dedupAggr.pushSample inputKey := "" bb := bbPool.Get() - for _, ts := range tss { - bb.B = marshalLabelsFast(bb.B[:0], ts.Labels) - outputKey := bytesutil.InternBytes(bb.B) - for _, sample := range ts.Samples { - pushSample(inputKey, outputKey, sample.Value) - } - } - bbPool.Put(bb) -} - -func (a *aggregator) push(tss []prompbmarshal.TimeSeries) { labels := promutils.GetLabels() - tmpLabels := promutils.GetLabels() - bb := bbPool.Get() for _, ts := range tss { if !a.match.Match(ts.Labels) { continue } - labels.Labels = append(labels.Labels[:0], ts.Labels...) labels.Labels = a.inputRelabeling.Apply(labels.Labels, 0) if len(labels.Labels) == 0 { @@ -536,6 +522,34 @@ func (a *aggregator) push(tss []prompbmarshal.TimeSeries) { } labels.Sort() + bb.B = marshalLabelsFast(bb.B[:0], labels.Labels) + outputKey := bytesutil.InternBytes(bb.B) + for _, sample := range ts.Samples { + pushSample(inputKey, outputKey, sample.Value) + } + } + promutils.PutLabels(labels) + bbPool.Put(bb) +} + +func (a *aggregator) push(tss []prompbmarshal.TimeSeries, applyFilters bool) { + labels := promutils.GetLabels() + tmpLabels := promutils.GetLabels() + bb := bbPool.Get() + for _, ts := range tss { + if applyFilters && !a.match.Match(ts.Labels) { + continue + } + labels.Labels = append(labels.Labels[:0], ts.Labels...) + if applyFilters { + labels.Labels = a.inputRelabeling.Apply(labels.Labels, 0) + if len(labels.Labels) == 0 { + // The metric has been deleted by the relabeling + continue + } + labels.Sort() + } + if a.aggregateOnlyByTime { bb.B = marshalLabelsFast(bb.B[:0], labels.Labels) } else {