lib/storage: leave the last sample per each discrete interval during the deduplication

This aligns better with staleness logic in Prometheus - https://prometheus.io/docs/prometheus/latest/querying/basics/#staleness
This commit is contained in:
Aliaksandr Valialkin 2022-05-02 21:35:14 +03:00
parent 7ca32c21c8
commit 361b08c30e
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
9 changed files with 75 additions and 124 deletions

View file

@ -639,7 +639,7 @@ Below is the output for `/path/to/vmselect -help`:
-cluster.tlsKeyFile string -cluster.tlsKeyFile string
Path to client-side TLS key file to use when connecting to -storageNode if -cluster.tls flag is set. See https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#mtls-protection Path to client-side TLS key file to use when connecting to -storageNode if -cluster.tls flag is set. See https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#mtls-protection
-dedup.minScrapeInterval duration -dedup.minScrapeInterval duration
Leave only the first sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication for details Leave only the last sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication for details
-downsampling.period array -downsampling.period array
Comma-separated downsampling periods in the format 'offset:period'. For example, '30d:10m' instructs to leave a single sample per 10 minutes for samples older than 30 days. See https://docs.victoriametrics.com/#downsampling for details Comma-separated downsampling periods in the format 'offset:period'. For example, '30d:10m' instructs to leave a single sample per 10 minutes for samples older than 30 days. See https://docs.victoriametrics.com/#downsampling for details
Supports an array of values separated by comma or specified via multiple flags. Supports an array of values separated by comma or specified via multiple flags.
@ -791,7 +791,7 @@ Below is the output for `/path/to/vmstorage -help`:
-cluster.tlsKeyFile string -cluster.tlsKeyFile string
Path to server-side TLS key file to use when accepting connections from vminsert and vmselect if -cluster.tls flag is set. See https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#mtls-protection Path to server-side TLS key file to use when accepting connections from vminsert and vmselect if -cluster.tls flag is set. See https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#mtls-protection
-dedup.minScrapeInterval duration -dedup.minScrapeInterval duration
Leave only the first sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication for details Leave only the last sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication for details
-denyQueriesOutsideRetention -denyQueriesOutsideRetention
Whether to deny queries outside of the configured -retentionPeriod. When set, then /api/v1/query_range would return '503 Service Unavailable' error for queries with 'from' value outside -retentionPeriod. This may be useful when multiple data sources with distinct retentions are hidden behind query-tee Whether to deny queries outside of the configured -retentionPeriod. When set, then /api/v1/query_range would return '503 Service Unavailable' error for queries with 'from' value outside -retentionPeriod. This may be useful when multiple data sources with distinct retentions are hidden behind query-tee
-downsampling.period array -downsampling.period array

View file

@ -37,7 +37,7 @@ var (
"It shouldn't be high, since a single request can saturate all the CPU cores. See also -search.maxQueueDuration") "It shouldn't be high, since a single request can saturate all the CPU cores. See also -search.maxQueueDuration")
maxQueueDuration = flag.Duration("search.maxQueueDuration", 10*time.Second, "The maximum time the request waits for execution when -search.maxConcurrentRequests "+ maxQueueDuration = flag.Duration("search.maxQueueDuration", 10*time.Second, "The maximum time the request waits for execution when -search.maxConcurrentRequests "+
"limit is reached; see also -search.maxQueryDuration") "limit is reached; see also -search.maxQueryDuration")
minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "Leave only the first sample in every time series per each discrete interval "+ minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "Leave only the last sample in every time series per each discrete interval "+
"equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication for details") "equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication for details")
resetCacheAuthKey = flag.String("search.resetCacheAuthKey", "", "Optional authKey for resetting rollup cache via /internal/resetRollupResultCache call") resetCacheAuthKey = flag.String("search.resetCacheAuthKey", "", "Optional authKey for resetting rollup cache via /internal/resetRollupResultCache call")
logSlowQueryDuration = flag.Duration("search.logSlowQueryDuration", 5*time.Second, "Log queries with execution time exceeding this value. Zero disables slow query logging") logSlowQueryDuration = flag.Duration("search.logSlowQueryDuration", 5*time.Second, "Log queries with execution time exceeding this value. Zero disables slow query logging")

View file

@ -39,7 +39,7 @@ var (
"Zero value disables final merge") "Zero value disables final merge")
bigMergeConcurrency = flag.Int("bigMergeConcurrency", 0, "The maximum number of CPU cores to use for big merges. Default value is used if set to 0") bigMergeConcurrency = flag.Int("bigMergeConcurrency", 0, "The maximum number of CPU cores to use for big merges. Default value is used if set to 0")
smallMergeConcurrency = flag.Int("smallMergeConcurrency", 0, "The maximum number of CPU cores to use for small merges. Default value is used if set to 0") smallMergeConcurrency = flag.Int("smallMergeConcurrency", 0, "The maximum number of CPU cores to use for small merges. Default value is used if set to 0")
minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "Leave only the first sample in every time series per each discrete interval "+ minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "Leave only the last sample in every time series per each discrete interval "+
"equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication for details") "equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication for details")
logNewSeries = flag.Bool("logNewSeries", false, "Whether to log new series. This option is for debug purposes only. It can lead to performance issues "+ logNewSeries = flag.Bool("logNewSeries", false, "Whether to log new series. This option is for debug purposes only. It can lead to performance issues "+

View file

@ -33,6 +33,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): show data processing speed during data migration. * FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): show data processing speed during data migration.
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add `drop_common_labels()` function, which drops common `label="name"` pairs from the passed time series. See [these docs](https://docs.victoriametrics.com/MetricsQL.html#drop_common_labels). * FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add `drop_common_labels()` function, which drops common `label="name"` pairs from the passed time series. See [these docs](https://docs.victoriametrics.com/MetricsQL.html#drop_common_labels).
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add `tlast_change_over_time(m[d])` function, which returns the timestamp of the last change of `m` on the given lookbehind window `d`. See [these docs](https://docs.victoriametrics.com/MetricsQL.html#tlast_change_over_time). * FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add `tlast_change_over_time(m[d])` function, which returns the timestamp of the last change of `m` on the given lookbehind window `d`. See [these docs](https://docs.victoriametrics.com/MetricsQL.html#tlast_change_over_time).
* FEATURE: leave the last raw sample per each `-dedup.minScrapeInterval` discrete interval when the [deduplication](https://docs.victoriametrics.com/#deduplication) is enabled. This aligns better with the [staleness rules in Prometheus](https://prometheus.io/docs/prometheus/latest/querying/basics/#staleness) compared to the previous behaviour, where the first sample per each `-dedup.minScrapeInterval` discrete interval was left.
* FEATURE: add a handler for `/api/v1/status/buildinfo` endpoint, which is used by Grafana starting from v8.5.0 . See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2515). * FEATURE: add a handler for `/api/v1/status/buildinfo` endpoint, which is used by Grafana starting from v8.5.0 . See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2515).
* BUGFIX: export staleness markers as `null` values from [JSON export API](https://docs.victoriametrics.com/#how-to-export-data-in-json-line-format). Previously they were exported as `NaN` values. This could break the exported JSON parsing, since `NaN` values aren't supported by [JSON specification](https://www.json.org/). * BUGFIX: export staleness markers as `null` values from [JSON export API](https://docs.victoriametrics.com/#how-to-export-data-in-json-line-format). Previously they were exported as `NaN` values. This could break the exported JSON parsing, since `NaN` values aren't supported by [JSON specification](https://www.json.org/).

View file

@ -643,7 +643,7 @@ Below is the output for `/path/to/vmselect -help`:
-cluster.tlsKeyFile string -cluster.tlsKeyFile string
Path to client-side TLS key file to use when connecting to -storageNode if -cluster.tls flag is set. See https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#mtls-protection Path to client-side TLS key file to use when connecting to -storageNode if -cluster.tls flag is set. See https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#mtls-protection
-dedup.minScrapeInterval duration -dedup.minScrapeInterval duration
Leave only the first sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication for details Leave only the last sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication for details
-downsampling.period array -downsampling.period array
Comma-separated downsampling periods in the format 'offset:period'. For example, '30d:10m' instructs to leave a single sample per 10 minutes for samples older than 30 days. See https://docs.victoriametrics.com/#downsampling for details Comma-separated downsampling periods in the format 'offset:period'. For example, '30d:10m' instructs to leave a single sample per 10 minutes for samples older than 30 days. See https://docs.victoriametrics.com/#downsampling for details
Supports an array of values separated by comma or specified via multiple flags. Supports an array of values separated by comma or specified via multiple flags.
@ -795,7 +795,7 @@ Below is the output for `/path/to/vmstorage -help`:
-cluster.tlsKeyFile string -cluster.tlsKeyFile string
Path to server-side TLS key file to use when accepting connections from vminsert and vmselect if -cluster.tls flag is set. See https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#mtls-protection Path to server-side TLS key file to use when accepting connections from vminsert and vmselect if -cluster.tls flag is set. See https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#mtls-protection
-dedup.minScrapeInterval duration -dedup.minScrapeInterval duration
Leave only the first sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication for details Leave only the last sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication for details
-denyQueriesOutsideRetention -denyQueriesOutsideRetention
Whether to deny queries outside of the configured -retentionPeriod. When set, then /api/v1/query_range would return '503 Service Unavailable' error for queries with 'from' value outside -retentionPeriod. This may be useful when multiple data sources with distinct retentions are hidden behind query-tee Whether to deny queries outside of the configured -retentionPeriod. When set, then /api/v1/query_range would return '503 Service Unavailable' error for queries with 'from' value outside -retentionPeriod. This may be useful when multiple data sources with distinct retentions are hidden behind query-tee
-downsampling.period array -downsampling.period array

View file

@ -1097,7 +1097,7 @@ with the enabled de-duplication. See [this section](#deduplication) for details.
## Deduplication ## Deduplication
VictoriaMetrics de-duplicates data points if `-dedup.minScrapeInterval` command-line flag is set to positive duration. For example, `-dedup.minScrapeInterval=60s` would de-duplicate data points on the same time series if they fall within the same discrete 60s bucket. The earliest data point will be kept. In the case of equal timestamps, an arbitrary data point will be kept. See [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2112#issuecomment-1032587618) for more details on how downsampling works. VictoriaMetrics leaves a single raw sample with the biggest timestamp per each `-dedup.minScrapeInterval` discrete interval if `-dedup.minScrapeInterval` is set to positive duration. For example, `-dedup.minScrapeInterval=60s` would leave a single raw sample with the biggest timestamp per each discrete 60s interval. If multiple raw samples have the same biggest timestamp on the given `-dedup.minScrapeInterval` discrete interval, then an arbitrary sample out of these samples is left. This aligns with the [staleness rules in Prometheus](https://prometheus.io/docs/prometheus/latest/querying/basics/#staleness).
The `-dedup.minScrapeInterval=D` is equivalent to `-downsampling.period=0s:D` if [downsampling](#downsampling) is enabled. It is safe to use deduplication and downsampling simultaneously. The `-dedup.minScrapeInterval=D` is equivalent to `-downsampling.period=0s:D` if [downsampling](#downsampling) is enabled. It is safe to use deduplication and downsampling simultaneously.
@ -1622,7 +1622,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
The maximum size in bytes of a single DataDog POST request to /api/v1/series The maximum size in bytes of a single DataDog POST request to /api/v1/series
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 67108864) Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 67108864)
-dedup.minScrapeInterval duration -dedup.minScrapeInterval duration
Leave only the first sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication and https://docs.victoriametrics.com/#downsampling Leave only the last sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication and https://docs.victoriametrics.com/#downsampling
-deleteAuthKey string -deleteAuthKey string
authKey for metrics' deletion via /api/v1/admin/tsdb/delete_series and /tags/delSeries authKey for metrics' deletion via /api/v1/admin/tsdb/delete_series and /tags/delSeries
-denyQueriesOutsideRetention -denyQueriesOutsideRetention

View file

@ -1101,7 +1101,7 @@ with the enabled de-duplication. See [this section](#deduplication) for details.
## Deduplication ## Deduplication
VictoriaMetrics de-duplicates data points if `-dedup.minScrapeInterval` command-line flag is set to positive duration. For example, `-dedup.minScrapeInterval=60s` would de-duplicate data points on the same time series if they fall within the same discrete 60s bucket. The earliest data point will be kept. In the case of equal timestamps, an arbitrary data point will be kept. See [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2112#issuecomment-1032587618) for more details on how downsampling works. VictoriaMetrics leaves a single raw sample with the biggest timestamp per each `-dedup.minScrapeInterval` discrete interval if `-dedup.minScrapeInterval` is set to positive duration. For example, `-dedup.minScrapeInterval=60s` would leave a single raw sample with the biggest timestamp per each discrete 60s interval. If multiple raw samples have the same biggest timestamp on the given `-dedup.minScrapeInterval` discrete interval, then an arbitrary sample out of these samples is left. This aligns with the [staleness rules in Prometheus](https://prometheus.io/docs/prometheus/latest/querying/basics/#staleness).
The `-dedup.minScrapeInterval=D` is equivalent to `-downsampling.period=0s:D` if [downsampling](#downsampling) is enabled. It is safe to use deduplication and downsampling simultaneously. The `-dedup.minScrapeInterval=D` is equivalent to `-downsampling.period=0s:D` if [downsampling](#downsampling) is enabled. It is safe to use deduplication and downsampling simultaneously.
@ -1626,7 +1626,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
The maximum size in bytes of a single DataDog POST request to /api/v1/series The maximum size in bytes of a single DataDog POST request to /api/v1/series
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 67108864) Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 67108864)
-dedup.minScrapeInterval duration -dedup.minScrapeInterval duration
Leave only the first sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication and https://docs.victoriametrics.com/#downsampling Leave only the last sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication and https://docs.victoriametrics.com/#downsampling
-deleteAuthKey string -deleteAuthKey string
authKey for metrics' deletion via /api/v1/admin/tsdb/delete_series and /tags/delSeries authKey for metrics' deletion via /api/v1/admin/tsdb/delete_series and /tags/delSeries
-denyQueriesOutsideRetention -denyQueriesOutsideRetention

View file

@ -30,28 +30,24 @@ func DeduplicateSamples(srcTimestamps []int64, srcValues []float64, dedupInterva
// Fast path - nothing to deduplicate // Fast path - nothing to deduplicate
return srcTimestamps, srcValues return srcTimestamps, srcValues
} }
return deduplicateInternal(srcTimestamps, srcValues, dedupInterval) tsNext := srcTimestamps[0] + dedupInterval - 1
} tsNext -= tsNext % dedupInterval
dstTimestamps := srcTimestamps[:0]
func deduplicateInternal(srcTimestamps []int64, srcValues []float64, dedupInterval int64) ([]int64, []float64) { dstValues := srcValues[:0]
tsNext := (srcTimestamps[0] - srcTimestamps[0]%dedupInterval) + dedupInterval for i, ts := range srcTimestamps[1:] {
dstTimestamps := srcTimestamps[:1] if ts <= tsNext {
dstValues := srcValues[:1]
for i := 1; i < len(srcTimestamps); i++ {
ts := srcTimestamps[i]
if ts < tsNext {
continue continue
} }
dstTimestamps = append(dstTimestamps, ts) dstTimestamps = append(dstTimestamps, srcTimestamps[i])
dstValues = append(dstValues, srcValues[i]) dstValues = append(dstValues, srcValues[i])
// Update tsNext
tsNext += dedupInterval tsNext += dedupInterval
if ts >= tsNext { if tsNext < ts {
// Slow path for updating ts. tsNext = ts + dedupInterval - 1
tsNext = (ts - ts%dedupInterval) + dedupInterval tsNext -= tsNext % dedupInterval
} }
} }
dstTimestamps = append(dstTimestamps, srcTimestamps[len(srcTimestamps)-1])
dstValues = append(dstValues, srcValues[len(srcValues)-1])
return dstTimestamps, dstValues return dstTimestamps, dstValues
} }
@ -60,43 +56,41 @@ func deduplicateSamplesDuringMerge(srcTimestamps, srcValues []int64, dedupInterv
// Fast path - nothing to deduplicate // Fast path - nothing to deduplicate
return srcTimestamps, srcValues return srcTimestamps, srcValues
} }
return deduplicateDuringMergeInternal(srcTimestamps, srcValues, dedupInterval) tsNext := srcTimestamps[0] + dedupInterval - 1
} tsNext -= tsNext % dedupInterval
dstTimestamps := srcTimestamps[:0]
func deduplicateDuringMergeInternal(srcTimestamps, srcValues []int64, dedupInterval int64) ([]int64, []int64) { dstValues := srcValues[:0]
tsNext := (srcTimestamps[0] - srcTimestamps[0]%dedupInterval) + dedupInterval for i, ts := range srcTimestamps[1:] {
dstTimestamps := srcTimestamps[:1] if ts <= tsNext {
dstValues := srcValues[:1]
for i := 1; i < len(srcTimestamps); i++ {
ts := srcTimestamps[i]
if ts < tsNext {
continue continue
} }
dstTimestamps = append(dstTimestamps, ts) dstTimestamps = append(dstTimestamps, srcTimestamps[i])
dstValues = append(dstValues, srcValues[i]) dstValues = append(dstValues, srcValues[i])
// Update tsNext
tsNext += dedupInterval tsNext += dedupInterval
if ts >= tsNext { if tsNext < ts {
// Slow path for updating ts. tsNext = ts + dedupInterval - 1
tsNext = (ts - ts%dedupInterval) + dedupInterval tsNext -= tsNext % dedupInterval
} }
} }
dstTimestamps = append(dstTimestamps, srcTimestamps[len(srcTimestamps)-1])
dstValues = append(dstValues, srcValues[len(srcValues)-1])
return dstTimestamps, dstValues return dstTimestamps, dstValues
} }
func needsDedup(timestamps []int64, dedupInterval int64) bool { func needsDedup(timestamps []int64, dedupInterval int64) bool {
if len(timestamps) == 0 || dedupInterval <= 0 { if len(timestamps) < 2 || dedupInterval <= 0 {
return false return false
} }
tsNext := (timestamps[0] - timestamps[0]%dedupInterval) + dedupInterval tsNext := timestamps[0] + dedupInterval - 1
tsNext -= tsNext % dedupInterval
for _, ts := range timestamps[1:] { for _, ts := range timestamps[1:] {
if ts < tsNext { if ts <= tsNext {
return true return true
} }
tsNext += dedupInterval tsNext += dedupInterval
if ts >= tsNext { if tsNext < ts {
tsNext = (ts - ts%dedupInterval) + dedupInterval tsNext = ts + dedupInterval - 1
tsNext -= tsNext % dedupInterval
} }
} }
return false return false

View file

@ -19,19 +19,26 @@ func TestNeedsDedup(t *testing.T) {
f(0, []int64{1, 2}, false) f(0, []int64{1, 2}, false)
f(10, []int64{1}, false) f(10, []int64{1}, false)
f(10, []int64{1, 2}, true) f(10, []int64{1, 2}, true)
f(10, []int64{9, 10}, false) f(10, []int64{9, 11}, false)
f(10, []int64{9, 10, 19}, true) f(10, []int64{10, 11}, false)
f(10, []int64{0, 10, 11}, false)
f(10, []int64{9, 10}, true)
f(10, []int64{0, 10, 19}, false)
f(10, []int64{9, 19}, false) f(10, []int64{9, 19}, false)
f(10, []int64{0, 9, 19}, true) f(10, []int64{0, 11, 19}, true)
f(10, []int64{0, 11, 20}, true)
f(10, []int64{0, 11, 21}, false)
f(10, []int64{0, 19}, false) f(10, []int64{0, 19}, false)
f(10, []int64{0, 35, 40}, false) f(10, []int64{0, 30, 40}, false)
f(10, []int64{0, 35, 40, 41}, true) f(10, []int64{0, 31, 40}, true)
f(10, []int64{0, 31, 41}, false)
f(10, []int64{0, 31, 49}, false)
} }
func TestDeduplicateSamples(t *testing.T) { func TestDeduplicateSamples(t *testing.T) {
// Disable deduplication before exit, since the rest of tests expect disabled dedup. // Disable deduplication before exit, since the rest of tests expect disabled dedup.
f := func(scrapeInterval time.Duration, timestamps, timestampsExpected []int64) { f := func(scrapeInterval time.Duration, timestamps, timestampsExpected []int64, valuesExpected []float64) {
t.Helper() t.Helper()
timestampsCopy := make([]int64, len(timestamps)) timestampsCopy := make([]int64, len(timestamps))
values := make([]float64, len(timestamps)) values := make([]float64, len(timestamps))
@ -42,30 +49,10 @@ func TestDeduplicateSamples(t *testing.T) {
dedupInterval := scrapeInterval.Milliseconds() dedupInterval := scrapeInterval.Milliseconds()
timestampsCopy, values = DeduplicateSamples(timestampsCopy, values, dedupInterval) timestampsCopy, values = DeduplicateSamples(timestampsCopy, values, dedupInterval)
if !reflect.DeepEqual(timestampsCopy, timestampsExpected) { if !reflect.DeepEqual(timestampsCopy, timestampsExpected) {
t.Fatalf("invalid DeduplicateSamples(%v) result;\ngot\n%v\nwant\n%v", timestamps, timestampsCopy, timestampsExpected) t.Fatalf("invalid DeduplicateSamples(%v) timestamps;\ngot\n%v\nwant\n%v", timestamps, timestampsCopy, timestampsExpected)
} }
// Verify values if !reflect.DeepEqual(values, valuesExpected) {
if len(timestampsCopy) == 0 { t.Fatalf("invalid DeduplicateSamples(%v) values;\ngot\n%v\nwant\n%v", timestamps, values, valuesExpected)
if len(values) != 0 {
t.Fatalf("values must be empty; got %v", values)
}
return
}
j := 0
for i, ts := range timestamps {
if ts != timestampsCopy[j] {
continue
}
if values[j] != float64(i) {
t.Fatalf("unexpected value at index %d; got %v; want %v; values: %v", j, values[j], i, values)
}
j++
if j == len(timestampsCopy) {
break
}
}
if j != len(timestampsCopy) {
t.Fatalf("superfluous timestamps found starting from index %d: %v", j, timestampsCopy[j:])
} }
// Verify that the second call to DeduplicateSamples doesn't modify samples. // Verify that the second call to DeduplicateSamples doesn't modify samples.
@ -78,19 +65,19 @@ func TestDeduplicateSamples(t *testing.T) {
t.Fatalf("invalid DeduplicateSamples(%v) values for the second call;\ngot\n%v\nwant\n%v", timestamps, values, valuesCopy) t.Fatalf("invalid DeduplicateSamples(%v) values for the second call;\ngot\n%v\nwant\n%v", timestamps, values, valuesCopy)
} }
} }
f(time.Millisecond, nil, []int64{}) f(time.Millisecond, nil, []int64{}, []float64{})
f(time.Millisecond, []int64{123}, []int64{123}) f(time.Millisecond, []int64{123}, []int64{123}, []float64{0})
f(time.Millisecond, []int64{123, 456}, []int64{123, 456}) f(time.Millisecond, []int64{123, 456}, []int64{123, 456}, []float64{0, 1})
f(time.Millisecond, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, []int64{0, 1, 2, 3, 4}) f(time.Millisecond, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, []int64{0, 1, 2, 3, 4}, []float64{2, 4, 5, 8, 9})
f(0, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}) f(0, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, []float64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
f(100*time.Millisecond, []int64{0, 100, 100, 101, 150, 180, 205, 300, 1000}, []int64{0, 100, 205, 300, 1000}) f(100*time.Millisecond, []int64{0, 100, 100, 101, 150, 180, 205, 300, 1000}, []int64{0, 100, 180, 300, 1000}, []float64{0, 2, 5, 7, 8})
f(10*time.Second, []int64{10e3, 13e3, 21e3, 22e3, 30e3, 33e3, 39e3, 45e3}, []int64{10e3, 21e3, 30e3, 45e3}) f(10*time.Second, []int64{10e3, 13e3, 21e3, 22e3, 30e3, 33e3, 39e3, 45e3}, []int64{10e3, 13e3, 30e3, 39e3, 45e3}, []float64{0, 1, 4, 6, 7})
} }
func TestDeduplicateSamplesDuringMerge(t *testing.T) { func TestDeduplicateSamplesDuringMerge(t *testing.T) {
// Disable deduplication before exit, since the rest of tests expect disabled dedup. // Disable deduplication before exit, since the rest of tests expect disabled dedup.
f := func(scrapeInterval time.Duration, timestamps, timestampsExpected []int64) { f := func(scrapeInterval time.Duration, timestamps, timestampsExpected, valuesExpected []int64) {
t.Helper() t.Helper()
timestampsCopy := make([]int64, len(timestamps)) timestampsCopy := make([]int64, len(timestamps))
values := make([]int64, len(timestamps)) values := make([]int64, len(timestamps))
@ -101,30 +88,10 @@ func TestDeduplicateSamplesDuringMerge(t *testing.T) {
dedupInterval := scrapeInterval.Milliseconds() dedupInterval := scrapeInterval.Milliseconds()
timestampsCopy, values = deduplicateSamplesDuringMerge(timestampsCopy, values, dedupInterval) timestampsCopy, values = deduplicateSamplesDuringMerge(timestampsCopy, values, dedupInterval)
if !reflect.DeepEqual(timestampsCopy, timestampsExpected) { if !reflect.DeepEqual(timestampsCopy, timestampsExpected) {
t.Fatalf("invalid deduplicateSamplesDuringMerge(%v) result;\ngot\n%v\nwant\n%v", timestamps, timestampsCopy, timestampsExpected) t.Fatalf("invalid deduplicateSamplesDuringMerge(%v) timestamps;\ngot\n%v\nwant\n%v", timestamps, timestampsCopy, timestampsExpected)
} }
// Verify values if !reflect.DeepEqual(values, valuesExpected) {
if len(timestampsCopy) == 0 { t.Fatalf("invalid DeduplicateSamples(%v) values;\ngot\n%v\nwant\n%v", timestamps, values, valuesExpected)
if len(values) != 0 {
t.Fatalf("values must be empty; got %v", values)
}
return
}
j := 0
for i, ts := range timestamps {
if ts != timestampsCopy[j] {
continue
}
if values[j] != int64(i) {
t.Fatalf("unexpected value at index %d; got %v; want %v; values: %v", j, values[j], i, values)
}
j++
if j == len(timestampsCopy) {
break
}
}
if j != len(timestampsCopy) {
t.Fatalf("superfluous timestamps found starting from index %d: %v", j, timestampsCopy[j:])
} }
// Verify that the second call to DeduplicateSamples doesn't modify samples. // Verify that the second call to DeduplicateSamples doesn't modify samples.
@ -137,21 +104,10 @@ func TestDeduplicateSamplesDuringMerge(t *testing.T) {
t.Fatalf("invalid deduplicateSamplesDuringMerge(%v) values for the second call;\ngot\n%v\nwant\n%v", timestamps, values, valuesCopy) t.Fatalf("invalid deduplicateSamplesDuringMerge(%v) values for the second call;\ngot\n%v\nwant\n%v", timestamps, values, valuesCopy)
} }
} }
f(time.Millisecond, nil, []int64{}) f(time.Millisecond, nil, []int64{}, []int64{})
f(time.Millisecond, []int64{123}, []int64{123}) f(time.Millisecond, []int64{123}, []int64{123}, []int64{0})
f(time.Millisecond, []int64{123, 456}, []int64{123, 456}) f(time.Millisecond, []int64{123, 456}, []int64{123, 456}, []int64{0, 1})
f(time.Millisecond, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, []int64{0, 1, 2, 3, 4}) f(time.Millisecond, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, []int64{0, 1, 2, 3, 4}, []int64{2, 4, 5, 8, 9})
f(100*time.Millisecond, []int64{0, 100, 100, 101, 150, 180, 200, 300, 1000}, []int64{0, 100, 200, 300, 1000}) f(100*time.Millisecond, []int64{0, 100, 100, 101, 150, 180, 200, 300, 1000}, []int64{0, 100, 200, 300, 1000}, []int64{0, 2, 6, 7, 8})
f(10*time.Second, []int64{10e3, 13e3, 21e3, 22e3, 30e3, 33e3, 39e3, 45e3}, []int64{10e3, 21e3, 30e3, 45e3}) f(10*time.Second, []int64{10e3, 13e3, 21e3, 22e3, 30e3, 33e3, 39e3, 45e3}, []int64{10e3, 13e3, 30e3, 39e3, 45e3}, []int64{0, 1, 4, 6, 7})
var timestamps, timestampsExpected []int64
for i := 0; i < 40; i++ {
timestamps = append(timestamps, int64(i*1000))
if i%2 == 0 {
timestampsExpected = append(timestampsExpected, int64(i*1000))
}
}
f(0, timestamps, timestamps)
f(time.Second, timestamps, timestamps)
f(2*time.Second, timestamps, timestampsExpected)
} }