From 6319d029a81cd3246b00fe07af485d8c47fd99f3 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 4 Mar 2024 19:12:06 +0200 Subject: [PATCH] lib/streamaggr: benchmark only flush routines in BenchmarkDedupAggrFlushSerial and BenchmarkAggregatorsFlushSerial --- lib/streamaggr/avg.go | 15 ++++++--- lib/streamaggr/count_samples.go | 15 ++++++--- lib/streamaggr/count_series.go | 15 ++++++--- lib/streamaggr/dedup.go | 8 ++--- lib/streamaggr/dedup_test.go | 13 +------- lib/streamaggr/dedup_timing_test.go | 14 ++++---- lib/streamaggr/histogram_bucket.go | 3 +- lib/streamaggr/last.go | 15 ++++++--- lib/streamaggr/max.go | 15 ++++++--- lib/streamaggr/min.go | 14 +++++--- lib/streamaggr/quantiles.go | 14 +++++--- lib/streamaggr/stddev.go | 15 ++++++--- lib/streamaggr/stdvar.go | 15 ++++++--- lib/streamaggr/streamaggr.go | 20 ++++++------ lib/streamaggr/streamaggr_timing_test.go | 41 +++++++++++++----------- lib/streamaggr/sum_samples.go | 15 ++++++--- lib/streamaggr/total.go | 14 ++++---- lib/streamaggr/unique_samples.go | 15 ++++++--- 18 files changed, 163 insertions(+), 113 deletions(-) diff --git a/lib/streamaggr/avg.go b/lib/streamaggr/avg.go index 079e3b5c8..131c9a8a0 100644 --- a/lib/streamaggr/avg.go +++ b/lib/streamaggr/avg.go @@ -59,19 +59,24 @@ func (as *avgAggrState) pushSamples(samples []pushSample) { } } -func (as *avgAggrState) flushState(ctx *flushCtx) { +func (as *avgAggrState) flushState(ctx *flushCtx, resetState bool) { currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 m := &as.m m.Range(func(k, v interface{}) bool { - // Atomically delete the entry from the map, so new entry is created for the next flush. - m.Delete(k) + if resetState { + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) + } sv := v.(*avgStateValue) sv.mu.Lock() avg := sv.sum / float64(sv.count) - // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. - sv.deleted = true + if resetState { + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true + } sv.mu.Unlock() + key := k.(string) ctx.appendSeries(key, "avg", currentTimeMsec, avg) return true diff --git a/lib/streamaggr/count_samples.go b/lib/streamaggr/count_samples.go index ac0431deb..30099d4bd 100644 --- a/lib/streamaggr/count_samples.go +++ b/lib/streamaggr/count_samples.go @@ -56,19 +56,24 @@ func (as *countSamplesAggrState) pushSamples(samples []pushSample) { } } -func (as *countSamplesAggrState) flushState(ctx *flushCtx) { +func (as *countSamplesAggrState) flushState(ctx *flushCtx, resetState bool) { currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 m := &as.m m.Range(func(k, v interface{}) bool { - // Atomically delete the entry from the map, so new entry is created for the next flush. - m.Delete(k) + if resetState { + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) + } sv := v.(*countSamplesStateValue) sv.mu.Lock() n := sv.n - // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. - sv.deleted = true + if resetState { + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true + } sv.mu.Unlock() + key := k.(string) ctx.appendSeries(key, "count_samples", currentTimeMsec, float64(n)) return true diff --git a/lib/streamaggr/count_series.go b/lib/streamaggr/count_series.go index 5bfbd4c34..a80c69dbb 100644 --- a/lib/streamaggr/count_series.go +++ b/lib/streamaggr/count_series.go @@ -66,19 +66,24 @@ func (as *countSeriesAggrState) pushSamples(samples []pushSample) { } } -func (as *countSeriesAggrState) flushState(ctx *flushCtx) { +func (as *countSeriesAggrState) flushState(ctx *flushCtx, resetState bool) { currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 m := &as.m m.Range(func(k, v interface{}) bool { - // Atomically delete the entry from the map, so new entry is created for the next flush. - m.Delete(k) + if resetState { + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) + } sv := v.(*countSeriesStateValue) sv.mu.Lock() n := len(sv.m) - // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. - sv.deleted = true + if resetState { + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true + } sv.mu.Unlock() + key := k.(string) ctx.appendSeries(key, "count_series", currentTimeMsec, float64(n)) return true diff --git a/lib/streamaggr/dedup.go b/lib/streamaggr/dedup.go index 35e520ced..e7fd336eb 100644 --- a/lib/streamaggr/dedup.go +++ b/lib/streamaggr/dedup.go @@ -112,7 +112,7 @@ func (ctx *dedupFlushCtx) reset() { ctx.samples = ctx.samples[:0] } -func (da *dedupAggr) flush(f func(samples []pushSample)) { +func (da *dedupAggr) flush(f func(samples []pushSample), resetState bool) { var wg sync.WaitGroup for i := range da.shards { flushConcurrencyCh <- struct{}{} @@ -124,7 +124,7 @@ func (da *dedupAggr) flush(f func(samples []pushSample)) { }() ctx := getDedupFlushCtx() - shard.flush(ctx, f) + shard.flush(ctx, f, resetState) putDedupFlushCtx(ctx) }(&da.shards[i]) } @@ -178,11 +178,11 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) { } } -func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample)) { +func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample), resetState bool) { das.mu.Lock() m := das.m - if len(m) != 0 { + if resetState && len(m) > 0 { das.m = make(map[string]dedupAggrSample, len(m)) } diff --git a/lib/streamaggr/dedup_test.go b/lib/streamaggr/dedup_test.go index 663bf6563..5e843c717 100644 --- a/lib/streamaggr/dedup_test.go +++ b/lib/streamaggr/dedup_test.go @@ -4,7 +4,6 @@ import ( "fmt" "reflect" "sync" - "sync/atomic" "testing" ) @@ -40,7 +39,7 @@ func TestDedupAggrSerial(t *testing.T) { } mu.Unlock() } - da.flush(flushSamples) + da.flush(flushSamples, true) if !reflect.DeepEqual(expectedSamplesMap, flushedSamplesMap) { t.Fatalf("unexpected samples;\ngot\n%v\nwant\n%v", flushedSamplesMap, expectedSamplesMap) @@ -59,11 +58,6 @@ func TestDedupAggrConcurrent(t *testing.T) { const seriesCount = 10_000 da := newDedupAggr() - var samplesFlushed atomic.Int64 - flushSamples := func(samples []pushSample) { - samplesFlushed.Add(int64(len(samples))) - } - var wg sync.WaitGroup for i := 0; i < concurrency; i++ { wg.Add(1) @@ -78,12 +72,7 @@ func TestDedupAggrConcurrent(t *testing.T) { } da.pushSamples(samples) } - da.flush(flushSamples) }() } wg.Wait() - - if n := samplesFlushed.Load(); n < seriesCount { - t.Fatalf("too small number of series flushed; got %d; want at least %d", n, seriesCount) - } } diff --git a/lib/streamaggr/dedup_timing_test.go b/lib/streamaggr/dedup_timing_test.go index 46cdcdb26..2b906fe39 100644 --- a/lib/streamaggr/dedup_timing_test.go +++ b/lib/streamaggr/dedup_timing_test.go @@ -22,12 +22,13 @@ func BenchmarkDedupAggrFlushSerial(b *testing.B) { as := newTotalAggrState(time.Hour, true, true) benchSamples := newBenchSamples(100_000) da := newDedupAggr() + da.pushSamples(benchSamples) + b.ResetTimer() b.ReportAllocs() b.SetBytes(int64(len(benchSamples))) for i := 0; i < b.N; i++ { - da.pushSamples(benchSamples) - da.flush(as.pushSamples) + da.flush(as.pushSamples, false) } } @@ -36,6 +37,7 @@ func benchmarkDedupAggr(b *testing.B, samplesPerPush int) { benchSamples := newBenchSamples(samplesPerPush) da := newDedupAggr() + b.ResetTimer() b.ReportAllocs() b.SetBytes(int64(samplesPerPush * loops)) b.RunParallel(func(pb *testing.PB) { @@ -51,8 +53,8 @@ func newBenchSamples(count int) []pushSample { var lc promutils.LabelsCompressor labels := []prompbmarshal.Label{ { - Name: "instance", - Value: "host-123", + Name: "app", + Value: "app-123", }, { Name: "job", @@ -77,8 +79,8 @@ func newBenchSamples(count int) []pushSample { for i := range samples { sample := &samples[i] labels = append(labels[:labelsLen], prompbmarshal.Label{ - Name: "app", - Value: fmt.Sprintf("app-%d", i%10), + Name: "app", + Value: fmt.Sprintf("instance-%d", i), }) keyBuf = compressLabels(keyBuf[:0], &lc, labels[:labelsLen], labels[labelsLen:]) sample.key = string(keyBuf) diff --git a/lib/streamaggr/histogram_bucket.go b/lib/streamaggr/histogram_bucket.go index 0d073abf3..aba933ae4 100644 --- a/lib/streamaggr/histogram_bucket.go +++ b/lib/streamaggr/histogram_bucket.go @@ -84,7 +84,8 @@ func (as *histogramBucketAggrState) removeOldEntries(currentTime uint64) { }) } -func (as *histogramBucketAggrState) flushState(ctx *flushCtx) { +func (as *histogramBucketAggrState) flushState(ctx *flushCtx, resetState bool) { + _ = resetState // it isn't used here currentTime := fasttime.UnixTimestamp() currentTimeMsec := int64(currentTime) * 1000 diff --git a/lib/streamaggr/last.go b/lib/streamaggr/last.go index 6ba46b349..0fd39580c 100644 --- a/lib/streamaggr/last.go +++ b/lib/streamaggr/last.go @@ -56,19 +56,24 @@ func (as *lastAggrState) pushSamples(samples []pushSample) { } } -func (as *lastAggrState) flushState(ctx *flushCtx) { +func (as *lastAggrState) flushState(ctx *flushCtx, resetState bool) { currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 m := &as.m m.Range(func(k, v interface{}) bool { - // Atomically delete the entry from the map, so new entry is created for the next flush. - m.Delete(k) + if resetState { + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) + } sv := v.(*lastStateValue) sv.mu.Lock() last := sv.last - // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. - sv.deleted = true + if resetState { + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true + } sv.mu.Unlock() + key := k.(string) ctx.appendSeries(key, "last", currentTimeMsec, last) return true diff --git a/lib/streamaggr/max.go b/lib/streamaggr/max.go index 76320a090..f92036c66 100644 --- a/lib/streamaggr/max.go +++ b/lib/streamaggr/max.go @@ -58,19 +58,24 @@ func (as *maxAggrState) pushSamples(samples []pushSample) { } } -func (as *maxAggrState) flushState(ctx *flushCtx) { +func (as *maxAggrState) flushState(ctx *flushCtx, resetState bool) { currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 m := &as.m m.Range(func(k, v interface{}) bool { - // Atomically delete the entry from the map, so new entry is created for the next flush. - m.Delete(k) + if resetState { + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) + } sv := v.(*maxStateValue) sv.mu.Lock() max := sv.max - // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. - sv.deleted = true + if resetState { + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true + } sv.mu.Unlock() + key := k.(string) ctx.appendSeries(key, "max", currentTimeMsec, max) return true diff --git a/lib/streamaggr/min.go b/lib/streamaggr/min.go index cde503312..17137e6cd 100644 --- a/lib/streamaggr/min.go +++ b/lib/streamaggr/min.go @@ -58,18 +58,22 @@ func (as *minAggrState) pushSamples(samples []pushSample) { } } -func (as *minAggrState) flushState(ctx *flushCtx) { +func (as *minAggrState) flushState(ctx *flushCtx, resetState bool) { currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 m := &as.m m.Range(func(k, v interface{}) bool { - // Atomically delete the entry from the map, so new entry is created for the next flush. - m.Delete(k) + if resetState { + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) + } sv := v.(*minStateValue) sv.mu.Lock() min := sv.min - // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. - sv.deleted = true + if resetState { + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true + } sv.mu.Unlock() key := k.(string) ctx.appendSeries(key, "min", currentTimeMsec, min) diff --git a/lib/streamaggr/quantiles.go b/lib/streamaggr/quantiles.go index f9ec321d1..b97697ebb 100644 --- a/lib/streamaggr/quantiles.go +++ b/lib/streamaggr/quantiles.go @@ -63,22 +63,26 @@ func (as *quantilesAggrState) pushSamples(samples []pushSample) { } } -func (as *quantilesAggrState) flushState(ctx *flushCtx) { +func (as *quantilesAggrState) flushState(ctx *flushCtx, resetState bool) { currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 m := &as.m phis := as.phis var quantiles []float64 var b []byte m.Range(func(k, v interface{}) bool { - // Atomically delete the entry from the map, so new entry is created for the next flush. - m.Delete(k) + if resetState { + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) + } sv := v.(*quantilesStateValue) sv.mu.Lock() quantiles = sv.h.Quantiles(quantiles[:0], phis) histogram.PutFast(sv.h) - // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. - sv.deleted = true + if resetState { + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true + } sv.mu.Unlock() key := k.(string) diff --git a/lib/streamaggr/stddev.go b/lib/streamaggr/stddev.go index 2a5b3e3d4..0eb6df6fe 100644 --- a/lib/streamaggr/stddev.go +++ b/lib/streamaggr/stddev.go @@ -59,19 +59,24 @@ func (as *stddevAggrState) pushSamples(samples []pushSample) { } } -func (as *stddevAggrState) flushState(ctx *flushCtx) { +func (as *stddevAggrState) flushState(ctx *flushCtx, resetState bool) { currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 m := &as.m m.Range(func(k, v interface{}) bool { - // Atomically delete the entry from the map, so new entry is created for the next flush. - m.Delete(k) + if resetState { + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) + } sv := v.(*stddevStateValue) sv.mu.Lock() stddev := math.Sqrt(sv.q / sv.count) - // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. - sv.deleted = true + if resetState { + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true + } sv.mu.Unlock() + key := k.(string) ctx.appendSeries(key, "stddev", currentTimeMsec, stddev) return true diff --git a/lib/streamaggr/stdvar.go b/lib/streamaggr/stdvar.go index 450b2feef..c5d46af2a 100644 --- a/lib/streamaggr/stdvar.go +++ b/lib/streamaggr/stdvar.go @@ -58,19 +58,24 @@ func (as *stdvarAggrState) pushSamples(samples []pushSample) { } } -func (as *stdvarAggrState) flushState(ctx *flushCtx) { +func (as *stdvarAggrState) flushState(ctx *flushCtx, resetState bool) { currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 m := &as.m m.Range(func(k, v interface{}) bool { - // Atomically delete the entry from the map, so new entry is created for the next flush. - m.Delete(k) + if resetState { + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) + } sv := v.(*stdvarStateValue) sv.mu.Lock() stdvar := sv.q / sv.count - // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. - sv.deleted = true + if resetState { + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true + } sv.mu.Unlock() + key := k.(string) ctx.appendSeries(key, "stdvar", currentTimeMsec, stdvar) return true diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 99fb9a14f..76fd4aa28 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -361,7 +361,7 @@ type aggregator struct { type aggrState interface { pushSamples(samples []pushSample) - flushState(ctx *flushCtx) + flushState(ctx *flushCtx, resetState bool) } // PushFunc is called by Aggregators when it needs to push its state to metrics storage @@ -606,18 +606,18 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc defer t.Stop() if alignFlushToInterval && skipIncompleteFlush { - a.flush(nil, interval) + a.flush(nil, interval, true) } for tickerWait(t) { - a.flush(pushFunc, interval) + a.flush(pushFunc, interval, true) if alignFlushToInterval { select { case <-t.C: if skipIncompleteFlush && tickerWait(t) { logger.Warnf("drop incomplete aggregation state because the previous flush took longer than interval=%s", interval) - a.flush(nil, interval) + a.flush(nil, interval, true) } default: } @@ -637,10 +637,10 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc if ct.After(flushDeadline) { // It is time to flush the aggregated state if alignFlushToInterval && skipIncompleteFlush && !isSkippedFirstFlush { - a.flush(nil, interval) + a.flush(nil, interval, true) isSkippedFirstFlush = true } else { - a.flush(pushFunc, interval) + a.flush(pushFunc, interval, true) } for ct.After(flushDeadline) { flushDeadline = flushDeadline.Add(interval) @@ -658,7 +658,7 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc if !skipIncompleteFlush { a.dedupFlush(dedupInterval) - a.flush(pushFunc, interval) + a.flush(pushFunc, interval, true) } } @@ -670,7 +670,7 @@ func (a *aggregator) dedupFlush(dedupInterval time.Duration) { startTime := time.Now() - a.da.flush(a.pushSamples) + a.da.flush(a.pushSamples, true) d := time.Since(startTime) a.dedupFlushDuration.Update(d.Seconds()) @@ -682,7 +682,7 @@ func (a *aggregator) dedupFlush(dedupInterval time.Duration) { } } -func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration) { +func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration, resetState bool) { startTime := time.Now() var wg sync.WaitGroup @@ -696,7 +696,7 @@ func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration) { }() ctx := getFlushCtx(a, pushFunc) - as.flushState(ctx) + as.flushState(ctx, resetState) ctx.flushSeries() ctx.resetSeries() putFlushCtx(ctx) diff --git a/lib/streamaggr/streamaggr_timing_test.go b/lib/streamaggr/streamaggr_timing_test.go index 061a35c69..a0561b652 100644 --- a/lib/streamaggr/streamaggr_timing_test.go +++ b/lib/streamaggr/streamaggr_timing_test.go @@ -2,6 +2,7 @@ package streamaggr import ( "fmt" + "strconv" "strings" "testing" "time" @@ -37,37 +38,34 @@ func BenchmarkAggregatorsPush(b *testing.B) { } func BenchmarkAggregatorsFlushSerial(b *testing.B) { - for _, output := range benchOutputs { - b.Run(fmt.Sprintf("output=%s", output), func(b *testing.B) { - benchmarkAggregatorsFlushSerial(b, output) - }) + outputs := []string{ + "total", "sum_samples", "count_samples", "min", + "max", "avg", "increase", "count_series", + "last", "stddev", "stdvar", "total_prometheus", "increase_prometheus", } -} - -func benchmarkAggregatorsFlushSerial(b *testing.B, output string) { pushFunc := func(tss []prompbmarshal.TimeSeries) {} - a := newBenchAggregators(output, pushFunc) + a := newBenchAggregators(outputs, pushFunc) defer a.MustStop() + _ = a.Push(benchSeries, nil) - var matchIdxs []byte - + b.ResetTimer() b.ReportAllocs() - b.SetBytes(int64(len(benchSeries))) + b.SetBytes(int64(len(benchSeries) * len(outputs))) for i := 0; i < b.N; i++ { - matchIdxs = a.Push(benchSeries, matchIdxs) for _, aggr := range a.as { - aggr.flush(pushFunc, time.Hour) + aggr.flush(pushFunc, time.Hour, false) } } } func benchmarkAggregatorsPush(b *testing.B, output string) { pushFunc := func(tss []prompbmarshal.TimeSeries) {} - a := newBenchAggregators(output, pushFunc) + a := newBenchAggregators([]string{output}, pushFunc) defer a.MustStop() const loops = 100 + b.ResetTimer() b.ReportAllocs() b.SetBytes(int64(len(benchSeries) * loops)) b.RunParallel(func(pb *testing.PB) { @@ -80,13 +78,18 @@ func benchmarkAggregatorsPush(b *testing.B, output string) { }) } -func newBenchAggregators(output string, pushFunc PushFunc) *Aggregators { +func newBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators { + outputsQuoted := make([]string, len(outputs)) + for i := range outputs { + outputsQuoted[i] = strconv.Quote(outputs[i]) + } config := fmt.Sprintf(` - match: http_requests_total interval: 24h - without: [job] - outputs: [%q] -`, output) + by: [job] + outputs: [%s] +`, strings.Join(outputsQuoted, ",")) + a, err := newAggregatorsFromData([]byte(config), pushFunc, nil) if err != nil { panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err)) @@ -98,7 +101,7 @@ func newBenchSeries(seriesCount int) []prompbmarshal.TimeSeries { a := make([]string, seriesCount) for j := 0; j < seriesCount; j++ { s := fmt.Sprintf(`http_requests_total{path="/foo/%d",job="foo_%d",instance="bar",pod="pod-123232312",namespace="kube-foo-bar",node="node-123-3434-443",`+ - `some_other_label="foo-bar-baz",environment="prod",label1="value1",label2="value2",label3="value3"} %d`, j, j%10, j*1000) + `some_other_label="foo-bar-baz",environment="prod",label1="value1",label2="value2",label3="value3"} %d`, j, j%100, j*1000) a = append(a, s) } metrics := strings.Join(a, "\n") diff --git a/lib/streamaggr/sum_samples.go b/lib/streamaggr/sum_samples.go index 4d88dc4f6..bbd63fd0e 100644 --- a/lib/streamaggr/sum_samples.go +++ b/lib/streamaggr/sum_samples.go @@ -56,19 +56,24 @@ func (as *sumSamplesAggrState) pushSamples(samples []pushSample) { } } -func (as *sumSamplesAggrState) flushState(ctx *flushCtx) { +func (as *sumSamplesAggrState) flushState(ctx *flushCtx, resetState bool) { currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 m := &as.m m.Range(func(k, v interface{}) bool { - // Atomically delete the entry from the map, so new entry is created for the next flush. - m.Delete(k) + if resetState { + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) + } sv := v.(*sumSamplesStateValue) sv.mu.Lock() sum := sv.sum - // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. - sv.deleted = true + if resetState { + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true + } sv.mu.Unlock() + key := k.(string) ctx.appendSeries(key, "sum_samples", currentTimeMsec, sum) return true diff --git a/lib/streamaggr/total.go b/lib/streamaggr/total.go index 307b082f2..4c08d9327 100644 --- a/lib/streamaggr/total.go +++ b/lib/streamaggr/total.go @@ -137,7 +137,7 @@ func (as *totalAggrState) removeOldEntries(currentTime uint64) { }) } -func (as *totalAggrState) flushState(ctx *flushCtx) { +func (as *totalAggrState) flushState(ctx *flushCtx, resetState bool) { currentTime := fasttime.UnixTimestamp() currentTimeMsec := int64(currentTime) * 1000 @@ -148,11 +148,13 @@ func (as *totalAggrState) flushState(ctx *flushCtx) { sv := v.(*totalStateValue) sv.mu.Lock() total := sv.total - if as.resetTotalOnFlush { - sv.total = 0 - } else if math.Abs(sv.total) >= (1 << 53) { - // It is time to reset the entry, since it starts losing float64 precision - sv.total = 0 + if resetState { + if as.resetTotalOnFlush { + sv.total = 0 + } else if math.Abs(sv.total) >= (1 << 53) { + // It is time to reset the entry, since it starts losing float64 precision + sv.total = 0 + } } deleted := sv.deleted sv.mu.Unlock() diff --git a/lib/streamaggr/unique_samples.go b/lib/streamaggr/unique_samples.go index 2c5e97828..4f10b5c02 100644 --- a/lib/streamaggr/unique_samples.go +++ b/lib/streamaggr/unique_samples.go @@ -60,19 +60,24 @@ func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) { } } -func (as *uniqueSamplesAggrState) flushState(ctx *flushCtx) { +func (as *uniqueSamplesAggrState) flushState(ctx *flushCtx, resetState bool) { currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 m := &as.m m.Range(func(k, v interface{}) bool { - // Atomically delete the entry from the map, so new entry is created for the next flush. - m.Delete(k) + if resetState { + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) + } sv := v.(*uniqueSamplesStateValue) sv.mu.Lock() n := len(sv.m) - // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. - sv.deleted = true + if resetState { + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true + } sv.mu.Unlock() + key := k.(string) ctx.appendSeries(key, "unique_samples", currentTimeMsec, float64(n)) return true