From ac3cf3f35724f3bcb1ba8016a5a65d4785a979c0 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 4 Mar 2024 05:42:55 +0200 Subject: [PATCH] lib/streamaggr: enable time alignment for aggregate flushed to multiples of interval For example, if `interval: 1m`, then data flush occurs at the end of every minute, while `interval: 1h` leads to data flush at the end of every hour. Add `no_align_flush_to_interval` option, which can be used for disabling the alignment. --- app/vmagent/remotewrite/remotewrite.go | 23 ++- app/vminsert/common/streamaggr.go | 20 ++- docs/CHANGELOG.md | 5 +- docs/stream-aggregation.md | 36 +++- lib/streamaggr/streamaggr.go | 212 ++++++++++++++++------- lib/streamaggr/streamaggr_test.go | 30 ++-- lib/streamaggr/streamaggr_timing_test.go | 13 +- 7 files changed, 228 insertions(+), 111 deletions(-) diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index c568e2a36..ee29d77ce 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -89,8 +89,9 @@ var ( streamAggrDropInput = flagutil.NewArrayBool("remoteWrite.streamAggr.dropInput", "Whether to drop all the input samples after the aggregation "+ "with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+ "are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html") - streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before being aggregated. "+ - "Only the last sample per each time series per each interval is aggregated if the interval is greater than zero") + streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before being aggregated "+ + "by stream aggregation. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero. "+ + "See https://docs.victoriametrics.com/stream-aggregation.html") disableOnDiskQueue = flag.Bool("remoteWrite.disableOnDiskQueue", false, "Whether to disable storing pending data to -remoteWrite.tmpDataPath "+ "when the configured remote storage systems cannot keep up with the data ingestion rate. 
See https://docs.victoriametrics.com/vmagent.html#disabling-on-disk-persistence ."+ "See also -remoteWrite.dropSamplesOnOverload") @@ -739,8 +740,10 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in // Initialize sas sasFile := streamAggrConfig.GetOptionalArg(argIdx) if sasFile != "" { - dedupInterval := streamAggrDedupInterval.GetOptionalArg(argIdx) - sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternalTrackDropped, dedupInterval) + opts := &streamaggr.Options{ + DedupInterval: streamAggrDedupInterval.GetOptionalArg(argIdx), + } + sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternalTrackDropped, opts) if err != nil { logger.Fatalf("cannot initialize stream aggregators from -remoteWrite.streamAggr.config=%q: %s", sasFile, err) } @@ -894,8 +897,10 @@ func (rwctx *remoteWriteCtx) reinitStreamAggr() { logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", sasFile) metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, sasFile)).Inc() - dedupInterval := streamAggrDedupInterval.GetOptionalArg(rwctx.idx) - sasNew, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternalTrackDropped, dedupInterval) + opts := &streamaggr.Options{ + DedupInterval: streamAggrDedupInterval.GetOptionalArg(rwctx.idx), + } + sasNew, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternalTrackDropped, opts) if err != nil { metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_errors_total{path=%q}`, sasFile)).Inc() metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(0) @@ -937,8 +942,10 @@ func CheckStreamAggrConfigs() error { if sasFile == "" { continue } - dedupInterval := streamAggrDedupInterval.GetOptionalArg(idx) - sas, err := streamaggr.LoadFromFile(sasFile, pushNoop, dedupInterval) + opts := &streamaggr.Options{ + DedupInterval: streamAggrDedupInterval.GetOptionalArg(idx), + } + sas, err := streamaggr.LoadFromFile(sasFile, pushNoop, opts) if err != nil { return fmt.Errorf("cannot load -remoteWrite.streamAggr.config=%q: %w", sasFile, err) } diff --git a/app/vminsert/common/streamaggr.go b/app/vminsert/common/streamaggr.go index e517eb758..d5e0730c2 100644 --- a/app/vminsert/common/streamaggr.go +++ b/app/vminsert/common/streamaggr.go @@ -27,8 +27,9 @@ var ( streamAggrDropInput = flag.Bool("streamAggr.dropInput", false, "Whether to drop all the input samples after the aggregation with -streamAggr.config. "+ "By default, only aggregated samples are dropped, while the remaining samples are stored in the database. "+ "See also -streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html") - streamAggrDedupInterval = flag.Duration("streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before being aggregated. "+ - "Only the last sample per each time series per each interval is aggregated if the interval is greater than zero") + streamAggrDedupInterval = flag.Duration("streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before being aggregated "+ + "by stream aggregation. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero. 
"+ + "See https://docs.victoriametrics.com/stream-aggregation.html") ) var ( @@ -49,7 +50,10 @@ func CheckStreamAggrConfig() error { return nil } pushNoop := func(tss []prompbmarshal.TimeSeries) {} - sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushNoop, *streamAggrDedupInterval) + opts := &streamaggr.Options{ + DedupInterval: *streamAggrDedupInterval, + } + sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushNoop, opts) if err != nil { return fmt.Errorf("error when loading -streamAggr.config=%q: %w", *streamAggrConfig, err) } @@ -69,7 +73,10 @@ func InitStreamAggr() { sighupCh := procutil.NewSighupChan() - sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, *streamAggrDedupInterval) + opts := &streamaggr.Options{ + DedupInterval: *streamAggrDedupInterval, + } + sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts) if err != nil { logger.Fatalf("cannot load -streamAggr.config=%q: %s", *streamAggrConfig, err) } @@ -96,7 +103,10 @@ func reloadStreamAggrConfig() { logger.Infof("reloading -streamAggr.config=%q", *streamAggrConfig) saCfgReloads.Inc() - sasNew, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, *streamAggrDedupInterval) + opts := &streamaggr.Options{ + DedupInterval: *streamAggrDedupInterval, + } + sasNew, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts) if err != nil { saCfgSuccess.Set(0) saCfgReloadErr.Inc() diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 2b01abf3e..2099ea007 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -31,8 +31,9 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). ## tip * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): reduce memory usage by up to 5x when aggregating over big number of unique [time series](https://docs.victoriametrics.com/keyconcepts/#time-series). The memory usage reduction is most visible when [stream deduplication](https://docs.victoriametrics.com/stream-aggregation/#deduplication) is enabled. The downside is increased CPU usage by up to 30%. -* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add support for `dedup_interval` option, which allows configuring individual [deduplication intervals](https://docs.victoriametrics.com/stream-aggregation/#deduplication) per each [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). -* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add support for `keep_metric_names` option, which can be set at [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config) in order to keep the original metric names in the output aggregated samples instead of using [the default output metric naming scheme](https://docs.victoriametrics.com/stream-aggregation/#output-metric-names). +* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `dedup_interval` option, which allows configuring individual [deduplication intervals](https://docs.victoriametrics.com/stream-aggregation/#deduplication) per each [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). 
+* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `keep_metric_names` option, which can be set at [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config) in order to keep the original metric names in the output aggregated samples instead of using [the default output metric naming scheme](https://docs.victoriametrics.com/stream-aggregation/#output-metric-names). +* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `no_align_flush_to_interval` option for disabling time alignment for aggregated data flush. See [these docs](https://docs.victoriametrics.com/stream-aggregation/#flush-time-alignment) for details. * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [unique_samples](https://docs.victoriametrics.com/stream-aggregation/#unique_samples) output, which can be used for calculating the number of unique sample values over the given `interval`. * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [increase_prometheus](https://docs.victoriametrics.com/stream-aggregation/#increase_prometheus) and [total_prometheus](https://docs.victoriametrics.com/stream-aggregation/#total_prometheus) outputs, which can be used for `increase` and `total` aggregations when the first sample of every new [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) must be ignored. * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): expose `vm_streamaggr_flush_timeouts_total` and `vm_streamaggr_dedup_flush_timeouts_total` [counters](https://docs.victoriametrics.com/keyconcepts/#counter) at [`/metrics` page](https://docs.victoriametrics.com/#monitoring), which can be used for detecting flush timeouts for stream aggregation states. Expose also `vm_streamaggr_flush_duration_seconds` and `vm_streamaggr_dedup_flush_duration_seconds` [histograms](https://docs.victoriametrics.com/keyconcepts/#histogram) for monitoring the real flush durations of stream aggregation states. diff --git a/docs/stream-aggregation.md b/docs/stream-aggregation.md index 32f10ce9a..909984d20 100644 --- a/docs/stream-aggregation.md +++ b/docs/stream-aggregation.md @@ -62,6 +62,20 @@ In this case the [de-duplication](https://docs.victoriametrics.com/#deduplicatio De-duplicatation is performed after performing the input relabeling with `input_relabel_configs` - see [these docs](#relabeling). +## Flush time alignment + +By default the time for aggregated data flush is aligned by the `interval` option specified in [aggregate config](#stream-aggregation-config). +For example: +- if `interval: 1m` is set, then the aggregated data is flushed to the storage at the end of every minute +- if `interval: 1h` is set, then the aggregated data is flushed to the storage at the end of every hour + +If you do not need such an alignment, then set `no_align_flush_to_interval: true` option in the [aggregate config](#stream-aggregation-config). +In this case aggregated data flushes will be aligned to the `vmagent` start time or to [config reload](#configuration-update) time. + +The aggregated data on the first and the last interval is dropped during `vmagent` start, restart or [config reload](#configuration-update), +since the first and the last aggregation intervals are incomplete, so they usually contain incomplete confusing data. 
+If you need preserving the aggregated data on these intervals, then set `flush_on_shutdown: true` option in the [aggregate config](#stream-aggregation-config). + ## Use cases Stream aggregation can be used in the following cases: @@ -423,10 +437,6 @@ and then sent to the storage once per `interval`. The aggregated samples are nam If `by` and `without` lists are specified in the [config](#stream-aggregation-config), then the [aggregation by labels](#aggregating-by-labels) is performed additionally to aggregation by `interval`. -On vmagent shutdown or [configuration reload](#configuration-update) unfinished aggregated states are discarded, -as they might produce lower values than user expects. It is possible to specify `flush_on_shutdown: true` setting in -aggregation config to make vmagent to send unfinished states to the remote storage. - Below are aggregation functions that can be put in the `outputs` list at [stream aggregation config](#stream-aggregation-config): * [avg](#avg) @@ -809,7 +819,8 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server- # Samples are de-duplicated on a per-series basis. See https://docs.victoriametrics.com/keyconcepts/#time-series # and https://docs.victoriametrics.com/#deduplication # The deduplication is performed after input_relabel_configs relabeling is applied. - # By default the deduplication is disabled. + # By default the deduplication is disabled unless -remoteWrite.streamAggr.dedupInterval or -streamAggr.dedupInterval + # command-line flags are set. # # dedup_interval: 30s @@ -824,10 +835,17 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server- # # staleness_interval: 2m - # flush_on_shutdown defines whether to flush the unfinished aggregation states on process restarts - # or config reloads. It is not recommended changing this setting, unless unfinished aggregations states - # are preferred to missing data points. - # Unfinished aggregation states aren't flushed on shutdown by default. + # no_align_flush_to_interval disables aligning of flush times for the aggregated data to multiples of interval. + # By default flush times for the aggregated data is aligned to multiples of interval. + # For example: + # - if `interval: 1m` is set, then flushes happen at the end of every minute, + # - if `interval: 1h` is set, then flushes happen at the end of every hour + # + # no_align_flush_to_interval: false + + # flush_on_shutdown instructs to flush aggregated data to the storage on the first and the last intervals + # during vmagent starts, restarts or configuration reloads. + # Incomplete aggregated data isn't flushed to the storage by default, since it is usually confusing. # # flush_on_shutdown: false diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index e3860c183..19e0be433 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -20,6 +20,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" "github.com/VictoriaMetrics/metrics" "gopkg.in/yaml.v2" ) @@ -45,11 +46,10 @@ var supportedOutputs = []string{ // LoadFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data. // -// If dedupInterval > 0, then the input samples are de-duplicated before being aggregated, -// e.g. 
only the last sample per each time series per each dedupInterval is aggregated. +// opts can contain additional options. If opts is nil, then default options are used. // // The returned Aggregators must be stopped with MustStop() when no longer needed. -func LoadFromFile(path string, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) { +func LoadFromFile(path string, pushFunc PushFunc, opts *Options) (*Aggregators, error) { data, err := fscore.ReadFileOrHTTP(path) if err != nil { return nil, fmt.Errorf("cannot load aggregators: %w", err) @@ -59,7 +59,7 @@ func LoadFromFile(path string, pushFunc PushFunc, dedupInterval time.Duration) ( return nil, fmt.Errorf("cannot expand environment variables in %q: %w", path, err) } - as, err := newAggregatorsFromData(data, pushFunc, dedupInterval) + as, err := newAggregatorsFromData(data, pushFunc, opts) if err != nil { return nil, fmt.Errorf("cannot initialize aggregators from %q: %w", path, err) } @@ -67,12 +67,40 @@ func LoadFromFile(path string, pushFunc PushFunc, dedupInterval time.Duration) ( return as, nil } -func newAggregatorsFromData(data []byte, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) { +func newAggregatorsFromData(data []byte, pushFunc PushFunc, opts *Options) (*Aggregators, error) { var cfgs []*Config if err := yaml.UnmarshalStrict(data, &cfgs); err != nil { return nil, fmt.Errorf("cannot parse stream aggregation config: %w", err) } - return NewAggregators(cfgs, pushFunc, dedupInterval) + return NewAggregators(cfgs, pushFunc, opts) +} + +// Options contains optional settings for the Aggregators. +type Options struct { + // DedupInterval is deduplication interval for samples received for the same time series. + // + // The last sample per each series is left per each DedupInterval if DedupInterval > 0. + // + // By default deduplication is disabled. + DedupInterval time.Duration + + // NoAlignFlushToInterval disables alignment of flushes to the aggregation interval. + // + // By default flushes are aligned to aggregation interval. + NoAlignFlushToInterval bool + + // FlushOnShutdown enables flush of incomplete state on shutdown. + // + // By default incomplete state is dropped on shutdown. + FlushOnShutdown bool + + // KeepMetricNames instructs to leave metric names as is for the output time series without adding any suffix. + // + // By default the following suffix is added to every output time series: + // + // input_name:[_by_][_without_]_ + // + KeepMetricNames bool } // Config is a configuration for a single stream aggregation. @@ -85,6 +113,16 @@ type Config struct { // Interval is the interval between aggregations. Interval string `yaml:"interval"` + // NoAlighFlushToInterval disables aligning of flushes to multiples of Interval. + // By default flushes are aligned to Interval. + // + // See also FlushOnShutdown. + NoAlignFlushToInterval *bool `yaml:"no_align_flush_to_interval,omitempty"` + + // FlushOnShutdown defines whether to flush the aggregation state on process termination + // or config reload. By default the state is dropped on these events. + FlushOnShutdown *bool `yaml:"flush_on_shutdown,omitempty"` + // DedupInterval is an optional interval for deduplication. DedupInterval string `yaml:"dedup_interval,omitempty"` @@ -121,9 +159,8 @@ type Config struct { // Outputs []string `yaml:"outputs"` - // KeepMetricNames instructs to leave metric names as is for the output time series - // without adding any suffix. 
- KeepMetricNames bool `yaml:"keep_metric_names,omitempty"` + // KeepMetricNames instructs to leave metric names as is for the output time series without adding any suffix. + KeepMetricNames *bool `yaml:"keep_metric_names,omitempty"` // By is an optional list of labels for grouping input series. // @@ -148,10 +185,6 @@ type Config struct { // OutputRelabelConfigs is an optional relabeling rules, which are applied // on the aggregated output before being sent to remote storage. OutputRelabelConfigs []promrelabel.RelabelConfig `yaml:"output_relabel_configs,omitempty"` - - // FlushOnShutdown defines whether to flush the aggregation state on process termination - // or config reload. Is `false` by default. - FlushOnShutdown bool `yaml:"flush_on_shutdown,omitempty"` } // Aggregators aggregates metrics passed to Push and calls pushFunc for aggregate data. @@ -169,15 +202,14 @@ type Aggregators struct { // // pushFunc is called when the aggregated data must be flushed. // -// If dedupInterval > 0, then the input samples are de-duplicated before being aggregated, -// e.g. only the last sample per each time series per each dedupInterval is aggregated. +// opts can contain additional options. If opts is nil, then default options are used. // // MustStop must be called on the returned Aggregators when they are no longer needed. -func NewAggregators(cfgs []*Config, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) { +func NewAggregators(cfgs []*Config, pushFunc PushFunc, opts *Options) (*Aggregators, error) { ms := metrics.NewSet() as := make([]*aggregator, len(cfgs)) for i, cfg := range cfgs { - a, err := newAggregator(cfg, pushFunc, ms, dedupInterval) + a, err := newAggregator(cfg, pushFunc, ms, opts) if err != nil { // Stop already initialized aggregators before returning the error. for _, a := range as[:i] { @@ -294,19 +326,15 @@ type aggregator struct { without []string aggregateOnlyByTime bool - // da is set to non-nil if input samples must be de-duplicated according - // to the dedupInterval passed to newAggregator(). + // da is set to non-nil if input samples must be de-duplicated da *dedupAggr // aggrStates contains aggregate states for the given outputs aggrStates []aggrState - // lc is used for compressing series keys before passing them to dedupAggr and aggrState. + // lc is used for compressing series keys before passing them to dedupAggr and aggrState lc promutils.LabelsCompressor - // pushFunc is the callback, which is called by aggrState when flushing its state. - pushFunc PushFunc - // suffix contains a suffix, which should be added to aggregate metric names // // It contains the interval, labels in (by, without), plus output name. @@ -314,10 +342,6 @@ type aggregator struct { // for `interval: 1m`, `by: [job]` suffix string - // flushOnShutdown defines whether to flush the state of aggregation - // on MustStop call. - flushOnShutdown bool - wg sync.WaitGroup stopCh chan struct{} @@ -338,11 +362,14 @@ type PushFunc func(tss []prompbmarshal.TimeSeries) // newAggregator creates new aggregator for the given cfg, which pushes the aggregate data to pushFunc. // -// If dedupInterval > 0, then the input samples are de-duplicated before being aggregated, -// e.g. only the last sample per each time series per each dedupInterval is aggregated. +// opts can contain additional options. If opts is nil, then default options are used. // // The returned aggregator must be stopped when no longer needed by calling MustStop(). 
-func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, dedupInterval time.Duration) (*aggregator, error) { +func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Options) (*aggregator, error) { + if opts == nil { + opts = &Options{} + } + // check cfg.Interval interval, err := time.ParseDuration(cfg.Interval) if err != nil { @@ -353,6 +380,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, dedupInterva } // check cfg.DedupInterval + dedupInterval := opts.DedupInterval if cfg.DedupInterval != "" { di, err := time.ParseDuration(cfg.DedupInterval) if err != nil { @@ -401,7 +429,11 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, dedupInterva } // check cfg.KeepMetricNames - if cfg.KeepMetricNames { + keepMetricNames := opts.KeepMetricNames + if v := cfg.KeepMetricNames; v != nil { + keepMetricNames = *v + } + if keepMetricNames { if len(cfg.Outputs) != 1 { return nil, fmt.Errorf("`ouputs` list must contain only a single entry if `keep_metric_names` is set; got %q", cfg.Outputs) } @@ -495,17 +527,15 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, dedupInterva inputRelabeling: inputRelabeling, outputRelabeling: outputRelabeling, - keepMetricNames: cfg.KeepMetricNames, + keepMetricNames: keepMetricNames, by: by, without: without, aggregateOnlyByTime: aggregateOnlyByTime, aggrStates: aggrStates, - pushFunc: pushFunc, - suffix: suffix, - flushOnShutdown: cfg.FlushOnShutdown, + suffix: suffix, stopCh: make(chan struct{}), @@ -519,33 +549,90 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, dedupInterva a.da = newDedupAggr() } + alignFlushToInterval := !opts.NoAlignFlushToInterval + if v := cfg.NoAlignFlushToInterval; v != nil { + alignFlushToInterval = !*v + } + + skipIncompleteFlush := !opts.FlushOnShutdown + if v := cfg.FlushOnShutdown; v != nil { + skipIncompleteFlush = !*v + } + a.wg.Add(1) go func() { - a.runFlusher(interval, dedupInterval) + a.runFlusher(pushFunc, alignFlushToInterval, skipIncompleteFlush, interval, dedupInterval) a.wg.Done() }() return a, nil } -func (a *aggregator) runFlusher(interval, dedupInterval time.Duration) { - tickerFlush := time.NewTicker(interval) - defer tickerFlush.Stop() +func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipIncompleteFlush bool, interval, dedupInterval time.Duration) { + flushTickerCh := make(chan *time.Ticker, 1) + dedupFlushTickerCh := make(chan *time.Ticker, 1) + go func() { + if !alignFlushToInterval { + flushTickerCh <- time.NewTicker(interval) + if dedupInterval > 0 { + dedupFlushTickerCh <- time.NewTicker(dedupInterval) + } + return + } - var dedupTickerCh <-chan time.Time - if dedupInterval > 0 { - t := time.NewTicker(dedupInterval) - defer t.Stop() - dedupTickerCh = t.C - } + sleep := func(d time.Duration) { + timer := timerpool.Get(d) + defer timerpool.Put(timer) + select { + case <-a.stopCh: + case <-timer.C: + } + } + currentTime := time.Duration(time.Now().UnixNano()) + if dedupInterval > 0 { + d := dedupInterval - (currentTime % dedupInterval) + if d < dedupInterval { + sleep(d) + } + dedupFlushTickerCh <- time.NewTicker(dedupInterval) + currentTime += d + } + d := interval - (currentTime % interval) + if d < interval { + sleep(d) + } + flushTickerCh <- time.NewTicker(interval) + }() + var flushTickerC <-chan time.Time + var dedupFlushTickerC <-chan time.Time + isFirstFlush := true for { select { case <-a.stopCh: + if !skipIncompleteFlush { + if dedupInterval > 0 { + a.dedupFlush() + } + 
a.flush(pushFunc) + } return - case <-tickerFlush.C: + case flushTicker := <-flushTickerCh: + flushTickerC = flushTicker.C + defer flushTicker.Stop() + case dedupFlushTicker := <-dedupFlushTickerCh: + dedupFlushTickerC = dedupFlushTicker.C + defer dedupFlushTicker.Stop() + case <-flushTickerC: + if isFirstFlush { + isFirstFlush = false + if alignFlushToInterval && skipIncompleteFlush { + a.flush(nil) + continue + } + } startTime := time.Now() - a.flush() + a.flush(pushFunc) d := time.Since(startTime) a.flushDuration.Update(d.Seconds()) if d > interval { @@ -554,7 +641,7 @@ func (a *aggregator) runFlusher(interval, dedupInterval time.Duration) { "possible solutions: increase interval; use match filter matching smaller number of series; "+ "reduce samples' ingestion rate to stream aggregation", interval, d) } - case <-dedupTickerCh: + case <-dedupFlushTickerC: startTime := time.Now() a.dedupFlush() d := time.Since(startTime) @@ -573,7 +660,7 @@ func (a *aggregator) dedupFlush() { a.da.flush(a.pushSamples) } -func (a *aggregator) flush() { +func (a *aggregator) flush(pushFunc PushFunc) { var wg sync.WaitGroup for _, as := range a.aggrStates { flushConcurrencyCh <- struct{}{} @@ -584,7 +671,7 @@ func (a *aggregator) flush() { wg.Done() }() - ctx := getFlushCtx(a) + ctx := getFlushCtx(a, pushFunc) as.flushState(ctx) ctx.flushSeries() putFlushCtx(ctx) @@ -601,16 +688,6 @@ var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs()) func (a *aggregator) MustStop() { close(a.stopCh) a.wg.Wait() - - if !a.flushOnShutdown { - return - } - - // Flush the remaining data from the last interval if needed. - if a.da != nil { - a.dedupFlush() - } - a.flush() } // Push pushes tss to a. @@ -770,13 +847,14 @@ func getInputOutputLabels(dstInput, dstOutput, labels []prompbmarshal.Label, by, return dstInput, dstOutput } -func getFlushCtx(a *aggregator) *flushCtx { +func getFlushCtx(a *aggregator, pushFunc PushFunc) *flushCtx { v := flushCtxPool.Get() if v == nil { v = &flushCtx{} } ctx := v.(*flushCtx) ctx.a = a + ctx.pushFunc = pushFunc return ctx } @@ -788,7 +866,8 @@ func putFlushCtx(ctx *flushCtx) { var flushCtxPool sync.Pool type flushCtx struct { - a *aggregator + a *aggregator + pushFunc PushFunc tss []prompbmarshal.TimeSeries labels []prompbmarshal.Label @@ -797,6 +876,7 @@ type flushCtx struct { func (ctx *flushCtx) reset() { ctx.a = nil + ctx.pushFunc = nil ctx.resetSeries() } @@ -819,7 +899,9 @@ func (ctx *flushCtx) flushSeries() { outputRelabeling := ctx.a.outputRelabeling if outputRelabeling == nil { // Fast path - push the output metrics. 
- ctx.a.pushFunc(tss) + if ctx.pushFunc != nil { + ctx.pushFunc(tss) + } return } @@ -838,7 +920,9 @@ func (ctx *flushCtx) flushSeries() { ts.Labels = dstLabels[dstLabelsLen:] dst = append(dst, ts) } - ctx.a.pushFunc(dst) + if ctx.pushFunc != nil { + ctx.pushFunc(dst) + } auxLabels.Labels = dstLabels promutils.PutLabels(auxLabels) diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go index 5e10ade64..d00bc4d85 100644 --- a/lib/streamaggr/streamaggr_test.go +++ b/lib/streamaggr/streamaggr_test.go @@ -20,7 +20,7 @@ func TestAggregatorsFailure(t *testing.T) { pushFunc := func(tss []prompbmarshal.TimeSeries) { panic(fmt.Errorf("pushFunc shouldn't be called")) } - a, err := newAggregatorsFromData([]byte(config), pushFunc, 0) + a, err := newAggregatorsFromData([]byte(config), pushFunc, nil) if err == nil { t.Fatalf("expecting non-nil error") } @@ -158,11 +158,11 @@ func TestAggregatorsEqual(t *testing.T) { t.Helper() pushFunc := func(tss []prompbmarshal.TimeSeries) {} - aa, err := newAggregatorsFromData([]byte(a), pushFunc, 0) + aa, err := newAggregatorsFromData([]byte(a), pushFunc, nil) if err != nil { t.Fatalf("cannot initialize aggregators: %s", err) } - ab, err := newAggregatorsFromData([]byte(b), pushFunc, 0) + ab, err := newAggregatorsFromData([]byte(b), pushFunc, nil) if err != nil { t.Fatalf("cannot initialize aggregators: %s", err) } @@ -220,15 +220,14 @@ func TestAggregatorsSuccess(t *testing.T) { } tssOutputLock.Unlock() } - a, err := newAggregatorsFromData([]byte(config), pushFunc, 0) + opts := &Options{ + FlushOnShutdown: true, + NoAlignFlushToInterval: true, + } + a, err := newAggregatorsFromData([]byte(config), pushFunc, opts) if err != nil { t.Fatalf("cannot initialize aggregators: %s", err) } - for _, ag := range a.as { - // explicitly set flushOnShutdown, so aggregations results - // are immediately available after a.MustStop() call. 
- ag.flushOnShutdown = true - } // Push the inputMetrics to Aggregators tssInput := mustParsePromMetrics(inputMetrics) @@ -862,8 +861,11 @@ func TestAggregatorsWithDedupInterval(t *testing.T) { } tssOutputLock.Unlock() } - const dedupInterval = 30 * time.Second - a, err := newAggregatorsFromData([]byte(config), pushFunc, dedupInterval) + opts := &Options{ + DedupInterval: 30 * time.Second, + FlushOnShutdown: true, + } + a, err := newAggregatorsFromData([]byte(config), pushFunc, opts) if err != nil { t.Fatalf("cannot initialize aggregators: %s", err) } @@ -871,12 +873,6 @@ func TestAggregatorsWithDedupInterval(t *testing.T) { // Push the inputMetrics to Aggregators tssInput := mustParsePromMetrics(inputMetrics) matchIdxs := a.Push(tssInput, nil) - if a != nil { - for _, aggr := range a.as { - aggr.dedupFlush() - aggr.flush() - } - } a.MustStop() // Verify matchIdxs equals to matchIdxsExpected diff --git a/lib/streamaggr/streamaggr_timing_test.go b/lib/streamaggr/streamaggr_timing_test.go index c47280312..ad220eee6 100644 --- a/lib/streamaggr/streamaggr_timing_test.go +++ b/lib/streamaggr/streamaggr_timing_test.go @@ -44,7 +44,8 @@ func BenchmarkAggregatorsFlushSerial(b *testing.B) { } func benchmarkAggregatorsFlushSerial(b *testing.B, output string) { - a := newBenchAggregators(output) + pushFunc := func(tss []prompbmarshal.TimeSeries) {} + a := newBenchAggregators(output, pushFunc) defer a.MustStop() var matchIdxs []byte @@ -54,13 +55,14 @@ func benchmarkAggregatorsFlushSerial(b *testing.B, output string) { for i := 0; i < b.N; i++ { matchIdxs = a.Push(benchSeries, matchIdxs) for _, aggr := range a.as { - aggr.flush() + aggr.flush(pushFunc) } } } func benchmarkAggregatorsPush(b *testing.B, output string) { - a := newBenchAggregators(output) + pushFunc := func(tss []prompbmarshal.TimeSeries) {} + a := newBenchAggregators(output, pushFunc) defer a.MustStop() const loops = 100 @@ -77,15 +79,14 @@ func benchmarkAggregatorsPush(b *testing.B, output string) { }) } -func newBenchAggregators(output string) *Aggregators { +func newBenchAggregators(output string, pushFunc PushFunc) *Aggregators { config := fmt.Sprintf(` - match: http_requests_total interval: 24h without: [job] outputs: [%q] `, output) - pushFunc := func(tss []prompbmarshal.TimeSeries) {} - a, err := newAggregatorsFromData([]byte(config), pushFunc, 0) + a, err := newAggregatorsFromData([]byte(config), pushFunc, nil) if err != nil { panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err)) }