diff --git a/lib/streamaggr/dedup_timing_test.go b/lib/streamaggr/dedup_timing_test.go index 9ca33f292..46cdcdb26 100644 --- a/lib/streamaggr/dedup_timing_test.go +++ b/lib/streamaggr/dedup_timing_test.go @@ -4,6 +4,7 @@ import ( "fmt" "sync/atomic" "testing" + "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" @@ -18,7 +19,7 @@ func BenchmarkDedupAggr(b *testing.B) { } func BenchmarkDedupAggrFlushSerial(b *testing.B) { - as := newLastAggrState() + as := newTotalAggrState(time.Hour, true, true) benchSamples := newBenchSamples(100_000) da := newDedupAggr() @@ -70,12 +71,16 @@ func newBenchSamples(count int) []pushSample { Value: "process_cpu_seconds_total", }, } + labelsLen := len(labels) samples := make([]pushSample, count) var keyBuf []byte for i := range samples { sample := &samples[i] - labels[0].Value = fmt.Sprintf("host-%d", i) - keyBuf = lc.Compress(keyBuf[:0], labels) + labels = append(labels[:labelsLen], prompbmarshal.Label{ + Name: "app", + Value: fmt.Sprintf("app-%d", i%10), + }) + keyBuf = compressLabels(keyBuf[:0], &lc, labels[:labelsLen], labels[labelsLen:]) sample.key = string(keyBuf) sample.value = float64(i) } diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 2d8888dc5..88120f871 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -759,7 +759,7 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) { outputLabels.Labels = append(outputLabels.Labels, labels.Labels...) } - buf = a.compressLabels(buf[:0], inputLabels.Labels, outputLabels.Labels) + buf = compressLabels(buf[:0], &a.lc, inputLabels.Labels, outputLabels.Labels) key := bytesutil.InternBytes(buf) for _, sample := range ts.Samples { if math.IsNaN(sample.Value) { @@ -782,19 +782,18 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) { } } -func (a *aggregator) compressLabels(dst []byte, inputLabels, outputLabels []prompbmarshal.Label) []byte { +func compressLabels(dst []byte, lc *promutils.LabelsCompressor, inputLabels, outputLabels []prompbmarshal.Label) []byte { bb := bbPool.Get() - bb.B = a.lc.Compress(bb.B, inputLabels) + bb.B = lc.Compress(bb.B, inputLabels) dst = encoding.MarshalVarUint64(dst, uint64(len(bb.B))) dst = append(dst, bb.B...) bbPool.Put(bb) - dst = a.lc.Compress(dst, outputLabels) + dst = lc.Compress(dst, outputLabels) return dst } -func (a *aggregator) decompressLabels(dst []prompbmarshal.Label, key string) []prompbmarshal.Label { - dst = a.lc.Decompress(dst, bytesutil.ToUnsafeBytes(key)) - return dst +func decompressLabels(dst []prompbmarshal.Label, lc *promutils.LabelsCompressor, key string) []prompbmarshal.Label { + return lc.Decompress(dst, bytesutil.ToUnsafeBytes(key)) } func getOutputKey(key string) string { @@ -966,7 +965,7 @@ func (ctx *flushCtx) flushSeries() { func (ctx *flushCtx) appendSeries(key, suffix string, timestamp int64, value float64) { labelsLen := len(ctx.labels) samplesLen := len(ctx.samples) - ctx.labels = ctx.a.decompressLabels(ctx.labels, key) + ctx.labels = decompressLabels(ctx.labels, &ctx.a.lc, key) if !ctx.a.keepMetricNames { ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix) } @@ -989,7 +988,7 @@ func (ctx *flushCtx) appendSeries(key, suffix string, timestamp int64, value flo func (ctx *flushCtx) appendSeriesWithExtraLabel(key, suffix string, timestamp int64, value float64, extraName, extraValue string) { labelsLen := len(ctx.labels) samplesLen := len(ctx.samples) - ctx.labels = ctx.a.decompressLabels(ctx.labels, key) + ctx.labels = decompressLabels(ctx.labels, &ctx.a.lc, key) if !ctx.a.keepMetricNames { ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix) }