mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
41f7940f97
* lib/streamaggr: properly reference slice with labels by limiting slice capacity. It must fix issues with slice modification, in case of append new slice will be allocated, instead of modifying refrenced slice https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5402 * Reduce memory allocations when output_relabel_configs adds new labels to output samples --------- Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
845 lines
19 KiB
Go
845 lines
19 KiB
Go
package streamaggr
|
|
|
|
import (
|
|
"fmt"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
|
|
)
|
|
|
|
func TestAggregatorsFailure(t *testing.T) {
|
|
f := func(config string) {
|
|
t.Helper()
|
|
pushFunc := func(tss []prompbmarshal.TimeSeries) {
|
|
panic(fmt.Errorf("pushFunc shouldn't be called"))
|
|
}
|
|
a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0)
|
|
if err == nil {
|
|
t.Fatalf("expecting non-nil error")
|
|
}
|
|
if a != nil {
|
|
t.Fatalf("expecting nil a")
|
|
}
|
|
}
|
|
|
|
// Invalid config
|
|
f(`foobar`)
|
|
|
|
// Unknown option
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [total]
|
|
foobar: baz
|
|
`)
|
|
|
|
// missing interval
|
|
f(`
|
|
- outputs: [total]
|
|
`)
|
|
|
|
// missing outputs
|
|
f(`
|
|
- interval: 1m
|
|
`)
|
|
|
|
// Invalid output
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [foobar]
|
|
`)
|
|
|
|
// Negative interval
|
|
f(`- interval: -5m`)
|
|
// Too small interval
|
|
f(`- interval: 10ms`)
|
|
|
|
// Invalid input_relabel_configs
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [total]
|
|
input_relabel_configs:
|
|
- foo: bar
|
|
`)
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [total]
|
|
input_relabel_configs:
|
|
- action: replace
|
|
`)
|
|
|
|
// Invalid output_relabel_configs
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [total]
|
|
output_relabel_configs:
|
|
- foo: bar
|
|
`)
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [total]
|
|
output_relabel_configs:
|
|
- action: replace
|
|
`)
|
|
|
|
// Both by and without are non-empty
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [total]
|
|
by: [foo]
|
|
without: [bar]
|
|
`)
|
|
|
|
// Invalid quantiles()
|
|
f(`
|
|
- interval: 1m
|
|
outputs: ["quantiles("]
|
|
`)
|
|
f(`
|
|
- interval: 1m
|
|
outputs: ["quantiles()"]
|
|
`)
|
|
f(`
|
|
- interval: 1m
|
|
outputs: ["quantiles(foo)"]
|
|
`)
|
|
f(`
|
|
- interval: 1m
|
|
outputs: ["quantiles(-0.5)"]
|
|
`)
|
|
f(`
|
|
- interval: 1m
|
|
outputs: ["quantiles(1.5)"]
|
|
`)
|
|
}
|
|
|
|
func TestAggregatorsEqual(t *testing.T) {
|
|
f := func(a, b string, expectedResult bool) {
|
|
t.Helper()
|
|
|
|
pushFunc := func(tss []prompbmarshal.TimeSeries) {}
|
|
aa, err := NewAggregatorsFromData([]byte(a), pushFunc, 0)
|
|
if err != nil {
|
|
t.Fatalf("cannot initialize aggregators: %s", err)
|
|
}
|
|
ab, err := NewAggregatorsFromData([]byte(b), pushFunc, 0)
|
|
if err != nil {
|
|
t.Fatalf("cannot initialize aggregators: %s", err)
|
|
}
|
|
result := aa.Equal(ab)
|
|
if result != expectedResult {
|
|
t.Fatalf("unexpected result; got %v; want %v", result, expectedResult)
|
|
}
|
|
}
|
|
f("", "", true)
|
|
f(`
|
|
- outputs: [total]
|
|
interval: 5m
|
|
`, ``, false)
|
|
f(`
|
|
- outputs: [total]
|
|
interval: 5m
|
|
`, `
|
|
- outputs: [total]
|
|
interval: 5m
|
|
`, true)
|
|
f(`
|
|
- outputs: [total]
|
|
interval: 3m
|
|
`, `
|
|
- outputs: [total]
|
|
interval: 5m
|
|
`, false)
|
|
}
|
|
|
|
func TestAggregatorsSuccess(t *testing.T) {
|
|
f := func(config, inputMetrics, outputMetricsExpected, matchIdxsStrExpected string) {
|
|
t.Helper()
|
|
|
|
// Initialize Aggregators
|
|
var tssOutput []prompbmarshal.TimeSeries
|
|
var tssOutputLock sync.Mutex
|
|
pushFunc := func(tss []prompbmarshal.TimeSeries) {
|
|
tssOutputLock.Lock()
|
|
for _, ts := range tss {
|
|
labelsCopy := append([]prompbmarshal.Label{}, ts.Labels...)
|
|
samplesCopy := append([]prompbmarshal.Sample{}, ts.Samples...)
|
|
tssOutput = append(tssOutput, prompbmarshal.TimeSeries{
|
|
Labels: labelsCopy,
|
|
Samples: samplesCopy,
|
|
})
|
|
}
|
|
tssOutputLock.Unlock()
|
|
}
|
|
a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0)
|
|
if err != nil {
|
|
t.Fatalf("cannot initialize aggregators: %s", err)
|
|
}
|
|
|
|
// Push the inputMetrics to Aggregators
|
|
tssInput := mustParsePromMetrics(inputMetrics)
|
|
matchIdxs := a.Push(tssInput, nil)
|
|
a.MustStop()
|
|
|
|
// Verify matchIdxs equals to matchIdxsExpected
|
|
matchIdxsStr := ""
|
|
for _, v := range matchIdxs {
|
|
matchIdxsStr += strconv.Itoa(int(v))
|
|
}
|
|
if matchIdxsStr != matchIdxsStrExpected {
|
|
t.Fatalf("unexpected matchIdxs;\ngot\n%s\nwant\n%s", matchIdxsStr, matchIdxsStrExpected)
|
|
}
|
|
|
|
// Verify the tssOutput contains the expected metrics
|
|
tsStrings := make([]string, len(tssOutput))
|
|
for i, ts := range tssOutput {
|
|
tsStrings[i] = timeSeriesToString(ts)
|
|
}
|
|
sort.Strings(tsStrings)
|
|
outputMetrics := strings.Join(tsStrings, "")
|
|
if outputMetrics != outputMetricsExpected {
|
|
t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected)
|
|
}
|
|
}
|
|
|
|
// Empty config
|
|
f(``, ``, ``, "")
|
|
f(``, `foo{bar="baz"} 1`, ``, "0")
|
|
f(``, "foo 1\nbaz 2", ``, "00")
|
|
|
|
// Empty by list - aggregate only by time
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [count_samples, sum_samples, count_series, last]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_count_samples 1
|
|
bar:1m_count_series 1
|
|
bar:1m_last 5
|
|
bar:1m_sum_samples 5
|
|
foo:1m_count_samples{abc="123"} 2
|
|
foo:1m_count_samples{abc="456",de="fg"} 1
|
|
foo:1m_count_series{abc="123"} 1
|
|
foo:1m_count_series{abc="456",de="fg"} 1
|
|
foo:1m_last{abc="123"} 8.5
|
|
foo:1m_last{abc="456",de="fg"} 8
|
|
foo:1m_sum_samples{abc="123"} 12.5
|
|
foo:1m_sum_samples{abc="456",de="fg"} 8
|
|
`, "1111")
|
|
|
|
// Special case: __name__ in `by` list - this is the same as empty `by` list
|
|
f(`
|
|
- interval: 1m
|
|
by: [__name__]
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_count_samples 1
|
|
bar:1m_count_series 1
|
|
bar:1m_sum_samples 5
|
|
foo:1m_count_samples 3
|
|
foo:1m_count_series 2
|
|
foo:1m_sum_samples 20.5
|
|
`, "1111")
|
|
|
|
// Non-empty `by` list with non-existing labels
|
|
f(`
|
|
- interval: 1m
|
|
by: [foo, bar]
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_by_bar_foo_count_samples 1
|
|
bar:1m_by_bar_foo_count_series 1
|
|
bar:1m_by_bar_foo_sum_samples 5
|
|
foo:1m_by_bar_foo_count_samples 3
|
|
foo:1m_by_bar_foo_count_series 2
|
|
foo:1m_by_bar_foo_sum_samples 20.5
|
|
`, "1111")
|
|
|
|
// Non-empty `by` list with existing label
|
|
f(`
|
|
- interval: 1m
|
|
by: [abc]
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_by_abc_count_samples 1
|
|
bar:1m_by_abc_count_series 1
|
|
bar:1m_by_abc_sum_samples 5
|
|
foo:1m_by_abc_count_samples{abc="123"} 2
|
|
foo:1m_by_abc_count_samples{abc="456"} 1
|
|
foo:1m_by_abc_count_series{abc="123"} 1
|
|
foo:1m_by_abc_count_series{abc="456"} 1
|
|
foo:1m_by_abc_sum_samples{abc="123"} 12.5
|
|
foo:1m_by_abc_sum_samples{abc="456"} 8
|
|
`, "1111")
|
|
|
|
// Non-empty `by` list with duplicate existing label
|
|
f(`
|
|
- interval: 1m
|
|
by: [abc, abc]
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_by_abc_count_samples 1
|
|
bar:1m_by_abc_count_series 1
|
|
bar:1m_by_abc_sum_samples 5
|
|
foo:1m_by_abc_count_samples{abc="123"} 2
|
|
foo:1m_by_abc_count_samples{abc="456"} 1
|
|
foo:1m_by_abc_count_series{abc="123"} 1
|
|
foo:1m_by_abc_count_series{abc="456"} 1
|
|
foo:1m_by_abc_sum_samples{abc="123"} 12.5
|
|
foo:1m_by_abc_sum_samples{abc="456"} 8
|
|
`, "1111")
|
|
|
|
// Non-empty `without` list with non-existing labels
|
|
f(`
|
|
- interval: 1m
|
|
without: [foo]
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_without_foo_count_samples 1
|
|
bar:1m_without_foo_count_series 1
|
|
bar:1m_without_foo_sum_samples 5
|
|
foo:1m_without_foo_count_samples{abc="123"} 2
|
|
foo:1m_without_foo_count_samples{abc="456",de="fg"} 1
|
|
foo:1m_without_foo_count_series{abc="123"} 1
|
|
foo:1m_without_foo_count_series{abc="456",de="fg"} 1
|
|
foo:1m_without_foo_sum_samples{abc="123"} 12.5
|
|
foo:1m_without_foo_sum_samples{abc="456",de="fg"} 8
|
|
`, "1111")
|
|
|
|
// Non-empty `without` list with existing labels
|
|
f(`
|
|
- interval: 1m
|
|
without: [abc]
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_without_abc_count_samples 1
|
|
bar:1m_without_abc_count_series 1
|
|
bar:1m_without_abc_sum_samples 5
|
|
foo:1m_without_abc_count_samples 2
|
|
foo:1m_without_abc_count_samples{de="fg"} 1
|
|
foo:1m_without_abc_count_series 1
|
|
foo:1m_without_abc_count_series{de="fg"} 1
|
|
foo:1m_without_abc_sum_samples 12.5
|
|
foo:1m_without_abc_sum_samples{de="fg"} 8
|
|
`, "1111")
|
|
|
|
// Special case: __name__ in `without` list
|
|
f(`
|
|
- interval: 1m
|
|
without: [__name__]
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `:1m_count_samples 1
|
|
:1m_count_samples{abc="123"} 2
|
|
:1m_count_samples{abc="456",de="fg"} 1
|
|
:1m_count_series 1
|
|
:1m_count_series{abc="123"} 1
|
|
:1m_count_series{abc="456",de="fg"} 1
|
|
:1m_sum_samples 5
|
|
:1m_sum_samples{abc="123"} 12.5
|
|
:1m_sum_samples{abc="456",de="fg"} 8
|
|
`, "1111")
|
|
|
|
// drop some input metrics
|
|
f(`
|
|
- interval: 1m
|
|
without: [abc]
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
input_relabel_configs:
|
|
- if: 'foo'
|
|
action: drop
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_without_abc_count_samples 1
|
|
bar:1m_without_abc_count_series 1
|
|
bar:1m_without_abc_sum_samples 5
|
|
`, "1111")
|
|
|
|
// rename output metrics
|
|
f(`
|
|
- interval: 1m
|
|
without: [abc]
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
output_relabel_configs:
|
|
- action: replace_all
|
|
source_labels: [__name__]
|
|
regex: ":|_"
|
|
replacement: "-"
|
|
target_label: __name__
|
|
- action: drop
|
|
source_labels: [de]
|
|
regex: fg
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar-1m-without-abc-count-samples 1
|
|
bar-1m-without-abc-count-series 1
|
|
bar-1m-without-abc-sum-samples 5
|
|
foo-1m-without-abc-count-samples 2
|
|
foo-1m-without-abc-count-series 1
|
|
foo-1m-without-abc-sum-samples 12.5
|
|
`, "1111")
|
|
|
|
// match doesn't match anything
|
|
f(`
|
|
- interval: 1m
|
|
without: [abc]
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
match: '{non_existing_label!=""}'
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, ``, "0000")
|
|
|
|
// match matches foo series with non-empty abc label
|
|
f(`
|
|
- interval: 1m
|
|
by: [abc]
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
match:
|
|
- foo{abc=~".+"}
|
|
- '{non_existing_label!=""}'
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `foo:1m_by_abc_count_samples{abc="123"} 2
|
|
foo:1m_by_abc_count_samples{abc="456"} 1
|
|
foo:1m_by_abc_count_series{abc="123"} 1
|
|
foo:1m_by_abc_count_series{abc="456"} 1
|
|
foo:1m_by_abc_sum_samples{abc="123"} 12.5
|
|
foo:1m_by_abc_sum_samples{abc="456"} 8
|
|
`, "1011")
|
|
|
|
// total output for non-repeated series
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [total]
|
|
`, `
|
|
foo 123
|
|
bar{baz="qwe"} 4.34
|
|
`, `bar:1m_total{baz="qwe"} 0
|
|
foo:1m_total 0
|
|
`, "11")
|
|
|
|
// total output for repeated series
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [total]
|
|
`, `
|
|
foo 123
|
|
bar{baz="qwe"} 1.32
|
|
bar{baz="qwe"} 4.34
|
|
bar{baz="qwe"} 2
|
|
foo{baz="qwe"} -5
|
|
bar{baz="qwer"} 343
|
|
bar{baz="qwer"} 344
|
|
foo{baz="qwe"} 10
|
|
`, `bar:1m_total{baz="qwe"} 5.02
|
|
bar:1m_total{baz="qwer"} 1
|
|
foo:1m_total 0
|
|
foo:1m_total{baz="qwe"} 15
|
|
`, "11111111")
|
|
|
|
// total output for repeated series with group by __name__
|
|
f(`
|
|
- interval: 1m
|
|
by: [__name__]
|
|
outputs: [total]
|
|
`, `
|
|
foo 123
|
|
bar{baz="qwe"} 1.32
|
|
bar{baz="qwe"} 4.34
|
|
bar{baz="qwe"} 2
|
|
foo{baz="qwe"} -5
|
|
bar{baz="qwer"} 343
|
|
bar{baz="qwer"} 344
|
|
foo{baz="qwe"} 10
|
|
`, `bar:1m_total 6.02
|
|
foo:1m_total 15
|
|
`, "11111111")
|
|
|
|
// increase output for non-repeated series
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [increase]
|
|
`, `
|
|
foo 123
|
|
bar{baz="qwe"} 4.34
|
|
`, `bar:1m_increase{baz="qwe"} 0
|
|
foo:1m_increase 0
|
|
`, "11")
|
|
|
|
// increase output for repeated series
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [increase]
|
|
`, `
|
|
foo 123
|
|
bar{baz="qwe"} 1.32
|
|
bar{baz="qwe"} 4.34
|
|
bar{baz="qwe"} 2
|
|
foo{baz="qwe"} -5
|
|
bar{baz="qwer"} 343
|
|
bar{baz="qwer"} 344
|
|
foo{baz="qwe"} 10
|
|
`, `bar:1m_increase{baz="qwe"} 5.02
|
|
bar:1m_increase{baz="qwer"} 1
|
|
foo:1m_increase 0
|
|
foo:1m_increase{baz="qwe"} 15
|
|
`, "11111111")
|
|
|
|
// multiple aggregate configs
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [count_series, sum_samples]
|
|
- interval: 5m
|
|
by: [bar]
|
|
outputs: [sum_samples]
|
|
`, `
|
|
foo 1
|
|
foo{bar="baz"} 2
|
|
foo 3.3
|
|
`, `foo:1m_count_series 1
|
|
foo:1m_count_series{bar="baz"} 1
|
|
foo:1m_sum_samples 4.3
|
|
foo:1m_sum_samples{bar="baz"} 2
|
|
foo:5m_by_bar_sum_samples 4.3
|
|
foo:5m_by_bar_sum_samples{bar="baz"} 2
|
|
`, "111")
|
|
|
|
// min and max outputs
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [min, max]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_max 5
|
|
bar:1m_min 5
|
|
foo:1m_max{abc="123"} 8.5
|
|
foo:1m_max{abc="456",de="fg"} 8
|
|
foo:1m_min{abc="123"} 4
|
|
foo:1m_min{abc="456",de="fg"} 8
|
|
`, "1111")
|
|
|
|
// avg output
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [avg]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_avg 5
|
|
foo:1m_avg{abc="123"} 6.25
|
|
foo:1m_avg{abc="456",de="fg"} 8
|
|
`, "1111")
|
|
|
|
// stddev output
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [stddev]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_stddev 0
|
|
foo:1m_stddev{abc="123"} 2.25
|
|
foo:1m_stddev{abc="456",de="fg"} 0
|
|
`, "1111")
|
|
|
|
// stdvar output
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [stdvar]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_stdvar 0
|
|
foo:1m_stdvar{abc="123"} 5.0625
|
|
foo:1m_stdvar{abc="456",de="fg"} 0
|
|
`, "1111")
|
|
|
|
// histogram_bucket output
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [histogram_bucket]
|
|
`, `
|
|
cpu_usage{cpu="1"} 12.5
|
|
cpu_usage{cpu="1"} 13.3
|
|
cpu_usage{cpu="1"} 13
|
|
cpu_usage{cpu="1"} 12
|
|
cpu_usage{cpu="1"} 14
|
|
cpu_usage{cpu="1"} 25
|
|
cpu_usage{cpu="2"} 90
|
|
`, `cpu_usage:1m_histogram_bucket{cpu="1",vmrange="1.136e+01...1.292e+01"} 2
|
|
cpu_usage:1m_histogram_bucket{cpu="1",vmrange="1.292e+01...1.468e+01"} 3
|
|
cpu_usage:1m_histogram_bucket{cpu="1",vmrange="2.448e+01...2.783e+01"} 1
|
|
cpu_usage:1m_histogram_bucket{cpu="2",vmrange="8.799e+01...1.000e+02"} 1
|
|
`, "1111111")
|
|
|
|
// histogram_bucket output without cpu
|
|
f(`
|
|
- interval: 1m
|
|
without: [cpu]
|
|
outputs: [histogram_bucket]
|
|
`, `
|
|
cpu_usage{cpu="1"} 12.5
|
|
cpu_usage{cpu="1"} 13.3
|
|
cpu_usage{cpu="1"} 13
|
|
cpu_usage{cpu="1"} 12
|
|
cpu_usage{cpu="1"} 14
|
|
cpu_usage{cpu="1"} 25
|
|
cpu_usage{cpu="2"} 90
|
|
`, `cpu_usage:1m_without_cpu_histogram_bucket{vmrange="1.136e+01...1.292e+01"} 2
|
|
cpu_usage:1m_without_cpu_histogram_bucket{vmrange="1.292e+01...1.468e+01"} 3
|
|
cpu_usage:1m_without_cpu_histogram_bucket{vmrange="2.448e+01...2.783e+01"} 1
|
|
cpu_usage:1m_without_cpu_histogram_bucket{vmrange="8.799e+01...1.000e+02"} 1
|
|
`, "1111111")
|
|
|
|
// quantiles output
|
|
f(`
|
|
- interval: 1m
|
|
outputs: ["quantiles(0, 0.5, 1)"]
|
|
`, `
|
|
cpu_usage{cpu="1"} 12.5
|
|
cpu_usage{cpu="1"} 13.3
|
|
cpu_usage{cpu="1"} 13
|
|
cpu_usage{cpu="1"} 12
|
|
cpu_usage{cpu="1"} 14
|
|
cpu_usage{cpu="1"} 25
|
|
cpu_usage{cpu="2"} 90
|
|
`, `cpu_usage:1m_quantiles{cpu="1",quantile="0"} 12
|
|
cpu_usage:1m_quantiles{cpu="1",quantile="0.5"} 13.3
|
|
cpu_usage:1m_quantiles{cpu="1",quantile="1"} 25
|
|
cpu_usage:1m_quantiles{cpu="2",quantile="0"} 90
|
|
cpu_usage:1m_quantiles{cpu="2",quantile="0.5"} 90
|
|
cpu_usage:1m_quantiles{cpu="2",quantile="1"} 90
|
|
`, "1111111")
|
|
|
|
// quantiles output without cpu
|
|
f(`
|
|
- interval: 1m
|
|
without: [cpu]
|
|
outputs: ["quantiles(0, 0.5, 1)"]
|
|
`, `
|
|
cpu_usage{cpu="1"} 12.5
|
|
cpu_usage{cpu="1"} 13.3
|
|
cpu_usage{cpu="1"} 13
|
|
cpu_usage{cpu="1"} 12
|
|
cpu_usage{cpu="1"} 14
|
|
cpu_usage{cpu="1"} 25
|
|
cpu_usage{cpu="2"} 90
|
|
`, `cpu_usage:1m_without_cpu_quantiles{quantile="0"} 12
|
|
cpu_usage:1m_without_cpu_quantiles{quantile="0.5"} 13.3
|
|
cpu_usage:1m_without_cpu_quantiles{quantile="1"} 90
|
|
`, "1111111")
|
|
|
|
// append additional label
|
|
f(`
|
|
- interval: 1m
|
|
without: [abc]
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
output_relabel_configs:
|
|
- action: replace_all
|
|
source_labels: [__name__]
|
|
regex: ":|_"
|
|
replacement: "-"
|
|
target_label: __name__
|
|
- action: drop
|
|
source_labels: [de]
|
|
regex: fg
|
|
- target_label: new_label
|
|
replacement: must_keep_metric_name
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar-1m-without-abc-count-samples{new_label="must_keep_metric_name"} 1
|
|
bar-1m-without-abc-count-series{new_label="must_keep_metric_name"} 1
|
|
bar-1m-without-abc-sum-samples{new_label="must_keep_metric_name"} 5
|
|
foo-1m-without-abc-count-samples{new_label="must_keep_metric_name"} 2
|
|
foo-1m-without-abc-count-series{new_label="must_keep_metric_name"} 1
|
|
foo-1m-without-abc-sum-samples{new_label="must_keep_metric_name"} 12.5
|
|
`, "1111")
|
|
}
|
|
|
|
func TestAggregatorsWithDedupInterval(t *testing.T) {
|
|
f := func(config, inputMetrics, outputMetricsExpected, matchIdxsStrExpected string) {
|
|
t.Helper()
|
|
|
|
// Initialize Aggregators
|
|
var tssOutput []prompbmarshal.TimeSeries
|
|
var tssOutputLock sync.Mutex
|
|
pushFunc := func(tss []prompbmarshal.TimeSeries) {
|
|
tssOutputLock.Lock()
|
|
for _, ts := range tss {
|
|
labelsCopy := append([]prompbmarshal.Label{}, ts.Labels...)
|
|
samplesCopy := append([]prompbmarshal.Sample{}, ts.Samples...)
|
|
tssOutput = append(tssOutput, prompbmarshal.TimeSeries{
|
|
Labels: labelsCopy,
|
|
Samples: samplesCopy,
|
|
})
|
|
}
|
|
tssOutputLock.Unlock()
|
|
}
|
|
const dedupInterval = time.Hour
|
|
a, err := NewAggregatorsFromData([]byte(config), pushFunc, dedupInterval)
|
|
if err != nil {
|
|
t.Fatalf("cannot initialize aggregators: %s", err)
|
|
}
|
|
|
|
// Push the inputMetrics to Aggregators
|
|
tssInput := mustParsePromMetrics(inputMetrics)
|
|
matchIdxs := a.Push(tssInput, nil)
|
|
if a != nil {
|
|
for _, aggr := range a.as {
|
|
aggr.dedupFlush()
|
|
aggr.flush()
|
|
}
|
|
}
|
|
a.MustStop()
|
|
|
|
// Verify matchIdxs equals to matchIdxsExpected
|
|
matchIdxsStr := ""
|
|
for _, v := range matchIdxs {
|
|
matchIdxsStr += strconv.Itoa(int(v))
|
|
}
|
|
if matchIdxsStr != matchIdxsStrExpected {
|
|
t.Fatalf("unexpected matchIdxs;\ngot\n%s\nwant\n%s", matchIdxsStr, matchIdxsStrExpected)
|
|
}
|
|
|
|
// Verify the tssOutput contains the expected metrics
|
|
tsStrings := make([]string, len(tssOutput))
|
|
for i, ts := range tssOutput {
|
|
tsStrings[i] = timeSeriesToString(ts)
|
|
}
|
|
sort.Strings(tsStrings)
|
|
outputMetrics := strings.Join(tsStrings, "")
|
|
if outputMetrics != outputMetricsExpected {
|
|
t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected)
|
|
}
|
|
}
|
|
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [sum_samples]
|
|
`, `
|
|
foo 123
|
|
bar 567
|
|
`, `bar:1m_sum_samples 567
|
|
foo:1m_sum_samples 123
|
|
`, "11")
|
|
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [sum_samples]
|
|
`, `
|
|
foo 123
|
|
bar{baz="qwe"} 1.32
|
|
bar{baz="qwe"} 4.34
|
|
bar{baz="qwe"} 2
|
|
foo{baz="qwe"} -5
|
|
bar{baz="qwer"} 343
|
|
bar{baz="qwer"} 344
|
|
foo{baz="qwe"} 10
|
|
`, `bar:1m_sum_samples{baz="qwe"} 2
|
|
bar:1m_sum_samples{baz="qwer"} 344
|
|
foo:1m_sum_samples 123
|
|
foo:1m_sum_samples{baz="qwe"} 10
|
|
`, "11111111")
|
|
}
|
|
|
|
func timeSeriesToString(ts prompbmarshal.TimeSeries) string {
|
|
labelsString := promrelabel.LabelsToString(ts.Labels)
|
|
if len(ts.Samples) != 1 {
|
|
panic(fmt.Errorf("unexpected number of samples for %s: %d; want 1", labelsString, len(ts.Samples)))
|
|
}
|
|
return fmt.Sprintf("%s %v\n", labelsString, ts.Samples[0].Value)
|
|
}
|
|
|
|
func mustParsePromMetrics(s string) []prompbmarshal.TimeSeries {
|
|
var rows prometheus.Rows
|
|
errLogger := func(s string) {
|
|
panic(fmt.Errorf("unexpected error when parsing Prometheus metrics: %s", s))
|
|
}
|
|
rows.UnmarshalWithErrLogger(s, errLogger)
|
|
var tss []prompbmarshal.TimeSeries
|
|
samples := make([]prompbmarshal.Sample, 0, len(rows.Rows))
|
|
for _, row := range rows.Rows {
|
|
labels := make([]prompbmarshal.Label, 0, len(row.Tags)+1)
|
|
labels = append(labels, prompbmarshal.Label{
|
|
Name: "__name__",
|
|
Value: row.Metric,
|
|
})
|
|
for _, tag := range row.Tags {
|
|
labels = append(labels, prompbmarshal.Label{
|
|
Name: tag.Key,
|
|
Value: tag.Value,
|
|
})
|
|
}
|
|
samples = append(samples, prompbmarshal.Sample{
|
|
Value: row.Value,
|
|
Timestamp: row.Timestamp,
|
|
})
|
|
ts := prompbmarshal.TimeSeries{
|
|
Labels: labels,
|
|
Samples: samples[len(samples)-1:],
|
|
}
|
|
tss = append(tss, ts)
|
|
}
|
|
return tss
|
|
}
|