app/vmagent/remotewrite,lib/streamaggr: re-use common code in tests after 879771808b

- Export streamaggr.LoadFromData() function, so it could be used in tests outside the lib/streamaggr package.
  This allows removing a hack with creation of temporary files at TestRemoteWriteContext_TryPush_ImmutableTimeseries.

- Move common code for mustParsePromMetrics() function into lib/prompbmarshal package,
  so it could be used in tests for building []prompbmarshal.TimeSeries from string.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6205
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6206
This commit is contained in:
Aliaksandr Valialkin 2024-07-03 15:10:09 +02:00
parent e9abeac803
commit cc4d57d650
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
6 changed files with 64 additions and 103 deletions

View file

@ -3,14 +3,12 @@ package remotewrite
import ( import (
"fmt" "fmt"
"math" "math"
"os"
"reflect" "reflect"
"testing" "testing"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr" "github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -83,15 +81,15 @@ func TestRemoteWriteContext_TryPush_ImmutableTimeseries(t *testing.T) {
} }
if len(streamAggrConfig) > 0 { if len(streamAggrConfig) > 0 {
f := createFile(t, []byte(streamAggrConfig)) sas, err := streamaggr.LoadFromData([]byte(streamAggrConfig), nil, streamaggr.Options{})
sas, err := streamaggr.LoadFromFile(f.Name(), nil, streamaggr.Options{})
if err != nil { if err != nil {
t.Fatalf("cannot load streamaggr configs: %s", err) t.Fatalf("cannot load streamaggr configs: %s", err)
} }
rwctx.sas.Store(sas) rwctx.sas.Store(sas)
} }
inputTss := mustParsePromMetrics(input) offsetMsecs := time.Now().UnixMilli()
inputTss := prompbmarshal.MustParsePromMetrics(input, offsetMsecs)
expectedTss := make([]prompbmarshal.TimeSeries, len(inputTss)) expectedTss := make([]prompbmarshal.TimeSeries, len(inputTss))
// copy inputTss to make sure it is not mutated during TryPush call // copy inputTss to make sure it is not mutated during TryPush call
@ -165,51 +163,3 @@ metric{env="dev"} 15
metric{env="bar"} 25 metric{env="bar"} 25
`) `)
} }
func mustParsePromMetrics(s string) []prompbmarshal.TimeSeries {
var rows prometheus.Rows
errLogger := func(s string) {
panic(fmt.Errorf("unexpected error when parsing Prometheus metrics: %s", s))
}
rows.UnmarshalWithErrLogger(s, errLogger)
var tss []prompbmarshal.TimeSeries
samples := make([]prompbmarshal.Sample, 0, len(rows.Rows))
for _, row := range rows.Rows {
labels := make([]prompbmarshal.Label, 0, len(row.Tags)+1)
labels = append(labels, prompbmarshal.Label{
Name: "__name__",
Value: row.Metric,
})
for _, tag := range row.Tags {
labels = append(labels, prompbmarshal.Label{
Name: tag.Key,
Value: tag.Value,
})
}
samples = append(samples, prompbmarshal.Sample{
Value: row.Value,
Timestamp: row.Timestamp,
})
ts := prompbmarshal.TimeSeries{
Labels: labels,
Samples: samples[len(samples)-1:],
}
tss = append(tss, ts)
}
return tss
}
func createFile(t *testing.T, data []byte) *os.File {
t.Helper()
f, err := os.CreateTemp("", "")
if err != nil {
t.Fatal(err)
}
if err := os.WriteFile(f.Name(), data, 0644); err != nil {
t.Fatal(err)
}
if err := f.Sync(); err != nil {
t.Fatal(err)
}
return f
}

View file

@ -3,6 +3,7 @@ package prompbmarshal
import ( import (
"fmt" "fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
) )
@ -28,3 +29,43 @@ func ResetTimeSeries(tss []TimeSeries) []TimeSeries {
clear(tss) clear(tss)
return tss[:0] return tss[:0]
} }
// MustParsePromMetrics parses metrics in Prometheus text exposition format from s and returns them.
//
// Metrics must be delimited with newlines.
//
// offsetMsecs is added to every timestamp in parsed metrics.
//
// This function is for testing purposes only. Do not use it in non-test code.
func MustParsePromMetrics(s string, offsetMsecs int64) []TimeSeries {
var rows prometheus.Rows
errLogger := func(s string) {
panic(fmt.Errorf("unexpected error when parsing Prometheus metrics: %s", s))
}
rows.UnmarshalWithErrLogger(s, errLogger)
tss := make([]TimeSeries, 0, len(rows.Rows))
samples := make([]Sample, 0, len(rows.Rows))
for _, row := range rows.Rows {
labels := make([]Label, 0, len(row.Tags)+1)
labels = append(labels, Label{
Name: "__name__",
Value: row.Metric,
})
for _, tag := range row.Tags {
labels = append(labels, Label{
Name: tag.Key,
Value: tag.Value,
})
}
samples = append(samples, Sample{
Value: row.Value,
Timestamp: row.Timestamp + offsetMsecs,
})
ts := TimeSeries{
Labels: labels,
Samples: samples[len(samples)-1:],
}
tss = append(tss, ts)
}
return tss
}

View file

@ -17,7 +17,8 @@ func TestDeduplicator(t *testing.T) {
tssResultLock.Unlock() tssResultLock.Unlock()
} }
tss := mustParsePromMetrics(` offsetMsecs := time.Now().UnixMilli()
tss := prompbmarshal.MustParsePromMetrics(`
foo{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 123 foo{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 123
bar{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 34.54 bar{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 34.54
x 8943 1000 x 8943 1000
@ -27,7 +28,7 @@ x 433 1000
asfjkldsf{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 12322 asfjkldsf{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 12322
foo{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 894 foo{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 894
baz_aaa_aaa_fdd{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} -2.3 baz_aaa_aaa_fdd{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} -2.3
`) `, offsetMsecs)
d := NewDeduplicator(pushFunc, time.Hour, []string{"node", "instance"}, "global") d := NewDeduplicator(pushFunc, time.Hour, []string{"node", "instance"}, "global")
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {

View file

@ -78,7 +78,7 @@ func LoadFromFile(path string, pushFunc PushFunc, opts Options) (*Aggregators, e
return nil, fmt.Errorf("cannot expand environment variables in %q: %w", path, err) return nil, fmt.Errorf("cannot expand environment variables in %q: %w", path, err)
} }
as, err := newAggregatorsFromData(data, pushFunc, opts) as, err := LoadFromData(data, pushFunc, opts)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot initialize aggregators from %q: %w; see https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config", path, err) return nil, fmt.Errorf("cannot initialize aggregators from %q: %w; see https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config", path, err)
} }
@ -252,7 +252,8 @@ type Aggregators struct {
ms *metrics.Set ms *metrics.Set
} }
func newAggregatorsFromData(data []byte, pushFunc PushFunc, opts Options) (*Aggregators, error) { // LoadFromData loads aggregators from data.
func LoadFromData(data []byte, pushFunc PushFunc, opts Options) (*Aggregators, error) {
var cfgs []*Config var cfgs []*Config
if err := yaml.UnmarshalStrict(data, &cfgs); err != nil { if err := yaml.UnmarshalStrict(data, &cfgs); err != nil {
return nil, fmt.Errorf("cannot parse stream aggregation config: %w", err) return nil, fmt.Errorf("cannot parse stream aggregation config: %w", err)

View file

@ -11,7 +11,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
) )
func TestAggregatorsFailure(t *testing.T) { func TestAggregatorsFailure(t *testing.T) {
@ -20,7 +19,7 @@ func TestAggregatorsFailure(t *testing.T) {
pushFunc := func(_ []prompbmarshal.TimeSeries) { pushFunc := func(_ []prompbmarshal.TimeSeries) {
panic(fmt.Errorf("pushFunc shouldn't be called")) panic(fmt.Errorf("pushFunc shouldn't be called"))
} }
a, err := newAggregatorsFromData([]byte(config), pushFunc, Options{}) a, err := LoadFromData([]byte(config), pushFunc, Options{})
if err == nil { if err == nil {
t.Fatalf("expecting non-nil error") t.Fatalf("expecting non-nil error")
} }
@ -158,11 +157,11 @@ func TestAggregatorsEqual(t *testing.T) {
t.Helper() t.Helper()
pushFunc := func(_ []prompbmarshal.TimeSeries) {} pushFunc := func(_ []prompbmarshal.TimeSeries) {}
aa, err := newAggregatorsFromData([]byte(a), pushFunc, Options{}) aa, err := LoadFromData([]byte(a), pushFunc, Options{})
if err != nil { if err != nil {
t.Fatalf("cannot initialize aggregators: %s", err) t.Fatalf("cannot initialize aggregators: %s", err)
} }
ab, err := newAggregatorsFromData([]byte(b), pushFunc, Options{}) ab, err := LoadFromData([]byte(b), pushFunc, Options{})
if err != nil { if err != nil {
t.Fatalf("cannot initialize aggregators: %s", err) t.Fatalf("cannot initialize aggregators: %s", err)
} }
@ -225,13 +224,14 @@ func TestAggregatorsSuccess(t *testing.T) {
FlushOnShutdown: true, FlushOnShutdown: true,
NoAlignFlushToInterval: true, NoAlignFlushToInterval: true,
} }
a, err := newAggregatorsFromData([]byte(config), pushFunc, opts) a, err := LoadFromData([]byte(config), pushFunc, opts)
if err != nil { if err != nil {
t.Fatalf("cannot initialize aggregators: %s", err) t.Fatalf("cannot initialize aggregators: %s", err)
} }
// Push the inputMetrics to Aggregators // Push the inputMetrics to Aggregators
tssInput := mustParsePromMetrics(inputMetrics) offsetMsecs := time.Now().UnixMilli()
tssInput := prompbmarshal.MustParsePromMetrics(inputMetrics, offsetMsecs)
matchIdxs := a.Push(tssInput, nil) matchIdxs := a.Push(tssInput, nil)
a.MustStop() a.MustStop()
@ -839,7 +839,7 @@ foo-1m-without-abc-sum-samples{new_label="must_keep_metric_name"} 12.5
`, "1111") `, "1111")
// test rate_sum and rate_avg // test rate_sum and rate_avg
f(` f(`
- interval: 1m - interval: 1m
by: [cde] by: [cde]
outputs: [rate_sum, rate_avg] outputs: [rate_sum, rate_avg]
@ -853,7 +853,7 @@ foo:1m_by_cde_rate_sum{cde="1"} 0.65
`, "1111") `, "1111")
// rate with duplicated events // rate with duplicated events
f(` f(`
- interval: 1m - interval: 1m
by: [cde] by: [cde]
outputs: [rate_sum, rate_avg] outputs: [rate_sum, rate_avg]
@ -921,13 +921,14 @@ func TestAggregatorsWithDedupInterval(t *testing.T) {
DedupInterval: 30 * time.Second, DedupInterval: 30 * time.Second,
FlushOnShutdown: true, FlushOnShutdown: true,
} }
a, err := newAggregatorsFromData([]byte(config), pushFunc, opts) a, err := LoadFromData([]byte(config), pushFunc, opts)
if err != nil { if err != nil {
t.Fatalf("cannot initialize aggregators: %s", err) t.Fatalf("cannot initialize aggregators: %s", err)
} }
// Push the inputMetrics to Aggregators // Push the inputMetrics to Aggregators
tssInput := mustParsePromMetrics(inputMetrics) offsetMsecs := time.Now().UnixMilli()
tssInput := prompbmarshal.MustParsePromMetrics(inputMetrics, offsetMsecs)
matchIdxs := a.Push(tssInput, nil) matchIdxs := a.Push(tssInput, nil)
a.MustStop() a.MustStop()
@ -998,40 +999,6 @@ func timeSeriesToString(ts prompbmarshal.TimeSeries) string {
return fmt.Sprintf("%s %v\n", labelsString, ts.Samples[0].Value) return fmt.Sprintf("%s %v\n", labelsString, ts.Samples[0].Value)
} }
func mustParsePromMetrics(s string) []prompbmarshal.TimeSeries {
var rows prometheus.Rows
errLogger := func(s string) {
panic(fmt.Errorf("unexpected error when parsing Prometheus metrics: %s", s))
}
rows.UnmarshalWithErrLogger(s, errLogger)
var tss []prompbmarshal.TimeSeries
now := time.Now().UnixMilli()
samples := make([]prompbmarshal.Sample, 0, len(rows.Rows))
for _, row := range rows.Rows {
labels := make([]prompbmarshal.Label, 0, len(row.Tags)+1)
labels = append(labels, prompbmarshal.Label{
Name: "__name__",
Value: row.Metric,
})
for _, tag := range row.Tags {
labels = append(labels, prompbmarshal.Label{
Name: tag.Key,
Value: tag.Value,
})
}
samples = append(samples, prompbmarshal.Sample{
Value: row.Value,
Timestamp: now + row.Timestamp,
})
ts := prompbmarshal.TimeSeries{
Labels: labels,
Samples: samples[len(samples)-1:],
}
tss = append(tss, ts)
}
return tss
}
func appendClonedTimeseries(dst, src []prompbmarshal.TimeSeries) []prompbmarshal.TimeSeries { func appendClonedTimeseries(dst, src []prompbmarshal.TimeSeries) []prompbmarshal.TimeSeries {
for _, ts := range src { for _, ts := range src {
dst = append(dst, prompbmarshal.TimeSeries{ dst = append(dst, prompbmarshal.TimeSeries{

View file

@ -92,7 +92,7 @@ func newBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators {
outputs: [%s] outputs: [%s]
`, strings.Join(outputsQuoted, ",")) `, strings.Join(outputsQuoted, ","))
a, err := newAggregatorsFromData([]byte(config), pushFunc, Options{}) a, err := LoadFromData([]byte(config), pushFunc, Options{})
if err != nil { if err != nil {
panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err)) panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err))
} }
@ -107,7 +107,8 @@ func newBenchSeries(seriesCount int) []prompbmarshal.TimeSeries {
a = append(a, s) a = append(a, s)
} }
metrics := strings.Join(a, "\n") metrics := strings.Join(a, "\n")
return mustParsePromMetrics(metrics) offsetMsecs := time.Now().UnixMilli()
return prompbmarshal.MustParsePromMetrics(metrics, offsetMsecs)
} }
const seriesCount = 10_000 const seriesCount = 10_000