lib/streamaggr: align the flush time for aggregated data to multiples of the configured `interval`

For example, if `interval: 1m`, then the data flush occurs at the end of every minute,
while `interval: 1h` leads to a data flush at the end of every hour.

Add the `no_align_flush_to_interval` option, which can be used to disable this alignment.
Aliaksandr Valialkin 2024-03-04 05:42:55 +02:00
parent 2b8253185b
commit ac3cf3f357
7 changed files with 228 additions and 111 deletions
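The alignment itself reduces to sleeping until the next multiple of `interval` before starting the flush ticker. Below is a minimal standalone sketch of that delay computation; it mirrors the approach taken in the updated `runFlusher` further down, and the program around it is purely illustrative.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Compute how long to sleep so that the first flush lands on a multiple
	// of the interval, as the new runFlusher goroutine does before it starts
	// its flush ticker.
	interval := time.Minute
	currentTime := time.Duration(time.Now().UnixNano())
	d := interval - (currentTime % interval)
	fmt.Printf("sleep %s, then flush every %s on aligned boundaries\n", d, interval)
}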


@@ -89,8 +89,9 @@ var (
 	streamAggrDropInput = flagutil.NewArrayBool("remoteWrite.streamAggr.dropInput", "Whether to drop all the input samples after the aggregation "+
 		"with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+
 		"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html")
-	streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before being aggregated. "+
-		"Only the last sample per each time series per each interval is aggregated if the interval is greater than zero")
+	streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before being aggregated "+
+		"by stream aggregation. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero. "+
+		"See https://docs.victoriametrics.com/stream-aggregation.html")
 	disableOnDiskQueue = flag.Bool("remoteWrite.disableOnDiskQueue", false, "Whether to disable storing pending data to -remoteWrite.tmpDataPath "+
 		"when the configured remote storage systems cannot keep up with the data ingestion rate. See https://docs.victoriametrics.com/vmagent.html#disabling-on-disk-persistence ."+
 		"See also -remoteWrite.dropSamplesOnOverload")
@@ -739,8 +740,10 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in
 	// Initialize sas
 	sasFile := streamAggrConfig.GetOptionalArg(argIdx)
 	if sasFile != "" {
-		dedupInterval := streamAggrDedupInterval.GetOptionalArg(argIdx)
-		sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternalTrackDropped, dedupInterval)
+		opts := &streamaggr.Options{
+			DedupInterval: streamAggrDedupInterval.GetOptionalArg(argIdx),
+		}
+		sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternalTrackDropped, opts)
 		if err != nil {
 			logger.Fatalf("cannot initialize stream aggregators from -remoteWrite.streamAggr.config=%q: %s", sasFile, err)
 		}
@@ -894,8 +897,10 @@ func (rwctx *remoteWriteCtx) reinitStreamAggr() {
 	logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", sasFile)
 	metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, sasFile)).Inc()
-	dedupInterval := streamAggrDedupInterval.GetOptionalArg(rwctx.idx)
-	sasNew, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternalTrackDropped, dedupInterval)
+	opts := &streamaggr.Options{
+		DedupInterval: streamAggrDedupInterval.GetOptionalArg(rwctx.idx),
+	}
+	sasNew, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternalTrackDropped, opts)
 	if err != nil {
 		metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_errors_total{path=%q}`, sasFile)).Inc()
 		metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(0)
@@ -937,8 +942,10 @@ func CheckStreamAggrConfigs() error {
 		if sasFile == "" {
 			continue
 		}
-		dedupInterval := streamAggrDedupInterval.GetOptionalArg(idx)
-		sas, err := streamaggr.LoadFromFile(sasFile, pushNoop, dedupInterval)
+		opts := &streamaggr.Options{
+			DedupInterval: streamAggrDedupInterval.GetOptionalArg(idx),
+		}
+		sas, err := streamaggr.LoadFromFile(sasFile, pushNoop, opts)
 		if err != nil {
 			return fmt.Errorf("cannot load -remoteWrite.streamAggr.config=%q: %w", sasFile, err)
 		}


@@ -27,8 +27,9 @@ var (
 	streamAggrDropInput = flag.Bool("streamAggr.dropInput", false, "Whether to drop all the input samples after the aggregation with -streamAggr.config. "+
 		"By default, only aggregated samples are dropped, while the remaining samples are stored in the database. "+
 		"See also -streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html")
-	streamAggrDedupInterval = flag.Duration("streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before being aggregated. "+
-		"Only the last sample per each time series per each interval is aggregated if the interval is greater than zero")
+	streamAggrDedupInterval = flag.Duration("streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before being aggregated "+
+		"by stream aggregation. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero. "+
+		"See https://docs.victoriametrics.com/stream-aggregation.html")
 )

 var (
@@ -49,7 +50,10 @@ func CheckStreamAggrConfig() error {
 		return nil
 	}
 	pushNoop := func(tss []prompbmarshal.TimeSeries) {}
-	sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushNoop, *streamAggrDedupInterval)
+	opts := &streamaggr.Options{
+		DedupInterval: *streamAggrDedupInterval,
+	}
+	sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushNoop, opts)
 	if err != nil {
 		return fmt.Errorf("error when loading -streamAggr.config=%q: %w", *streamAggrConfig, err)
 	}
@@ -69,7 +73,10 @@ func InitStreamAggr() {
 	sighupCh := procutil.NewSighupChan()

-	sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, *streamAggrDedupInterval)
+	opts := &streamaggr.Options{
+		DedupInterval: *streamAggrDedupInterval,
+	}
+	sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts)
 	if err != nil {
 		logger.Fatalf("cannot load -streamAggr.config=%q: %s", *streamAggrConfig, err)
 	}
@@ -96,7 +103,10 @@ func reloadStreamAggrConfig() {
 	logger.Infof("reloading -streamAggr.config=%q", *streamAggrConfig)
 	saCfgReloads.Inc()

-	sasNew, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, *streamAggrDedupInterval)
+	opts := &streamaggr.Options{
+		DedupInterval: *streamAggrDedupInterval,
+	}
+	sasNew, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts)
 	if err != nil {
 		saCfgSuccess.Set(0)
 		saCfgReloadErr.Inc()


@@ -31,8 +31,9 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
 ## tip

 * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): reduce memory usage by up to 5x when aggregating over big number of unique [time series](https://docs.victoriametrics.com/keyconcepts/#time-series). The memory usage reduction is most visible when [stream deduplication](https://docs.victoriametrics.com/stream-aggregation/#deduplication) is enabled. The downside is increased CPU usage by up to 30%.
-* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add support for `dedup_interval` option, which allows configuring individual [deduplication intervals](https://docs.victoriametrics.com/stream-aggregation/#deduplication) per each [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config).
-* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add support for `keep_metric_names` option, which can be set at [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config) in order to keep the original metric names in the output aggregated samples instead of using [the default output metric naming scheme](https://docs.victoriametrics.com/stream-aggregation/#output-metric-names).
+* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `dedup_interval` option, which allows configuring individual [deduplication intervals](https://docs.victoriametrics.com/stream-aggregation/#deduplication) per each [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config).
+* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `keep_metric_names` option, which can be set at [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config) in order to keep the original metric names in the output aggregated samples instead of using [the default output metric naming scheme](https://docs.victoriametrics.com/stream-aggregation/#output-metric-names).
+* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `no_align_flush_to_interval` option for disabling time alignment for aggregated data flush. See [these docs](https://docs.victoriametrics.com/stream-aggregation/#flush-time-alignment) for details.
 * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [unique_samples](https://docs.victoriametrics.com/stream-aggregation/#unique_samples) output, which can be used for calculating the number of unique sample values over the given `interval`.
 * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [increase_prometheus](https://docs.victoriametrics.com/stream-aggregation/#increase_prometheus) and [total_prometheus](https://docs.victoriametrics.com/stream-aggregation/#total_prometheus) outputs, which can be used for `increase` and `total` aggregations when the first sample of every new [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) must be ignored.
 * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): expose `vm_streamaggr_flush_timeouts_total` and `vm_streamaggr_dedup_flush_timeouts_total` [counters](https://docs.victoriametrics.com/keyconcepts/#counter) at [`/metrics` page](https://docs.victoriametrics.com/#monitoring), which can be used for detecting flush timeouts for stream aggregation states. Expose also `vm_streamaggr_flush_duration_seconds` and `vm_streamaggr_dedup_flush_duration_seconds` [histograms](https://docs.victoriametrics.com/keyconcepts/#histogram) for monitoring the real flush durations of stream aggregation states.


@@ -62,6 +62,20 @@ In this case the [de-duplication](https://docs.victoriametrics.com/#deduplication
 De-duplicatation is performed after performing the input relabeling with `input_relabel_configs` - see [these docs](#relabeling).

+## Flush time alignment
+
+By default the time for aggregated data flush is aligned by the `interval` option specified in [aggregate config](#stream-aggregation-config).
+For example:
+
+- if `interval: 1m` is set, then the aggregated data is flushed to the storage at the end of every minute
+- if `interval: 1h` is set, then the aggregated data is flushed to the storage at the end of every hour
+
+If you do not need such an alignment, then set `no_align_flush_to_interval: true` option in the [aggregate config](#stream-aggregation-config).
+In this case aggregated data flushes will be aligned to the `vmagent` start time or to [config reload](#configuration-update) time.
+
+The aggregated data on the first and the last interval is dropped during `vmagent` start, restart or [config reload](#configuration-update),
+since the first and the last aggregation intervals are incomplete, so they usually contain incomplete confusing data.
+If you need preserving the aggregated data on these intervals, then set `flush_on_shutdown: true` option in the [aggregate config](#stream-aggregation-config).
+
 ## Use cases

 Stream aggregation can be used in the following cases:
@@ -423,10 +437,6 @@ and then sent to the storage once per `interval`. The aggregated samples are nam
 If `by` and `without` lists are specified in the [config](#stream-aggregation-config),
 then the [aggregation by labels](#aggregating-by-labels) is performed additionally to aggregation by `interval`.

-On vmagent shutdown or [configuration reload](#configuration-update) unfinished aggregated states are discarded,
-as they might produce lower values than user expects. It is possible to specify `flush_on_shutdown: true` setting in
-aggregation config to make vmagent to send unfinished states to the remote storage.
-
 Below are aggregation functions that can be put in the `outputs` list at [stream aggregation config](#stream-aggregation-config):

 * [avg](#avg)
@@ -809,7 +819,8 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-
 # Samples are de-duplicated on a per-series basis. See https://docs.victoriametrics.com/keyconcepts/#time-series
 # and https://docs.victoriametrics.com/#deduplication
 # The deduplication is performed after input_relabel_configs relabeling is applied.
-# By default the deduplication is disabled.
+# By default the deduplication is disabled unless -remoteWrite.streamAggr.dedupInterval or -streamAggr.dedupInterval
+# command-line flags are set.
 #
 # dedup_interval: 30s

@@ -824,10 +835,17 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-
 #
 # staleness_interval: 2m

-# flush_on_shutdown defines whether to flush the unfinished aggregation states on process restarts
-# or config reloads. It is not recommended changing this setting, unless unfinished aggregations states
-# are preferred to missing data points.
-# Unfinished aggregation states aren't flushed on shutdown by default.
+# no_align_flush_to_interval disables aligning of flush times for the aggregated data to multiples of interval.
+# By default flush times for the aggregated data is aligned to multiples of interval.
+# For example:
+# - if `interval: 1m` is set, then flushes happen at the end of every minute,
+# - if `interval: 1h` is set, then flushes happen at the end of every hour
+#
+# no_align_flush_to_interval: false
+
+# flush_on_shutdown instructs to flush aggregated data to the storage on the first and the last intervals
+# during vmagent starts, restarts or configuration reloads.
+# Incomplete aggregated data isn't flushed to the storage by default, since it is usually confusing.
 #
 # flush_on_shutdown: false


@@ -20,6 +20,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
 	"github.com/VictoriaMetrics/metrics"
 	"gopkg.in/yaml.v2"
 )
@@ -45,11 +46,10 @@ var supportedOutputs = []string{
 // LoadFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data.
 //
-// If dedupInterval > 0, then the input samples are de-duplicated before being aggregated,
-// e.g. only the last sample per each time series per each dedupInterval is aggregated.
+// opts can contain additional options. If opts is nil, then default options are used.
 //
 // The returned Aggregators must be stopped with MustStop() when no longer needed.
-func LoadFromFile(path string, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) {
+func LoadFromFile(path string, pushFunc PushFunc, opts *Options) (*Aggregators, error) {
 	data, err := fscore.ReadFileOrHTTP(path)
 	if err != nil {
 		return nil, fmt.Errorf("cannot load aggregators: %w", err)
@@ -59,7 +59,7 @@ func LoadFromFile(path string, pushFunc PushFunc, dedupInterval time.Duration) (
 		return nil, fmt.Errorf("cannot expand environment variables in %q: %w", path, err)
 	}

-	as, err := newAggregatorsFromData(data, pushFunc, dedupInterval)
+	as, err := newAggregatorsFromData(data, pushFunc, opts)
 	if err != nil {
 		return nil, fmt.Errorf("cannot initialize aggregators from %q: %w", path, err)
 	}
@@ -67,12 +67,40 @@ func LoadFromFile(path string, pushFunc PushFunc, dedupInterval time.Duration) (
 	return as, nil
 }

-func newAggregatorsFromData(data []byte, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) {
+func newAggregatorsFromData(data []byte, pushFunc PushFunc, opts *Options) (*Aggregators, error) {
 	var cfgs []*Config
 	if err := yaml.UnmarshalStrict(data, &cfgs); err != nil {
 		return nil, fmt.Errorf("cannot parse stream aggregation config: %w", err)
 	}
-	return NewAggregators(cfgs, pushFunc, dedupInterval)
+	return NewAggregators(cfgs, pushFunc, opts)
+}
+
+// Options contains optional settings for the Aggregators.
+type Options struct {
+	// DedupInterval is deduplication interval for samples received for the same time series.
+	//
+	// The last sample per each series is left per each DedupInterval if DedupInterval > 0.
+	//
+	// By default deduplication is disabled.
+	DedupInterval time.Duration
+
+	// NoAlignFlushToInterval disables alignment of flushes to the aggregation interval.
+	//
+	// By default flushes are aligned to aggregation interval.
+	NoAlignFlushToInterval bool
+
+	// FlushOnShutdown enables flush of incomplete state on shutdown.
+	//
+	// By default incomplete state is dropped on shutdown.
+	FlushOnShutdown bool
+
+	// KeepMetricNames instructs to leave metric names as is for the output time series without adding any suffix.
+	//
+	// By default the following suffix is added to every output time series:
+	//
+	//     input_name:<interval>[_by_<by_labels>][_without_<without_labels>]_<output>
+	//
+	KeepMetricNames bool
 }

 // Config is a configuration for a single stream aggregation.
@@ -85,6 +113,16 @@ type Config struct {
 	// Interval is the interval between aggregations.
 	Interval string `yaml:"interval"`

+	// NoAlighFlushToInterval disables aligning of flushes to multiples of Interval.
+	// By default flushes are aligned to Interval.
+	//
+	// See also FlushOnShutdown.
+	NoAlignFlushToInterval *bool `yaml:"no_align_flush_to_interval,omitempty"`
+
+	// FlushOnShutdown defines whether to flush the aggregation state on process termination
+	// or config reload. By default the state is dropped on these events.
+	FlushOnShutdown *bool `yaml:"flush_on_shutdown,omitempty"`
+
 	// DedupInterval is an optional interval for deduplication.
 	DedupInterval string `yaml:"dedup_interval,omitempty"`

@@ -121,9 +159,8 @@ type Config struct {
 	//
 	Outputs []string `yaml:"outputs"`

-	// KeepMetricNames instructs to leave metric names as is for the output time series
-	// without adding any suffix.
-	KeepMetricNames bool `yaml:"keep_metric_names,omitempty"`
+	// KeepMetricNames instructs to leave metric names as is for the output time series without adding any suffix.
+	KeepMetricNames *bool `yaml:"keep_metric_names,omitempty"`

 	// By is an optional list of labels for grouping input series.
 	//
@@ -148,10 +185,6 @@ type Config struct {
 	// OutputRelabelConfigs is an optional relabeling rules, which are applied
 	// on the aggregated output before being sent to remote storage.
 	OutputRelabelConfigs []promrelabel.RelabelConfig `yaml:"output_relabel_configs,omitempty"`
-
-	// FlushOnShutdown defines whether to flush the aggregation state on process termination
-	// or config reload. Is `false` by default.
-	FlushOnShutdown bool `yaml:"flush_on_shutdown,omitempty"`
 }

 // Aggregators aggregates metrics passed to Push and calls pushFunc for aggregate data.
@@ -169,15 +202,14 @@ type Aggregators struct {
 //
 // pushFunc is called when the aggregated data must be flushed.
 //
-// If dedupInterval > 0, then the input samples are de-duplicated before being aggregated,
-// e.g. only the last sample per each time series per each dedupInterval is aggregated.
+// opts can contain additional options. If opts is nil, then default options are used.
 //
 // MustStop must be called on the returned Aggregators when they are no longer needed.
-func NewAggregators(cfgs []*Config, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) {
+func NewAggregators(cfgs []*Config, pushFunc PushFunc, opts *Options) (*Aggregators, error) {
 	ms := metrics.NewSet()
 	as := make([]*aggregator, len(cfgs))
 	for i, cfg := range cfgs {
-		a, err := newAggregator(cfg, pushFunc, ms, dedupInterval)
+		a, err := newAggregator(cfg, pushFunc, ms, opts)
 		if err != nil {
 			// Stop already initialized aggregators before returning the error.
 			for _, a := range as[:i] {
@@ -294,19 +326,15 @@ type aggregator struct {
 	without []string
 	aggregateOnlyByTime bool

-	// da is set to non-nil if input samples must be de-duplicated according
-	// to the dedupInterval passed to newAggregator().
+	// da is set to non-nil if input samples must be de-duplicated
 	da *dedupAggr

 	// aggrStates contains aggregate states for the given outputs
 	aggrStates []aggrState

-	// lc is used for compressing series keys before passing them to dedupAggr and aggrState.
+	// lc is used for compressing series keys before passing them to dedupAggr and aggrState
 	lc promutils.LabelsCompressor

-	// pushFunc is the callback, which is called by aggrState when flushing its state.
-	pushFunc PushFunc
-
 	// suffix contains a suffix, which should be added to aggregate metric names
 	//
 	// It contains the interval, labels in (by, without), plus output name.
@@ -314,10 +342,6 @@ type aggregator struct {
 	// for `interval: 1m`, `by: [job]`
 	suffix string

-	// flushOnShutdown defines whether to flush the state of aggregation
-	// on MustStop call.
-	flushOnShutdown bool
-
 	wg sync.WaitGroup

 	stopCh chan struct{}
@@ -338,11 +362,14 @@ type PushFunc func(tss []prompbmarshal.TimeSeries)
 // newAggregator creates new aggregator for the given cfg, which pushes the aggregate data to pushFunc.
 //
-// If dedupInterval > 0, then the input samples are de-duplicated before being aggregated,
-// e.g. only the last sample per each time series per each dedupInterval is aggregated.
+// opts can contain additional options. If opts is nil, then default options are used.
 //
 // The returned aggregator must be stopped when no longer needed by calling MustStop().
-func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, dedupInterval time.Duration) (*aggregator, error) {
+func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Options) (*aggregator, error) {
+	if opts == nil {
+		opts = &Options{}
+	}
+
 	// check cfg.Interval
 	interval, err := time.ParseDuration(cfg.Interval)
 	if err != nil {
@@ -353,6 +380,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, dedupInterva
 	}

 	// check cfg.DedupInterval
+	dedupInterval := opts.DedupInterval
 	if cfg.DedupInterval != "" {
 		di, err := time.ParseDuration(cfg.DedupInterval)
 		if err != nil {
@@ -401,7 +429,11 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, dedupInterva
 	}

 	// check cfg.KeepMetricNames
-	if cfg.KeepMetricNames {
+	keepMetricNames := opts.KeepMetricNames
+	if v := cfg.KeepMetricNames; v != nil {
+		keepMetricNames = *v
+	}
+	if keepMetricNames {
 		if len(cfg.Outputs) != 1 {
 			return nil, fmt.Errorf("`ouputs` list must contain only a single entry if `keep_metric_names` is set; got %q", cfg.Outputs)
 		}
@@ -495,17 +527,15 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, dedupInterva
 		inputRelabeling: inputRelabeling,
 		outputRelabeling: outputRelabeling,

-		keepMetricNames: cfg.KeepMetricNames,
+		keepMetricNames: keepMetricNames,

 		by: by,
 		without: without,
 		aggregateOnlyByTime: aggregateOnlyByTime,

 		aggrStates: aggrStates,
-		pushFunc: pushFunc,

 		suffix: suffix,

-		flushOnShutdown: cfg.FlushOnShutdown,
-
 		stopCh: make(chan struct{}),
@@ -519,33 +549,90 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, dedupInterva
 		a.da = newDedupAggr()
 	}

+	alignFlushToInterval := !opts.NoAlignFlushToInterval
+	if v := cfg.NoAlignFlushToInterval; v != nil {
+		alignFlushToInterval = !*v
+	}
+
+	skipIncompleteFlush := !opts.FlushOnShutdown
+	if v := cfg.FlushOnShutdown; v != nil {
+		skipIncompleteFlush = !*v
+	}
+
 	a.wg.Add(1)
 	go func() {
-		a.runFlusher(interval, dedupInterval)
+		a.runFlusher(pushFunc, alignFlushToInterval, skipIncompleteFlush, interval, dedupInterval)
 		a.wg.Done()
 	}()

 	return a, nil
 }

-func (a *aggregator) runFlusher(interval, dedupInterval time.Duration) {
-	tickerFlush := time.NewTicker(interval)
-	defer tickerFlush.Stop()
-
-	var dedupTickerCh <-chan time.Time
-	if dedupInterval > 0 {
-		t := time.NewTicker(dedupInterval)
-		defer t.Stop()
-		dedupTickerCh = t.C
-	}
+func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipIncompleteFlush bool, interval, dedupInterval time.Duration) {
+	flushTickerCh := make(chan *time.Ticker, 1)
+	dedupFlushTickerCh := make(chan *time.Ticker, 1)
+	go func() {
+		if !alignFlushToInterval {
+			flushTickerCh <- time.NewTicker(interval)
+			if dedupInterval > 0 {
+				dedupFlushTickerCh <- time.NewTicker(dedupInterval)
+			}
+			return
+		}
+
+		sleep := func(d time.Duration) {
+			timer := timerpool.Get(d)
+			defer timerpool.Put(timer)
+			select {
+			case <-a.stopCh:
+			case <-timer.C:
+			}
+		}
+		currentTime := time.Duration(time.Now().UnixNano())
+		if dedupInterval > 0 {
+			d := dedupInterval - (currentTime % dedupInterval)
+			if d < dedupInterval {
+				sleep(d)
+			}
+			dedupFlushTickerCh <- time.NewTicker(dedupInterval)
+			currentTime += d
+		}
+		d := interval - (currentTime % interval)
+		if d < interval {
+			sleep(d)
+		}
+		flushTickerCh <- time.NewTicker(interval)
+	}()

+	var flushTickerC <-chan time.Time
+	var dedupFlushTickerC <-chan time.Time
+	isFirstFlush := true
 	for {
 		select {
 		case <-a.stopCh:
+			if !skipIncompleteFlush {
+				if dedupInterval > 0 {
+					a.dedupFlush()
+				}
+				a.flush(pushFunc)
+			}
 			return
-		case <-tickerFlush.C:
+		case flushTicker := <-flushTickerCh:
+			flushTickerC = flushTicker.C
+			defer flushTicker.Stop()
+		case dedupFlushTicker := <-dedupFlushTickerCh:
+			dedupFlushTickerC = dedupFlushTicker.C
+			defer dedupFlushTicker.Stop()
+		case <-flushTickerC:
+			if isFirstFlush {
+				isFirstFlush = false
+				if alignFlushToInterval && skipIncompleteFlush {
+					a.flush(nil)
+					continue
+				}
+			}
 			startTime := time.Now()
-			a.flush()
+			a.flush(pushFunc)
 			d := time.Since(startTime)
 			a.flushDuration.Update(d.Seconds())
 			if d > interval {
@@ -554,7 +641,7 @@ func (a *aggregator) runFlusher(interval, dedupInterval time.Duration) {
 				"possible solutions: increase interval; use match filter matching smaller number of series; "+
 				"reduce samples' ingestion rate to stream aggregation", interval, d)
 			}
-		case <-dedupTickerCh:
+		case <-dedupFlushTickerC:
 			startTime := time.Now()
 			a.dedupFlush()
 			d := time.Since(startTime)
@@ -573,7 +660,7 @@ func (a *aggregator) dedupFlush() {
 	a.da.flush(a.pushSamples)
 }

-func (a *aggregator) flush() {
+func (a *aggregator) flush(pushFunc PushFunc) {
 	var wg sync.WaitGroup
 	for _, as := range a.aggrStates {
 		flushConcurrencyCh <- struct{}{}
@@ -584,7 +671,7 @@ func (a *aggregator) flush() {
 			wg.Done()
 		}()

-		ctx := getFlushCtx(a)
+		ctx := getFlushCtx(a, pushFunc)
 		as.flushState(ctx)
 		ctx.flushSeries()
 		putFlushCtx(ctx)
@@ -601,16 +688,6 @@ var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
 func (a *aggregator) MustStop() {
 	close(a.stopCh)
 	a.wg.Wait()
-
-	if !a.flushOnShutdown {
-		return
-	}
-
-	// Flush the remaining data from the last interval if needed.
-	if a.da != nil {
-		a.dedupFlush()
-	}
-	a.flush()
 }

 // Push pushes tss to a.
@@ -770,13 +847,14 @@ func getInputOutputLabels(dstInput, dstOutput, labels []prompbmarshal.Label, by,
 	return dstInput, dstOutput
 }

-func getFlushCtx(a *aggregator) *flushCtx {
+func getFlushCtx(a *aggregator, pushFunc PushFunc) *flushCtx {
 	v := flushCtxPool.Get()
 	if v == nil {
 		v = &flushCtx{}
 	}
 	ctx := v.(*flushCtx)
 	ctx.a = a
+	ctx.pushFunc = pushFunc
 	return ctx
 }
@@ -788,7 +866,8 @@ func putFlushCtx(ctx *flushCtx) {
 var flushCtxPool sync.Pool

 type flushCtx struct {
 	a *aggregator
+	pushFunc PushFunc

 	tss []prompbmarshal.TimeSeries
 	labels []prompbmarshal.Label
@@ -797,6 +876,7 @@ type flushCtx struct {
 func (ctx *flushCtx) reset() {
 	ctx.a = nil
+	ctx.pushFunc = nil
 	ctx.resetSeries()
 }
@@ -819,7 +899,9 @@ func (ctx *flushCtx) flushSeries() {
 	outputRelabeling := ctx.a.outputRelabeling
 	if outputRelabeling == nil {
 		// Fast path - push the output metrics.
-		ctx.a.pushFunc(tss)
+		if ctx.pushFunc != nil {
+			ctx.pushFunc(tss)
+		}
 		return
 	}
@@ -838,7 +920,9 @@ func (ctx *flushCtx) flushSeries() {
 		ts.Labels = dstLabels[dstLabelsLen:]
 		dst = append(dst, ts)
 	}
-	ctx.a.pushFunc(dst)
+	if ctx.pushFunc != nil {
+		ctx.pushFunc(dst)
+	}
 	auxLabels.Labels = dstLabels
 	promutils.PutLabels(auxLabels)
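From a caller's perspective, the change in this file is that `LoadFromFile` and `NewAggregators` now take a `*Options` struct instead of a bare dedup interval. The sketch below shows caller-side usage based only on the signatures visible above; the config path and the `pushFunc` body are placeholder assumptions.

package main

import (
	"log"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
)

func main() {
	opts := &streamaggr.Options{
		DedupInterval:          30 * time.Second, // de-duplicate input samples before aggregation
		NoAlignFlushToInterval: true,             // opt out of the default flush-time alignment
		FlushOnShutdown:        true,             // flush the incomplete first/last intervals too
	}
	// pushFunc receives the aggregated series on every flush.
	pushFunc := func(tss []prompbmarshal.TimeSeries) {
		log.Printf("flushed %d aggregated series", len(tss))
	}
	// "stream_aggr.yaml" is a placeholder path to a stream aggregation config.
	sas, err := streamaggr.LoadFromFile("stream_aggr.yaml", pushFunc, opts)
	if err != nil {
		log.Fatalf("cannot load stream aggregation config: %s", err)
	}
	defer sas.MustStop()
	// Feed input samples via sas.Push(tss, nil) as they arrive.
}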


@@ -20,7 +20,7 @@ func TestAggregatorsFailure(t *testing.T) {
 	pushFunc := func(tss []prompbmarshal.TimeSeries) {
 		panic(fmt.Errorf("pushFunc shouldn't be called"))
 	}
-	a, err := newAggregatorsFromData([]byte(config), pushFunc, 0)
+	a, err := newAggregatorsFromData([]byte(config), pushFunc, nil)
 	if err == nil {
 		t.Fatalf("expecting non-nil error")
 	}
@@ -158,11 +158,11 @@ func TestAggregatorsEqual(t *testing.T) {
 	t.Helper()

 	pushFunc := func(tss []prompbmarshal.TimeSeries) {}
-	aa, err := newAggregatorsFromData([]byte(a), pushFunc, 0)
+	aa, err := newAggregatorsFromData([]byte(a), pushFunc, nil)
 	if err != nil {
 		t.Fatalf("cannot initialize aggregators: %s", err)
 	}
-	ab, err := newAggregatorsFromData([]byte(b), pushFunc, 0)
+	ab, err := newAggregatorsFromData([]byte(b), pushFunc, nil)
 	if err != nil {
 		t.Fatalf("cannot initialize aggregators: %s", err)
 	}
@@ -220,15 +220,14 @@ func TestAggregatorsSuccess(t *testing.T) {
 		}
 		tssOutputLock.Unlock()
 	}
-	a, err := newAggregatorsFromData([]byte(config), pushFunc, 0)
+	opts := &Options{
+		FlushOnShutdown: true,
+		NoAlignFlushToInterval: true,
+	}
+	a, err := newAggregatorsFromData([]byte(config), pushFunc, opts)
 	if err != nil {
 		t.Fatalf("cannot initialize aggregators: %s", err)
 	}
-	for _, ag := range a.as {
-		// explicitly set flushOnShutdown, so aggregations results
-		// are immediately available after a.MustStop() call.
-		ag.flushOnShutdown = true
-	}

 	// Push the inputMetrics to Aggregators
 	tssInput := mustParsePromMetrics(inputMetrics)
@@ -862,8 +861,11 @@ func TestAggregatorsWithDedupInterval(t *testing.T) {
 		}
 		tssOutputLock.Unlock()
 	}
-	const dedupInterval = 30 * time.Second
-	a, err := newAggregatorsFromData([]byte(config), pushFunc, dedupInterval)
+	opts := &Options{
+		DedupInterval: 30 * time.Second,
+		FlushOnShutdown: true,
+	}
+	a, err := newAggregatorsFromData([]byte(config), pushFunc, opts)
 	if err != nil {
 		t.Fatalf("cannot initialize aggregators: %s", err)
 	}
@@ -871,12 +873,6 @@ func TestAggregatorsWithDedupInterval(t *testing.T) {
 	// Push the inputMetrics to Aggregators
 	tssInput := mustParsePromMetrics(inputMetrics)
 	matchIdxs := a.Push(tssInput, nil)
-	if a != nil {
-		for _, aggr := range a.as {
-			aggr.dedupFlush()
-			aggr.flush()
-		}
-	}
 	a.MustStop()

 	// Verify matchIdxs equals to matchIdxsExpected


@@ -44,7 +44,8 @@ func BenchmarkAggregatorsFlushSerial(b *testing.B) {
 }

 func benchmarkAggregatorsFlushSerial(b *testing.B, output string) {
-	a := newBenchAggregators(output)
+	pushFunc := func(tss []prompbmarshal.TimeSeries) {}
+	a := newBenchAggregators(output, pushFunc)
 	defer a.MustStop()

 	var matchIdxs []byte
@@ -54,13 +55,14 @@ func benchmarkAggregatorsFlushSerial(b *testing.B, output string) {
 	for i := 0; i < b.N; i++ {
 		matchIdxs = a.Push(benchSeries, matchIdxs)
 		for _, aggr := range a.as {
-			aggr.flush()
+			aggr.flush(pushFunc)
 		}
 	}
 }

 func benchmarkAggregatorsPush(b *testing.B, output string) {
-	a := newBenchAggregators(output)
+	pushFunc := func(tss []prompbmarshal.TimeSeries) {}
+	a := newBenchAggregators(output, pushFunc)
 	defer a.MustStop()

 	const loops = 100
@@ -77,15 +79,14 @@ func benchmarkAggregatorsPush(b *testing.B, output string) {
 	})
 }

-func newBenchAggregators(output string) *Aggregators {
+func newBenchAggregators(output string, pushFunc PushFunc) *Aggregators {
 	config := fmt.Sprintf(`
 - match: http_requests_total
   interval: 24h
   without: [job]
   outputs: [%q]
 `, output)
-	pushFunc := func(tss []prompbmarshal.TimeSeries) {}
-	a, err := newAggregatorsFromData([]byte(config), pushFunc, 0)
+	a, err := newAggregatorsFromData([]byte(config), pushFunc, nil)
 	if err != nil {
 		panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err))
 	}