VictoriaMetrics/lib/storage/dedup.go
Roman Khavronenko 7c0ae3a86a
lib/storage: keep sample with the biggest value on timestamp conflict (#3421)
The change keeps the raw sample with the biggest value among samples with identical
timestamps within each `-dedup.minScrapeInterval` discrete interval
when deduplication is enabled.

```
benchstat old.txt new.txt
name                                         old time/op    new time/op    delta
DeduplicateSamples/minScrapeInterval=1s-10      817ns ± 2%     832ns ± 3%      ~     (p=0.052 n=10+10)
DeduplicateSamples/minScrapeInterval=2s-10     1.56µs ± 1%    2.12µs ± 0%   +35.19%  (p=0.000 n=9+7)
DeduplicateSamples/minScrapeInterval=5s-10     1.32µs ± 3%    1.65µs ± 2%   +25.57%  (p=0.000 n=10+10)
DeduplicateSamples/minScrapeInterval=10s-10    1.13µs ± 2%    1.50µs ± 1%   +32.85%  (p=0.000 n=10+10)

name                                         old speed      new speed      delta
DeduplicateSamples/minScrapeInterval=1s-10   10.0GB/s ± 2%   9.9GB/s ± 3%      ~     (p=0.052 n=10+10)
DeduplicateSamples/minScrapeInterval=2s-10   5.24GB/s ± 1%  3.87GB/s ± 0%   -26.03%  (p=0.000 n=9+7)
DeduplicateSamples/minScrapeInterval=5s-10   6.22GB/s ± 3%  4.96GB/s ± 2%   -20.37%  (p=0.000 n=10+10)
DeduplicateSamples/minScrapeInterval=10s-10  7.28GB/s ± 2%  5.48GB/s ± 1%   -24.74%  (p=0.000 n=10+10)
```

https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3333
Signed-off-by: hagen1778 <roman@victoriametrics.com>

Signed-off-by: hagen1778 <roman@victoriametrics.com>
2022-12-08 18:06:11 -08:00

130 lines
3.4 KiB
Go

package storage
import (
"time"
)
// SetDedupInterval sets the deduplication interval, which is applied to raw samples during data ingestion and querying.
//
// De-duplication is disabled if dedupInterval is 0.
// The interval is stored with millisecond precision: any sub-millisecond remainder
// is truncated, so a positive interval shorter than 1ms also results in 0.
//
// This function must be called before initializing the storage.
func SetDedupInterval(dedupInterval time.Duration) {
	ms := int64(dedupInterval / time.Millisecond)
	globalDedupInterval = ms
}
// GetDedupInterval returns the dedup interval in milliseconds, which has been set via SetDedupInterval.
// It returns 0 if SetDedupInterval has never been called.
func GetDedupInterval() int64 {
	intervalMs := globalDedupInterval
	return intervalMs
}
var (
	// globalDedupInterval holds the deduplication interval in milliseconds, as set via SetDedupInterval.
	// A value <= 0 means deduplication is disabled (see isDedupEnabled).
	globalDedupInterval int64
)
// isDedupEnabled reports whether a positive deduplication interval has been configured.
func isDedupEnabled() bool {
	if globalDedupInterval <= 0 {
		return false
	}
	return true
}
// DeduplicateSamples removes samples from src* if they are closer to each other than dedupInterval in milliseconds.
//
// Samples are grouped into consecutive windows of dedupInterval milliseconds whose
// inclusive upper bounds are multiples of dedupInterval (for non-negative timestamps),
// and only the last sample of every window is kept. If several samples share the
// timestamp of that last sample, the biggest of their values is kept.
//
// srcTimestamps is expected to be sorted in ascending order and to have the same
// length as srcValues. Deduplication happens in place: the returned slices share
// their backing arrays with srcTimestamps and srcValues. When needsDedup reports
// that nothing has to be removed, the inputs are returned unchanged.
func DeduplicateSamples(srcTimestamps []int64, srcValues []float64, dedupInterval int64) ([]int64, []float64) {
	if !needsDedup(srcTimestamps, dedupInterval) {
		// Fast path - nothing to deduplicate
		return srcTimestamps, srcValues
	}
	tsNext := srcTimestamps[0] + dedupInterval - 1
	tsNext -= tsNext % dedupInterval
	dstTimestamps := srcTimestamps[:0]
	dstValues := srcValues[:0]
	// Seed the tie-breaking state with the first sample. Zero-initialized state would
	// make a first sample at timestamp 0 compete with a phantom value 0, replacing
	// negative values at that timestamp with 0.
	tsPrev := srcTimestamps[0]
	valuePrev := srcValues[0]
	var value float64
	for i, ts := range srcTimestamps[1:] {
		// srcTimestamps[i] is the current sample; ts is the sample following it.
		value = srcValues[i]
		tsCur := srcTimestamps[i]
		if tsCur == tsPrev && value < valuePrev {
			// prefer biggest value on timestamp conflict
			value = valuePrev
		}
		valuePrev = value
		tsPrev = tsCur
		if ts <= tsNext {
			// The next sample is in the same window, so the current one is dropped.
			continue
		}
		// The current sample is the last one in its window - keep it.
		// Writing in place is safe: at most i samples were appended before this
		// iteration, so only indexes <= i are overwritten, while later iterations
		// read indexes > i.
		dstTimestamps = append(dstTimestamps, tsCur)
		dstValues = append(dstValues, value)
		tsNext += dedupInterval
		if tsNext < ts {
			// Skip empty windows and realign to the window containing ts.
			tsNext = ts + dedupInterval - 1
			tsNext -= tsNext % dedupInterval
		}
	}
	// The last sample always closes its window.
	ts := srcTimestamps[len(srcTimestamps)-1]
	v := srcValues[len(srcValues)-1]
	dstTimestamps = append(dstTimestamps, ts)
	if ts == tsPrev && v < value {
		v = value
	}
	dstValues = append(dstValues, v)
	return dstTimestamps, dstValues
}
// deduplicateSamplesDuringMerge is the int64-valued variant of the sample deduplication.
//
// Samples are grouped into consecutive windows of dedupInterval milliseconds whose
// inclusive upper bounds are multiples of dedupInterval (for non-negative timestamps),
// and only the last sample of every window is kept. If several samples share the
// timestamp of that last sample, the biggest of their values is kept.
//
// srcTimestamps is expected to be sorted in ascending order and to have the same
// length as srcValues. Deduplication happens in place: the returned slices share
// their backing arrays with srcTimestamps and srcValues. When needsDedup reports
// that nothing has to be removed, the inputs are returned unchanged.
func deduplicateSamplesDuringMerge(srcTimestamps, srcValues []int64, dedupInterval int64) ([]int64, []int64) {
	if !needsDedup(srcTimestamps, dedupInterval) {
		// Fast path - nothing to deduplicate
		return srcTimestamps, srcValues
	}
	tsNext := srcTimestamps[0] + dedupInterval - 1
	tsNext -= tsNext % dedupInterval
	dstTimestamps := srcTimestamps[:0]
	dstValues := srcValues[:0]
	// Seed the tie-breaking state with the first sample. Zero-initialized state would
	// make a first sample at timestamp 0 compete with a phantom value 0, replacing
	// negative values at that timestamp with 0.
	tsPrev := srcTimestamps[0]
	valuePrev := srcValues[0]
	var value int64
	for i, ts := range srcTimestamps[1:] {
		// srcTimestamps[i] is the current sample; ts is the sample following it.
		value = srcValues[i]
		tsCur := srcTimestamps[i]
		if tsCur == tsPrev && value < valuePrev {
			// prefer biggest value on timestamp conflict
			value = valuePrev
		}
		valuePrev = value
		tsPrev = tsCur
		if ts <= tsNext {
			// The next sample is in the same window, so the current one is dropped.
			continue
		}
		// The current sample is the last one in its window - keep it.
		// Writing in place is safe: at most i samples were appended before this
		// iteration, so only indexes <= i are overwritten, while later iterations
		// read indexes > i.
		dstTimestamps = append(dstTimestamps, tsCur)
		dstValues = append(dstValues, value)
		tsNext += dedupInterval
		if tsNext < ts {
			// Skip empty windows and realign to the window containing ts.
			tsNext = ts + dedupInterval - 1
			tsNext -= tsNext % dedupInterval
		}
	}
	// The last sample always closes its window.
	ts := srcTimestamps[len(srcTimestamps)-1]
	v := srcValues[len(srcValues)-1]
	dstTimestamps = append(dstTimestamps, ts)
	if ts == tsPrev && v < value {
		v = value
	}
	dstValues = append(dstValues, v)
	return dstTimestamps, dstValues
}
// needsDedup reports whether at least two of the given (ascending) timestamps fall
// into the same dedupInterval-wide window. It always returns false for fewer than
// two timestamps or a non-positive dedupInterval. DeduplicateSamples and
// deduplicateSamplesDuringMerge use it as a fast-path check.
func needsDedup(timestamps []int64, dedupInterval int64) bool {
	if dedupInterval <= 0 || len(timestamps) < 2 {
		return false
	}
	// windowEnd is the inclusive upper bound of the window holding the previous
	// sample: that timestamp rounded up to a multiple of dedupInterval
	// (assuming non-negative timestamps).
	windowEnd := timestamps[0] + dedupInterval - 1
	windowEnd -= windowEnd % dedupInterval
	for i := 1; i < len(timestamps); i++ {
		cur := timestamps[i]
		if cur <= windowEnd {
			// Two samples share one window.
			return true
		}
		windowEnd += dedupInterval
		if windowEnd >= cur {
			continue
		}
		// cur lies beyond the following window: realign to the window containing cur.
		windowEnd = cur + dedupInterval - 1
		windowEnd -= windowEnd % dedupInterval
	}
	return false
}