diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index 41f41dc5f..63d0b91d7 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -105,7 +105,8 @@ var ( "with -remoteWrite.streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation/#deduplication") streamAggrIgnoreOldSamples = flagutil.NewArrayBool("remoteWrite.streamAggr.ignoreOldSamples", "Whether to ignore input samples with old timestamps outside the current aggregation interval "+ "for the corresponding -remoteWrite.streamAggr.config . See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples") - streamAggrDropInputLabels = flagutil.NewArrayString("streamAggr.dropInputLabels", "An optional list of labels to drop from samples "+ + streamAggrIgnoreFirstIntervals = flag.Int("remoteWrite.streamAggr.ignoreFirstIntervals", 0, "Number of aggregation intervals to skip after the start. Increase this value if you observe incorrect aggregation results after vmagent restarts. It could be caused by receiving unordered delayed data from clients pushing data into the vmagent.") + streamAggrDropInputLabels = flagutil.NewArrayString("streamAggr.dropInputLabels", "An optional list of labels to drop from samples "+ "before stream de-duplication and aggregation . 
See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels") disableOnDiskQueue = flag.Bool("remoteWrite.disableOnDiskQueue", false, "Whether to disable storing pending data to -remoteWrite.tmpDataPath "+ @@ -857,9 +858,10 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in ignoreOldSamples := streamAggrIgnoreOldSamples.GetOptionalArg(argIdx) if sasFile != "" { opts := &streamaggr.Options{ - DedupInterval: dedupInterval, - DropInputLabels: *streamAggrDropInputLabels, - IgnoreOldSamples: ignoreOldSamples, + DedupInterval: dedupInterval, + DropInputLabels: *streamAggrDropInputLabels, + IgnoreOldSamples: ignoreOldSamples, + IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals, } sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternalTrackDropped, opts) if err != nil { diff --git a/app/vminsert/common/streamaggr.go b/app/vminsert/common/streamaggr.go index 680956428..e1cd6146a 100644 --- a/app/vminsert/common/streamaggr.go +++ b/app/vminsert/common/streamaggr.go @@ -32,7 +32,8 @@ var ( "See also -streamAggr.dropInputLabels and -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation/#deduplication") streamAggrDropInputLabels = flagutil.NewArrayString("streamAggr.dropInputLabels", "An optional list of labels to drop from samples "+ "before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels") - streamAggrIgnoreOldSamples = flag.Bool("streamAggr.ignoreOldSamples", false, "Whether to ignore input samples with old timestamps outside the current aggregation interval. "+ + streamAggrIgnoreFirstIntervals = flag.Int("streamAggr.ignoreFirstIntervals", 0, "Number of aggregation intervals to skip after the start. Increase this value if you observe incorrect aggregation results after vmagent restarts. 
It could be caused by receiving unordered delayed data from clients pushing data into the vmagent.") + streamAggrIgnoreOldSamples = flag.Bool("streamAggr.ignoreOldSamples", false, "Whether to ignore input samples with old timestamps outside the current aggregation interval. "+ "See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples") ) @@ -56,9 +57,10 @@ func CheckStreamAggrConfig() error { } pushNoop := func(_ []prompbmarshal.TimeSeries) {} opts := &streamaggr.Options{ - DedupInterval: *streamAggrDedupInterval, - DropInputLabels: *streamAggrDropInputLabels, - IgnoreOldSamples: *streamAggrIgnoreOldSamples, + DedupInterval: *streamAggrDedupInterval, + DropInputLabels: *streamAggrDropInputLabels, + IgnoreOldSamples: *streamAggrIgnoreOldSamples, + IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals, } sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushNoop, opts) if err != nil { @@ -84,9 +86,10 @@ func InitStreamAggr() { sighupCh := procutil.NewSighupChan() opts := &streamaggr.Options{ - DedupInterval: *streamAggrDedupInterval, - DropInputLabels: *streamAggrDropInputLabels, - IgnoreOldSamples: *streamAggrIgnoreOldSamples, + DedupInterval: *streamAggrDedupInterval, + DropInputLabels: *streamAggrDropInputLabels, + IgnoreOldSamples: *streamAggrIgnoreOldSamples, + IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals, } sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts) if err != nil { @@ -117,9 +120,10 @@ func reloadStreamAggrConfig() { saCfgReloads.Inc() opts := &streamaggr.Options{ - DedupInterval: *streamAggrDedupInterval, - DropInputLabels: *streamAggrDropInputLabels, - IgnoreOldSamples: *streamAggrIgnoreOldSamples, + DedupInterval: *streamAggrDedupInterval, + DropInputLabels: *streamAggrDropInputLabels, + IgnoreOldSamples: *streamAggrIgnoreOldSamples, + IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals, } sasNew, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts) if err != 
nil { diff --git a/docs/stream-aggregation.md index 5b7027c2f..c100c059d 100644 --- a/docs/stream-aggregation.md +++ b/docs/stream-aggregation.md @@ -92,6 +92,18 @@ must be ignored, then the following options can be used: - To set `ignore_old_samples: true` option at the particular [aggregation config](#stream-aggregation-config). This enables ignoring old samples for that particular aggregation config. +## Ignore aggregation intervals on start + +Stream aggregation may yield inaccurate results if it processes incomplete data. This issue can arise when data is sourced from clients that maintain a queue of unsent data, such as Prometheus or vmagent. If the queue isn't fully cleared within the aggregation interval, only a portion of the time series may be included in that period, leading to distorted calculations. To mitigate this, consider the following options: + +- Set `-remoteWrite.streamAggr.ignoreFirstIntervals=<intervalsCount>` command-line flag to [vmagent](https://docs.victoriametrics.com/vmagent/) + or `-streamAggr.ignoreFirstIntervals=<intervalsCount>` command-line flag to [single-node VictoriaMetrics](https://docs.victoriametrics.com/) to skip first `<intervalsCount>` [aggregation intervals](#stream-aggregation-config) + from persisting to the storage. It is expected that all incomplete or queued data will be processed during + specified `<intervalsCount>` and all subsequent aggregation intervals will produce correct data. + +- To set `ignore_first_intervals: <intervalsCount>` option at the particular [aggregation config](#stream-aggregation-config). + This enables ignoring first `<intervalsCount>` aggregation intervals for that particular aggregation config. + ## Flush time alignment By default the time for aggregated data flush is aligned by the `interval` option specified in [aggregate config](#stream-aggregation-config). 
diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 06d281f69..055af0436 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -111,6 +111,13 @@ type Options struct { // // This option can be overridden individually per each aggregation via ignore_old_samples option. IgnoreOldSamples bool + + // IgnoreFirstIntervals sets amount of intervals to ignore on start + // + // By default no intervals will be ignored. + // + // This option can be overridden individually per each aggregation via ignore_first_intervals option. + IgnoreFirstIntervals int } // Config is a configuration for a single stream aggregation. @@ -175,6 +182,9 @@ type Config struct { // IgnoreOldSamples instructs to ignore samples with old timestamps outside the current aggregation interval. IgnoreOldSamples *bool `yaml:"ignore_old_samples,omitempty"` + // IgnoreFirstIntervals sets number of aggregation intervals to be ignored on start. + IgnoreFirstIntervals *int `yaml:"ignore_first_intervals,omitempty"` + // By is an optional list of labels for grouping input series. // // See also Without. 
@@ -479,6 +489,12 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option ignoreOldSamples = *v } + // check cfg.IgnoreFirstIntervals + ignoreFirstIntervals := opts.IgnoreFirstIntervals + if v := cfg.IgnoreFirstIntervals; v != nil { + ignoreFirstIntervals = *v + } + // initialize outputs list if len(cfg.Outputs) == 0 { return nil, fmt.Errorf("`outputs` list must contain at least a single entry from the list %s; "+ @@ -600,14 +616,14 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option a.wg.Add(1) go func() { - a.runFlusher(pushFunc, alignFlushToInterval, skipIncompleteFlush, interval, dedupInterval) + a.runFlusher(pushFunc, alignFlushToInterval, skipIncompleteFlush, interval, dedupInterval, ignoreFirstIntervals) a.wg.Done() }() return a, nil } -func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipIncompleteFlush bool, interval, dedupInterval time.Duration) { +func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipIncompleteFlush bool, interval, dedupInterval time.Duration, ignoreFirstIntervals int) { alignedSleep := func(d time.Duration) { if !alignFlushToInterval { return @@ -642,7 +658,12 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc } for tickerWait(t) { - a.flush(pushFunc, interval, true) + pf := pushFunc + if ignoreFirstIntervals > 0 { + pf = nil + ignoreFirstIntervals-- + } + a.flush(pf, interval, true) if alignFlushToInterval { select { @@ -663,13 +684,17 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc ct := time.Now() if ct.After(flushDeadline) { + pf := pushFunc + if ignoreFirstIntervals > 0 { + pf = nil + ignoreFirstIntervals-- + } // It is time to flush the aggregated state if alignFlushToInterval && skipIncompleteFlush && !isSkippedFirstFlush { - a.flush(nil, interval, true) + pf = nil isSkippedFirstFlush = true - } else { - a.flush(pushFunc, interval, true) } + a.flush(pf, 
interval, true) for ct.After(flushDeadline) { flushDeadline = flushDeadline.Add(interval) } @@ -684,7 +709,7 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc } } - if !skipIncompleteFlush { + if !skipIncompleteFlush && ignoreFirstIntervals == 0 { a.dedupFlush(dedupInterval) a.flush(pushFunc, interval, true) }