lib/streamaggr: follow-up for the commit c0e4ccb7b5

- Clarify docs for `Ignore aggregation intervals on start` feature.

- Make more clear the code dealing with ignoreFirstIntervals at aggregator.runFlusher() functions.
  It is better from readability and maintainability PoV using distinct a.flush() calls
  for distinct cases instead of merging them into a single a.flush() call.

- Take into account the first incomplete interval when tracking the number of skipped aggregation intervals,
  since this behaviour is easier to understand by the end users.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6137
This commit is contained in:
Aliaksandr Valialkin 2024-07-02 21:21:35 +02:00
parent ffb49c677b
commit f17b408643
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
3 changed files with 44 additions and 31 deletions

View file

@ -32,10 +32,10 @@ var (
"See also -streamAggr.dropInputLabels and -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation/#deduplication") "See also -streamAggr.dropInputLabels and -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation/#deduplication")
streamAggrDropInputLabels = flagutil.NewArrayString("streamAggr.dropInputLabels", "An optional list of labels to drop from samples "+ streamAggrDropInputLabels = flagutil.NewArrayString("streamAggr.dropInputLabels", "An optional list of labels to drop from samples "+
"before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels") "before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels")
streamAggrIgnoreFirstIntervals = flag.Int("streamAggr.ignoreFirstIntervals", 0, "Number of aggregation intervals to skip after the start. Increase this value if you observe incorrect aggregation results after restarts. It could be caused by receiving unordered delayed data from clients pushing data into the database. "+
"See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start")
streamAggrIgnoreOldSamples = flag.Bool("streamAggr.ignoreOldSamples", false, "Whether to ignore input samples with old timestamps outside the current aggregation interval. "+ streamAggrIgnoreOldSamples = flag.Bool("streamAggr.ignoreOldSamples", false, "Whether to ignore input samples with old timestamps outside the current aggregation interval. "+
"See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples") "See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples")
streamAggrIgnoreFirstIntervals = flag.Int("streamAggr.ignoreFirstIntervals", 0, "Number of aggregation intervals to skip after the start. Increase this value if you observe incorrect aggregation results after restarts. It could be caused by receiving unordered delayed data from clients pushing data into the database. "+
"See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start")
) )
var ( var (

View file

@ -22,6 +22,8 @@ after applying all the configured [relabeling stages](https://docs.victoriametri
_By default, stream aggregation ignores timestamps associated with the input [samples](https://docs.victoriametrics.com/keyconcepts/#raw-samples). _By default, stream aggregation ignores timestamps associated with the input [samples](https://docs.victoriametrics.com/keyconcepts/#raw-samples).
It expects that the ingested samples have timestamps close to the current time. See [how to ignore old samples](#ignoring-old-samples)._ It expects that the ingested samples have timestamps close to the current time. See [how to ignore old samples](#ignoring-old-samples)._
## Configuration
Stream aggregation can be configured via the following command-line flags: Stream aggregation can be configured via the following command-line flags:
- `-streamAggr.config` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) - `-streamAggr.config` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/)
@ -128,25 +130,30 @@ outside the current [aggregation interval](#stream-aggregation-config) must be i
## Ignore aggregation intervals on start ## Ignore aggregation intervals on start
Stream aggregation may yield inaccurate results if it processes incomplete data. This issue can arise when data is Streaming aggregation results may be incorrect for some time after the restart of [vmagent](https://docs.victoriametrics.com/vmagent/)
received from clients that maintain a queue of unsent data, such as Prometheus or vmagent. If the queue isn't fully or [single-node VictoriaMetrics](https://docs.victoriametrics.com/) until all the buffered [samples](https://docs.victoriametrics.com/keyconcepts/#raw-samples)
cleared within the aggregation `interval`, only a portion of the time series may be processed, leading to distorted are sent from remote sources to the `vmagent` or single-node VictoriaMetrics via [supported data ingestion protocols](https://docs.victoriametrics.com/vmagent/#how-to-push-data-to-vmagent).
calculations. To mitigate this, consider the following options: In this case it may be a good idea to drop the aggregated data during the first `N` [aggregation intervals](#stream-aggregation-config)
just after the restart of `vmagent` or single-node VictoriaMetrics. This can be done via the following options:
- Set `-streamAggr.ignoreFirstIntervals=<intervalsCount>` command-line flag to [single-node VictoriaMetrics](https://docs.victoriametrics.com/) - The `-streamAggr.ignoreFirstIntervals=N` command-line flag at `vmagent` and single-node VictoriaMetrics. This flag instructs skipping the first `N`
or to [vmagent](https://docs.victoriametrics.com/vmagent/) to skip first `<intervalsCount>` [aggregation intervals](#stream-aggregation-config) [aggregation intervals](#stream-aggregation-config) just after the restart across all the [configured stream aggregation configs](#configuration).
from persisting to the storage. At [vmagent](https://docs.victoriametrics.com/vmagent/)
`-remoteWrite.streamAggr.ignoreFirstIntervals=<intervalsCount>` flag can be specified individually per each `-remoteWrite.url`.
It is expected that all incomplete or queued data will be processed during specified `<intervalsCount>`
and all subsequent aggregation intervals will produce correct data.
- Set `ignore_first_intervals: <intervalsCount>` option individually per [aggregation config](#stream-aggregation-config). The `-remoteWrite.streamAggr.ignoreFirstIntervals=N` command-line flag can be specified individually per each `-remoteWrite.url` at [vmagent](https://docs.victoriametrics.com/vmagent/).
This enables ignoring first `<intervalsCount>` aggregation intervals for that particular aggregation config.
- The `ignore_first_intervals: N` option at the particular [aggregation config](#stream-aggregation-config).
See also:
- [Flush time alignment](#flush-time-alignment)
- [Ignoring old samples](#ignoring-old-samples)
## Flush time alignment ## Flush time alignment
By default, the time for aggregated data flush is aligned by the `interval` option specified in [aggregate config](#stream-aggregation-config). By default, the time for aggregated data flush is aligned by the `interval` option specified in [aggregate config](#stream-aggregation-config).
For example: For example:
- if `interval: 1m` is set, then the aggregated data is flushed to the storage at the end of every minute - if `interval: 1m` is set, then the aggregated data is flushed to the storage at the end of every minute
- if `interval: 1h` is set, then the aggregated data is flushed to the storage at the end of every hour - if `interval: 1h` is set, then the aggregated data is flushed to the storage at the end of every hour
@ -157,6 +164,11 @@ The aggregated data on the first and the last interval is dropped during `vmagen
since the first and the last aggregation intervals are incomplete, so they usually contain incomplete confusing data. since the first and the last aggregation intervals are incomplete, so they usually contain incomplete confusing data.
If you need preserving the aggregated data on these intervals, then set `flush_on_shutdown: true` option in the [aggregate config](#stream-aggregation-config). If you need preserving the aggregated data on these intervals, then set `flush_on_shutdown: true` option in the [aggregate config](#stream-aggregation-config).
See also:
- [Ignore aggregation intervals on start](#ignore-aggregation-intervals-on-start)
- [Ignoring old samples](#ignoring-old-samples)
## Use cases ## Use cases
Stream aggregation can be used in the following cases: Stream aggregation can be used in the following cases:
@ -994,15 +1006,15 @@ specified individually per each `-remoteWrite.url`:
# ignore_old_samples instructs ignoring input samples with old timestamps outside the current aggregation interval. # ignore_old_samples instructs ignoring input samples with old timestamps outside the current aggregation interval.
# See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples # See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples
# See also -remoteWrite.streamAggr.ignoreOldSamples or -streamAggr.ignoreOldSamples command-line flag. # See also -remoteWrite.streamAggr.ignoreOldSamples and -streamAggr.ignoreOldSamples command-line flag.
# #
# ignore_old_samples: false # ignore_old_samples: false
# ignore_first_intervals instructs ignoring first N aggregation intervals after process start. # ignore_first_intervals instructs ignoring the first N aggregation intervals after process start.
# See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start # See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start
# See also -remoteWrite.streamAggr.ignoreFirstIntervals or -streamAggr.ignoreFirstIntervals command-line flag. # See also -remoteWrite.streamAggr.ignoreFirstIntervals and -streamAggr.ignoreFirstIntervals command-line flags.
# #
# ignore_first_intervals: false # ignore_first_intervals: N
# drop_input_labels instructs dropping the given labels from input samples. # drop_input_labels instructs dropping the given labels from input samples.
# The labels' dropping is performed before input_relabel_configs are applied. # The labels' dropping is performed before input_relabel_configs are applied.

View file

@ -130,9 +130,9 @@ type Options struct {
// This option can be overridden individually per each aggregation via ignore_old_samples option. // This option can be overridden individually per each aggregation via ignore_old_samples option.
IgnoreOldSamples bool IgnoreOldSamples bool
// IgnoreFirstIntervals sets amount of aggregation intervals to ignore on start. // IgnoreFirstIntervals sets the number of aggregation intervals to be ignored on start.
// //
// By default, no intervals will be ignored. // By default, zero intervals are ignored.
// //
// This option can be overridden individually per each aggregation via ignore_first_intervals option. // This option can be overridden individually per each aggregation via ignore_first_intervals option.
IgnoreFirstIntervals int IgnoreFirstIntervals int
@ -715,15 +715,16 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
if alignFlushToInterval && skipIncompleteFlush { if alignFlushToInterval && skipIncompleteFlush {
a.flush(nil, interval, true) a.flush(nil, interval, true)
ignoreFirstIntervals--
} }
for tickerWait(t) { for tickerWait(t) {
pf := pushFunc
if ignoreFirstIntervals > 0 { if ignoreFirstIntervals > 0 {
pf = nil a.flush(nil, interval, true)
ignoreFirstIntervals-- ignoreFirstIntervals--
} else {
a.flush(pushFunc, interval, true)
} }
a.flush(pf, interval, true)
if alignFlushToInterval { if alignFlushToInterval {
select { select {
@ -744,17 +745,17 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
ct := time.Now() ct := time.Now()
if ct.After(flushDeadline) { if ct.After(flushDeadline) {
pf := pushFunc
if ignoreFirstIntervals > 0 {
pf = nil
ignoreFirstIntervals--
}
// It is time to flush the aggregated state // It is time to flush the aggregated state
if alignFlushToInterval && skipIncompleteFlush && !isSkippedFirstFlush { if alignFlushToInterval && skipIncompleteFlush && !isSkippedFirstFlush {
pf = nil a.flush(nil, interval, true)
ignoreFirstIntervals--
isSkippedFirstFlush = true isSkippedFirstFlush = true
} else if ignoreFirstIntervals > 0 {
a.flush(nil, interval, true)
ignoreFirstIntervals--
} else {
a.flush(pushFunc, interval, true)
} }
a.flush(pf, interval, true)
for ct.After(flushDeadline) { for ct.After(flushDeadline) {
flushDeadline = flushDeadline.Add(interval) flushDeadline = flushDeadline.Add(interval)
} }
@ -769,7 +770,7 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
} }
} }
if !skipIncompleteFlush && ignoreFirstIntervals == 0 { if !skipIncompleteFlush && ignoreFirstIntervals <= 0 {
a.dedupFlush(dedupInterval) a.dedupFlush(dedupInterval)
a.flush(pushFunc, interval, true) a.flush(pushFunc, interval, true)
} }