From a0000c3a6e32ad311b49c91303de29c331c02fa4 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sun, 26 Apr 2020 13:04:58 +0300 Subject: [PATCH] lib/storage: improve deduplication algorithm Now it leaves only the first data point on each `-dedup.minScrapeInterval` interval. Previously it may leave two data points on the interval. This could lead to unexpected results for `histogram_quantile(phi, sum(rate(buckets)) by (le))` query. --- lib/storage/dedup.go | 59 ++++++++++++-------------------- lib/storage/dedup_test.go | 31 +++-------------- lib/storage/dedup_timing_test.go | 36 +++++++++++++++++++ 3 files changed, 63 insertions(+), 63 deletions(-) create mode 100644 lib/storage/dedup_timing_test.go diff --git a/lib/storage/dedup.go b/lib/storage/dedup.go index 70f060ac1..96cd1daef 100644 --- a/lib/storage/dedup.go +++ b/lib/storage/dedup.go @@ -1,7 +1,6 @@ package storage import ( - "math" "time" ) @@ -11,53 +10,39 @@ import ( // // This function must be called before initializing the storage. func SetMinScrapeIntervalForDeduplication(interval time.Duration) { - minScrapeInterval = interval + minScrapeInterval = interval.Milliseconds() } -var minScrapeInterval = time.Duration(0) - -func getMinDelta() int64 { - // Use 7/8 of minScrapeInterval in order to preserve proper data points. - // For instance, if minScrapeInterval=10, the following time series: - // 10 15 19 25 30 34 41 - // Would be unexpectedly converted to if using 100% of minScrapeInterval: - // 10 25 41 - // When using 7/8 of minScrapeInterval, it will be converted to the expected: - // 10 19 30 41 - ms := minScrapeInterval.Milliseconds() - - // Try calculating scrape interval via integer arithmetic. - d := (ms / 8) * 7 - if d > 0 { - return d - } - // Too small scrape interval for integer arithmetic. Calculate d using floating-point arithmetic. - return int64(math.Round(float64(ms) / 8 * 7)) -} +var minScrapeInterval = int64(0) // DeduplicateSamples removes samples from src* if they are closer to each other than minScrapeInterval. func DeduplicateSamples(srcTimestamps []int64, srcValues []float64) ([]int64, []float64) { if minScrapeInterval <= 0 { return srcTimestamps, srcValues } - minDelta := getMinDelta() - if !needsDedup(srcTimestamps, minDelta) { + if !needsDedup(srcTimestamps, minScrapeInterval) { // Fast path - nothing to deduplicate return srcTimestamps, srcValues } // Slow path - dedup data points. - prevTimestamp := srcTimestamps[0] + tsNext := (srcTimestamps[0] - srcTimestamps[0] % minScrapeInterval) + minScrapeInterval dstTimestamps := srcTimestamps[:1] dstValues := srcValues[:1] for i := 1; i < len(srcTimestamps); i++ { ts := srcTimestamps[i] - if ts-prevTimestamp < minDelta { + if ts < tsNext { continue } dstTimestamps = append(dstTimestamps, ts) dstValues = append(dstValues, srcValues[i]) - prevTimestamp = ts + + // Update tsNext + tsNext += minScrapeInterval + if ts >= tsNext { + // Slow path for updating ts. + tsNext = (ts - ts % minScrapeInterval) + minScrapeInterval + } } return dstTimestamps, dstValues } @@ -66,29 +51,29 @@ func deduplicateSamplesDuringMerge(srcTimestamps, srcValues []int64) ([]int64, [ if minScrapeInterval <= 0 { return srcTimestamps, srcValues } - if len(srcTimestamps) < 32 { - // Do not de-duplicate small number of samples during merge - // in order to improve deduplication accuracy on later stages. - return srcTimestamps, srcValues - } - minDelta := getMinDelta() - if !needsDedup(srcTimestamps, minDelta) { + if !needsDedup(srcTimestamps, minScrapeInterval) { // Fast path - nothing to deduplicate return srcTimestamps, srcValues } // Slow path - dedup data points. - prevTimestamp := srcTimestamps[0] + tsNext := (srcTimestamps[0] - srcTimestamps[0] % minScrapeInterval) + minScrapeInterval dstTimestamps := srcTimestamps[:1] dstValues := srcValues[:1] for i := 1; i < len(srcTimestamps); i++ { ts := srcTimestamps[i] - if ts-prevTimestamp < minDelta { + if ts < tsNext { continue } dstTimestamps = append(dstTimestamps, ts) dstValues = append(dstValues, srcValues[i]) - prevTimestamp = ts + + // Update tsNext + tsNext += minScrapeInterval + if ts >= tsNext { + // Slow path for updating ts. + tsNext = (ts - ts % minScrapeInterval) + minScrapeInterval + } } return dstTimestamps, dstValues } diff --git a/lib/storage/dedup_test.go b/lib/storage/dedup_test.go index b33d5b5d0..9ac7ba0d5 100644 --- a/lib/storage/dedup_test.go +++ b/lib/storage/dedup_test.go @@ -6,27 +6,6 @@ import ( "time" ) -func TestGetMinDelta(t *testing.T) { - // Disable deduplication before exit, since the rest of tests expect disabled dedup. - defer SetMinScrapeIntervalForDeduplication(0) - - f := func(scrapeInterval time.Duration, dExpected int64) { - t.Helper() - SetMinScrapeIntervalForDeduplication(scrapeInterval) - d := getMinDelta() - if d != dExpected { - t.Fatalf("unexpected getMinDelta(%s); got %d; want %d", scrapeInterval, d, dExpected) - } - } - f(0, 0) - f(time.Millisecond, 1) - f(5*time.Millisecond, 4) - f(8*time.Millisecond, 7) - f(100*time.Millisecond, 84) - f(time.Second, 875) - f(10*time.Second, 8750) -} - func TestDeduplicateSamples(t *testing.T) { // Disable deduplication before exit, since the rest of tests expect disabled dedup. defer SetMinScrapeIntervalForDeduplication(0) @@ -73,8 +52,8 @@ func TestDeduplicateSamples(t *testing.T) { f(time.Millisecond, []int64{123, 456}, []int64{123, 456}) f(time.Millisecond, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, []int64{0, 1, 2, 3, 4}) f(0, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}) - f(100*time.Millisecond, []int64{0, 100, 100, 101, 150, 180, 200, 300, 1000}, []int64{0, 100, 200, 300, 1000}) - f(10*time.Second, []int64{10e3, 13e3, 21e3, 22e3, 30e3, 33e3, 39e3, 45e3}, []int64{10e3, 21e3, 30e3, 39e3}) + f(100*time.Millisecond, []int64{0, 100, 100, 101, 150, 180, 205, 300, 1000}, []int64{0, 100, 205, 300, 1000}) + f(10*time.Second, []int64{10e3, 13e3, 21e3, 22e3, 30e3, 33e3, 39e3, 45e3}, []int64{10e3, 21e3, 30e3, 45e3}) } func TestDeduplicateSamplesDuringMerge(t *testing.T) { @@ -121,9 +100,9 @@ func TestDeduplicateSamplesDuringMerge(t *testing.T) { f(time.Millisecond, nil, []int64{}) f(time.Millisecond, []int64{123}, []int64{123}) f(time.Millisecond, []int64{123, 456}, []int64{123, 456}) - f(time.Millisecond, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}) - f(100*time.Millisecond, []int64{0, 100, 100, 101, 150, 180, 200, 300, 1000}, []int64{0, 100, 100, 101, 150, 180, 200, 300, 1000}) - f(10*time.Second, []int64{10e3, 13e3, 21e3, 22e3, 30e3, 33e3, 39e3, 45e3}, []int64{10e3, 13e3, 21e3, 22e3, 30e3, 33e3, 39e3, 45e3}) + f(time.Millisecond, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, []int64{0, 1, 2, 3, 4}) + f(100*time.Millisecond, []int64{0, 100, 100, 101, 150, 180, 200, 300, 1000}, []int64{0, 100, 200, 300, 1000}) + f(10*time.Second, []int64{10e3, 13e3, 21e3, 22e3, 30e3, 33e3, 39e3, 45e3}, []int64{10e3, 21e3, 30e3, 45e3}) var timestamps, timestampsExpected []int64 for i := 0; i < 40; i++ { diff --git a/lib/storage/dedup_timing_test.go b/lib/storage/dedup_timing_test.go new file mode 100644 index 000000000..9be079403 --- /dev/null +++ b/lib/storage/dedup_timing_test.go @@ -0,0 +1,36 @@ +package storage + +import ( + "fmt" + "testing" + "time" +) + +func BenchmarkDeduplicateSamples(b *testing.B) { + const blockSize = 8192 + timestamps := make([]int64, blockSize) + values := make([]float64, blockSize) + for i := 0; i < len(timestamps); i++ { + timestamps[i] = int64(i) * 1e3 + } + for _, minScrapeInterval := range []time.Duration{time.Second, 2 * time.Second, 5 * time.Second, 10 * time.Second} { + b.Run(fmt.Sprintf("minScrapeInterval=%s", minScrapeInterval), func(b *testing.B) { + SetMinScrapeIntervalForDeduplication(minScrapeInterval) + defer SetMinScrapeIntervalForDeduplication(0) + b.ReportAllocs() + b.SetBytes(blockSize) + b.RunParallel(func(pb *testing.PB) { + timestampsCopy := make([]int64, 0, blockSize) + valuesCopy := make([]float64, 0, blockSize) + for pb.Next() { + timestampsCopy := append(timestampsCopy[:0], timestamps...) + valuesCopy := append(valuesCopy[:0], values...) + ts, vs := DeduplicateSamples(timestampsCopy, valuesCopy) + if len(ts) == 0 || len(vs) == 0 { + panic(fmt.Errorf("expecting non-empty results; got\nts=%v\nvs=%v", ts, vs)) + } + } + }) + }) + } +}