
### Describe Your Changes

By default, stream aggregation and deduplication store a single state per aggregation output result. The data for each aggregator is flushed independently once per aggregation interval. However, there is no guarantee that incoming samples with timestamps close to the end of the aggregation interval will get into it. For example, when aggregating with `interval: 1m`, a data sample with timestamp 1739473078 (18:57:58) can fall into the aggregation round `18:58:00` or `18:59:00`, depending on network lag, load, clock synchronization, etc. In most scenarios this doesn't impact aggregation or deduplication results, which stay consistent within a margin of error. But for metrics represented as a collection of series, such as [histograms](https://docs.victoriametrics.com/keyconcepts/#histogram), this inaccuracy leads to invalid aggregation results.

For such cases, stream aggregation and deduplication support a mode with aggregation windows for the current and previous state. In this mode the flush doesn't happen immediately, but is shifted by a calculated sample lag, which improves correctness for delayed data. Enabling this mode increases resource usage: memory usage is expected to double, since aggregation stores two states instead of one. In exchange, it significantly improves the accuracy of calculations. Aggregation windows can be enabled via the following settings:

- `-streamAggr.enableWindows` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/), the `-remoteWrite.streamAggr.enableWindows` flag can be specified individually per each `-remoteWrite.url`. If one of these flags is set, all aggregators will use fixed windows. In conjunction with `-remoteWrite.streamAggr.dedupInterval` or `-streamAggr.dedupInterval`, fixed aggregation windows are enabled on the deduplicator as well.
- `enable_windows` option in the [aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). It allows enabling aggregation windows for a specific aggregator (see the config sketch at the end of this description).

### Checklist

The following checks are **mandatory**:

- [ ] My change adheres to the [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/).

---------

Signed-off-by: hagen1778 <roman@victoriametrics.com>
Co-authored-by: hagen1778 <roman@victoriametrics.com>
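A minimal sketch of enabling windows for one aggregator in the stream aggregation config; the `match`, `interval` and `outputs` values below are illustrative, not taken from this change:

```yaml
# Illustrative aggregation config entry with per-aggregator windows enabled:
- match: '{__name__=~".+_bucket"}'  # example series selector
  interval: 1m
  outputs: [total]
  enable_windows: true
```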
123 lines
3.2 KiB
Go
package streamaggr

import (
	"math"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)

// totalLastValue holds the most recently seen raw sample for a single input series.
type totalLastValue struct {
	value          float64
	timestamp      int64
	deleteDeadline int64
}

// totalAggrValueShared holds state that survives flushes: the last raw sample
// per input series and the running total carried between flushes.
type totalAggrValueShared struct {
	lastValues map[string]totalLastValue
	total      float64
}

// totalAggrValue accumulates the increase observed since the previous flush
// on top of the shared running state.
type totalAggrValue struct {
	total  float64
	shared *totalAggrValueShared
}

func (av *totalAggrValue) pushSample(c aggrConfig, sample *pushSample, key string, deleteDeadline int64) {
	ac := c.(*totalAggrConfig)
	currentTime := fasttime.UnixTimestamp()
	keepFirstSample := ac.keepFirstSample && currentTime >= ac.ignoreFirstSampleDeadline
	lv, ok := av.shared.lastValues[key]
	if ok || keepFirstSample {
		if sample.timestamp < lv.timestamp {
			// Skip out-of-order sample
			return
		}
		if sample.value >= lv.value {
			av.total += sample.value - lv.value
		} else {
			// Counter reset: add the raw value, since the counter restarted from zero.
			av.total += sample.value
		}
	}
	lv.value = sample.value
	lv.timestamp = sample.timestamp
	lv.deleteDeadline = deleteDeadline
	key = bytesutil.InternString(key)
	av.shared.lastValues[key] = lv
}
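
// Worked example of the accumulation above: for a single series with
// keepFirstSample=false, pushing the values 5, 8, 3 in order yields
// av.total == 6. The first sample only seeds lastValues (there is no prior
// state), the second adds 8-5=3, and 3 < 8 is treated as a counter reset,
// so the raw value 3 is added, matching counter increase semantics.
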
func (av *totalAggrValue) flush(c aggrConfig, ctx *flushCtx, key string) {
	ac := c.(*totalAggrConfig)
	suffix := ac.getSuffix()
	total := av.shared.total + av.total
	av.total = 0
	// Drop entries for stale series, which received no samples before their deleteDeadline.
	lvs := av.shared.lastValues
	for lk, lv := range lvs {
		if ctx.flushTimestamp > lv.deleteDeadline {
			delete(lvs, lk)
		}
	}
	if ac.resetTotalOnFlush {
		av.shared.total = 0
	} else if math.Abs(total) >= (1 << 53) {
		// It is time to reset the total, since it starts losing float64 precision
		av.shared.total = 0
	} else {
		av.shared.total = total
	}
	ctx.appendSeries(key, suffix, total)
}
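
// Note on the 1<<53 threshold above: float64 carries a 53-bit significand,
// so once |total| reaches 2^53 the gap between adjacent representable values
// exceeds 1 and small increments may be rounded away entirely
// (float64(1<<53)+1 == float64(1<<53)). Resetting the shared total keeps
// subsequent increments accurate at the cost of one counter reset in the output.
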
func (av *totalAggrValue) state() any {
	return av.shared
}

func newTotalAggrConfig(ignoreFirstSampleIntervalSecs uint64, resetTotalOnFlush, keepFirstSample bool) aggrConfig {
	ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + ignoreFirstSampleIntervalSecs
	return &totalAggrConfig{
		keepFirstSample:           keepFirstSample,
		resetTotalOnFlush:         resetTotalOnFlush,
		ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
	}
}
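
// For instance (illustrative values), newTotalAggrConfig(120, false, true)
// yields a `total` output which counts the first sample of each new series,
// but only once 120 seconds have passed since startup; before that deadline
// first samples are ignored to avoid a startup spike.
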
type totalAggrConfig struct {
	resetTotalOnFlush bool

	// Whether to take into account the first sample in new time series when calculating the output value.
	keepFirstSample bool

	// The first sample for each new series is ignored until this unix timestamp deadline in seconds, even if keepFirstSample is set.
	// This avoids an initial spike of the output values at startup, when new time series
	// cannot be distinguished from already existing series.
	ignoreFirstSampleDeadline uint64
}

func (*totalAggrConfig) getValue(s any) aggrValue {
	var shared *totalAggrValueShared
	if s == nil {
		shared = &totalAggrValueShared{
			lastValues: make(map[string]totalLastValue),
		}
	} else {
		shared = s.(*totalAggrValueShared)
	}
	return &totalAggrValue{
		shared: shared,
	}
}
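
// getValue above either seeds fresh shared state (s == nil) or reattaches the
// state previously returned by state(). Presumably this is what lets the
// current and previous aggregation windows described in this change operate on
// the same per-series lastValues map and running total.
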
func (ac *totalAggrConfig) getSuffix() string {
	if ac.resetTotalOnFlush {
		if ac.keepFirstSample {
			return "increase"
		}
		return "increase_prometheus"
	}
	if ac.keepFirstSample {
		return "total"
	}
	return "total_prometheus"
}
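
// Suffix selection above, summarized:
//
//	resetTotalOnFlush | keepFirstSample | suffix
//	true              | true            | "increase"
//	true              | false           | "increase_prometheus"
//	false             | true            | "total"
//	false             | false           | "total_prometheus"
//
// The suffix becomes part of the output series name; per the stream
// aggregation docs, an input series `requests_total` aggregated with
// `interval: 1m` and the `total` output is published under a name like
// `requests_total:1m_total` (the input series name here is illustrative).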