diff --git a/lib/streamaggr/avg.go b/lib/streamaggr/avg.go index 522c2fcb5..6cb95fbd0 100644 --- a/lib/streamaggr/avg.go +++ b/lib/streamaggr/avg.go @@ -4,7 +4,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) // avgAggrState calculates output=avg, e.g. the average value over input samples. @@ -62,7 +61,6 @@ func (as *avgAggrState) pushSamples(samples []pushSample) { } func (as *avgAggrState) flushState(ctx *flushCtx, resetState bool) { - currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 m := &as.m m.Range(func(k, v any) bool { if resetState { @@ -80,7 +78,7 @@ func (as *avgAggrState) flushState(ctx *flushCtx, resetState bool) { sv.mu.Unlock() key := k.(string) - ctx.appendSeries(key, "avg", currentTimeMsec, avg) + ctx.appendSeries(key, "avg", avg) return true }) } diff --git a/lib/streamaggr/count_samples.go b/lib/streamaggr/count_samples.go index 6a05955cc..e22a0bdcb 100644 --- a/lib/streamaggr/count_samples.go +++ b/lib/streamaggr/count_samples.go @@ -4,7 +4,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) // countSamplesAggrState calculates output=count_samples, e.g. the count of input samples. @@ -59,7 +58,6 @@ func (as *countSamplesAggrState) pushSamples(samples []pushSample) { } func (as *countSamplesAggrState) flushState(ctx *flushCtx, resetState bool) { - currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 m := &as.m m.Range(func(k, v any) bool { if resetState { @@ -77,7 +75,7 @@ func (as *countSamplesAggrState) flushState(ctx *flushCtx, resetState bool) { sv.mu.Unlock() key := k.(string) - ctx.appendSeries(key, "count_samples", currentTimeMsec, float64(n)) + ctx.appendSeries(key, "count_samples", float64(n)) return true }) } diff --git a/lib/streamaggr/count_series.go b/lib/streamaggr/count_series.go index f1037c801..cd2f56187 100644 --- a/lib/streamaggr/count_series.go +++ b/lib/streamaggr/count_series.go @@ -4,7 +4,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/cespare/xxhash/v2" ) @@ -68,7 +67,6 @@ func (as *countSeriesAggrState) pushSamples(samples []pushSample) { } func (as *countSeriesAggrState) flushState(ctx *flushCtx, resetState bool) { - currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 m := &as.m m.Range(func(k, v any) bool { if resetState { @@ -86,7 +84,7 @@ func (as *countSeriesAggrState) flushState(ctx *flushCtx, resetState bool) { sv.mu.Unlock() key := k.(string) - ctx.appendSeries(key, "count_series", currentTimeMsec, float64(n)) + ctx.appendSeries(key, "count_series", float64(n)) return true }) } diff --git a/lib/streamaggr/histogram_bucket.go b/lib/streamaggr/histogram_bucket.go index 92982dcec..0078e4a1f 100644 --- a/lib/streamaggr/histogram_bucket.go +++ b/lib/streamaggr/histogram_bucket.go @@ -88,7 +88,6 @@ func (as *histogramBucketAggrState) removeOldEntries(currentTime uint64) { func (as *histogramBucketAggrState) flushState(ctx *flushCtx, _ bool) { currentTime := fasttime.UnixTimestamp() - currentTimeMsec := int64(currentTime) * 1000 as.removeOldEntries(currentTime) @@ -99,7 +98,7 @@ func (as *histogramBucketAggrState) flushState(ctx *flushCtx, _ bool) { if !sv.deleted { key := k.(string) sv.h.VisitNonZeroBuckets(func(vmrange string, count uint64) { - ctx.appendSeriesWithExtraLabel(key, "histogram_bucket", currentTimeMsec, float64(count), "vmrange", vmrange) + ctx.appendSeriesWithExtraLabel(key, "histogram_bucket", float64(count), "vmrange", vmrange) }) } sv.mu.Unlock() diff --git a/lib/streamaggr/last.go b/lib/streamaggr/last.go index eaa803e83..48609d83b 100644 --- a/lib/streamaggr/last.go +++ b/lib/streamaggr/last.go @@ -4,7 +4,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) // lastAggrState calculates output=last, e.g. the last value over input samples. @@ -64,7 +63,6 @@ func (as *lastAggrState) pushSamples(samples []pushSample) { } func (as *lastAggrState) flushState(ctx *flushCtx, resetState bool) { - currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 m := &as.m m.Range(func(k, v any) bool { if resetState { @@ -82,7 +80,7 @@ func (as *lastAggrState) flushState(ctx *flushCtx, resetState bool) { sv.mu.Unlock() key := k.(string) - ctx.appendSeries(key, "last", currentTimeMsec, last) + ctx.appendSeries(key, "last", last) return true }) } diff --git a/lib/streamaggr/max.go b/lib/streamaggr/max.go index bbbb3c83d..6cc69a8be 100644 --- a/lib/streamaggr/max.go +++ b/lib/streamaggr/max.go @@ -4,7 +4,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) // maxAggrState calculates output=max, e.g. the maximum value over input samples. @@ -61,7 +60,6 @@ func (as *maxAggrState) pushSamples(samples []pushSample) { } func (as *maxAggrState) flushState(ctx *flushCtx, resetState bool) { - currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 m := &as.m m.Range(func(k, v any) bool { if resetState { @@ -79,7 +77,7 @@ func (as *maxAggrState) flushState(ctx *flushCtx, resetState bool) { sv.mu.Unlock() key := k.(string) - ctx.appendSeries(key, "max", currentTimeMsec, max) + ctx.appendSeries(key, "max", max) return true }) } diff --git a/lib/streamaggr/min.go b/lib/streamaggr/min.go index 8970d41a7..ec6cc569a 100644 --- a/lib/streamaggr/min.go +++ b/lib/streamaggr/min.go @@ -4,7 +4,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) // minAggrState calculates output=min, e.g. the minimum value over input samples. @@ -61,7 +60,6 @@ func (as *minAggrState) pushSamples(samples []pushSample) { } func (as *minAggrState) flushState(ctx *flushCtx, resetState bool) { - currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 m := &as.m m.Range(func(k, v any) bool { if resetState { @@ -78,7 +76,7 @@ func (as *minAggrState) flushState(ctx *flushCtx, resetState bool) { } sv.mu.Unlock() key := k.(string) - ctx.appendSeries(key, "min", currentTimeMsec, min) + ctx.appendSeries(key, "min", min) return true }) } diff --git a/lib/streamaggr/quantiles.go b/lib/streamaggr/quantiles.go index ee187d36b..d9a3ae90a 100644 --- a/lib/streamaggr/quantiles.go +++ b/lib/streamaggr/quantiles.go @@ -5,7 +5,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/valyala/histogram" ) @@ -65,7 +64,6 @@ func (as *quantilesAggrState) pushSamples(samples []pushSample) { } func (as *quantilesAggrState) flushState(ctx *flushCtx, resetState bool) { - currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 m := &as.m phis := as.phis var quantiles []float64 @@ -90,7 +88,7 @@ func (as *quantilesAggrState) flushState(ctx *flushCtx, resetState bool) { for i, quantile := range quantiles { b = strconv.AppendFloat(b[:0], phis[i], 'g', -1, 64) phiStr := bytesutil.InternBytes(b) - ctx.appendSeriesWithExtraLabel(key, "quantiles", currentTimeMsec, quantile, "quantile", phiStr) + ctx.appendSeriesWithExtraLabel(key, "quantiles", quantile, "quantile", phiStr) } return true }) diff --git a/lib/streamaggr/rate.go b/lib/streamaggr/rate.go index 55bd393db..c2297d680 100644 --- a/lib/streamaggr/rate.go +++ b/lib/streamaggr/rate.go @@ -107,7 +107,6 @@ func (as *rateAggrState) pushSamples(samples []pushSample) { func (as *rateAggrState) flushState(ctx *flushCtx, resetState bool) { currentTime := fasttime.UnixTimestamp() - currentTimeMsec := int64(currentTime) * 1000 suffix := as.getSuffix() @@ -147,7 +146,7 @@ func (as *rateAggrState) flushState(ctx *flushCtx, resetState bool) { } key := k.(string) - ctx.appendSeries(key, suffix, currentTimeMsec, result) + ctx.appendSeries(key, suffix, result) return true }) } diff --git a/lib/streamaggr/stddev.go b/lib/streamaggr/stddev.go index 26ea2db8c..325b96286 100644 --- a/lib/streamaggr/stddev.go +++ b/lib/streamaggr/stddev.go @@ -5,7 +5,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) // stddevAggrState calculates output=stddev, e.g. the average value over input samples. @@ -62,7 +61,6 @@ func (as *stddevAggrState) pushSamples(samples []pushSample) { } func (as *stddevAggrState) flushState(ctx *flushCtx, resetState bool) { - currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 m := &as.m m.Range(func(k, v any) bool { if resetState { @@ -80,7 +78,7 @@ func (as *stddevAggrState) flushState(ctx *flushCtx, resetState bool) { sv.mu.Unlock() key := k.(string) - ctx.appendSeries(key, "stddev", currentTimeMsec, stddev) + ctx.appendSeries(key, "stddev", stddev) return true }) } diff --git a/lib/streamaggr/stdvar.go b/lib/streamaggr/stdvar.go index 35a15097e..cb648e562 100644 --- a/lib/streamaggr/stdvar.go +++ b/lib/streamaggr/stdvar.go @@ -4,7 +4,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) // stdvarAggrState calculates output=stdvar, e.g. the average value over input samples. @@ -61,7 +60,6 @@ func (as *stdvarAggrState) pushSamples(samples []pushSample) { } func (as *stdvarAggrState) flushState(ctx *flushCtx, resetState bool) { - currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 m := &as.m m.Range(func(k, v any) bool { if resetState { @@ -79,7 +77,7 @@ func (as *stdvarAggrState) flushState(ctx *flushCtx, resetState bool) { sv.mu.Unlock() key := k.(string) - ctx.appendSeries(key, "stdvar", currentTimeMsec, stdvar) + ctx.appendSeries(key, "stdvar", stdvar) return true }) } diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index c8571b9d7..560cc2479 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -733,11 +733,14 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc } } + var flushTimeMsec int64 tickerWait := func(t *time.Ticker) bool { select { case <-a.stopCh: + flushTimeMsec = time.Now().UnixMilli() return false - case <-t.C: + case ct := <-t.C: + flushTimeMsec = ct.UnixMilli() return true } } @@ -748,16 +751,16 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc defer t.Stop() if alignFlushToInterval && skipIncompleteFlush { - a.flush(nil) + a.flush(nil, 0) ignoreFirstIntervals-- } for tickerWait(t) { if ignoreFirstIntervals > 0 { - a.flush(nil) + a.flush(nil, 0) ignoreFirstIntervals-- } else { - a.flush(pushFunc) + a.flush(pushFunc, flushTimeMsec) } if alignFlushToInterval { @@ -781,14 +784,14 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc if ct.After(flushDeadline) { // It is time to flush the aggregated state if alignFlushToInterval && skipIncompleteFlush && !isSkippedFirstFlush { - a.flush(nil) + a.flush(nil, 0) ignoreFirstIntervals-- isSkippedFirstFlush = true } else if ignoreFirstIntervals > 0 { - a.flush(nil) + a.flush(nil, 0) ignoreFirstIntervals-- } else { - a.flush(pushFunc) + a.flush(pushFunc, flushTimeMsec) } for ct.After(flushDeadline) { flushDeadline = flushDeadline.Add(a.interval) @@ -806,7 +809,7 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc if !skipIncompleteFlush && ignoreFirstIntervals <= 0 { a.dedupFlush() - a.flush(pushFunc) + a.flush(pushFunc, flushTimeMsec) } } @@ -833,17 +836,17 @@ func (a *aggregator) dedupFlush() { // flush flushes aggregator state to pushFunc. // // If pushFunc is nil, then the aggregator state is just reset. -func (a *aggregator) flush(pushFunc PushFunc) { - a.flushInternal(pushFunc, true) +func (a *aggregator) flush(pushFunc PushFunc, flushTimeMsec int64) { + a.flushInternal(pushFunc, flushTimeMsec, true) } -func (a *aggregator) flushInternal(pushFunc PushFunc, resetState bool) { +func (a *aggregator) flushInternal(pushFunc PushFunc, flushTimeMsec int64, resetState bool) { startTime := time.Now() // Update minTimestamp before flushing samples to the storage, // since the flush durtion can be quite long. // This should prevent from dropping samples with old timestamps when the flush takes long time. - a.minTimestamp.Store(startTime.UnixMilli() - 5_000) + a.minTimestamp.Store(flushTimeMsec - 5_000) var wg sync.WaitGroup for i := range a.aggrOutputs { @@ -856,7 +859,7 @@ func (a *aggregator) flushInternal(pushFunc PushFunc, resetState bool) { wg.Done() }() - ctx := getFlushCtx(a, ao, pushFunc) + ctx := getFlushCtx(a, ao, pushFunc, flushTimeMsec) ao.as.flushState(ctx, resetState) ctx.flushSeries() putFlushCtx(ctx) @@ -1073,7 +1076,7 @@ func getInputOutputLabels(dstInput, dstOutput, labels []prompbmarshal.Label, by, return dstInput, dstOutput } -func getFlushCtx(a *aggregator, ao *aggrOutput, pushFunc PushFunc) *flushCtx { +func getFlushCtx(a *aggregator, ao *aggrOutput, pushFunc PushFunc, flushTimestamp int64) *flushCtx { v := flushCtxPool.Get() if v == nil { v = &flushCtx{} @@ -1082,6 +1085,7 @@ func getFlushCtx(a *aggregator, ao *aggrOutput, pushFunc PushFunc) *flushCtx { ctx.a = a ctx.ao = ao ctx.pushFunc = pushFunc + ctx.flushTimestamp = flushTimestamp return ctx } @@ -1093,9 +1097,10 @@ func putFlushCtx(ctx *flushCtx) { var flushCtxPool sync.Pool type flushCtx struct { - a *aggregator - ao *aggrOutput - pushFunc PushFunc + a *aggregator + ao *aggrOutput + pushFunc PushFunc + flushTimestamp int64 tss []prompbmarshal.TimeSeries labels []prompbmarshal.Label @@ -1106,6 +1111,7 @@ func (ctx *flushCtx) reset() { ctx.a = nil ctx.ao = nil ctx.pushFunc = nil + ctx.flushTimestamp = 0 ctx.resetSeries() } @@ -1161,7 +1167,7 @@ func (ctx *flushCtx) flushSeries() { promutils.PutLabels(auxLabels) } -func (ctx *flushCtx) appendSeries(key, suffix string, timestamp int64, value float64) { +func (ctx *flushCtx) appendSeries(key, suffix string, value float64) { labelsLen := len(ctx.labels) samplesLen := len(ctx.samples) ctx.labels = decompressLabels(ctx.labels, key) @@ -1169,7 +1175,7 @@ func (ctx *flushCtx) appendSeries(key, suffix string, timestamp int64, value flo ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix) } ctx.samples = append(ctx.samples, prompbmarshal.Sample{ - Timestamp: timestamp, + Timestamp: ctx.flushTimestamp, Value: value, }) ctx.tss = append(ctx.tss, prompbmarshal.TimeSeries{ @@ -1183,7 +1189,7 @@ func (ctx *flushCtx) appendSeries(key, suffix string, timestamp int64, value flo } } -func (ctx *flushCtx) appendSeriesWithExtraLabel(key, suffix string, timestamp int64, value float64, extraName, extraValue string) { +func (ctx *flushCtx) appendSeriesWithExtraLabel(key, suffix string, value float64, extraName, extraValue string) { labelsLen := len(ctx.labels) samplesLen := len(ctx.samples) ctx.labels = decompressLabels(ctx.labels, key) @@ -1195,7 +1201,7 @@ func (ctx *flushCtx) appendSeriesWithExtraLabel(key, suffix string, timestamp in Value: extraValue, }) ctx.samples = append(ctx.samples, prompbmarshal.Sample{ - Timestamp: timestamp, + Timestamp: ctx.flushTimestamp, Value: value, }) ctx.tss = append(ctx.tss, prompbmarshal.TimeSeries{ diff --git a/lib/streamaggr/streamaggr_timing_test.go b/lib/streamaggr/streamaggr_timing_test.go index bb0d98afa..10e11f96c 100644 --- a/lib/streamaggr/streamaggr_timing_test.go +++ b/lib/streamaggr/streamaggr_timing_test.go @@ -45,12 +45,14 @@ func BenchmarkAggregatorsFlushInternalSerial(b *testing.B) { defer a.MustStop() _ = a.Push(benchSeries, nil) + flushTimeMsec := time.Now().UnixMilli() + b.ResetTimer() b.ReportAllocs() b.SetBytes(int64(len(benchSeries) * len(benchOutputs))) for i := 0; i < b.N; i++ { for _, aggr := range a.as { - aggr.flushInternal(pushFunc, false) + aggr.flushInternal(pushFunc, flushTimeMsec, false) } } } diff --git a/lib/streamaggr/sum_samples.go b/lib/streamaggr/sum_samples.go index 685e782c5..c1b5a0e71 100644 --- a/lib/streamaggr/sum_samples.go +++ b/lib/streamaggr/sum_samples.go @@ -4,7 +4,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) // sumSamplesAggrState calculates output=sum_samples, e.g. the sum over input samples. @@ -59,7 +58,6 @@ func (as *sumSamplesAggrState) pushSamples(samples []pushSample) { } func (as *sumSamplesAggrState) flushState(ctx *flushCtx, resetState bool) { - currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 m := &as.m m.Range(func(k, v any) bool { if resetState { @@ -77,7 +75,7 @@ func (as *sumSamplesAggrState) flushState(ctx *flushCtx, resetState bool) { sv.mu.Unlock() key := k.(string) - ctx.appendSeries(key, "sum_samples", currentTimeMsec, sum) + ctx.appendSeries(key, "sum_samples", sum) return true }) } diff --git a/lib/streamaggr/total.go b/lib/streamaggr/total.go index fda700b4c..85f047946 100644 --- a/lib/streamaggr/total.go +++ b/lib/streamaggr/total.go @@ -117,7 +117,6 @@ func (as *totalAggrState) pushSamples(samples []pushSample) { func (as *totalAggrState) flushState(ctx *flushCtx, resetState bool) { currentTime := fasttime.UnixTimestamp() - currentTimeMsec := int64(currentTime) * 1000 suffix := as.getSuffix() @@ -142,7 +141,7 @@ func (as *totalAggrState) flushState(ctx *flushCtx, resetState bool) { if !deleted { key := k.(string) - ctx.appendSeries(key, suffix, currentTimeMsec, total) + ctx.appendSeries(key, suffix, total) } return true }) diff --git a/lib/streamaggr/unique_samples.go b/lib/streamaggr/unique_samples.go index 9cb63c636..ba3cf6f22 100644 --- a/lib/streamaggr/unique_samples.go +++ b/lib/streamaggr/unique_samples.go @@ -4,7 +4,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) // uniqueSamplesAggrState calculates output=unique_samples, e.g. the number of unique sample values. @@ -63,7 +62,6 @@ func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) { } func (as *uniqueSamplesAggrState) flushState(ctx *flushCtx, resetState bool) { - currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 m := &as.m m.Range(func(k, v any) bool { if resetState { @@ -81,7 +79,7 @@ func (as *uniqueSamplesAggrState) flushState(ctx *flushCtx, resetState bool) { sv.mu.Unlock() key := k.(string) - ctx.appendSeries(key, "unique_samples", currentTimeMsec, float64(n)) + ctx.appendSeries(key, "unique_samples", float64(n)) return true }) }