diff --git a/docs/stream-aggregation.md b/docs/stream-aggregation.md index b791aaeb2..a61c9a202 100644 --- a/docs/stream-aggregation.md +++ b/docs/stream-aggregation.md @@ -562,14 +562,14 @@ Below are aggregation functions that can be put in the `outputs` list at [stream * [avg](#avg) * [count_samples](#count_samples) * [count_series](#count_series) +* [histogram_bucket](#histogram_bucket) * [increase](#increase) * [increase_prometheus](#increase_prometheus) -* [rate_sum](#rate_sum) -* [rate_avg](#rate_avg) -* [histogram_bucket](#histogram_bucket) * [last](#last) * [max](#max) * [min](#min) +* [rate_avg](#rate_avg) +* [rate_sum](#rate_sum) * [stddev](#stddev) * [stdvar](#stdvar) * [sum_samples](#sum_samples) @@ -593,7 +593,13 @@ For example, see below time series produced by config with aggregation interval avg aggregation -See also [min](#min), [max](#max), [sum_samples](#sum_samples) and [count_samples](#count_samples). +See also: + +- [max](#max) +- [min](#min) +- [quantiles](#quantiles) +- [sum_samples](#sum_samples) +- [count_samples](#count_samples) ### count_samples @@ -605,7 +611,10 @@ The results of `count_samples` is equal to the following [MetricsQL](https://doc sum(count_over_time(some_metric[interval])) ``` -See also [count_series](#count_series) and [sum_samples](#sum_samples). +See also: + +- [count_series](#count_series) +- [sum_samples](#sum_samples) ### count_series @@ -617,7 +626,33 @@ The results of `count_series` is equal to the following [MetricsQL](https://docs count(last_over_time(some_metric[interval])) ``` -See also [count_samples](#count_samples) and [unique_samples](#unique_samples). +See also: + +- [count_samples](#count_samples) +- [unique_samples](#unique_samples) + +### histogram_bucket + +`histogram_bucket` returns [VictoriaMetrics histogram buckets](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350) + for the input [sample values](https://docs.victoriametrics.com/keyconcepts/#raw-samples) over the given `interval`. +`histogram_bucket` makes sense only for aggregating [gauges](https://docs.victoriametrics.com/keyconcepts/#gauge). +See how to aggregate regular histograms [here](#aggregating-histograms). + +The results of `histogram_bucket` is equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query: + +Aggregating irregular and sporadic metrics (received from [Lambdas](https://aws.amazon.com/lambda/) +or [Cloud Functions](https://cloud.google.com/functions)) can be controlled via [staleness_interval](#staleness) option. + +```metricsql +sum(histogram_over_time(some_histogram_bucket[interval])) by (vmrange) +``` + +See also: + +- [quantiles](#quantiles) +- [avg](#avg) +- [max](#max) +- [min](#min) ### increase @@ -641,33 +676,12 @@ For example, see below time series produced by config with aggregation interval Aggregating irregular and sporadic metrics (received from [Lambdas](https://aws.amazon.com/lambda/) or [Cloud Functions](https://cloud.google.com/functions)) can be controlled via [staleness_interval](#staleness) option. -See also [increase_prometheus](#increase_prometheus) and [total](#total). +See also: -### rate_sum - -`rate_sum` returns the sum of average per-second change of input [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) over the given `interval`. -`rate_sum` makes sense only for aggregating [counters](https://docs.victoriametrics.com/keyconcepts/#counter). - -The results of `rate_sum` are equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query: - -```metricsql -sum(rate(some_counter[interval])) -``` - -See also [rate_avg](#rate_avg) and [total](#total) outputs. - -### rate_avg - -`rate_avg` returns the average of average per-second of input [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) over the given `interval`. -`rate_avg` makes sense only for aggregating [counters](https://docs.victoriametrics.com/keyconcepts/#counter). - -The results of `rate_avg` are equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query: - -```metricsql -avg(rate(some_counter[interval])) -``` - -See also [rate_sum](#rate_avg) and [total](#total) outputs. +- [increase_prometheus](#increase_prometheus) +- [total](#total) +- [rate_avg](#rate_avg) +- [rate_sum](#rate_sum) ### increase_prometheus @@ -686,25 +700,13 @@ If you need taking into account the first sample per time series, then take a lo Aggregating irregular and sporadic metrics (received from [Lambdas](https://aws.amazon.com/lambda/) or [Cloud Functions](https://cloud.google.com/functions)) can be controlled via [staleness_interval](#staleness) option. -See also [increase](#increase), [total](#total) and [total_prometheus](#total_prometheus). +See also: -### histogram_bucket - -`histogram_bucket` returns [VictoriaMetrics histogram buckets](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350) - for the input [sample values](https://docs.victoriametrics.com/keyconcepts/#raw-samples) over the given `interval`. -`histogram_bucket` makes sense only for aggregating [gauges](https://docs.victoriametrics.com/keyconcepts/#gauge). -See how to aggregate regular histograms [here](#aggregating-histograms). - -The results of `histogram_bucket` is equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query: - -Aggregating irregular and sporadic metrics (received from [Lambdas](https://aws.amazon.com/lambda/) -or [Cloud Functions](https://cloud.google.com/functions)) can be controlled via [staleness_interval](#staleness) option. - -```metricsql -sum(histogram_over_time(some_histogram_bucket[interval])) by (vmrange) -``` - -See also [quantiles](#quantiles), [min](#min), [max](#max) and [avg](#avg). +- [increase](#increase) +- [rate_avg](#rate_avg) +- [rate_sum](#rate_sum) +- [total](#total) +- [total_prometheus](#total_prometheus) ### last @@ -716,7 +718,12 @@ The results of `last` is roughly equal to the following [MetricsQL](https://docs last_over_time(some_metric[interval]) ``` -See also [min](#min), [max](#max) and [avg](#avg). +See also: + +- [avg](#avg) +- [max](#max) +- [min](#min) +- [quantiles](#quantiles) ### max @@ -732,7 +739,12 @@ For example, see below time series produced by config with aggregation interval total aggregation -See also [min](#min) and [avg](#avg). +See also: + +- [min](#min) +- [avg](#avg) +- [last](#last) +- [quantiles](#quantiles) ### min @@ -748,7 +760,46 @@ For example, see below time series produced by config with aggregation interval min aggregation -See also [max](#max) and [avg](#avg). +See also: + +- [max](#max) +- [avg](#avg) +- [last](#last) +- [quantiles](#quantiles) + +### rate_avg + +`rate_avg` returns the average of average per-second increase rates across input [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) over the given `interval`. +`rate_avg` makes sense only for aggregating [counters](https://docs.victoriametrics.com/keyconcepts/#counter). + +The results of `rate_avg` are equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query: + +```metricsql +avg(rate(some_counter[interval])) +``` + +See also: + +- [rate_sum](#rate_sum) +- [increase](#increase) +- [total](#total) + +### rate_sum + +`rate_sum` returns the sum of average per-second increase rates across input [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) over the given `interval`. +`rate_sum` makes sense only for aggregating [counters](https://docs.victoriametrics.com/keyconcepts/#counter). + +The results of `rate_sum` are equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query: + +```metricsql +sum(rate(some_counter[interval])) +``` + +See also: + +- [rate_avg](#rate_avg) +- [increase](#increase) +- [total](#total) ### stddev @@ -762,7 +813,11 @@ The results of `stddev` is roughly equal to the following [MetricsQL](https://do histogram_stddev(sum(histogram_over_time(some_metric[interval])) by (vmrange)) ``` -See also [stdvar](#stdvar) and [avg](#avg). +See also: + +- [stdvar](#stdvar) +- [avg](#avg) +- [quantiles](#quantiles) ### stdvar @@ -780,7 +835,11 @@ For example, see below time series produced by config with aggregation interval stdvar aggregation -See also [stddev](#stddev) and [avg](#avg). +See also: + +- [stddev](#stddev) +- [avg](#avg) +- [quantiles](#quantiles) ### sum_samples @@ -797,7 +856,10 @@ For example, see below time series produced by config with aggregation interval sum_samples aggregation -See also [count_samples](#count_samples) and [count_series](#count_series). +See also: + +- [count_samples](#count_samples) +- [count_series](#count_series) ### total @@ -834,7 +896,13 @@ This changes pod name label, but the `total` accounts for such a scenario and do Aggregating irregular and sporadic metrics (received from [Lambdas](https://aws.amazon.com/lambda/) or [Cloud Functions](https://cloud.google.com/functions)) can be controlled via [staleness_interval](#staleness) option. -See also [total_prometheus](#total_prometheus), [increase](#increase) and [increase_prometheus](#increase_prometheus). +See also: + +- [total_prometheus](#total_prometheus) +- [increase](#increase) +- [increase_prometheus](#increase_prometheus) +- [rate_sum](#rate_sum) +- [rate_avg](#rate_avg) ### total_prometheus @@ -857,7 +925,13 @@ The counters are most often reset when the application is restarted. Aggregating irregular and sporadic metrics (received from [Lambdas](https://aws.amazon.com/lambda/) or [Cloud Functions](https://cloud.google.com/functions)) can be controlled via [staleness_interval](#staleness) option. -See also [total](#total), [increase](#increase) and [increase_prometheus](#increase_prometheus). +See also: + +- [total](#total) +- [increase](#increase) +- [increase_prometheus](#increase_prometheus) +- [rate_sum](#rate_sum) +- [rate_avg](#rate_avg) ### unique_samples @@ -870,7 +944,10 @@ The results of `unique_samples` is equal to the following [MetricsQL](https://do count(count_values_over_time(some_metric[interval])) ``` -See also [sum_samples](#sum_samples) and [count_series](#count_series). +See also: + +- [sum_samples](#sum_samples) +- [count_series](#count_series) ### quantiles @@ -885,7 +962,12 @@ The results of `quantiles(phi1, ..., phiN)` is equal to the following [MetricsQL histogram_quantiles("quantile", phi1, ..., phiN, sum(histogram_over_time(some_metric[interval])) by (vmrange)) ``` -See also [histogram_bucket](#histogram_bucket), [min](#min), [max](#max) and [avg](#avg). +See also: + +- [histogram_bucket](#histogram_bucket) +- [avg](#avg) +- [max](#max) +- [min](#min) ## Aggregating by labels @@ -962,11 +1044,13 @@ specified individually per each `-remoteWrite.url`: # staleness_interval is an optional interval for resetting the per-series state if no new samples # are received during this interval for the following outputs: - # - total - # - total_prometheus + # - histogram_bucket # - increase # - increase_prometheus - # - histogram_bucket + # - rate_avg + # - rate_sum + # - total + # - total_prometheus # See https://docs.victoriametrics.com/stream-aggregation/#staleness for more details. # # staleness_interval: 2m @@ -1071,13 +1155,13 @@ support the following approaches for hot reloading stream aggregation configs fr The following outputs track the last seen per-series values in order to properly calculate output values: -- [rate_sum](#rate_sum) -- [rate_avg](#rate_avg) -- [total](#total) -- [total_prometheus](#total_prometheus) +- [histogram_bucket](#histogram_bucket) - [increase](#increase) - [increase_prometheus](#increase_prometheus) -- [histogram_bucket](#histogram_bucket) +- [rate_avg](#rate_avg) +- [rate_sum](#rate_sum) +- [total](#total) +- [total_prometheus](#total_prometheus) The last seen per-series value is dropped if no new samples are received for the given time series during two consecutive aggregation intervals specified in [stream aggregation config](#stream-aggregation-config) via `interval` option. diff --git a/lib/streamaggr/rate.go b/lib/streamaggr/rate.go index 6fe401d37..24101e4fe 100644 --- a/lib/streamaggr/rate.go +++ b/lib/streamaggr/rate.go @@ -8,11 +8,12 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) -// rateAggrState calculates output=rate, e.g. the counter per-second change. +// rateAggrState calculates output=rate_avg and rate_sum, e.g. the average per-second increase rate for counter metrics. type rateAggrState struct { m sync.Map - suffix string + // isAvg is set to true if rate_avg() must be calculated instead of rate_sum(). + isAvg bool // Time series state is dropped if no new samples are received during stalenessSecs. stalenessSecs uint64 @@ -30,18 +31,17 @@ type rateLastValueState struct { timestamp int64 deleteDeadline uint64 - // total stores cumulative difference between registered values - // in the aggregation interval - total float64 - // prevTimestamp stores timestamp of the last registered value - // in the previous aggregation interval + // increase stores cumulative increase for the current time series on the current aggregation interval + increase float64 + + // prevTimestamp is the timestamp of the last registered sample in the previous aggregation interval prevTimestamp int64 } -func newRateAggrState(stalenessInterval time.Duration, suffix string) *rateAggrState { +func newRateAggrState(stalenessInterval time.Duration, isAvg bool) *rateAggrState { stalenessSecs := roundDurationToSecs(stalenessInterval) return &rateAggrState{ - suffix: suffix, + isAvg: isAvg, stalenessSecs: stalenessSecs, } } @@ -78,15 +78,15 @@ func (as *rateAggrState) pushSamples(samples []pushSample) { sv.mu.Unlock() continue } - if lv.prevTimestamp == 0 { - lv.prevTimestamp = lv.timestamp - } + if s.value >= lv.value { - lv.total += s.value - lv.value + lv.increase += s.value - lv.value } else { // counter reset - lv.total += s.value + lv.increase += s.value } + } else { + lv.prevTimestamp = s.timestamp } lv.value = s.value lv.timestamp = s.timestamp @@ -108,54 +108,77 @@ func (as *rateAggrState) pushSamples(samples []pushSample) { func (as *rateAggrState) flushState(ctx *flushCtx, _ bool) { currentTime := fasttime.UnixTimestamp() currentTimeMsec := int64(currentTime) * 1000 - var staleOutputSamples, staleInputSamples int + + suffix := "rate_sum" + if as.isAvg { + suffix = "rate_avg" + } + + as.removeOldEntries(ctx, suffix, currentTime) m := &as.m m.Range(func(k, v any) bool { sv := v.(*rateStateValue) - sv.mu.Lock() - // check for stale entries - deleted := currentTime > sv.deleteDeadline - if deleted { + sv.mu.Lock() + lvs := sv.lastValues + sumRate := 0.0 + countSeries := 0 + for k1, lv := range lvs { + d := float64(lv.timestamp-lv.prevTimestamp) / 1000 + if d > 0 { + sumRate += lv.increase / d + countSeries++ + } + lv.prevTimestamp = lv.timestamp + lv.increase = 0 + lvs[k1] = lv + } + sv.mu.Unlock() + + if countSeries == 0 { + // Nothing to update + return true + } + + result := sumRate + if as.isAvg { + result /= float64(countSeries) + } + + key := k.(string) + ctx.appendSeries(key, suffix, currentTimeMsec, result) + return true + }) +} + +func (as *rateAggrState) removeOldEntries(ctx *flushCtx, suffix string, currentTime uint64) { + m := &as.m + var staleOutputSamples, staleInputSamples int + m.Range(func(k, v any) bool { + sv := v.(*rateStateValue) + + sv.mu.Lock() + if currentTime > sv.deleteDeadline { // Mark the current entry as deleted - sv.deleted = deleted - sv.mu.Unlock() + sv.deleted = true staleOutputSamples++ + sv.mu.Unlock() m.Delete(k) return true } // Delete outdated entries in sv.lastValues - var rate float64 lvs := sv.lastValues - for k1, v1 := range lvs { - if currentTime > v1.deleteDeadline { + for k1, lv := range lvs { + if currentTime > lv.deleteDeadline { delete(lvs, k1) staleInputSamples++ - continue - } - rateInterval := v1.timestamp - v1.prevTimestamp - if v1.prevTimestamp > 0 && rateInterval > 0 { - // calculate rate only if value was seen at least twice with different timestamps - rate += v1.total * 1000 / float64(rateInterval) - v1.prevTimestamp = v1.timestamp - v1.total = 0 - lvs[k1] = v1 } } - // capture m length after deleted items were removed - totalItems := len(lvs) sv.mu.Unlock() - - if as.suffix == "rate_avg" && totalItems > 0 { - rate /= float64(totalItems) - } - - key := k.(string) - ctx.appendSeries(key, as.suffix, currentTimeMsec, rate) return true }) - ctx.a.staleOutputSamples[as.suffix].Add(staleOutputSamples) - ctx.a.staleInputSamples[as.suffix].Add(staleInputSamples) + ctx.a.staleInputSamples[suffix].Add(staleInputSamples) + ctx.a.staleOutputSamples[suffix].Add(staleOutputSamples) } diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 7691b9b4a..efdcb5baf 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -27,24 +27,24 @@ import ( ) var supportedOutputs = []string{ - "rate_sum", - "rate_avg", - "total", - "total_prometheus", + "avg", + "count_samples", + "count_series", + "histogram_bucket", "increase", "increase_prometheus", - "count_series", - "count_samples", - "unique_samples", - "sum_samples", "last", - "min", "max", - "avg", + "min", + "quantiles(phi1, ..., phiN)", + "rate_avg", + "rate_sum", "stddev", "stdvar", - "histogram_bucket", - "quantiles(phi1, ..., phiN)", + "sum_samples", + "total", + "total_prometheus", + "unique_samples", } // maxLabelValueLen is maximum match expression label value length in stream aggregation metrics @@ -175,24 +175,24 @@ type Config struct { // // The following names are allowed: // - // - rate_sum - calculates sum of rate for input counters - // - rate_avg - calculates average of rate for input counters - // - total - aggregates input counters - // - total_prometheus - aggregates input counters, ignoring the first sample in new time series + // - avg - the average value across all the samples + // - count_samples - counts the input samples + // - count_series - counts the number of unique input series + // - histogram_bucket - creates VictoriaMetrics histogram for input samples // - increase - calculates the increase over input series // - increase_prometheus - calculates the increase over input series, ignoring the first sample in new time series - // - count_series - counts the number of unique input series - // - count_samples - counts the input samples - // - unique_samples - counts the number of unique sample values - // - sum_samples - sums the input sample values // - last - the last biggest sample value - // - min - the minimum sample value // - max - the maximum sample value - // - avg - the average value across all the samples + // - min - the minimum sample value + // - quantiles(phi1, ..., phiN) - quantiles' estimation for phi in the range [0..1] + // - rate_avg - calculates average of rate for input counters + // - rate_sum - calculates sum of rate for input counters // - stddev - standard deviation across all the samples // - stdvar - standard variance across all the samples - // - histogram_bucket - creates VictoriaMetrics histogram for input samples - // - quantiles(phi1, ..., phiN) - quantiles' estimation for phi in the range [0..1] + // - sum_samples - sums the input sample values + // - total - aggregates input counters + // - total_prometheus - aggregates input counters, ignoring the first sample in new time series + // - unique_samples - counts the number of unique sample values // // The output time series will have the following names by default: // @@ -562,40 +562,40 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options continue } switch output { - case "total": - aggrStates[output] = newTotalAggrState(stalenessInterval, false, true) - case "total_prometheus": - aggrStates[output] = newTotalAggrState(stalenessInterval, false, false) + case "avg": + aggrStates[output] = newAvgAggrState() + case "count_samples": + aggrStates[output] = newCountSamplesAggrState() + case "count_series": + aggrStates[output] = newCountSeriesAggrState() + case "histogram_bucket": + aggrStates[output] = newHistogramBucketAggrState(stalenessInterval) case "increase": aggrStates[output] = newTotalAggrState(stalenessInterval, true, true) case "increase_prometheus": aggrStates[output] = newTotalAggrState(stalenessInterval, true, false) - case "rate_sum": - aggrStates[output] = newRateAggrState(stalenessInterval, "rate_sum") - case "rate_avg": - aggrStates[output] = newRateAggrState(stalenessInterval, "rate_avg") - case "count_series": - aggrStates[output] = newCountSeriesAggrState() - case "count_samples": - aggrStates[output] = newCountSamplesAggrState() - case "unique_samples": - aggrStates[output] = newUniqueSamplesAggrState() - case "sum_samples": - aggrStates[output] = newSumSamplesAggrState() case "last": aggrStates[output] = newLastAggrState() - case "min": - aggrStates[output] = newMinAggrState() case "max": aggrStates[output] = newMaxAggrState() - case "avg": - aggrStates[output] = newAvgAggrState() + case "min": + aggrStates[output] = newMinAggrState() + case "rate_avg": + aggrStates[output] = newRateAggrState(stalenessInterval, true) + case "rate_sum": + aggrStates[output] = newRateAggrState(stalenessInterval, false) case "stddev": aggrStates[output] = newStddevAggrState() case "stdvar": aggrStates[output] = newStdvarAggrState() - case "histogram_bucket": - aggrStates[output] = newHistogramBucketAggrState(stalenessInterval) + case "sum_samples": + aggrStates[output] = newSumSamplesAggrState() + case "total": + aggrStates[output] = newTotalAggrState(stalenessInterval, false, true) + case "total_prometheus": + aggrStates[output] = newTotalAggrState(stalenessInterval, false, false) + case "unique_samples": + aggrStates[output] = newUniqueSamplesAggrState() default: return nil, fmt.Errorf("unsupported output=%q; supported values: %s;", output, supportedOutputs) } diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go index 28170cb96..868b7bdbb 100644 --- a/lib/streamaggr/streamaggr_test.go +++ b/lib/streamaggr/streamaggr_test.go @@ -891,21 +891,28 @@ foo{abc="123", cde="1"} 4 foo{abc="123", cde="1"} 8.5 10 foo{abc="456", cde="1"} 8 foo{abc="456", cde="1"} 10 10 +foo 12 34 `, `foo:1m_by_cde_rate_avg{cde="1"} 0.325 foo:1m_by_cde_rate_sum{cde="1"} 0.65 -`, "1111") +`, "11111") - // rate with duplicated events + // rate_sum and rate_avg with duplicated events f(` - interval: 1m - by: [cde] outputs: [rate_sum, rate_avg] `, ` foo{abc="123", cde="1"} 4 10 foo{abc="123", cde="1"} 4 10 -`, `foo:1m_by_cde_rate_avg{cde="1"} 0 -foo:1m_by_cde_rate_sum{cde="1"} 0 -`, "11") +`, ``, "11") + + // rate_sum and rate_avg for a single sample + f(` +- interval: 1m + outputs: [rate_sum, rate_avg] +`, ` +foo 4 10 +bar 5 10 +`, ``, "11") // unique_samples output f(` diff --git a/lib/streamaggr/total.go b/lib/streamaggr/total.go index e0c26a0fe..c0df106f5 100644 --- a/lib/streamaggr/total.go +++ b/lib/streamaggr/total.go @@ -9,7 +9,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) -// totalAggrState calculates output=total, e.g. the summary counter over input counters. +// totalAggrState calculates output=total, total_prometheus, increase and increase_prometheus. type totalAggrState struct { m sync.Map @@ -124,39 +124,6 @@ func (as *totalAggrState) pushSamples(samples []pushSample) { } } -func (as *totalAggrState) removeOldEntries(ctx *flushCtx, currentTime uint64) { - m := &as.m - var staleInputSamples, staleOutputSamples int - m.Range(func(k, v any) bool { - sv := v.(*totalStateValue) - - sv.mu.Lock() - deleted := currentTime > sv.deleteDeadline - if deleted { - // Mark the current entry as deleted - sv.deleted = deleted - staleOutputSamples++ - } else { - // Delete outdated entries in sv.lastValues - m := sv.lastValues - for k1, v1 := range m { - if currentTime > v1.deleteDeadline { - delete(m, k1) - staleInputSamples++ - } - } - } - sv.mu.Unlock() - - if deleted { - m.Delete(k) - } - return true - }) - ctx.a.staleInputSamples[as.suffix].Add(staleInputSamples) - ctx.a.staleOutputSamples[as.suffix].Add(staleOutputSamples) -} - func (as *totalAggrState) flushState(ctx *flushCtx, resetState bool) { currentTime := fasttime.UnixTimestamp() currentTimeMsec := int64(currentTime) * 1000 @@ -185,3 +152,34 @@ func (as *totalAggrState) flushState(ctx *flushCtx, resetState bool) { return true }) } + +func (as *totalAggrState) removeOldEntries(ctx *flushCtx, currentTime uint64) { + m := &as.m + var staleInputSamples, staleOutputSamples int + m.Range(func(k, v any) bool { + sv := v.(*totalStateValue) + + sv.mu.Lock() + if currentTime > sv.deleteDeadline { + // Mark the current entry as deleted + sv.deleted = true + staleOutputSamples++ + sv.mu.Unlock() + m.Delete(k) + return true + } + + // Delete outdated entries in sv.lastValues + lvs := sv.lastValues + for k1, lv := range lvs { + if currentTime > lv.deleteDeadline { + delete(lvs, k1) + staleInputSamples++ + } + } + sv.mu.Unlock() + return true + }) + ctx.a.staleInputSamples[as.suffix].Add(staleInputSamples) + ctx.a.staleOutputSamples[as.suffix].Add(staleOutputSamples) +}