diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 366bd32cf..306db79bf 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -50,6 +50,7 @@ The following `tip` changes can be tested by building VictoriaMetrics components * FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): expose `vmauth_user_request_duration_seconds` and `vmauth_unauthorized_user_request_duration_seconds` summary metrics for measuring requests latency per user. * FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): show backup progress percentage in log during backup uploading. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4460). * FEATURE: [vmrestore](https://docs.victoriametrics.com/vmrestore.html): show restoring progress percentage in log during backup downloading. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4460). +* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow configuring staleness interval in [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) config. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4667) for details. * FEATURE: add ability to fine-tune Graphite API limits via the following command-line flags: `-search.maxGraphiteTagKeys` for limiting the number of tag keys returned from [Graphite API for tags](https://docs.victoriametrics.com/#graphite-tags-api-usage) `-search.maxGraphiteTagValues` for limiting the number of tag values returned from [Graphite API for tag values](https://docs.victoriametrics.com/#graphite-tags-api-usage) diff --git a/docs/stream-aggregation.md b/docs/stream-aggregation.md index 244e7d2a5..dc81fcc3d 100644 --- a/docs/stream-aggregation.md +++ b/docs/stream-aggregation.md @@ -546,6 +546,16 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server- # The aggregated stats is sent to remote storage once per interval. interval: 1m + # staleness_interval defines an interval after which the series state will be reset if no samples have been sent during it. + # It means that: + # - no data point will be written for a resulting time series if it didn't receive any updates during configured interval, + # - if the series receives updates after the configured interval again, then the time series will be calculated from the initial state + # (it's like this series didn't exist until now). + # Increase this parameter if it is expected for matched metrics to be delayed or collected with irregular intervals exceeding the `interval` value. + # By default, is equal to x2 of the `interval` field. + # The parameter is only relevant for outputs: total, increase and histogram_bucket. + # staleness_interval: 2m + # without is an optional list of labels, which must be removed from the output aggregation. # See https://docs.victoriametrics.com/stream-aggregation.html#aggregating-by-labels without: [instance] diff --git a/lib/streamaggr/histogram_bucket.go b/lib/streamaggr/histogram_bucket.go index 7d844d09e..7361b6238 100644 --- a/lib/streamaggr/histogram_bucket.go +++ b/lib/streamaggr/histogram_bucket.go @@ -12,7 +12,7 @@ import ( type histogramBucketAggrState struct { m sync.Map - intervalSecs uint64 + stalenessInterval uint64 } type histogramBucketStateValue struct { @@ -22,16 +22,15 @@ type histogramBucketStateValue struct { deleted bool } -func newHistogramBucketAggrState(interval time.Duration) *histogramBucketAggrState { - intervalSecs := uint64(interval.Seconds() + 1) +func newHistogramBucketAggrState(stalenessInterval time.Duration) *histogramBucketAggrState { return &histogramBucketAggrState{ - intervalSecs: intervalSecs, + stalenessInterval: uint64(stalenessInterval.Seconds()), } } func (as *histogramBucketAggrState) pushSample(inputKey, outputKey string, value float64) { currentTime := fasttime.UnixTimestamp() - deleteDeadline := currentTime + 2*as.intervalSecs + deleteDeadline := currentTime + as.stalenessInterval again: v, ok := as.m.Load(outputKey) diff --git a/lib/streamaggr/increase.go b/lib/streamaggr/increase.go index e21bfedad..8a43e6443 100644 --- a/lib/streamaggr/increase.go +++ b/lib/streamaggr/increase.go @@ -12,7 +12,7 @@ type increaseAggrState struct { m sync.Map ignoreInputDeadline uint64 - intervalSecs uint64 + stalenessInterval uint64 } type increaseStateValue struct { @@ -23,18 +23,18 @@ type increaseStateValue struct { deleted bool } -func newIncreaseAggrState(interval time.Duration) *increaseAggrState { +func newIncreaseAggrState(interval time.Duration, stalenessInterval time.Duration) *increaseAggrState { currentTime := fasttime.UnixTimestamp() intervalSecs := uint64(interval.Seconds() + 1) return &increaseAggrState{ ignoreInputDeadline: currentTime + intervalSecs, - intervalSecs: intervalSecs, + stalenessInterval: uint64(stalenessInterval.Seconds()), } } func (as *increaseAggrState) pushSample(inputKey, outputKey string, value float64) { currentTime := fasttime.UnixTimestamp() - deleteDeadline := currentTime + 2*as.intervalSecs + deleteDeadline := currentTime + as.stalenessInterval again: v, ok := as.m.Load(outputKey) diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 03a0e0210..03ca4f5ed 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -79,6 +79,10 @@ type Config struct { // Interval is the interval between aggregations. Interval string `yaml:"interval"` + // Staleness interval is interval after which the series state will be reset if no samples have been sent during it. + // The parameter is only relevant for outputs: total, increase and histogram_bucket. + StalenessInterval string `yaml:"staleness_interval,omitempty"` + // Outputs is a list of output aggregate functions to produce. // // The following names are allowed: @@ -254,6 +258,18 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration) return nil, fmt.Errorf("the minimum supported aggregation interval is 1s; got %s", interval) } + // check cfg.StalenessInterval + stalenessInterval := interval * 2 + if cfg.StalenessInterval != "" { + stalenessInterval, err = time.ParseDuration(cfg.StalenessInterval) + if err != nil { + return nil, fmt.Errorf("cannot parse `staleness_interval: %q`: %w", cfg.StalenessInterval, err) + } + if stalenessInterval < interval { + return nil, fmt.Errorf("staleness_interval cannot be less than interval (%s); got %s", cfg.Interval, cfg.StalenessInterval) + } + } + // initialize input_relabel_configs and output_relabel_configs inputRelabeling, err := promrelabel.ParseRelabelConfigs(cfg.InputRelabelConfigs) if err != nil { @@ -308,9 +324,9 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration) } switch output { case "total": - aggrStates[i] = newTotalAggrState(interval) + aggrStates[i] = newTotalAggrState(interval, stalenessInterval) case "increase": - aggrStates[i] = newIncreaseAggrState(interval) + aggrStates[i] = newIncreaseAggrState(interval, stalenessInterval) case "count_series": aggrStates[i] = newCountSeriesAggrState() case "count_samples": @@ -330,7 +346,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration) case "stdvar": aggrStates[i] = newStdvarAggrState() case "histogram_bucket": - aggrStates[i] = newHistogramBucketAggrState(interval) + aggrStates[i] = newHistogramBucketAggrState(stalenessInterval) default: return nil, fmt.Errorf("unsupported output=%q; supported values: %s; "+ "see https://docs.victoriametrics.com/vmagent.html#stream-aggregation", output, supportedOutputs) diff --git a/lib/streamaggr/total.go b/lib/streamaggr/total.go index b2e935acc..92b1cd2bd 100644 --- a/lib/streamaggr/total.go +++ b/lib/streamaggr/total.go @@ -13,7 +13,7 @@ type totalAggrState struct { m sync.Map ignoreInputDeadline uint64 - intervalSecs uint64 + stalenessInterval uint64 } type totalStateValue struct { @@ -29,18 +29,18 @@ type lastValueState struct { deleteDeadline uint64 } -func newTotalAggrState(interval time.Duration) *totalAggrState { +func newTotalAggrState(interval time.Duration, stalenessInterval time.Duration) *totalAggrState { currentTime := fasttime.UnixTimestamp() intervalSecs := uint64(interval.Seconds() + 1) return &totalAggrState{ ignoreInputDeadline: currentTime + intervalSecs, - intervalSecs: intervalSecs, + stalenessInterval: uint64(stalenessInterval.Seconds()), } } func (as *totalAggrState) pushSample(inputKey, outputKey string, value float64) { currentTime := fasttime.UnixTimestamp() - deleteDeadline := currentTime + as.intervalSecs + (as.intervalSecs >> 1) + deleteDeadline := currentTime + as.stalenessInterval again: v, ok := as.m.Load(outputKey)