From cc4d57d6509196a42d6c674e387da0e20cc56da5 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 3 Jul 2024 15:10:09 +0200 Subject: [PATCH] app/vmagent/remotewrite,lib/streamaggr: re-use common code in tests after 879771808b7f7fe6bd3020957fb6835a02424846 - Export streamaggr.LoadFromData() function, so it could be used in tests outside the lib/streamaggr package. This allows removing a hack with creation of temporary files at TestRemoteWriteContext_TryPush_ImmutableTimeseries. - Move common code for mustParsePromMetrics() function into lib/prompbmarshal package, so it could be used in tests for building []prompbmarshal.TimeSeries from string. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6205 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6206 --- app/vmagent/remotewrite/remotewrite_test.go | 56 ++------------------- lib/prompbmarshal/util.go | 41 +++++++++++++++ lib/streamaggr/deduplicator_test.go | 5 +- lib/streamaggr/streamaggr.go | 5 +- lib/streamaggr/streamaggr_test.go | 55 ++++---------------- lib/streamaggr/streamaggr_timing_test.go | 5 +- 6 files changed, 64 insertions(+), 103 deletions(-) diff --git a/app/vmagent/remotewrite/remotewrite_test.go b/app/vmagent/remotewrite/remotewrite_test.go index 7fd2179a1..d86b20eda 100644 --- a/app/vmagent/remotewrite/remotewrite_test.go +++ b/app/vmagent/remotewrite/remotewrite_test.go @@ -3,14 +3,12 @@ package remotewrite import ( "fmt" "math" - "os" "reflect" "testing" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" "github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr" "github.com/VictoriaMetrics/metrics" ) @@ -83,15 +81,15 @@ func TestRemoteWriteContext_TryPush_ImmutableTimeseries(t *testing.T) { } if len(streamAggrConfig) > 0 { - f := createFile(t, []byte(streamAggrConfig)) - sas, err := streamaggr.LoadFromFile(f.Name(), nil, streamaggr.Options{}) + sas, err := streamaggr.LoadFromData([]byte(streamAggrConfig), nil, streamaggr.Options{}) if err != nil { t.Fatalf("cannot load streamaggr configs: %s", err) } rwctx.sas.Store(sas) } - inputTss := mustParsePromMetrics(input) + offsetMsecs := time.Now().UnixMilli() + inputTss := prompbmarshal.MustParsePromMetrics(input, offsetMsecs) expectedTss := make([]prompbmarshal.TimeSeries, len(inputTss)) // copy inputTss to make sure it is not mutated during TryPush call @@ -165,51 +163,3 @@ metric{env="dev"} 15 metric{env="bar"} 25 `) } - -func mustParsePromMetrics(s string) []prompbmarshal.TimeSeries { - var rows prometheus.Rows - errLogger := func(s string) { - panic(fmt.Errorf("unexpected error when parsing Prometheus metrics: %s", s)) - } - rows.UnmarshalWithErrLogger(s, errLogger) - var tss []prompbmarshal.TimeSeries - samples := make([]prompbmarshal.Sample, 0, len(rows.Rows)) - for _, row := range rows.Rows { - labels := make([]prompbmarshal.Label, 0, len(row.Tags)+1) - labels = append(labels, prompbmarshal.Label{ - Name: "__name__", - Value: row.Metric, - }) - for _, tag := range row.Tags { - labels = append(labels, prompbmarshal.Label{ - Name: tag.Key, - Value: tag.Value, - }) - } - samples = append(samples, prompbmarshal.Sample{ - Value: row.Value, - Timestamp: row.Timestamp, - }) - ts := prompbmarshal.TimeSeries{ - Labels: labels, - Samples: samples[len(samples)-1:], - } - tss = append(tss, ts) - } - return tss -} - -func createFile(t *testing.T, data []byte) *os.File { - t.Helper() - f, err := os.CreateTemp("", "") - if err != nil { - t.Fatal(err) - } - if err := os.WriteFile(f.Name(), data, 0644); err != nil { - t.Fatal(err) - } - if err := f.Sync(); err != nil { - t.Fatal(err) - } - return f -} diff --git a/lib/prompbmarshal/util.go b/lib/prompbmarshal/util.go index 688c450a5..eb281fed1 100644 --- a/lib/prompbmarshal/util.go +++ b/lib/prompbmarshal/util.go @@ -3,6 +3,7 @@ package prompbmarshal import ( "fmt" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" ) @@ -28,3 +29,43 @@ func ResetTimeSeries(tss []TimeSeries) []TimeSeries { clear(tss) return tss[:0] } + +// MustParsePromMetrics parses metrics in Prometheus text exposition format from s and returns them. +// +// Metrics must be delimited with newlines. +// +// offsetMsecs is added to every timestamp in parsed metrics. +// +// This function is for testing purposes only. Do not use it in non-test code. +func MustParsePromMetrics(s string, offsetMsecs int64) []TimeSeries { + var rows prometheus.Rows + errLogger := func(s string) { + panic(fmt.Errorf("unexpected error when parsing Prometheus metrics: %s", s)) + } + rows.UnmarshalWithErrLogger(s, errLogger) + tss := make([]TimeSeries, 0, len(rows.Rows)) + samples := make([]Sample, 0, len(rows.Rows)) + for _, row := range rows.Rows { + labels := make([]Label, 0, len(row.Tags)+1) + labels = append(labels, Label{ + Name: "__name__", + Value: row.Metric, + }) + for _, tag := range row.Tags { + labels = append(labels, Label{ + Name: tag.Key, + Value: tag.Value, + }) + } + samples = append(samples, Sample{ + Value: row.Value, + Timestamp: row.Timestamp + offsetMsecs, + }) + ts := TimeSeries{ + Labels: labels, + Samples: samples[len(samples)-1:], + } + tss = append(tss, ts) + } + return tss +} diff --git a/lib/streamaggr/deduplicator_test.go b/lib/streamaggr/deduplicator_test.go index fe32cbb7a..1d304f6dd 100644 --- a/lib/streamaggr/deduplicator_test.go +++ b/lib/streamaggr/deduplicator_test.go @@ -17,7 +17,8 @@ func TestDeduplicator(t *testing.T) { tssResultLock.Unlock() } - tss := mustParsePromMetrics(` + offsetMsecs := time.Now().UnixMilli() + tss := prompbmarshal.MustParsePromMetrics(` foo{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 123 bar{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 34.54 x 8943 1000 @@ -27,7 +28,7 @@ x 433 1000 asfjkldsf{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 12322 foo{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 894 baz_aaa_aaa_fdd{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} -2.3 -`) +`, offsetMsecs) d := NewDeduplicator(pushFunc, time.Hour, []string{"node", "instance"}, "global") for i := 0; i < 10; i++ { diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index c020793c3..3fa074fbc 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -78,7 +78,7 @@ func LoadFromFile(path string, pushFunc PushFunc, opts Options) (*Aggregators, e return nil, fmt.Errorf("cannot expand environment variables in %q: %w", path, err) } - as, err := newAggregatorsFromData(data, pushFunc, opts) + as, err := LoadFromData(data, pushFunc, opts) if err != nil { return nil, fmt.Errorf("cannot initialize aggregators from %q: %w; see https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config", path, err) } @@ -252,7 +252,8 @@ type Aggregators struct { ms *metrics.Set } -func newAggregatorsFromData(data []byte, pushFunc PushFunc, opts Options) (*Aggregators, error) { +// LoadFromData loads aggregators from data. +func LoadFromData(data []byte, pushFunc PushFunc, opts Options) (*Aggregators, error) { var cfgs []*Config if err := yaml.UnmarshalStrict(data, &cfgs); err != nil { return nil, fmt.Errorf("cannot parse stream aggregation config: %w", err) diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go index c6d17b832..105caa00c 100644 --- a/lib/streamaggr/streamaggr_test.go +++ b/lib/streamaggr/streamaggr_test.go @@ -11,7 +11,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" ) func TestAggregatorsFailure(t *testing.T) { @@ -20,7 +19,7 @@ func TestAggregatorsFailure(t *testing.T) { pushFunc := func(_ []prompbmarshal.TimeSeries) { panic(fmt.Errorf("pushFunc shouldn't be called")) } - a, err := newAggregatorsFromData([]byte(config), pushFunc, Options{}) + a, err := LoadFromData([]byte(config), pushFunc, Options{}) if err == nil { t.Fatalf("expecting non-nil error") } @@ -158,11 +157,11 @@ func TestAggregatorsEqual(t *testing.T) { t.Helper() pushFunc := func(_ []prompbmarshal.TimeSeries) {} - aa, err := newAggregatorsFromData([]byte(a), pushFunc, Options{}) + aa, err := LoadFromData([]byte(a), pushFunc, Options{}) if err != nil { t.Fatalf("cannot initialize aggregators: %s", err) } - ab, err := newAggregatorsFromData([]byte(b), pushFunc, Options{}) + ab, err := LoadFromData([]byte(b), pushFunc, Options{}) if err != nil { t.Fatalf("cannot initialize aggregators: %s", err) } @@ -225,13 +224,14 @@ func TestAggregatorsSuccess(t *testing.T) { FlushOnShutdown: true, NoAlignFlushToInterval: true, } - a, err := newAggregatorsFromData([]byte(config), pushFunc, opts) + a, err := LoadFromData([]byte(config), pushFunc, opts) if err != nil { t.Fatalf("cannot initialize aggregators: %s", err) } // Push the inputMetrics to Aggregators - tssInput := mustParsePromMetrics(inputMetrics) + offsetMsecs := time.Now().UnixMilli() + tssInput := prompbmarshal.MustParsePromMetrics(inputMetrics, offsetMsecs) matchIdxs := a.Push(tssInput, nil) a.MustStop() @@ -839,7 +839,7 @@ foo-1m-without-abc-sum-samples{new_label="must_keep_metric_name"} 12.5 `, "1111") // test rate_sum and rate_avg - f(` + f(` - interval: 1m by: [cde] outputs: [rate_sum, rate_avg] @@ -853,7 +853,7 @@ foo:1m_by_cde_rate_sum{cde="1"} 0.65 `, "1111") // rate with duplicated events - f(` + f(` - interval: 1m by: [cde] outputs: [rate_sum, rate_avg] @@ -921,13 +921,14 @@ func TestAggregatorsWithDedupInterval(t *testing.T) { DedupInterval: 30 * time.Second, FlushOnShutdown: true, } - a, err := newAggregatorsFromData([]byte(config), pushFunc, opts) + a, err := LoadFromData([]byte(config), pushFunc, opts) if err != nil { t.Fatalf("cannot initialize aggregators: %s", err) } // Push the inputMetrics to Aggregators - tssInput := mustParsePromMetrics(inputMetrics) + offsetMsecs := time.Now().UnixMilli() + tssInput := prompbmarshal.MustParsePromMetrics(inputMetrics, offsetMsecs) matchIdxs := a.Push(tssInput, nil) a.MustStop() @@ -998,40 +999,6 @@ func timeSeriesToString(ts prompbmarshal.TimeSeries) string { return fmt.Sprintf("%s %v\n", labelsString, ts.Samples[0].Value) } -func mustParsePromMetrics(s string) []prompbmarshal.TimeSeries { - var rows prometheus.Rows - errLogger := func(s string) { - panic(fmt.Errorf("unexpected error when parsing Prometheus metrics: %s", s)) - } - rows.UnmarshalWithErrLogger(s, errLogger) - var tss []prompbmarshal.TimeSeries - now := time.Now().UnixMilli() - samples := make([]prompbmarshal.Sample, 0, len(rows.Rows)) - for _, row := range rows.Rows { - labels := make([]prompbmarshal.Label, 0, len(row.Tags)+1) - labels = append(labels, prompbmarshal.Label{ - Name: "__name__", - Value: row.Metric, - }) - for _, tag := range row.Tags { - labels = append(labels, prompbmarshal.Label{ - Name: tag.Key, - Value: tag.Value, - }) - } - samples = append(samples, prompbmarshal.Sample{ - Value: row.Value, - Timestamp: now + row.Timestamp, - }) - ts := prompbmarshal.TimeSeries{ - Labels: labels, - Samples: samples[len(samples)-1:], - } - tss = append(tss, ts) - } - return tss -} - func appendClonedTimeseries(dst, src []prompbmarshal.TimeSeries) []prompbmarshal.TimeSeries { for _, ts := range src { dst = append(dst, prompbmarshal.TimeSeries{ diff --git a/lib/streamaggr/streamaggr_timing_test.go b/lib/streamaggr/streamaggr_timing_test.go index b328a61a0..26d3446bd 100644 --- a/lib/streamaggr/streamaggr_timing_test.go +++ b/lib/streamaggr/streamaggr_timing_test.go @@ -92,7 +92,7 @@ func newBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators { outputs: [%s] `, strings.Join(outputsQuoted, ",")) - a, err := newAggregatorsFromData([]byte(config), pushFunc, Options{}) + a, err := LoadFromData([]byte(config), pushFunc, Options{}) if err != nil { panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err)) } @@ -107,7 +107,8 @@ func newBenchSeries(seriesCount int) []prompbmarshal.TimeSeries { a = append(a, s) } metrics := strings.Join(a, "\n") - return mustParsePromMetrics(metrics) + offsetMsecs := time.Now().UnixMilli() + return prompbmarshal.MustParsePromMetrics(metrics, offsetMsecs) } const seriesCount = 10_000