diff --git a/app/vmselect/netstorage/netstorage_test.go b/app/vmselect/netstorage/netstorage_test.go index 6de0d268d..4b20553be 100644 --- a/app/vmselect/netstorage/netstorage_test.go +++ b/app/vmselect/netstorage/netstorage_test.go @@ -144,7 +144,7 @@ func TestMergeSortBlocks(t *testing.T) { }, }, 1, &Result{ Timestamps: []int64{1, 2, 4, 5, 10, 11, 12}, - Values: []float64{21, 22, 23, 7, 24, 5, 26}, + Values: []float64{21, 22, 23, 7, 24, 25, 26}, }) // Multiple blocks with identical timestamp ranges, no deduplication. diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 08981a6b5..cfd13f028 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -56,6 +56,7 @@ The following tip changes can be tested by building VictoriaMetrics components f * FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): add `-remoteWrite.sendTimeout` command-line flag, which allows configuring timeout for sending data to `-remoteWrite.url`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3408). * FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add ability to migrate data between VictoriaMetrics clusters with automatic tenants discovery. See [these docs](https://docs.victoriametrics.com/vmctl.html#cluster-to-cluster-migration-mode) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2930). * FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add ability to copy data from sources via Prometheus `remote_read` protocol. See [these docs](https://docs.victoriametrics.com/vmctl.html#migrating-data-by-remote-read-protocol). The related issues: [one](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3132) and [two](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1101). +* FEATURE: [deduplication](https://docs.victoriametrics.com/#deduplication): leave raw sample with the biggest value for identical timestamps per each `-dedup.minScrapeInterval` discrete interval when the [deduplication](https://docs.victoriametrics.com/#deduplication) is enabled. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3333). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): allow changing timezones for the requested data. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3075). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): provide fast path for hiding results for all the queries except the given one by clicking `eye` icon with `ctrl` key pressed. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3446). * FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add `range_trim_spikes(phi, q)` function for trimming `phi` percent of the largest spikes per each time series returned by `q`. See [these docs](https://docs.victoriametrics.com/MetricsQL.html#range_trim_spikes). diff --git a/lib/storage/dedup.go b/lib/storage/dedup.go index 05e1d2ec6..5f72ff3a6 100644 --- a/lib/storage/dedup.go +++ b/lib/storage/dedup.go @@ -34,20 +34,37 @@ func DeduplicateSamples(srcTimestamps []int64, srcValues []float64, dedupInterva tsNext -= tsNext % dedupInterval dstTimestamps := srcTimestamps[:0] dstValues := srcValues[:0] + + var tsPrev int64 + var value, valuePrev float64 for i, ts := range srcTimestamps[1:] { + value = srcValues[i] + tsCur := srcTimestamps[i] + if tsCur == tsPrev && value < valuePrev { + // prefer biggest value on timestamp conflict + value = valuePrev + } + valuePrev = value + tsPrev = tsCur if ts <= tsNext { continue } - dstTimestamps = append(dstTimestamps, srcTimestamps[i]) - dstValues = append(dstValues, srcValues[i]) + dstTimestamps = append(dstTimestamps, tsCur) + dstValues = append(dstValues, value) tsNext += dedupInterval if tsNext < ts { tsNext = ts + dedupInterval - 1 tsNext -= tsNext % dedupInterval } } - dstTimestamps = append(dstTimestamps, srcTimestamps[len(srcTimestamps)-1]) - dstValues = append(dstValues, srcValues[len(srcValues)-1]) + + ts := srcTimestamps[len(srcTimestamps)-1] + v := srcValues[len(srcValues)-1] + dstTimestamps = append(dstTimestamps, ts) + if ts == tsPrev && v < value { + v = value + } + dstValues = append(dstValues, v) return dstTimestamps, dstValues } @@ -60,20 +77,36 @@ func deduplicateSamplesDuringMerge(srcTimestamps, srcValues []int64, dedupInterv tsNext -= tsNext % dedupInterval dstTimestamps := srcTimestamps[:0] dstValues := srcValues[:0] + + var tsPrev int64 + var value, valuePrev int64 for i, ts := range srcTimestamps[1:] { + value = srcValues[i] + tsCur := srcTimestamps[i] + if tsCur == tsPrev && value < valuePrev { + // prefer biggest value on timestamp conflict + value = valuePrev + } + valuePrev = value + tsPrev = tsCur if ts <= tsNext { continue } - dstTimestamps = append(dstTimestamps, srcTimestamps[i]) - dstValues = append(dstValues, srcValues[i]) + dstTimestamps = append(dstTimestamps, tsCur) + dstValues = append(dstValues, value) tsNext += dedupInterval if tsNext < ts { tsNext = ts + dedupInterval - 1 tsNext -= tsNext % dedupInterval } } - dstTimestamps = append(dstTimestamps, srcTimestamps[len(srcTimestamps)-1]) - dstValues = append(dstValues, srcValues[len(srcValues)-1]) + ts := srcTimestamps[len(srcTimestamps)-1] + v := srcValues[len(srcValues)-1] + dstTimestamps = append(dstTimestamps, ts) + if ts == tsPrev && v < value { + v = value + } + dstValues = append(dstValues, v) return dstTimestamps, dstValues } diff --git a/lib/storage/dedup_test.go b/lib/storage/dedup_test.go index b5ba85ba0..0e4eb96a1 100644 --- a/lib/storage/dedup_test.go +++ b/lib/storage/dedup_test.go @@ -38,14 +38,10 @@ func TestNeedsDedup(t *testing.T) { func TestDeduplicateSamples(t *testing.T) { // Disable deduplication before exit, since the rest of tests expect disabled dedup. - f := func(scrapeInterval time.Duration, timestamps, timestampsExpected []int64, valuesExpected []float64) { + f := func(scrapeInterval time.Duration, timestamps, timestampsExpected []int64, values, valuesExpected []float64) { t.Helper() timestampsCopy := make([]int64, len(timestamps)) - values := make([]float64, len(timestamps)) - for i, ts := range timestamps { - timestampsCopy[i] = ts - values[i] = float64(i) - } + copy(timestampsCopy, timestamps) dedupInterval := scrapeInterval.Milliseconds() timestampsCopy, values = DeduplicateSamples(timestampsCopy, values, dedupInterval) if !reflect.DeepEqual(timestampsCopy, timestampsExpected) { @@ -65,13 +61,57 @@ func TestDeduplicateSamples(t *testing.T) { t.Fatalf("invalid DeduplicateSamples(%v) values for the second call;\ngot\n%v\nwant\n%v", timestamps, values, valuesCopy) } } - f(time.Millisecond, nil, []int64{}, []float64{}) - f(time.Millisecond, []int64{123}, []int64{123}, []float64{0}) - f(time.Millisecond, []int64{123, 456}, []int64{123, 456}, []float64{0, 1}) - f(time.Millisecond, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, []int64{0, 1, 2, 3, 4}, []float64{2, 4, 5, 8, 9}) - f(0, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, []float64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) - f(100*time.Millisecond, []int64{0, 100, 100, 101, 150, 180, 205, 300, 1000}, []int64{0, 100, 180, 300, 1000}, []float64{0, 2, 5, 7, 8}) - f(10*time.Second, []int64{10e3, 13e3, 21e3, 22e3, 30e3, 33e3, 39e3, 45e3}, []int64{10e3, 13e3, 30e3, 39e3, 45e3}, []float64{0, 1, 4, 6, 7}) + f(time.Millisecond, nil, []int64{}, []float64{}, []float64{}) + f(time.Millisecond, []int64{123}, []int64{123}, []float64{0}, []float64{0}) + f(time.Millisecond, []int64{123, 456}, []int64{123, 456}, []float64{0, 1}, []float64{0, 1}) + + // pick the biggest value on the interval, no matter what order is + f(time.Millisecond, + []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, + []int64{0, 1, 2, 3, 4}, + []float64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + []float64{2, 4, 5, 8, 9}) + f(time.Millisecond, + []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, + []int64{0, 1, 2, 3, 4}, + []float64{2, 1, 0, 3, 4, 5, 7, 6, 8, 9}, + []float64{2, 4, 5, 8, 9}) + f(time.Millisecond, + []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, + []int64{0, 1, 2, 3, 4}, + []float64{1, 2, 0, 4, 3, 5, 8, 6, 7, 9}, + []float64{2, 4, 5, 8, 9}) + + // descending values + f(time.Millisecond, + []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, + []int64{0, 1, 2, 3, 4}, + []float64{9, 8, 7, 6, 5, 4, 3, 2, 1, 0}, + []float64{9, 6, 4, 3, 0}) + + f(10*time.Millisecond, + []int64{0, 9, 11, 13, 13, 29, 29, 29}, + []int64{0, 9, 13, 29}, + []float64{5, 1, 0, 4, 1, 3, 0, 5}, + []float64{5, 1, 4, 5}) + + // too small dedup interval + f(0, + []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, + []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, + []float64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + []float64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) + + f(100*time.Millisecond, + []int64{0, 100, 100, 101, 150, 180, 205, 300, 1000}, + []int64{0, 100, 180, 300, 1000}, + []float64{0, 1, 2, 3, 4, 5, 6, 7, 8}, + []float64{0, 2, 5, 7, 8}) + f(10*time.Second, + []int64{10e3, 13e3, 21e3, 22e3, 30e3, 33e3, 39e3, 45e3}, + []int64{10e3, 13e3, 30e3, 39e3, 45e3}, + []float64{0, 1, 2, 3, 4, 5, 6, 7}, + []float64{0, 1, 4, 6, 7}) } func TestDeduplicateSamplesDuringMerge(t *testing.T) {