app/vminsert/common: push many time series at once into stream aggregation

This should reduce overhead on streamaggr.Aggregators.Push call
This commit is contained in:
Aliaksandr Valialkin 2024-03-01 21:29:09 +02:00
parent cf2e80a869
commit 18db573b10
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB

View file

@ -12,7 +12,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
"github.com/VictoriaMetrics/metrics"
@ -127,58 +126,88 @@ func MustStopStreamAggr() {
}
type streamAggrCtx struct {
mn storage.MetricName
tss [1]prompbmarshal.TimeSeries
mn storage.MetricName
tss []prompbmarshal.TimeSeries
labels []prompbmarshal.Label
samples []prompbmarshal.Sample
buf []byte
}
func (ctx *streamAggrCtx) Reset() {
ctx.mn.Reset()
ts := &ctx.tss[0]
promrelabel.CleanLabels(ts.Labels)
clear(ctx.tss)
ctx.tss = ctx.tss[:0]
clear(ctx.labels)
ctx.labels = ctx.labels[:0]
ctx.samples = ctx.samples[:0]
ctx.buf = ctx.buf[:0]
}
func (ctx *streamAggrCtx) push(mrs []storage.MetricRow, matchIdxs []byte) []byte {
matchIdxs = bytesutil.ResizeNoCopyMayOverallocate(matchIdxs, len(mrs))
for i := 0; i < len(matchIdxs); i++ {
matchIdxs[i] = 0
}
mn := &ctx.mn
tss := ctx.tss[:]
ts := &tss[0]
labels := ts.Labels
samples := ts.Samples
sas := sasGlobal.Load()
var matchIdxsLocal []byte
for idx, mr := range mrs {
tss := ctx.tss
labels := ctx.labels
samples := ctx.samples
buf := ctx.buf
tssLen := len(tss)
for _, mr := range mrs {
if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil {
logger.Panicf("BUG: cannot unmarshal recently marshaled MetricName: %s", err)
}
labels = append(labels[:0], prompbmarshal.Label{
labelsLen := len(labels)
bufLen := len(buf)
buf = append(buf, mn.MetricGroup...)
metricGroup := bytesutil.ToUnsafeString(buf[bufLen:])
labels = append(labels, prompbmarshal.Label{
Name: "__name__",
Value: bytesutil.ToUnsafeString(mn.MetricGroup),
Value: metricGroup,
})
for _, tag := range mn.Tags {
bufLen = len(buf)
buf = append(buf, tag.Key...)
name := bytesutil.ToUnsafeString(buf[bufLen:])
bufLen = len(buf)
buf = append(buf, tag.Value...)
value := bytesutil.ToUnsafeString(buf[bufLen:])
labels = append(labels, prompbmarshal.Label{
Name: bytesutil.ToUnsafeString(tag.Key),
Value: bytesutil.ToUnsafeString(tag.Value),
Name: name,
Value: value,
})
}
samples = append(samples[:0], prompbmarshal.Sample{
samplesLen := len(samples)
samples = append(samples, prompbmarshal.Sample{
Timestamp: mr.Timestamp,
Value: mr.Value,
})
ts.Labels = labels
ts.Samples = samples
matchIdxsLocal = sas.Push(tss, matchIdxsLocal)
if matchIdxsLocal[0] != 0 {
matchIdxs[idx] = 1
}
tss = append(tss, prompbmarshal.TimeSeries{
Labels: labels[labelsLen:],
Samples: samples[samplesLen:],
})
}
ctx.tss = tss
ctx.labels = labels
ctx.samples = samples
ctx.buf = buf
tss = tss[tssLen:]
matchIdxs = bytesutil.ResizeNoCopyMayOverallocate(matchIdxs, len(tss))
for i := 0; i < len(matchIdxs); i++ {
matchIdxs[i] = 0
}
sas := sasGlobal.Load()
matchIdxs = sas.Push(tss, matchIdxs)
ctx.Reset()
return matchIdxs
}