lib/streamaggr: skip unfinished aggregation state on shutdown by default (#5689)

Sending unfinished aggregate states tend to produce unexpected anomalies with lower values than expected.
The old behavior can be restored by specifying `flush_on_shutdown: true` setting in streaming aggregation config

Signed-off-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
Roman Khavronenko 2024-01-26 22:45:23 +01:00 committed by GitHub
parent df59ac7f0e
commit aaa526e8ff
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 40 additions and 2 deletions

View file

@ -37,6 +37,7 @@ The sandbox cluster installation is running under the constant load generated by
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [DataDog v2 data ingestion protocol](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4451). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [DataDog v2 data ingestion protocol](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4451).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): expose ability to set OAuth2 endpoint parameters per each `-remoteWrite.url` via the command-line flag `-remoteWrite.oauth2.endpointParams`. See [these docs](https://docs.victoriametrics.com/vmagent.html#advanced-usage). Thanks to @mhill-holoplot for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5427). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): expose ability to set OAuth2 endpoint parameters per each `-remoteWrite.url` via the command-line flag `-remoteWrite.oauth2.endpointParams`. See [these docs](https://docs.victoriametrics.com/vmagent.html#advanced-usage). Thanks to @mhill-holoplot for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5427).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add ability to set `attach_metadata.node=true` option for all the [`kubernetes_sd_configs`](https://docs.victoriametrics.com/sd_configs.html#kubernetes_sd_configs) defined at [`-promscrape.config`](https://docs.victoriametrics.com/vmagent.html#quick-start) via `-promscrape.kubernetes.attachNodeMetadataAll` command-line flag. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4640). Thanks to @wasim-nihal for [the initial implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5593). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add ability to set `attach_metadata.node=true` option for all the [`kubernetes_sd_configs`](https://docs.victoriametrics.com/sd_configs.html#kubernetes_sd_configs) defined at [`-promscrape.config`](https://docs.victoriametrics.com/vmagent.html#quick-start) via `-promscrape.kubernetes.attachNodeMetadataAll` command-line flag. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4640). Thanks to @wasim-nihal for [the initial implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5593).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): do not send unfinished [aggregation state](https://docs.victoriametrics.com/stream-aggregation.html) on shutdown or [hot config reload](https://docs.victoriametrics.com/stream-aggregation.html#configuration-update) by default, as it tend to produce unexpected anomalies with lower values. The old behavior can be restored by specifying `flush_on_shutdown: true` setting in streaming aggregation config. See more details [here](https://docs.victoriametrics.com/stream-aggregation.html#aggregation-outputs).
* FEATURE: [streaming aggregation](https://docs.victoriametrics.com/stream-aggregation.html): expand `%{ENV_VAR}` placeholders in config files with the corresponding environment variable values. * FEATURE: [streaming aggregation](https://docs.victoriametrics.com/stream-aggregation.html): expand `%{ENV_VAR}` placeholders in config files with the corresponding environment variable values.
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmagent.html): expose ability to set OAuth2 endpoint parameters via the following command-line flags: * FEATURE: [vmalert](https://docs.victoriametrics.com/vmagent.html): expose ability to set OAuth2 endpoint parameters via the following command-line flags:
- `-datasource.oauth2.endpointParams` for `-datasource.url` - `-datasource.oauth2.endpointParams` for `-datasource.url`

View file

@ -410,11 +410,15 @@ For example, the following config removes the `:1m_sum_samples` suffix added [to
## Aggregation outputs ## Aggregation outputs
The aggregations are calculated during the `interval` specified in the [config](#stream-aggregation-config) The aggregations are calculated during the `interval` specified in the [config](#stream-aggregation-config)
and then sent to the storage. and then sent to the storage once per `interval`.
If `by` and `without` lists are specified in the [config](#stream-aggregation-config), If `by` and `without` lists are specified in the [config](#stream-aggregation-config),
then the [aggregation by labels](#aggregating-by-labels) is performed additionally to aggregation by `interval`. then the [aggregation by labels](#aggregating-by-labels) is performed additionally to aggregation by `interval`.
On vmagent shutdown or [configuration reload](#configuration-update) unfinished aggregated states are discarded,
as they might produce lower values than user expects. It is possible to specify `flush_on_shutdown: true` setting in
aggregation config to make vmagent to send unfinished states to the remote storage.
Below are aggregation functions that can be put in the `outputs` list at [stream aggregation config](#stream-aggregation-config). Below are aggregation functions that can be put in the `outputs` list at [stream aggregation config](#stream-aggregation-config).
### total ### total
@ -642,6 +646,12 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-
# #
# staleness_interval: 2m # staleness_interval: 2m
# flush_on_shutdown defines whether to flush the unfinished aggregation states on process restarts
# or config reloads. It is not recommended changing this setting, unless unfinished aggregations states
# are preferred to missing data points.
# Is `false` by default.
# flush_on_shutdown: false
# without is an optional list of labels, which must be removed from the output aggregation. # without is an optional list of labels, which must be removed from the output aggregation.
# See https://docs.victoriametrics.com/stream-aggregation.html#aggregating-by-labels # See https://docs.victoriametrics.com/stream-aggregation.html#aggregating-by-labels
# #

View file

@ -131,6 +131,10 @@ type Config struct {
// OutputRelabelConfigs is an optional relabeling rules, which are applied // OutputRelabelConfigs is an optional relabeling rules, which are applied
// on the aggregated output before being sent to remote storage. // on the aggregated output before being sent to remote storage.
OutputRelabelConfigs []promrelabel.RelabelConfig `yaml:"output_relabel_configs,omitempty"` OutputRelabelConfigs []promrelabel.RelabelConfig `yaml:"output_relabel_configs,omitempty"`
// FlushOnShutdown defines whether to flush the aggregation state on process termination
// or config reload. Is `false` by default.
FlushOnShutdown bool `yaml:"flush_on_shutdown,omitempty"`
} }
// Aggregators aggregates metrics passed to Push and calls pushFunc for aggregate data. // Aggregators aggregates metrics passed to Push and calls pushFunc for aggregate data.
@ -240,6 +244,10 @@ type aggregator struct {
// for `interval: 1m`, `by: [job]` // for `interval: 1m`, `by: [job]`
suffix string suffix string
// flushOnShutdown defines whether to flush the state of aggregation
// on MustStop call.
flushOnShutdown bool
wg sync.WaitGroup wg sync.WaitGroup
stopCh chan struct{} stopCh chan struct{}
} }
@ -394,6 +402,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
pushFunc: pushFunc, pushFunc: pushFunc,
suffix: suffix, suffix: suffix,
flushOnShutdown: cfg.FlushOnShutdown,
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
} }
@ -506,6 +515,10 @@ func (a *aggregator) MustStop() {
close(a.stopCh) close(a.stopCh)
a.wg.Wait() a.wg.Wait()
if !a.flushOnShutdown {
return
}
// Flush the remaining data from the last interval if needed. // Flush the remaining data from the last interval if needed.
flushConcurrencyCh <- struct{}{} flushConcurrencyCh <- struct{}{}
if a.dedupAggr != nil { if a.dedupAggr != nil {

View file

@ -155,6 +155,15 @@ func TestAggregatorsEqual(t *testing.T) {
`, ` `, `
- outputs: [total] - outputs: [total]
interval: 5m interval: 5m
`, false)
f(`
- outputs: [total]
interval: 5m
flush_on_shutdown: true
`, `
- outputs: [total]
interval: 5m
flush_on_shutdown: false
`, false) `, false)
} }
@ -181,6 +190,11 @@ func TestAggregatorsSuccess(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("cannot initialize aggregators: %s", err) t.Fatalf("cannot initialize aggregators: %s", err)
} }
for _, ag := range a.as {
// explicitly set flushOnShutdown, so aggregations results
// are immediately available after a.MustStop() call.
ag.flushOnShutdown = true
}
// Push the inputMetrics to Aggregators // Push the inputMetrics to Aggregators
tssInput := mustParsePromMetrics(inputMetrics) tssInput := mustParsePromMetrics(inputMetrics)