mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-11 15:34:56 +00:00

Previously, staleness markers could be removed during de-duplication because of incorrect logic in the value-equality check. When evaluating a read query, vmselect deduplicates samples using the dedupInterval option: it picks the highest value among all points that share the last timestamp before the dedupInterval border. The issue is that any comparison with NaN via `<` or `>` returns false, so the position of a NaN in srcValues could affect the result. This commit adds an additional step that explicitly checks for staleness markers in the following cases:

1. Deduplication on vmselect
2. Deduplication in vmstorage during merges
3. Deduplication in stream aggregation

The check is performed only for stale markers, because other NaNs are rejected on ingestion by vmstorage or by stream aggregation. Checking for stale markers changes dedup speed by about 3% in the benchmark below (geomean −3.56% time per op; a single run per side, so the difference is not statistically significant):

```
benchstat old.txt new.txt
goos: darwin
goarch: arm64
pkg: github.com/VictoriaMetrics/VictoriaMetrics/lib/storage
cpu: Apple M4 Pro
                                                       │   old.txt    │              new.txt               │
                                                       │    sec/op    │    sec/op     vs base              │
DeduplicateSamples/minScrapeInterval=1s-14               462.8n ± ∞ ¹   425.2n ± ∞ ¹  ~ (p=1.000 n=1) ²
DeduplicateSamples/minScrapeInterval=2s-14               905.6n ± ∞ ¹   903.3n ± ∞ ¹  ~ (p=1.000 n=1) ²
DeduplicateSamples/minScrapeInterval=5s-14               710.0n ± ∞ ¹   698.9n ± ∞ ¹  ~ (p=1.000 n=1) ²
DeduplicateSamples/minScrapeInterval=10s-14              632.7n ± ∞ ¹   638.5n ± ∞ ¹  ~ (p=1.000 n=1) ²
DeduplicateSamplesDuringMerge/minScrapeInterval=1s-14    439.7n ± ∞ ¹   409.9n ± ∞ ¹  ~ (p=1.000 n=1) ²
DeduplicateSamplesDuringMerge/minScrapeInterval=2s-14    908.9n ± ∞ ¹   882.2n ± ∞ ¹  ~ (p=1.000 n=1) ²
DeduplicateSamplesDuringMerge/minScrapeInterval=5s-14    721.2n ± ∞ ¹   684.7n ± ∞ ¹  ~ (p=1.000 n=1) ²
DeduplicateSamplesDuringMerge/minScrapeInterval=10s-14   659.1n ± ∞ ¹   630.6n ± ∞ ¹  ~ (p=1.000 n=1) ²
geomean                                                  659.5n         636.0n        -3.56%
```

Related issue: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7674

---------

Co-authored-by: hagen1778 <roman@victoriametrics.com>
172 lines
5.1 KiB
Go
172 lines
5.1 KiB
Go
package storage
|
|
|
|
import (
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
|
)
|
|
|
|
// SetDedupInterval sets the deduplication interval, which is applied to raw samples during data ingestion and querying.
//
// The interval is stored with millisecond precision (time.Duration.Milliseconds truncates),
// so any dedupInterval below one millisecond - including 0 - as well as a negative
// dedupInterval leaves de-duplication disabled.
//
// This function must be called before initializing the storage.
func SetDedupInterval(dedupInterval time.Duration) {
	ms := dedupInterval.Milliseconds()
	globalDedupInterval = ms
}

// GetDedupInterval returns the dedup interval in milliseconds, which has been set via SetDedupInterval.
//
// The returned value is exactly what SetDedupInterval stored, so it may be zero or negative
// when de-duplication is disabled.
func GetDedupInterval() int64 {
	return globalDedupInterval
}

// globalDedupInterval is the dedup interval in milliseconds.
//
// It is written by SetDedupInterval and read by GetDedupInterval and isDedupEnabled
// without any synchronization - presumably the reason SetDedupInterval must run
// before the storage is initialized.
var globalDedupInterval int64

// isDedupEnabled reports whether a strictly positive dedup interval is configured.
func isDedupEnabled() bool {
	return GetDedupInterval() > 0
}
|
|
|
|
// DeduplicateSamples removes samples from src* if they are closer to each other than dedupInterval in milliseconds.
|
|
// DeduplicateSamples treats StaleNaN (Prometheus stale markers) as values and doesn't skip them on purpose - see
|
|
// https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5587
|
|
func DeduplicateSamples(srcTimestamps []int64, srcValues []float64, dedupInterval int64) ([]int64, []float64) {
|
|
if !needsDedup(srcTimestamps, dedupInterval) {
|
|
// Fast path - nothing to deduplicate
|
|
return srcTimestamps, srcValues
|
|
}
|
|
tsNext := srcTimestamps[0] + dedupInterval - 1
|
|
tsNext -= tsNext % dedupInterval
|
|
dstTimestamps := srcTimestamps[:0]
|
|
dstValues := srcValues[:0]
|
|
for i, ts := range srcTimestamps[1:] {
|
|
if ts <= tsNext {
|
|
continue
|
|
}
|
|
// Choose the maximum value with the timestamp equal to tsPrev.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3333
|
|
j := i
|
|
tsPrev := srcTimestamps[j]
|
|
vPrev := srcValues[j]
|
|
for j > 0 && srcTimestamps[j-1] == tsPrev {
|
|
j--
|
|
if decimal.IsStaleNaN(srcValues[j]) {
|
|
// always prefer decimal.IsStaleNaN to avoid inconsistency when comparing values
|
|
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7674
|
|
vPrev = srcValues[j]
|
|
break
|
|
}
|
|
if srcValues[j] > vPrev {
|
|
vPrev = srcValues[j]
|
|
}
|
|
}
|
|
dstTimestamps = append(dstTimestamps, tsPrev)
|
|
dstValues = append(dstValues, vPrev)
|
|
tsNext += dedupInterval
|
|
if tsNext < ts {
|
|
tsNext = ts + dedupInterval - 1
|
|
tsNext -= tsNext % dedupInterval
|
|
}
|
|
}
|
|
j := len(srcTimestamps) - 1
|
|
tsPrev := srcTimestamps[j]
|
|
vPrev := srcValues[j]
|
|
// Invariant: vPrev > srcValues[j]
|
|
for j > 0 && srcTimestamps[j-1] == tsPrev {
|
|
j--
|
|
if decimal.IsStaleNaN(srcValues[j]) {
|
|
// always prefer decimal.IsStaleNaN to avoid inconsistency when comparing values
|
|
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7674
|
|
vPrev = srcValues[j]
|
|
break
|
|
}
|
|
if srcValues[j] > vPrev {
|
|
vPrev = srcValues[j]
|
|
}
|
|
}
|
|
dstTimestamps = append(dstTimestamps, tsPrev)
|
|
dstValues = append(dstValues, vPrev)
|
|
return dstTimestamps, dstValues
|
|
}
|
|
|
|
func deduplicateSamplesDuringMerge(srcTimestamps, srcValues []int64, dedupInterval int64) ([]int64, []int64) {
|
|
if !needsDedup(srcTimestamps, dedupInterval) {
|
|
// Fast path - nothing to deduplicate
|
|
return srcTimestamps, srcValues
|
|
}
|
|
tsNext := srcTimestamps[0] + dedupInterval - 1
|
|
tsNext -= tsNext % dedupInterval
|
|
dstTimestamps := srcTimestamps[:0]
|
|
dstValues := srcValues[:0]
|
|
for i, ts := range srcTimestamps[1:] {
|
|
if ts <= tsNext {
|
|
continue
|
|
}
|
|
// Choose the maximum value with the timestamp equal to tsPrev.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3333
|
|
j := i
|
|
tsPrev := srcTimestamps[j]
|
|
vPrev := srcValues[j]
|
|
for j > 0 && srcTimestamps[j-1] == tsPrev {
|
|
j--
|
|
if decimal.IsStaleNaNInt64(srcValues[j]) {
|
|
// always prefer decimal.IsStaleNaN to avoid inconsistency when comparing values
|
|
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7674
|
|
vPrev = srcValues[j]
|
|
break
|
|
}
|
|
if srcValues[j] > vPrev {
|
|
vPrev = srcValues[j]
|
|
}
|
|
}
|
|
dstTimestamps = append(dstTimestamps, tsPrev)
|
|
dstValues = append(dstValues, vPrev)
|
|
tsNext += dedupInterval
|
|
if tsNext < ts {
|
|
tsNext = ts + dedupInterval - 1
|
|
tsNext -= tsNext % dedupInterval
|
|
}
|
|
}
|
|
j := len(srcTimestamps) - 1
|
|
tsPrev := srcTimestamps[j]
|
|
vPrev := srcValues[j]
|
|
if decimal.IsStaleNaNInt64(vPrev) {
|
|
// fast path - decimal.StaleNaN is always preferred to other values on interval
|
|
dstTimestamps = append(dstTimestamps, tsPrev)
|
|
dstValues = append(dstValues, vPrev)
|
|
return dstTimestamps, dstValues
|
|
}
|
|
for j > 0 && srcTimestamps[j-1] == tsPrev {
|
|
j--
|
|
if decimal.IsStaleNaNInt64(srcValues[j]) {
|
|
// always prefer decimal.IsStaleNaN to avoid inconsistency when comparing values
|
|
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7674
|
|
vPrev = srcValues[j]
|
|
break
|
|
}
|
|
if srcValues[j] > vPrev {
|
|
vPrev = srcValues[j]
|
|
}
|
|
}
|
|
dstTimestamps = append(dstTimestamps, tsPrev)
|
|
dstValues = append(dstValues, vPrev)
|
|
return dstTimestamps, dstValues
|
|
}
|
|
|
|
// needsDedup reports whether at least two of the given timestamps fall into the same
// dedupInterval-aligned window, i.e. whether de-duplication would drop anything.
//
// It returns false for fewer than two timestamps or a non-positive dedupInterval.
// The window tracking assumes timestamps are sorted in ascending order.
func needsDedup(timestamps []int64, dedupInterval int64) bool {
	if dedupInterval <= 0 || len(timestamps) < 2 {
		return false
	}
	// boundary is the inclusive upper edge of the window holding the previous timestamp.
	boundary := timestamps[0] + dedupInterval - 1
	boundary -= boundary % dedupInterval
	for idx := 1; idx < len(timestamps); idx++ {
		cur := timestamps[idx]
		if cur <= boundary {
			// cur shares a window with the previous timestamp.
			return true
		}
		boundary += dedupInterval
		if boundary < cur {
			// cur jumped past one or more empty windows - realign to the window holding cur.
			boundary = cur + dedupInterval - 1
			boundary -= boundary % dedupInterval
		}
	}
	return false
}
|