package streamaggr

import (
	"fmt"
	"sort"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
)

// TestAggregatorsFailure verifies that NewAggregatorsFromData returns a non-nil
// error and a nil result for every config listed below.
func TestAggregatorsFailure(t *testing.T) {
	f := func(config string) {
		t.Helper()
		// No data is pushed in this test, so any call to pushFunc is a bug.
		pushFunc := func(tss []prompbmarshal.TimeSeries) {
			panic(fmt.Errorf("pushFunc shouldn't be called"))
		}
		a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0)
		if err == nil {
			t.Fatalf("expecting non-nil error")
		}
		if a != nil {
			t.Fatalf("expecting nil a")
		}
	}

	// Invalid config
	f(`foobar`)

	// Unknown option
	f(`
- interval: 1m
  outputs: [total]
  foobar: baz
`)

	// missing interval
	f(`
- outputs: [total]
`)

	// missing outputs
	f(`
- interval: 1m
`)

	// Invalid output
	f(`
- interval: 1m
  outputs: [foobar]
`)

	// Negative interval
	f(`- interval: -5m`)
	// Too small interval
	f(`- interval: 10ms`)

	// Invalid input_relabel_configs
	f(`
- interval: 1m
  outputs: [total]
  input_relabel_configs:
  - foo: bar
`)
	f(`
- interval: 1m
  outputs: [total]
  input_relabel_configs:
  - action: replace
`)

	// Invalid output_relabel_configs
	f(`
- interval: 1m
  outputs: [total]
  output_relabel_configs:
  - foo: bar
`)
	f(`
- interval: 1m
  outputs: [total]
  output_relabel_configs:
  - action: replace
`)

	// Both by and without are non-empty
	f(`
- interval: 1m
  outputs: [total]
  by: [foo]
  without: [bar]
`)

	// Invalid quantiles()
	f(`
- interval: 1m
  outputs: ["quantiles("]
`)
	f(`
- interval: 1m
  outputs: ["quantiles()"]
`)
	f(`
- interval: 1m
  outputs: ["quantiles(foo)"]
`)
	f(`
- interval: 1m
  outputs: ["quantiles(-0.5)"]
`)
	f(`
- interval: 1m
  outputs: ["quantiles(1.5)"]
`)
}

// TestAggregatorsSuccess pushes inputMetrics through aggregators built from config,
// forces a flush and compares the sorted textual output with outputMetricsExpected.
func TestAggregatorsSuccess(t *testing.T) {
	f := func(config, inputMetrics, outputMetricsExpected string) {
		t.Helper()

		// Initialize Aggregators
		var tssOutput []prompbmarshal.TimeSeries
		var tssOutputLock sync.Mutex
		pushFunc := func(tss []prompbmarshal.TimeSeries) {
			tssOutputLock.Lock()
			for _, ts := range tss {
				// Store copies of labels and samples instead of the slices passed by the caller.
				labelsCopy := append([]prompbmarshal.Label{}, ts.Labels...)
				samplesCopy := append([]prompbmarshal.Sample{}, ts.Samples...)
				tssOutput = append(tssOutput, prompbmarshal.TimeSeries{
					Labels:  labelsCopy,
					Samples: samplesCopy,
				})
			}
			tssOutputLock.Unlock()
		}
		a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0)
		if err != nil {
			t.Fatalf("cannot initialize aggregators: %s", err)
		}

		// Push the inputMetrics to Aggregators
		// NOTE(review): Push and MustStop are called outside the nil guard below,
		// so they are presumably safe on a nil *Aggregators - confirm.
		tssInput := mustParsePromMetrics(inputMetrics)
		a.Push(tssInput)
		if a != nil {
			// Flush explicitly instead of waiting for the aggregation interval.
			for _, aggr := range *a.as.Load() {
				aggr.flush()
			}
		}
		a.MustStop()

		// Verify the tssOutput contains the expected metrics
		tsStrings := make([]string, len(tssOutput))
		for i, ts := range tssOutput {
			tsStrings[i] = timeSeriesToString(ts)
		}
		sort.Strings(tsStrings)
		outputMetrics := strings.Join(tsStrings, "")
		if outputMetrics != outputMetricsExpected {
			t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected)
		}
	}

	// Empty config
	f(``, ``, ``)
	f(``, `foo{bar="baz"} 1`, ``)
	f(``, "foo 1\nbaz 2", ``)

	// Empty by list - aggregate only by time
	f(`
- interval: 1m
  outputs: [count_samples, sum_samples, count_series, last]
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_count_samples 1
bar:1m_count_series 1
bar:1m_last 5
bar:1m_sum_samples 5
foo:1m_count_samples{abc="123"} 2
foo:1m_count_samples{abc="456",de="fg"} 1
foo:1m_count_series{abc="123"} 1
foo:1m_count_series{abc="456",de="fg"} 1
foo:1m_last{abc="123"} 8.5
foo:1m_last{abc="456",de="fg"} 8
foo:1m_sum_samples{abc="123"} 12.5
foo:1m_sum_samples{abc="456",de="fg"} 8
`)

	// Special case: __name__ in `by` list - this is the same as empty `by` list
	f(`
- interval: 1m
  by: [__name__]
  outputs: [count_samples, sum_samples, count_series]
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_count_samples 1
bar:1m_count_series 1
bar:1m_sum_samples 5
foo:1m_count_samples 3
foo:1m_count_series 2
foo:1m_sum_samples 20.5
`)

	// Non-empty `by` list with non-existing labels
	f(`
- interval: 1m
  by: [foo, bar]
  outputs: [count_samples, sum_samples, count_series]
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_by_bar_foo_count_samples 1
bar:1m_by_bar_foo_count_series 1
bar:1m_by_bar_foo_sum_samples 5
foo:1m_by_bar_foo_count_samples 3
foo:1m_by_bar_foo_count_series 2
foo:1m_by_bar_foo_sum_samples 20.5
`)

	// Non-empty `by` list with existing label
	f(`
- interval: 1m
  by: [abc]
  outputs: [count_samples, sum_samples, count_series]
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_by_abc_count_samples 1
bar:1m_by_abc_count_series 1
bar:1m_by_abc_sum_samples 5
foo:1m_by_abc_count_samples{abc="123"} 2
foo:1m_by_abc_count_samples{abc="456"} 1
foo:1m_by_abc_count_series{abc="123"} 1
foo:1m_by_abc_count_series{abc="456"} 1
foo:1m_by_abc_sum_samples{abc="123"} 12.5
foo:1m_by_abc_sum_samples{abc="456"} 8
`)

	// Non-empty `by` list with duplicate existing label
	f(`
- interval: 1m
  by: [abc, abc]
  outputs: [count_samples, sum_samples, count_series]
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_by_abc_count_samples 1
bar:1m_by_abc_count_series 1
bar:1m_by_abc_sum_samples 5
foo:1m_by_abc_count_samples{abc="123"} 2
foo:1m_by_abc_count_samples{abc="456"} 1
foo:1m_by_abc_count_series{abc="123"} 1
foo:1m_by_abc_count_series{abc="456"} 1
foo:1m_by_abc_sum_samples{abc="123"} 12.5
foo:1m_by_abc_sum_samples{abc="456"} 8
`)

	// Non-empty `without` list with non-existing labels
	f(`
- interval: 1m
  without: [foo]
  outputs: [count_samples, sum_samples, count_series]
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_without_foo_count_samples 1
bar:1m_without_foo_count_series 1
bar:1m_without_foo_sum_samples 5
foo:1m_without_foo_count_samples{abc="123"} 2
foo:1m_without_foo_count_samples{abc="456",de="fg"} 1
foo:1m_without_foo_count_series{abc="123"} 1
foo:1m_without_foo_count_series{abc="456",de="fg"} 1
foo:1m_without_foo_sum_samples{abc="123"} 12.5
foo:1m_without_foo_sum_samples{abc="456",de="fg"} 8
`)

	// Non-empty `without` list with existing labels
	f(`
- interval: 1m
  without: [abc]
  outputs: [count_samples, sum_samples, count_series]
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_without_abc_count_samples 1
bar:1m_without_abc_count_series 1
bar:1m_without_abc_sum_samples 5
foo:1m_without_abc_count_samples 2
foo:1m_without_abc_count_samples{de="fg"} 1
foo:1m_without_abc_count_series 1
foo:1m_without_abc_count_series{de="fg"} 1
foo:1m_without_abc_sum_samples 12.5
foo:1m_without_abc_sum_samples{de="fg"} 8
`)

	// Special case: __name__ in `without` list
	f(`
- interval: 1m
  without: [__name__]
  outputs: [count_samples, sum_samples, count_series]
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `:1m_count_samples 1
:1m_count_samples{abc="123"} 2
:1m_count_samples{abc="456",de="fg"} 1
:1m_count_series 1
:1m_count_series{abc="123"} 1
:1m_count_series{abc="456",de="fg"} 1
:1m_sum_samples 5
:1m_sum_samples{abc="123"} 12.5
:1m_sum_samples{abc="456",de="fg"} 8
`)

	// drop some input metrics
	f(`
- interval: 1m
  without: [abc]
  outputs: [count_samples, sum_samples, count_series]
  input_relabel_configs:
  - if: 'foo'
    action: drop
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_without_abc_count_samples 1
bar:1m_without_abc_count_series 1
bar:1m_without_abc_sum_samples 5
`)

	// rename output metrics
	f(`
- interval: 1m
  without: [abc]
  outputs: [count_samples, sum_samples, count_series]
  output_relabel_configs:
  - action: replace_all
    source_labels: [__name__]
    regex: ":|_"
    replacement: "-"
    target_label: __name__
  - action: drop
    source_labels: [de]
    regex: fg
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar-1m-without-abc-count-samples 1
bar-1m-without-abc-count-series 1
bar-1m-without-abc-sum-samples 5
foo-1m-without-abc-count-samples 2
foo-1m-without-abc-count-series 1
foo-1m-without-abc-sum-samples 12.5
`)

	// match doesn't match anything
	f(`
- interval: 1m
  without: [abc]
  outputs: [count_samples, sum_samples, count_series]
  match: '{non_existing_label!=""}'
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, ``)

	// match matches foo series with non-empty abc label
	f(`
- interval: 1m
  by: [abc]
  outputs: [count_samples, sum_samples, count_series]
  match: 'foo{abc=~".+"}'
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `foo:1m_by_abc_count_samples{abc="123"} 2
foo:1m_by_abc_count_samples{abc="456"} 1
foo:1m_by_abc_count_series{abc="123"} 1
foo:1m_by_abc_count_series{abc="456"} 1
foo:1m_by_abc_sum_samples{abc="123"} 12.5
foo:1m_by_abc_sum_samples{abc="456"} 8
`)

	// total output for non-repeated series
	f(`
- interval: 1m
  outputs: [total]
`, `
foo 123
bar{baz="qwe"} 4.34
`, `bar:1m_total{baz="qwe"} 0
foo:1m_total 0
`)

	// total output for repeated series
	f(`
- interval: 1m
  outputs: [total]
`, `
foo 123
bar{baz="qwe"} 1.32
bar{baz="qwe"} 4.34
bar{baz="qwe"} 2
foo{baz="qwe"} -5
bar{baz="qwer"} 343
bar{baz="qwer"} 344
foo{baz="qwe"} 10
`, `bar:1m_total{baz="qwe"} 5.02
bar:1m_total{baz="qwer"} 1
foo:1m_total 0
foo:1m_total{baz="qwe"} 15
`)

	// total output for repeated series with group by __name__
	f(`
- interval: 1m
  by: [__name__]
  outputs: [total]
`, `
foo 123
bar{baz="qwe"} 1.32
bar{baz="qwe"} 4.34
bar{baz="qwe"} 2
foo{baz="qwe"} -5
bar{baz="qwer"} 343
bar{baz="qwer"} 344
foo{baz="qwe"} 10
`, `bar:1m_total 6.02
foo:1m_total 15
`)

	// increase output for non-repeated series
	f(`
- interval: 1m
  outputs: [increase]
`, `
foo 123
bar{baz="qwe"} 4.34
`, `bar:1m_increase{baz="qwe"} 0
foo:1m_increase 0
`)

	// increase output for repeated series
	f(`
- interval: 1m
  outputs: [increase]
`, `
foo 123
bar{baz="qwe"} 1.32
bar{baz="qwe"} 4.34
bar{baz="qwe"} 2
foo{baz="qwe"} -5
bar{baz="qwer"} 343
bar{baz="qwer"} 344
foo{baz="qwe"} 10
`, `bar:1m_increase{baz="qwe"} 5.02
bar:1m_increase{baz="qwer"} 1
foo:1m_increase 0
foo:1m_increase{baz="qwe"} 15
`)

	// multiple aggregate configs
	f(`
- interval: 1m
  outputs: [count_series, sum_samples]
- interval: 5m
  by: [bar]
  outputs: [sum_samples]
`, `
foo 1
foo{bar="baz"} 2
foo 3.3
`, `foo:1m_count_series 1
foo:1m_count_series{bar="baz"} 1
foo:1m_sum_samples 4.3
foo:1m_sum_samples{bar="baz"} 2
foo:5m_by_bar_sum_samples 4.3
foo:5m_by_bar_sum_samples{bar="baz"} 2
`)

	// min and max outputs
	f(`
- interval: 1m
  outputs: [min, max]
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_max 5
bar:1m_min 5
foo:1m_max{abc="123"} 8.5
foo:1m_max{abc="456",de="fg"} 8
foo:1m_min{abc="123"} 4
foo:1m_min{abc="456",de="fg"} 8
`)

	// avg output
	f(`
- interval: 1m
  outputs: [avg]
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_avg 5
foo:1m_avg{abc="123"} 6.25
foo:1m_avg{abc="456",de="fg"} 8
`)

	// stddev output
	f(`
- interval: 1m
  outputs: [stddev]
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_stddev 0
foo:1m_stddev{abc="123"} 2.25
foo:1m_stddev{abc="456",de="fg"} 0
`)

	// stdvar output
	f(`
- interval: 1m
  outputs: [stdvar]
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_stdvar 0
foo:1m_stdvar{abc="123"} 5.0625
foo:1m_stdvar{abc="456",de="fg"} 0
`)

	// histogram_bucket output
	f(`
- interval: 1m
  outputs: [histogram_bucket]
`, `
cpu_usage{cpu="1"} 12.5
cpu_usage{cpu="1"} 13.3
cpu_usage{cpu="1"} 13
cpu_usage{cpu="1"} 12
cpu_usage{cpu="1"} 14
cpu_usage{cpu="1"} 25
cpu_usage{cpu="2"} 90
`, `cpu_usage:1m_histogram_bucket{cpu="1",vmrange="1.136e+01...1.292e+01"} 2
cpu_usage:1m_histogram_bucket{cpu="1",vmrange="1.292e+01...1.468e+01"} 3
cpu_usage:1m_histogram_bucket{cpu="1",vmrange="2.448e+01...2.783e+01"} 1
cpu_usage:1m_histogram_bucket{cpu="2",vmrange="8.799e+01...1.000e+02"} 1
`)

	// histogram_bucket output without cpu
	f(`
- interval: 1m
  without: [cpu]
  outputs: [histogram_bucket]
`, `
cpu_usage{cpu="1"} 12.5
cpu_usage{cpu="1"} 13.3
cpu_usage{cpu="1"} 13
cpu_usage{cpu="1"} 12
cpu_usage{cpu="1"} 14
cpu_usage{cpu="1"} 25
cpu_usage{cpu="2"} 90
`, `cpu_usage:1m_without_cpu_histogram_bucket{vmrange="1.136e+01...1.292e+01"} 2
cpu_usage:1m_without_cpu_histogram_bucket{vmrange="1.292e+01...1.468e+01"} 3
cpu_usage:1m_without_cpu_histogram_bucket{vmrange="2.448e+01...2.783e+01"} 1
cpu_usage:1m_without_cpu_histogram_bucket{vmrange="8.799e+01...1.000e+02"} 1
`)

	// quantiles output
	f(`
- interval: 1m
  outputs: ["quantiles(0, 0.5, 1)"]
`, `
cpu_usage{cpu="1"} 12.5
cpu_usage{cpu="1"} 13.3
cpu_usage{cpu="1"} 13
cpu_usage{cpu="1"} 12
cpu_usage{cpu="1"} 14
cpu_usage{cpu="1"} 25
cpu_usage{cpu="2"} 90
`, `cpu_usage:1m_quantiles{cpu="1",quantile="0"} 12
cpu_usage:1m_quantiles{cpu="1",quantile="0.5"} 13.3
cpu_usage:1m_quantiles{cpu="1",quantile="1"} 25
cpu_usage:1m_quantiles{cpu="2",quantile="0"} 90
cpu_usage:1m_quantiles{cpu="2",quantile="0.5"} 90
cpu_usage:1m_quantiles{cpu="2",quantile="1"} 90
`)

	// quantiles output without cpu
	f(`
- interval: 1m
  without: [cpu]
  outputs: ["quantiles(0, 0.5, 1)"]
`, `
cpu_usage{cpu="1"} 12.5
cpu_usage{cpu="1"} 13.3
cpu_usage{cpu="1"} 13
cpu_usage{cpu="1"} 12
cpu_usage{cpu="1"} 14
cpu_usage{cpu="1"} 25
cpu_usage{cpu="2"} 90
`, `cpu_usage:1m_without_cpu_quantiles{quantile="0"} 12
cpu_usage:1m_without_cpu_quantiles{quantile="0.5"} 13.3
cpu_usage:1m_without_cpu_quantiles{quantile="1"} 90
`)
}

// TestAggregatorsWithDedupInterval runs the same push/flush/compare cycle as
// TestAggregatorsSuccess, but creates the aggregators with a one-hour dedup interval
// and calls dedupFlush before flush on every aggregator.
func TestAggregatorsWithDedupInterval(t *testing.T) {
	f := func(config, inputMetrics, outputMetricsExpected string) {
		t.Helper()

		// Initialize Aggregators
		var tssOutput []prompbmarshal.TimeSeries
		var tssOutputLock sync.Mutex
		pushFunc := func(tss []prompbmarshal.TimeSeries) {
			tssOutputLock.Lock()
			for _, ts := range tss {
				// Store copies of labels and samples instead of the slices passed by the caller.
				labelsCopy := append([]prompbmarshal.Label{}, ts.Labels...)
				samplesCopy := append([]prompbmarshal.Sample{}, ts.Samples...)
				tssOutput = append(tssOutput, prompbmarshal.TimeSeries{
					Labels:  labelsCopy,
					Samples: samplesCopy,
				})
			}
			tssOutputLock.Unlock()
		}
		const dedupInterval = time.Hour
		a, err := NewAggregatorsFromData([]byte(config), pushFunc, dedupInterval)
		if err != nil {
			t.Fatalf("cannot initialize aggregators: %s", err)
		}

		// Push the inputMetrics to Aggregators
		// NOTE(review): Push and MustStop are called outside the nil guard below,
		// so they are presumably safe on a nil *Aggregators - confirm.
		tssInput := mustParsePromMetrics(inputMetrics)
		a.Push(tssInput)
		if a != nil {
			for _, aggr := range *a.as.Load() {
				aggr.dedupFlush()
				aggr.flush()
			}
		}
		a.MustStop()

		// Verify the tssOutput contains the expected metrics
		tsStrings := make([]string, len(tssOutput))
		for i, ts := range tssOutput {
			tsStrings[i] = timeSeriesToString(ts)
		}
		sort.Strings(tsStrings)
		outputMetrics := strings.Join(tsStrings, "")
		if outputMetrics != outputMetricsExpected {
			t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected)
		}
	}

	f(`
- interval: 1m
  outputs: [sum_samples]
`, `
foo 123
bar 567
`, `bar:1m_sum_samples 567
foo:1m_sum_samples 123
`)

	f(`
- interval: 1m
  outputs: [sum_samples]
`, `
foo 123
bar{baz="qwe"} 1.32
bar{baz="qwe"} 4.34
bar{baz="qwe"} 2
foo{baz="qwe"} -5
bar{baz="qwer"} 343
bar{baz="qwer"} 344
foo{baz="qwe"} 10
`, `bar:1m_sum_samples{baz="qwe"} 2
bar:1m_sum_samples{baz="qwer"} 344
foo:1m_sum_samples 123
foo:1m_sum_samples{baz="qwe"} 10
`)
}

// TestAggregatorsReinit pushes inputMetrics, re-initializes the aggregators with
// newConfig via ReInitConfigs, pushes the same input again, then flushes and compares
// the sorted textual output with outputMetricsExpected.
func TestAggregatorsReinit(t *testing.T) {
	f := func(config, newConfig, inputMetrics, outputMetricsExpected string) {
		t.Helper()

		// Initialize Aggregators
		var tssOutput []prompbmarshal.TimeSeries
		var tssOutputLock sync.Mutex
		pushFunc := func(tss []prompbmarshal.TimeSeries) {
			tssOutputLock.Lock()
			for _, ts := range tss {
				// Store copies of labels and samples instead of the slices passed by the caller.
				labelsCopy := append([]prompbmarshal.Label{}, ts.Labels...)
				samplesCopy := append([]prompbmarshal.Sample{}, ts.Samples...)
				tssOutput = append(tssOutput, prompbmarshal.TimeSeries{
					Labels:  labelsCopy,
					Samples: samplesCopy,
				})
			}
			tssOutputLock.Unlock()
		}

		a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0)
		if err != nil {
			t.Fatalf("cannot initialize aggregators: %s", err)
		}

		// Push the inputMetrics to Aggregators
		tssInput := mustParsePromMetrics(inputMetrics)
		a.Push(tssInput)

		// Reinitialize Aggregators
		nc, _, err := ParseConfig([]byte(newConfig))
		if err != nil {
			t.Fatalf("cannot parse new config: %s", err)
		}
		if err := a.ReInitConfigs(nc); err != nil {
			t.Fatalf("cannot reinit aggregators: %s", err)
		}

		// Push the inputMetrics to Aggregators
		a.Push(tssInput)
		if a != nil {
			// Flush explicitly instead of waiting for the aggregation interval.
			for _, aggr := range *a.as.Load() {
				aggr.flush()
			}
		}
		a.MustStop()

		// Verify the tssOutput contains the expected metrics
		tsStrings := make([]string, len(tssOutput))
		for i, ts := range tssOutput {
			tsStrings[i] = timeSeriesToString(ts)
		}
		sort.Strings(tsStrings)
		outputMetrics := strings.Join(tsStrings, "")
		if outputMetrics != outputMetricsExpected {
			t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected)
		}
	}

	f(`
- interval: 1m
  outputs: [count_samples]
`, `
- interval: 1m
  outputs: [sum_samples]
`, `
foo 123
bar 567
foo 234
`, `bar:1m_count_samples 1
bar:1m_sum_samples 567
foo:1m_count_samples 2
foo:1m_sum_samples 357
`)

	f(`
- interval: 1m
  outputs: [total]
- interval: 2m
  outputs: [count_samples]
`, `
- interval: 1m
  outputs: [sum_samples]
- interval: 2m
  outputs: [count_samples]
`, `
foo 123
bar 567
foo 234
`, `bar:1m_sum_samples 567
bar:1m_total 0
bar:2m_count_samples 2
foo:1m_sum_samples 357
foo:1m_total 111
foo:2m_count_samples 4
`)
}

// timeSeriesToString renders ts as "<labels> <value>\n".
//
// It panics if ts doesn't contain exactly one sample.
func timeSeriesToString(ts prompbmarshal.TimeSeries) string {
	labelsString := promrelabel.LabelsToString(ts.Labels)
	if len(ts.Samples) != 1 {
		panic(fmt.Errorf("unexpected number of samples for %s: %d; want 1", labelsString, len(ts.Samples)))
	}
	return fmt.Sprintf("%s %v\n", labelsString, ts.Samples[0].Value)
}

// mustParsePromMetrics parses s with the Prometheus rows parser and returns
// one single-sample time series per parsed row.
//
// The metric name is stored in the "__name__" label, followed by the row tags.
// It panics if the parser reports an error.
func mustParsePromMetrics(s string) []prompbmarshal.TimeSeries {
	var rows prometheus.Rows
	errLogger := func(s string) {
		panic(fmt.Errorf("unexpected error when parsing Prometheus metrics: %s", s))
	}
	rows.UnmarshalWithErrLogger(s, errLogger)

	var tss []prompbmarshal.TimeSeries
	// All the samples share a single backing array sized for every row.
	samples := make([]prompbmarshal.Sample, 0, len(rows.Rows))
	for _, row := range rows.Rows {
		labels := make([]prompbmarshal.Label, 0, len(row.Tags)+1)
		labels = append(labels, prompbmarshal.Label{
			Name:  "__name__",
			Value: row.Metric,
		})
		for _, tag := range row.Tags {
			labels = append(labels, prompbmarshal.Label{
				Name:  tag.Key,
				Value: tag.Value,
			})
		}
		samples = append(samples, prompbmarshal.Sample{
			Value:     row.Value,
			Timestamp: row.Timestamp,
		})
		n := len(samples)
		ts := prompbmarshal.TimeSeries{
			Labels: labels,
			// Limit the capacity of the one-element view, so an append to ts.Samples
			// cannot overwrite the sample belonging to the next series.
			Samples: samples[n-1 : n : n],
		}
		tss = append(tss, ts)
	}
	return tss
}