diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 1276c5ce2..daa16d605 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -40,6 +40,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). * BUGFIX: all VictoriaMetrics components: prioritize `-configAuthKey` and `-reloadAuthKey` over `-httpAuth.*` settings. This change aligns behavior of mentioned flags with other auth flags like `-metricsAuthKey`, `-flagsAuthKey`, `-pprofAuthKey`. Check [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6329). * BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl/): add `--disable-progress-bar` global command-line flag. It can be used for disabling dynamic progress bar for all migration modes. `--vm-disable-progress-bar` command-line flag is deprecated and will be removed in the future releases. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6367). +* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): prevent [rate_sum](https://docs.victoriametrics.com/stream-aggregation/#rate_sum) and [rate_avg](https://docs.victoriametrics.com/stream-aggregation/#rate_avg) producing `NaN` results for stale time series. Before, when series matched for aggregation became stale or weren't updated during aggregation interval, the `rate_sum` or `rate_avg` could produce data point with `NaN` value. During visualization, such aggregation results would be displayed as gaps in time series. ## [v1.102.0-rc1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.102.0-rc1) diff --git a/lib/streamaggr/rate.go b/lib/streamaggr/rate.go index 44e6bcea4..33d81bc2f 100644 --- a/lib/streamaggr/rate.go +++ b/lib/streamaggr/rate.go @@ -105,8 +105,7 @@ func (as *rateAggrState) pushSamples(samples []pushSample) { } } -func (as *rateAggrState) flushState(ctx *flushCtx, resetState bool) { - _ = resetState // it isn't used here +func (as *rateAggrState) flushState(ctx *flushCtx, _ bool) { currentTime := fasttime.UnixTimestamp() currentTimeMsec := int64(currentTime) * 1000 @@ -127,23 +126,29 @@ func (as *rateAggrState) flushState(ctx *flushCtx, resetState bool) { // Delete outdated entries in sv.lastValues var rate float64 - m := sv.lastValues - for k1, v1 := range m { + lvs := sv.lastValues + for k1, v1 := range lvs { if currentTime > v1.deleteDeadline { - delete(m, k1) - } else if v1.prevTimestamp > 0 { - rate += v1.total * 1000 / float64(v1.timestamp-v1.prevTimestamp) + delete(lvs, k1) + continue + } + rateInterval := v1.timestamp - v1.prevTimestamp + if v1.prevTimestamp > 0 && rateInterval > 0 { + // calculate rate only if value was seen at least twice with different timestamps + rate += v1.total * 1000 / float64(rateInterval) v1.prevTimestamp = v1.timestamp v1.total = 0 - m[k1] = v1 + lvs[k1] = v1 } } - if as.suffix == "rate_avg" { - // note: capture m length after deleted items were removed - rate /= float64(len(m)) - } + // capture m length after deleted items were removed + totalItems := len(lvs) sv.mu.Unlock() + if as.suffix == "rate_avg" && totalItems > 0 { + rate /= float64(totalItems) + } + key := k.(string) ctx.appendSeries(key, as.suffix, currentTimeMsec, rate) return true diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go index f3e73ba51..5090ee31c 100644 --- a/lib/streamaggr/streamaggr_test.go +++ b/lib/streamaggr/streamaggr_test.go @@ -852,6 +852,18 @@ foo{abc="456", cde="1"} 10 10 foo:1m_by_cde_rate_sum{cde="1"} 0.65 `, "1111") + // rate with duplicated events + f(` +- interval: 1m + by: [cde] + outputs: [rate_sum, rate_avg] +`, ` +foo{abc="123", cde="1"} 4 10 +foo{abc="123", cde="1"} 4 10 +`, `foo:1m_by_cde_rate_avg{cde="1"} 0 +foo:1m_by_cde_rate_sum{cde="1"} 0 +`, "11") + // keep_metric_names f(` - interval: 1m