lib/streamaggr: add a benchmark for measuring the performance of aggregator.flush

This commit is contained in:
Aliaksandr Valialkin 2024-03-04 00:45:48 +02:00
parent 54a1c506e3
commit 9648c88b71
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB

View file

@ -8,32 +8,76 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
) )
func BenchmarkAggregatorsPushByJobAvg(b *testing.B) { var benchOutputs = []string{
for _, output := range []string{ "total",
"total", "total_prometheus",
"total_prometheus", "increase",
"increase", "increase_prometheus",
"increase_prometheus", "count_series",
"count_series", "count_samples",
"count_samples", "unique_samples",
"unique_samples", "sum_samples",
"sum_samples", "last",
"last", "min",
"min", "max",
"max", "avg",
"avg", "stddev",
"stddev", "stdvar",
"stdvar", "histogram_bucket",
"histogram_bucket", "quantiles(0, 0.5, 1)",
"quantiles(0, 0.5, 1)", }
} {
func BenchmarkAggregatorsPush(b *testing.B) {
for _, output := range benchOutputs {
b.Run(fmt.Sprintf("output=%s", output), func(b *testing.B) { b.Run(fmt.Sprintf("output=%s", output), func(b *testing.B) {
benchmarkAggregatorsPush(b, output) benchmarkAggregatorsPush(b, output)
}) })
} }
} }
func BenchmarkAggregatorsFlushSerial(b *testing.B) {
for _, output := range benchOutputs {
b.Run(fmt.Sprintf("output=%s", output), func(b *testing.B) {
benchmarkAggregatorsFlushSerial(b, output)
})
}
}
func benchmarkAggregatorsFlushSerial(b *testing.B, output string) {
a := newBenchAggregators(output)
defer a.MustStop()
var matchIdxs []byte
b.ReportAllocs()
b.SetBytes(int64(len(benchSeries)))
for i := 0; i < b.N; i++ {
matchIdxs = a.Push(benchSeries, matchIdxs)
for _, aggr := range a.as {
aggr.flush()
}
}
}
func benchmarkAggregatorsPush(b *testing.B, output string) { func benchmarkAggregatorsPush(b *testing.B, output string) {
a := newBenchAggregators(output)
defer a.MustStop()
const loops = 100
b.ReportAllocs()
b.SetBytes(int64(len(benchSeries) * loops))
b.RunParallel(func(pb *testing.PB) {
var matchIdxs []byte
for pb.Next() {
for i := 0; i < loops; i++ {
matchIdxs = a.Push(benchSeries, matchIdxs)
}
}
})
}
func newBenchAggregators(output string) *Aggregators {
config := fmt.Sprintf(` config := fmt.Sprintf(`
- match: http_requests_total - match: http_requests_total
interval: 24h interval: 24h
@ -43,32 +87,9 @@ func benchmarkAggregatorsPush(b *testing.B, output string) {
pushFunc := func(tss []prompbmarshal.TimeSeries) {} pushFunc := func(tss []prompbmarshal.TimeSeries) {}
a, err := newAggregatorsFromData([]byte(config), pushFunc, 0) a, err := newAggregatorsFromData([]byte(config), pushFunc, 0)
if err != nil { if err != nil {
b.Fatalf("unexpected error when initializing aggregators: %s", err) panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err))
} }
defer a.MustStop() return a
const loops = 10
b.ReportAllocs()
b.SetBytes(int64(len(benchSeries) * loops))
b.RunParallel(func(pb *testing.PB) {
var matchIdxs []byte
for pb.Next() {
for i := 0; i < loops; i++ {
series := benchSeries
for len(series) > 0 {
chunk := series
if len(chunk) > 1_000 {
chunk = series[:1_000]
series = series[len(chunk):]
} else {
series = nil
}
matchIdxs = a.Push(chunk, matchIdxs)
}
}
}
})
} }
func newBenchSeries(seriesCount int) []prompbmarshal.TimeSeries { func newBenchSeries(seriesCount int) []prompbmarshal.TimeSeries {