VictoriaMetrics/lib/streamaggr/rate.go

190 lines
4 KiB
Go
Raw Permalink Normal View History

package streamaggr
import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
lib/streamaggr: added aggregation windows (#6314) ### Describe Your Changes By default, stream aggregation and deduplication stores a single state per each aggregation output result. The data for each aggregator is flushed independently once per aggregation interval. But there's no guarantee that incoming samples with timestamps close to the aggregation interval's end will get into it. For example, when aggregating with `interval: 1m` a data sample with timestamp 1739473078 (18:57:59) can fall into aggregation round `18:58:00` or `18:59:00`. It depends on network lag, load, clock synchronization, etc. In most scenarios it doesn't impact aggregation or deduplication results, which are consistent within margin of error. But for metrics represented as a collection of series, like [histograms](https://docs.victoriametrics.com/keyconcepts/#histogram), such inaccuracy leads to invalid aggregation results. For this case, streaming aggregation and deduplication support mode with aggregation windows for current and previous state. With this mode, flush doesn't happen immediately but is shifted by a calculated samples lag that improves correctness for delayed data. Enabling of this mode has increased resource usage: memory usage is expected to double as aggregation will store two states instead of one. However, this significantly improves accuracy of calculations. Aggregation windows can be enabled via the following settings: - `-streamAggr.enableWindows` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/) `-remoteWrite.streamAggr.enableWindows` flag can be specified individually per each `-remoteWrite.url`. If one of these flags is set, then all aggregators will be using fixed windows. In conjunction with `-remoteWrite.streamAggr.dedupInterval` or `-streamAggr.dedupInterval` fixed aggregation windows are enabled on deduplicator as well. - `enable_windows` option in [aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). It allows enabling aggregation windows for a specific aggregator. ### Checklist The following checks are **mandatory**: - [ ] My change adheres [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/). --------- Signed-off-by: hagen1778 <roman@victoriametrics.com> Co-authored-by: hagen1778 <roman@victoriametrics.com> (cherry picked from commit c8fc90366944fe07115c3571b9f7cb7864b05886) Signed-off-by: hagen1778 <roman@victoriametrics.com>
2025-02-19 12:19:33 +00:00
"sync"
)
lib/streamaggr: added aggregation windows (#6314) ### Describe Your Changes By default, stream aggregation and deduplication stores a single state per each aggregation output result. The data for each aggregator is flushed independently once per aggregation interval. But there's no guarantee that incoming samples with timestamps close to the aggregation interval's end will get into it. For example, when aggregating with `interval: 1m` a data sample with timestamp 1739473078 (18:57:59) can fall into aggregation round `18:58:00` or `18:59:00`. It depends on network lag, load, clock synchronization, etc. In most scenarios it doesn't impact aggregation or deduplication results, which are consistent within margin of error. But for metrics represented as a collection of series, like [histograms](https://docs.victoriametrics.com/keyconcepts/#histogram), such inaccuracy leads to invalid aggregation results. For this case, streaming aggregation and deduplication support mode with aggregation windows for current and previous state. With this mode, flush doesn't happen immediately but is shifted by a calculated samples lag that improves correctness for delayed data. Enabling of this mode has increased resource usage: memory usage is expected to double as aggregation will store two states instead of one. However, this significantly improves accuracy of calculations. Aggregation windows can be enabled via the following settings: - `-streamAggr.enableWindows` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/) `-remoteWrite.streamAggr.enableWindows` flag can be specified individually per each `-remoteWrite.url`. If one of these flags is set, then all aggregators will be using fixed windows. In conjunction with `-remoteWrite.streamAggr.dedupInterval` or `-streamAggr.dedupInterval` fixed aggregation windows are enabled on deduplicator as well. - `enable_windows` option in [aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). It allows enabling aggregation windows for a specific aggregator. ### Checklist The following checks are **mandatory**: - [ ] My change adheres [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/). --------- Signed-off-by: hagen1778 <roman@victoriametrics.com> Co-authored-by: hagen1778 <roman@victoriametrics.com> (cherry picked from commit c8fc90366944fe07115c3571b9f7cb7864b05886) Signed-off-by: hagen1778 <roman@victoriametrics.com>
2025-02-19 12:19:33 +00:00
var rateAggrSharedValuePool sync.Pool
lib/streamaggr: added aggregation windows (#6314) ### Describe Your Changes By default, stream aggregation and deduplication stores a single state per each aggregation output result. The data for each aggregator is flushed independently once per aggregation interval. But there's no guarantee that incoming samples with timestamps close to the aggregation interval's end will get into it. For example, when aggregating with `interval: 1m` a data sample with timestamp 1739473078 (18:57:59) can fall into aggregation round `18:58:00` or `18:59:00`. It depends on network lag, load, clock synchronization, etc. In most scenarios it doesn't impact aggregation or deduplication results, which are consistent within margin of error. But for metrics represented as a collection of series, like [histograms](https://docs.victoriametrics.com/keyconcepts/#histogram), such inaccuracy leads to invalid aggregation results. For this case, streaming aggregation and deduplication support mode with aggregation windows for current and previous state. With this mode, flush doesn't happen immediately but is shifted by a calculated samples lag that improves correctness for delayed data. Enabling of this mode has increased resource usage: memory usage is expected to double as aggregation will store two states instead of one. However, this significantly improves accuracy of calculations. Aggregation windows can be enabled via the following settings: - `-streamAggr.enableWindows` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/) `-remoteWrite.streamAggr.enableWindows` flag can be specified individually per each `-remoteWrite.url`. If one of these flags is set, then all aggregators will be using fixed windows. In conjunction with `-remoteWrite.streamAggr.dedupInterval` or `-streamAggr.dedupInterval` fixed aggregation windows are enabled on deduplicator as well. - `enable_windows` option in [aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). It allows enabling aggregation windows for a specific aggregator. ### Checklist The following checks are **mandatory**: - [ ] My change adheres [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/). --------- Signed-off-by: hagen1778 <roman@victoriametrics.com> Co-authored-by: hagen1778 <roman@victoriametrics.com> (cherry picked from commit c8fc90366944fe07115c3571b9f7cb7864b05886) Signed-off-by: hagen1778 <roman@victoriametrics.com>
2025-02-19 12:19:33 +00:00
func putRateAggrSharedValue(v *rateAggrSharedValue) {
v.reset()
rateAggrSharedValuePool.Put(v)
}
lib/streamaggr: added aggregation windows (#6314) ### Describe Your Changes By default, stream aggregation and deduplication stores a single state per each aggregation output result. The data for each aggregator is flushed independently once per aggregation interval. But there's no guarantee that incoming samples with timestamps close to the aggregation interval's end will get into it. For example, when aggregating with `interval: 1m` a data sample with timestamp 1739473078 (18:57:59) can fall into aggregation round `18:58:00` or `18:59:00`. It depends on network lag, load, clock synchronization, etc. In most scenarios it doesn't impact aggregation or deduplication results, which are consistent within margin of error. But for metrics represented as a collection of series, like [histograms](https://docs.victoriametrics.com/keyconcepts/#histogram), such inaccuracy leads to invalid aggregation results. For this case, streaming aggregation and deduplication support mode with aggregation windows for current and previous state. With this mode, flush doesn't happen immediately but is shifted by a calculated samples lag that improves correctness for delayed data. Enabling of this mode has increased resource usage: memory usage is expected to double as aggregation will store two states instead of one. However, this significantly improves accuracy of calculations. Aggregation windows can be enabled via the following settings: - `-streamAggr.enableWindows` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/) `-remoteWrite.streamAggr.enableWindows` flag can be specified individually per each `-remoteWrite.url`. If one of these flags is set, then all aggregators will be using fixed windows. In conjunction with `-remoteWrite.streamAggr.dedupInterval` or `-streamAggr.dedupInterval` fixed aggregation windows are enabled on deduplicator as well. - `enable_windows` option in [aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). It allows enabling aggregation windows for a specific aggregator. ### Checklist The following checks are **mandatory**: - [ ] My change adheres [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/). --------- Signed-off-by: hagen1778 <roman@victoriametrics.com> Co-authored-by: hagen1778 <roman@victoriametrics.com> (cherry picked from commit c8fc90366944fe07115c3571b9f7cb7864b05886) Signed-off-by: hagen1778 <roman@victoriametrics.com>
2025-02-19 12:19:33 +00:00
func getRateAggrSharedValue(isGreen bool) *rateAggrSharedValue {
v := rateAggrSharedValuePool.Get()
if v == nil {
v = &rateAggrSharedValue{}
}
av := v.(*rateAggrSharedValue)
if isGreen {
av.green = getRateAggrStateValue()
} else {
av.blue = getRateAggrStateValue()
}
return av
}
lib/streamaggr: added aggregation windows (#6314) ### Describe Your Changes By default, stream aggregation and deduplication stores a single state per each aggregation output result. The data for each aggregator is flushed independently once per aggregation interval. But there's no guarantee that incoming samples with timestamps close to the aggregation interval's end will get into it. For example, when aggregating with `interval: 1m` a data sample with timestamp 1739473078 (18:57:59) can fall into aggregation round `18:58:00` or `18:59:00`. It depends on network lag, load, clock synchronization, etc. In most scenarios it doesn't impact aggregation or deduplication results, which are consistent within margin of error. But for metrics represented as a collection of series, like [histograms](https://docs.victoriametrics.com/keyconcepts/#histogram), such inaccuracy leads to invalid aggregation results. For this case, streaming aggregation and deduplication support mode with aggregation windows for current and previous state. With this mode, flush doesn't happen immediately but is shifted by a calculated samples lag that improves correctness for delayed data. Enabling of this mode has increased resource usage: memory usage is expected to double as aggregation will store two states instead of one. However, this significantly improves accuracy of calculations. Aggregation windows can be enabled via the following settings: - `-streamAggr.enableWindows` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/) `-remoteWrite.streamAggr.enableWindows` flag can be specified individually per each `-remoteWrite.url`. If one of these flags is set, then all aggregators will be using fixed windows. In conjunction with `-remoteWrite.streamAggr.dedupInterval` or `-streamAggr.dedupInterval` fixed aggregation windows are enabled on deduplicator as well. - `enable_windows` option in [aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). It allows enabling aggregation windows for a specific aggregator. ### Checklist The following checks are **mandatory**: - [ ] My change adheres [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/). --------- Signed-off-by: hagen1778 <roman@victoriametrics.com> Co-authored-by: hagen1778 <roman@victoriametrics.com> (cherry picked from commit c8fc90366944fe07115c3571b9f7cb7864b05886) Signed-off-by: hagen1778 <roman@victoriametrics.com>
2025-02-19 12:19:33 +00:00
var rateAggrStateValuePool sync.Pool
func putRateAggrStateValue(v *rateAggrStateValue) {
v.timestamp = 0
v.increase = 0
rateAggrStateValuePool.Put(v)
}
lib/streamaggr: added aggregation windows (#6314) ### Describe Your Changes By default, stream aggregation and deduplication stores a single state per each aggregation output result. The data for each aggregator is flushed independently once per aggregation interval. But there's no guarantee that incoming samples with timestamps close to the aggregation interval's end will get into it. For example, when aggregating with `interval: 1m` a data sample with timestamp 1739473078 (18:57:59) can fall into aggregation round `18:58:00` or `18:59:00`. It depends on network lag, load, clock synchronization, etc. In most scenarios it doesn't impact aggregation or deduplication results, which are consistent within margin of error. But for metrics represented as a collection of series, like [histograms](https://docs.victoriametrics.com/keyconcepts/#histogram), such inaccuracy leads to invalid aggregation results. For this case, streaming aggregation and deduplication support mode with aggregation windows for current and previous state. With this mode, flush doesn't happen immediately but is shifted by a calculated samples lag that improves correctness for delayed data. Enabling of this mode has increased resource usage: memory usage is expected to double as aggregation will store two states instead of one. However, this significantly improves accuracy of calculations. Aggregation windows can be enabled via the following settings: - `-streamAggr.enableWindows` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/) `-remoteWrite.streamAggr.enableWindows` flag can be specified individually per each `-remoteWrite.url`. If one of these flags is set, then all aggregators will be using fixed windows. In conjunction with `-remoteWrite.streamAggr.dedupInterval` or `-streamAggr.dedupInterval` fixed aggregation windows are enabled on deduplicator as well. - `enable_windows` option in [aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). It allows enabling aggregation windows for a specific aggregator. ### Checklist The following checks are **mandatory**: - [ ] My change adheres [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/). --------- Signed-off-by: hagen1778 <roman@victoriametrics.com> Co-authored-by: hagen1778 <roman@victoriametrics.com> (cherry picked from commit c8fc90366944fe07115c3571b9f7cb7864b05886) Signed-off-by: hagen1778 <roman@victoriametrics.com>
2025-02-19 12:19:33 +00:00
func getRateAggrStateValue() *rateAggrStateValue {
v := rateAggrStateValuePool.Get()
if v == nil {
return &rateAggrStateValue{}
}
return v.(*rateAggrStateValue)
}
lib/streamaggr: added aggregation windows (#6314) ### Describe Your Changes By default, stream aggregation and deduplication stores a single state per each aggregation output result. The data for each aggregator is flushed independently once per aggregation interval. But there's no guarantee that incoming samples with timestamps close to the aggregation interval's end will get into it. For example, when aggregating with `interval: 1m` a data sample with timestamp 1739473078 (18:57:59) can fall into aggregation round `18:58:00` or `18:59:00`. It depends on network lag, load, clock synchronization, etc. In most scenarios it doesn't impact aggregation or deduplication results, which are consistent within margin of error. But for metrics represented as a collection of series, like [histograms](https://docs.victoriametrics.com/keyconcepts/#histogram), such inaccuracy leads to invalid aggregation results. For this case, streaming aggregation and deduplication support mode with aggregation windows for current and previous state. With this mode, flush doesn't happen immediately but is shifted by a calculated samples lag that improves correctness for delayed data. Enabling of this mode has increased resource usage: memory usage is expected to double as aggregation will store two states instead of one. However, this significantly improves accuracy of calculations. Aggregation windows can be enabled via the following settings: - `-streamAggr.enableWindows` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/) `-remoteWrite.streamAggr.enableWindows` flag can be specified individually per each `-remoteWrite.url`. If one of these flags is set, then all aggregators will be using fixed windows. In conjunction with `-remoteWrite.streamAggr.dedupInterval` or `-streamAggr.dedupInterval` fixed aggregation windows are enabled on deduplicator as well. - `enable_windows` option in [aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). It allows enabling aggregation windows for a specific aggregator. ### Checklist The following checks are **mandatory**: - [ ] My change adheres [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/). --------- Signed-off-by: hagen1778 <roman@victoriametrics.com> Co-authored-by: hagen1778 <roman@victoriametrics.com> (cherry picked from commit c8fc90366944fe07115c3571b9f7cb7864b05886) Signed-off-by: hagen1778 <roman@victoriametrics.com>
2025-02-19 12:19:33 +00:00
// rateAggrSharedValue calculates output=rate_avg and rate_sum, e.g. the average per-second increase rate for counter metrics.
type rateAggrSharedValue struct {
value float64
deleteDeadline int64
lib/streamaggr: follow-up for 9c3d44c8c961d072f545e5e09f98f99bb7dfb2c0 - Consistently enumerate stream aggregation outputs in alphabetical order across the source code and docs. This should simplify future maintenance of the corresponding code and docs. - Fix the link to `rate_sum()` at `see also` section of `rate_avg()` docs. - Make more clear the docs for `rate_sum()` and `rate_avg()` outputs. - Encapsulate output metric suffix inside rateAggrState. This eliminates possible bugs related to incorrect suffix passing to newRateAggrState(). - Rename rateAggrState.total field to less misleading rateAggrState.increase name, since it calculates counter increase in the current aggregation window. - Set rateLastValueState.prevTimestamp on the first sample in time series instead of the second sample. This makes more clear the code logic. - Move the code for removing outdated entries at rateAggrState into removeOldEntries() function. This make the code logic inside rateAggrState.flushState() more clear. - Do not write output sample with zero value if there are no input series, which could be used for calculating the rate, e.g. if only a single sample is registered for every input series. - Do not take into account input series with a single registered sample when calculating rate_avg(), since this leads to incorrect results. - Move {rate,total}AggrState.flushState() function to the end of rate.go and total.go files, so they look more similar. This shuld simplify future mantenance. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6243
2024-07-14 15:23:59 +00:00
// prevTimestamp is the timestamp of the last registered sample in the previous aggregation interval
prevTimestamp int64
lib/streamaggr: added aggregation windows (#6314) ### Describe Your Changes By default, stream aggregation and deduplication stores a single state per each aggregation output result. The data for each aggregator is flushed independently once per aggregation interval. But there's no guarantee that incoming samples with timestamps close to the aggregation interval's end will get into it. For example, when aggregating with `interval: 1m` a data sample with timestamp 1739473078 (18:57:59) can fall into aggregation round `18:58:00` or `18:59:00`. It depends on network lag, load, clock synchronization, etc. In most scenarios it doesn't impact aggregation or deduplication results, which are consistent within margin of error. But for metrics represented as a collection of series, like [histograms](https://docs.victoriametrics.com/keyconcepts/#histogram), such inaccuracy leads to invalid aggregation results. For this case, streaming aggregation and deduplication support mode with aggregation windows for current and previous state. With this mode, flush doesn't happen immediately but is shifted by a calculated samples lag that improves correctness for delayed data. Enabling of this mode has increased resource usage: memory usage is expected to double as aggregation will store two states instead of one. However, this significantly improves accuracy of calculations. Aggregation windows can be enabled via the following settings: - `-streamAggr.enableWindows` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/) `-remoteWrite.streamAggr.enableWindows` flag can be specified individually per each `-remoteWrite.url`. If one of these flags is set, then all aggregators will be using fixed windows. In conjunction with `-remoteWrite.streamAggr.dedupInterval` or `-streamAggr.dedupInterval` fixed aggregation windows are enabled on deduplicator as well. - `enable_windows` option in [aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). It allows enabling aggregation windows for a specific aggregator. ### Checklist The following checks are **mandatory**: - [ ] My change adheres [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/). --------- Signed-off-by: hagen1778 <roman@victoriametrics.com> Co-authored-by: hagen1778 <roman@victoriametrics.com> (cherry picked from commit c8fc90366944fe07115c3571b9f7cb7864b05886) Signed-off-by: hagen1778 <roman@victoriametrics.com>
2025-02-19 12:19:33 +00:00
blue *rateAggrStateValue
green *rateAggrStateValue
}
lib/streamaggr: added aggregation windows (#6314) ### Describe Your Changes By default, stream aggregation and deduplication stores a single state per each aggregation output result. The data for each aggregator is flushed independently once per aggregation interval. But there's no guarantee that incoming samples with timestamps close to the aggregation interval's end will get into it. For example, when aggregating with `interval: 1m` a data sample with timestamp 1739473078 (18:57:59) can fall into aggregation round `18:58:00` or `18:59:00`. It depends on network lag, load, clock synchronization, etc. In most scenarios it doesn't impact aggregation or deduplication results, which are consistent within margin of error. But for metrics represented as a collection of series, like [histograms](https://docs.victoriametrics.com/keyconcepts/#histogram), such inaccuracy leads to invalid aggregation results. For this case, streaming aggregation and deduplication support mode with aggregation windows for current and previous state. With this mode, flush doesn't happen immediately but is shifted by a calculated samples lag that improves correctness for delayed data. Enabling of this mode has increased resource usage: memory usage is expected to double as aggregation will store two states instead of one. However, this significantly improves accuracy of calculations. Aggregation windows can be enabled via the following settings: - `-streamAggr.enableWindows` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/) `-remoteWrite.streamAggr.enableWindows` flag can be specified individually per each `-remoteWrite.url`. If one of these flags is set, then all aggregators will be using fixed windows. In conjunction with `-remoteWrite.streamAggr.dedupInterval` or `-streamAggr.dedupInterval` fixed aggregation windows are enabled on deduplicator as well. - `enable_windows` option in [aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). It allows enabling aggregation windows for a specific aggregator. ### Checklist The following checks are **mandatory**: - [ ] My change adheres [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/). --------- Signed-off-by: hagen1778 <roman@victoriametrics.com> Co-authored-by: hagen1778 <roman@victoriametrics.com> (cherry picked from commit c8fc90366944fe07115c3571b9f7cb7864b05886) Signed-off-by: hagen1778 <roman@victoriametrics.com>
2025-02-19 12:19:33 +00:00
func (v *rateAggrSharedValue) getState(isGreen bool) *rateAggrStateValue {
if isGreen {
if v.green == nil {
v.green = getRateAggrStateValue()
}
lib/streamaggr: added aggregation windows (#6314) ### Describe Your Changes By default, stream aggregation and deduplication stores a single state per each aggregation output result. The data for each aggregator is flushed independently once per aggregation interval. But there's no guarantee that incoming samples with timestamps close to the aggregation interval's end will get into it. For example, when aggregating with `interval: 1m` a data sample with timestamp 1739473078 (18:57:59) can fall into aggregation round `18:58:00` or `18:59:00`. It depends on network lag, load, clock synchronization, etc. In most scenarios it doesn't impact aggregation or deduplication results, which are consistent within margin of error. But for metrics represented as a collection of series, like [histograms](https://docs.victoriametrics.com/keyconcepts/#histogram), such inaccuracy leads to invalid aggregation results. For this case, streaming aggregation and deduplication support mode with aggregation windows for current and previous state. With this mode, flush doesn't happen immediately but is shifted by a calculated samples lag that improves correctness for delayed data. Enabling of this mode has increased resource usage: memory usage is expected to double as aggregation will store two states instead of one. However, this significantly improves accuracy of calculations. Aggregation windows can be enabled via the following settings: - `-streamAggr.enableWindows` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/) `-remoteWrite.streamAggr.enableWindows` flag can be specified individually per each `-remoteWrite.url`. If one of these flags is set, then all aggregators will be using fixed windows. In conjunction with `-remoteWrite.streamAggr.dedupInterval` or `-streamAggr.dedupInterval` fixed aggregation windows are enabled on deduplicator as well. - `enable_windows` option in [aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). It allows enabling aggregation windows for a specific aggregator. ### Checklist The following checks are **mandatory**: - [ ] My change adheres [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/). --------- Signed-off-by: hagen1778 <roman@victoriametrics.com> Co-authored-by: hagen1778 <roman@victoriametrics.com> (cherry picked from commit c8fc90366944fe07115c3571b9f7cb7864b05886) Signed-off-by: hagen1778 <roman@victoriametrics.com>
2025-02-19 12:19:33 +00:00
return v.green
}
lib/streamaggr: added aggregation windows (#6314) ### Describe Your Changes By default, stream aggregation and deduplication stores a single state per each aggregation output result. The data for each aggregator is flushed independently once per aggregation interval. But there's no guarantee that incoming samples with timestamps close to the aggregation interval's end will get into it. For example, when aggregating with `interval: 1m` a data sample with timestamp 1739473078 (18:57:59) can fall into aggregation round `18:58:00` or `18:59:00`. It depends on network lag, load, clock synchronization, etc. In most scenarios it doesn't impact aggregation or deduplication results, which are consistent within margin of error. But for metrics represented as a collection of series, like [histograms](https://docs.victoriametrics.com/keyconcepts/#histogram), such inaccuracy leads to invalid aggregation results. For this case, streaming aggregation and deduplication support mode with aggregation windows for current and previous state. With this mode, flush doesn't happen immediately but is shifted by a calculated samples lag that improves correctness for delayed data. Enabling of this mode has increased resource usage: memory usage is expected to double as aggregation will store two states instead of one. However, this significantly improves accuracy of calculations. Aggregation windows can be enabled via the following settings: - `-streamAggr.enableWindows` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/) `-remoteWrite.streamAggr.enableWindows` flag can be specified individually per each `-remoteWrite.url`. If one of these flags is set, then all aggregators will be using fixed windows. In conjunction with `-remoteWrite.streamAggr.dedupInterval` or `-streamAggr.dedupInterval` fixed aggregation windows are enabled on deduplicator as well. - `enable_windows` option in [aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). It allows enabling aggregation windows for a specific aggregator. ### Checklist The following checks are **mandatory**: - [ ] My change adheres [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/). --------- Signed-off-by: hagen1778 <roman@victoriametrics.com> Co-authored-by: hagen1778 <roman@victoriametrics.com> (cherry picked from commit c8fc90366944fe07115c3571b9f7cb7864b05886) Signed-off-by: hagen1778 <roman@victoriametrics.com>
2025-02-19 12:19:33 +00:00
if v.blue == nil {
v.blue = getRateAggrStateValue()
}
return v.blue
}
lib/streamaggr: added aggregation windows (#6314) ### Describe Your Changes By default, stream aggregation and deduplication stores a single state per each aggregation output result. The data for each aggregator is flushed independently once per aggregation interval. But there's no guarantee that incoming samples with timestamps close to the aggregation interval's end will get into it. For example, when aggregating with `interval: 1m` a data sample with timestamp 1739473078 (18:57:59) can fall into aggregation round `18:58:00` or `18:59:00`. It depends on network lag, load, clock synchronization, etc. In most scenarios it doesn't impact aggregation or deduplication results, which are consistent within margin of error. But for metrics represented as a collection of series, like [histograms](https://docs.victoriametrics.com/keyconcepts/#histogram), such inaccuracy leads to invalid aggregation results. For this case, streaming aggregation and deduplication support mode with aggregation windows for current and previous state. With this mode, flush doesn't happen immediately but is shifted by a calculated samples lag that improves correctness for delayed data. Enabling of this mode has increased resource usage: memory usage is expected to double as aggregation will store two states instead of one. However, this significantly improves accuracy of calculations. Aggregation windows can be enabled via the following settings: - `-streamAggr.enableWindows` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/) `-remoteWrite.streamAggr.enableWindows` flag can be specified individually per each `-remoteWrite.url`. If one of these flags is set, then all aggregators will be using fixed windows. In conjunction with `-remoteWrite.streamAggr.dedupInterval` or `-streamAggr.dedupInterval` fixed aggregation windows are enabled on deduplicator as well. - `enable_windows` option in [aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). It allows enabling aggregation windows for a specific aggregator. ### Checklist The following checks are **mandatory**: - [ ] My change adheres [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/). --------- Signed-off-by: hagen1778 <roman@victoriametrics.com> Co-authored-by: hagen1778 <roman@victoriametrics.com> (cherry picked from commit c8fc90366944fe07115c3571b9f7cb7864b05886) Signed-off-by: hagen1778 <roman@victoriametrics.com>
2025-02-19 12:19:33 +00:00
func (v *rateAggrSharedValue) reset() {
v.value = 0
v.deleteDeadline = 0
v.prevTimestamp = 0
putRateAggrStateValue(v.blue)
putRateAggrStateValue(v.green)
v.blue = nil
v.green = nil
}
lib/streamaggr: follow-up for 9c3d44c8c961d072f545e5e09f98f99bb7dfb2c0 - Consistently enumerate stream aggregation outputs in alphabetical order across the source code and docs. This should simplify future maintenance of the corresponding code and docs. - Fix the link to `rate_sum()` at `see also` section of `rate_avg()` docs. - Make more clear the docs for `rate_sum()` and `rate_avg()` outputs. - Encapsulate output metric suffix inside rateAggrState. This eliminates possible bugs related to incorrect suffix passing to newRateAggrState(). - Rename rateAggrState.total field to less misleading rateAggrState.increase name, since it calculates counter increase in the current aggregation window. - Set rateLastValueState.prevTimestamp on the first sample in time series instead of the second sample. This makes more clear the code logic. - Move the code for removing outdated entries at rateAggrState into removeOldEntries() function. This make the code logic inside rateAggrState.flushState() more clear. - Do not write output sample with zero value if there are no input series, which could be used for calculating the rate, e.g. if only a single sample is registered for every input series. - Do not take into account input series with a single registered sample when calculating rate_avg(), since this leads to incorrect results. - Move {rate,total}AggrState.flushState() function to the end of rate.go and total.go files, so they look more similar. This shuld simplify future mantenance. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6243
2024-07-14 15:23:59 +00:00
lib/streamaggr: added aggregation windows (#6314) ### Describe Your Changes By default, stream aggregation and deduplication stores a single state per each aggregation output result. The data for each aggregator is flushed independently once per aggregation interval. But there's no guarantee that incoming samples with timestamps close to the aggregation interval's end will get into it. For example, when aggregating with `interval: 1m` a data sample with timestamp 1739473078 (18:57:59) can fall into aggregation round `18:58:00` or `18:59:00`. It depends on network lag, load, clock synchronization, etc. In most scenarios it doesn't impact aggregation or deduplication results, which are consistent within margin of error. But for metrics represented as a collection of series, like [histograms](https://docs.victoriametrics.com/keyconcepts/#histogram), such inaccuracy leads to invalid aggregation results. For this case, streaming aggregation and deduplication support mode with aggregation windows for current and previous state. With this mode, flush doesn't happen immediately but is shifted by a calculated samples lag that improves correctness for delayed data. Enabling of this mode has increased resource usage: memory usage is expected to double as aggregation will store two states instead of one. However, this significantly improves accuracy of calculations. Aggregation windows can be enabled via the following settings: - `-streamAggr.enableWindows` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/) `-remoteWrite.streamAggr.enableWindows` flag can be specified individually per each `-remoteWrite.url`. If one of these flags is set, then all aggregators will be using fixed windows. In conjunction with `-remoteWrite.streamAggr.dedupInterval` or `-streamAggr.dedupInterval` fixed aggregation windows are enabled on deduplicator as well. - `enable_windows` option in [aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). It allows enabling aggregation windows for a specific aggregator. ### Checklist The following checks are **mandatory**: - [ ] My change adheres [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/). --------- Signed-off-by: hagen1778 <roman@victoriametrics.com> Co-authored-by: hagen1778 <roman@victoriametrics.com> (cherry picked from commit c8fc90366944fe07115c3571b9f7cb7864b05886) Signed-off-by: hagen1778 <roman@victoriametrics.com>
2025-02-19 12:19:33 +00:00
type rateAggrStateValue struct {
// increase stores cumulative increase for the current time series on the current aggregation interval
increase float64
timestamp int64
}
lib/streamaggr: added aggregation windows (#6314) ### Describe Your Changes By default, stream aggregation and deduplication stores a single state per each aggregation output result. The data for each aggregator is flushed independently once per aggregation interval. But there's no guarantee that incoming samples with timestamps close to the aggregation interval's end will get into it. For example, when aggregating with `interval: 1m` a data sample with timestamp 1739473078 (18:57:59) can fall into aggregation round `18:58:00` or `18:59:00`. It depends on network lag, load, clock synchronization, etc. In most scenarios it doesn't impact aggregation or deduplication results, which are consistent within margin of error. But for metrics represented as a collection of series, like [histograms](https://docs.victoriametrics.com/keyconcepts/#histogram), such inaccuracy leads to invalid aggregation results. For this case, streaming aggregation and deduplication support mode with aggregation windows for current and previous state. With this mode, flush doesn't happen immediately but is shifted by a calculated samples lag that improves correctness for delayed data. Enabling of this mode has increased resource usage: memory usage is expected to double as aggregation will store two states instead of one. However, this significantly improves accuracy of calculations. Aggregation windows can be enabled via the following settings: - `-streamAggr.enableWindows` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/) `-remoteWrite.streamAggr.enableWindows` flag can be specified individually per each `-remoteWrite.url`. If one of these flags is set, then all aggregators will be using fixed windows. In conjunction with `-remoteWrite.streamAggr.dedupInterval` or `-streamAggr.dedupInterval` fixed aggregation windows are enabled on deduplicator as well. - `enable_windows` option in [aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). It allows enabling aggregation windows for a specific aggregator. ### Checklist The following checks are **mandatory**: - [ ] My change adheres [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/). --------- Signed-off-by: hagen1778 <roman@victoriametrics.com> Co-authored-by: hagen1778 <roman@victoriametrics.com> (cherry picked from commit c8fc90366944fe07115c3571b9f7cb7864b05886) Signed-off-by: hagen1778 <roman@victoriametrics.com>
2025-02-19 12:19:33 +00:00
type rateAggrValue struct {
shared map[string]*rateAggrSharedValue
isGreen bool
}
lib/streamaggr: follow-up for 9c3d44c8c961d072f545e5e09f98f99bb7dfb2c0 - Consistently enumerate stream aggregation outputs in alphabetical order across the source code and docs. This should simplify future maintenance of the corresponding code and docs. - Fix the link to `rate_sum()` at `see also` section of `rate_avg()` docs. - Make more clear the docs for `rate_sum()` and `rate_avg()` outputs. - Encapsulate output metric suffix inside rateAggrState. This eliminates possible bugs related to incorrect suffix passing to newRateAggrState(). - Rename rateAggrState.total field to less misleading rateAggrState.increase name, since it calculates counter increase in the current aggregation window. - Set rateLastValueState.prevTimestamp on the first sample in time series instead of the second sample. This makes more clear the code logic. - Move the code for removing outdated entries at rateAggrState into removeOldEntries() function. This make the code logic inside rateAggrState.flushState() more clear. - Do not write output sample with zero value if there are no input series, which could be used for calculating the rate, e.g. if only a single sample is registered for every input series. - Do not take into account input series with a single registered sample when calculating rate_avg(), since this leads to incorrect results. - Move {rate,total}AggrState.flushState() function to the end of rate.go and total.go files, so they look more similar. This shuld simplify future mantenance. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6243
2024-07-14 15:23:59 +00:00
lib/streamaggr: added aggregation windows (#6314) ### Describe Your Changes By default, stream aggregation and deduplication stores a single state per each aggregation output result. The data for each aggregator is flushed independently once per aggregation interval. But there's no guarantee that incoming samples with timestamps close to the aggregation interval's end will get into it. For example, when aggregating with `interval: 1m` a data sample with timestamp 1739473078 (18:57:59) can fall into aggregation round `18:58:00` or `18:59:00`. It depends on network lag, load, clock synchronization, etc. In most scenarios it doesn't impact aggregation or deduplication results, which are consistent within margin of error. But for metrics represented as a collection of series, like [histograms](https://docs.victoriametrics.com/keyconcepts/#histogram), such inaccuracy leads to invalid aggregation results. For this case, streaming aggregation and deduplication support mode with aggregation windows for current and previous state. With this mode, flush doesn't happen immediately but is shifted by a calculated samples lag that improves correctness for delayed data. Enabling of this mode has increased resource usage: memory usage is expected to double as aggregation will store two states instead of one. However, this significantly improves accuracy of calculations. Aggregation windows can be enabled via the following settings: - `-streamAggr.enableWindows` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/) `-remoteWrite.streamAggr.enableWindows` flag can be specified individually per each `-remoteWrite.url`. If one of these flags is set, then all aggregators will be using fixed windows. In conjunction with `-remoteWrite.streamAggr.dedupInterval` or `-streamAggr.dedupInterval` fixed aggregation windows are enabled on deduplicator as well. - `enable_windows` option in [aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). It allows enabling aggregation windows for a specific aggregator. ### Checklist The following checks are **mandatory**: - [ ] My change adheres [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/). --------- Signed-off-by: hagen1778 <roman@victoriametrics.com> Co-authored-by: hagen1778 <roman@victoriametrics.com> (cherry picked from commit c8fc90366944fe07115c3571b9f7cb7864b05886) Signed-off-by: hagen1778 <roman@victoriametrics.com>
2025-02-19 12:19:33 +00:00
func (av *rateAggrValue) pushSample(_ aggrConfig, sample *pushSample, key string, deleteDeadline int64) {
var state *rateAggrStateValue
sv, ok := av.shared[key]
if ok {
state = sv.getState(av.isGreen)
if sample.timestamp < state.timestamp {
// Skip out of order sample
return
lib/streamaggr: follow-up for 9c3d44c8c961d072f545e5e09f98f99bb7dfb2c0 - Consistently enumerate stream aggregation outputs in alphabetical order across the source code and docs. This should simplify future maintenance of the corresponding code and docs. - Fix the link to `rate_sum()` at `see also` section of `rate_avg()` docs. - Make more clear the docs for `rate_sum()` and `rate_avg()` outputs. - Encapsulate output metric suffix inside rateAggrState. This eliminates possible bugs related to incorrect suffix passing to newRateAggrState(). - Rename rateAggrState.total field to less misleading rateAggrState.increase name, since it calculates counter increase in the current aggregation window. - Set rateLastValueState.prevTimestamp on the first sample in time series instead of the second sample. This makes more clear the code logic. - Move the code for removing outdated entries at rateAggrState into removeOldEntries() function. This make the code logic inside rateAggrState.flushState() more clear. - Do not write output sample with zero value if there are no input series, which could be used for calculating the rate, e.g. if only a single sample is registered for every input series. - Do not take into account input series with a single registered sample when calculating rate_avg(), since this leads to incorrect results. - Move {rate,total}AggrState.flushState() function to the end of rate.go and total.go files, so they look more similar. This shuld simplify future mantenance. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6243
2024-07-14 15:23:59 +00:00
}
lib/streamaggr: added aggregation windows (#6314) ### Describe Your Changes By default, stream aggregation and deduplication stores a single state per each aggregation output result. The data for each aggregator is flushed independently once per aggregation interval. But there's no guarantee that incoming samples with timestamps close to the aggregation interval's end will get into it. For example, when aggregating with `interval: 1m` a data sample with timestamp 1739473078 (18:57:59) can fall into aggregation round `18:58:00` or `18:59:00`. It depends on network lag, load, clock synchronization, etc. In most scenarios it doesn't impact aggregation or deduplication results, which are consistent within margin of error. But for metrics represented as a collection of series, like [histograms](https://docs.victoriametrics.com/keyconcepts/#histogram), such inaccuracy leads to invalid aggregation results. For this case, streaming aggregation and deduplication support mode with aggregation windows for current and previous state. With this mode, flush doesn't happen immediately but is shifted by a calculated samples lag that improves correctness for delayed data. Enabling of this mode has increased resource usage: memory usage is expected to double as aggregation will store two states instead of one. However, this significantly improves accuracy of calculations. Aggregation windows can be enabled via the following settings: - `-streamAggr.enableWindows` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/) `-remoteWrite.streamAggr.enableWindows` flag can be specified individually per each `-remoteWrite.url`. If one of these flags is set, then all aggregators will be using fixed windows. In conjunction with `-remoteWrite.streamAggr.dedupInterval` or `-streamAggr.dedupInterval` fixed aggregation windows are enabled on deduplicator as well. - `enable_windows` option in [aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). It allows enabling aggregation windows for a specific aggregator. ### Checklist The following checks are **mandatory**: - [ ] My change adheres [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/). --------- Signed-off-by: hagen1778 <roman@victoriametrics.com> Co-authored-by: hagen1778 <roman@victoriametrics.com> (cherry picked from commit c8fc90366944fe07115c3571b9f7cb7864b05886) Signed-off-by: hagen1778 <roman@victoriametrics.com>
2025-02-19 12:19:33 +00:00
if sample.value >= sv.value {
state.increase += sample.value - sv.value
} else {
// counter reset
state.increase += sample.value
lib/streamaggr: follow-up for 9c3d44c8c961d072f545e5e09f98f99bb7dfb2c0 - Consistently enumerate stream aggregation outputs in alphabetical order across the source code and docs. This should simplify future maintenance of the corresponding code and docs. - Fix the link to `rate_sum()` at `see also` section of `rate_avg()` docs. - Make more clear the docs for `rate_sum()` and `rate_avg()` outputs. - Encapsulate output metric suffix inside rateAggrState. This eliminates possible bugs related to incorrect suffix passing to newRateAggrState(). - Rename rateAggrState.total field to less misleading rateAggrState.increase name, since it calculates counter increase in the current aggregation window. - Set rateLastValueState.prevTimestamp on the first sample in time series instead of the second sample. This makes more clear the code logic. - Move the code for removing outdated entries at rateAggrState into removeOldEntries() function. This make the code logic inside rateAggrState.flushState() more clear. - Do not write output sample with zero value if there are no input series, which could be used for calculating the rate, e.g. if only a single sample is registered for every input series. - Do not take into account input series with a single registered sample when calculating rate_avg(), since this leads to incorrect results. - Move {rate,total}AggrState.flushState() function to the end of rate.go and total.go files, so they look more similar. This shuld simplify future mantenance. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6243
2024-07-14 15:23:59 +00:00
}
lib/streamaggr: added aggregation windows (#6314) ### Describe Your Changes By default, stream aggregation and deduplication stores a single state per each aggregation output result. The data for each aggregator is flushed independently once per aggregation interval. But there's no guarantee that incoming samples with timestamps close to the aggregation interval's end will get into it. For example, when aggregating with `interval: 1m` a data sample with timestamp 1739473078 (18:57:59) can fall into aggregation round `18:58:00` or `18:59:00`. It depends on network lag, load, clock synchronization, etc. In most scenarios it doesn't impact aggregation or deduplication results, which are consistent within margin of error. But for metrics represented as a collection of series, like [histograms](https://docs.victoriametrics.com/keyconcepts/#histogram), such inaccuracy leads to invalid aggregation results. For this case, streaming aggregation and deduplication support mode with aggregation windows for current and previous state. With this mode, flush doesn't happen immediately but is shifted by a calculated samples lag that improves correctness for delayed data. Enabling of this mode has increased resource usage: memory usage is expected to double as aggregation will store two states instead of one. However, this significantly improves accuracy of calculations. Aggregation windows can be enabled via the following settings: - `-streamAggr.enableWindows` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/) `-remoteWrite.streamAggr.enableWindows` flag can be specified individually per each `-remoteWrite.url`. If one of these flags is set, then all aggregators will be using fixed windows. In conjunction with `-remoteWrite.streamAggr.dedupInterval` or `-streamAggr.dedupInterval` fixed aggregation windows are enabled on deduplicator as well. - `enable_windows` option in [aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). It allows enabling aggregation windows for a specific aggregator. ### Checklist The following checks are **mandatory**: - [ ] My change adheres [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/). --------- Signed-off-by: hagen1778 <roman@victoriametrics.com> Co-authored-by: hagen1778 <roman@victoriametrics.com> (cherry picked from commit c8fc90366944fe07115c3571b9f7cb7864b05886) Signed-off-by: hagen1778 <roman@victoriametrics.com>
2025-02-19 12:19:33 +00:00
} else {
sv = getRateAggrSharedValue(av.isGreen)
sv.prevTimestamp = sample.timestamp
key = bytesutil.InternString(key)
av.shared[key] = sv
state = sv.getState(av.isGreen)
}
sv.value = sample.value
sv.deleteDeadline = deleteDeadline
state.timestamp = sample.timestamp
}
lib/streamaggr: follow-up for 9c3d44c8c961d072f545e5e09f98f99bb7dfb2c0 - Consistently enumerate stream aggregation outputs in alphabetical order across the source code and docs. This should simplify future maintenance of the corresponding code and docs. - Fix the link to `rate_sum()` at `see also` section of `rate_avg()` docs. - Make more clear the docs for `rate_sum()` and `rate_avg()` outputs. - Encapsulate output metric suffix inside rateAggrState. This eliminates possible bugs related to incorrect suffix passing to newRateAggrState(). - Rename rateAggrState.total field to less misleading rateAggrState.increase name, since it calculates counter increase in the current aggregation window. - Set rateLastValueState.prevTimestamp on the first sample in time series instead of the second sample. This makes more clear the code logic. - Move the code for removing outdated entries at rateAggrState into removeOldEntries() function. This make the code logic inside rateAggrState.flushState() more clear. - Do not write output sample with zero value if there are no input series, which could be used for calculating the rate, e.g. if only a single sample is registered for every input series. - Do not take into account input series with a single registered sample when calculating rate_avg(), since this leads to incorrect results. - Move {rate,total}AggrState.flushState() function to the end of rate.go and total.go files, so they look more similar. This shuld simplify future mantenance. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6243
2024-07-14 15:23:59 +00:00
lib/streamaggr: added aggregation windows (#6314) ### Describe Your Changes By default, stream aggregation and deduplication stores a single state per each aggregation output result. The data for each aggregator is flushed independently once per aggregation interval. But there's no guarantee that incoming samples with timestamps close to the aggregation interval's end will get into it. For example, when aggregating with `interval: 1m` a data sample with timestamp 1739473078 (18:57:59) can fall into aggregation round `18:58:00` or `18:59:00`. It depends on network lag, load, clock synchronization, etc. In most scenarios it doesn't impact aggregation or deduplication results, which are consistent within margin of error. But for metrics represented as a collection of series, like [histograms](https://docs.victoriametrics.com/keyconcepts/#histogram), such inaccuracy leads to invalid aggregation results. For this case, streaming aggregation and deduplication support mode with aggregation windows for current and previous state. With this mode, flush doesn't happen immediately but is shifted by a calculated samples lag that improves correctness for delayed data. Enabling of this mode has increased resource usage: memory usage is expected to double as aggregation will store two states instead of one. However, this significantly improves accuracy of calculations. Aggregation windows can be enabled via the following settings: - `-streamAggr.enableWindows` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/) `-remoteWrite.streamAggr.enableWindows` flag can be specified individually per each `-remoteWrite.url`. If one of these flags is set, then all aggregators will be using fixed windows. In conjunction with `-remoteWrite.streamAggr.dedupInterval` or `-streamAggr.dedupInterval` fixed aggregation windows are enabled on deduplicator as well. - `enable_windows` option in [aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). It allows enabling aggregation windows for a specific aggregator. ### Checklist The following checks are **mandatory**: - [ ] My change adheres [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/). --------- Signed-off-by: hagen1778 <roman@victoriametrics.com> Co-authored-by: hagen1778 <roman@victoriametrics.com> (cherry picked from commit c8fc90366944fe07115c3571b9f7cb7864b05886) Signed-off-by: hagen1778 <roman@victoriametrics.com>
2025-02-19 12:19:33 +00:00
func (av *rateAggrValue) flush(c aggrConfig, ctx *flushCtx, key string) {
ac := c.(*rateAggrConfig)
var state *rateAggrStateValue
suffix := ac.getSuffix()
rate := 0.0
countSeries := 0
for sk, sv := range av.shared {
if ctx.flushTimestamp > sv.deleteDeadline {
delete(av.shared, sk)
putRateAggrSharedValue(sv)
continue
}
if sv.prevTimestamp == 0 {
continue
}
if av.isGreen {
state = sv.green
} else {
state = sv.blue
}
d := float64(state.timestamp-sv.prevTimestamp) / 1000
if d > 0 {
rate += state.increase / d
countSeries++
lib/streamaggr: follow-up for 9c3d44c8c961d072f545e5e09f98f99bb7dfb2c0 - Consistently enumerate stream aggregation outputs in alphabetical order across the source code and docs. This should simplify future maintenance of the corresponding code and docs. - Fix the link to `rate_sum()` at `see also` section of `rate_avg()` docs. - Make more clear the docs for `rate_sum()` and `rate_avg()` outputs. - Encapsulate output metric suffix inside rateAggrState. This eliminates possible bugs related to incorrect suffix passing to newRateAggrState(). - Rename rateAggrState.total field to less misleading rateAggrState.increase name, since it calculates counter increase in the current aggregation window. - Set rateLastValueState.prevTimestamp on the first sample in time series instead of the second sample. This makes more clear the code logic. - Move the code for removing outdated entries at rateAggrState into removeOldEntries() function. This make the code logic inside rateAggrState.flushState() more clear. - Do not write output sample with zero value if there are no input series, which could be used for calculating the rate, e.g. if only a single sample is registered for every input series. - Do not take into account input series with a single registered sample when calculating rate_avg(), since this leads to incorrect results. - Move {rate,total}AggrState.flushState() function to the end of rate.go and total.go files, so they look more similar. This shuld simplify future mantenance. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6243
2024-07-14 15:23:59 +00:00
}
lib/streamaggr: added aggregation windows (#6314) ### Describe Your Changes By default, stream aggregation and deduplication stores a single state per each aggregation output result. The data for each aggregator is flushed independently once per aggregation interval. But there's no guarantee that incoming samples with timestamps close to the aggregation interval's end will get into it. For example, when aggregating with `interval: 1m` a data sample with timestamp 1739473078 (18:57:59) can fall into aggregation round `18:58:00` or `18:59:00`. It depends on network lag, load, clock synchronization, etc. In most scenarios it doesn't impact aggregation or deduplication results, which are consistent within margin of error. But for metrics represented as a collection of series, like [histograms](https://docs.victoriametrics.com/keyconcepts/#histogram), such inaccuracy leads to invalid aggregation results. For this case, streaming aggregation and deduplication support mode with aggregation windows for current and previous state. With this mode, flush doesn't happen immediately but is shifted by a calculated samples lag that improves correctness for delayed data. Enabling of this mode has increased resource usage: memory usage is expected to double as aggregation will store two states instead of one. However, this significantly improves accuracy of calculations. Aggregation windows can be enabled via the following settings: - `-streamAggr.enableWindows` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/) `-remoteWrite.streamAggr.enableWindows` flag can be specified individually per each `-remoteWrite.url`. If one of these flags is set, then all aggregators will be using fixed windows. In conjunction with `-remoteWrite.streamAggr.dedupInterval` or `-streamAggr.dedupInterval` fixed aggregation windows are enabled on deduplicator as well. - `enable_windows` option in [aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). It allows enabling aggregation windows for a specific aggregator. ### Checklist The following checks are **mandatory**: - [ ] My change adheres [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/). --------- Signed-off-by: hagen1778 <roman@victoriametrics.com> Co-authored-by: hagen1778 <roman@victoriametrics.com> (cherry picked from commit c8fc90366944fe07115c3571b9f7cb7864b05886) Signed-off-by: hagen1778 <roman@victoriametrics.com>
2025-02-19 12:19:33 +00:00
sv.prevTimestamp = state.timestamp
state.timestamp = 0
state.increase = 0
av.shared[sk] = sv
}
if countSeries == 0 {
return
}
if ac.isAvg {
rate /= float64(countSeries)
}
ctx.appendSeries(key, suffix, rate)
}
lib/streamaggr: follow-up for 9c3d44c8c961d072f545e5e09f98f99bb7dfb2c0 - Consistently enumerate stream aggregation outputs in alphabetical order across the source code and docs. This should simplify future maintenance of the corresponding code and docs. - Fix the link to `rate_sum()` at `see also` section of `rate_avg()` docs. - Make more clear the docs for `rate_sum()` and `rate_avg()` outputs. - Encapsulate output metric suffix inside rateAggrState. This eliminates possible bugs related to incorrect suffix passing to newRateAggrState(). - Rename rateAggrState.total field to less misleading rateAggrState.increase name, since it calculates counter increase in the current aggregation window. - Set rateLastValueState.prevTimestamp on the first sample in time series instead of the second sample. This makes more clear the code logic. - Move the code for removing outdated entries at rateAggrState into removeOldEntries() function. This make the code logic inside rateAggrState.flushState() more clear. - Do not write output sample with zero value if there are no input series, which could be used for calculating the rate, e.g. if only a single sample is registered for every input series. - Do not take into account input series with a single registered sample when calculating rate_avg(), since this leads to incorrect results. - Move {rate,total}AggrState.flushState() function to the end of rate.go and total.go files, so they look more similar. This shuld simplify future mantenance. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6243
2024-07-14 15:23:59 +00:00
lib/streamaggr: added aggregation windows (#6314) ### Describe Your Changes By default, stream aggregation and deduplication stores a single state per each aggregation output result. The data for each aggregator is flushed independently once per aggregation interval. But there's no guarantee that incoming samples with timestamps close to the aggregation interval's end will get into it. For example, when aggregating with `interval: 1m` a data sample with timestamp 1739473078 (18:57:59) can fall into aggregation round `18:58:00` or `18:59:00`. It depends on network lag, load, clock synchronization, etc. In most scenarios it doesn't impact aggregation or deduplication results, which are consistent within margin of error. But for metrics represented as a collection of series, like [histograms](https://docs.victoriametrics.com/keyconcepts/#histogram), such inaccuracy leads to invalid aggregation results. For this case, streaming aggregation and deduplication support mode with aggregation windows for current and previous state. With this mode, flush doesn't happen immediately but is shifted by a calculated samples lag that improves correctness for delayed data. Enabling of this mode has increased resource usage: memory usage is expected to double as aggregation will store two states instead of one. However, this significantly improves accuracy of calculations. Aggregation windows can be enabled via the following settings: - `-streamAggr.enableWindows` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/) `-remoteWrite.streamAggr.enableWindows` flag can be specified individually per each `-remoteWrite.url`. If one of these flags is set, then all aggregators will be using fixed windows. In conjunction with `-remoteWrite.streamAggr.dedupInterval` or `-streamAggr.dedupInterval` fixed aggregation windows are enabled on deduplicator as well. - `enable_windows` option in [aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). It allows enabling aggregation windows for a specific aggregator. ### Checklist The following checks are **mandatory**: - [ ] My change adheres [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/). --------- Signed-off-by: hagen1778 <roman@victoriametrics.com> Co-authored-by: hagen1778 <roman@victoriametrics.com> (cherry picked from commit c8fc90366944fe07115c3571b9f7cb7864b05886) Signed-off-by: hagen1778 <roman@victoriametrics.com>
2025-02-19 12:19:33 +00:00
func (av *rateAggrValue) state() any {
return av.shared
lib/streamaggr: follow-up for 9c3d44c8c961d072f545e5e09f98f99bb7dfb2c0 - Consistently enumerate stream aggregation outputs in alphabetical order across the source code and docs. This should simplify future maintenance of the corresponding code and docs. - Fix the link to `rate_sum()` at `see also` section of `rate_avg()` docs. - Make more clear the docs for `rate_sum()` and `rate_avg()` outputs. - Encapsulate output metric suffix inside rateAggrState. This eliminates possible bugs related to incorrect suffix passing to newRateAggrState(). - Rename rateAggrState.total field to less misleading rateAggrState.increase name, since it calculates counter increase in the current aggregation window. - Set rateLastValueState.prevTimestamp on the first sample in time series instead of the second sample. This makes more clear the code logic. - Move the code for removing outdated entries at rateAggrState into removeOldEntries() function. This make the code logic inside rateAggrState.flushState() more clear. - Do not write output sample with zero value if there are no input series, which could be used for calculating the rate, e.g. if only a single sample is registered for every input series. - Do not take into account input series with a single registered sample when calculating rate_avg(), since this leads to incorrect results. - Move {rate,total}AggrState.flushState() function to the end of rate.go and total.go files, so they look more similar. This shuld simplify future mantenance. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6243
2024-07-14 15:23:59 +00:00
}
lib/streamaggr: added aggregation windows (#6314) ### Describe Your Changes By default, stream aggregation and deduplication stores a single state per each aggregation output result. The data for each aggregator is flushed independently once per aggregation interval. But there's no guarantee that incoming samples with timestamps close to the aggregation interval's end will get into it. For example, when aggregating with `interval: 1m` a data sample with timestamp 1739473078 (18:57:59) can fall into aggregation round `18:58:00` or `18:59:00`. It depends on network lag, load, clock synchronization, etc. In most scenarios it doesn't impact aggregation or deduplication results, which are consistent within margin of error. But for metrics represented as a collection of series, like [histograms](https://docs.victoriametrics.com/keyconcepts/#histogram), such inaccuracy leads to invalid aggregation results. For this case, streaming aggregation and deduplication support mode with aggregation windows for current and previous state. With this mode, flush doesn't happen immediately but is shifted by a calculated samples lag that improves correctness for delayed data. Enabling of this mode has increased resource usage: memory usage is expected to double as aggregation will store two states instead of one. However, this significantly improves accuracy of calculations. Aggregation windows can be enabled via the following settings: - `-streamAggr.enableWindows` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/) `-remoteWrite.streamAggr.enableWindows` flag can be specified individually per each `-remoteWrite.url`. If one of these flags is set, then all aggregators will be using fixed windows. In conjunction with `-remoteWrite.streamAggr.dedupInterval` or `-streamAggr.dedupInterval` fixed aggregation windows are enabled on deduplicator as well. - `enable_windows` option in [aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). It allows enabling aggregation windows for a specific aggregator. ### Checklist The following checks are **mandatory**: - [ ] My change adheres [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/). --------- Signed-off-by: hagen1778 <roman@victoriametrics.com> Co-authored-by: hagen1778 <roman@victoriametrics.com> (cherry picked from commit c8fc90366944fe07115c3571b9f7cb7864b05886) Signed-off-by: hagen1778 <roman@victoriametrics.com>
2025-02-19 12:19:33 +00:00
func newRateAggrConfig(isAvg bool) aggrConfig {
return &rateAggrConfig{
isAvg: isAvg,
app/vmagent/remotewrite: follow-up for f153f54d11250da050aa93bc4fa9b7ba9e144691 - Move the remaining code responsible for stream aggregation initialization from remotewrite.go to streamaggr.go . This improves code maintainability a bit. - Properly shut down streamaggr.Aggregators initialized inside remotewrite.CheckStreamAggrConfigs(). This prevents from potential resource leaks. - Use separate functions for initializing and reloading of global stream aggregation and per-remoteWrite.url stream aggregation. This makes the code easier to read and maintain. This also fixes INFO and ERROR logs emitted by these functions. - Add an ability to specify `name` option in every stream aggregation config. This option is used as `name` label in metrics exposed by stream aggregation at /metrics page. This simplifies investigation of the exposed metrics. - Add `path` label additionally to `name`, `url` and `position` labels at metrics exposed by streaming aggregation. This label should simplify investigation of the exposed metrics. - Remove `match` and `group` labels from metrics exposed by streaming aggregation, since they have little practical applicability: it is hard to use these labels in query filters and aggregation functions. - Rename the metric `vm_streamaggr_flushed_samples_total` to less misleading `vm_streamaggr_output_samples_total` . This metric shows the number of samples generated by the corresponding streaming aggregation rule. This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 . See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462 - Remove the metric `vm_streamaggr_stale_samples_total`, since it is unclear how it can be used in practice. This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 . See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462 - Remove Alias and aggrID fields from streamaggr.Options struct, since these fields aren't related to optional params, which could modify the behaviour of the constructed streaming aggregator. Convert the Alias field to regular argument passed to LoadFromFile() function, since this argument is mandatory. - Pass Options arg to LoadFromFile() function by reference, since this structure is quite big. This also allows passing nil instead of Options when default options are enough. - Add `name`, `path`, `url` and `position` labels to `vm_streamaggr_dedup_state_size_bytes` and `vm_streamaggr_dedup_state_items_count` metrics, so they have consistent set of labels comparing to the rest of streaming aggregation metrics. - Convert aggregator.aggrStates field type from `map[string]aggrState` to `[]aggrOutput`, where `aggrOutput` contains the corresponding `aggrState` plus all the related metrics (currently only `vm_streamaggr_output_samples_total` metric is exposed with the corresponding `output` label per each configured output function). This simplifies and speeds up the code responsible for updating per-output metrics. This is a follow-up for the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 . See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6604 - Added missing urls to docs ( https://docs.victoriametrics.com/stream-aggregation/ ) in error messages. These urls help users figuring out why VictoriaMetrics or vmagent generates the corresponding error messages. The urls were removed for unknown reason in the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 . - Fix incorrect update for `vm_streamaggr_output_samples_total` metric in flushCtx.appendSeriesWithExtraLabel() function. While at it, reduce memory usage by limiting the maximum number of samples per flush to 10K. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5467 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6268
2024-07-15 16:01:37 +00:00
}
}
lib/streamaggr: added aggregation windows (#6314) ### Describe Your Changes By default, stream aggregation and deduplication stores a single state per each aggregation output result. The data for each aggregator is flushed independently once per aggregation interval. But there's no guarantee that incoming samples with timestamps close to the aggregation interval's end will get into it. For example, when aggregating with `interval: 1m` a data sample with timestamp 1739473078 (18:57:59) can fall into aggregation round `18:58:00` or `18:59:00`. It depends on network lag, load, clock synchronization, etc. In most scenarios it doesn't impact aggregation or deduplication results, which are consistent within margin of error. But for metrics represented as a collection of series, like [histograms](https://docs.victoriametrics.com/keyconcepts/#histogram), such inaccuracy leads to invalid aggregation results. For this case, streaming aggregation and deduplication support mode with aggregation windows for current and previous state. With this mode, flush doesn't happen immediately but is shifted by a calculated samples lag that improves correctness for delayed data. Enabling of this mode has increased resource usage: memory usage is expected to double as aggregation will store two states instead of one. However, this significantly improves accuracy of calculations. Aggregation windows can be enabled via the following settings: - `-streamAggr.enableWindows` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/) `-remoteWrite.streamAggr.enableWindows` flag can be specified individually per each `-remoteWrite.url`. If one of these flags is set, then all aggregators will be using fixed windows. In conjunction with `-remoteWrite.streamAggr.dedupInterval` or `-streamAggr.dedupInterval` fixed aggregation windows are enabled on deduplicator as well. - `enable_windows` option in [aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). It allows enabling aggregation windows for a specific aggregator. ### Checklist The following checks are **mandatory**: - [ ] My change adheres [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/). --------- Signed-off-by: hagen1778 <roman@victoriametrics.com> Co-authored-by: hagen1778 <roman@victoriametrics.com> (cherry picked from commit c8fc90366944fe07115c3571b9f7cb7864b05886) Signed-off-by: hagen1778 <roman@victoriametrics.com>
2025-02-19 12:19:33 +00:00
type rateAggrConfig struct {
isAvg bool
}
lib/streamaggr: follow-up for 9c3d44c8c961d072f545e5e09f98f99bb7dfb2c0 - Consistently enumerate stream aggregation outputs in alphabetical order across the source code and docs. This should simplify future maintenance of the corresponding code and docs. - Fix the link to `rate_sum()` at `see also` section of `rate_avg()` docs. - Make more clear the docs for `rate_sum()` and `rate_avg()` outputs. - Encapsulate output metric suffix inside rateAggrState. This eliminates possible bugs related to incorrect suffix passing to newRateAggrState(). - Rename rateAggrState.total field to less misleading rateAggrState.increase name, since it calculates counter increase in the current aggregation window. - Set rateLastValueState.prevTimestamp on the first sample in time series instead of the second sample. This makes more clear the code logic. - Move the code for removing outdated entries at rateAggrState into removeOldEntries() function. This make the code logic inside rateAggrState.flushState() more clear. - Do not write output sample with zero value if there are no input series, which could be used for calculating the rate, e.g. if only a single sample is registered for every input series. - Do not take into account input series with a single registered sample when calculating rate_avg(), since this leads to incorrect results. - Move {rate,total}AggrState.flushState() function to the end of rate.go and total.go files, so they look more similar. This shuld simplify future mantenance. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6243
2024-07-14 15:23:59 +00:00
lib/streamaggr: added aggregation windows (#6314) ### Describe Your Changes By default, stream aggregation and deduplication stores a single state per each aggregation output result. The data for each aggregator is flushed independently once per aggregation interval. But there's no guarantee that incoming samples with timestamps close to the aggregation interval's end will get into it. For example, when aggregating with `interval: 1m` a data sample with timestamp 1739473078 (18:57:59) can fall into aggregation round `18:58:00` or `18:59:00`. It depends on network lag, load, clock synchronization, etc. In most scenarios it doesn't impact aggregation or deduplication results, which are consistent within margin of error. But for metrics represented as a collection of series, like [histograms](https://docs.victoriametrics.com/keyconcepts/#histogram), such inaccuracy leads to invalid aggregation results. For this case, streaming aggregation and deduplication support mode with aggregation windows for current and previous state. With this mode, flush doesn't happen immediately but is shifted by a calculated samples lag that improves correctness for delayed data. Enabling of this mode has increased resource usage: memory usage is expected to double as aggregation will store two states instead of one. However, this significantly improves accuracy of calculations. Aggregation windows can be enabled via the following settings: - `-streamAggr.enableWindows` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/) `-remoteWrite.streamAggr.enableWindows` flag can be specified individually per each `-remoteWrite.url`. If one of these flags is set, then all aggregators will be using fixed windows. In conjunction with `-remoteWrite.streamAggr.dedupInterval` or `-streamAggr.dedupInterval` fixed aggregation windows are enabled on deduplicator as well. - `enable_windows` option in [aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). It allows enabling aggregation windows for a specific aggregator. ### Checklist The following checks are **mandatory**: - [ ] My change adheres [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/). --------- Signed-off-by: hagen1778 <roman@victoriametrics.com> Co-authored-by: hagen1778 <roman@victoriametrics.com> (cherry picked from commit c8fc90366944fe07115c3571b9f7cb7864b05886) Signed-off-by: hagen1778 <roman@victoriametrics.com>
2025-02-19 12:19:33 +00:00
func (*rateAggrConfig) getValue(s any) aggrValue {
var shared map[string]*rateAggrSharedValue
if s == nil {
shared = make(map[string]*rateAggrSharedValue)
} else {
shared = s.(map[string]*rateAggrSharedValue)
}
return &rateAggrValue{
shared: shared,
isGreen: s != nil,
}
}
lib/streamaggr: added aggregation windows (#6314) ### Describe Your Changes By default, stream aggregation and deduplication stores a single state per each aggregation output result. The data for each aggregator is flushed independently once per aggregation interval. But there's no guarantee that incoming samples with timestamps close to the aggregation interval's end will get into it. For example, when aggregating with `interval: 1m` a data sample with timestamp 1739473078 (18:57:59) can fall into aggregation round `18:58:00` or `18:59:00`. It depends on network lag, load, clock synchronization, etc. In most scenarios it doesn't impact aggregation or deduplication results, which are consistent within margin of error. But for metrics represented as a collection of series, like [histograms](https://docs.victoriametrics.com/keyconcepts/#histogram), such inaccuracy leads to invalid aggregation results. For this case, streaming aggregation and deduplication support mode with aggregation windows for current and previous state. With this mode, flush doesn't happen immediately but is shifted by a calculated samples lag that improves correctness for delayed data. Enabling of this mode has increased resource usage: memory usage is expected to double as aggregation will store two states instead of one. However, this significantly improves accuracy of calculations. Aggregation windows can be enabled via the following settings: - `-streamAggr.enableWindows` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/) `-remoteWrite.streamAggr.enableWindows` flag can be specified individually per each `-remoteWrite.url`. If one of these flags is set, then all aggregators will be using fixed windows. In conjunction with `-remoteWrite.streamAggr.dedupInterval` or `-streamAggr.dedupInterval` fixed aggregation windows are enabled on deduplicator as well. - `enable_windows` option in [aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). It allows enabling aggregation windows for a specific aggregator. ### Checklist The following checks are **mandatory**: - [ ] My change adheres [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/). --------- Signed-off-by: hagen1778 <roman@victoriametrics.com> Co-authored-by: hagen1778 <roman@victoriametrics.com> (cherry picked from commit c8fc90366944fe07115c3571b9f7cb7864b05886) Signed-off-by: hagen1778 <roman@victoriametrics.com>
2025-02-19 12:19:33 +00:00
func (ac *rateAggrConfig) getSuffix() string {
if ac.isAvg {
return "rate_avg"
}
return "rate_sum"
}