diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index 93e41b50d..b9d99d1c7 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -910,6 +910,10 @@ func (rwctx *remoteWriteCtx) MustStop() { rwctx.rowsDroppedByRelabel = nil } +// TryPush sends tss series to the configured remote write endpoint +// +// TryPush can be called concurrently for multiple remoteWriteCtx, +// so it shouldn't modify tss entries. func (rwctx *remoteWriteCtx) TryPush(tss []prompbmarshal.TimeSeries) bool { // Apply relabeling var rctx *relabelCtx @@ -949,7 +953,6 @@ func (rwctx *remoteWriteCtx) TryPush(tss []prompbmarshal.TimeSeries) bool { matchIdxsPool.Put(matchIdxs) } else if rwctx.deduplicator != nil { rwctx.deduplicator.Push(tss) - clear(tss) tss = tss[:0] } diff --git a/app/vmagent/remotewrite/remotewrite_test.go b/app/vmagent/remotewrite/remotewrite_test.go index 16c404ef2..2fd8a9011 100644 --- a/app/vmagent/remotewrite/remotewrite_test.go +++ b/app/vmagent/remotewrite/remotewrite_test.go @@ -3,9 +3,16 @@ package remotewrite import ( "fmt" "math" + "os" + "reflect" "testing" + "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr" + "github.com/VictoriaMetrics/metrics" ) func TestGetLabelsHash_Distribution(t *testing.T) { @@ -46,3 +53,163 @@ func TestGetLabelsHash_Distribution(t *testing.T) { f(5) f(10) } + +func TestRemoteWriteContext_TryPush_ImmutableTimeseries(t *testing.T) { + f := func(streamAggrConfig, relabelConfig string, dedupInterval time.Duration, keepInput, dropInput bool, input string) { + t.Helper() + perURLRelabel, err := promrelabel.ParseRelabelConfigsData([]byte(relabelConfig)) + if err != nil { + t.Fatalf("cannot load relabel configs: %s", err) + } + rcs := &relabelConfigs{ + perURL: []*promrelabel.ParsedConfigs{ + perURLRelabel, + }, + } + allRelabelConfigs.Store(rcs) + + pss := make([]*pendingSeries, 1) + pss[0] = newPendingSeries(nil, true, 0, 100) + rwctx := &remoteWriteCtx{ + idx: 0, + streamAggrKeepInput: keepInput, + streamAggrDropInput: dropInput, + pss: pss, + rowsPushedAfterRelabel: metrics.GetOrCreateCounter(`foo`), + rowsDroppedByRelabel: metrics.GetOrCreateCounter(`bar`), + } + if dedupInterval > 0 { + rwctx.deduplicator = streamaggr.NewDeduplicator(nil, dedupInterval, nil) + } + + if len(streamAggrConfig) > 0 { + f := createFile(t, []byte(streamAggrConfig)) + sas, err := streamaggr.LoadFromFile(f.Name(), nil, nil) + if err != nil { + t.Fatalf("cannot load streamaggr configs: %s", err) + } + rwctx.sas.Store(sas) + } + + inputTss := mustParsePromMetrics(input) + expectedTss := make([]prompbmarshal.TimeSeries, len(inputTss)) + + // copy inputTss to make sure it is not mutated during TryPush call + copy(expectedTss, inputTss) + rwctx.TryPush(inputTss) + + if !reflect.DeepEqual(expectedTss, inputTss) { + t.Fatalf("unexpected samples;\ngot\n%v\nwant\n%v", inputTss, expectedTss) + } + } + + f(` +- interval: 1m + outputs: [sum_samples] +- interval: 2m + outputs: [count_series] +`, ` +- action: keep + source_labels: [env] + regex: "dev" +`, 0, false, false, ` +metric{env="dev"} 10 +metric{env="bar"} 20 +metric{env="dev"} 15 +metric{env="bar"} 25 +`) + f(``, ``, time.Hour, false, false, ` +metric{env="dev"} 10 +metric{env="foo"} 20 +metric{env="dev"} 15 +metric{env="foo"} 25 +`) + f(``, ` +- action: keep + source_labels: [env] + regex: "dev" +`, time.Hour, false, false, ` +metric{env="dev"} 10 +metric{env="bar"} 20 +metric{env="dev"} 15 +metric{env="bar"} 25 +`) + f(``, ` +- action: keep + source_labels: [env] + regex: "dev" +`, time.Hour, true, false, ` +metric{env="test"} 10 +metric{env="dev"} 20 +metric{env="foo"} 15 +metric{env="dev"} 25 +`) + f(``, ` +- action: keep + source_labels: [env] + regex: "dev" +`, time.Hour, false, true, ` +metric{env="foo"} 10 +metric{env="dev"} 20 +metric{env="foo"} 15 +metric{env="dev"} 25 +`) + f(``, ` +- action: keep + source_labels: [env] + regex: "dev" +`, time.Hour, true, true, ` +metric{env="dev"} 10 +metric{env="test"} 20 +metric{env="dev"} 15 +metric{env="bar"} 25 +`) +} + +func mustParsePromMetrics(s string) []prompbmarshal.TimeSeries { + var rows prometheus.Rows + errLogger := func(s string) { + panic(fmt.Errorf("unexpected error when parsing Prometheus metrics: %s", s)) + } + rows.UnmarshalWithErrLogger(s, errLogger) + var tss []prompbmarshal.TimeSeries + samples := make([]prompbmarshal.Sample, 0, len(rows.Rows)) + for _, row := range rows.Rows { + labels := make([]prompbmarshal.Label, 0, len(row.Tags)+1) + labels = append(labels, prompbmarshal.Label{ + Name: "__name__", + Value: row.Metric, + }) + for _, tag := range row.Tags { + labels = append(labels, prompbmarshal.Label{ + Name: tag.Key, + Value: tag.Value, + }) + } + samples = append(samples, prompbmarshal.Sample{ + Value: row.Value, + Timestamp: row.Timestamp, + }) + ts := prompbmarshal.TimeSeries{ + Labels: labels, + Samples: samples[len(samples)-1:], + } + tss = append(tss, ts) + } + return tss +} + +func createFile(t *testing.T, data []byte) *os.File { + t.Helper() + f, err := os.CreateTemp("", "") + if err != nil { + t.Fatal(err) + } + if err := os.WriteFile(f.Name(), data, 0644); err != nil { + t.Fatal(err) + } + if err := f.Sync(); err != nil { + t.Fatal(err) + } + return f +} diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 365af547a..76fb4a3dd 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -37,6 +37,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). * FEATURE: [dashboards/cluster](https://grafana.com/grafana/dashboards/11176): add new panel `Concurrent selects` to `vmstorage` row. The panel will show how many ongoing select queries are processed by vmstorage and should help to identify resource bottlenecks. See panel description for more details. * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix bug that prevents the first query trace from expanding on click event. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6186). The issue was introduced in [v1.100.0](https://docs.victoriametrics.com/changelog/#v11000) release. +* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent/): prevent potential panic during [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) if more than one `--remoteWrite.streamAggr.dedupInterval` is configured. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6205). ## [v1.101.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.101.0)