From 3c06b3af92fe8ed6dd3f586120fc0673d8fc845d Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 4 Mar 2024 00:53:55 +0200 Subject: [PATCH] lib/streamaggr: add a benchmark for flushing dedup state --- lib/streamaggr/dedup_timing_test.go | 53 ++++++++++++++++++++++++----- 1 file changed, 45 insertions(+), 8 deletions(-) diff --git a/lib/streamaggr/dedup_timing_test.go b/lib/streamaggr/dedup_timing_test.go index 46045043c..9ca33f292 100644 --- a/lib/streamaggr/dedup_timing_test.go +++ b/lib/streamaggr/dedup_timing_test.go @@ -4,22 +4,34 @@ import ( "fmt" "sync/atomic" "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" ) func BenchmarkDedupAggr(b *testing.B) { - for _, samplesPerPush := range []int{1, 10, 100, 1_000, 10_000, 100_000, 1_000_000} { + for _, samplesPerPush := range []int{1, 10, 100, 1_000, 10_000, 100_000} { b.Run(fmt.Sprintf("samplesPerPush_%d", samplesPerPush), func(b *testing.B) { benchmarkDedupAggr(b, samplesPerPush) }) } } -func benchmarkDedupAggr(b *testing.B, samplesPerPush int) { - flushSamples := func(samples []pushSample) { - Sink.Add(uint64(len(samples))) - } +func BenchmarkDedupAggrFlushSerial(b *testing.B) { + as := newLastAggrState() + benchSamples := newBenchSamples(100_000) + da := newDedupAggr() - const loops = 2 + b.ReportAllocs() + b.SetBytes(int64(len(benchSamples))) + for i := 0; i < b.N; i++ { + da.pushSamples(benchSamples) + da.flush(as.pushSamples) + } +} + +func benchmarkDedupAggr(b *testing.B, samplesPerPush int) { + const loops = 100 benchSamples := newBenchSamples(samplesPerPush) da := newDedupAggr() @@ -30,16 +42,41 @@ func benchmarkDedupAggr(b *testing.B, samplesPerPush int) { for i := 0; i < loops; i++ { da.pushSamples(benchSamples) } - da.flush(flushSamples) } }) } func newBenchSamples(count int) []pushSample { + var lc promutils.LabelsCompressor + labels := []prompbmarshal.Label{ + { + Name: "instance", + Value: "host-123", + }, + { + Name: "job", + Value: "foo-bar-baz", + }, + { + Name: "pod", + Value: "pod-1-dsfdsf-dsfdsf", + }, + { + Name: "namespace", + Value: "ns-asdfdsfpfd-fddf", + }, + { + Name: "__name__", + Value: "process_cpu_seconds_total", + }, + } samples := make([]pushSample, count) + var keyBuf []byte for i := range samples { sample := &samples[i] - sample.key = fmt.Sprintf("key_%d", i) + labels[0].Value = fmt.Sprintf("host-%d", i) + keyBuf = lc.Compress(keyBuf[:0], labels) + sample.key = string(keyBuf) sample.value = float64(i) } return samples