diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index 0f2b2ba5c..6770dfa0e 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -620,7 +620,7 @@ func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) { // from affecting time series for other remoteWrite.url configs. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467 // and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/599 - v = tssRelabelPool.Get().(*[]prompbmarshal.TimeSeries) + v = tssPool.Get().(*[]prompbmarshal.TimeSeries) tss = append(*v, tss...) rowsCountBeforeRelabel := getRowsCount(tss) tss = rctx.applyRelabeling(tss, nil, pcs) @@ -630,20 +630,41 @@ func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) { rowsCount := getRowsCount(tss) rwctx.rowsPushedAfterRelabel.Add(rowsCount) - // Apply stream aggregation if any - sas := rwctx.sas.Load() - sas.Push(tss) - if sas == nil || rwctx.streamAggrKeepInput { - // Push samples to the remote storage - rwctx.pushInternal(tss) + defer func() { + // Return back relabeling contexts to the pool + if rctx != nil { + *v = prompbmarshal.ResetTimeSeries(tss) + tssPool.Put(v) + putRelabelCtx(rctx) + } } - // Return back relabeling contexts to the pool - if rctx != nil { - *v = prompbmarshal.ResetTimeSeries(tss) - tssRelabelPool.Put(v) - putRelabelCtx(rctx) + // Load stream aggregagation config + sas := rwctx.sas.Load() + + // Fast path, no need to track used series + if sas == nil || rwctx.streamAggrKeepInput { + // Apply stream aggregation to the input samples + // it's safe to call sas.Push with sas == nil + sas.Push(tss, nil) + + // Push all samples to the remote storage + rwctx.pushInternal(tss) + + return } + + // Track series which were used for stream aggregation. + ut := streamaggr.NewTssUsageTracker(len(tss)) + sas.Push(tss, ut.Matched) + + unmatchedSeries := tssPool.Get().(*[]prompbmarshal.TimeSeries) + // Push only unmatched series to the remote storage + *unmatchedSeries = ut.GetUnmatched(tss, *unmatchedSeries) + rwctx.pushInternal(*unmatchedSeries) + + *unmatchedSeries = prompbmarshal.ResetTimeSeries(*unmatchedSeries) + tssPool.Put(unmatchedSeries) } func (rwctx *remoteWriteCtx) pushInternal(tss []prompbmarshal.TimeSeries) { @@ -682,7 +703,7 @@ func (rwctx *remoteWriteCtx) reinitStreamAggr() { metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp()) } -var tssRelabelPool = &sync.Pool{ +var tssPool = &sync.Pool{ New: func() interface{} { a := []prompbmarshal.TimeSeries{} return &a diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 12bbb83e0..15118e07a 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -24,6 +24,10 @@ The following `tip` changes can be tested by building VictoriaMetrics components ## tip +**Update notes:** release contains breaking change to [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) `-remoteWrite.streamAggr.keepInput` command-line flag. +Default behaviour has changed to keep metrics which were not matched by any aggregation rule when `-remoteWrite.streamAggr.keepInput` is set to false (default value). +See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4243) for details. + * SECURITY: upgrade base docker image (alpine) from 3.18.0 to 3.18.2. See [alpine 3.18.2 release notes](https://alpinelinux.org/posts/Alpine-3.15.9-3.16.6-3.17.4-3.18.2-released.html). * SECURITY: upgrade Go builder from Go1.20.5 to Go1.20.6. See [the list of issues addressed in Go1.20.6](https://github.com/golang/go/issues?q=milestone%3AGo1.20.6+label%3ACherryPickApproved). diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 16f763a1c..939b33f7f 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -10,6 +10,8 @@ import ( "sync" "time" + "gopkg.in/yaml.v2" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" @@ -18,7 +20,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" - "gopkg.in/yaml.v2" ) var supportedOutputs = []string{ @@ -194,12 +195,12 @@ func (a *Aggregators) Equal(b *Aggregators) bool { } // Push pushes tss to a. -func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries) { +func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries, matched func(id int)) { if a == nil { return } for _, aggr := range a.as { - aggr.Push(tss) + aggr.Push(tss, matched) } } @@ -449,7 +450,7 @@ func (a *aggregator) dedupFlush() { skipAggrSuffix: true, } a.dedupAggr.appendSeriesForFlush(ctx) - a.push(ctx.tss, false) + a.push(ctx.tss, nil) } func (a *aggregator) flush() { @@ -498,9 +499,9 @@ func (a *aggregator) MustStop() { } // Push pushes tss to a. -func (a *aggregator) Push(tss []prompbmarshal.TimeSeries) { +func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matched func(id int)) { if a.dedupAggr == nil { - a.push(tss, true) + a.push(tss, matched) return } @@ -509,20 +510,14 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries) { pushSample := a.dedupAggr.pushSample inputKey := "" bb := bbPool.Get() - labels := promutils.GetLabels() - for _, ts := range tss { + for idx, ts := range tss { if !a.match.Match(ts.Labels) { continue } - labels.Labels = append(labels.Labels[:0], ts.Labels...) - labels.Labels = a.inputRelabeling.Apply(labels.Labels, 0) - if len(labels.Labels) == 0 { - // The metric has been deleted by the relabeling - continue + if matched != nil { + matched(idx) } - labels.Sort() - - bb.B = marshalLabelsFast(bb.B[:0], labels.Labels) + bb.B = marshalLabelsFast(bb.B[:0], ts.Labels) outputKey := bytesutil.InternBytes(bb.B) for _, sample := range ts.Samples { pushSample(inputKey, outputKey, sample.Value) @@ -532,14 +527,17 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries) { bbPool.Put(bb) } -func (a *aggregator) push(tss []prompbmarshal.TimeSeries, applyFilters bool) { +func (a *aggregator) push(tss []prompbmarshal.TimeSeries, tracker func(id int)) { labels := promutils.GetLabels() tmpLabels := promutils.GetLabels() bb := bbPool.Get() - for _, ts := range tss { - if applyFilters && !a.match.Match(ts.Labels) { + for idx, ts := range tss { + if !a.match.Match(ts.Labels) { continue } + if tracker != nil { + tracker(idx) + } labels.Labels = append(labels.Labels[:0], ts.Labels...) if applyFilters { labels.Labels = a.inputRelabeling.Apply(labels.Labels, 0) @@ -804,3 +802,30 @@ func sortAndRemoveDuplicates(a []string) []string { } return dst } + +// TssUsageTracker tracks used series for streaming aggregation. +type TssUsageTracker struct { + usedSeries map[int]struct{} +} + +// NewTssUsageTracker returns new TssUsageTracker. +func NewTssUsageTracker(totalSeries int) *TssUsageTracker { + return &TssUsageTracker{usedSeries: make(map[int]struct{}, totalSeries)} +} + +// Matched marks series with id as used. +// Not safe for concurrent use. The caller must +// ensure that there are no concurrent calls to Matched. +func (tut *TssUsageTracker) Matched(id int) { + tut.usedSeries[id] = struct{}{} +} + +// GetUnmatched returns unused series from tss. +func (tut *TssUsageTracker) GetUnmatched(tss, dst []prompbmarshal.TimeSeries) []prompbmarshal.TimeSeries { + for k := range tss { + if _, ok := tut.usedSeries[k]; !ok { + dst = append(dst, tss[k]) + } + } + return dst +} diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go index 57c74aa8e..311442791 100644 --- a/lib/streamaggr/streamaggr_test.go +++ b/lib/streamaggr/streamaggr_test.go @@ -183,7 +183,7 @@ func TestAggregatorsSuccess(t *testing.T) { // Push the inputMetrics to Aggregators tssInput := mustParsePromMetrics(inputMetrics) - a.Push(tssInput) + a.Push(tssInput, nil) a.MustStop() // Verify the tssOutput contains the expected metrics @@ -703,7 +703,7 @@ func TestAggregatorsWithDedupInterval(t *testing.T) { // Push the inputMetrics to Aggregators tssInput := mustParsePromMetrics(inputMetrics) - a.Push(tssInput) + a.Push(tssInput, nil) if a != nil { for _, aggr := range a.as { aggr.dedupFlush() diff --git a/lib/streamaggr/streamaggr_timing_test.go b/lib/streamaggr/streamaggr_timing_test.go index f45dd0b40..f3c81c3cd 100644 --- a/lib/streamaggr/streamaggr_timing_test.go +++ b/lib/streamaggr/streamaggr_timing_test.go @@ -37,8 +37,13 @@ func benchmarkAggregatorsPush(b *testing.B, output string) { without: [job] outputs: [%q] `, output) + i := 0 pushFunc := func(tss []prompbmarshal.TimeSeries) { - panic(fmt.Errorf("unexpected pushFunc call")) + i++ + if i > 1 { + // pushFunc is expected to be called exactly once at MustStop + panic(fmt.Errorf("unexpected pushFunc call")) + } } a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0) if err != nil { @@ -50,16 +55,78 @@ func benchmarkAggregatorsPush(b *testing.B, output string) { b.SetBytes(int64(len(benchSeries))) b.RunParallel(func(pb *testing.PB) { for pb.Next() { - a.Push(benchSeries) + a.Push(benchSeries, nil) } }) } -func newBenchSeries(seriesCount, samplesPerSeries int) []prompbmarshal.TimeSeries { +func BenchmarkAggregatorsPushWithSeriesTracker(b *testing.B) { + config := fmt.Sprintf(` +- match: http_requests_total + interval: 24h + without: [job] + outputs: [%q] +`, "total") + i := 0 + pushFunc := func(tss []prompbmarshal.TimeSeries) { + i++ + if i > 1 { + // pushFunc is expected to be called exactly once at MustStop + panic(fmt.Errorf("unexpected pushFunc call")) + } + } + a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0) + if err != nil { + b.Fatalf("unexpected error when initializing aggregators: %s", err) + } + defer a.MustStop() + + tests := []struct { + name string + series []prompbmarshal.TimeSeries + }{ + { + name: "all matches", + series: benchSeries, + }, + { + name: "no matches", + series: benchSeriesWithRandomNames100, + }, + { + name: "50% matches", + series: benchSeriesWithRandomNames50, + }, + { + name: "10% matches", + series: benchSeriesWithRandomNames10, + }, + } + + for _, tt := range tests { + b.Run(tt.name, func(b *testing.B) { + b.ReportAllocs() + b.SetBytes(int64(len(tt.series))) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + ut := NewTssUsageTracker(len(tt.series)) + a.Push(tt.series, ut.Matched) + } + }) + }) + } +} + +func newBenchSeries(seriesCount, samplesPerSeries int, randomNameFraction float64) []prompbmarshal.TimeSeries { a := make([]string, seriesCount*samplesPerSeries) for i := 0; i < samplesPerSeries; i++ { for j := 0; j < seriesCount; j++ { - s := fmt.Sprintf(`http_requests_total{path="/foo/%d",job="foo",instance="bar"} %d`, j, i*10) + metricName := "http_requests_total" + if randomNameFraction > 0 && j%int(1/randomNameFraction) == 0 { + metricName = fmt.Sprintf("random_other_name_%d", j) + } + + s := fmt.Sprintf(`%s{path="/foo/%d",job="foo",instance="bar"} %d`, metricName, j, i*10) a = append(a, s) } } @@ -70,4 +137,7 @@ func newBenchSeries(seriesCount, samplesPerSeries int) []prompbmarshal.TimeSerie const seriesCount = 10000 const samplesPerSeries = 10 -var benchSeries = newBenchSeries(seriesCount, samplesPerSeries) +var benchSeries = newBenchSeries(seriesCount, samplesPerSeries, 0) +var benchSeriesWithRandomNames10 = newBenchSeries(seriesCount, samplesPerSeries, 0.1) +var benchSeriesWithRandomNames50 = newBenchSeries(seriesCount, samplesPerSeries, 0.5) +var benchSeriesWithRandomNames100 = newBenchSeries(seriesCount, samplesPerSeries, 1)