diff --git a/lib/streamaggr/avg.go b/lib/streamaggr/avg.go index 6cb95fbd0..32ab569ac 100644 --- a/lib/streamaggr/avg.go +++ b/lib/streamaggr/avg.go @@ -60,21 +60,17 @@ func (as *avgAggrState) pushSamples(samples []pushSample) { } } -func (as *avgAggrState) flushState(ctx *flushCtx, resetState bool) { +func (as *avgAggrState) flushState(ctx *flushCtx) { m := &as.m m.Range(func(k, v any) bool { - if resetState { - // Atomically delete the entry from the map, so new entry is created for the next flush. - m.Delete(k) - } + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) sv := v.(*avgStateValue) sv.mu.Lock() avg := sv.sum / float64(sv.count) - if resetState { - // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. - sv.deleted = true - } + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true sv.mu.Unlock() key := k.(string) diff --git a/lib/streamaggr/count_samples.go b/lib/streamaggr/count_samples.go index e22a0bdcb..ca2bc709e 100644 --- a/lib/streamaggr/count_samples.go +++ b/lib/streamaggr/count_samples.go @@ -57,21 +57,17 @@ func (as *countSamplesAggrState) pushSamples(samples []pushSample) { } } -func (as *countSamplesAggrState) flushState(ctx *flushCtx, resetState bool) { +func (as *countSamplesAggrState) flushState(ctx *flushCtx) { m := &as.m m.Range(func(k, v any) bool { - if resetState { - // Atomically delete the entry from the map, so new entry is created for the next flush. - m.Delete(k) - } + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) sv := v.(*countSamplesStateValue) sv.mu.Lock() n := sv.n - if resetState { - // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. - sv.deleted = true - } + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true sv.mu.Unlock() key := k.(string) diff --git a/lib/streamaggr/count_series.go b/lib/streamaggr/count_series.go index cd2f56187..816581464 100644 --- a/lib/streamaggr/count_series.go +++ b/lib/streamaggr/count_series.go @@ -66,21 +66,17 @@ func (as *countSeriesAggrState) pushSamples(samples []pushSample) { } } -func (as *countSeriesAggrState) flushState(ctx *flushCtx, resetState bool) { +func (as *countSeriesAggrState) flushState(ctx *flushCtx) { m := &as.m m.Range(func(k, v any) bool { - if resetState { - // Atomically delete the entry from the map, so new entry is created for the next flush. - m.Delete(k) - } + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) sv := v.(*countSeriesStateValue) sv.mu.Lock() n := len(sv.m) - if resetState { - // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. - sv.deleted = true - } + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true sv.mu.Unlock() key := k.(string) diff --git a/lib/streamaggr/histogram_bucket.go b/lib/streamaggr/histogram_bucket.go index 0078e4a1f..fdf7b3fad 100644 --- a/lib/streamaggr/histogram_bucket.go +++ b/lib/streamaggr/histogram_bucket.go @@ -86,7 +86,7 @@ func (as *histogramBucketAggrState) removeOldEntries(currentTime uint64) { }) } -func (as *histogramBucketAggrState) flushState(ctx *flushCtx, _ bool) { +func (as *histogramBucketAggrState) flushState(ctx *flushCtx) { currentTime := fasttime.UnixTimestamp() as.removeOldEntries(currentTime) diff --git a/lib/streamaggr/last.go b/lib/streamaggr/last.go index 48609d83b..0ae4b9b8c 100644 --- a/lib/streamaggr/last.go +++ b/lib/streamaggr/last.go @@ -62,21 +62,17 @@ func (as *lastAggrState) pushSamples(samples []pushSample) { } } -func (as *lastAggrState) flushState(ctx *flushCtx, resetState bool) { +func (as *lastAggrState) flushState(ctx *flushCtx) { m := &as.m m.Range(func(k, v any) bool { - if resetState { - // Atomically delete the entry from the map, so new entry is created for the next flush. - m.Delete(k) - } + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) sv := v.(*lastStateValue) sv.mu.Lock() last := sv.last - if resetState { - // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. - sv.deleted = true - } + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true sv.mu.Unlock() key := k.(string) diff --git a/lib/streamaggr/max.go b/lib/streamaggr/max.go index 6cc69a8be..9197d3add 100644 --- a/lib/streamaggr/max.go +++ b/lib/streamaggr/max.go @@ -59,21 +59,17 @@ func (as *maxAggrState) pushSamples(samples []pushSample) { } } -func (as *maxAggrState) flushState(ctx *flushCtx, resetState bool) { +func (as *maxAggrState) flushState(ctx *flushCtx) { m := &as.m m.Range(func(k, v any) bool { - if resetState { - // Atomically delete the entry from the map, so new entry is created for the next flush. - m.Delete(k) - } + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) sv := v.(*maxStateValue) sv.mu.Lock() max := sv.max - if resetState { - // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. - sv.deleted = true - } + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true sv.mu.Unlock() key := k.(string) diff --git a/lib/streamaggr/min.go b/lib/streamaggr/min.go index ec6cc569a..308f259c7 100644 --- a/lib/streamaggr/min.go +++ b/lib/streamaggr/min.go @@ -59,21 +59,17 @@ func (as *minAggrState) pushSamples(samples []pushSample) { } } -func (as *minAggrState) flushState(ctx *flushCtx, resetState bool) { +func (as *minAggrState) flushState(ctx *flushCtx) { m := &as.m m.Range(func(k, v any) bool { - if resetState { - // Atomically delete the entry from the map, so new entry is created for the next flush. - m.Delete(k) - } + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) sv := v.(*minStateValue) sv.mu.Lock() min := sv.min - if resetState { - // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. - sv.deleted = true - } + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true sv.mu.Unlock() key := k.(string) ctx.appendSeries(key, "min", min) diff --git a/lib/streamaggr/quantiles.go b/lib/streamaggr/quantiles.go index d9a3ae90a..00fefe877 100644 --- a/lib/streamaggr/quantiles.go +++ b/lib/streamaggr/quantiles.go @@ -63,25 +63,21 @@ func (as *quantilesAggrState) pushSamples(samples []pushSample) { } } -func (as *quantilesAggrState) flushState(ctx *flushCtx, resetState bool) { +func (as *quantilesAggrState) flushState(ctx *flushCtx) { m := &as.m phis := as.phis var quantiles []float64 var b []byte m.Range(func(k, v any) bool { - if resetState { - // Atomically delete the entry from the map, so new entry is created for the next flush. - m.Delete(k) - } + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) sv := v.(*quantilesStateValue) sv.mu.Lock() quantiles = sv.h.Quantiles(quantiles[:0], phis) histogram.PutFast(sv.h) - if resetState { - // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. - sv.deleted = true - } + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true sv.mu.Unlock() key := k.(string) diff --git a/lib/streamaggr/rate.go b/lib/streamaggr/rate.go index c2297d680..1c3f2fb5c 100644 --- a/lib/streamaggr/rate.go +++ b/lib/streamaggr/rate.go @@ -105,7 +105,7 @@ func (as *rateAggrState) pushSamples(samples []pushSample) { } } -func (as *rateAggrState) flushState(ctx *flushCtx, resetState bool) { +func (as *rateAggrState) flushState(ctx *flushCtx) { currentTime := fasttime.UnixTimestamp() suffix := as.getSuffix() @@ -126,11 +126,9 @@ func (as *rateAggrState) flushState(ctx *flushCtx, resetState bool) { sumRate += lv.increase / d countSeries++ } - if resetState { - lv.prevTimestamp = lv.timestamp - lv.increase = 0 - lvs[k1] = lv - } + lv.prevTimestamp = lv.timestamp + lv.increase = 0 + lvs[k1] = lv } deleted := sv.deleted sv.mu.Unlock() diff --git a/lib/streamaggr/stddev.go b/lib/streamaggr/stddev.go index 325b96286..053e0f209 100644 --- a/lib/streamaggr/stddev.go +++ b/lib/streamaggr/stddev.go @@ -60,21 +60,17 @@ func (as *stddevAggrState) pushSamples(samples []pushSample) { } } -func (as *stddevAggrState) flushState(ctx *flushCtx, resetState bool) { +func (as *stddevAggrState) flushState(ctx *flushCtx) { m := &as.m m.Range(func(k, v any) bool { - if resetState { - // Atomically delete the entry from the map, so new entry is created for the next flush. - m.Delete(k) - } + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) sv := v.(*stddevStateValue) sv.mu.Lock() stddev := math.Sqrt(sv.q / sv.count) - if resetState { - // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. - sv.deleted = true - } + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true sv.mu.Unlock() key := k.(string) diff --git a/lib/streamaggr/stdvar.go b/lib/streamaggr/stdvar.go index cb648e562..8170f227a 100644 --- a/lib/streamaggr/stdvar.go +++ b/lib/streamaggr/stdvar.go @@ -59,21 +59,17 @@ func (as *stdvarAggrState) pushSamples(samples []pushSample) { } } -func (as *stdvarAggrState) flushState(ctx *flushCtx, resetState bool) { +func (as *stdvarAggrState) flushState(ctx *flushCtx) { m := &as.m m.Range(func(k, v any) bool { - if resetState { - // Atomically delete the entry from the map, so new entry is created for the next flush. - m.Delete(k) - } + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) sv := v.(*stdvarStateValue) sv.mu.Lock() stdvar := sv.q / sv.count - if resetState { - // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. - sv.deleted = true - } + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true sv.mu.Unlock() key := k.(string) diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 560cc2479..6fc674e84 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -422,13 +422,7 @@ type aggrState interface { pushSamples(samples []pushSample) // flushState must flush aggrState data to ctx. - // - // if resetState is true, then aggrState must be reset after flushing the data to ctx, - // otherwise the aggrState data must be kept unchanged. - // - // The resetState is set to false only in the benchmark, which measures flushState() performance - // over the same aggrState. - flushState(ctx *flushCtx, resetState bool) + flushState(ctx *flushCtx) } // PushFunc is called by Aggregators when it needs to push its state to metrics storage @@ -837,10 +831,6 @@ func (a *aggregator) dedupFlush() { // // If pushFunc is nil, then the aggregator state is just reset. func (a *aggregator) flush(pushFunc PushFunc, flushTimeMsec int64) { - a.flushInternal(pushFunc, flushTimeMsec, true) -} - -func (a *aggregator) flushInternal(pushFunc PushFunc, flushTimeMsec int64, resetState bool) { startTime := time.Now() // Update minTimestamp before flushing samples to the storage, @@ -860,7 +850,7 @@ func (a *aggregator) flushInternal(pushFunc PushFunc, flushTimeMsec int64, reset }() ctx := getFlushCtx(a, ao, pushFunc, flushTimeMsec) - ao.as.flushState(ctx, resetState) + ao.as.flushState(ctx) ctx.flushSeries() putFlushCtx(ctx) }(ao) diff --git a/lib/streamaggr/streamaggr_timing_test.go b/lib/streamaggr/streamaggr_timing_test.go index 10e11f96c..5901e56ad 100644 --- a/lib/streamaggr/streamaggr_timing_test.go +++ b/lib/streamaggr/streamaggr_timing_test.go @@ -39,24 +39,6 @@ func BenchmarkAggregatorsPush(b *testing.B) { } } -func BenchmarkAggregatorsFlushInternalSerial(b *testing.B) { - pushFunc := func(_ []prompbmarshal.TimeSeries) {} - a := newBenchAggregators(benchOutputs, pushFunc) - defer a.MustStop() - _ = a.Push(benchSeries, nil) - - flushTimeMsec := time.Now().UnixMilli() - - b.ResetTimer() - b.ReportAllocs() - b.SetBytes(int64(len(benchSeries) * len(benchOutputs))) - for i := 0; i < b.N; i++ { - for _, aggr := range a.as { - aggr.flushInternal(pushFunc, flushTimeMsec, false) - } - } -} - func benchmarkAggregatorsPush(b *testing.B, output string) { pushFunc := func(_ []prompbmarshal.TimeSeries) {} a := newBenchAggregators([]string{output}, pushFunc) diff --git a/lib/streamaggr/sum_samples.go b/lib/streamaggr/sum_samples.go index c1b5a0e71..947239bb3 100644 --- a/lib/streamaggr/sum_samples.go +++ b/lib/streamaggr/sum_samples.go @@ -57,21 +57,17 @@ func (as *sumSamplesAggrState) pushSamples(samples []pushSample) { } } -func (as *sumSamplesAggrState) flushState(ctx *flushCtx, resetState bool) { +func (as *sumSamplesAggrState) flushState(ctx *flushCtx) { m := &as.m m.Range(func(k, v any) bool { - if resetState { - // Atomically delete the entry from the map, so new entry is created for the next flush. - m.Delete(k) - } + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) sv := v.(*sumSamplesStateValue) sv.mu.Lock() sum := sv.sum - if resetState { - // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. - sv.deleted = true - } + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true sv.mu.Unlock() key := k.(string) diff --git a/lib/streamaggr/total.go b/lib/streamaggr/total.go index 85f047946..e905e4531 100644 --- a/lib/streamaggr/total.go +++ b/lib/streamaggr/total.go @@ -115,7 +115,7 @@ func (as *totalAggrState) pushSamples(samples []pushSample) { } } -func (as *totalAggrState) flushState(ctx *flushCtx, resetState bool) { +func (as *totalAggrState) flushState(ctx *flushCtx) { currentTime := fasttime.UnixTimestamp() suffix := as.getSuffix() @@ -128,13 +128,11 @@ func (as *totalAggrState) flushState(ctx *flushCtx, resetState bool) { sv.mu.Lock() total := sv.total - if resetState { - if as.resetTotalOnFlush { - sv.total = 0 - } else if math.Abs(sv.total) >= (1 << 53) { - // It is time to reset the entry, since it starts losing float64 precision - sv.total = 0 - } + if as.resetTotalOnFlush { + sv.total = 0 + } else if math.Abs(sv.total) >= (1 << 53) { + // It is time to reset the entry, since it starts losing float64 precision + sv.total = 0 } deleted := sv.deleted sv.mu.Unlock() diff --git a/lib/streamaggr/unique_samples.go b/lib/streamaggr/unique_samples.go index ba3cf6f22..93145397c 100644 --- a/lib/streamaggr/unique_samples.go +++ b/lib/streamaggr/unique_samples.go @@ -61,21 +61,17 @@ func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) { } } -func (as *uniqueSamplesAggrState) flushState(ctx *flushCtx, resetState bool) { +func (as *uniqueSamplesAggrState) flushState(ctx *flushCtx) { m := &as.m m.Range(func(k, v any) bool { - if resetState { - // Atomically delete the entry from the map, so new entry is created for the next flush. - m.Delete(k) - } + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) sv := v.(*uniqueSamplesStateValue) sv.mu.Lock() n := len(sv.m) - if resetState { - // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. - sv.deleted = true - } + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true sv.mu.Unlock() key := k.(string)