mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/streamaggr: ignore out of order samples when calculating increase, increase_prometheus, total and total_prometheus outputs
Out of order samples may result in unexpected spikes for these outputs. So it is better to ignore such samples. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5931
This commit is contained in:
parent
cbd80efcc1
commit
6a465f6e29
3 changed files with 11 additions and 3 deletions
|
@ -42,6 +42,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
|
|||
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `-streamAggr.dropInputLabels` command-line flag, which can be used for dropping the listed labels from input samples before applying stream [de-duplication](https://docs.victoriametrics.com/stream-aggregation/#deduplication) and aggregation. This is faster and easier to use alternative to [input_relabel_configs](https://docs.victoriametrics.com/stream-aggregation/#relabeling). See [these docs](https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels).
|
||||
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `dedup_interval` option, which allows configuring individual [deduplication intervals](https://docs.victoriametrics.com/stream-aggregation/#deduplication) per each [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config).
|
||||
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): use the same logic in [stream deduplication](https://docs.victoriametrics.com/stream-aggregation/#deduplication) as in [the deduplication at VictoriaMetrics](https://docs.victoriametrics.com/#deduplication). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5643).
|
||||
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): ignore out of order samples samples when calculating [`increase`](https://docs.victoriametrics.com/stream-aggregation/#increase), [`increase_prometheus`](https://docs.victoriametrics.com/stream-aggregation/#increase_prometheus), [`total`](https://docs.victoriametrics.com/stream-aggregation/#total) and [`total_prometheus`](https://docs.victoriametrics.com/stream-aggregation/#total_prometheus) outputs. Thanks to @edma2 for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5931).
|
||||
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `keep_metric_names` option, which can be set at [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config) in order to keep the original metric names in the output aggregated samples instead of using [the default output metric naming scheme](https://docs.victoriametrics.com/stream-aggregation/#output-metric-names).
|
||||
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): align the time of aggregated data flush to the specified aggregation `interval`. For example, if `interval` is set to `1m`, then the aggregated data will be flushed at the end of every minute. The alginment can be disabled by setting `no_align_flush_to_interval: true` option at [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). See [these docs](https://docs.victoriametrics.com/stream-aggregation/#flush-time-alignment) for details.
|
||||
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [unique_samples](https://docs.victoriametrics.com/stream-aggregation/#unique_samples) output, which can be used for calculating the number of unique sample values over the given `interval`.
|
||||
|
|
|
@ -519,14 +519,14 @@ foo:1m_total 0
|
|||
outputs: [total]
|
||||
`, `
|
||||
foo 123
|
||||
bar{baz="qwe"} 1.32
|
||||
bar{baz="qwe"} 4.34
|
||||
bar{baz="qwe"} 1.31
|
||||
bar{baz="qwe"} 4.34 1000
|
||||
bar{baz="qwe"} 2
|
||||
foo{baz="qwe"} -5
|
||||
bar{baz="qwer"} 343
|
||||
bar{baz="qwer"} 344
|
||||
foo{baz="qwe"} 10
|
||||
`, `bar:1m_total{baz="qwe"} 5.02
|
||||
`, `bar:1m_total{baz="qwe"} 3.03
|
||||
bar:1m_total{baz="qwer"} 1
|
||||
foo:1m_total 0
|
||||
foo:1m_total{baz="qwe"} 15
|
||||
|
|
|
@ -42,6 +42,7 @@ type totalStateValue struct {
|
|||
|
||||
type lastValueState struct {
|
||||
value float64
|
||||
timestamp int64
|
||||
deleteDeadline uint64
|
||||
}
|
||||
|
||||
|
@ -88,6 +89,11 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
|
|||
if !deleted {
|
||||
lv, ok := sv.lastValues[inputKey]
|
||||
if ok || keepFirstSample {
|
||||
if s.timestamp < lv.timestamp {
|
||||
// Skip out of order sample
|
||||
sv.mu.Unlock()
|
||||
continue
|
||||
}
|
||||
if s.value >= lv.value {
|
||||
sv.total += s.value - lv.value
|
||||
} else {
|
||||
|
@ -96,6 +102,7 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
|
|||
}
|
||||
}
|
||||
lv.value = s.value
|
||||
lv.timestamp = s.timestamp
|
||||
lv.deleteDeadline = deleteDeadline
|
||||
sv.lastValues[inputKey] = lv
|
||||
sv.deleteDeadline = deleteDeadline
|
||||
|
|
Loading…
Reference in a new issue