diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 03ca4f5ed5..16f763a1ca 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -449,7 +449,7 @@ func (a *aggregator) dedupFlush() { skipAggrSuffix: true, } a.dedupAggr.appendSeriesForFlush(ctx) - a.push(ctx.tss) + a.push(ctx.tss, false) } func (a *aggregator) flush() { @@ -500,7 +500,7 @@ func (a *aggregator) MustStop() { // Push pushes tss to a. func (a *aggregator) Push(tss []prompbmarshal.TimeSeries) { if a.dedupAggr == nil { - a.push(tss) + a.push(tss, true) return } @@ -509,25 +509,11 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries) { pushSample := a.dedupAggr.pushSample inputKey := "" bb := bbPool.Get() - for _, ts := range tss { - bb.B = marshalLabelsFast(bb.B[:0], ts.Labels) - outputKey := bytesutil.InternBytes(bb.B) - for _, sample := range ts.Samples { - pushSample(inputKey, outputKey, sample.Value) - } - } - bbPool.Put(bb) -} - -func (a *aggregator) push(tss []prompbmarshal.TimeSeries) { labels := promutils.GetLabels() - tmpLabels := promutils.GetLabels() - bb := bbPool.Get() for _, ts := range tss { if !a.match.Match(ts.Labels) { continue } - labels.Labels = append(labels.Labels[:0], ts.Labels...) labels.Labels = a.inputRelabeling.Apply(labels.Labels, 0) if len(labels.Labels) == 0 { @@ -536,6 +522,34 @@ func (a *aggregator) push(tss []prompbmarshal.TimeSeries) { } labels.Sort() + bb.B = marshalLabelsFast(bb.B[:0], labels.Labels) + outputKey := bytesutil.InternBytes(bb.B) + for _, sample := range ts.Samples { + pushSample(inputKey, outputKey, sample.Value) + } + } + promutils.PutLabels(labels) + bbPool.Put(bb) +} + +func (a *aggregator) push(tss []prompbmarshal.TimeSeries, applyFilters bool) { + labels := promutils.GetLabels() + tmpLabels := promutils.GetLabels() + bb := bbPool.Get() + for _, ts := range tss { + if applyFilters && !a.match.Match(ts.Labels) { + continue + } + labels.Labels = append(labels.Labels[:0], ts.Labels...) + if applyFilters { + labels.Labels = a.inputRelabeling.Apply(labels.Labels, 0) + if len(labels.Labels) == 0 { + // The metric has been deleted by the relabeling + continue + } + labels.Sort() + } + if a.aggregateOnlyByTime { bb.B = marshalLabelsFast(bb.B[:0], labels.Labels) } else {