lib/streamaggr: consistently use the same timestamp across all the output aggregated samples in a single aggregation interval

Prevsiously every aggregation output was using its own timestamp for the output aggregated samples
in a single aggregation interval. This could result in unexpected inconsitent timesetamps for the output
aggregated samples.

This commit consistently uses the same timestamp across all the output aggregated samples.
This commit makes sure that the duration between subsequent timestamps strictly equals
the configured aggregation interval.

Thanks to @AndrewChubatiuk for the original idea at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6314
This commit should help https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4580
This commit is contained in:
Aliaksandr Valialkin 2024-08-07 10:32:38 +02:00
parent 8f5c26d788
commit 86c7afd126
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
16 changed files with 44 additions and 61 deletions

View file

@ -4,7 +4,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// avgAggrState calculates output=avg, e.g. the average value over input samples.
@ -62,7 +61,6 @@ func (as *avgAggrState) pushSamples(samples []pushSample) {
}
func (as *avgAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v any) bool {
if resetState {
@ -80,7 +78,7 @@ func (as *avgAggrState) flushState(ctx *flushCtx, resetState bool) {
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "avg", currentTimeMsec, avg)
ctx.appendSeries(key, "avg", avg)
return true
})
}

View file

@ -4,7 +4,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// countSamplesAggrState calculates output=count_samples, e.g. the count of input samples.
@ -59,7 +58,6 @@ func (as *countSamplesAggrState) pushSamples(samples []pushSample) {
}
func (as *countSamplesAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v any) bool {
if resetState {
@ -77,7 +75,7 @@ func (as *countSamplesAggrState) flushState(ctx *flushCtx, resetState bool) {
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "count_samples", currentTimeMsec, float64(n))
ctx.appendSeries(key, "count_samples", float64(n))
return true
})
}

View file

@ -4,7 +4,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/cespare/xxhash/v2"
)
@ -68,7 +67,6 @@ func (as *countSeriesAggrState) pushSamples(samples []pushSample) {
}
func (as *countSeriesAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v any) bool {
if resetState {
@ -86,7 +84,7 @@ func (as *countSeriesAggrState) flushState(ctx *flushCtx, resetState bool) {
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "count_series", currentTimeMsec, float64(n))
ctx.appendSeries(key, "count_series", float64(n))
return true
})
}

View file

@ -88,7 +88,6 @@ func (as *histogramBucketAggrState) removeOldEntries(currentTime uint64) {
func (as *histogramBucketAggrState) flushState(ctx *flushCtx, _ bool) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(currentTime)
@ -99,7 +98,7 @@ func (as *histogramBucketAggrState) flushState(ctx *flushCtx, _ bool) {
if !sv.deleted {
key := k.(string)
sv.h.VisitNonZeroBuckets(func(vmrange string, count uint64) {
ctx.appendSeriesWithExtraLabel(key, "histogram_bucket", currentTimeMsec, float64(count), "vmrange", vmrange)
ctx.appendSeriesWithExtraLabel(key, "histogram_bucket", float64(count), "vmrange", vmrange)
})
}
sv.mu.Unlock()

View file

@ -4,7 +4,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// lastAggrState calculates output=last, e.g. the last value over input samples.
@ -64,7 +63,6 @@ func (as *lastAggrState) pushSamples(samples []pushSample) {
}
func (as *lastAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v any) bool {
if resetState {
@ -82,7 +80,7 @@ func (as *lastAggrState) flushState(ctx *flushCtx, resetState bool) {
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "last", currentTimeMsec, last)
ctx.appendSeries(key, "last", last)
return true
})
}

View file

@ -4,7 +4,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// maxAggrState calculates output=max, e.g. the maximum value over input samples.
@ -61,7 +60,6 @@ func (as *maxAggrState) pushSamples(samples []pushSample) {
}
func (as *maxAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v any) bool {
if resetState {
@ -79,7 +77,7 @@ func (as *maxAggrState) flushState(ctx *flushCtx, resetState bool) {
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "max", currentTimeMsec, max)
ctx.appendSeries(key, "max", max)
return true
})
}

View file

@ -4,7 +4,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// minAggrState calculates output=min, e.g. the minimum value over input samples.
@ -61,7 +60,6 @@ func (as *minAggrState) pushSamples(samples []pushSample) {
}
func (as *minAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v any) bool {
if resetState {
@ -78,7 +76,7 @@ func (as *minAggrState) flushState(ctx *flushCtx, resetState bool) {
}
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "min", currentTimeMsec, min)
ctx.appendSeries(key, "min", min)
return true
})
}

View file

@ -5,7 +5,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/valyala/histogram"
)
@ -65,7 +64,6 @@ func (as *quantilesAggrState) pushSamples(samples []pushSample) {
}
func (as *quantilesAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
phis := as.phis
var quantiles []float64
@ -90,7 +88,7 @@ func (as *quantilesAggrState) flushState(ctx *flushCtx, resetState bool) {
for i, quantile := range quantiles {
b = strconv.AppendFloat(b[:0], phis[i], 'g', -1, 64)
phiStr := bytesutil.InternBytes(b)
ctx.appendSeriesWithExtraLabel(key, "quantiles", currentTimeMsec, quantile, "quantile", phiStr)
ctx.appendSeriesWithExtraLabel(key, "quantiles", quantile, "quantile", phiStr)
}
return true
})

View file

@ -107,7 +107,6 @@ func (as *rateAggrState) pushSamples(samples []pushSample) {
func (as *rateAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
suffix := as.getSuffix()
@ -147,7 +146,7 @@ func (as *rateAggrState) flushState(ctx *flushCtx, resetState bool) {
}
key := k.(string)
ctx.appendSeries(key, suffix, currentTimeMsec, result)
ctx.appendSeries(key, suffix, result)
return true
})
}

View file

@ -5,7 +5,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// stddevAggrState calculates output=stddev, e.g. the average value over input samples.
@ -62,7 +61,6 @@ func (as *stddevAggrState) pushSamples(samples []pushSample) {
}
func (as *stddevAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v any) bool {
if resetState {
@ -80,7 +78,7 @@ func (as *stddevAggrState) flushState(ctx *flushCtx, resetState bool) {
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "stddev", currentTimeMsec, stddev)
ctx.appendSeries(key, "stddev", stddev)
return true
})
}

View file

@ -4,7 +4,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// stdvarAggrState calculates output=stdvar, e.g. the average value over input samples.
@ -61,7 +60,6 @@ func (as *stdvarAggrState) pushSamples(samples []pushSample) {
}
func (as *stdvarAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v any) bool {
if resetState {
@ -79,7 +77,7 @@ func (as *stdvarAggrState) flushState(ctx *flushCtx, resetState bool) {
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "stdvar", currentTimeMsec, stdvar)
ctx.appendSeries(key, "stdvar", stdvar)
return true
})
}

View file

@ -733,11 +733,14 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
}
}
var flushTimeMsec int64
tickerWait := func(t *time.Ticker) bool {
select {
case <-a.stopCh:
flushTimeMsec = time.Now().UnixMilli()
return false
case <-t.C:
case ct := <-t.C:
flushTimeMsec = ct.UnixMilli()
return true
}
}
@ -748,16 +751,16 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
defer t.Stop()
if alignFlushToInterval && skipIncompleteFlush {
a.flush(nil)
a.flush(nil, 0)
ignoreFirstIntervals--
}
for tickerWait(t) {
if ignoreFirstIntervals > 0 {
a.flush(nil)
a.flush(nil, 0)
ignoreFirstIntervals--
} else {
a.flush(pushFunc)
a.flush(pushFunc, flushTimeMsec)
}
if alignFlushToInterval {
@ -781,14 +784,14 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
if ct.After(flushDeadline) {
// It is time to flush the aggregated state
if alignFlushToInterval && skipIncompleteFlush && !isSkippedFirstFlush {
a.flush(nil)
a.flush(nil, 0)
ignoreFirstIntervals--
isSkippedFirstFlush = true
} else if ignoreFirstIntervals > 0 {
a.flush(nil)
a.flush(nil, 0)
ignoreFirstIntervals--
} else {
a.flush(pushFunc)
a.flush(pushFunc, flushTimeMsec)
}
for ct.After(flushDeadline) {
flushDeadline = flushDeadline.Add(a.interval)
@ -806,7 +809,7 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
if !skipIncompleteFlush && ignoreFirstIntervals <= 0 {
a.dedupFlush()
a.flush(pushFunc)
a.flush(pushFunc, flushTimeMsec)
}
}
@ -833,17 +836,17 @@ func (a *aggregator) dedupFlush() {
// flush flushes aggregator state to pushFunc.
//
// If pushFunc is nil, then the aggregator state is just reset.
func (a *aggregator) flush(pushFunc PushFunc) {
a.flushInternal(pushFunc, true)
func (a *aggregator) flush(pushFunc PushFunc, flushTimeMsec int64) {
a.flushInternal(pushFunc, flushTimeMsec, true)
}
func (a *aggregator) flushInternal(pushFunc PushFunc, resetState bool) {
func (a *aggregator) flushInternal(pushFunc PushFunc, flushTimeMsec int64, resetState bool) {
startTime := time.Now()
// Update minTimestamp before flushing samples to the storage,
// since the flush durtion can be quite long.
// This should prevent from dropping samples with old timestamps when the flush takes long time.
a.minTimestamp.Store(startTime.UnixMilli() - 5_000)
a.minTimestamp.Store(flushTimeMsec - 5_000)
var wg sync.WaitGroup
for i := range a.aggrOutputs {
@ -856,7 +859,7 @@ func (a *aggregator) flushInternal(pushFunc PushFunc, resetState bool) {
wg.Done()
}()
ctx := getFlushCtx(a, ao, pushFunc)
ctx := getFlushCtx(a, ao, pushFunc, flushTimeMsec)
ao.as.flushState(ctx, resetState)
ctx.flushSeries()
putFlushCtx(ctx)
@ -1073,7 +1076,7 @@ func getInputOutputLabels(dstInput, dstOutput, labels []prompbmarshal.Label, by,
return dstInput, dstOutput
}
func getFlushCtx(a *aggregator, ao *aggrOutput, pushFunc PushFunc) *flushCtx {
func getFlushCtx(a *aggregator, ao *aggrOutput, pushFunc PushFunc, flushTimestamp int64) *flushCtx {
v := flushCtxPool.Get()
if v == nil {
v = &flushCtx{}
@ -1082,6 +1085,7 @@ func getFlushCtx(a *aggregator, ao *aggrOutput, pushFunc PushFunc) *flushCtx {
ctx.a = a
ctx.ao = ao
ctx.pushFunc = pushFunc
ctx.flushTimestamp = flushTimestamp
return ctx
}
@ -1093,9 +1097,10 @@ func putFlushCtx(ctx *flushCtx) {
var flushCtxPool sync.Pool
type flushCtx struct {
a *aggregator
ao *aggrOutput
pushFunc PushFunc
a *aggregator
ao *aggrOutput
pushFunc PushFunc
flushTimestamp int64
tss []prompbmarshal.TimeSeries
labels []prompbmarshal.Label
@ -1106,6 +1111,7 @@ func (ctx *flushCtx) reset() {
ctx.a = nil
ctx.ao = nil
ctx.pushFunc = nil
ctx.flushTimestamp = 0
ctx.resetSeries()
}
@ -1161,7 +1167,7 @@ func (ctx *flushCtx) flushSeries() {
promutils.PutLabels(auxLabels)
}
func (ctx *flushCtx) appendSeries(key, suffix string, timestamp int64, value float64) {
func (ctx *flushCtx) appendSeries(key, suffix string, value float64) {
labelsLen := len(ctx.labels)
samplesLen := len(ctx.samples)
ctx.labels = decompressLabels(ctx.labels, key)
@ -1169,7 +1175,7 @@ func (ctx *flushCtx) appendSeries(key, suffix string, timestamp int64, value flo
ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix)
}
ctx.samples = append(ctx.samples, prompbmarshal.Sample{
Timestamp: timestamp,
Timestamp: ctx.flushTimestamp,
Value: value,
})
ctx.tss = append(ctx.tss, prompbmarshal.TimeSeries{
@ -1183,7 +1189,7 @@ func (ctx *flushCtx) appendSeries(key, suffix string, timestamp int64, value flo
}
}
func (ctx *flushCtx) appendSeriesWithExtraLabel(key, suffix string, timestamp int64, value float64, extraName, extraValue string) {
func (ctx *flushCtx) appendSeriesWithExtraLabel(key, suffix string, value float64, extraName, extraValue string) {
labelsLen := len(ctx.labels)
samplesLen := len(ctx.samples)
ctx.labels = decompressLabels(ctx.labels, key)
@ -1195,7 +1201,7 @@ func (ctx *flushCtx) appendSeriesWithExtraLabel(key, suffix string, timestamp in
Value: extraValue,
})
ctx.samples = append(ctx.samples, prompbmarshal.Sample{
Timestamp: timestamp,
Timestamp: ctx.flushTimestamp,
Value: value,
})
ctx.tss = append(ctx.tss, prompbmarshal.TimeSeries{

View file

@ -45,12 +45,14 @@ func BenchmarkAggregatorsFlushInternalSerial(b *testing.B) {
defer a.MustStop()
_ = a.Push(benchSeries, nil)
flushTimeMsec := time.Now().UnixMilli()
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(len(benchSeries) * len(benchOutputs)))
for i := 0; i < b.N; i++ {
for _, aggr := range a.as {
aggr.flushInternal(pushFunc, false)
aggr.flushInternal(pushFunc, flushTimeMsec, false)
}
}
}

View file

@ -4,7 +4,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// sumSamplesAggrState calculates output=sum_samples, e.g. the sum over input samples.
@ -59,7 +58,6 @@ func (as *sumSamplesAggrState) pushSamples(samples []pushSample) {
}
func (as *sumSamplesAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v any) bool {
if resetState {
@ -77,7 +75,7 @@ func (as *sumSamplesAggrState) flushState(ctx *flushCtx, resetState bool) {
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "sum_samples", currentTimeMsec, sum)
ctx.appendSeries(key, "sum_samples", sum)
return true
})
}

View file

@ -117,7 +117,6 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
func (as *totalAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
suffix := as.getSuffix()
@ -142,7 +141,7 @@ func (as *totalAggrState) flushState(ctx *flushCtx, resetState bool) {
if !deleted {
key := k.(string)
ctx.appendSeries(key, suffix, currentTimeMsec, total)
ctx.appendSeries(key, suffix, total)
}
return true
})

View file

@ -4,7 +4,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// uniqueSamplesAggrState calculates output=unique_samples, e.g. the number of unique sample values.
@ -63,7 +62,6 @@ func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) {
}
func (as *uniqueSamplesAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v any) bool {
if resetState {
@ -81,7 +79,7 @@ func (as *uniqueSamplesAggrState) flushState(ctx *flushCtx, resetState bool) {
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "unique_samples", currentTimeMsec, float64(n))
ctx.appendSeries(key, "unique_samples", float64(n))
return true
})
}