diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index a3eb69b46..726e9c9b3 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -53,6 +53,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). * FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): support client-side TLS configuration for VictoriaMetrics destination specified via `--vm-*` cmd-line flags used in [InfluxDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-influxdb-1x), [Remote Read protocol](https://docs.victoriametrics.com/vmctl/#migrating-data-by-remote-read-protocol), [OpenTSDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-opentsdb), [Prometheus](https://docs.victoriametrics.com/vmctl/#migrating-data-from-prometheus) and [Promscale](https://docs.victoriametrics.com/vmctl/#migrating-data-from-promscale) migration modes. * BUGFIX: do not drop `match[]` filter at [`/api/v1/series`](https://docs.victoriametrics.com/url-examples/#apiv1series) if `-search.ignoreExtraFiltersAtLabelsAPI` command-line flag is set, since missing `match[]` filter breaks `/api/v1/series` requests. +* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): pick samples with bigger values and timestamps on deduplication interval ## [v1.99.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.99.0) diff --git a/lib/streamaggr/dedup.go b/lib/streamaggr/dedup.go index e7fd336eb..8f3349a8f 100644 --- a/lib/streamaggr/dedup.go +++ b/lib/streamaggr/dedup.go @@ -28,7 +28,8 @@ type dedupAggrShardNopad struct { } type dedupAggrSample struct { - value float64 + value float64 + timestamp int64 } func newDedupAggr() *dedupAggr { @@ -172,8 +173,21 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) { das.m = m } for _, sample := range samples { - m[sample.key] = dedupAggrSample{ - value: sample.value, + s, ok := m[sample.key] + if !ok { + m[sample.key] = dedupAggrSample{ + value: sample.value, + timestamp: sample.timestamp, + } + continue + } + // update the existing value according to logic described in + // https://docs.victoriametrics.com/#deduplication + if sample.timestamp > s.timestamp || (sample.timestamp == s.timestamp && sample.value > s.value) { + m[sample.key] = dedupAggrSample{ + value: sample.value, + timestamp: sample.timestamp, + } } } } diff --git a/lib/streamaggr/dedup_test.go b/lib/streamaggr/dedup_test.go index 685bba6f7..f0b4cc951 100644 --- a/lib/streamaggr/dedup_test.go +++ b/lib/streamaggr/dedup_test.go @@ -23,8 +23,8 @@ func TestDedupAggrSerial(t *testing.T) { da.pushSamples(samples) } - if n := da.sizeBytes(); n > 3_400_000 { - t.Fatalf("too big dedupAggr state before flush: %d bytes; it shouldn't exceed 3_400_000 bytes", n) + if n := da.sizeBytes(); n > 4_200_000 { + t.Fatalf("too big dedupAggr state before flush: %d bytes; it shouldn't exceed 4_200_000 bytes", n) } if n := da.itemsCount(); n != seriesCount { t.Fatalf("unexpected itemsCount; got %d; want %d", n, seriesCount) diff --git a/lib/streamaggr/deduplicator.go b/lib/streamaggr/deduplicator.go index 9a97ac60c..9cc3fee22 100644 --- a/lib/streamaggr/deduplicator.go +++ b/lib/streamaggr/deduplicator.go @@ -107,8 +107,9 @@ func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) { key := bytesutil.InternBytes(buf) for _, s := range ts.Samples { pss = append(pss, pushSample{ - key: key, - value: s.Value, + key: key, + value: s.Value, + timestamp: s.Timestamp, }) } } diff --git a/lib/streamaggr/deduplicator_test.go b/lib/streamaggr/deduplicator_test.go index ad860bd26..20858db83 100644 --- a/lib/streamaggr/deduplicator_test.go +++ b/lib/streamaggr/deduplicator_test.go @@ -41,7 +41,7 @@ baz_aaa_aaa_fdd{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",n bar{container="ohohffd",job="aaa",namespace="asdff",pod="sdfd-dfdfdfs"} 34.54 baz_aaa_aaa_fdd{container="ohohffd",job="aaa",namespace="asdff",pod="sdfd-dfdfdfs"} -2.3 foo{container="ohohffd",job="aaa",namespace="asdff",pod="sdfd-dfdfdfs"} 894 -x 433 +x 90984 ` if result != resultExpected { t.Fatalf("unexpected result; got\n%s\nwant\n%s", result, resultExpected) diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index f0ea7c06a..e1418c948 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -776,8 +776,9 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) { continue } samples = append(samples, pushSample{ - key: key, - value: sample.Value, + key: key, + value: sample.Value, + timestamp: sample.Timestamp, }) } } @@ -851,8 +852,9 @@ func (ctx *pushCtx) reset() { } type pushSample struct { - key string - value float64 + key string + value float64 + timestamp int64 } func getPushCtx() *pushCtx { diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go index 731657bab..7d49b40c5 100644 --- a/lib/streamaggr/streamaggr_test.go +++ b/lib/streamaggr/streamaggr_test.go @@ -939,7 +939,7 @@ foo{baz="qwe"} -5 bar{baz="qwer"} 343 bar{baz="qwer"} 344 foo{baz="qwe"} 10 -`, `bar:1m_sum_samples{baz="qwe"} 2 +`, `bar:1m_sum_samples{baz="qwe"} 4.34 bar:1m_sum_samples{baz="qwer"} 344 foo:1m_sum_samples 123 foo:1m_sum_samples{baz="qwe"} 10