lib/storage: explicitly pass dedupInterval to DeduplicateSamples() and deduplicateSamplesDuringMerge()

This improves the code readability and debuggability, since the output of these functions
stops depending on global state.
This commit is contained in:
Aliaksandr Valialkin 2021-12-14 20:49:08 +02:00
parent e1a715b0f5
commit 1d20a19c7d
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
7 changed files with 51 additions and 52 deletions

View file

@@ -51,7 +51,7 @@ func main() {
logger.Infof("starting VictoriaMetrics at %q...", *httpListenAddr)
startTime := time.Now()
storage.SetMinScrapeIntervalForDeduplication(*minScrapeInterval)
storage.SetDedupInterval(*minScrapeInterval)
vmstorage.Init(promql.ResetRollupResultCacheIfNeeded)
vmselect.Init()
vminsert.Init()

View file

@@ -468,7 +468,9 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.
if firstErr != nil {
return firstErr
}
mergeSortBlocks(dst, sbs)
di := storage.GetDedupInterval()
dedupInterval := di.Milliseconds()
mergeSortBlocks(dst, sbs, dedupInterval)
return nil
}
@@ -489,7 +491,7 @@ var sbPool sync.Pool
var metricRowsSkipped = metrics.NewCounter(`vm_metric_rows_skipped_total{name="vmselect"}`)
func mergeSortBlocks(dst *Result, sbh sortBlocksHeap) {
func mergeSortBlocks(dst *Result, sbh sortBlocksHeap, dedupInterval int64) {
// Skip empty sort blocks, since they cannot be passed to heap.Init.
src := sbh
sbh = sbh[:0]
@@ -532,8 +534,7 @@ func mergeSortBlocks(dst *Result, sbh sortBlocksHeap) {
putSortBlock(top)
}
}
timestamps, values := storage.DeduplicateSamples(dst.Timestamps, dst.Values)
timestamps, values := storage.DeduplicateSamples(dst.Timestamps, dst.Values, dedupInterval)
dedups := len(dst.Timestamps) - len(timestamps)
dedupsDuringSelect.Add(dedups)
dst.Timestamps = timestamps

View file

@@ -147,14 +147,14 @@ func (b *Block) tooBig() bool {
return false
}
func (b *Block) deduplicateSamplesDuringMerge() {
func (b *Block) deduplicateSamplesDuringMerge(dedupInterval int64) {
if len(b.values) == 0 {
// Nothing to dedup or the data is already marshaled.
return
}
srcTimestamps := b.timestamps[b.nextIdx:]
srcValues := b.values[b.nextIdx:]
timestamps, values := deduplicateSamplesDuringMerge(srcTimestamps, srcValues)
timestamps, values := deduplicateSamplesDuringMerge(srcTimestamps, srcValues, dedupInterval)
dedups := len(srcTimestamps) - len(timestamps)
atomic.AddUint64(&dedupsDuringMerge, uint64(dedups))
b.timestamps = b.timestamps[:b.nextIdx+len(timestamps)]

View file

@@ -187,7 +187,9 @@ func (bsw *blockStreamWriter) MustClose() {
func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsMerged *uint64, needDedup bool) {
atomic.AddUint64(rowsMerged, uint64(b.rowsCount()))
if needDedup {
b.deduplicateSamplesDuringMerge()
di := GetDedupInterval()
dedupInterval := di.Milliseconds()
b.deduplicateSamplesDuringMerge(dedupInterval)
}
headerData, timestampsData, valuesData := b.MarshalData(bsw.timestampsBlockOffset, bsw.valuesBlockOffset)
usePrevTimestamps := len(bsw.prevTimestampsData) > 0 && bytes.Equal(timestampsData, bsw.prevTimestampsData)

View file

@@ -4,31 +4,33 @@ import (
"time"
)
// SetMinScrapeIntervalForDeduplication sets the minimum interval for data points during de-duplication.
// SetDedupInterval sets the deduplication interval, which is applied to raw samples during data ingestion and querying.
//
// De-duplication is disabled if interval is 0.
// De-duplication is disabled if dedupInterval is 0.
//
// This function must be called before initializing the storage.
func SetMinScrapeIntervalForDeduplication(interval time.Duration) {
minScrapeInterval = interval.Milliseconds()
func SetDedupInterval(dedupInterval time.Duration) {
globalDedupInterval = dedupInterval
}
var minScrapeInterval = int64(0)
// GetDedupInterval returns the dedup interval set via SetDedupInterval.
func GetDedupInterval() time.Duration {
return globalDedupInterval
}
// DeduplicateSamples removes samples from src* if they are closer to each other than minScrapeInterval.
func DeduplicateSamples(srcTimestamps []int64, srcValues []float64) ([]int64, []float64) {
if minScrapeInterval <= 0 {
return srcTimestamps, srcValues
}
if !needsDedup(srcTimestamps, minScrapeInterval) {
var globalDedupInterval time.Duration
// DeduplicateSamples removes samples from src* if they are closer to each other than dedupInterval in milliseconds.
func DeduplicateSamples(srcTimestamps []int64, srcValues []float64, dedupInterval int64) ([]int64, []float64) {
if !needsDedup(srcTimestamps, dedupInterval) {
// Fast path - nothing to deduplicate
return srcTimestamps, srcValues
}
return deduplicateInternal(minScrapeInterval, srcTimestamps, srcValues)
return deduplicateInternal(srcTimestamps, srcValues, dedupInterval)
}
func deduplicateInternal(interval int64, srcTimestamps []int64, srcValues []float64) ([]int64, []float64) {
tsNext := (srcTimestamps[0] - srcTimestamps[0]%interval) + interval
func deduplicateInternal(srcTimestamps []int64, srcValues []float64, dedupInterval int64) ([]int64, []float64) {
tsNext := (srcTimestamps[0] - srcTimestamps[0]%dedupInterval) + dedupInterval
dstTimestamps := srcTimestamps[:1]
dstValues := srcValues[:1]
for i := 1; i < len(srcTimestamps); i++ {
@@ -40,28 +42,25 @@ func deduplicateInternal(interval int64, srcTimestamps []int64, srcValues []floa
dstValues = append(dstValues, srcValues[i])
// Update tsNext
tsNext += interval
tsNext += dedupInterval
if ts >= tsNext {
// Slow path for updating ts.
tsNext = (ts - ts%interval) + interval
tsNext = (ts - ts%dedupInterval) + dedupInterval
}
}
return dstTimestamps, dstValues
}
func deduplicateSamplesDuringMerge(srcTimestamps, srcValues []int64) ([]int64, []int64) {
if minScrapeInterval <= 0 {
return srcTimestamps, srcValues
}
if !needsDedup(srcTimestamps, minScrapeInterval) {
func deduplicateSamplesDuringMerge(srcTimestamps, srcValues []int64, dedupInterval int64) ([]int64, []int64) {
if !needsDedup(srcTimestamps, dedupInterval) {
// Fast path - nothing to deduplicate
return srcTimestamps, srcValues
}
return deduplicateDuringMergeInternal(minScrapeInterval, srcTimestamps, srcValues)
return deduplicateDuringMergeInternal(srcTimestamps, srcValues, dedupInterval)
}
func deduplicateDuringMergeInternal(interval int64, srcTimestamps, srcValues []int64) ([]int64, []int64) {
tsNext := (srcTimestamps[0] - srcTimestamps[0]%interval) + interval
func deduplicateDuringMergeInternal(srcTimestamps, srcValues []int64, dedupInterval int64) ([]int64, []int64) {
tsNext := (srcTimestamps[0] - srcTimestamps[0]%dedupInterval) + dedupInterval
dstTimestamps := srcTimestamps[:1]
dstValues := srcValues[:1]
for i := 1; i < len(srcTimestamps); i++ {
@@ -73,27 +72,27 @@ func deduplicateDuringMergeInternal(interval int64, srcTimestamps, srcValues []i
dstValues = append(dstValues, srcValues[i])
// Update tsNext
tsNext += interval
tsNext += dedupInterval
if ts >= tsNext {
// Slow path for updating ts.
tsNext = (ts - ts%interval) + interval
tsNext = (ts - ts%dedupInterval) + dedupInterval
}
}
return dstTimestamps, dstValues
}
func needsDedup(timestamps []int64, interval int64) bool {
if len(timestamps) == 0 || interval <= 0 {
func needsDedup(timestamps []int64, dedupInterval int64) bool {
if len(timestamps) == 0 || dedupInterval <= 0 {
return false
}
tsNext := (timestamps[0] - timestamps[0]%interval) + interval
tsNext := (timestamps[0] - timestamps[0]%dedupInterval) + dedupInterval
for _, ts := range timestamps[1:] {
if ts < tsNext {
return true
}
tsNext += interval
tsNext += dedupInterval
if ts >= tsNext {
tsNext = (ts - ts%interval) + interval
tsNext = (ts - ts%dedupInterval) + dedupInterval
}
}
return false

View file

@@ -30,18 +30,17 @@ func TestNeedsDedup(t *testing.T) {
func TestDeduplicateSamples(t *testing.T) {
// Disable deduplication before exit, since the rest of tests expect disabled dedup.
defer SetMinScrapeIntervalForDeduplication(0)
f := func(scrapeInterval time.Duration, timestamps, timestampsExpected []int64) {
t.Helper()
SetMinScrapeIntervalForDeduplication(scrapeInterval)
timestampsCopy := make([]int64, len(timestamps))
values := make([]float64, len(timestamps))
for i, ts := range timestamps {
timestampsCopy[i] = ts
values[i] = float64(i)
}
timestampsCopy, values = DeduplicateSamples(timestampsCopy, values)
dedupInterval := scrapeInterval.Milliseconds()
timestampsCopy, values = DeduplicateSamples(timestampsCopy, values, dedupInterval)
if !reflect.DeepEqual(timestampsCopy, timestampsExpected) {
t.Fatalf("invalid DeduplicateSamples(%v) result;\ngot\n%v\nwant\n%v", timestamps, timestampsCopy, timestampsExpected)
}
@@ -69,9 +68,9 @@ func TestDeduplicateSamples(t *testing.T) {
t.Fatalf("superfluous timestamps found starting from index %d: %v", j, timestampsCopy[j:])
}
// Verify that the second call to DeduplicatSamples doesn't modify samples.
// Verify that the second call to DeduplicateSamples doesn't modify samples.
valuesCopy := append([]float64{}, values...)
timestampsCopy, valuesCopy = DeduplicateSamples(timestampsCopy, valuesCopy)
timestampsCopy, valuesCopy = DeduplicateSamples(timestampsCopy, valuesCopy, dedupInterval)
if !reflect.DeepEqual(timestampsCopy, timestampsExpected) {
t.Fatalf("invalid DeduplicateSamples(%v) timestamps for the second call;\ngot\n%v\nwant\n%v", timestamps, timestampsCopy, timestampsExpected)
}
@@ -90,18 +89,17 @@ func TestDeduplicateSamples(t *testing.T) {
func TestDeduplicateSamplesDuringMerge(t *testing.T) {
// Disable deduplication before exit, since the rest of tests expect disabled dedup.
defer SetMinScrapeIntervalForDeduplication(0)
f := func(scrapeInterval time.Duration, timestamps, timestampsExpected []int64) {
t.Helper()
SetMinScrapeIntervalForDeduplication(scrapeInterval)
timestampsCopy := make([]int64, len(timestamps))
values := make([]int64, len(timestamps))
for i, ts := range timestamps {
timestampsCopy[i] = ts
values[i] = int64(i)
}
timestampsCopy, values = deduplicateSamplesDuringMerge(timestampsCopy, values)
dedupInterval := scrapeInterval.Milliseconds()
timestampsCopy, values = deduplicateSamplesDuringMerge(timestampsCopy, values, dedupInterval)
if !reflect.DeepEqual(timestampsCopy, timestampsExpected) {
t.Fatalf("invalid deduplicateSamplesDuringMerge(%v) result;\ngot\n%v\nwant\n%v", timestamps, timestampsCopy, timestampsExpected)
}
@@ -129,9 +127,9 @@ func TestDeduplicateSamplesDuringMerge(t *testing.T) {
t.Fatalf("superfluous timestamps found starting from index %d: %v", j, timestampsCopy[j:])
}
// Verify that the second call to DeduplicatSamples doesn't modify samples.
// Verify that the second call to DeduplicateSamples doesn't modify samples.
valuesCopy := append([]int64{}, values...)
timestampsCopy, valuesCopy = deduplicateSamplesDuringMerge(timestampsCopy, valuesCopy)
timestampsCopy, valuesCopy = deduplicateSamplesDuringMerge(timestampsCopy, valuesCopy, dedupInterval)
if !reflect.DeepEqual(timestampsCopy, timestampsExpected) {
t.Fatalf("invalid deduplicateSamplesDuringMerge(%v) timestamps for the second call;\ngot\n%v\nwant\n%v", timestamps, timestampsCopy, timestampsExpected)
}

View file

@@ -15,8 +15,7 @@ func BenchmarkDeduplicateSamples(b *testing.B) {
}
for _, minScrapeInterval := range []time.Duration{time.Second, 2 * time.Second, 5 * time.Second, 10 * time.Second} {
b.Run(fmt.Sprintf("minScrapeInterval=%s", minScrapeInterval), func(b *testing.B) {
SetMinScrapeIntervalForDeduplication(minScrapeInterval)
defer SetMinScrapeIntervalForDeduplication(0)
dedupInterval := minScrapeInterval.Milliseconds()
b.ReportAllocs()
b.SetBytes(blockSize)
b.RunParallel(func(pb *testing.PB) {
@@ -25,7 +24,7 @@ func BenchmarkDeduplicateSamples(b *testing.B) {
for pb.Next() {
timestampsCopy := append(timestampsCopy[:0], timestamps...)
valuesCopy := append(valuesCopy[:0], values...)
ts, vs := DeduplicateSamples(timestampsCopy, valuesCopy)
ts, vs := DeduplicateSamples(timestampsCopy, valuesCopy, dedupInterval)
if len(ts) == 0 || len(vs) == 0 {
panic(fmt.Errorf("expecting non-empty results; got\nts=%v\nvs=%v", ts, vs))
}