VictoriaMetrics/lib/storage/dedup.go
Aliaksandr Valialkin 8ca2799478 lib/storage: properly determine when the deduplication is needed in needsDedup
Previously needsDedup() could return true even though de-duplication wasn't needed in the following case:

         d < interval
           /     \
   |        v | v        |
     interval   interval

Now it properly returns false for this case.
2021-07-12 10:53:30 +03:00

100 lines
2.8 KiB
Go

package storage
import (
"time"
)
// SetMinScrapeIntervalForDeduplication configures the minimum interval between
// data points that are kept during de-duplication.
//
// The interval is stored with millisecond precision.
// Passing 0 disables de-duplication.
//
// This function must be called before initializing the storage.
func SetMinScrapeIntervalForDeduplication(interval time.Duration) {
	ms := interval.Milliseconds()
	minScrapeInterval = ms
}
// minScrapeInterval is the de-duplication interval in milliseconds.
//
// It starts at zero and is updated by SetMinScrapeIntervalForDeduplication.
var minScrapeInterval int64
// DeduplicateSamples removes samples from src* if they are closer to each other than minScrapeInterval.
//
// The input slices are returned untouched when de-duplication is disabled
// (minScrapeInterval <= 0) or when needsDedup reports that there is nothing to drop.
// Otherwise the result of deduplicateInternal is returned.
func DeduplicateSamples(srcTimestamps []int64, srcValues []float64) ([]int64, []float64) {
	interval := minScrapeInterval
	if interval <= 0 || !needsDedup(srcTimestamps, interval) {
		// Fast path - de-duplication is disabled or there is nothing to deduplicate.
		return srcTimestamps, srcValues
	}
	return deduplicateInternal(interval, srcTimestamps, srcValues)
}
// deduplicateInternal keeps the first sample and then, for every following
// interval-aligned window, only the first sample whose timestamp reaches that window.
// All the other samples are dropped.
//
// The inputs are returned unchanged if interval is non-positive or srcTimestamps
// is empty, mirroring the guard in needsDedup.
//
// The filtering is performed in place: the returned slices share their backing
// arrays with srcTimestamps and srcValues, so the contents of the inputs
// are overwritten.
//
// NOTE(review): the window tracking presumably relies on srcTimestamps being sorted
// in ascending order and on srcValues having at least len(srcTimestamps) items -
// confirm against callers. Window boundaries are computed with Go's truncating %,
// so they are floor-aligned only for non-negative timestamps.
func deduplicateInternal(interval int64, srcTimestamps []int64, srcValues []float64) ([]int64, []float64) {
	if interval <= 0 || len(srcTimestamps) == 0 {
		// Nothing to de-duplicate. This also prevents from indexing an empty slice
		// and from division by zero below.
		return srcTimestamps, srcValues
	}

	// tsNext is the start of the next window; samples below it are duplicates.
	tsNext := (srcTimestamps[0] - srcTimestamps[0]%interval) + interval
	dstTimestamps := srcTimestamps[:1]
	dstValues := srcValues[:1]
	for i := 1; i < len(srcTimestamps); i++ {
		ts := srcTimestamps[i]
		if ts < tsNext {
			// The sample belongs to a window that already has a kept sample.
			continue
		}
		dstTimestamps = append(dstTimestamps, ts)
		dstValues = append(dstValues, srcValues[i])

		// Update tsNext
		tsNext += interval
		if ts >= tsNext {
			// Slow path for updating tsNext - the sample skipped at least one window,
			// so re-align tsNext to the end of the window containing ts.
			tsNext = (ts - ts%interval) + interval
		}
	}
	return dstTimestamps, dstValues
}
// deduplicateSamplesDuringMerge is the int64-valued counterpart of DeduplicateSamples.
//
// The input slices are returned untouched when de-duplication is disabled
// (minScrapeInterval <= 0) or when needsDedup reports that there is nothing to drop.
// Otherwise the result of deduplicateDuringMergeInternal is returned.
func deduplicateSamplesDuringMerge(srcTimestamps, srcValues []int64) ([]int64, []int64) {
	interval := minScrapeInterval
	if interval <= 0 || !needsDedup(srcTimestamps, interval) {
		// Fast path - de-duplication is disabled or there is nothing to deduplicate.
		return srcTimestamps, srcValues
	}
	return deduplicateDuringMergeInternal(interval, srcTimestamps, srcValues)
}
// deduplicateDuringMergeInternal keeps the first sample and then, for every following
// interval-aligned window, only the first sample whose timestamp reaches that window.
// All the other samples are dropped. Values are int64 here.
//
// The inputs are returned unchanged if interval is non-positive or srcTimestamps
// is empty, mirroring the guard in needsDedup.
//
// The filtering is performed in place: the returned slices share their backing
// arrays with srcTimestamps and srcValues, so the contents of the inputs
// are overwritten.
//
// NOTE(review): the window tracking presumably relies on srcTimestamps being sorted
// in ascending order and on srcValues having at least len(srcTimestamps) items -
// confirm against callers. Window boundaries are computed with Go's truncating %,
// so they are floor-aligned only for non-negative timestamps.
func deduplicateDuringMergeInternal(interval int64, srcTimestamps, srcValues []int64) ([]int64, []int64) {
	if interval <= 0 || len(srcTimestamps) == 0 {
		// Nothing to de-duplicate. This also prevents from indexing an empty slice
		// and from division by zero below.
		return srcTimestamps, srcValues
	}

	// tsNext is the start of the next window; samples below it are duplicates.
	tsNext := (srcTimestamps[0] - srcTimestamps[0]%interval) + interval
	dstTimestamps := srcTimestamps[:1]
	dstValues := srcValues[:1]
	for i := 1; i < len(srcTimestamps); i++ {
		ts := srcTimestamps[i]
		if ts < tsNext {
			// The sample belongs to a window that already has a kept sample.
			continue
		}
		dstTimestamps = append(dstTimestamps, ts)
		dstValues = append(dstValues, srcValues[i])

		// Update tsNext
		tsNext += interval
		if ts >= tsNext {
			// Slow path for updating tsNext - the sample skipped at least one window,
			// so re-align tsNext to the end of the window containing ts.
			tsNext = (ts - ts%interval) + interval
		}
	}
	return dstTimestamps, dstValues
}
// needsDedup reports whether timestamps contain at least one entry that falls
// below the end of the interval-aligned window of the previously accepted entry.
//
// It returns false for empty timestamps and for non-positive interval.
// Two timestamps closer than interval, but located in distinct windows,
// do not trigger de-duplication.
//
// NOTE(review): the scan presumably relies on timestamps being sorted
// in ascending order - confirm against callers.
func needsDedup(timestamps []int64, interval int64) bool {
	if interval <= 0 || len(timestamps) == 0 {
		return false
	}
	first := timestamps[0]
	// boundary is the start of the window following the last accepted timestamp.
	boundary := first - first%interval + interval
	for i := 1; i < len(timestamps); i++ {
		cur := timestamps[i]
		if cur < boundary {
			return true
		}
		boundary += interval
		if cur >= boundary {
			// cur skipped at least one window; re-align the boundary to cur's window.
			boundary = cur - cur%interval + interval
		}
	}
	return false
}