diff --git a/lib/storage/dedup.go b/lib/storage/dedup.go index cf061cc95..6179ed030 100644 --- a/lib/storage/dedup.go +++ b/lib/storage/dedup.go @@ -24,9 +24,11 @@ func DeduplicateSamples(srcTimestamps []int64, srcValues []float64) ([]int64, [] // Fast path - nothing to deduplicate return srcTimestamps, srcValues } + return deduplicateInternal(minScrapeInterval, srcTimestamps, srcValues) +} - // Slow path - dedup data points. - tsNext := (srcTimestamps[0] - srcTimestamps[0]%minScrapeInterval) + minScrapeInterval +func deduplicateInternal(interval int64, srcTimestamps []int64, srcValues []float64) ([]int64, []float64) { + tsNext := (srcTimestamps[0] - srcTimestamps[0]%interval) + interval dstTimestamps := srcTimestamps[:1] dstValues := srcValues[:1] for i := 1; i < len(srcTimestamps); i++ { @@ -38,10 +40,10 @@ func DeduplicateSamples(srcTimestamps []int64, srcValues []float64) ([]int64, [] dstValues = append(dstValues, srcValues[i]) // Update tsNext - tsNext += minScrapeInterval + tsNext += interval if ts >= tsNext { // Slow path for updating ts. - tsNext = (ts - ts%minScrapeInterval) + minScrapeInterval + tsNext = (ts - ts%interval) + interval } } return dstTimestamps, dstValues @@ -55,9 +57,11 @@ func deduplicateSamplesDuringMerge(srcTimestamps, srcValues []int64) ([]int64, [ // Fast path - nothing to deduplicate return srcTimestamps, srcValues } + return deduplicateDuringMergeInternal(minScrapeInterval, srcTimestamps, srcValues) +} - // Slow path - dedup data points. - tsNext := (srcTimestamps[0] - srcTimestamps[0]%minScrapeInterval) + minScrapeInterval +func deduplicateDuringMergeInternal(interval int64, srcTimestamps, srcValues []int64) ([]int64, []int64) { + tsNext := (srcTimestamps[0] - srcTimestamps[0]%interval) + interval dstTimestamps := srcTimestamps[:1] dstValues := srcValues[:1] for i := 1; i < len(srcTimestamps); i++ { @@ -69,25 +73,28 @@ func deduplicateSamplesDuringMerge(srcTimestamps, srcValues []int64) ([]int64, [ dstValues = append(dstValues, srcValues[i]) // Update tsNext - tsNext += minScrapeInterval + tsNext += interval if ts >= tsNext { // Slow path for updating ts. - tsNext = (ts - ts%minScrapeInterval) + minScrapeInterval + tsNext = (ts - ts%interval) + interval } } return dstTimestamps, dstValues } -func needsDedup(timestamps []int64, minDelta int64) bool { - if len(timestamps) == 0 { +func needsDedup(timestamps []int64, interval int64) bool { + if len(timestamps) == 0 || interval <= 0 { return false } - prevTimestamp := timestamps[0] + tsNext := (timestamps[0] - timestamps[0]%interval) + interval for _, ts := range timestamps[1:] { - if ts-prevTimestamp < minDelta { + if ts < tsNext { return true } - prevTimestamp = ts + tsNext += interval + if ts >= tsNext { + tsNext = (ts - ts%interval) + interval + } } return false } diff --git a/lib/storage/dedup_test.go b/lib/storage/dedup_test.go index 8c7fd2348..a17813a16 100644 --- a/lib/storage/dedup_test.go +++ b/lib/storage/dedup_test.go @@ -6,6 +6,28 @@ import ( "time" ) +func TestNeedsDedup(t *testing.T) { + f := func(interval int64, timestamps []int64, expectedResult bool) { + t.Helper() + result := needsDedup(timestamps, interval) + if result != expectedResult { + t.Fatalf("unexpected result for needsDedup(%d, %d); got %v; want %v", timestamps, interval, result, expectedResult) + } + } + f(-1, nil, false) + f(-1, []int64{1}, false) + f(0, []int64{1, 2}, false) + f(10, []int64{1}, false) + f(10, []int64{1, 2}, true) + f(10, []int64{9, 10}, false) + f(10, []int64{9, 10, 19}, true) + f(10, []int64{9, 19}, false) + f(10, []int64{0, 9, 19}, true) + f(10, []int64{0, 19}, false) + f(10, []int64{0, 35, 40}, false) + f(10, []int64{0, 35, 40, 41}, true) +} + func TestDeduplicateSamples(t *testing.T) { // Disable deduplication before exit, since the rest of tests expect disabled dedup. defer SetMinScrapeIntervalForDeduplication(0)