From 6a465f6e299dff033754c054f3646b2388281feb Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sun, 17 Mar 2024 22:03:01 +0200 Subject: [PATCH] lib/streamaggr: ignore out of order samples when calculating increase, increase_prometheus, total and total_prometheus outputs Out of order samples may result in unexpected spikes for these outputs. So it is better to ignore such samples. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5931 --- docs/CHANGELOG.md | 1 + lib/streamaggr/streamaggr_test.go | 6 +++--- lib/streamaggr/total.go | 7 +++++++ 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 5bb7377702..b744ab5ab2 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -42,6 +42,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `-streamAggr.dropInputLabels` command-line flag, which can be used for dropping the listed labels from input samples before applying stream [de-duplication](https://docs.victoriametrics.com/stream-aggregation/#deduplication) and aggregation. This is faster and easier to use alternative to [input_relabel_configs](https://docs.victoriametrics.com/stream-aggregation/#relabeling). See [these docs](https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels). * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `dedup_interval` option, which allows configuring individual [deduplication intervals](https://docs.victoriametrics.com/stream-aggregation/#deduplication) per each [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): use the same logic in [stream deduplication](https://docs.victoriametrics.com/stream-aggregation/#deduplication) as in [the deduplication at VictoriaMetrics](https://docs.victoriametrics.com/#deduplication). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5643). +* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): ignore out of order samples samples when calculating [`increase`](https://docs.victoriametrics.com/stream-aggregation/#increase), [`increase_prometheus`](https://docs.victoriametrics.com/stream-aggregation/#increase_prometheus), [`total`](https://docs.victoriametrics.com/stream-aggregation/#total) and [`total_prometheus`](https://docs.victoriametrics.com/stream-aggregation/#total_prometheus) outputs. Thanks to @edma2 for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5931). * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `keep_metric_names` option, which can be set at [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config) in order to keep the original metric names in the output aggregated samples instead of using [the default output metric naming scheme](https://docs.victoriametrics.com/stream-aggregation/#output-metric-names). * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): align the time of aggregated data flush to the specified aggregation `interval`. For example, if `interval` is set to `1m`, then the aggregated data will be flushed at the end of every minute. The alginment can be disabled by setting `no_align_flush_to_interval: true` option at [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). See [these docs](https://docs.victoriametrics.com/stream-aggregation/#flush-time-alignment) for details. * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [unique_samples](https://docs.victoriametrics.com/stream-aggregation/#unique_samples) output, which can be used for calculating the number of unique sample values over the given `interval`. diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go index 7d49b40c54..0808d0f42a 100644 --- a/lib/streamaggr/streamaggr_test.go +++ b/lib/streamaggr/streamaggr_test.go @@ -519,14 +519,14 @@ foo:1m_total 0 outputs: [total] `, ` foo 123 -bar{baz="qwe"} 1.32 -bar{baz="qwe"} 4.34 +bar{baz="qwe"} 1.31 +bar{baz="qwe"} 4.34 1000 bar{baz="qwe"} 2 foo{baz="qwe"} -5 bar{baz="qwer"} 343 bar{baz="qwer"} 344 foo{baz="qwe"} 10 -`, `bar:1m_total{baz="qwe"} 5.02 +`, `bar:1m_total{baz="qwe"} 3.03 bar:1m_total{baz="qwer"} 1 foo:1m_total 0 foo:1m_total{baz="qwe"} 15 diff --git a/lib/streamaggr/total.go b/lib/streamaggr/total.go index 4c08d9327b..e0b28d437c 100644 --- a/lib/streamaggr/total.go +++ b/lib/streamaggr/total.go @@ -42,6 +42,7 @@ type totalStateValue struct { type lastValueState struct { value float64 + timestamp int64 deleteDeadline uint64 } @@ -88,6 +89,11 @@ func (as *totalAggrState) pushSamples(samples []pushSample) { if !deleted { lv, ok := sv.lastValues[inputKey] if ok || keepFirstSample { + if s.timestamp < lv.timestamp { + // Skip out of order sample + sv.mu.Unlock() + continue + } if s.value >= lv.value { sv.total += s.value - lv.value } else { @@ -96,6 +102,7 @@ func (as *totalAggrState) pushSamples(samples []pushSample) { } } lv.value = s.value + lv.timestamp = s.timestamp lv.deleteDeadline = deleteDeadline sv.lastValues[inputKey] = lv sv.deleteDeadline = deleteDeadline