lib/streamaggr: remove resetState arg from aggrState.flushState()

The resetState arg was used only by the BenchmarkAggregatorsFlushInternalSerial benchmark.
This benchmark tested aggregate state flush performance by keeping the same state across flushes.
The benchmark didn't reflect the performance and scalability of stream aggregation in production,
while it required non-trivial code for handling the resetState arg.

So let's drop the benchmark together with all the code related to resetState handling,
in order to simplify the code in lib/streamaggr a bit.
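
With resetState gone, every flushState() implementation resets its state unconditionally.
The shape that all the per-output diffs below converge to looks like this (a condensed
sketch using the avg output; the types and the appendSeries helper are the package's own):

func (as *avgAggrState) flushState(ctx *flushCtx) {
	m := &as.m
	m.Range(func(k, v any) bool {
		// Atomically delete the entry from the map, so a new entry is created for the next flush.
		m.Delete(k)

		sv := v.(*avgStateValue)
		sv.mu.Lock()
		avg := sv.sum / float64(sv.count)
		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSamples() calls.
		sv.deleted = true
		sv.mu.Unlock()

		ctx.appendSeries(k.(string), "avg", avg)
		return true
	})
}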

Thanks to @AndrewChubatiuk for the original idea at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6314
Aliaksandr Valialkin, 2024-08-07 11:34:18 +02:00
commit 04981c7a7f (parent 86c7afd126)
16 changed files with 68 additions and 144 deletions

lib/streamaggr/avg.go

@@ -60,21 +60,17 @@ func (as *avgAggrState) pushSamples(samples []pushSample) {
 	}
 }
 
-func (as *avgAggrState) flushState(ctx *flushCtx, resetState bool) {
+func (as *avgAggrState) flushState(ctx *flushCtx) {
 	m := &as.m
 	m.Range(func(k, v any) bool {
-		if resetState {
-			// Atomically delete the entry from the map, so new entry is created for the next flush.
-			m.Delete(k)
-		}
+		// Atomically delete the entry from the map, so new entry is created for the next flush.
+		m.Delete(k)
 		sv := v.(*avgStateValue)
 		sv.mu.Lock()
 		avg := sv.sum / float64(sv.count)
-		if resetState {
-			// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-			sv.deleted = true
-		}
+		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
+		sv.deleted = true
 		sv.mu.Unlock()
 		key := k.(string)
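
The sv.deleted flag set above is the other half of a retry protocol on the push side:
pushSamples() re-checks the flag under the same mutex and re-creates the entry if a
concurrent flush has already claimed it. A simplified single-sample sketch (pushSample
is a hypothetical helper; the real pushSamples() loops over a batch of samples):

func (as *avgAggrState) pushSample(outputKey string, value float64) {
	for {
		v, ok := as.m.Load(outputKey)
		if !ok {
			// No entry yet - try creating it with the sample already accounted for.
			var loaded bool
			v, loaded = as.m.LoadOrStore(outputKey, &avgStateValue{sum: value, count: 1})
			if !loaded {
				return
			}
			// Use the entry created by a concurrent goroutine.
		}
		sv := v.(*avgStateValue)
		sv.mu.Lock()
		deleted := sv.deleted
		if !deleted {
			sv.sum += value
			sv.count++
		}
		sv.mu.Unlock()
		if !deleted {
			return
		}
		// The entry was marked as deleted by a concurrent flushState();
		// retry, so the sample lands in a fresh entry for the next flush.
	}
}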

lib/streamaggr/count_samples.go

@@ -57,21 +57,17 @@ func (as *countSamplesAggrState) pushSamples(samples []pushSample) {
 	}
 }
 
-func (as *countSamplesAggrState) flushState(ctx *flushCtx, resetState bool) {
+func (as *countSamplesAggrState) flushState(ctx *flushCtx) {
 	m := &as.m
 	m.Range(func(k, v any) bool {
-		if resetState {
-			// Atomically delete the entry from the map, so new entry is created for the next flush.
-			m.Delete(k)
-		}
+		// Atomically delete the entry from the map, so new entry is created for the next flush.
+		m.Delete(k)
 		sv := v.(*countSamplesStateValue)
 		sv.mu.Lock()
 		n := sv.n
-		if resetState {
-			// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-			sv.deleted = true
-		}
+		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
+		sv.deleted = true
 		sv.mu.Unlock()
 		key := k.(string)

lib/streamaggr/count_series.go

@@ -66,21 +66,17 @@ func (as *countSeriesAggrState) pushSamples(samples []pushSample) {
 	}
 }
 
-func (as *countSeriesAggrState) flushState(ctx *flushCtx, resetState bool) {
+func (as *countSeriesAggrState) flushState(ctx *flushCtx) {
 	m := &as.m
 	m.Range(func(k, v any) bool {
-		if resetState {
-			// Atomically delete the entry from the map, so new entry is created for the next flush.
-			m.Delete(k)
-		}
+		// Atomically delete the entry from the map, so new entry is created for the next flush.
+		m.Delete(k)
 		sv := v.(*countSeriesStateValue)
 		sv.mu.Lock()
 		n := len(sv.m)
-		if resetState {
-			// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-			sv.deleted = true
-		}
+		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
+		sv.deleted = true
 		sv.mu.Unlock()
 		key := k.(string)

lib/streamaggr/histogram_bucket.go

@@ -86,7 +86,7 @@ func (as *histogramBucketAggrState) removeOldEntries(currentTime uint64) {
 	})
 }
 
-func (as *histogramBucketAggrState) flushState(ctx *flushCtx, _ bool) {
+func (as *histogramBucketAggrState) flushState(ctx *flushCtx) {
 	currentTime := fasttime.UnixTimestamp()
 	as.removeOldEntries(currentTime)
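
Note that histogramBucketAggrState already ignored resetState (the `_ bool` above): it keeps
entries across flushes so the `le` bucket counters stay cumulative, and expires them by age
in removeOldEntries() instead. A sketch of that expiry path (the deleteDeadline field follows
the existing code, but treat the details as approximate):

func (as *histogramBucketAggrState) removeOldEntries(currentTime uint64) {
	m := &as.m
	m.Range(func(k, v any) bool {
		sv := v.(*histogramBucketStateValue)
		sv.mu.Lock()
		deleted := currentTime > sv.deleteDeadline
		if deleted {
			// Mark the entry as deleted, so concurrent pushSamples() calls re-create it.
			sv.deleted = true
		}
		sv.mu.Unlock()
		if deleted {
			m.Delete(k)
		}
		return true
	})
}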

lib/streamaggr/last.go

@@ -62,21 +62,17 @@ func (as *lastAggrState) pushSamples(samples []pushSample) {
 	}
 }
 
-func (as *lastAggrState) flushState(ctx *flushCtx, resetState bool) {
+func (as *lastAggrState) flushState(ctx *flushCtx) {
 	m := &as.m
 	m.Range(func(k, v any) bool {
-		if resetState {
-			// Atomically delete the entry from the map, so new entry is created for the next flush.
-			m.Delete(k)
-		}
+		// Atomically delete the entry from the map, so new entry is created for the next flush.
+		m.Delete(k)
 		sv := v.(*lastStateValue)
 		sv.mu.Lock()
 		last := sv.last
-		if resetState {
-			// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-			sv.deleted = true
-		}
+		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
+		sv.deleted = true
 		sv.mu.Unlock()
 		key := k.(string)

lib/streamaggr/max.go

@@ -59,21 +59,17 @@ func (as *maxAggrState) pushSamples(samples []pushSample) {
 	}
 }
 
-func (as *maxAggrState) flushState(ctx *flushCtx, resetState bool) {
+func (as *maxAggrState) flushState(ctx *flushCtx) {
 	m := &as.m
 	m.Range(func(k, v any) bool {
-		if resetState {
-			// Atomically delete the entry from the map, so new entry is created for the next flush.
-			m.Delete(k)
-		}
+		// Atomically delete the entry from the map, so new entry is created for the next flush.
+		m.Delete(k)
 		sv := v.(*maxStateValue)
 		sv.mu.Lock()
 		max := sv.max
-		if resetState {
-			// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-			sv.deleted = true
-		}
+		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
+		sv.deleted = true
 		sv.mu.Unlock()
 		key := k.(string)

lib/streamaggr/min.go

@@ -59,21 +59,17 @@ func (as *minAggrState) pushSamples(samples []pushSample) {
 	}
 }
 
-func (as *minAggrState) flushState(ctx *flushCtx, resetState bool) {
+func (as *minAggrState) flushState(ctx *flushCtx) {
 	m := &as.m
 	m.Range(func(k, v any) bool {
-		if resetState {
-			// Atomically delete the entry from the map, so new entry is created for the next flush.
-			m.Delete(k)
-		}
+		// Atomically delete the entry from the map, so new entry is created for the next flush.
+		m.Delete(k)
 		sv := v.(*minStateValue)
 		sv.mu.Lock()
 		min := sv.min
-		if resetState {
-			// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-			sv.deleted = true
-		}
+		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
+		sv.deleted = true
 		sv.mu.Unlock()
 		key := k.(string)
 		ctx.appendSeries(key, "min", min)

lib/streamaggr/quantiles.go

@@ -63,25 +63,21 @@ func (as *quantilesAggrState) pushSamples(samples []pushSample) {
 	}
 }
 
-func (as *quantilesAggrState) flushState(ctx *flushCtx, resetState bool) {
+func (as *quantilesAggrState) flushState(ctx *flushCtx) {
 	m := &as.m
 	phis := as.phis
 	var quantiles []float64
 	var b []byte
 	m.Range(func(k, v any) bool {
-		if resetState {
-			// Atomically delete the entry from the map, so new entry is created for the next flush.
-			m.Delete(k)
-		}
+		// Atomically delete the entry from the map, so new entry is created for the next flush.
+		m.Delete(k)
 		sv := v.(*quantilesStateValue)
 		sv.mu.Lock()
 		quantiles = sv.h.Quantiles(quantiles[:0], phis)
 		histogram.PutFast(sv.h)
-		if resetState {
-			// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-			sv.deleted = true
-		}
+		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
+		sv.deleted = true
 		sv.mu.Unlock()
 		key := k.(string)

lib/streamaggr/rate.go

@@ -105,7 +105,7 @@ func (as *rateAggrState) pushSamples(samples []pushSample) {
 	}
 }
 
-func (as *rateAggrState) flushState(ctx *flushCtx, resetState bool) {
+func (as *rateAggrState) flushState(ctx *flushCtx) {
 	currentTime := fasttime.UnixTimestamp()
 	suffix := as.getSuffix()
 
@@ -126,12 +126,10 @@ func (as *rateAggrState) flushState(ctx *flushCtx, resetState bool) {
 				sumRate += lv.increase / d
 				countSeries++
 			}
-			if resetState {
-				lv.prevTimestamp = lv.timestamp
-				lv.increase = 0
-				lvs[k1] = lv
-			}
+			lv.prevTimestamp = lv.timestamp
+			lv.increase = 0
+			lvs[k1] = lv
 		}
 		deleted := sv.deleted
 		sv.mu.Unlock()
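
The now-unconditional reset above (prevTimestamp = timestamp, increase = 0) starts a fresh
rate window on every flush. For reference, the accumulated sumRate and countSeries turn into
the flushed value roughly like this (a hypothetical standalone helper; the real code inlines
this, and isAvg corresponds to the rate_avg vs rate_sum output suffix):

// finalRate converts the accumulated per-series rates into the flushed value:
// rate_sum flushes the plain sum, while rate_avg divides it by the number of
// series that contributed to the window.
func finalRate(sumRate float64, countSeries int, isAvg bool) float64 {
	if isAvg && countSeries > 0 {
		return sumRate / float64(countSeries)
	}
	return sumRate
}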

lib/streamaggr/stddev.go

@@ -60,21 +60,17 @@ func (as *stddevAggrState) pushSamples(samples []pushSample) {
 	}
 }
 
-func (as *stddevAggrState) flushState(ctx *flushCtx, resetState bool) {
+func (as *stddevAggrState) flushState(ctx *flushCtx) {
 	m := &as.m
 	m.Range(func(k, v any) bool {
-		if resetState {
-			// Atomically delete the entry from the map, so new entry is created for the next flush.
-			m.Delete(k)
-		}
+		// Atomically delete the entry from the map, so new entry is created for the next flush.
+		m.Delete(k)
 		sv := v.(*stddevStateValue)
 		sv.mu.Lock()
 		stddev := math.Sqrt(sv.q / sv.count)
-		if resetState {
-			// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-			sv.deleted = true
-		}
+		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
+		sv.deleted = true
 		sv.mu.Unlock()
 		key := k.(string)

lib/streamaggr/stdvar.go

@@ -59,21 +59,17 @@ func (as *stdvarAggrState) pushSamples(samples []pushSample) {
 	}
 }
 
-func (as *stdvarAggrState) flushState(ctx *flushCtx, resetState bool) {
+func (as *stdvarAggrState) flushState(ctx *flushCtx) {
 	m := &as.m
 	m.Range(func(k, v any) bool {
-		if resetState {
-			// Atomically delete the entry from the map, so new entry is created for the next flush.
-			m.Delete(k)
-		}
+		// Atomically delete the entry from the map, so new entry is created for the next flush.
+		m.Delete(k)
 		sv := v.(*stdvarStateValue)
 		sv.mu.Lock()
 		stdvar := sv.q / sv.count
-		if resetState {
-			// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-			sv.deleted = true
-		}
+		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
+		sv.deleted = true
 		sv.mu.Unlock()
 		key := k.(string)

lib/streamaggr/streamaggr.go

@@ -422,13 +422,7 @@ type aggrState interface {
 	pushSamples(samples []pushSample)
 
 	// flushState must flush aggrState data to ctx.
-	//
-	// if resetState is true, then aggrState must be reset after flushing the data to ctx,
-	// otherwise the aggrState data must be kept unchanged.
-	//
-	// The resetState is set to false only in the benchmark, which measures flushState() performance
-	// over the same aggrState.
-	flushState(ctx *flushCtx, resetState bool)
+	flushState(ctx *flushCtx)
 }
 
 // PushFunc is called by Aggregators when it needs to push its state to metrics storage
@@ -837,10 +831,6 @@ func (a *aggregator) dedupFlush() {
 //
 // If pushFunc is nil, then the aggregator state is just reset.
 func (a *aggregator) flush(pushFunc PushFunc, flushTimeMsec int64) {
-	a.flushInternal(pushFunc, flushTimeMsec, true)
-}
-
-func (a *aggregator) flushInternal(pushFunc PushFunc, flushTimeMsec int64, resetState bool) {
 	startTime := time.Now()
 
 	// Update minTimestamp before flushing samples to the storage,
@@ -860,7 +850,7 @@ func (a *aggregator) flushInternal(pushFunc PushFunc, flushTimeMsec int64, resetState bool) {
 		}()
 
 		ctx := getFlushCtx(a, ao, pushFunc, flushTimeMsec)
-		ao.as.flushState(ctx, resetState)
+		ao.as.flushState(ctx)
 		ctx.flushSeries()
 		putFlushCtx(ctx)
 	}(ao)
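
The trimmed interface now reads as follows (reconstructed from the hunk above; the
pushSamples doc comment is paraphrased), followed by a couple of illustrative
compile-time assertions of the kind Go code commonly uses (not part of this commit):

type aggrState interface {
	// pushSamples must push the given samples to the aggregation state.
	pushSamples(samples []pushSample)

	// flushState must flush aggrState data to ctx.
	flushState(ctx *flushCtx)
}

// Each output state must satisfy the trimmed-down aggrState interface.
var (
	_ aggrState = (*avgAggrState)(nil)
	_ aggrState = (*totalAggrState)(nil)
)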

lib/streamaggr/streamaggr_timing_test.go

@@ -39,24 +39,6 @@ func BenchmarkAggregatorsPush(b *testing.B) {
 	}
 }
 
-func BenchmarkAggregatorsFlushInternalSerial(b *testing.B) {
-	pushFunc := func(_ []prompbmarshal.TimeSeries) {}
-	a := newBenchAggregators(benchOutputs, pushFunc)
-	defer a.MustStop()
-	_ = a.Push(benchSeries, nil)
-
-	flushTimeMsec := time.Now().UnixMilli()
-	b.ResetTimer()
-	b.ReportAllocs()
-	b.SetBytes(int64(len(benchSeries) * len(benchOutputs)))
-	for i := 0; i < b.N; i++ {
-		for _, aggr := range a.as {
-			aggr.flushInternal(pushFunc, flushTimeMsec, false)
-		}
-	}
-}
-
 func benchmarkAggregatorsPush(b *testing.B, output string) {
 	pushFunc := func(_ []prompbmarshal.TimeSeries) {}
 	a := newBenchAggregators([]string{output}, pushFunc)
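
If flush throughput ever needs measuring again without the resetState escape hatch,
the benchmark has to re-push the series on every iteration, since flush() now always
resets the state. A hypothetical sketch along the lines of the removed code (not part
of this commit), which measures Push()+flush() together rather than flush() alone,
and is therefore closer to what production actually does:

func BenchmarkAggregatorsPushThenFlush(b *testing.B) {
	pushFunc := func(_ []prompbmarshal.TimeSeries) {}
	a := newBenchAggregators(benchOutputs, pushFunc)
	defer a.MustStop()

	b.ReportAllocs()
	b.SetBytes(int64(len(benchSeries) * len(benchOutputs)))
	for i := 0; i < b.N; i++ {
		// Re-populate the state, since every flush() resets it now.
		_ = a.Push(benchSeries, nil)
		for _, aggr := range a.as {
			aggr.flush(pushFunc, time.Now().UnixMilli())
		}
	}
}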

lib/streamaggr/sum_samples.go

@@ -57,21 +57,17 @@ func (as *sumSamplesAggrState) pushSamples(samples []pushSample) {
 	}
 }
 
-func (as *sumSamplesAggrState) flushState(ctx *flushCtx, resetState bool) {
+func (as *sumSamplesAggrState) flushState(ctx *flushCtx) {
 	m := &as.m
 	m.Range(func(k, v any) bool {
-		if resetState {
-			// Atomically delete the entry from the map, so new entry is created for the next flush.
-			m.Delete(k)
-		}
+		// Atomically delete the entry from the map, so new entry is created for the next flush.
+		m.Delete(k)
 		sv := v.(*sumSamplesStateValue)
 		sv.mu.Lock()
 		sum := sv.sum
-		if resetState {
-			// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-			sv.deleted = true
-		}
+		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
+		sv.deleted = true
 		sv.mu.Unlock()
 		key := k.(string)

lib/streamaggr/total.go

@@ -115,7 +115,7 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
 	}
 }
 
-func (as *totalAggrState) flushState(ctx *flushCtx, resetState bool) {
+func (as *totalAggrState) flushState(ctx *flushCtx) {
 	currentTime := fasttime.UnixTimestamp()
 	suffix := as.getSuffix()
 
@@ -128,14 +128,12 @@ func (as *totalAggrState) flushState(ctx *flushCtx, resetState bool) {
 		sv.mu.Lock()
 		total := sv.total
-		if resetState {
-			if as.resetTotalOnFlush {
-				sv.total = 0
-			} else if math.Abs(sv.total) >= (1 << 53) {
-				// It is time to reset the entry, since it starts losing float64 precision
-				sv.total = 0
-			}
-		}
+		if as.resetTotalOnFlush {
+			sv.total = 0
+		} else if math.Abs(sv.total) >= (1 << 53) {
+			// It is time to reset the entry, since it starts losing float64 precision
+			sv.total = 0
+		}
 		deleted := sv.deleted
 		sv.mu.Unlock()
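
The (1 << 53) threshold above is where float64 runs out of integer precision: the type
has a 53-bit significand, so adjacent integers above 2^53 are no longer representable
and small increments get rounded away. A quick self-contained demonstration:

package main

import "fmt"

func main() {
	v := float64(1 << 53) // 9007199254740992
	fmt.Println(v+1 == v) // true: the +1 is lost to rounding
	fmt.Println(v-1 == v) // false: integers below 2^53 are still exact
}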

lib/streamaggr/unique_samples.go

@@ -61,21 +61,17 @@ func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) {
 	}
 }
 
-func (as *uniqueSamplesAggrState) flushState(ctx *flushCtx, resetState bool) {
+func (as *uniqueSamplesAggrState) flushState(ctx *flushCtx) {
 	m := &as.m
 	m.Range(func(k, v any) bool {
-		if resetState {
-			// Atomically delete the entry from the map, so new entry is created for the next flush.
-			m.Delete(k)
-		}
+		// Atomically delete the entry from the map, so new entry is created for the next flush.
+		m.Delete(k)
 		sv := v.(*uniqueSamplesStateValue)
 		sv.mu.Lock()
 		n := len(sv.m)
-		if resetState {
-			// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-			sv.deleted = true
-		}
+		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
+		sv.deleted = true
 		sv.mu.Unlock()
 		key := k.(string)