2020-01-30 23:09:44 +00:00
|
|
|
package storage
|
|
|
|
|
|
|
|
import (
|
2020-02-10 11:03:52 +00:00
|
|
|
"time"
|
2020-01-30 23:09:44 +00:00
|
|
|
)
|
|
|
|
|
2021-12-14 18:49:08 +00:00
|
|
|
// SetDedupInterval sets the deduplication interval, which is applied to raw samples during data ingestion and querying.
|
2020-02-10 11:03:52 +00:00
|
|
|
//
|
2021-12-14 18:49:08 +00:00
|
|
|
// De-duplication is disabled if dedupInterval is 0.
|
2020-02-10 11:03:52 +00:00
|
|
|
//
|
|
|
|
// This function must be called before initializing the storage.
|
2021-12-14 18:49:08 +00:00
|
|
|
func SetDedupInterval(dedupInterval time.Duration) {
|
2021-12-15 11:26:35 +00:00
|
|
|
globalDedupInterval = dedupInterval.Milliseconds()
|
2020-02-10 11:03:52 +00:00
|
|
|
}
|
|
|
|
|
2021-12-15 11:26:35 +00:00
|
|
|
// GetDedupInterval returns the dedup interval in milliseconds, which has been set via SetDedupInterval.
|
|
|
|
func GetDedupInterval() int64 {
|
2021-12-14 18:49:08 +00:00
|
|
|
return globalDedupInterval
|
|
|
|
}
|
2020-01-30 23:09:44 +00:00
|
|
|
|
2021-12-15 11:26:35 +00:00
|
|
|
// globalDedupInterval is the de-duplication interval in milliseconds.
//
// It is written by SetDedupInterval and read by GetDedupInterval and isDedupEnabled.
// De-duplication is treated as disabled when the value isn't positive.
var globalDedupInterval int64
|
2021-12-14 18:49:08 +00:00
|
|
|
|
2021-12-15 13:58:27 +00:00
|
|
|
func isDedupEnabled() bool {
|
|
|
|
return globalDedupInterval > 0
|
|
|
|
}
|
|
|
|
|
2021-12-14 18:49:08 +00:00
|
|
|
// DeduplicateSamples removes samples from src* if they are closer to each other than dedupInterval in millseconds.
|
|
|
|
func DeduplicateSamples(srcTimestamps []int64, srcValues []float64, dedupInterval int64) ([]int64, []float64) {
|
|
|
|
if !needsDedup(srcTimestamps, dedupInterval) {
|
2020-01-30 23:09:44 +00:00
|
|
|
// Fast path - nothing to deduplicate
|
|
|
|
return srcTimestamps, srcValues
|
|
|
|
}
|
2022-05-02 18:35:14 +00:00
|
|
|
tsNext := srcTimestamps[0] + dedupInterval - 1
|
|
|
|
tsNext -= tsNext % dedupInterval
|
|
|
|
dstTimestamps := srcTimestamps[:0]
|
|
|
|
dstValues := srcValues[:0]
|
|
|
|
for i, ts := range srcTimestamps[1:] {
|
|
|
|
if ts <= tsNext {
|
2020-01-30 23:09:44 +00:00
|
|
|
continue
|
|
|
|
}
|
2022-12-09 02:10:18 +00:00
|
|
|
// Choose the maximum value with the timestamp equal to tsPrev.
|
|
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3333
|
|
|
|
j := i
|
|
|
|
tsPrev := srcTimestamps[j]
|
|
|
|
vPrev := srcValues[j]
|
|
|
|
for j > 0 && srcTimestamps[j-1] == tsPrev {
|
|
|
|
j--
|
|
|
|
if srcValues[j] > vPrev {
|
|
|
|
vPrev = srcValues[j]
|
|
|
|
}
|
|
|
|
}
|
|
|
|
dstTimestamps = append(dstTimestamps, tsPrev)
|
|
|
|
dstValues = append(dstValues, vPrev)
|
2021-12-14 18:49:08 +00:00
|
|
|
tsNext += dedupInterval
|
2022-05-02 18:35:14 +00:00
|
|
|
if tsNext < ts {
|
|
|
|
tsNext = ts + dedupInterval - 1
|
|
|
|
tsNext -= tsNext % dedupInterval
|
2020-04-26 10:04:58 +00:00
|
|
|
}
|
2020-01-30 23:09:44 +00:00
|
|
|
}
|
2022-12-09 02:10:18 +00:00
|
|
|
j := len(srcTimestamps) - 1
|
|
|
|
tsPrev := srcTimestamps[j]
|
|
|
|
vPrev := srcValues[j]
|
|
|
|
for j > 0 && srcTimestamps[j-1] == tsPrev {
|
|
|
|
j--
|
|
|
|
if srcValues[j] > vPrev {
|
|
|
|
vPrev = srcValues[j]
|
|
|
|
}
|
2022-12-09 02:06:11 +00:00
|
|
|
}
|
2022-12-09 02:10:18 +00:00
|
|
|
dstTimestamps = append(dstTimestamps, tsPrev)
|
|
|
|
dstValues = append(dstValues, vPrev)
|
2020-01-30 23:09:44 +00:00
|
|
|
return dstTimestamps, dstValues
|
|
|
|
}
|
|
|
|
|
2021-12-14 18:49:08 +00:00
|
|
|
func deduplicateSamplesDuringMerge(srcTimestamps, srcValues []int64, dedupInterval int64) ([]int64, []int64) {
|
|
|
|
if !needsDedup(srcTimestamps, dedupInterval) {
|
2020-01-30 23:09:44 +00:00
|
|
|
// Fast path - nothing to deduplicate
|
|
|
|
return srcTimestamps, srcValues
|
|
|
|
}
|
2022-05-02 18:35:14 +00:00
|
|
|
tsNext := srcTimestamps[0] + dedupInterval - 1
|
|
|
|
tsNext -= tsNext % dedupInterval
|
|
|
|
dstTimestamps := srcTimestamps[:0]
|
|
|
|
dstValues := srcValues[:0]
|
|
|
|
for i, ts := range srcTimestamps[1:] {
|
|
|
|
if ts <= tsNext {
|
2020-01-30 23:09:44 +00:00
|
|
|
continue
|
|
|
|
}
|
2022-12-09 02:10:18 +00:00
|
|
|
// Choose the maximum value with the timestamp equal to tsPrev.
|
|
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3333
|
|
|
|
j := i
|
|
|
|
tsPrev := srcTimestamps[j]
|
|
|
|
vPrev := srcValues[j]
|
|
|
|
for j > 0 && srcTimestamps[j-1] == tsPrev {
|
|
|
|
j--
|
|
|
|
if srcValues[j] > vPrev {
|
|
|
|
vPrev = srcValues[j]
|
|
|
|
}
|
|
|
|
}
|
|
|
|
dstTimestamps = append(dstTimestamps, tsPrev)
|
|
|
|
dstValues = append(dstValues, vPrev)
|
2021-12-14 18:49:08 +00:00
|
|
|
tsNext += dedupInterval
|
2022-05-02 18:35:14 +00:00
|
|
|
if tsNext < ts {
|
|
|
|
tsNext = ts + dedupInterval - 1
|
|
|
|
tsNext -= tsNext % dedupInterval
|
2020-04-26 10:04:58 +00:00
|
|
|
}
|
2020-01-30 23:09:44 +00:00
|
|
|
}
|
2022-12-09 02:10:18 +00:00
|
|
|
j := len(srcTimestamps) - 1
|
|
|
|
tsPrev := srcTimestamps[j]
|
|
|
|
vPrev := srcValues[j]
|
|
|
|
for j > 0 && srcTimestamps[j-1] == tsPrev {
|
|
|
|
j--
|
|
|
|
if srcValues[j] > vPrev {
|
|
|
|
vPrev = srcValues[j]
|
|
|
|
}
|
2022-12-09 02:06:11 +00:00
|
|
|
}
|
2022-12-09 02:10:18 +00:00
|
|
|
dstTimestamps = append(dstTimestamps, tsPrev)
|
|
|
|
dstValues = append(dstValues, vPrev)
|
2020-01-30 23:09:44 +00:00
|
|
|
return dstTimestamps, dstValues
|
|
|
|
}
|
|
|
|
|
2021-12-14 18:49:08 +00:00
|
|
|
// needsDedup reports whether timestamps contain at least two samples falling
// into the same de-duplication bucket for the given dedupInterval.
//
// Buckets end at multiples of dedupInterval. False is returned for non-positive
// dedupInterval and for timestamps with less than two items.
func needsDedup(timestamps []int64, dedupInterval int64) bool {
	if dedupInterval <= 0 || len(timestamps) < 2 {
		return false
	}
	// bucketEnd is the first timestamp rounded up to the nearest multiple of dedupInterval.
	bucketEnd := timestamps[0] + dedupInterval - 1
	bucketEnd -= bucketEnd % dedupInterval
	for i := 1; i < len(timestamps); i++ {
		cur := timestamps[i]
		if cur <= bucketEnd {
			// cur shares the bucket with the preceding sample.
			return true
		}
		bucketEnd += dedupInterval
		if bucketEnd < cur {
			// The gap is bigger than dedupInterval - re-align the bucket end to cur.
			bucketEnd = cur + dedupInterval - 1
			bucketEnd -= bucketEnd % dedupInterval
		}
	}
	return false
}
|