mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
cdfe854c9b
This improves the code readability and debuggability, since the output of these functions stops depending on global state.
99 lines
2.9 KiB
Go
99 lines
2.9 KiB
Go
package storage
|
|
|
|
import (
|
|
"time"
|
|
)
|
|
|
|
// SetDedupInterval sets the deduplication interval, which is applied to raw samples during data ingestion and querying.
|
|
//
|
|
// De-duplication is disabled if dedupInterval is 0.
|
|
//
|
|
// This function must be called before initializing the storage.
|
|
func SetDedupInterval(dedupInterval time.Duration) {
|
|
globalDedupInterval = dedupInterval
|
|
}
|
|
|
|
// GetDedupInterval returns the dedup interval set via SetDedupInterval.
|
|
func GetDedupInterval() time.Duration {
|
|
return globalDedupInterval
|
|
}
|
|
|
|
var globalDedupInterval time.Duration
|
|
|
|
// DeduplicateSamples removes samples from src* if they are closer to each other than dedupInterval in millseconds.
|
|
func DeduplicateSamples(srcTimestamps []int64, srcValues []float64, dedupInterval int64) ([]int64, []float64) {
|
|
if !needsDedup(srcTimestamps, dedupInterval) {
|
|
// Fast path - nothing to deduplicate
|
|
return srcTimestamps, srcValues
|
|
}
|
|
return deduplicateInternal(srcTimestamps, srcValues, dedupInterval)
|
|
}
|
|
|
|
func deduplicateInternal(srcTimestamps []int64, srcValues []float64, dedupInterval int64) ([]int64, []float64) {
|
|
tsNext := (srcTimestamps[0] - srcTimestamps[0]%dedupInterval) + dedupInterval
|
|
dstTimestamps := srcTimestamps[:1]
|
|
dstValues := srcValues[:1]
|
|
for i := 1; i < len(srcTimestamps); i++ {
|
|
ts := srcTimestamps[i]
|
|
if ts < tsNext {
|
|
continue
|
|
}
|
|
dstTimestamps = append(dstTimestamps, ts)
|
|
dstValues = append(dstValues, srcValues[i])
|
|
|
|
// Update tsNext
|
|
tsNext += dedupInterval
|
|
if ts >= tsNext {
|
|
// Slow path for updating ts.
|
|
tsNext = (ts - ts%dedupInterval) + dedupInterval
|
|
}
|
|
}
|
|
return dstTimestamps, dstValues
|
|
}
|
|
|
|
func deduplicateSamplesDuringMerge(srcTimestamps, srcValues []int64, dedupInterval int64) ([]int64, []int64) {
|
|
if !needsDedup(srcTimestamps, dedupInterval) {
|
|
// Fast path - nothing to deduplicate
|
|
return srcTimestamps, srcValues
|
|
}
|
|
return deduplicateDuringMergeInternal(srcTimestamps, srcValues, dedupInterval)
|
|
}
|
|
|
|
func deduplicateDuringMergeInternal(srcTimestamps, srcValues []int64, dedupInterval int64) ([]int64, []int64) {
|
|
tsNext := (srcTimestamps[0] - srcTimestamps[0]%dedupInterval) + dedupInterval
|
|
dstTimestamps := srcTimestamps[:1]
|
|
dstValues := srcValues[:1]
|
|
for i := 1; i < len(srcTimestamps); i++ {
|
|
ts := srcTimestamps[i]
|
|
if ts < tsNext {
|
|
continue
|
|
}
|
|
dstTimestamps = append(dstTimestamps, ts)
|
|
dstValues = append(dstValues, srcValues[i])
|
|
|
|
// Update tsNext
|
|
tsNext += dedupInterval
|
|
if ts >= tsNext {
|
|
// Slow path for updating ts.
|
|
tsNext = (ts - ts%dedupInterval) + dedupInterval
|
|
}
|
|
}
|
|
return dstTimestamps, dstValues
|
|
}
|
|
|
|
func needsDedup(timestamps []int64, dedupInterval int64) bool {
|
|
if len(timestamps) == 0 || dedupInterval <= 0 {
|
|
return false
|
|
}
|
|
tsNext := (timestamps[0] - timestamps[0]%dedupInterval) + dedupInterval
|
|
for _, ts := range timestamps[1:] {
|
|
if ts < tsNext {
|
|
return true
|
|
}
|
|
tsNext += dedupInterval
|
|
if ts >= tsNext {
|
|
tsNext = (ts - ts%dedupInterval) + dedupInterval
|
|
}
|
|
}
|
|
return false
|
|
}
|