mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-30 15:22:07 +00:00
lib/streamaggr: prevent rate_sum
and rate_avg
from producing NaNs (#6482)
### Describe Your Changes
* check if `lastValue` was seen at least twice with different
timestamps. Otherwise, the difference between last timestamp and
previous timestamp could be `0` and will result into `NaN` calculation
* check if there items left in lastValue map after staleness cleanup.
Otherwise, `rate_avg` could have produce `NaN` result.
### Checklist
The following checks are **mandatory**:
- [x] My change adheres [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/contributing/).
---------
Signed-off-by: hagen1778 <roman@victoriametrics.com>
(cherry picked from commit 51d19485bb
)
This commit is contained in:
parent
5683610bf8
commit
5df50e5645
3 changed files with 30 additions and 12 deletions
|
@ -40,6 +40,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
|
|||
|
||||
* BUGFIX: all VictoriaMetrics components: prioritize `-configAuthKey` and `-reloadAuthKey` over `-httpAuth.*` settings. This change aligns behavior of mentioned flags with other auth flags like `-metricsAuthKey`, `-flagsAuthKey`, `-pprofAuthKey`. Check [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6329).
|
||||
* BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl/): add `--disable-progress-bar` global command-line flag. It can be used for disabling dynamic progress bar for all migration modes. `--vm-disable-progress-bar` command-line flag is deprecated and will be removed in the future releases. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6367).
|
||||
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): prevent [rate_sum](https://docs.victoriametrics.com/stream-aggregation/#rate_sum) and [rate_avg](https://docs.victoriametrics.com/stream-aggregation/#rate_avg) producing `NaN` results for stale time series. Before, when series matched for aggregation became stale or weren't updated during aggregation interval, the `rate_sum` or `rate_avg` could produce data point with `NaN` value. During visualization, such aggregation results would be displayed as gaps in time series.
|
||||
|
||||
## [v1.102.0-rc1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.102.0-rc1)
|
||||
|
||||
|
|
|
@ -105,8 +105,7 @@ func (as *rateAggrState) pushSamples(samples []pushSample) {
|
|||
}
|
||||
}
|
||||
|
||||
func (as *rateAggrState) flushState(ctx *flushCtx, resetState bool) {
|
||||
_ = resetState // it isn't used here
|
||||
func (as *rateAggrState) flushState(ctx *flushCtx, _ bool) {
|
||||
currentTime := fasttime.UnixTimestamp()
|
||||
currentTimeMsec := int64(currentTime) * 1000
|
||||
|
||||
|
@ -127,23 +126,29 @@ func (as *rateAggrState) flushState(ctx *flushCtx, resetState bool) {
|
|||
|
||||
// Delete outdated entries in sv.lastValues
|
||||
var rate float64
|
||||
m := sv.lastValues
|
||||
for k1, v1 := range m {
|
||||
lvs := sv.lastValues
|
||||
for k1, v1 := range lvs {
|
||||
if currentTime > v1.deleteDeadline {
|
||||
delete(m, k1)
|
||||
} else if v1.prevTimestamp > 0 {
|
||||
rate += v1.total * 1000 / float64(v1.timestamp-v1.prevTimestamp)
|
||||
delete(lvs, k1)
|
||||
continue
|
||||
}
|
||||
rateInterval := v1.timestamp - v1.prevTimestamp
|
||||
if v1.prevTimestamp > 0 && rateInterval > 0 {
|
||||
// calculate rate only if value was seen at least twice with different timestamps
|
||||
rate += v1.total * 1000 / float64(rateInterval)
|
||||
v1.prevTimestamp = v1.timestamp
|
||||
v1.total = 0
|
||||
m[k1] = v1
|
||||
lvs[k1] = v1
|
||||
}
|
||||
}
|
||||
if as.suffix == "rate_avg" {
|
||||
// note: capture m length after deleted items were removed
|
||||
rate /= float64(len(m))
|
||||
}
|
||||
// capture m length after deleted items were removed
|
||||
totalItems := len(lvs)
|
||||
sv.mu.Unlock()
|
||||
|
||||
if as.suffix == "rate_avg" && totalItems > 0 {
|
||||
rate /= float64(totalItems)
|
||||
}
|
||||
|
||||
key := k.(string)
|
||||
ctx.appendSeries(key, as.suffix, currentTimeMsec, rate)
|
||||
return true
|
||||
|
|
|
@ -852,6 +852,18 @@ foo{abc="456", cde="1"} 10 10
|
|||
foo:1m_by_cde_rate_sum{cde="1"} 0.65
|
||||
`, "1111")
|
||||
|
||||
// rate with duplicated events
|
||||
f(`
|
||||
- interval: 1m
|
||||
by: [cde]
|
||||
outputs: [rate_sum, rate_avg]
|
||||
`, `
|
||||
foo{abc="123", cde="1"} 4 10
|
||||
foo{abc="123", cde="1"} 4 10
|
||||
`, `foo:1m_by_cde_rate_avg{cde="1"} 0
|
||||
foo:1m_by_cde_rate_sum{cde="1"} 0
|
||||
`, "11")
|
||||
|
||||
// keep_metric_names
|
||||
f(`
|
||||
- interval: 1m
|
||||
|
|
Loading…
Reference in a new issue