From cdfe854c9bf466d5e02bc5cc324d39633d1fd7cf Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 14 Dec 2021 20:49:08 +0200 Subject: [PATCH] lib/storage: explicitly pass dedupInterval to DeduplicateSamples() and deduplicateSamplesDuringMerge() This improves the code readability and debuggability, since the output of these functions stops depending on global state. --- app/vmselect/main.go | 2 +- app/vmselect/netstorage/netstorage.go | 9 ++-- app/vmstorage/main.go | 2 +- lib/storage/block.go | 4 +- lib/storage/block_stream_writer.go | 4 +- lib/storage/dedup.go | 61 +++++++++++++-------------- lib/storage/dedup_test.go | 18 ++++---- lib/storage/dedup_timing_test.go | 5 +-- 8 files changed, 52 insertions(+), 53 deletions(-) diff --git a/app/vmselect/main.go b/app/vmselect/main.go index 9f4d82eda..f9a3343de 100644 --- a/app/vmselect/main.go +++ b/app/vmselect/main.go @@ -70,7 +70,7 @@ func main() { logger.Infof("starting netstorage at storageNodes %s", *storageNodes) startTime := time.Now() - storage.SetMinScrapeIntervalForDeduplication(*minScrapeInterval) + storage.SetDedupInterval(*minScrapeInterval) if len(*storageNodes) == 0 { logger.Fatalf("missing -storageNode arg") } diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 32b579f74..0f2363c8c 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -478,7 +478,9 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage. if firstErr != nil { return firstErr } - mergeSortBlocks(dst, sbs) + di := storage.GetDedupInterval() + dedupInterval := di.Milliseconds() + mergeSortBlocks(dst, sbs, dedupInterval) return nil } @@ -499,7 +501,7 @@ var sbPool sync.Pool var metricRowsSkipped = metrics.NewCounter(`vm_metric_rows_skipped_total{name="vmselect"}`) -func mergeSortBlocks(dst *Result, sbh sortBlocksHeap) { +func mergeSortBlocks(dst *Result, sbh sortBlocksHeap, dedupInterval int64) { // Skip empty sort blocks, since they cannot be passed to heap.Init. src := sbh sbh = sbh[:0] @@ -542,8 +544,7 @@ func mergeSortBlocks(dst *Result, sbh sortBlocksHeap) { putSortBlock(top) } } - - timestamps, values := storage.DeduplicateSamples(dst.Timestamps, dst.Values) + timestamps, values := storage.DeduplicateSamples(dst.Timestamps, dst.Values, dedupInterval) dedups := len(dst.Timestamps) - len(timestamps) dedupsDuringSelect.Add(dedups) dst.Timestamps = timestamps diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 5da6b087a..fdcbbc8f5 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -58,7 +58,7 @@ func main() { buildinfo.Init() logger.Init() - storage.SetMinScrapeIntervalForDeduplication(*minScrapeInterval) + storage.SetDedupInterval(*minScrapeInterval) storage.SetLogNewSeries(*logNewSeries) storage.SetFinalMergeDelay(*finalMergeDelay) storage.SetBigMergeWorkersCount(*bigMergeConcurrency) diff --git a/lib/storage/block.go b/lib/storage/block.go index c446c370e..1b67e1e7b 100644 --- a/lib/storage/block.go +++ b/lib/storage/block.go @@ -147,14 +147,14 @@ func (b *Block) tooBig() bool { return false } -func (b *Block) deduplicateSamplesDuringMerge() { +func (b *Block) deduplicateSamplesDuringMerge(dedupInterval int64) { if len(b.values) == 0 { // Nothing to dedup or the data is already marshaled. return } srcTimestamps := b.timestamps[b.nextIdx:] srcValues := b.values[b.nextIdx:] - timestamps, values := deduplicateSamplesDuringMerge(srcTimestamps, srcValues) + timestamps, values := deduplicateSamplesDuringMerge(srcTimestamps, srcValues, dedupInterval) dedups := len(srcTimestamps) - len(timestamps) atomic.AddUint64(&dedupsDuringMerge, uint64(dedups)) b.timestamps = b.timestamps[:b.nextIdx+len(timestamps)] diff --git a/lib/storage/block_stream_writer.go b/lib/storage/block_stream_writer.go index 5421a6656..443ac5c91 100644 --- a/lib/storage/block_stream_writer.go +++ b/lib/storage/block_stream_writer.go @@ -187,7 +187,9 @@ func (bsw *blockStreamWriter) MustClose() { func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsMerged *uint64, needDedup bool) { atomic.AddUint64(rowsMerged, uint64(b.rowsCount())) if needDedup { - b.deduplicateSamplesDuringMerge() + di := GetDedupInterval() + dedupInterval := di.Milliseconds() + b.deduplicateSamplesDuringMerge(dedupInterval) } headerData, timestampsData, valuesData := b.MarshalData(bsw.timestampsBlockOffset, bsw.valuesBlockOffset) usePrevTimestamps := len(bsw.prevTimestampsData) > 0 && bytes.Equal(timestampsData, bsw.prevTimestampsData) diff --git a/lib/storage/dedup.go b/lib/storage/dedup.go index 6179ed030..9919924d8 100644 --- a/lib/storage/dedup.go +++ b/lib/storage/dedup.go @@ -4,31 +4,33 @@ import ( "time" ) -// SetMinScrapeIntervalForDeduplication sets the minimum interval for data points during de-duplication. +// SetDedupInterval sets the deduplication interval, which is applied to raw samples during data ingestion and querying. // -// De-duplication is disabled if interval is 0. +// De-duplication is disabled if dedupInterval is 0. // // This function must be called before initializing the storage. -func SetMinScrapeIntervalForDeduplication(interval time.Duration) { - minScrapeInterval = interval.Milliseconds() +func SetDedupInterval(dedupInterval time.Duration) { + globalDedupInterval = dedupInterval } -var minScrapeInterval = int64(0) +// GetDedupInterval returns the dedup interval set via SetDedupInterval. +func GetDedupInterval() time.Duration { + return globalDedupInterval +} -// DeduplicateSamples removes samples from src* if they are closer to each other than minScrapeInterval. -func DeduplicateSamples(srcTimestamps []int64, srcValues []float64) ([]int64, []float64) { - if minScrapeInterval <= 0 { - return srcTimestamps, srcValues - } - if !needsDedup(srcTimestamps, minScrapeInterval) { +var globalDedupInterval time.Duration + +// DeduplicateSamples removes samples from src* if they are closer to each other than dedupInterval in millseconds. +func DeduplicateSamples(srcTimestamps []int64, srcValues []float64, dedupInterval int64) ([]int64, []float64) { + if !needsDedup(srcTimestamps, dedupInterval) { // Fast path - nothing to deduplicate return srcTimestamps, srcValues } - return deduplicateInternal(minScrapeInterval, srcTimestamps, srcValues) + return deduplicateInternal(srcTimestamps, srcValues, dedupInterval) } -func deduplicateInternal(interval int64, srcTimestamps []int64, srcValues []float64) ([]int64, []float64) { - tsNext := (srcTimestamps[0] - srcTimestamps[0]%interval) + interval +func deduplicateInternal(srcTimestamps []int64, srcValues []float64, dedupInterval int64) ([]int64, []float64) { + tsNext := (srcTimestamps[0] - srcTimestamps[0]%dedupInterval) + dedupInterval dstTimestamps := srcTimestamps[:1] dstValues := srcValues[:1] for i := 1; i < len(srcTimestamps); i++ { @@ -40,28 +42,25 @@ func deduplicateInternal(interval int64, srcTimestamps []int64, srcValues []floa dstValues = append(dstValues, srcValues[i]) // Update tsNext - tsNext += interval + tsNext += dedupInterval if ts >= tsNext { // Slow path for updating ts. - tsNext = (ts - ts%interval) + interval + tsNext = (ts - ts%dedupInterval) + dedupInterval } } return dstTimestamps, dstValues } -func deduplicateSamplesDuringMerge(srcTimestamps, srcValues []int64) ([]int64, []int64) { - if minScrapeInterval <= 0 { - return srcTimestamps, srcValues - } - if !needsDedup(srcTimestamps, minScrapeInterval) { +func deduplicateSamplesDuringMerge(srcTimestamps, srcValues []int64, dedupInterval int64) ([]int64, []int64) { + if !needsDedup(srcTimestamps, dedupInterval) { // Fast path - nothing to deduplicate return srcTimestamps, srcValues } - return deduplicateDuringMergeInternal(minScrapeInterval, srcTimestamps, srcValues) + return deduplicateDuringMergeInternal(srcTimestamps, srcValues, dedupInterval) } -func deduplicateDuringMergeInternal(interval int64, srcTimestamps, srcValues []int64) ([]int64, []int64) { - tsNext := (srcTimestamps[0] - srcTimestamps[0]%interval) + interval +func deduplicateDuringMergeInternal(srcTimestamps, srcValues []int64, dedupInterval int64) ([]int64, []int64) { + tsNext := (srcTimestamps[0] - srcTimestamps[0]%dedupInterval) + dedupInterval dstTimestamps := srcTimestamps[:1] dstValues := srcValues[:1] for i := 1; i < len(srcTimestamps); i++ { @@ -73,27 +72,27 @@ func deduplicateDuringMergeInternal(interval int64, srcTimestamps, srcValues []i dstValues = append(dstValues, srcValues[i]) // Update tsNext - tsNext += interval + tsNext += dedupInterval if ts >= tsNext { // Slow path for updating ts. - tsNext = (ts - ts%interval) + interval + tsNext = (ts - ts%dedupInterval) + dedupInterval } } return dstTimestamps, dstValues } -func needsDedup(timestamps []int64, interval int64) bool { - if len(timestamps) == 0 || interval <= 0 { +func needsDedup(timestamps []int64, dedupInterval int64) bool { + if len(timestamps) == 0 || dedupInterval <= 0 { return false } - tsNext := (timestamps[0] - timestamps[0]%interval) + interval + tsNext := (timestamps[0] - timestamps[0]%dedupInterval) + dedupInterval for _, ts := range timestamps[1:] { if ts < tsNext { return true } - tsNext += interval + tsNext += dedupInterval if ts >= tsNext { - tsNext = (ts - ts%interval) + interval + tsNext = (ts - ts%dedupInterval) + dedupInterval } } return false diff --git a/lib/storage/dedup_test.go b/lib/storage/dedup_test.go index a3f0d88bb..b206adaa2 100644 --- a/lib/storage/dedup_test.go +++ b/lib/storage/dedup_test.go @@ -30,18 +30,17 @@ func TestNeedsDedup(t *testing.T) { func TestDeduplicateSamples(t *testing.T) { // Disable deduplication before exit, since the rest of tests expect disabled dedup. - defer SetMinScrapeIntervalForDeduplication(0) f := func(scrapeInterval time.Duration, timestamps, timestampsExpected []int64) { t.Helper() - SetMinScrapeIntervalForDeduplication(scrapeInterval) timestampsCopy := make([]int64, len(timestamps)) values := make([]float64, len(timestamps)) for i, ts := range timestamps { timestampsCopy[i] = ts values[i] = float64(i) } - timestampsCopy, values = DeduplicateSamples(timestampsCopy, values) + dedupInterval := scrapeInterval.Milliseconds() + timestampsCopy, values = DeduplicateSamples(timestampsCopy, values, dedupInterval) if !reflect.DeepEqual(timestampsCopy, timestampsExpected) { t.Fatalf("invalid DeduplicateSamples(%v) result;\ngot\n%v\nwant\n%v", timestamps, timestampsCopy, timestampsExpected) } @@ -69,9 +68,9 @@ func TestDeduplicateSamples(t *testing.T) { t.Fatalf("superfluous timestamps found starting from index %d: %v", j, timestampsCopy[j:]) } - // Verify that the second call to DeduplicatSamples doesn't modify samples. + // Verify that the second call to DeduplicateSamples doesn't modify samples. valuesCopy := append([]float64{}, values...) - timestampsCopy, valuesCopy = DeduplicateSamples(timestampsCopy, valuesCopy) + timestampsCopy, valuesCopy = DeduplicateSamples(timestampsCopy, valuesCopy, dedupInterval) if !reflect.DeepEqual(timestampsCopy, timestampsExpected) { t.Fatalf("invalid DeduplicateSamples(%v) timestamps for the second call;\ngot\n%v\nwant\n%v", timestamps, timestampsCopy, timestampsExpected) } @@ -90,18 +89,17 @@ func TestDeduplicateSamples(t *testing.T) { func TestDeduplicateSamplesDuringMerge(t *testing.T) { // Disable deduplication before exit, since the rest of tests expect disabled dedup. - defer SetMinScrapeIntervalForDeduplication(0) f := func(scrapeInterval time.Duration, timestamps, timestampsExpected []int64) { t.Helper() - SetMinScrapeIntervalForDeduplication(scrapeInterval) timestampsCopy := make([]int64, len(timestamps)) values := make([]int64, len(timestamps)) for i, ts := range timestamps { timestampsCopy[i] = ts values[i] = int64(i) } - timestampsCopy, values = deduplicateSamplesDuringMerge(timestampsCopy, values) + dedupInterval := scrapeInterval.Milliseconds() + timestampsCopy, values = deduplicateSamplesDuringMerge(timestampsCopy, values, dedupInterval) if !reflect.DeepEqual(timestampsCopy, timestampsExpected) { t.Fatalf("invalid deduplicateSamplesDuringMerge(%v) result;\ngot\n%v\nwant\n%v", timestamps, timestampsCopy, timestampsExpected) } @@ -129,9 +127,9 @@ func TestDeduplicateSamplesDuringMerge(t *testing.T) { t.Fatalf("superfluous timestamps found starting from index %d: %v", j, timestampsCopy[j:]) } - // Verify that the second call to DeduplicatSamples doesn't modify samples. + // Verify that the second call to DeduplicateSamples doesn't modify samples. valuesCopy := append([]int64{}, values...) - timestampsCopy, valuesCopy = deduplicateSamplesDuringMerge(timestampsCopy, valuesCopy) + timestampsCopy, valuesCopy = deduplicateSamplesDuringMerge(timestampsCopy, valuesCopy, dedupInterval) if !reflect.DeepEqual(timestampsCopy, timestampsExpected) { t.Fatalf("invalid deduplicateSamplesDuringMerge(%v) timestamps for the second call;\ngot\n%v\nwant\n%v", timestamps, timestampsCopy, timestampsExpected) } diff --git a/lib/storage/dedup_timing_test.go b/lib/storage/dedup_timing_test.go index 9be079403..6bdd4fc32 100644 --- a/lib/storage/dedup_timing_test.go +++ b/lib/storage/dedup_timing_test.go @@ -15,8 +15,7 @@ func BenchmarkDeduplicateSamples(b *testing.B) { } for _, minScrapeInterval := range []time.Duration{time.Second, 2 * time.Second, 5 * time.Second, 10 * time.Second} { b.Run(fmt.Sprintf("minScrapeInterval=%s", minScrapeInterval), func(b *testing.B) { - SetMinScrapeIntervalForDeduplication(minScrapeInterval) - defer SetMinScrapeIntervalForDeduplication(0) + dedupInterval := minScrapeInterval.Milliseconds() b.ReportAllocs() b.SetBytes(blockSize) b.RunParallel(func(pb *testing.PB) { @@ -25,7 +24,7 @@ func BenchmarkDeduplicateSamples(b *testing.B) { for pb.Next() { timestampsCopy := append(timestampsCopy[:0], timestamps...) valuesCopy := append(valuesCopy[:0], values...) - ts, vs := DeduplicateSamples(timestampsCopy, valuesCopy) + ts, vs := DeduplicateSamples(timestampsCopy, valuesCopy, dedupInterval) if len(ts) == 0 || len(vs) == 0 { panic(fmt.Errorf("expecting non-empty results; got\nts=%v\nvs=%v", ts, vs)) }