From 04981c7a7f29acf1b41fb2c1c599dfd0b921a1f2 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 7 Aug 2024 11:34:18 +0200 Subject: [PATCH] lib/streamaggr: remove resetState arg from aggrState.flushState() The resetState arg was used only for the BenchmarkAggregatorsFlushInternalSerial benchmark. This benchmark was testing aggregate state flush performance by keeping the same state across flushes. The benhmark didn't reflect the performance and scalability of stream aggregation in production, while it led to non-trivial code changes related to resetState arg handling. So let's drop the benchmark together with all the code related to resetState handling, in order to simplify the code at lib/streamaggr a bit. Thanks to @AndrewChubatiuk for the original idea at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6314 --- lib/streamaggr/avg.go | 14 +++++--------- lib/streamaggr/count_samples.go | 14 +++++--------- lib/streamaggr/count_series.go | 14 +++++--------- lib/streamaggr/histogram_bucket.go | 2 +- lib/streamaggr/last.go | 14 +++++--------- lib/streamaggr/max.go | 14 +++++--------- lib/streamaggr/min.go | 14 +++++--------- lib/streamaggr/quantiles.go | 14 +++++--------- lib/streamaggr/rate.go | 10 ++++------ lib/streamaggr/stddev.go | 14 +++++--------- lib/streamaggr/stdvar.go | 14 +++++--------- lib/streamaggr/streamaggr.go | 14 ++------------ lib/streamaggr/streamaggr_timing_test.go | 18 ------------------ lib/streamaggr/sum_samples.go | 14 +++++--------- lib/streamaggr/total.go | 14 ++++++-------- lib/streamaggr/unique_samples.go | 14 +++++--------- 16 files changed, 68 insertions(+), 144 deletions(-) diff --git a/lib/streamaggr/avg.go b/lib/streamaggr/avg.go index 6cb95fbd0..32ab569ac 100644 --- a/lib/streamaggr/avg.go +++ b/lib/streamaggr/avg.go @@ -60,21 +60,17 @@ func (as *avgAggrState) pushSamples(samples []pushSample) { } } -func (as *avgAggrState) flushState(ctx *flushCtx, resetState bool) { +func (as *avgAggrState) flushState(ctx *flushCtx) { m := &as.m m.Range(func(k, v any) bool { - if resetState { - // Atomically delete the entry from the map, so new entry is created for the next flush. - m.Delete(k) - } + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) sv := v.(*avgStateValue) sv.mu.Lock() avg := sv.sum / float64(sv.count) - if resetState { - // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. - sv.deleted = true - } + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true sv.mu.Unlock() key := k.(string) diff --git a/lib/streamaggr/count_samples.go b/lib/streamaggr/count_samples.go index e22a0bdcb..ca2bc709e 100644 --- a/lib/streamaggr/count_samples.go +++ b/lib/streamaggr/count_samples.go @@ -57,21 +57,17 @@ func (as *countSamplesAggrState) pushSamples(samples []pushSample) { } } -func (as *countSamplesAggrState) flushState(ctx *flushCtx, resetState bool) { +func (as *countSamplesAggrState) flushState(ctx *flushCtx) { m := &as.m m.Range(func(k, v any) bool { - if resetState { - // Atomically delete the entry from the map, so new entry is created for the next flush. - m.Delete(k) - } + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) sv := v.(*countSamplesStateValue) sv.mu.Lock() n := sv.n - if resetState { - // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. - sv.deleted = true - } + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true sv.mu.Unlock() key := k.(string) diff --git a/lib/streamaggr/count_series.go b/lib/streamaggr/count_series.go index cd2f56187..816581464 100644 --- a/lib/streamaggr/count_series.go +++ b/lib/streamaggr/count_series.go @@ -66,21 +66,17 @@ func (as *countSeriesAggrState) pushSamples(samples []pushSample) { } } -func (as *countSeriesAggrState) flushState(ctx *flushCtx, resetState bool) { +func (as *countSeriesAggrState) flushState(ctx *flushCtx) { m := &as.m m.Range(func(k, v any) bool { - if resetState { - // Atomically delete the entry from the map, so new entry is created for the next flush. - m.Delete(k) - } + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) sv := v.(*countSeriesStateValue) sv.mu.Lock() n := len(sv.m) - if resetState { - // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. - sv.deleted = true - } + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true sv.mu.Unlock() key := k.(string) diff --git a/lib/streamaggr/histogram_bucket.go b/lib/streamaggr/histogram_bucket.go index 0078e4a1f..fdf7b3fad 100644 --- a/lib/streamaggr/histogram_bucket.go +++ b/lib/streamaggr/histogram_bucket.go @@ -86,7 +86,7 @@ func (as *histogramBucketAggrState) removeOldEntries(currentTime uint64) { }) } -func (as *histogramBucketAggrState) flushState(ctx *flushCtx, _ bool) { +func (as *histogramBucketAggrState) flushState(ctx *flushCtx) { currentTime := fasttime.UnixTimestamp() as.removeOldEntries(currentTime) diff --git a/lib/streamaggr/last.go b/lib/streamaggr/last.go index 48609d83b..0ae4b9b8c 100644 --- a/lib/streamaggr/last.go +++ b/lib/streamaggr/last.go @@ -62,21 +62,17 @@ func (as *lastAggrState) pushSamples(samples []pushSample) { } } -func (as *lastAggrState) flushState(ctx *flushCtx, resetState bool) { +func (as *lastAggrState) flushState(ctx *flushCtx) { m := &as.m m.Range(func(k, v any) bool { - if resetState { - // Atomically delete the entry from the map, so new entry is created for the next flush. - m.Delete(k) - } + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) sv := v.(*lastStateValue) sv.mu.Lock() last := sv.last - if resetState { - // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. - sv.deleted = true - } + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true sv.mu.Unlock() key := k.(string) diff --git a/lib/streamaggr/max.go b/lib/streamaggr/max.go index 6cc69a8be..9197d3add 100644 --- a/lib/streamaggr/max.go +++ b/lib/streamaggr/max.go @@ -59,21 +59,17 @@ func (as *maxAggrState) pushSamples(samples []pushSample) { } } -func (as *maxAggrState) flushState(ctx *flushCtx, resetState bool) { +func (as *maxAggrState) flushState(ctx *flushCtx) { m := &as.m m.Range(func(k, v any) bool { - if resetState { - // Atomically delete the entry from the map, so new entry is created for the next flush. - m.Delete(k) - } + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) sv := v.(*maxStateValue) sv.mu.Lock() max := sv.max - if resetState { - // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. - sv.deleted = true - } + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true sv.mu.Unlock() key := k.(string) diff --git a/lib/streamaggr/min.go b/lib/streamaggr/min.go index ec6cc569a..308f259c7 100644 --- a/lib/streamaggr/min.go +++ b/lib/streamaggr/min.go @@ -59,21 +59,17 @@ func (as *minAggrState) pushSamples(samples []pushSample) { } } -func (as *minAggrState) flushState(ctx *flushCtx, resetState bool) { +func (as *minAggrState) flushState(ctx *flushCtx) { m := &as.m m.Range(func(k, v any) bool { - if resetState { - // Atomically delete the entry from the map, so new entry is created for the next flush. - m.Delete(k) - } + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) sv := v.(*minStateValue) sv.mu.Lock() min := sv.min - if resetState { - // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. - sv.deleted = true - } + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true sv.mu.Unlock() key := k.(string) ctx.appendSeries(key, "min", min) diff --git a/lib/streamaggr/quantiles.go b/lib/streamaggr/quantiles.go index d9a3ae90a..00fefe877 100644 --- a/lib/streamaggr/quantiles.go +++ b/lib/streamaggr/quantiles.go @@ -63,25 +63,21 @@ func (as *quantilesAggrState) pushSamples(samples []pushSample) { } } -func (as *quantilesAggrState) flushState(ctx *flushCtx, resetState bool) { +func (as *quantilesAggrState) flushState(ctx *flushCtx) { m := &as.m phis := as.phis var quantiles []float64 var b []byte m.Range(func(k, v any) bool { - if resetState { - // Atomically delete the entry from the map, so new entry is created for the next flush. - m.Delete(k) - } + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) sv := v.(*quantilesStateValue) sv.mu.Lock() quantiles = sv.h.Quantiles(quantiles[:0], phis) histogram.PutFast(sv.h) - if resetState { - // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. - sv.deleted = true - } + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true sv.mu.Unlock() key := k.(string) diff --git a/lib/streamaggr/rate.go b/lib/streamaggr/rate.go index c2297d680..1c3f2fb5c 100644 --- a/lib/streamaggr/rate.go +++ b/lib/streamaggr/rate.go @@ -105,7 +105,7 @@ func (as *rateAggrState) pushSamples(samples []pushSample) { } } -func (as *rateAggrState) flushState(ctx *flushCtx, resetState bool) { +func (as *rateAggrState) flushState(ctx *flushCtx) { currentTime := fasttime.UnixTimestamp() suffix := as.getSuffix() @@ -126,11 +126,9 @@ func (as *rateAggrState) flushState(ctx *flushCtx, resetState bool) { sumRate += lv.increase / d countSeries++ } - if resetState { - lv.prevTimestamp = lv.timestamp - lv.increase = 0 - lvs[k1] = lv - } + lv.prevTimestamp = lv.timestamp + lv.increase = 0 + lvs[k1] = lv } deleted := sv.deleted sv.mu.Unlock() diff --git a/lib/streamaggr/stddev.go b/lib/streamaggr/stddev.go index 325b96286..053e0f209 100644 --- a/lib/streamaggr/stddev.go +++ b/lib/streamaggr/stddev.go @@ -60,21 +60,17 @@ func (as *stddevAggrState) pushSamples(samples []pushSample) { } } -func (as *stddevAggrState) flushState(ctx *flushCtx, resetState bool) { +func (as *stddevAggrState) flushState(ctx *flushCtx) { m := &as.m m.Range(func(k, v any) bool { - if resetState { - // Atomically delete the entry from the map, so new entry is created for the next flush. - m.Delete(k) - } + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) sv := v.(*stddevStateValue) sv.mu.Lock() stddev := math.Sqrt(sv.q / sv.count) - if resetState { - // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. - sv.deleted = true - } + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true sv.mu.Unlock() key := k.(string) diff --git a/lib/streamaggr/stdvar.go b/lib/streamaggr/stdvar.go index cb648e562..8170f227a 100644 --- a/lib/streamaggr/stdvar.go +++ b/lib/streamaggr/stdvar.go @@ -59,21 +59,17 @@ func (as *stdvarAggrState) pushSamples(samples []pushSample) { } } -func (as *stdvarAggrState) flushState(ctx *flushCtx, resetState bool) { +func (as *stdvarAggrState) flushState(ctx *flushCtx) { m := &as.m m.Range(func(k, v any) bool { - if resetState { - // Atomically delete the entry from the map, so new entry is created for the next flush. - m.Delete(k) - } + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) sv := v.(*stdvarStateValue) sv.mu.Lock() stdvar := sv.q / sv.count - if resetState { - // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. - sv.deleted = true - } + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true sv.mu.Unlock() key := k.(string) diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 560cc2479..6fc674e84 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -422,13 +422,7 @@ type aggrState interface { pushSamples(samples []pushSample) // flushState must flush aggrState data to ctx. - // - // if resetState is true, then aggrState must be reset after flushing the data to ctx, - // otherwise the aggrState data must be kept unchanged. - // - // The resetState is set to false only in the benchmark, which measures flushState() performance - // over the same aggrState. - flushState(ctx *flushCtx, resetState bool) + flushState(ctx *flushCtx) } // PushFunc is called by Aggregators when it needs to push its state to metrics storage @@ -837,10 +831,6 @@ func (a *aggregator) dedupFlush() { // // If pushFunc is nil, then the aggregator state is just reset. func (a *aggregator) flush(pushFunc PushFunc, flushTimeMsec int64) { - a.flushInternal(pushFunc, flushTimeMsec, true) -} - -func (a *aggregator) flushInternal(pushFunc PushFunc, flushTimeMsec int64, resetState bool) { startTime := time.Now() // Update minTimestamp before flushing samples to the storage, @@ -860,7 +850,7 @@ func (a *aggregator) flushInternal(pushFunc PushFunc, flushTimeMsec int64, reset }() ctx := getFlushCtx(a, ao, pushFunc, flushTimeMsec) - ao.as.flushState(ctx, resetState) + ao.as.flushState(ctx) ctx.flushSeries() putFlushCtx(ctx) }(ao) diff --git a/lib/streamaggr/streamaggr_timing_test.go b/lib/streamaggr/streamaggr_timing_test.go index 10e11f96c..5901e56ad 100644 --- a/lib/streamaggr/streamaggr_timing_test.go +++ b/lib/streamaggr/streamaggr_timing_test.go @@ -39,24 +39,6 @@ func BenchmarkAggregatorsPush(b *testing.B) { } } -func BenchmarkAggregatorsFlushInternalSerial(b *testing.B) { - pushFunc := func(_ []prompbmarshal.TimeSeries) {} - a := newBenchAggregators(benchOutputs, pushFunc) - defer a.MustStop() - _ = a.Push(benchSeries, nil) - - flushTimeMsec := time.Now().UnixMilli() - - b.ResetTimer() - b.ReportAllocs() - b.SetBytes(int64(len(benchSeries) * len(benchOutputs))) - for i := 0; i < b.N; i++ { - for _, aggr := range a.as { - aggr.flushInternal(pushFunc, flushTimeMsec, false) - } - } -} - func benchmarkAggregatorsPush(b *testing.B, output string) { pushFunc := func(_ []prompbmarshal.TimeSeries) {} a := newBenchAggregators([]string{output}, pushFunc) diff --git a/lib/streamaggr/sum_samples.go b/lib/streamaggr/sum_samples.go index c1b5a0e71..947239bb3 100644 --- a/lib/streamaggr/sum_samples.go +++ b/lib/streamaggr/sum_samples.go @@ -57,21 +57,17 @@ func (as *sumSamplesAggrState) pushSamples(samples []pushSample) { } } -func (as *sumSamplesAggrState) flushState(ctx *flushCtx, resetState bool) { +func (as *sumSamplesAggrState) flushState(ctx *flushCtx) { m := &as.m m.Range(func(k, v any) bool { - if resetState { - // Atomically delete the entry from the map, so new entry is created for the next flush. - m.Delete(k) - } + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) sv := v.(*sumSamplesStateValue) sv.mu.Lock() sum := sv.sum - if resetState { - // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. - sv.deleted = true - } + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true sv.mu.Unlock() key := k.(string) diff --git a/lib/streamaggr/total.go b/lib/streamaggr/total.go index 85f047946..e905e4531 100644 --- a/lib/streamaggr/total.go +++ b/lib/streamaggr/total.go @@ -115,7 +115,7 @@ func (as *totalAggrState) pushSamples(samples []pushSample) { } } -func (as *totalAggrState) flushState(ctx *flushCtx, resetState bool) { +func (as *totalAggrState) flushState(ctx *flushCtx) { currentTime := fasttime.UnixTimestamp() suffix := as.getSuffix() @@ -128,13 +128,11 @@ func (as *totalAggrState) flushState(ctx *flushCtx, resetState bool) { sv.mu.Lock() total := sv.total - if resetState { - if as.resetTotalOnFlush { - sv.total = 0 - } else if math.Abs(sv.total) >= (1 << 53) { - // It is time to reset the entry, since it starts losing float64 precision - sv.total = 0 - } + if as.resetTotalOnFlush { + sv.total = 0 + } else if math.Abs(sv.total) >= (1 << 53) { + // It is time to reset the entry, since it starts losing float64 precision + sv.total = 0 } deleted := sv.deleted sv.mu.Unlock() diff --git a/lib/streamaggr/unique_samples.go b/lib/streamaggr/unique_samples.go index ba3cf6f22..93145397c 100644 --- a/lib/streamaggr/unique_samples.go +++ b/lib/streamaggr/unique_samples.go @@ -61,21 +61,17 @@ func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) { } } -func (as *uniqueSamplesAggrState) flushState(ctx *flushCtx, resetState bool) { +func (as *uniqueSamplesAggrState) flushState(ctx *flushCtx) { m := &as.m m.Range(func(k, v any) bool { - if resetState { - // Atomically delete the entry from the map, so new entry is created for the next flush. - m.Delete(k) - } + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) sv := v.(*uniqueSamplesStateValue) sv.mu.Lock() n := len(sv.m) - if resetState { - // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. - sv.deleted = true - } + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true sv.mu.Unlock() key := k.(string)