VictoriaMetrics/lib/streamaggr/dedup_timing_test.go
2024-03-04 01:22:40 +02:00

85 lines
1.7 KiB
Go

package streamaggr
import (
"fmt"
"sync/atomic"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
)
// BenchmarkDedupAggr runs benchmarkDedupAggr as a separate sub-benchmark for
// each batch size, from a single sample up to 100_000 samples per push.
func BenchmarkDedupAggr(b *testing.B) {
	batchSizes := []int{1, 10, 100, 1_000, 10_000, 100_000}
	for idx := range batchSizes {
		// Declared inside the loop body so the closure below captures
		// a per-iteration value.
		batchSize := batchSizes[idx]
		name := fmt.Sprintf("samplesPerPush_%d", batchSize)
		b.Run(name, func(sub *testing.B) {
			benchmarkDedupAggr(sub, batchSize)
		})
	}
}
// BenchmarkDedupAggrFlushSerial measures a serial push-then-flush cycle.
//
// Every iteration pushes the same 100_000 samples into the aggregator returned
// by newDedupAggr and then flushes it, handing the flushed samples to
// as.pushSamples.
func BenchmarkDedupAggrFlushSerial(b *testing.B) {
	as := newLastAggrState()
	benchSamples := newBenchSamples(100_000)
	da := newDedupAggr()

	b.ReportAllocs()
	// The "bytes" unit is the number of samples handled per iteration,
	// so the reported throughput reads as samples per second.
	b.SetBytes(int64(len(benchSamples)))

	// Generating the benchmark samples is costly (one fmt.Sprintf and one
	// label compression per sample). Exclude this setup from the measured
	// time and allocation counters.
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		da.pushSamples(benchSamples)
		da.flush(as.pushSamples)
	}
}
// benchmarkDedupAggr measures concurrent pushes of a fixed batch of
// samplesPerPush samples into a single aggregator returned by newDedupAggr.
//
// Every b.RunParallel iteration pushes the same batch `loops` times, so one
// benchmark operation corresponds to samplesPerPush*loops pushed samples.
func benchmarkDedupAggr(b *testing.B, samplesPerPush int) {
	const loops = 100
	benchSamples := newBenchSamples(samplesPerPush)
	da := newDedupAggr()

	b.ReportAllocs()
	// The "bytes" unit is the number of samples pushed per operation,
	// so the reported throughput reads as samples per second.
	b.SetBytes(int64(samplesPerPush * loops))

	// Building the samples may take a noticeable time for big batches.
	// Exclude this setup from the measured time and allocation counters.
	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			for i := 0; i < loops; i++ {
				da.pushSamples(benchSamples)
			}
		}
	})
}
// newBenchSamples returns count samples for use in benchmarks.
//
// All the samples share the same label set except for the "instance" label,
// which is set to "host-<i>" for the i-th sample. The sample key is the label
// set compressed by a promutils.LabelsCompressor local to this call, and the
// sample value is the sample index.
func newBenchSamples(count int) []pushSample {
	series := []prompbmarshal.Label{
		{Name: "instance", Value: "host-123"},
		{Name: "job", Value: "foo-bar-baz"},
		{Name: "pod", Value: "pod-1-dsfdsf-dsfdsf"},
		{Name: "namespace", Value: "ns-asdfdsfpfd-fddf"},
		{Name: "__name__", Value: "process_cpu_seconds_total"},
	}
	// The only label that changes from sample to sample.
	instance := &series[0]

	var compressor promutils.LabelsCompressor
	var scratch []byte
	result := make([]pushSample, count)
	for idx := 0; idx < count; idx++ {
		instance.Value = fmt.Sprintf("host-%d", idx)
		// Reuse the scratch buffer across iterations; the key is copied
		// out of it by the string conversion below.
		scratch = compressor.Compress(scratch[:0], series)
		result[idx].key = string(scratch)
		result[idx].value = float64(idx)
	}
	return result
}
// Sink is an exported package-level atomic counter.
//
// NOTE(review): nothing in the visible part of this file reads or writes it.
// Presumably it exists so that benchmarks can store their results in it and
// keep the compiler from eliminating the measured work - confirm before
// removing or relying on it.
var Sink atomic.Uint64