VictoriaMetrics/lib/storage/dedup.go
Andrei Baidarov 33a012c225
lib/storage: prefer stale markers over other values on dedup interval
Previously, staleness markers could be dropped during de-duplication because of incorrect logic in the
value comparison check.
When evaluating a read query, vmselect deduplicates samples according to the dedupInterval option. Among the samples that share the latest timestamp before a dedupInterval border, it picks the one with the highest value. The problem is that any comparison with NaN via < or > returns false, so the position of a NaN in srcValues could affect the result.


This commit adds an explicit check for staleness markers to this logic, so that they are always preferred over other values, in the following cases:
 1. Deduplication on vmselect
2. Deduplication in vmstorage during merges
3. Deduplication in stream aggregation

The check is performed only for stale markers, because other NaNs are rejected on ingestion
by vmstorage or by stream aggregation.

Checking for stale markers shows no statistically significant impact on dedup speed (geomean -3.56%, single run per benchmark):
```
 benchstat old.txt new.txt

goos: darwin
goarch: arm64
pkg: github.com/VictoriaMetrics/VictoriaMetrics/lib/storage
cpu: Apple M4 Pro
                                                       │   old.txt    │               new.txt                │
                                                       │    sec/op    │    sec/op     vs base                │
DeduplicateSamples/minScrapeInterval=1s-14               462.8n ± ∞ ¹   425.2n ± ∞ ¹       ~ (p=1.000 n=1) ²
DeduplicateSamples/minScrapeInterval=2s-14               905.6n ± ∞ ¹   903.3n ± ∞ ¹       ~ (p=1.000 n=1) ²
DeduplicateSamples/minScrapeInterval=5s-14               710.0n ± ∞ ¹   698.9n ± ∞ ¹       ~ (p=1.000 n=1) ²
DeduplicateSamples/minScrapeInterval=10s-14              632.7n ± ∞ ¹   638.5n ± ∞ ¹       ~ (p=1.000 n=1) ²
DeduplicateSamplesDuringMerge/minScrapeInterval=1s-14    439.7n ± ∞ ¹   409.9n ± ∞ ¹       ~ (p=1.000 n=1) ²
DeduplicateSamplesDuringMerge/minScrapeInterval=2s-14    908.9n ± ∞ ¹   882.2n ± ∞ ¹       ~ (p=1.000 n=1) ²
DeduplicateSamplesDuringMerge/minScrapeInterval=5s-14    721.2n ± ∞ ¹   684.7n ± ∞ ¹       ~ (p=1.000 n=1) ²
DeduplicateSamplesDuringMerge/minScrapeInterval=10s-14   659.1n ± ∞ ¹   630.6n ± ∞ ¹       ~ (p=1.000 n=1) ²
geomean                                                  659.5n         636.0n        -3.56%
```

Related issue:
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7674
---------
Co-authored-by: hagen1778 <roman@victoriametrics.com>
2024-12-12 13:00:34 +01:00

172 lines
5.1 KiB
Go

package storage
import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
)
// SetDedupInterval sets the deduplication interval, which is applied to raw samples during data ingestion and querying.
//
// The interval is kept with millisecond precision (sub-millisecond parts are truncated), so de-duplication
// is disabled when dedupInterval is 0, negative, or shorter than one millisecond.
//
// This function must be called before initializing the storage; the write below is not synchronized.
func SetDedupInterval(dedupInterval time.Duration) {
	globalDedupInterval = int64(dedupInterval / time.Millisecond)
}

// GetDedupInterval returns the dedup interval in milliseconds, which has been set via SetDedupInterval.
func GetDedupInterval() int64 {
	return globalDedupInterval
}

// globalDedupInterval holds the dedup interval in milliseconds; values <= 0 mean de-duplication is disabled.
var globalDedupInterval int64

// isDedupEnabled reports whether a positive dedup interval has been configured via SetDedupInterval.
func isDedupEnabled() bool {
	return 0 < globalDedupInterval
}
// DeduplicateSamples removes samples from src* if they are closer to each other than dedupInterval in milliseconds.
//
// Samples are grouped into dedupInterval-aligned windows. For every window only the samples carrying the
// latest timestamp of that window are considered, and the maximum value among them is kept -
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3333
//
// DeduplicateSamples treats StaleNaN (Prometheus stale markers) as values and doesn't skip them on purpose - see
// https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5587
// If any of the considered samples is a stale marker, the stale marker is kept. This is checked explicitly,
// since comparisons with NaN via < and > always return false - see
// https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7674
//
// The window logic assumes srcTimestamps is sorted in ascending order. When deduplication is needed,
// the results are written in place: the returned slices share backing arrays with srcTimestamps and srcValues.
func DeduplicateSamples(srcTimestamps []int64, srcValues []float64, dedupInterval int64) ([]int64, []float64) {
	if !needsDedup(srcTimestamps, dedupInterval) {
		// Fast path - nothing to deduplicate
		return srcTimestamps, srcValues
	}
	// tsNext is the inclusive upper border of the window holding the current sample.
	tsNext := srcTimestamps[0] + dedupInterval - 1
	tsNext -= tsNext % dedupInterval
	dstTimestamps := srcTimestamps[:0]
	dstValues := srcValues[:0]
	for i, ts := range srcTimestamps[1:] {
		if ts <= tsNext {
			continue
		}
		// srcTimestamps[i] is the last sample of the window that has just been closed.
		// Scan backwards over samples with the same timestamp and choose the maximum value,
		// stopping as soon as a stale marker has been chosen, since it always wins.
		j := i
		tsPrev := srcTimestamps[j]
		vPrev := srcValues[j]
		for !decimal.IsStaleNaN(vPrev) && j > 0 && srcTimestamps[j-1] == tsPrev {
			j--
			if v := srcValues[j]; decimal.IsStaleNaN(v) || v > vPrev {
				vPrev = v
			}
		}
		dstTimestamps = append(dstTimestamps, tsPrev)
		dstValues = append(dstValues, vPrev)
		tsNext += dedupInterval
		if tsNext < ts {
			// Skip empty windows: realign tsNext to the border of the window containing ts.
			tsNext = ts + dedupInterval - 1
			tsNext -= tsNext % dedupInterval
		}
	}
	// Emit the last window, applying the same selection rules as above.
	j := len(srcTimestamps) - 1
	tsPrev := srcTimestamps[j]
	vPrev := srcValues[j]
	for !decimal.IsStaleNaN(vPrev) && j > 0 && srcTimestamps[j-1] == tsPrev {
		j--
		if v := srcValues[j]; decimal.IsStaleNaN(v) || v > vPrev {
			vPrev = v
		}
	}
	dstTimestamps = append(dstTimestamps, tsPrev)
	dstValues = append(dstValues, vPrev)
	return dstTimestamps, dstValues
}
// deduplicateSamplesDuringMerge is the counterpart of DeduplicateSamples for values encoded as int64
// via lib/decimal. It removes samples that are closer to each other than dedupInterval milliseconds.
//
// For every dedupInterval-aligned window only the samples carrying the latest timestamp of that window
// are considered, and the maximum encoded value among them is kept -
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3333
//
// A stale marker among the considered samples always wins -
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7674
// This has to be enforced explicitly for every candidate, including the last sample of a window:
// the encoded stale marker is an ordinary int64, so a plain `>` comparison could let another
// encoded value replace it.
//
// The window logic assumes srcTimestamps is sorted in ascending order. When deduplication is needed,
// the results are written in place: the returned slices share backing arrays with srcTimestamps and srcValues.
func deduplicateSamplesDuringMerge(srcTimestamps, srcValues []int64, dedupInterval int64) ([]int64, []int64) {
	if !needsDedup(srcTimestamps, dedupInterval) {
		// Fast path - nothing to deduplicate
		return srcTimestamps, srcValues
	}
	// tsNext is the inclusive upper border of the window holding the current sample.
	tsNext := srcTimestamps[0] + dedupInterval - 1
	tsNext -= tsNext % dedupInterval
	dstTimestamps := srcTimestamps[:0]
	dstValues := srcValues[:0]
	for i, ts := range srcTimestamps[1:] {
		if ts <= tsNext {
			continue
		}
		// srcTimestamps[i] is the last sample of the window that has just been closed.
		// Scan backwards over samples with the same timestamp and choose the maximum value,
		// stopping as soon as a stale marker has been chosen - including when srcValues[i]
		// itself is the stale marker, which must not be overwritten by a larger encoded value.
		j := i
		tsPrev := srcTimestamps[j]
		vPrev := srcValues[j]
		for !decimal.IsStaleNaNInt64(vPrev) && j > 0 && srcTimestamps[j-1] == tsPrev {
			j--
			if v := srcValues[j]; decimal.IsStaleNaNInt64(v) || v > vPrev {
				vPrev = v
			}
		}
		dstTimestamps = append(dstTimestamps, tsPrev)
		dstValues = append(dstValues, vPrev)
		tsNext += dedupInterval
		if tsNext < ts {
			// Skip empty windows: realign tsNext to the border of the window containing ts.
			tsNext = ts + dedupInterval - 1
			tsNext -= tsNext % dedupInterval
		}
	}
	// Emit the last window, applying the same selection rules as above.
	j := len(srcTimestamps) - 1
	tsPrev := srcTimestamps[j]
	vPrev := srcValues[j]
	for !decimal.IsStaleNaNInt64(vPrev) && j > 0 && srcTimestamps[j-1] == tsPrev {
		j--
		if v := srcValues[j]; decimal.IsStaleNaNInt64(v) || v > vPrev {
			vPrev = v
		}
	}
	dstTimestamps = append(dstTimestamps, tsPrev)
	dstValues = append(dstValues, vPrev)
	return dstTimestamps, dstValues
}
// needsDedup reports whether at least two of the given timestamps fall into the same
// dedupInterval-aligned window, i.e. whether deduplication would drop any sample.
//
// It returns false for a non-positive dedupInterval or fewer than two timestamps.
// The window logic assumes timestamps are sorted in ascending order.
func needsDedup(timestamps []int64, dedupInterval int64) bool {
	if dedupInterval <= 0 || len(timestamps) < 2 {
		return false
	}
	// windowEnd is the inclusive upper border of the window holding the current timestamp.
	windowEnd := timestamps[0] + dedupInterval - 1
	windowEnd -= windowEnd % dedupInterval
	for k := 1; k < len(timestamps); k++ {
		cur := timestamps[k]
		if cur <= windowEnd {
			// Two timestamps share a window.
			return true
		}
		windowEnd += dedupInterval
		if windowEnd < cur {
			// Skip empty windows: realign to the border of the window containing cur.
			windowEnd = cur + dedupInterval - 1
			windowEnd -= windowEnd % dedupInterval
		}
	}
	return false
}