lib/streamaggr: follow-up for 9c3d44c8c9

- Consistently enumerate stream aggregation outputs in alphabetical order across the source code and docs.
  This should simplify future maintenance of the corresponding code and docs.

- Fix the link to `rate_sum()` at `see also` section of `rate_avg()` docs.

- Make more clear the docs for `rate_sum()` and `rate_avg()` outputs.

- Encapsulate output metric suffix inside rateAggrState. This eliminates possible bugs related
  to incorrect suffix passing to newRateAggrState().

- Rename rateAggrState.total field to less misleading rateAggrState.increase name, since it calculates
  counter increase in the current aggregation window.

- Set rateLastValueState.prevTimestamp on the first sample in time series instead of the second sample.
  This makes more clear the code logic.

- Move the code for removing outdated entries at rateAggrState into removeOldEntries() function.
  This make the code logic inside rateAggrState.flushState() more clear.

- Do not write output sample with zero value if there are no input series, which could be used
  for calculating the rate, e.g. if only a single sample is registered for every input series.

- Do not take into account input series with a single registered sample when calculating rate_avg(),
  since this leads to incorrect results.

- Move {rate,total}AggrState.flushState() function to the end of rate.go and total.go files, so they look more similar.
  This shuld simplify future mantenance.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6243
This commit is contained in:
Aliaksandr Valialkin 2024-07-14 17:23:59 +02:00
parent cfc72cb129
commit 5354374b62
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
5 changed files with 310 additions and 198 deletions

View file

@ -562,14 +562,14 @@ Below are aggregation functions that can be put in the `outputs` list at [stream
* [avg](#avg) * [avg](#avg)
* [count_samples](#count_samples) * [count_samples](#count_samples)
* [count_series](#count_series) * [count_series](#count_series)
* [histogram_bucket](#histogram_bucket)
* [increase](#increase) * [increase](#increase)
* [increase_prometheus](#increase_prometheus) * [increase_prometheus](#increase_prometheus)
* [rate_sum](#rate_sum)
* [rate_avg](#rate_avg)
* [histogram_bucket](#histogram_bucket)
* [last](#last) * [last](#last)
* [max](#max) * [max](#max)
* [min](#min) * [min](#min)
* [rate_avg](#rate_avg)
* [rate_sum](#rate_sum)
* [stddev](#stddev) * [stddev](#stddev)
* [stdvar](#stdvar) * [stdvar](#stdvar)
* [sum_samples](#sum_samples) * [sum_samples](#sum_samples)
@ -593,7 +593,13 @@ For example, see below time series produced by config with aggregation interval
<img alt="avg aggregation" src="stream-aggregation-check-avg.webp"> <img alt="avg aggregation" src="stream-aggregation-check-avg.webp">
See also [min](#min), [max](#max), [sum_samples](#sum_samples) and [count_samples](#count_samples). See also:
- [max](#max)
- [min](#min)
- [quantiles](#quantiles)
- [sum_samples](#sum_samples)
- [count_samples](#count_samples)
### count_samples ### count_samples
@ -605,7 +611,10 @@ The results of `count_samples` is equal to the following [MetricsQL](https://doc
sum(count_over_time(some_metric[interval])) sum(count_over_time(some_metric[interval]))
``` ```
See also [count_series](#count_series) and [sum_samples](#sum_samples). See also:
- [count_series](#count_series)
- [sum_samples](#sum_samples)
### count_series ### count_series
@ -617,7 +626,33 @@ The results of `count_series` is equal to the following [MetricsQL](https://docs
count(last_over_time(some_metric[interval])) count(last_over_time(some_metric[interval]))
``` ```
See also [count_samples](#count_samples) and [unique_samples](#unique_samples). See also:
- [count_samples](#count_samples)
- [unique_samples](#unique_samples)
### histogram_bucket
`histogram_bucket` returns [VictoriaMetrics histogram buckets](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350)
for the input [sample values](https://docs.victoriametrics.com/keyconcepts/#raw-samples) over the given `interval`.
`histogram_bucket` makes sense only for aggregating [gauges](https://docs.victoriametrics.com/keyconcepts/#gauge).
See how to aggregate regular histograms [here](#aggregating-histograms).
The results of `histogram_bucket` is equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query:
Aggregating irregular and sporadic metrics (received from [Lambdas](https://aws.amazon.com/lambda/)
or [Cloud Functions](https://cloud.google.com/functions)) can be controlled via [staleness_interval](#staleness) option.
```metricsql
sum(histogram_over_time(some_histogram_bucket[interval])) by (vmrange)
```
See also:
- [quantiles](#quantiles)
- [avg](#avg)
- [max](#max)
- [min](#min)
### increase ### increase
@ -641,33 +676,12 @@ For example, see below time series produced by config with aggregation interval
Aggregating irregular and sporadic metrics (received from [Lambdas](https://aws.amazon.com/lambda/) Aggregating irregular and sporadic metrics (received from [Lambdas](https://aws.amazon.com/lambda/)
or [Cloud Functions](https://cloud.google.com/functions)) can be controlled via [staleness_interval](#staleness) option. or [Cloud Functions](https://cloud.google.com/functions)) can be controlled via [staleness_interval](#staleness) option.
See also [increase_prometheus](#increase_prometheus) and [total](#total). See also:
### rate_sum - [increase_prometheus](#increase_prometheus)
- [total](#total)
`rate_sum` returns the sum of average per-second change of input [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) over the given `interval`. - [rate_avg](#rate_avg)
`rate_sum` makes sense only for aggregating [counters](https://docs.victoriametrics.com/keyconcepts/#counter). - [rate_sum](#rate_sum)
The results of `rate_sum` are equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query:
```metricsql
sum(rate(some_counter[interval]))
```
See also [rate_avg](#rate_avg) and [total](#total) outputs.
### rate_avg
`rate_avg` returns the average of average per-second of input [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) over the given `interval`.
`rate_avg` makes sense only for aggregating [counters](https://docs.victoriametrics.com/keyconcepts/#counter).
The results of `rate_avg` are equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query:
```metricsql
avg(rate(some_counter[interval]))
```
See also [rate_sum](#rate_avg) and [total](#total) outputs.
### increase_prometheus ### increase_prometheus
@ -686,25 +700,13 @@ If you need taking into account the first sample per time series, then take a lo
Aggregating irregular and sporadic metrics (received from [Lambdas](https://aws.amazon.com/lambda/) Aggregating irregular and sporadic metrics (received from [Lambdas](https://aws.amazon.com/lambda/)
or [Cloud Functions](https://cloud.google.com/functions)) can be controlled via [staleness_interval](#staleness) option. or [Cloud Functions](https://cloud.google.com/functions)) can be controlled via [staleness_interval](#staleness) option.
See also [increase](#increase), [total](#total) and [total_prometheus](#total_prometheus). See also:
### histogram_bucket - [increase](#increase)
- [rate_avg](#rate_avg)
`histogram_bucket` returns [VictoriaMetrics histogram buckets](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350) - [rate_sum](#rate_sum)
for the input [sample values](https://docs.victoriametrics.com/keyconcepts/#raw-samples) over the given `interval`. - [total](#total)
`histogram_bucket` makes sense only for aggregating [gauges](https://docs.victoriametrics.com/keyconcepts/#gauge). - [total_prometheus](#total_prometheus)
See how to aggregate regular histograms [here](#aggregating-histograms).
The results of `histogram_bucket` is equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query:
Aggregating irregular and sporadic metrics (received from [Lambdas](https://aws.amazon.com/lambda/)
or [Cloud Functions](https://cloud.google.com/functions)) can be controlled via [staleness_interval](#staleness) option.
```metricsql
sum(histogram_over_time(some_histogram_bucket[interval])) by (vmrange)
```
See also [quantiles](#quantiles), [min](#min), [max](#max) and [avg](#avg).
### last ### last
@ -716,7 +718,12 @@ The results of `last` is roughly equal to the following [MetricsQL](https://docs
last_over_time(some_metric[interval]) last_over_time(some_metric[interval])
``` ```
See also [min](#min), [max](#max) and [avg](#avg). See also:
- [avg](#avg)
- [max](#max)
- [min](#min)
- [quantiles](#quantiles)
### max ### max
@ -732,7 +739,12 @@ For example, see below time series produced by config with aggregation interval
<img alt="total aggregation" src="stream-aggregation-check-max.webp"> <img alt="total aggregation" src="stream-aggregation-check-max.webp">
See also [min](#min) and [avg](#avg). See also:
- [min](#min)
- [avg](#avg)
- [last](#last)
- [quantiles](#quantiles)
### min ### min
@ -748,7 +760,46 @@ For example, see below time series produced by config with aggregation interval
<img alt="min aggregation" src="stream-aggregation-check-min.webp"> <img alt="min aggregation" src="stream-aggregation-check-min.webp">
See also [max](#max) and [avg](#avg). See also:
- [max](#max)
- [avg](#avg)
- [last](#last)
- [quantiles](#quantiles)
### rate_avg
`rate_avg` returns the average of average per-second increase rates across input [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) over the given `interval`.
`rate_avg` makes sense only for aggregating [counters](https://docs.victoriametrics.com/keyconcepts/#counter).
The results of `rate_avg` are equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query:
```metricsql
avg(rate(some_counter[interval]))
```
See also:
- [rate_sum](#rate_sum)
- [increase](#increase)
- [total](#total)
### rate_sum
`rate_sum` returns the sum of average per-second increase rates across input [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) over the given `interval`.
`rate_sum` makes sense only for aggregating [counters](https://docs.victoriametrics.com/keyconcepts/#counter).
The results of `rate_sum` are equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query:
```metricsql
sum(rate(some_counter[interval]))
```
See also:
- [rate_avg](#rate_avg)
- [increase](#increase)
- [total](#total)
### stddev ### stddev
@ -762,7 +813,11 @@ The results of `stddev` is roughly equal to the following [MetricsQL](https://do
histogram_stddev(sum(histogram_over_time(some_metric[interval])) by (vmrange)) histogram_stddev(sum(histogram_over_time(some_metric[interval])) by (vmrange))
``` ```
See also [stdvar](#stdvar) and [avg](#avg). See also:
- [stdvar](#stdvar)
- [avg](#avg)
- [quantiles](#quantiles)
### stdvar ### stdvar
@ -780,7 +835,11 @@ For example, see below time series produced by config with aggregation interval
<img alt="stdvar aggregation" src="stream-aggregation-check-stdvar.webp"> <img alt="stdvar aggregation" src="stream-aggregation-check-stdvar.webp">
See also [stddev](#stddev) and [avg](#avg). See also:
- [stddev](#stddev)
- [avg](#avg)
- [quantiles](#quantiles)
### sum_samples ### sum_samples
@ -797,7 +856,10 @@ For example, see below time series produced by config with aggregation interval
<img alt="sum_samples aggregation" src="stream-aggregation-check-sum-samples.webp"> <img alt="sum_samples aggregation" src="stream-aggregation-check-sum-samples.webp">
See also [count_samples](#count_samples) and [count_series](#count_series). See also:
- [count_samples](#count_samples)
- [count_series](#count_series)
### total ### total
@ -834,7 +896,13 @@ This changes pod name label, but the `total` accounts for such a scenario and do
Aggregating irregular and sporadic metrics (received from [Lambdas](https://aws.amazon.com/lambda/) Aggregating irregular and sporadic metrics (received from [Lambdas](https://aws.amazon.com/lambda/)
or [Cloud Functions](https://cloud.google.com/functions)) can be controlled via [staleness_interval](#staleness) option. or [Cloud Functions](https://cloud.google.com/functions)) can be controlled via [staleness_interval](#staleness) option.
See also [total_prometheus](#total_prometheus), [increase](#increase) and [increase_prometheus](#increase_prometheus). See also:
- [total_prometheus](#total_prometheus)
- [increase](#increase)
- [increase_prometheus](#increase_prometheus)
- [rate_sum](#rate_sum)
- [rate_avg](#rate_avg)
### total_prometheus ### total_prometheus
@ -857,7 +925,13 @@ The counters are most often reset when the application is restarted.
Aggregating irregular and sporadic metrics (received from [Lambdas](https://aws.amazon.com/lambda/) Aggregating irregular and sporadic metrics (received from [Lambdas](https://aws.amazon.com/lambda/)
or [Cloud Functions](https://cloud.google.com/functions)) can be controlled via [staleness_interval](#staleness) option. or [Cloud Functions](https://cloud.google.com/functions)) can be controlled via [staleness_interval](#staleness) option.
See also [total](#total), [increase](#increase) and [increase_prometheus](#increase_prometheus). See also:
- [total](#total)
- [increase](#increase)
- [increase_prometheus](#increase_prometheus)
- [rate_sum](#rate_sum)
- [rate_avg](#rate_avg)
### unique_samples ### unique_samples
@ -870,7 +944,10 @@ The results of `unique_samples` is equal to the following [MetricsQL](https://do
count(count_values_over_time(some_metric[interval])) count(count_values_over_time(some_metric[interval]))
``` ```
See also [sum_samples](#sum_samples) and [count_series](#count_series). See also:
- [sum_samples](#sum_samples)
- [count_series](#count_series)
### quantiles ### quantiles
@ -885,7 +962,12 @@ The results of `quantiles(phi1, ..., phiN)` is equal to the following [MetricsQL
histogram_quantiles("quantile", phi1, ..., phiN, sum(histogram_over_time(some_metric[interval])) by (vmrange)) histogram_quantiles("quantile", phi1, ..., phiN, sum(histogram_over_time(some_metric[interval])) by (vmrange))
``` ```
See also [histogram_bucket](#histogram_bucket), [min](#min), [max](#max) and [avg](#avg). See also:
- [histogram_bucket](#histogram_bucket)
- [avg](#avg)
- [max](#max)
- [min](#min)
## Aggregating by labels ## Aggregating by labels
@ -962,11 +1044,13 @@ specified individually per each `-remoteWrite.url`:
# staleness_interval is an optional interval for resetting the per-series state if no new samples # staleness_interval is an optional interval for resetting the per-series state if no new samples
# are received during this interval for the following outputs: # are received during this interval for the following outputs:
# - total # - histogram_bucket
# - total_prometheus
# - increase # - increase
# - increase_prometheus # - increase_prometheus
# - histogram_bucket # - rate_avg
# - rate_sum
# - total
# - total_prometheus
# See https://docs.victoriametrics.com/stream-aggregation/#staleness for more details. # See https://docs.victoriametrics.com/stream-aggregation/#staleness for more details.
# #
# staleness_interval: 2m # staleness_interval: 2m
@ -1071,13 +1155,13 @@ support the following approaches for hot reloading stream aggregation configs fr
The following outputs track the last seen per-series values in order to properly calculate output values: The following outputs track the last seen per-series values in order to properly calculate output values:
- [rate_sum](#rate_sum) - [histogram_bucket](#histogram_bucket)
- [rate_avg](#rate_avg)
- [total](#total)
- [total_prometheus](#total_prometheus)
- [increase](#increase) - [increase](#increase)
- [increase_prometheus](#increase_prometheus) - [increase_prometheus](#increase_prometheus)
- [histogram_bucket](#histogram_bucket) - [rate_avg](#rate_avg)
- [rate_sum](#rate_sum)
- [total](#total)
- [total_prometheus](#total_prometheus)
The last seen per-series value is dropped if no new samples are received for the given time series during two consecutive aggregation The last seen per-series value is dropped if no new samples are received for the given time series during two consecutive aggregation
intervals specified in [stream aggregation config](#stream-aggregation-config) via `interval` option. intervals specified in [stream aggregation config](#stream-aggregation-config) via `interval` option.

View file

@ -8,11 +8,12 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
) )
// rateAggrState calculates output=rate, e.g. the counter per-second change. // rateAggrState calculates output=rate_avg and rate_sum, e.g. the average per-second increase rate for counter metrics.
type rateAggrState struct { type rateAggrState struct {
m sync.Map m sync.Map
suffix string // isAvg is set to true if rate_avg() must be calculated instead of rate_sum().
isAvg bool
// Time series state is dropped if no new samples are received during stalenessSecs. // Time series state is dropped if no new samples are received during stalenessSecs.
stalenessSecs uint64 stalenessSecs uint64
@ -30,18 +31,17 @@ type rateLastValueState struct {
timestamp int64 timestamp int64
deleteDeadline uint64 deleteDeadline uint64
// total stores cumulative difference between registered values // increase stores cumulative increase for the current time series on the current aggregation interval
// in the aggregation interval increase float64
total float64
// prevTimestamp stores timestamp of the last registered value // prevTimestamp is the timestamp of the last registered sample in the previous aggregation interval
// in the previous aggregation interval
prevTimestamp int64 prevTimestamp int64
} }
func newRateAggrState(stalenessInterval time.Duration, suffix string) *rateAggrState { func newRateAggrState(stalenessInterval time.Duration, isAvg bool) *rateAggrState {
stalenessSecs := roundDurationToSecs(stalenessInterval) stalenessSecs := roundDurationToSecs(stalenessInterval)
return &rateAggrState{ return &rateAggrState{
suffix: suffix, isAvg: isAvg,
stalenessSecs: stalenessSecs, stalenessSecs: stalenessSecs,
} }
} }
@ -78,15 +78,15 @@ func (as *rateAggrState) pushSamples(samples []pushSample) {
sv.mu.Unlock() sv.mu.Unlock()
continue continue
} }
if lv.prevTimestamp == 0 {
lv.prevTimestamp = lv.timestamp
}
if s.value >= lv.value { if s.value >= lv.value {
lv.total += s.value - lv.value lv.increase += s.value - lv.value
} else { } else {
// counter reset // counter reset
lv.total += s.value lv.increase += s.value
} }
} else {
lv.prevTimestamp = s.timestamp
} }
lv.value = s.value lv.value = s.value
lv.timestamp = s.timestamp lv.timestamp = s.timestamp
@ -108,54 +108,77 @@ func (as *rateAggrState) pushSamples(samples []pushSample) {
func (as *rateAggrState) flushState(ctx *flushCtx, _ bool) { func (as *rateAggrState) flushState(ctx *flushCtx, _ bool) {
currentTime := fasttime.UnixTimestamp() currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000 currentTimeMsec := int64(currentTime) * 1000
var staleOutputSamples, staleInputSamples int
suffix := "rate_sum"
if as.isAvg {
suffix = "rate_avg"
}
as.removeOldEntries(ctx, suffix, currentTime)
m := &as.m m := &as.m
m.Range(func(k, v any) bool { m.Range(func(k, v any) bool {
sv := v.(*rateStateValue) sv := v.(*rateStateValue)
sv.mu.Lock()
// check for stale entries sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline lvs := sv.lastValues
if deleted { sumRate := 0.0
countSeries := 0
for k1, lv := range lvs {
d := float64(lv.timestamp-lv.prevTimestamp) / 1000
if d > 0 {
sumRate += lv.increase / d
countSeries++
}
lv.prevTimestamp = lv.timestamp
lv.increase = 0
lvs[k1] = lv
}
sv.mu.Unlock()
if countSeries == 0 {
// Nothing to update
return true
}
result := sumRate
if as.isAvg {
result /= float64(countSeries)
}
key := k.(string)
ctx.appendSeries(key, suffix, currentTimeMsec, result)
return true
})
}
func (as *rateAggrState) removeOldEntries(ctx *flushCtx, suffix string, currentTime uint64) {
m := &as.m
var staleOutputSamples, staleInputSamples int
m.Range(func(k, v any) bool {
sv := v.(*rateStateValue)
sv.mu.Lock()
if currentTime > sv.deleteDeadline {
// Mark the current entry as deleted // Mark the current entry as deleted
sv.deleted = deleted sv.deleted = true
sv.mu.Unlock()
staleOutputSamples++ staleOutputSamples++
sv.mu.Unlock()
m.Delete(k) m.Delete(k)
return true return true
} }
// Delete outdated entries in sv.lastValues // Delete outdated entries in sv.lastValues
var rate float64
lvs := sv.lastValues lvs := sv.lastValues
for k1, v1 := range lvs { for k1, lv := range lvs {
if currentTime > v1.deleteDeadline { if currentTime > lv.deleteDeadline {
delete(lvs, k1) delete(lvs, k1)
staleInputSamples++ staleInputSamples++
continue
}
rateInterval := v1.timestamp - v1.prevTimestamp
if v1.prevTimestamp > 0 && rateInterval > 0 {
// calculate rate only if value was seen at least twice with different timestamps
rate += v1.total * 1000 / float64(rateInterval)
v1.prevTimestamp = v1.timestamp
v1.total = 0
lvs[k1] = v1
} }
} }
// capture m length after deleted items were removed
totalItems := len(lvs)
sv.mu.Unlock() sv.mu.Unlock()
if as.suffix == "rate_avg" && totalItems > 0 {
rate /= float64(totalItems)
}
key := k.(string)
ctx.appendSeries(key, as.suffix, currentTimeMsec, rate)
return true return true
}) })
ctx.a.staleOutputSamples[as.suffix].Add(staleOutputSamples) ctx.a.staleInputSamples[suffix].Add(staleInputSamples)
ctx.a.staleInputSamples[as.suffix].Add(staleInputSamples) ctx.a.staleOutputSamples[suffix].Add(staleOutputSamples)
} }

View file

@ -27,24 +27,24 @@ import (
) )
var supportedOutputs = []string{ var supportedOutputs = []string{
"rate_sum", "avg",
"rate_avg", "count_samples",
"total", "count_series",
"total_prometheus", "histogram_bucket",
"increase", "increase",
"increase_prometheus", "increase_prometheus",
"count_series",
"count_samples",
"unique_samples",
"sum_samples",
"last", "last",
"min",
"max", "max",
"avg", "min",
"quantiles(phi1, ..., phiN)",
"rate_avg",
"rate_sum",
"stddev", "stddev",
"stdvar", "stdvar",
"histogram_bucket", "sum_samples",
"quantiles(phi1, ..., phiN)", "total",
"total_prometheus",
"unique_samples",
} }
// maxLabelValueLen is maximum match expression label value length in stream aggregation metrics // maxLabelValueLen is maximum match expression label value length in stream aggregation metrics
@ -175,24 +175,24 @@ type Config struct {
// //
// The following names are allowed: // The following names are allowed:
// //
// - rate_sum - calculates sum of rate for input counters // - avg - the average value across all the samples
// - rate_avg - calculates average of rate for input counters // - count_samples - counts the input samples
// - total - aggregates input counters // - count_series - counts the number of unique input series
// - total_prometheus - aggregates input counters, ignoring the first sample in new time series // - histogram_bucket - creates VictoriaMetrics histogram for input samples
// - increase - calculates the increase over input series // - increase - calculates the increase over input series
// - increase_prometheus - calculates the increase over input series, ignoring the first sample in new time series // - increase_prometheus - calculates the increase over input series, ignoring the first sample in new time series
// - count_series - counts the number of unique input series
// - count_samples - counts the input samples
// - unique_samples - counts the number of unique sample values
// - sum_samples - sums the input sample values
// - last - the last biggest sample value // - last - the last biggest sample value
// - min - the minimum sample value
// - max - the maximum sample value // - max - the maximum sample value
// - avg - the average value across all the samples // - min - the minimum sample value
// - quantiles(phi1, ..., phiN) - quantiles' estimation for phi in the range [0..1]
// - rate_avg - calculates average of rate for input counters
// - rate_sum - calculates sum of rate for input counters
// - stddev - standard deviation across all the samples // - stddev - standard deviation across all the samples
// - stdvar - standard variance across all the samples // - stdvar - standard variance across all the samples
// - histogram_bucket - creates VictoriaMetrics histogram for input samples // - sum_samples - sums the input sample values
// - quantiles(phi1, ..., phiN) - quantiles' estimation for phi in the range [0..1] // - total - aggregates input counters
// - total_prometheus - aggregates input counters, ignoring the first sample in new time series
// - unique_samples - counts the number of unique sample values
// //
// The output time series will have the following names by default: // The output time series will have the following names by default:
// //
@ -562,40 +562,40 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
continue continue
} }
switch output { switch output {
case "total": case "avg":
aggrStates[output] = newTotalAggrState(stalenessInterval, false, true) aggrStates[output] = newAvgAggrState()
case "total_prometheus": case "count_samples":
aggrStates[output] = newTotalAggrState(stalenessInterval, false, false) aggrStates[output] = newCountSamplesAggrState()
case "count_series":
aggrStates[output] = newCountSeriesAggrState()
case "histogram_bucket":
aggrStates[output] = newHistogramBucketAggrState(stalenessInterval)
case "increase": case "increase":
aggrStates[output] = newTotalAggrState(stalenessInterval, true, true) aggrStates[output] = newTotalAggrState(stalenessInterval, true, true)
case "increase_prometheus": case "increase_prometheus":
aggrStates[output] = newTotalAggrState(stalenessInterval, true, false) aggrStates[output] = newTotalAggrState(stalenessInterval, true, false)
case "rate_sum":
aggrStates[output] = newRateAggrState(stalenessInterval, "rate_sum")
case "rate_avg":
aggrStates[output] = newRateAggrState(stalenessInterval, "rate_avg")
case "count_series":
aggrStates[output] = newCountSeriesAggrState()
case "count_samples":
aggrStates[output] = newCountSamplesAggrState()
case "unique_samples":
aggrStates[output] = newUniqueSamplesAggrState()
case "sum_samples":
aggrStates[output] = newSumSamplesAggrState()
case "last": case "last":
aggrStates[output] = newLastAggrState() aggrStates[output] = newLastAggrState()
case "min":
aggrStates[output] = newMinAggrState()
case "max": case "max":
aggrStates[output] = newMaxAggrState() aggrStates[output] = newMaxAggrState()
case "avg": case "min":
aggrStates[output] = newAvgAggrState() aggrStates[output] = newMinAggrState()
case "rate_avg":
aggrStates[output] = newRateAggrState(stalenessInterval, true)
case "rate_sum":
aggrStates[output] = newRateAggrState(stalenessInterval, false)
case "stddev": case "stddev":
aggrStates[output] = newStddevAggrState() aggrStates[output] = newStddevAggrState()
case "stdvar": case "stdvar":
aggrStates[output] = newStdvarAggrState() aggrStates[output] = newStdvarAggrState()
case "histogram_bucket": case "sum_samples":
aggrStates[output] = newHistogramBucketAggrState(stalenessInterval) aggrStates[output] = newSumSamplesAggrState()
case "total":
aggrStates[output] = newTotalAggrState(stalenessInterval, false, true)
case "total_prometheus":
aggrStates[output] = newTotalAggrState(stalenessInterval, false, false)
case "unique_samples":
aggrStates[output] = newUniqueSamplesAggrState()
default: default:
return nil, fmt.Errorf("unsupported output=%q; supported values: %s;", output, supportedOutputs) return nil, fmt.Errorf("unsupported output=%q; supported values: %s;", output, supportedOutputs)
} }

View file

@ -891,21 +891,28 @@ foo{abc="123", cde="1"} 4
foo{abc="123", cde="1"} 8.5 10 foo{abc="123", cde="1"} 8.5 10
foo{abc="456", cde="1"} 8 foo{abc="456", cde="1"} 8
foo{abc="456", cde="1"} 10 10 foo{abc="456", cde="1"} 10 10
foo 12 34
`, `foo:1m_by_cde_rate_avg{cde="1"} 0.325 `, `foo:1m_by_cde_rate_avg{cde="1"} 0.325
foo:1m_by_cde_rate_sum{cde="1"} 0.65 foo:1m_by_cde_rate_sum{cde="1"} 0.65
`, "1111") `, "11111")
// rate with duplicated events // rate_sum and rate_avg with duplicated events
f(` f(`
- interval: 1m - interval: 1m
by: [cde]
outputs: [rate_sum, rate_avg] outputs: [rate_sum, rate_avg]
`, ` `, `
foo{abc="123", cde="1"} 4 10 foo{abc="123", cde="1"} 4 10
foo{abc="123", cde="1"} 4 10 foo{abc="123", cde="1"} 4 10
`, `foo:1m_by_cde_rate_avg{cde="1"} 0 `, ``, "11")
foo:1m_by_cde_rate_sum{cde="1"} 0
`, "11") // rate_sum and rate_avg for a single sample
f(`
- interval: 1m
outputs: [rate_sum, rate_avg]
`, `
foo 4 10
bar 5 10
`, ``, "11")
// unique_samples output // unique_samples output
f(` f(`

View file

@ -9,7 +9,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
) )
// totalAggrState calculates output=total, e.g. the summary counter over input counters. // totalAggrState calculates output=total, total_prometheus, increase and increase_prometheus.
type totalAggrState struct { type totalAggrState struct {
m sync.Map m sync.Map
@ -124,39 +124,6 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
} }
} }
func (as *totalAggrState) removeOldEntries(ctx *flushCtx, currentTime uint64) {
m := &as.m
var staleInputSamples, staleOutputSamples int
m.Range(func(k, v any) bool {
sv := v.(*totalStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
staleOutputSamples++
} else {
// Delete outdated entries in sv.lastValues
m := sv.lastValues
for k1, v1 := range m {
if currentTime > v1.deleteDeadline {
delete(m, k1)
staleInputSamples++
}
}
}
sv.mu.Unlock()
if deleted {
m.Delete(k)
}
return true
})
ctx.a.staleInputSamples[as.suffix].Add(staleInputSamples)
ctx.a.staleOutputSamples[as.suffix].Add(staleOutputSamples)
}
func (as *totalAggrState) flushState(ctx *flushCtx, resetState bool) { func (as *totalAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTime := fasttime.UnixTimestamp() currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000 currentTimeMsec := int64(currentTime) * 1000
@ -185,3 +152,34 @@ func (as *totalAggrState) flushState(ctx *flushCtx, resetState bool) {
return true return true
}) })
} }
func (as *totalAggrState) removeOldEntries(ctx *flushCtx, currentTime uint64) {
m := &as.m
var staleInputSamples, staleOutputSamples int
m.Range(func(k, v any) bool {
sv := v.(*totalStateValue)
sv.mu.Lock()
if currentTime > sv.deleteDeadline {
// Mark the current entry as deleted
sv.deleted = true
staleOutputSamples++
sv.mu.Unlock()
m.Delete(k)
return true
}
// Delete outdated entries in sv.lastValues
lvs := sv.lastValues
for k1, lv := range lvs {
if currentTime > lv.deleteDeadline {
delete(lvs, k1)
staleInputSamples++
}
}
sv.mu.Unlock()
return true
})
ctx.a.staleInputSamples[as.suffix].Add(staleInputSamples)
ctx.a.staleOutputSamples[as.suffix].Add(staleOutputSamples)
}