diff --git a/app/vminsert/common/streamaggr.go b/app/vminsert/common/streamaggr.go index c5ec26ce9..b3b2fa773 100644 --- a/app/vminsert/common/streamaggr.go +++ b/app/vminsert/common/streamaggr.go @@ -32,10 +32,10 @@ var ( "See also -streamAggr.dropInputLabels and -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation/#deduplication") streamAggrDropInputLabels = flagutil.NewArrayString("streamAggr.dropInputLabels", "An optional list of labels to drop from samples "+ "before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels") - streamAggrIgnoreFirstIntervals = flag.Int("streamAggr.ignoreFirstIntervals", 0, "Number of aggregation intervals to skip after the start. Increase this value if you observe incorrect aggregation results after restarts. It could be caused by receiving unordered delayed data from clients pushing data into the database. "+ - "See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start") streamAggrIgnoreOldSamples = flag.Bool("streamAggr.ignoreOldSamples", false, "Whether to ignore input samples with old timestamps outside the current aggregation interval. "+ "See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples") + streamAggrIgnoreFirstIntervals = flag.Int("streamAggr.ignoreFirstIntervals", 0, "Number of aggregation intervals to skip after the start. Increase this value if you observe incorrect aggregation results after restarts. It could be caused by receiving unordered delayed data from clients pushing data into the database. "+ + "See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start") ) var ( diff --git a/docs/stream-aggregation.md b/docs/stream-aggregation.md index b8adb18ed..971f6275c 100644 --- a/docs/stream-aggregation.md +++ b/docs/stream-aggregation.md @@ -22,6 +22,8 @@ after applying all the configured [relabeling stages](https://docs.victoriametri _By default, stream aggregation ignores timestamps associated with the input [samples](https://docs.victoriametrics.com/keyconcepts/#raw-samples). It expects that the ingested samples have timestamps close to the current time. See [how to ignore old samples](#ignoring-old-samples)._ +## Configuration + Stream aggregation can be configured via the following command-line flags: - `-streamAggr.config` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) @@ -128,25 +130,30 @@ outside the current [aggregation interval](#stream-aggregation-config) must be i ## Ignore aggregation intervals on start -Stream aggregation may yield inaccurate results if it processes incomplete data. This issue can arise when data is -received from clients that maintain a queue of unsent data, such as Prometheus or vmagent. If the queue isn't fully -cleared within the aggregation `interval`, only a portion of the time series may be processed, leading to distorted -calculations. 
To mitigate this, consider the following options:
+Streaming aggregation results may be incorrect for some time after the restart of [vmagent](https://docs.victoriametrics.com/vmagent/)
+or [single-node VictoriaMetrics](https://docs.victoriametrics.com/) until all the buffered [samples](https://docs.victoriametrics.com/keyconcepts/#raw-samples)
+are sent from remote sources to the `vmagent` or single-node VictoriaMetrics via [supported data ingestion protocols](https://docs.victoriametrics.com/vmagent/#how-to-push-data-to-vmagent).
+In this case it may be a good idea to drop the aggregated data during the first `N` [aggregation intervals](#stream-aggregation-config)
+just after the restart of `vmagent` or single-node VictoriaMetrics. This can be done via the following options:
-- Set `-streamAggr.ignoreFirstIntervals=` command-line flag to [single-node VictoriaMetrics](https://docs.victoriametrics.com/)
- or to [vmagent](https://docs.victoriametrics.com/vmagent/) to skip first `` [aggregation intervals](#stream-aggregation-config)
- from persisting to the storage. At [vmagent](https://docs.victoriametrics.com/vmagent/)
- `-remoteWrite.streamAggr.ignoreFirstIntervals=` flag can be specified individually per each `-remoteWrite.url`.
- It is expected that all incomplete or queued data will be processed during specified ``
- and all subsequent aggregation intervals will produce correct data.
+- The `-streamAggr.ignoreFirstIntervals=N` command-line flag at `vmagent` and single-node VictoriaMetrics. This flag instructs skipping the first `N`
+ [aggregation intervals](#stream-aggregation-config) just after the restart across all the [configured stream aggregation configs](#configuration).
-- Set `ignore_first_intervals: ` option individually per [aggregation config](#stream-aggregation-config).
- This enables ignoring first `` aggregation intervals for that particular aggregation config.
+ The `-remoteWrite.streamAggr.ignoreFirstIntervals=N` command-line flag can be specified individually per each `-remoteWrite.url` at [vmagent](https://docs.victoriametrics.com/vmagent/).
+
+- The `ignore_first_intervals: N` option at the particular [aggregation config](#stream-aggregation-config).
+
+See also:
+
+- [Flush time alignment](#flush-time-alignment)
+- [Ignoring old samples](#ignoring-old-samples)
## Flush time alignment
By default, the time for aggregated data flush is aligned by the `interval` option specified in [aggregate config](#stream-aggregation-config).
+
For example:
+
- if `interval: 1m` is set, then the aggregated data is flushed to the storage at the end of every minute
- if `interval: 1h` is set, then the aggregated data is flushed to the storage at the end of every hour
@@ -157,6 +164,11 @@ The aggregated data on the first and the last interval is dropped during `vmagen
since the first and the last aggregation intervals are incomplete, so they usually contain incomplete confusing data.
If you need preserving the aggregated data on these intervals, then set `flush_on_shutdown: true` option in the [aggregate config](#stream-aggregation-config).
+See also:
+
+- [Ignore aggregation intervals on start](#ignore-aggregation-intervals-on-start)
+- [Ignoring old samples](#ignoring-old-samples)
+
## Use cases
Stream aggregation can be used in the following cases:
@@ -994,15 +1006,15 @@ specified individually per each `-remoteWrite.url`:
# ignore_old_samples instructs ignoring input samples with old timestamps outside the current aggregation interval.
# See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples
- # See also -remoteWrite.streamAggr.ignoreOldSamples or -streamAggr.ignoreOldSamples command-line flag.
+ # See also -remoteWrite.streamAggr.ignoreOldSamples and -streamAggr.ignoreOldSamples command-line flags.
#
# ignore_old_samples: false
- # ignore_first_intervals instructs ignoring first N aggregation intervals after process start.
+ # ignore_first_intervals instructs ignoring the first N aggregation intervals after process start.
# See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start
- # See also -remoteWrite.streamAggr.ignoreFirstIntervals or -streamAggr.ignoreFirstIntervals command-line flag.
+ # See also -remoteWrite.streamAggr.ignoreFirstIntervals and -streamAggr.ignoreFirstIntervals command-line flags.
#
- # ignore_first_intervals: false
+ # ignore_first_intervals: N
# drop_input_labels instructs dropping the given labels from input samples.
# The labels' dropping is performed before input_relabel_configs are applied.
diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go
index 831cd500f..c020793c3 100644
--- a/lib/streamaggr/streamaggr.go
+++ b/lib/streamaggr/streamaggr.go
@@ -130,9 +130,9 @@ type Options struct {
// This option can be overridden individually per each aggregation via ignore_old_samples option.
IgnoreOldSamples bool
- // IgnoreFirstIntervals sets amount of aggregation intervals to ignore on start.
+ // IgnoreFirstIntervals sets the number of aggregation intervals to be ignored on start.
//
- // By default, no intervals will be ignored.
+ // By default, zero intervals are ignored.
//
// This option can be overridden individually per each aggregation via ignore_first_intervals option.
IgnoreFirstIntervals int
@@ -715,15 +715,16 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
if alignFlushToInterval && skipIncompleteFlush {
a.flush(nil, interval, true)
+ ignoreFirstIntervals--
}
for tickerWait(t) {
- pf := pushFunc
if ignoreFirstIntervals > 0 {
- pf = nil
+ a.flush(nil, interval, true)
ignoreFirstIntervals--
+ } else {
+ a.flush(pushFunc, interval, true)
}
- a.flush(pf, interval, true)
if alignFlushToInterval {
select {
@@ -744,17 +745,17 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
ct := time.Now()
if ct.After(flushDeadline) {
- pf := pushFunc
- if ignoreFirstIntervals > 0 {
- pf = nil
- ignoreFirstIntervals--
- }
// It is time to flush the aggregated state
if alignFlushToInterval && skipIncompleteFlush && !isSkippedFirstFlush {
- pf = nil
+ a.flush(nil, interval, true)
+ ignoreFirstIntervals--
isSkippedFirstFlush = true
+ } else if ignoreFirstIntervals > 0 {
+ a.flush(nil, interval, true)
+ ignoreFirstIntervals--
+ } else {
+ a.flush(pushFunc, interval, true)
}
- a.flush(pf, interval, true)
for ct.After(flushDeadline) {
flushDeadline = flushDeadline.Add(interval)
}
@@ -769,7 +770,7 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
}
}
- if !skipIncompleteFlush && ignoreFirstIntervals == 0 {
+ if !skipIncompleteFlush && ignoreFirstIntervals <= 0 {
a.dedupFlush(dedupInterval)
a.flush(pushFunc, interval, true)
}
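Reviewer note: the sketch below illustrates the behavior the `runFlusher` changes implement — flush the aggregated state once per interval, but drop (rather than push) the first `N` flushes after start, which is what `-streamAggr.ignoreFirstIntervals=N` and `ignore_first_intervals: N` control. This is a minimal, self-contained example, not the actual `streamaggr` API: the `runFlusher` signature, the `pushFunc` type and the string snapshot are simplified placeholders.

```go
// Minimal sketch (illustration only, not the VictoriaMetrics implementation):
// flush once per interval, discarding the first N snapshots after start,
// since they may be built from incomplete backlogged data right after restart.
package main

import (
	"fmt"
	"time"
)

// pushFunc receives a flushed aggregation snapshot.
type pushFunc func(snapshot string)

// runFlusher flushes the aggregated state once per interval. For the first
// ignoreFirstIntervals ticks the snapshot is dropped instead of being pushed.
func runFlusher(interval time.Duration, ignoreFirstIntervals int, push pushFunc, stop <-chan struct{}) {
	t := time.NewTicker(interval)
	defer t.Stop()
	for {
		select {
		case <-stop:
			return
		case <-t.C:
			snapshot := "aggregated state" // placeholder for the real aggregated snapshot
			if ignoreFirstIntervals > 0 {
				// Drop the snapshot instead of pushing it.
				ignoreFirstIntervals--
				continue
			}
			push(snapshot)
		}
	}
}

func main() {
	stop := make(chan struct{})
	// Skip the first 2 intervals, then push every subsequent flush.
	go runFlusher(time.Second, 2, func(snapshot string) {
		fmt.Println("pushed:", snapshot)
	}, stop)
	time.Sleep(5 * time.Second)
	close(stop)
}
```

In the real code the counter is also decremented for the skipped incomplete first flush, so it can end up negative; this appears to be why the final incomplete-flush condition was relaxed from `ignoreFirstIntervals == 0` to `ignoreFirstIntervals <= 0`.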