lib/streamaggr: follow-up for 15e33d56f1

- Properly set pushSample.timestamp when flushing de-duplicated samples to stream aggregation
  This is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5931

- Re-classify this change as feature instead of bugfix at docs/CHANGELOG.md

- Verify de-duplication logic for samples with different timestamps

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5643
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5939
This commit is contained in:
Aliaksandr Valialkin 2024-03-17 21:37:14 +02:00
parent ab2b3f1785
commit cbd80efcc1
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
4 changed files with 13 additions and 16 deletions

View file

@ -41,6 +41,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): allow using `-streamAggr.dedupInterval` and `-remoteWrite.streamAggr.dedupInterval` command-line flags without the need to specify `-streamAggr.config` and `-remoteWrite.streamAggr.config`. See [these docs](https://docs.victoriametrics.com/stream-aggregation/#deduplication). * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): allow using `-streamAggr.dedupInterval` and `-remoteWrite.streamAggr.dedupInterval` command-line flags without the need to specify `-streamAggr.config` and `-remoteWrite.streamAggr.config`. See [these docs](https://docs.victoriametrics.com/stream-aggregation/#deduplication).
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `-streamAggr.dropInputLabels` command-line flag, which can be used for dropping the listed labels from input samples before applying stream [de-duplication](https://docs.victoriametrics.com/stream-aggregation/#deduplication) and aggregation. This is faster and easier to use alternative to [input_relabel_configs](https://docs.victoriametrics.com/stream-aggregation/#relabeling). See [these docs](https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels). * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `-streamAggr.dropInputLabels` command-line flag, which can be used for dropping the listed labels from input samples before applying stream [de-duplication](https://docs.victoriametrics.com/stream-aggregation/#deduplication) and aggregation. This is faster and easier to use alternative to [input_relabel_configs](https://docs.victoriametrics.com/stream-aggregation/#relabeling). See [these docs](https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels).
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `dedup_interval` option, which allows configuring individual [deduplication intervals](https://docs.victoriametrics.com/stream-aggregation/#deduplication) per each [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `dedup_interval` option, which allows configuring individual [deduplication intervals](https://docs.victoriametrics.com/stream-aggregation/#deduplication) per each [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config).
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): use the same logic in [stream deduplication](https://docs.victoriametrics.com/stream-aggregation/#deduplication) as in [the deduplication at VictoriaMetrics](https://docs.victoriametrics.com/#deduplication). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5643).
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `keep_metric_names` option, which can be set at [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config) in order to keep the original metric names in the output aggregated samples instead of using [the default output metric naming scheme](https://docs.victoriametrics.com/stream-aggregation/#output-metric-names). * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `keep_metric_names` option, which can be set at [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config) in order to keep the original metric names in the output aggregated samples instead of using [the default output metric naming scheme](https://docs.victoriametrics.com/stream-aggregation/#output-metric-names).
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): align the time of aggregated data flush to the specified aggregation `interval`. For example, if `interval` is set to `1m`, then the aggregated data will be flushed at the end of every minute. The alginment can be disabled by setting `no_align_flush_to_interval: true` option at [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). See [these docs](https://docs.victoriametrics.com/stream-aggregation/#flush-time-alignment) for details. * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): align the time of aggregated data flush to the specified aggregation `interval`. For example, if `interval` is set to `1m`, then the aggregated data will be flushed at the end of every minute. The alginment can be disabled by setting `no_align_flush_to_interval: true` option at [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). See [these docs](https://docs.victoriametrics.com/stream-aggregation/#flush-time-alignment) for details.
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [unique_samples](https://docs.victoriametrics.com/stream-aggregation/#unique_samples) output, which can be used for calculating the number of unique sample values over the given `interval`. * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [unique_samples](https://docs.victoriametrics.com/stream-aggregation/#unique_samples) output, which can be used for calculating the number of unique sample values over the given `interval`.
@ -55,7 +56,6 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly set `Host` header in requests to scrape targets if it is specified via [`headers` option](https://docs.victoriametrics.com/sd_configs/#http-api-client-options). Thanks to @fholzer for [the bugreport](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5969) and [the fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5970). * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly set `Host` header in requests to scrape targets if it is specified via [`headers` option](https://docs.victoriametrics.com/sd_configs/#http-api-client-options). Thanks to @fholzer for [the bugreport](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5969) and [the fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5970).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly set `Host` header in requests to scrape targets when [`server_name` option at `tls_config`](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#tls_config) is set. Previously the `Host` header was set incorrectly to the target hostname in this case. * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly set `Host` header in requests to scrape targets when [`server_name` option at `tls_config`](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#tls_config) is set. Previously the `Host` header was set incorrectly to the target hostname in this case.
* BUGFIX: do not drop `match[]` filter at [`/api/v1/series`](https://docs.victoriametrics.com/url-examples/#apiv1series) if `-search.ignoreExtraFiltersAtLabelsAPI` command-line flag is set, since missing `match[]` filter breaks `/api/v1/series` requests. * BUGFIX: do not drop `match[]` filter at [`/api/v1/series`](https://docs.victoriametrics.com/url-examples/#apiv1series) if `-search.ignoreExtraFiltersAtLabelsAPI` command-line flag is set, since missing `match[]` filter breaks `/api/v1/series` requests.
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): take into account sample timestamp during [deduplication](https://docs.victoriametrics.com/stream-aggregation/#deduplication). If multiple samples have the same timestamp on the configured deduplication interval, then the sample with the biggest value is kept. The logic is aligned with [`-dedup.minScrapeInterval`](https://docs.victoriametrics.com/#deduplication).
* BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl.html): properly parse TLS key and CA files for [InfluxDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-influxdb-1x) and [OpenTSDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-opentsdb) migration modes. * BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl.html): properly parse TLS key and CA files for [InfluxDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-influxdb-1x) and [OpenTSDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-opentsdb) migration modes.
## [v1.99.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.99.0) ## [v1.99.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.99.0)

View file

@ -50,8 +50,8 @@ This behaviour can be changed via the following command-line flags:
## Deduplication ## Deduplication
[vmagent](https://docs.victoriametrics.com/vmagent.html) supports de-duplication of samples before sending them [vmagent](https://docs.victoriametrics.com/vmagent.html) supports online [de-duplication](https://docs.victoriametrics.com/#deduplication) of samples
to the configured `-remoteWrite.url`. The de-duplication can be enabled via the following options: before sending them to the configured `-remoteWrite.url`. The de-duplication can be enabled via the following options:
- By specifying the desired de-duplication interval via `-remoteWrite.streamAggr.dedupInterval` command-line flag for the particular `-remoteWrite.url`. - By specifying the desired de-duplication interval via `-remoteWrite.streamAggr.dedupInterval` command-line flag for the particular `-remoteWrite.url`.
For example, `./vmagent -remoteWrite.url=http://remote-storage/api/v1/write -remoteWrite.streamAggr.dedupInterval=30s` instructs `vmagent` to leave For example, `./vmagent -remoteWrite.url=http://remote-storage/api/v1/write -remoteWrite.streamAggr.dedupInterval=30s` instructs `vmagent` to leave
@ -67,7 +67,7 @@ to the configured `-remoteWrite.url`. The de-duplication can be enabled via the
- After storing the duplicate samples to local storage. See [`-dedup.minScrapeInterval`](https://docs.victoriametrics.com/#deduplication) command-line option. - After storing the duplicate samples to local storage. See [`-dedup.minScrapeInterval`](https://docs.victoriametrics.com/#deduplication) command-line option.
- Before storing the duplicate samples to local storage. This type of de-duplication can be enabled via the following options: - Before storing the duplicate samples to local storage. This type of de-duplication can be enabled via the following options:
- By specifying the desired de-duplication interval via `-streamAggr.dedupInterval` command-line flag. - By specifying the desired de-duplication interval via `-streamAggr.dedupInterval` command-line flag.
For example, `./victoria-metrics -streamAggr.dedupInterval=30s` instructs VicotriaMetrics to leave only the last sample per each For example, `./victoria-metrics -streamAggr.dedupInterval=30s` instructs VictoriaMetrics to leave only the last sample per each
seen [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) per every 30 seconds. seen [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) per every 30 seconds.
The de-duplication is performed after applying `-relabelConfig` [relabeling](https://docs.victoriametrics.com/#relabeling). The de-duplication is performed after applying `-relabelConfig` [relabeling](https://docs.victoriametrics.com/#relabeling).
@ -78,10 +78,7 @@ to the configured `-remoteWrite.url`. The de-duplication can be enabled via the
It is possible to drop the given labels before applying the de-duplication. See [these docs](#dropping-unneeded-labels). It is possible to drop the given labels before applying the de-duplication. See [these docs](#dropping-unneeded-labels).
The online de-duplication takes into account timestamps associated with the de-duplicated samples - it keeps the sample The online de-duplication uses the same logic as [`-dedup.minScrapeInterval` command-line flag](https://docs.victoriametrics.com/#deduplication) at VictoriaMetrics.
with the newest timestamp on the configured deduplication interval. If multiple samples have the same timestamp on the
configured deduplication interval, then the sample with the biggest value is kept. The logic is aligned with
[`-dedup.minScrapeInterval` command-line flag](https://docs.victoriametrics.com/#deduplication).
## Flush time alignment ## Flush time alignment

View file

@ -181,8 +181,7 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) {
} }
continue continue
} }
// update the existing value according to logic described in // Update the existing value according to logic described at https://docs.victoriametrics.com/#deduplication
// https://docs.victoriametrics.com/#deduplication
if sample.timestamp > s.timestamp || (sample.timestamp == s.timestamp && sample.value > s.value) { if sample.timestamp > s.timestamp || (sample.timestamp == s.timestamp && sample.value > s.value) {
m[sample.key] = dedupAggrSample{ m[sample.key] = dedupAggrSample{
value: sample.value, value: sample.value,
@ -211,6 +210,7 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample
dstSamples = append(dstSamples, pushSample{ dstSamples = append(dstSamples, pushSample{
key: key, key: key,
value: s.value, value: s.value,
timestamp: s.timestamp,
}) })
// Limit the number of samples per each flush in order to limit memory usage. // Limit the number of samples per each flush in order to limit memory usage.

View file

@ -20,10 +20,10 @@ func TestDeduplicator(t *testing.T) {
tss := mustParsePromMetrics(` tss := mustParsePromMetrics(`
foo{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 123 foo{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 123
bar{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 34.54 bar{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 34.54
x 8943 x 8943 1000
baz_aaa_aaa_fdd{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} -34.34 baz_aaa_aaa_fdd{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} -34.34
x 90984 x 90984 900
x 433 x 433 1000
asfjkldsf{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 12322 asfjkldsf{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 12322
foo{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 894 foo{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 894
baz_aaa_aaa_fdd{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} -2.3 baz_aaa_aaa_fdd{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} -2.3
@ -41,7 +41,7 @@ baz_aaa_aaa_fdd{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",n
bar{container="ohohffd",job="aaa",namespace="asdff",pod="sdfd-dfdfdfs"} 34.54 bar{container="ohohffd",job="aaa",namespace="asdff",pod="sdfd-dfdfdfs"} 34.54
baz_aaa_aaa_fdd{container="ohohffd",job="aaa",namespace="asdff",pod="sdfd-dfdfdfs"} -2.3 baz_aaa_aaa_fdd{container="ohohffd",job="aaa",namespace="asdff",pod="sdfd-dfdfdfs"} -2.3
foo{container="ohohffd",job="aaa",namespace="asdff",pod="sdfd-dfdfdfs"} 894 foo{container="ohohffd",job="aaa",namespace="asdff",pod="sdfd-dfdfdfs"} 894
x 90984 x 8943
` `
if result != resultExpected { if result != resultExpected {
t.Fatalf("unexpected result; got\n%s\nwant\n%s", result, resultExpected) t.Fatalf("unexpected result; got\n%s\nwant\n%s", result, resultExpected)