mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/storage: follow-up after 7c0ae3a86a
- Update docs at https://docs.victoriametrics.com/#deduplication - Optimize the deduplication loop a bit Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3333
This commit is contained in:
parent
909cd04c55
commit
e56d5e1918
6 changed files with 159 additions and 94 deletions
|
@ -19,6 +19,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
||||||
|
|
||||||
**Update note 2:** this release splits `type="indexdb"` metrics into `type="indexdb/inmemory"` and `type="indexdb/file"` metrics. This may break old dashboards and alerting rules, which contain [label filter](https://docs.victoriametrics.com/keyConcepts.html#filtering) on `{type="indexdb"}`. Such label filter must be substituted with `{type=~"indexdb.*"}`, so it matches `indexdb` from the previous releases and `indexdb/inmemory` + `indexdb/file` from new releases. It is recommended upgrading to the latest available dashboards and alerting rules mentioned in [these docs](https://docs.victoriametrics.com/#monitoring), since they already contain fixed label filters.
|
**Update note 2:** this release splits `type="indexdb"` metrics into `type="indexdb/inmemory"` and `type="indexdb/file"` metrics. This may break old dashboards and alerting rules, which contain [label filter](https://docs.victoriametrics.com/keyConcepts.html#filtering) on `{type="indexdb"}`. Such label filter must be substituted with `{type=~"indexdb.*"}`, so it matches `indexdb` from the previous releases and `indexdb/inmemory` + `indexdb/file` from new releases. It is recommended upgrading to the latest available dashboards and alerting rules mentioned in [these docs](https://docs.victoriametrics.com/#monitoring), since they already contain fixed label filters.
|
||||||
|
|
||||||
|
* FEATURE: leave a sample with the biggest value for identical timestamps per each `-dedup.minScrapeInterval` discrete interval when the [deduplication](https://docs.victoriametrics.com/#deduplication) is enabled. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3333).
|
||||||
* FEATURE: add `-inmemoryDataFlushInterval` command-line flag, which can be used for controlling the frequency of in-memory data flush to disk. The data flush frequency can be reduced when VictoriaMetrics stores data to low-end flash device with limited number of write cycles (for example, on Raspberry PI). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3337).
|
* FEATURE: add `-inmemoryDataFlushInterval` command-line flag, which can be used for controlling the frequency of in-memory data flush to disk. The data flush frequency can be reduced when VictoriaMetrics stores data to low-end flash device with limited number of write cycles (for example, on Raspberry PI). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3337).
|
||||||
* FEATURE: expose additional metrics for `indexdb` and `storage` parts stored in memory and for `indexdb` parts stored in files (see [storage docs](https://docs.victoriametrics.com/#storage) for technical details):
|
* FEATURE: expose additional metrics for `indexdb` and `storage` parts stored in memory and for `indexdb` parts stored in files (see [storage docs](https://docs.victoriametrics.com/#storage) for technical details):
|
||||||
* `vm_active_merges{type="storage/inmemory"}` - active merges for in-memory `storage` parts
|
* `vm_active_merges{type="storage/inmemory"}` - active merges for in-memory `storage` parts
|
||||||
|
@ -56,7 +57,6 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
||||||
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): add `-remoteWrite.sendTimeout` command-line flag, which allows configuring timeout for sending data to `-remoteWrite.url`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3408).
|
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): add `-remoteWrite.sendTimeout` command-line flag, which allows configuring timeout for sending data to `-remoteWrite.url`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3408).
|
||||||
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add ability to migrate data between VictoriaMetrics clusters with automatic tenants discovery. See [these docs](https://docs.victoriametrics.com/vmctl.html#cluster-to-cluster-migration-mode) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2930).
|
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add ability to migrate data between VictoriaMetrics clusters with automatic tenants discovery. See [these docs](https://docs.victoriametrics.com/vmctl.html#cluster-to-cluster-migration-mode) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2930).
|
||||||
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add ability to copy data from sources via Prometheus `remote_read` protocol. See [these docs](https://docs.victoriametrics.com/vmctl.html#migrating-data-by-remote-read-protocol). The related issues: [one](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3132) and [two](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1101).
|
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add ability to copy data from sources via Prometheus `remote_read` protocol. See [these docs](https://docs.victoriametrics.com/vmctl.html#migrating-data-by-remote-read-protocol). The related issues: [one](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3132) and [two](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1101).
|
||||||
* FEATURE: [deduplication](https://docs.victoriametrics.com/#deduplication): leave raw sample with the biggest value for identical timestamps per each `-dedup.minScrapeInterval` discrete interval when the [deduplication](https://docs.victoriametrics.com/#deduplication) is enabled. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3333).
|
|
||||||
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): allow changing timezones for the requested data. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3075).
|
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): allow changing timezones for the requested data. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3075).
|
||||||
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): provide fast path for hiding results for all the queries except the given one by clicking `eye` icon with `ctrl` key pressed. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3446).
|
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): provide fast path for hiding results for all the queries except the given one by clicking `eye` icon with `ctrl` key pressed. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3446).
|
||||||
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add `range_trim_spikes(phi, q)` function for trimming `phi` percent of the largest spikes per each time series returned by `q`. See [these docs](https://docs.victoriametrics.com/MetricsQL.html#range_trim_spikes).
|
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add `range_trim_spikes(phi, q)` function for trimming `phi` percent of the largest spikes per each time series returned by `q`. See [these docs](https://docs.victoriametrics.com/MetricsQL.html#range_trim_spikes).
|
||||||
|
|
|
@ -1352,7 +1352,12 @@ with the enabled de-duplication. See [this section](#deduplication) for details.
|
||||||
|
|
||||||
## Deduplication
|
## Deduplication
|
||||||
|
|
||||||
VictoriaMetrics leaves a single raw sample with the biggest timestamp per each `-dedup.minScrapeInterval` discrete interval if `-dedup.minScrapeInterval` is set to positive duration. For example, `-dedup.minScrapeInterval=60s` would leave a single raw sample with the biggest timestamp per each discrete 60s interval. If multiple raw samples have the same biggest timestamp on the given `-dedup.minScrapeInterval` discrete interval, then an arbitrary sample out of these samples is left. This aligns with the [staleness rules in Prometheus](https://prometheus.io/docs/prometheus/latest/querying/basics/#staleness).
|
VictoriaMetrics leaves a single raw sample with the biggest timestamp per each `-dedup.minScrapeInterval` discrete interval
|
||||||
|
if `-dedup.minScrapeInterval` is set to positive duration. For example, `-dedup.minScrapeInterval=60s` would leave a single
|
||||||
|
raw sample with the biggest timestamp per each discrete 60s interval.
|
||||||
|
This aligns with the [staleness rules in Prometheus](https://prometheus.io/docs/prometheus/latest/querying/basics/#staleness).
|
||||||
|
|
||||||
|
If multiple raw samples have the same biggest timestamp on the given `-dedup.minScrapeInterval` discrete interval, then the sample with the biggest value is left.
|
||||||
|
|
||||||
The `-dedup.minScrapeInterval=D` is equivalent to `-downsampling.period=0s:D` if [downsampling](#downsampling) is enabled. So it is safe to use deduplication and downsampling simultaneously.
|
The `-dedup.minScrapeInterval=D` is equivalent to `-downsampling.period=0s:D` if [downsampling](#downsampling) is enabled. So it is safe to use deduplication and downsampling simultaneously.
|
||||||
|
|
||||||
|
|
|
@ -1355,7 +1355,12 @@ with the enabled de-duplication. See [this section](#deduplication) for details.
|
||||||
|
|
||||||
## Deduplication
|
## Deduplication
|
||||||
|
|
||||||
VictoriaMetrics leaves a single raw sample with the biggest timestamp per each `-dedup.minScrapeInterval` discrete interval if `-dedup.minScrapeInterval` is set to positive duration. For example, `-dedup.minScrapeInterval=60s` would leave a single raw sample with the biggest timestamp per each discrete 60s interval. If multiple raw samples have the same biggest timestamp on the given `-dedup.minScrapeInterval` discrete interval, then an arbitrary sample out of these samples is left. This aligns with the [staleness rules in Prometheus](https://prometheus.io/docs/prometheus/latest/querying/basics/#staleness).
|
VictoriaMetrics leaves a single raw sample with the biggest timestamp per each `-dedup.minScrapeInterval` discrete interval
|
||||||
|
if `-dedup.minScrapeInterval` is set to positive duration. For example, `-dedup.minScrapeInterval=60s` would leave a single
|
||||||
|
raw sample with the biggest timestamp per each discrete 60s interval.
|
||||||
|
This aligns with the [staleness rules in Prometheus](https://prometheus.io/docs/prometheus/latest/querying/basics/#staleness).
|
||||||
|
|
||||||
|
If multiple raw samples have the same biggest timestamp on the given `-dedup.minScrapeInterval` discrete interval, then the sample with the biggest value is left.
|
||||||
|
|
||||||
The `-dedup.minScrapeInterval=D` is equivalent to `-downsampling.period=0s:D` if [downsampling](#downsampling) is enabled. So it is safe to use deduplication and downsampling simultaneously.
|
The `-dedup.minScrapeInterval=D` is equivalent to `-downsampling.period=0s:D` if [downsampling](#downsampling) is enabled. So it is safe to use deduplication and downsampling simultaneously.
|
||||||
|
|
||||||
|
|
|
@ -34,37 +34,40 @@ func DeduplicateSamples(srcTimestamps []int64, srcValues []float64, dedupInterva
|
||||||
tsNext -= tsNext % dedupInterval
|
tsNext -= tsNext % dedupInterval
|
||||||
dstTimestamps := srcTimestamps[:0]
|
dstTimestamps := srcTimestamps[:0]
|
||||||
dstValues := srcValues[:0]
|
dstValues := srcValues[:0]
|
||||||
|
|
||||||
var tsPrev int64
|
|
||||||
var value, valuePrev float64
|
|
||||||
for i, ts := range srcTimestamps[1:] {
|
for i, ts := range srcTimestamps[1:] {
|
||||||
value = srcValues[i]
|
|
||||||
tsCur := srcTimestamps[i]
|
|
||||||
if tsCur == tsPrev && value < valuePrev {
|
|
||||||
// prefer biggest value on timestamp conflict
|
|
||||||
value = valuePrev
|
|
||||||
}
|
|
||||||
valuePrev = value
|
|
||||||
tsPrev = tsCur
|
|
||||||
if ts <= tsNext {
|
if ts <= tsNext {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
dstTimestamps = append(dstTimestamps, tsCur)
|
// Choose the maximum value with the timestamp equal to tsPrev.
|
||||||
dstValues = append(dstValues, value)
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3333
|
||||||
|
j := i
|
||||||
|
tsPrev := srcTimestamps[j]
|
||||||
|
vPrev := srcValues[j]
|
||||||
|
for j > 0 && srcTimestamps[j-1] == tsPrev {
|
||||||
|
j--
|
||||||
|
if srcValues[j] > vPrev {
|
||||||
|
vPrev = srcValues[j]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
dstTimestamps = append(dstTimestamps, tsPrev)
|
||||||
|
dstValues = append(dstValues, vPrev)
|
||||||
tsNext += dedupInterval
|
tsNext += dedupInterval
|
||||||
if tsNext < ts {
|
if tsNext < ts {
|
||||||
tsNext = ts + dedupInterval - 1
|
tsNext = ts + dedupInterval - 1
|
||||||
tsNext -= tsNext % dedupInterval
|
tsNext -= tsNext % dedupInterval
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
j := len(srcTimestamps) - 1
|
||||||
ts := srcTimestamps[len(srcTimestamps)-1]
|
tsPrev := srcTimestamps[j]
|
||||||
v := srcValues[len(srcValues)-1]
|
vPrev := srcValues[j]
|
||||||
dstTimestamps = append(dstTimestamps, ts)
|
for j > 0 && srcTimestamps[j-1] == tsPrev {
|
||||||
if ts == tsPrev && v < value {
|
j--
|
||||||
v = value
|
if srcValues[j] > vPrev {
|
||||||
|
vPrev = srcValues[j]
|
||||||
|
}
|
||||||
}
|
}
|
||||||
dstValues = append(dstValues, v)
|
dstTimestamps = append(dstTimestamps, tsPrev)
|
||||||
|
dstValues = append(dstValues, vPrev)
|
||||||
return dstTimestamps, dstValues
|
return dstTimestamps, dstValues
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -77,36 +80,40 @@ func deduplicateSamplesDuringMerge(srcTimestamps, srcValues []int64, dedupInterv
|
||||||
tsNext -= tsNext % dedupInterval
|
tsNext -= tsNext % dedupInterval
|
||||||
dstTimestamps := srcTimestamps[:0]
|
dstTimestamps := srcTimestamps[:0]
|
||||||
dstValues := srcValues[:0]
|
dstValues := srcValues[:0]
|
||||||
|
|
||||||
var tsPrev int64
|
|
||||||
var value, valuePrev int64
|
|
||||||
for i, ts := range srcTimestamps[1:] {
|
for i, ts := range srcTimestamps[1:] {
|
||||||
value = srcValues[i]
|
|
||||||
tsCur := srcTimestamps[i]
|
|
||||||
if tsCur == tsPrev && value < valuePrev {
|
|
||||||
// prefer biggest value on timestamp conflict
|
|
||||||
value = valuePrev
|
|
||||||
}
|
|
||||||
valuePrev = value
|
|
||||||
tsPrev = tsCur
|
|
||||||
if ts <= tsNext {
|
if ts <= tsNext {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
dstTimestamps = append(dstTimestamps, tsCur)
|
// Choose the maximum value with the timestamp equal to tsPrev.
|
||||||
dstValues = append(dstValues, value)
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3333
|
||||||
|
j := i
|
||||||
|
tsPrev := srcTimestamps[j]
|
||||||
|
vPrev := srcValues[j]
|
||||||
|
for j > 0 && srcTimestamps[j-1] == tsPrev {
|
||||||
|
j--
|
||||||
|
if srcValues[j] > vPrev {
|
||||||
|
vPrev = srcValues[j]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
dstTimestamps = append(dstTimestamps, tsPrev)
|
||||||
|
dstValues = append(dstValues, vPrev)
|
||||||
tsNext += dedupInterval
|
tsNext += dedupInterval
|
||||||
if tsNext < ts {
|
if tsNext < ts {
|
||||||
tsNext = ts + dedupInterval - 1
|
tsNext = ts + dedupInterval - 1
|
||||||
tsNext -= tsNext % dedupInterval
|
tsNext -= tsNext % dedupInterval
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ts := srcTimestamps[len(srcTimestamps)-1]
|
j := len(srcTimestamps) - 1
|
||||||
v := srcValues[len(srcValues)-1]
|
tsPrev := srcTimestamps[j]
|
||||||
dstTimestamps = append(dstTimestamps, ts)
|
vPrev := srcValues[j]
|
||||||
if ts == tsPrev && v < value {
|
for j > 0 && srcTimestamps[j-1] == tsPrev {
|
||||||
v = value
|
j--
|
||||||
|
if srcValues[j] > vPrev {
|
||||||
|
vPrev = srcValues[j]
|
||||||
|
}
|
||||||
}
|
}
|
||||||
dstValues = append(dstValues, v)
|
dstTimestamps = append(dstTimestamps, tsPrev)
|
||||||
|
dstValues = append(dstValues, vPrev)
|
||||||
return dstTimestamps, dstValues
|
return dstTimestamps, dstValues
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -35,13 +35,11 @@ func TestNeedsDedup(t *testing.T) {
|
||||||
f(10, []int64{0, 31, 49}, false)
|
f(10, []int64{0, 31, 49}, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDeduplicateSamples(t *testing.T) {
|
func TestDeduplicateSamplesWithIdenticalTimestamps(t *testing.T) {
|
||||||
// Disable deduplication before exit, since the rest of tests expect disabled dedup.
|
f := func(scrapeInterval time.Duration, timestamps []int64, values []float64, timestampsExpected []int64, valuesExpected []float64) {
|
||||||
|
|
||||||
f := func(scrapeInterval time.Duration, timestamps, timestampsExpected []int64, values, valuesExpected []float64) {
|
|
||||||
t.Helper()
|
t.Helper()
|
||||||
timestampsCopy := make([]int64, len(timestamps))
|
timestampsCopy := append([]int64{}, timestamps...)
|
||||||
copy(timestampsCopy, timestamps)
|
|
||||||
dedupInterval := scrapeInterval.Milliseconds()
|
dedupInterval := scrapeInterval.Milliseconds()
|
||||||
timestampsCopy, values = DeduplicateSamples(timestampsCopy, values, dedupInterval)
|
timestampsCopy, values = DeduplicateSamples(timestampsCopy, values, dedupInterval)
|
||||||
if !reflect.DeepEqual(timestampsCopy, timestampsExpected) {
|
if !reflect.DeepEqual(timestampsCopy, timestampsExpected) {
|
||||||
|
@ -61,57 +59,77 @@ func TestDeduplicateSamples(t *testing.T) {
|
||||||
t.Fatalf("invalid DeduplicateSamples(%v) values for the second call;\ngot\n%v\nwant\n%v", timestamps, values, valuesCopy)
|
t.Fatalf("invalid DeduplicateSamples(%v) values for the second call;\ngot\n%v\nwant\n%v", timestamps, values, valuesCopy)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
f(time.Millisecond, nil, []int64{}, []float64{}, []float64{})
|
f(time.Second, []int64{1000, 1000}, []float64{2, 1}, []int64{1000}, []float64{2})
|
||||||
f(time.Millisecond, []int64{123}, []int64{123}, []float64{0}, []float64{0})
|
f(time.Second, []int64{1001, 1001}, []float64{2, 1}, []int64{1001}, []float64{2})
|
||||||
f(time.Millisecond, []int64{123, 456}, []int64{123, 456}, []float64{0, 1}, []float64{0, 1})
|
f(time.Second, []int64{1000, 1001, 1001, 1001, 2001}, []float64{1, 2, 5, 3, 0}, []int64{1000, 1001, 2001}, []float64{1, 5, 0})
|
||||||
|
}
|
||||||
|
|
||||||
// pick the biggest value on the interval, no matter what order is
|
func TestDeduplicateSamplesDuringMergeWithIdenticalTimestamps(t *testing.T) {
|
||||||
f(time.Millisecond,
|
f := func(scrapeInterval time.Duration, timestamps, values, timestampsExpected, valuesExpected []int64) {
|
||||||
[]int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4},
|
t.Helper()
|
||||||
[]int64{0, 1, 2, 3, 4},
|
timestampsCopy := append([]int64{}, timestamps...)
|
||||||
[]float64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
|
|
||||||
[]float64{2, 4, 5, 8, 9})
|
|
||||||
f(time.Millisecond,
|
|
||||||
[]int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4},
|
|
||||||
[]int64{0, 1, 2, 3, 4},
|
|
||||||
[]float64{2, 1, 0, 3, 4, 5, 7, 6, 8, 9},
|
|
||||||
[]float64{2, 4, 5, 8, 9})
|
|
||||||
f(time.Millisecond,
|
|
||||||
[]int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4},
|
|
||||||
[]int64{0, 1, 2, 3, 4},
|
|
||||||
[]float64{1, 2, 0, 4, 3, 5, 8, 6, 7, 9},
|
|
||||||
[]float64{2, 4, 5, 8, 9})
|
|
||||||
|
|
||||||
// descending values
|
dedupInterval := scrapeInterval.Milliseconds()
|
||||||
f(time.Millisecond,
|
timestampsCopy, values = deduplicateSamplesDuringMerge(timestampsCopy, values, dedupInterval)
|
||||||
[]int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4},
|
if !reflect.DeepEqual(timestampsCopy, timestampsExpected) {
|
||||||
[]int64{0, 1, 2, 3, 4},
|
t.Fatalf("invalid deduplicateSamplesDuringMerge(%v) timestamps;\ngot\n%v\nwant\n%v", timestamps, timestampsCopy, timestampsExpected)
|
||||||
[]float64{9, 8, 7, 6, 5, 4, 3, 2, 1, 0},
|
}
|
||||||
[]float64{9, 6, 4, 3, 0})
|
if !reflect.DeepEqual(values, valuesExpected) {
|
||||||
|
t.Fatalf("invalid deduplicateSamplesDuringMerge(%v) values;\ngot\n%v\nwant\n%v", timestamps, values, valuesExpected)
|
||||||
|
}
|
||||||
|
|
||||||
f(10*time.Millisecond,
|
// Verify that the second call to deduplicateSamplesDuringMerge doesn't modify samples.
|
||||||
[]int64{0, 9, 11, 13, 13, 29, 29, 29},
|
valuesCopy := append([]int64{}, values...)
|
||||||
[]int64{0, 9, 13, 29},
|
timestampsCopy, valuesCopy = deduplicateSamplesDuringMerge(timestampsCopy, valuesCopy, dedupInterval)
|
||||||
[]float64{5, 1, 0, 4, 1, 3, 0, 5},
|
if !reflect.DeepEqual(timestampsCopy, timestampsExpected) {
|
||||||
[]float64{5, 1, 4, 5})
|
t.Fatalf("invalid deduplicateSamplesDuringMerge(%v) timestamps for the second call;\ngot\n%v\nwant\n%v", timestamps, timestampsCopy, timestampsExpected)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(valuesCopy, values) {
|
||||||
|
t.Fatalf("invalid deduplicateSamplesDuringMerge(%v) values for the second call;\ngot\n%v\nwant\n%v", timestamps, values, valuesCopy)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
f(time.Second, []int64{1000, 1000}, []int64{2, 1}, []int64{1000}, []int64{2})
|
||||||
|
f(time.Second, []int64{1001, 1001}, []int64{2, 1}, []int64{1001}, []int64{2})
|
||||||
|
f(time.Second, []int64{1000, 1001, 1001, 1001, 2001}, []int64{1, 2, 5, 3, 0}, []int64{1000, 1001, 2001}, []int64{1, 5, 0})
|
||||||
|
}
|
||||||
|
|
||||||
// too small dedup interval
|
func TestDeduplicateSamples(t *testing.T) {
|
||||||
f(0,
|
// Disable deduplication before exit, since the rest of tests expect disabled dedup.
|
||||||
[]int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4},
|
|
||||||
[]int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4},
|
|
||||||
[]float64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
|
|
||||||
[]float64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
|
|
||||||
|
|
||||||
f(100*time.Millisecond,
|
f := func(scrapeInterval time.Duration, timestamps, timestampsExpected []int64, valuesExpected []float64) {
|
||||||
[]int64{0, 100, 100, 101, 150, 180, 205, 300, 1000},
|
t.Helper()
|
||||||
[]int64{0, 100, 180, 300, 1000},
|
timestampsCopy := make([]int64, len(timestamps))
|
||||||
[]float64{0, 1, 2, 3, 4, 5, 6, 7, 8},
|
values := make([]float64, len(timestamps))
|
||||||
[]float64{0, 2, 5, 7, 8})
|
for i, ts := range timestamps {
|
||||||
f(10*time.Second,
|
timestampsCopy[i] = ts
|
||||||
[]int64{10e3, 13e3, 21e3, 22e3, 30e3, 33e3, 39e3, 45e3},
|
values[i] = float64(i)
|
||||||
[]int64{10e3, 13e3, 30e3, 39e3, 45e3},
|
}
|
||||||
[]float64{0, 1, 2, 3, 4, 5, 6, 7},
|
dedupInterval := scrapeInterval.Milliseconds()
|
||||||
[]float64{0, 1, 4, 6, 7})
|
timestampsCopy, values = DeduplicateSamples(timestampsCopy, values, dedupInterval)
|
||||||
|
if !reflect.DeepEqual(timestampsCopy, timestampsExpected) {
|
||||||
|
t.Fatalf("invalid DeduplicateSamples(%v) timestamps;\ngot\n%v\nwant\n%v", timestamps, timestampsCopy, timestampsExpected)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(values, valuesExpected) {
|
||||||
|
t.Fatalf("invalid DeduplicateSamples(%v) values;\ngot\n%v\nwant\n%v", timestamps, values, valuesExpected)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify that the second call to DeduplicateSamples doesn't modify samples.
|
||||||
|
valuesCopy := append([]float64{}, values...)
|
||||||
|
timestampsCopy, valuesCopy = DeduplicateSamples(timestampsCopy, valuesCopy, dedupInterval)
|
||||||
|
if !reflect.DeepEqual(timestampsCopy, timestampsExpected) {
|
||||||
|
t.Fatalf("invalid DeduplicateSamples(%v) timestamps for the second call;\ngot\n%v\nwant\n%v", timestamps, timestampsCopy, timestampsExpected)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(valuesCopy, values) {
|
||||||
|
t.Fatalf("invalid DeduplicateSamples(%v) values for the second call;\ngot\n%v\nwant\n%v", timestamps, values, valuesCopy)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
f(time.Millisecond, nil, []int64{}, []float64{})
|
||||||
|
f(time.Millisecond, []int64{123}, []int64{123}, []float64{0})
|
||||||
|
f(time.Millisecond, []int64{123, 456}, []int64{123, 456}, []float64{0, 1})
|
||||||
|
f(time.Millisecond, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, []int64{0, 1, 2, 3, 4}, []float64{2, 4, 5, 8, 9})
|
||||||
|
f(0, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, []float64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
|
||||||
|
f(100*time.Millisecond, []int64{0, 100, 100, 101, 150, 180, 205, 300, 1000}, []int64{0, 100, 180, 300, 1000}, []float64{0, 2, 5, 7, 8})
|
||||||
|
f(10*time.Second, []int64{10e3, 13e3, 21e3, 22e3, 30e3, 33e3, 39e3, 45e3}, []int64{10e3, 13e3, 30e3, 39e3, 45e3}, []float64{0, 1, 4, 6, 7})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDeduplicateSamplesDuringMerge(t *testing.T) {
|
func TestDeduplicateSamplesDuringMerge(t *testing.T) {
|
||||||
|
|
|
@ -12,6 +12,7 @@ func BenchmarkDeduplicateSamples(b *testing.B) {
|
||||||
values := make([]float64, blockSize)
|
values := make([]float64, blockSize)
|
||||||
for i := 0; i < len(timestamps); i++ {
|
for i := 0; i < len(timestamps); i++ {
|
||||||
timestamps[i] = int64(i) * 1e3
|
timestamps[i] = int64(i) * 1e3
|
||||||
|
values[i] = float64(i)
|
||||||
}
|
}
|
||||||
for _, minScrapeInterval := range []time.Duration{time.Second, 2 * time.Second, 5 * time.Second, 10 * time.Second} {
|
for _, minScrapeInterval := range []time.Duration{time.Second, 2 * time.Second, 5 * time.Second, 10 * time.Second} {
|
||||||
b.Run(fmt.Sprintf("minScrapeInterval=%s", minScrapeInterval), func(b *testing.B) {
|
b.Run(fmt.Sprintf("minScrapeInterval=%s", minScrapeInterval), func(b *testing.B) {
|
||||||
|
@ -33,3 +34,32 @@ func BenchmarkDeduplicateSamples(b *testing.B) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func BenchmarkDeduplicateSamplesDuringMerge(b *testing.B) {
|
||||||
|
const blockSize = 8192
|
||||||
|
timestamps := make([]int64, blockSize)
|
||||||
|
values := make([]int64, blockSize)
|
||||||
|
for i := 0; i < len(timestamps); i++ {
|
||||||
|
timestamps[i] = int64(i) * 1e3
|
||||||
|
values[i] = int64(i)
|
||||||
|
}
|
||||||
|
for _, minScrapeInterval := range []time.Duration{time.Second, 2 * time.Second, 5 * time.Second, 10 * time.Second} {
|
||||||
|
b.Run(fmt.Sprintf("minScrapeInterval=%s", minScrapeInterval), func(b *testing.B) {
|
||||||
|
dedupInterval := minScrapeInterval.Milliseconds()
|
||||||
|
b.ReportAllocs()
|
||||||
|
b.SetBytes(blockSize)
|
||||||
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
|
timestampsCopy := make([]int64, 0, blockSize)
|
||||||
|
valuesCopy := make([]int64, 0, blockSize)
|
||||||
|
for pb.Next() {
|
||||||
|
timestampsCopy := append(timestampsCopy[:0], timestamps...)
|
||||||
|
valuesCopy := append(valuesCopy[:0], values...)
|
||||||
|
ts, vs := deduplicateSamplesDuringMerge(timestampsCopy, valuesCopy, dedupInterval)
|
||||||
|
if len(ts) == 0 || len(vs) == 0 {
|
||||||
|
panic(fmt.Errorf("expecting non-empty results; got\nts=%v\nvs=%v", ts, vs))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue