From 9648c88b713bd7b85115e92d24ca044b889a3581 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Mon, 4 Mar 2024 00:45:48 +0200
Subject: [PATCH] lib/streamaggr: add a benchmark for measuring the performance of aggregator.flush

---
 lib/streamaggr/streamaggr_timing_test.go | 109 ++++++++++++++---------
 1 file changed, 65 insertions(+), 44 deletions(-)

diff --git a/lib/streamaggr/streamaggr_timing_test.go b/lib/streamaggr/streamaggr_timing_test.go
index b56957ec2..c47280312 100644
--- a/lib/streamaggr/streamaggr_timing_test.go
+++ b/lib/streamaggr/streamaggr_timing_test.go
@@ -8,32 +8,76 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 )
 
-func BenchmarkAggregatorsPushByJobAvg(b *testing.B) {
-	for _, output := range []string{
-		"total",
-		"total_prometheus",
-		"increase",
-		"increase_prometheus",
-		"count_series",
-		"count_samples",
-		"unique_samples",
-		"sum_samples",
-		"last",
-		"min",
-		"max",
-		"avg",
-		"stddev",
-		"stdvar",
-		"histogram_bucket",
-		"quantiles(0, 0.5, 1)",
-	} {
+var benchOutputs = []string{
+	"total",
+	"total_prometheus",
+	"increase",
+	"increase_prometheus",
+	"count_series",
+	"count_samples",
+	"unique_samples",
+	"sum_samples",
+	"last",
+	"min",
+	"max",
+	"avg",
+	"stddev",
+	"stdvar",
+	"histogram_bucket",
+	"quantiles(0, 0.5, 1)",
+}
+
+func BenchmarkAggregatorsPush(b *testing.B) {
+	for _, output := range benchOutputs {
 		b.Run(fmt.Sprintf("output=%s", output), func(b *testing.B) {
 			benchmarkAggregatorsPush(b, output)
 		})
 	}
 }
 
+func BenchmarkAggregatorsFlushSerial(b *testing.B) {
+	for _, output := range benchOutputs {
+		b.Run(fmt.Sprintf("output=%s", output), func(b *testing.B) {
+			benchmarkAggregatorsFlushSerial(b, output)
+		})
+	}
+}
+
+func benchmarkAggregatorsFlushSerial(b *testing.B, output string) {
+	a := newBenchAggregators(output)
+	defer a.MustStop()
+
+	var matchIdxs []byte
+
+	b.ReportAllocs()
+	b.SetBytes(int64(len(benchSeries)))
+	for i := 0; i < b.N; i++ {
+		matchIdxs = a.Push(benchSeries, matchIdxs)
+		for _, aggr := range a.as {
+			aggr.flush()
+		}
+	}
+}
+
 func benchmarkAggregatorsPush(b *testing.B, output string) {
+	a := newBenchAggregators(output)
+	defer a.MustStop()
+
+	const loops = 100
+
+	b.ReportAllocs()
+	b.SetBytes(int64(len(benchSeries) * loops))
+	b.RunParallel(func(pb *testing.PB) {
+		var matchIdxs []byte
+		for pb.Next() {
+			for i := 0; i < loops; i++ {
+				matchIdxs = a.Push(benchSeries, matchIdxs)
+			}
+		}
+	})
+}
+
+func newBenchAggregators(output string) *Aggregators {
 	config := fmt.Sprintf(`
 - match: http_requests_total
   interval: 24h
@@ -43,32 +87,9 @@ func benchmarkAggregatorsPush(b *testing.B, output string) {
 	pushFunc := func(tss []prompbmarshal.TimeSeries) {}
 	a, err := newAggregatorsFromData([]byte(config), pushFunc, 0)
 	if err != nil {
-		b.Fatalf("unexpected error when initializing aggregators: %s", err)
+		panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err))
 	}
-	defer a.MustStop()
-
-	const loops = 10
-
-	b.ReportAllocs()
-	b.SetBytes(int64(len(benchSeries) * loops))
-	b.RunParallel(func(pb *testing.PB) {
-		var matchIdxs []byte
-		for pb.Next() {
-			for i := 0; i < loops; i++ {
-				series := benchSeries
-				for len(series) > 0 {
-					chunk := series
-					if len(chunk) > 1_000 {
-						chunk = series[:1_000]
-						series = series[len(chunk):]
-					} else {
-						series = nil
-					}
-					matchIdxs = a.Push(chunk, matchIdxs)
-				}
-			}
-		}
-	})
+	return a
 }
 
 func newBenchSeries(seriesCount int) []prompbmarshal.TimeSeries {
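
The new BenchmarkAggregatorsFlushSerial pushes benchSeries once per iteration and then calls aggr.flush() on every aggregator serially, so it measures flush cost on top of Push. A minimal way to run both benchmarks with the standard Go tooling, assuming a checkout of the VictoriaMetrics repository (the -run='^$' pattern skips regular tests so only benchmarks execute; -benchmem is optional):

    go test -run='^$' -bench='BenchmarkAggregatorsPush|BenchmarkAggregatorsFlushSerial' -benchmem ./lib/streamaggr/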