lib/streamaggr: prevent rate_sum and rate_avg from producing NaNs (#6482)

### Describe Your Changes

* check if `lastValue` was seen at least twice with different
timestamps. Otherwise, the difference between last timestamp and
previous timestamp could be `0` and will result into `NaN` calculation
* check if there items left in lastValue map after staleness cleanup.
Otherwise, `rate_avg` could have produce `NaN` result.

### Checklist

The following checks are **mandatory**:

- [x] My change adheres [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/contributing/).

---------

Signed-off-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
Roman Khavronenko 2024-06-14 10:06:22 +02:00 committed by GitHub
parent 2d575cd27a
commit 51d19485bb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 30 additions and 12 deletions

View file

@ -40,6 +40,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* BUGFIX: all VictoriaMetrics components: prioritize `-configAuthKey` and `-reloadAuthKey` over `-httpAuth.*` settings. This change aligns behavior of mentioned flags with other auth flags like `-metricsAuthKey`, `-flagsAuthKey`, `-pprofAuthKey`. Check [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6329).
* BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl/): add `--disable-progress-bar` global command-line flag. It can be used for disabling dynamic progress bar for all migration modes. `--vm-disable-progress-bar` command-line flag is deprecated and will be removed in the future releases. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6367).
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): prevent [rate_sum](https://docs.victoriametrics.com/stream-aggregation/#rate_sum) and [rate_avg](https://docs.victoriametrics.com/stream-aggregation/#rate_avg) producing `NaN` results for stale time series. Before, when series matched for aggregation became stale or weren't updated during aggregation interval, the `rate_sum` or `rate_avg` could produce data point with `NaN` value. During visualization, such aggregation results would be displayed as gaps in time series.
## [v1.102.0-rc1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.102.0-rc1)

View file

@ -105,8 +105,7 @@ func (as *rateAggrState) pushSamples(samples []pushSample) {
}
}
func (as *rateAggrState) flushState(ctx *flushCtx, resetState bool) {
_ = resetState // it isn't used here
func (as *rateAggrState) flushState(ctx *flushCtx, _ bool) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
@ -127,23 +126,29 @@ func (as *rateAggrState) flushState(ctx *flushCtx, resetState bool) {
// Delete outdated entries in sv.lastValues
var rate float64
m := sv.lastValues
for k1, v1 := range m {
lvs := sv.lastValues
for k1, v1 := range lvs {
if currentTime > v1.deleteDeadline {
delete(m, k1)
} else if v1.prevTimestamp > 0 {
rate += v1.total * 1000 / float64(v1.timestamp-v1.prevTimestamp)
delete(lvs, k1)
continue
}
rateInterval := v1.timestamp - v1.prevTimestamp
if v1.prevTimestamp > 0 && rateInterval > 0 {
// calculate rate only if value was seen at least twice with different timestamps
rate += v1.total * 1000 / float64(rateInterval)
v1.prevTimestamp = v1.timestamp
v1.total = 0
m[k1] = v1
lvs[k1] = v1
}
}
if as.suffix == "rate_avg" {
// note: capture m length after deleted items were removed
rate /= float64(len(m))
}
// capture m length after deleted items were removed
totalItems := len(lvs)
sv.mu.Unlock()
if as.suffix == "rate_avg" && totalItems > 0 {
rate /= float64(totalItems)
}
key := k.(string)
ctx.appendSeries(key, as.suffix, currentTimeMsec, rate)
return true

View file

@ -852,6 +852,18 @@ foo{abc="456", cde="1"} 10 10
foo:1m_by_cde_rate_sum{cde="1"} 0.65
`, "1111")
// rate with duplicated events
f(`
- interval: 1m
by: [cde]
outputs: [rate_sum, rate_avg]
`, `
foo{abc="123", cde="1"} 4 10
foo{abc="123", cde="1"} 4 10
`, `foo:1m_by_cde_rate_avg{cde="1"} 0
foo:1m_by_cde_rate_sum{cde="1"} 0
`, "11")
// keep_metric_names
f(`
- interval: 1m