mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/storage: correctly handle -dedup.minScrapeInterval
values smaller than 8ms
Such small values may be used for removing samples with duplicate timestamps. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/409 for details.
This commit is contained in:
parent
7d73623c69
commit
ded0c0d3c7
2 changed files with 140 additions and 2 deletions
|
@ -1,6 +1,7 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"math"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
@ -23,7 +24,15 @@ func getMinDelta() int64 {
|
|||
// 10 25 41
|
||||
// When using 7/8 of minScrapeInterval, it will be converted to the expected:
|
||||
// 10 19 30 41
|
||||
return (minScrapeInterval.Milliseconds() / 8) * 7
|
||||
ms := minScrapeInterval.Milliseconds()
|
||||
|
||||
// Try calculating scrape interval via integer arithmetic.
|
||||
d := (ms / 8) * 7
|
||||
if d > 0 {
|
||||
return d
|
||||
}
|
||||
// Too small scrape interval for integer arithmetic. Calculate d using floating-point arithmetic.
|
||||
return int64(math.Round(float64(ms) / 8 * 7))
|
||||
}
|
||||
|
||||
// DeduplicateSamples removes samples from src* if they are closer to each other than minScrapeInterval.
|
||||
|
@ -53,7 +62,7 @@ func DeduplicateSamples(srcTimestamps []int64, srcValues []float64) ([]int64, []
|
|||
return dstTimestamps, dstValues
|
||||
}
|
||||
|
||||
func deduplicateSamplesDuringMerge(srcTimestamps []int64, srcValues []int64) ([]int64, []int64) {
|
||||
func deduplicateSamplesDuringMerge(srcTimestamps, srcValues []int64) ([]int64, []int64) {
|
||||
if minScrapeInterval <= 0 {
|
||||
return srcTimestamps, srcValues
|
||||
}
|
||||
|
|
129
lib/storage/dedup_test.go
Normal file
129
lib/storage/dedup_test.go
Normal file
|
@ -0,0 +1,129 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestGetMinDelta(t *testing.T) {
|
||||
f := func(scrapeInterval time.Duration, dExpected int64) {
|
||||
t.Helper()
|
||||
SetMinScrapeIntervalForDeduplication(scrapeInterval)
|
||||
d := getMinDelta()
|
||||
if d != dExpected {
|
||||
t.Fatalf("unexpected getMinDelta(%s); got %d; want %d", scrapeInterval, d, dExpected)
|
||||
}
|
||||
}
|
||||
f(0, 0)
|
||||
f(time.Millisecond, 1)
|
||||
f(5*time.Millisecond, 4)
|
||||
f(8*time.Millisecond, 7)
|
||||
f(100*time.Millisecond, 84)
|
||||
f(time.Second, 875)
|
||||
f(10*time.Second, 8750)
|
||||
}
|
||||
|
||||
func TestDeduplicateSamples(t *testing.T) {
|
||||
f := func(scrapeInterval time.Duration, timestamps, timestampsExpected []int64) {
|
||||
t.Helper()
|
||||
SetMinScrapeIntervalForDeduplication(scrapeInterval)
|
||||
timestampsCopy := make([]int64, len(timestamps))
|
||||
values := make([]float64, len(timestamps))
|
||||
for i, ts := range timestamps {
|
||||
timestampsCopy[i] = ts
|
||||
values[i] = float64(i)
|
||||
}
|
||||
timestampsCopy, values = DeduplicateSamples(timestampsCopy, values)
|
||||
if !reflect.DeepEqual(timestampsCopy, timestampsExpected) {
|
||||
t.Fatalf("invalid DeduplicateSamples(%v) result;\ngot\n%v\nwant\n%v", timestamps, timestampsCopy, timestampsExpected)
|
||||
}
|
||||
// Verify values
|
||||
if len(timestampsCopy) == 0 {
|
||||
if len(values) != 0 {
|
||||
t.Fatalf("values must be empty; got %v", values)
|
||||
}
|
||||
return
|
||||
}
|
||||
j := 0
|
||||
for i, ts := range timestamps {
|
||||
if ts != timestampsCopy[j] {
|
||||
continue
|
||||
}
|
||||
if values[j] != float64(i) {
|
||||
t.Fatalf("unexpected value at index %d; got %v; want %v; values: %v", j, values[j], i, values)
|
||||
}
|
||||
j++
|
||||
if j == len(timestampsCopy) {
|
||||
break
|
||||
}
|
||||
}
|
||||
if j != len(timestampsCopy) {
|
||||
t.Fatalf("superflouos timestamps found starting from index %d: %v", j, timestampsCopy[j:])
|
||||
}
|
||||
}
|
||||
f(time.Millisecond, nil, []int64{})
|
||||
f(time.Millisecond, []int64{123}, []int64{123})
|
||||
f(time.Millisecond, []int64{123, 456}, []int64{123, 456})
|
||||
f(time.Millisecond, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, []int64{0, 1, 2, 3, 4})
|
||||
f(0, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4})
|
||||
f(100*time.Millisecond, []int64{0, 100, 100, 101, 150, 180, 200, 300, 1000}, []int64{0, 100, 200, 300, 1000})
|
||||
f(10*time.Second, []int64{10e3, 13e3, 21e3, 22e3, 30e3, 33e3, 39e3, 45e3}, []int64{10e3, 21e3, 30e3, 39e3})
|
||||
}
|
||||
|
||||
func TestDeduplicateSamplesDuringMerge(t *testing.T) {
|
||||
f := func(scrapeInterval time.Duration, timestamps, timestampsExpected []int64) {
|
||||
t.Helper()
|
||||
SetMinScrapeIntervalForDeduplication(scrapeInterval)
|
||||
timestampsCopy := make([]int64, len(timestamps))
|
||||
values := make([]int64, len(timestamps))
|
||||
for i, ts := range timestamps {
|
||||
timestampsCopy[i] = ts
|
||||
values[i] = int64(i)
|
||||
}
|
||||
timestampsCopy, values = deduplicateSamplesDuringMerge(timestampsCopy, values)
|
||||
if !reflect.DeepEqual(timestampsCopy, timestampsExpected) {
|
||||
t.Fatalf("invalid deduplicateSamplesDuringMerge(%v) result;\ngot\n%v\nwant\n%v", timestamps, timestampsCopy, timestampsExpected)
|
||||
}
|
||||
// Verify values
|
||||
if len(timestampsCopy) == 0 {
|
||||
if len(values) != 0 {
|
||||
t.Fatalf("values must be empty; got %v", values)
|
||||
}
|
||||
return
|
||||
}
|
||||
j := 0
|
||||
for i, ts := range timestamps {
|
||||
if ts != timestampsCopy[j] {
|
||||
continue
|
||||
}
|
||||
if values[j] != int64(i) {
|
||||
t.Fatalf("unexpected value at index %d; got %v; want %v; values: %v", j, values[j], i, values)
|
||||
}
|
||||
j++
|
||||
if j == len(timestampsCopy) {
|
||||
break
|
||||
}
|
||||
}
|
||||
if j != len(timestampsCopy) {
|
||||
t.Fatalf("superflouos timestamps found starting from index %d: %v", j, timestampsCopy[j:])
|
||||
}
|
||||
}
|
||||
f(time.Millisecond, nil, []int64{})
|
||||
f(time.Millisecond, []int64{123}, []int64{123})
|
||||
f(time.Millisecond, []int64{123, 456}, []int64{123, 456})
|
||||
f(time.Millisecond, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4})
|
||||
f(100*time.Millisecond, []int64{0, 100, 100, 101, 150, 180, 200, 300, 1000}, []int64{0, 100, 100, 101, 150, 180, 200, 300, 1000})
|
||||
f(10*time.Second, []int64{10e3, 13e3, 21e3, 22e3, 30e3, 33e3, 39e3, 45e3}, []int64{10e3, 13e3, 21e3, 22e3, 30e3, 33e3, 39e3, 45e3})
|
||||
|
||||
var timestamps, timestampsExpected []int64
|
||||
for i := 0; i < 40; i++ {
|
||||
timestamps = append(timestamps, int64(i*1000))
|
||||
if i%2 == 0 {
|
||||
timestampsExpected = append(timestampsExpected, int64(i*1000))
|
||||
}
|
||||
}
|
||||
f(0, timestamps, timestamps)
|
||||
f(time.Second, timestamps, timestamps)
|
||||
f(2*time.Second, timestamps, timestampsExpected)
|
||||
}
|
Loading…
Reference in a new issue