lib/streamaggr: pick sample with bigger timestamp or value on deduplicator (#5939)

Apply the same deduplication logic as in https://docs.victoriametrics.com/#deduplication
(keep the sample with the biggest timestamp; on equal timestamps keep the sample with the biggest value).
This requires more memory for deduplication, since the timestamp must be tracked
for each record. However, deduplication becomes more consistent.

https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5643

---------

Co-authored-by: Roman Khavronenko <roman@victoriametrics.com>
This commit is contained in:
Andrii Chubatiuk 2024-03-12 23:47:29 +02:00 committed by GitHub
parent e80b44f19d
commit 15e33d56f1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 31 additions and 13 deletions

View file

@ -53,6 +53,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): support client-side TLS configuration for VictoriaMetrics destination specified via `--vm-*` cmd-line flags used in [InfluxDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-influxdb-1x), [Remote Read protocol](https://docs.victoriametrics.com/vmctl/#migrating-data-by-remote-read-protocol), [OpenTSDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-opentsdb), [Prometheus](https://docs.victoriametrics.com/vmctl/#migrating-data-from-prometheus) and [Promscale](https://docs.victoriametrics.com/vmctl/#migrating-data-from-promscale) migration modes.
* BUGFIX: do not drop `match[]` filter at [`/api/v1/series`](https://docs.victoriametrics.com/url-examples/#apiv1series) if `-search.ignoreExtraFiltersAtLabelsAPI` command-line flag is set, since missing `match[]` filter breaks `/api/v1/series` requests.
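* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): pick the sample with the biggest timestamp on the deduplication interval, and the sample with the biggest value if timestamps are equal. This aligns stream aggregation with the [deduplication](https://docs.victoriametrics.com/#deduplication) logic. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5643).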
## [v1.99.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.99.0)

View file

@ -28,7 +28,8 @@ type dedupAggrShardNopad struct {
}
type dedupAggrSample struct {
value float64
value float64
timestamp int64
}
func newDedupAggr() *dedupAggr {
@ -172,8 +173,21 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) {
das.m = m
}
for _, sample := range samples {
m[sample.key] = dedupAggrSample{
value: sample.value,
s, ok := m[sample.key]
if !ok {
m[sample.key] = dedupAggrSample{
value: sample.value,
timestamp: sample.timestamp,
}
continue
}
// update the existing value according to logic described in
// https://docs.victoriametrics.com/#deduplication
if sample.timestamp > s.timestamp || (sample.timestamp == s.timestamp && sample.value > s.value) {
m[sample.key] = dedupAggrSample{
value: sample.value,
timestamp: sample.timestamp,
}
}
}
}

View file

@ -23,8 +23,8 @@ func TestDedupAggrSerial(t *testing.T) {
da.pushSamples(samples)
}
if n := da.sizeBytes(); n > 3_400_000 {
t.Fatalf("too big dedupAggr state before flush: %d bytes; it shouldn't exceed 3_400_000 bytes", n)
if n := da.sizeBytes(); n > 4_200_000 {
t.Fatalf("too big dedupAggr state before flush: %d bytes; it shouldn't exceed 4_200_000 bytes", n)
}
if n := da.itemsCount(); n != seriesCount {
t.Fatalf("unexpected itemsCount; got %d; want %d", n, seriesCount)

View file

@ -107,8 +107,9 @@ func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) {
key := bytesutil.InternBytes(buf)
for _, s := range ts.Samples {
pss = append(pss, pushSample{
key: key,
value: s.Value,
key: key,
value: s.Value,
timestamp: s.Timestamp,
})
}
}

View file

@ -41,7 +41,7 @@ baz_aaa_aaa_fdd{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",n
bar{container="ohohffd",job="aaa",namespace="asdff",pod="sdfd-dfdfdfs"} 34.54
baz_aaa_aaa_fdd{container="ohohffd",job="aaa",namespace="asdff",pod="sdfd-dfdfdfs"} -2.3
foo{container="ohohffd",job="aaa",namespace="asdff",pod="sdfd-dfdfdfs"} 894
x 433
x 90984
`
if result != resultExpected {
t.Fatalf("unexpected result; got\n%s\nwant\n%s", result, resultExpected)

View file

@ -776,8 +776,9 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
continue
}
samples = append(samples, pushSample{
key: key,
value: sample.Value,
key: key,
value: sample.Value,
timestamp: sample.Timestamp,
})
}
}
@ -851,8 +852,9 @@ func (ctx *pushCtx) reset() {
}
type pushSample struct {
key string
value float64
key string
value float64
timestamp int64
}
func getPushCtx() *pushCtx {

View file

@ -939,7 +939,7 @@ foo{baz="qwe"} -5
bar{baz="qwer"} 343
bar{baz="qwer"} 344
foo{baz="qwe"} 10
`, `bar:1m_sum_samples{baz="qwe"} 2
`, `bar:1m_sum_samples{baz="qwe"} 4.34
bar:1m_sum_samples{baz="qwer"} 344
foo:1m_sum_samples 123
foo:1m_sum_samples{baz="qwe"} 10