diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 7ac339177..5bb737770 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -41,6 +41,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): allow using `-streamAggr.dedupInterval` and `-remoteWrite.streamAggr.dedupInterval` command-line flags without the need to specify `-streamAggr.config` and `-remoteWrite.streamAggr.config`. See [these docs](https://docs.victoriametrics.com/stream-aggregation/#deduplication). * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `-streamAggr.dropInputLabels` command-line flag, which can be used for dropping the listed labels from input samples before applying stream [de-duplication](https://docs.victoriametrics.com/stream-aggregation/#deduplication) and aggregation. This is faster and easier to use alternative to [input_relabel_configs](https://docs.victoriametrics.com/stream-aggregation/#relabeling). See [these docs](https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels). * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `dedup_interval` option, which allows configuring individual [deduplication intervals](https://docs.victoriametrics.com/stream-aggregation/#deduplication) per each [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). +* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): use the same logic in [stream deduplication](https://docs.victoriametrics.com/stream-aggregation/#deduplication) as in [the deduplication at VictoriaMetrics](https://docs.victoriametrics.com/#deduplication). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5643). * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `keep_metric_names` option, which can be set at [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config) in order to keep the original metric names in the output aggregated samples instead of using [the default output metric naming scheme](https://docs.victoriametrics.com/stream-aggregation/#output-metric-names). * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): align the time of aggregated data flush to the specified aggregation `interval`. For example, if `interval` is set to `1m`, then the aggregated data will be flushed at the end of every minute. The alginment can be disabled by setting `no_align_flush_to_interval: true` option at [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). See [these docs](https://docs.victoriametrics.com/stream-aggregation/#flush-time-alignment) for details. * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [unique_samples](https://docs.victoriametrics.com/stream-aggregation/#unique_samples) output, which can be used for calculating the number of unique sample values over the given `interval`. @@ -55,7 +56,6 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly set `Host` header in requests to scrape targets if it is specified via [`headers` option](https://docs.victoriametrics.com/sd_configs/#http-api-client-options). Thanks to @fholzer for [the bugreport](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5969) and [the fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5970). * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly set `Host` header in requests to scrape targets when [`server_name` option at `tls_config`](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#tls_config) is set. Previously the `Host` header was set incorrectly to the target hostname in this case. * BUGFIX: do not drop `match[]` filter at [`/api/v1/series`](https://docs.victoriametrics.com/url-examples/#apiv1series) if `-search.ignoreExtraFiltersAtLabelsAPI` command-line flag is set, since missing `match[]` filter breaks `/api/v1/series` requests. -* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): take into account sample timestamp during [deduplication](https://docs.victoriametrics.com/stream-aggregation/#deduplication). If multiple samples have the same timestamp on the configured deduplication interval, then the sample with the biggest value is kept. The logic is aligned with [`-dedup.minScrapeInterval`](https://docs.victoriametrics.com/#deduplication). * BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl.html): properly parse TLS key and CA files for [InfluxDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-influxdb-1x) and [OpenTSDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-opentsdb) migration modes. ## [v1.99.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.99.0) diff --git a/docs/stream-aggregation.md b/docs/stream-aggregation.md index 94fc533fb..8c1eb0ed1 100644 --- a/docs/stream-aggregation.md +++ b/docs/stream-aggregation.md @@ -50,8 +50,8 @@ This behaviour can be changed via the following command-line flags: ## Deduplication -[vmagent](https://docs.victoriametrics.com/vmagent.html) supports de-duplication of samples before sending them -to the configured `-remoteWrite.url`. The de-duplication can be enabled via the following options: +[vmagent](https://docs.victoriametrics.com/vmagent.html) supports online [de-duplication](https://docs.victoriametrics.com/#deduplication) of samples +before sending them to the configured `-remoteWrite.url`. The de-duplication can be enabled via the following options: - By specifying the desired de-duplication interval via `-remoteWrite.streamAggr.dedupInterval` command-line flag for the particular `-remoteWrite.url`. For example, `./vmagent -remoteWrite.url=http://remote-storage/api/v1/write -remoteWrite.streamAggr.dedupInterval=30s` instructs `vmagent` to leave @@ -67,7 +67,7 @@ to the configured `-remoteWrite.url`. The de-duplication can be enabled via the - After storing the duplicate samples to local storage. See [`-dedup.minScrapeInterval`](https://docs.victoriametrics.com/#deduplication) command-line option. - Before storing the duplicate samples to local storage. This type of de-duplication can be enabled via the following options: - By specifying the desired de-duplication interval via `-streamAggr.dedupInterval` command-line flag. - For example, `./victoria-metrics -streamAggr.dedupInterval=30s` instructs VicotriaMetrics to leave only the last sample per each + For example, `./victoria-metrics -streamAggr.dedupInterval=30s` instructs VictoriaMetrics to leave only the last sample per each seen [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) per every 30 seconds. The de-duplication is performed after applying `-relabelConfig` [relabeling](https://docs.victoriametrics.com/#relabeling). @@ -78,10 +78,7 @@ to the configured `-remoteWrite.url`. The de-duplication can be enabled via the It is possible to drop the given labels before applying the de-duplication. See [these docs](#dropping-unneeded-labels). -The online de-duplication takes into account timestamps associated with the de-duplicated samples - it keeps the sample -with the newest timestamp on the configured deduplication interval. If multiple samples have the same timestamp on the -configured deduplication interval, then the sample with the biggest value is kept. The logic is aligned with -[`-dedup.minScrapeInterval` command-line flag](https://docs.victoriametrics.com/#deduplication). +The online de-duplication uses the same logic as [`-dedup.minScrapeInterval` command-line flag](https://docs.victoriametrics.com/#deduplication) at VictoriaMetrics. ## Flush time alignment diff --git a/lib/streamaggr/dedup.go b/lib/streamaggr/dedup.go index 8f3349a8f..c9ac3dc67 100644 --- a/lib/streamaggr/dedup.go +++ b/lib/streamaggr/dedup.go @@ -181,8 +181,7 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) { } continue } - // update the existing value according to logic described in - // https://docs.victoriametrics.com/#deduplication + // Update the existing value according to logic described at https://docs.victoriametrics.com/#deduplication if sample.timestamp > s.timestamp || (sample.timestamp == s.timestamp && sample.value > s.value) { m[sample.key] = dedupAggrSample{ value: sample.value, @@ -209,8 +208,9 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample dstSamples := ctx.samples for key, s := range m { dstSamples = append(dstSamples, pushSample{ - key: key, - value: s.value, + key: key, + value: s.value, + timestamp: s.timestamp, }) // Limit the number of samples per each flush in order to limit memory usage. diff --git a/lib/streamaggr/deduplicator_test.go b/lib/streamaggr/deduplicator_test.go index 20858db83..96be55ef8 100644 --- a/lib/streamaggr/deduplicator_test.go +++ b/lib/streamaggr/deduplicator_test.go @@ -20,10 +20,10 @@ func TestDeduplicator(t *testing.T) { tss := mustParsePromMetrics(` foo{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 123 bar{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 34.54 -x 8943 +x 8943 1000 baz_aaa_aaa_fdd{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} -34.34 -x 90984 -x 433 +x 90984 900 +x 433 1000 asfjkldsf{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 12322 foo{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 894 baz_aaa_aaa_fdd{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} -2.3 @@ -41,7 +41,7 @@ baz_aaa_aaa_fdd{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",n bar{container="ohohffd",job="aaa",namespace="asdff",pod="sdfd-dfdfdfs"} 34.54 baz_aaa_aaa_fdd{container="ohohffd",job="aaa",namespace="asdff",pod="sdfd-dfdfdfs"} -2.3 foo{container="ohohffd",job="aaa",namespace="asdff",pod="sdfd-dfdfdfs"} 894 -x 90984 +x 8943 ` if result != resultExpected { t.Fatalf("unexpected result; got\n%s\nwant\n%s", result, resultExpected)