mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/streamaggr: properly generate pushSample.key in benchmarks
This commit is contained in:
parent
b232968bb4
commit
e70177c5fb
2 changed files with 16 additions and 12 deletions
|
@ -4,6 +4,7 @@ import (
|
|||
"fmt"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
||||
|
@ -18,7 +19,7 @@ func BenchmarkDedupAggr(b *testing.B) {
|
|||
}
|
||||
|
||||
func BenchmarkDedupAggrFlushSerial(b *testing.B) {
|
||||
as := newLastAggrState()
|
||||
as := newTotalAggrState(time.Hour, true, true)
|
||||
benchSamples := newBenchSamples(100_000)
|
||||
da := newDedupAggr()
|
||||
|
||||
|
@ -70,12 +71,16 @@ func newBenchSamples(count int) []pushSample {
|
|||
Value: "process_cpu_seconds_total",
|
||||
},
|
||||
}
|
||||
labelsLen := len(labels)
|
||||
samples := make([]pushSample, count)
|
||||
var keyBuf []byte
|
||||
for i := range samples {
|
||||
sample := &samples[i]
|
||||
labels[0].Value = fmt.Sprintf("host-%d", i)
|
||||
keyBuf = lc.Compress(keyBuf[:0], labels)
|
||||
labels = append(labels[:labelsLen], prompbmarshal.Label{
|
||||
Name: "app",
|
||||
Value: fmt.Sprintf("app-%d", i%10),
|
||||
})
|
||||
keyBuf = compressLabels(keyBuf[:0], &lc, labels[:labelsLen], labels[labelsLen:])
|
||||
sample.key = string(keyBuf)
|
||||
sample.value = float64(i)
|
||||
}
|
||||
|
|
|
@ -759,7 +759,7 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
|
|||
outputLabels.Labels = append(outputLabels.Labels, labels.Labels...)
|
||||
}
|
||||
|
||||
buf = a.compressLabels(buf[:0], inputLabels.Labels, outputLabels.Labels)
|
||||
buf = compressLabels(buf[:0], &a.lc, inputLabels.Labels, outputLabels.Labels)
|
||||
key := bytesutil.InternBytes(buf)
|
||||
for _, sample := range ts.Samples {
|
||||
if math.IsNaN(sample.Value) {
|
||||
|
@ -782,19 +782,18 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
|
|||
}
|
||||
}
|
||||
|
||||
func (a *aggregator) compressLabels(dst []byte, inputLabels, outputLabels []prompbmarshal.Label) []byte {
|
||||
func compressLabels(dst []byte, lc *promutils.LabelsCompressor, inputLabels, outputLabels []prompbmarshal.Label) []byte {
|
||||
bb := bbPool.Get()
|
||||
bb.B = a.lc.Compress(bb.B, inputLabels)
|
||||
bb.B = lc.Compress(bb.B, inputLabels)
|
||||
dst = encoding.MarshalVarUint64(dst, uint64(len(bb.B)))
|
||||
dst = append(dst, bb.B...)
|
||||
bbPool.Put(bb)
|
||||
dst = a.lc.Compress(dst, outputLabels)
|
||||
dst = lc.Compress(dst, outputLabels)
|
||||
return dst
|
||||
}
|
||||
|
||||
func (a *aggregator) decompressLabels(dst []prompbmarshal.Label, key string) []prompbmarshal.Label {
|
||||
dst = a.lc.Decompress(dst, bytesutil.ToUnsafeBytes(key))
|
||||
return dst
|
||||
func decompressLabels(dst []prompbmarshal.Label, lc *promutils.LabelsCompressor, key string) []prompbmarshal.Label {
|
||||
return lc.Decompress(dst, bytesutil.ToUnsafeBytes(key))
|
||||
}
|
||||
|
||||
func getOutputKey(key string) string {
|
||||
|
@ -966,7 +965,7 @@ func (ctx *flushCtx) flushSeries() {
|
|||
func (ctx *flushCtx) appendSeries(key, suffix string, timestamp int64, value float64) {
|
||||
labelsLen := len(ctx.labels)
|
||||
samplesLen := len(ctx.samples)
|
||||
ctx.labels = ctx.a.decompressLabels(ctx.labels, key)
|
||||
ctx.labels = decompressLabels(ctx.labels, &ctx.a.lc, key)
|
||||
if !ctx.a.keepMetricNames {
|
||||
ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix)
|
||||
}
|
||||
|
@ -989,7 +988,7 @@ func (ctx *flushCtx) appendSeries(key, suffix string, timestamp int64, value flo
|
|||
func (ctx *flushCtx) appendSeriesWithExtraLabel(key, suffix string, timestamp int64, value float64, extraName, extraValue string) {
|
||||
labelsLen := len(ctx.labels)
|
||||
samplesLen := len(ctx.samples)
|
||||
ctx.labels = ctx.a.decompressLabels(ctx.labels, key)
|
||||
ctx.labels = decompressLabels(ctx.labels, &ctx.a.lc, key)
|
||||
if !ctx.a.keepMetricNames {
|
||||
ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue