mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vmagent/remotewrite: do not clean up timeseries which are used in multiple remote write contexts (#6206)
When at least one remote write context has deduplication configured, it clears the pushed timeseries even though they may still be in use by another remote write context without deduplication. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6205 --------- Signed-off-by: hagen1778 <roman@victoriametrics.com> Co-authored-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
parent
c0050beadc
commit
879771808b
3 changed files with 172 additions and 1 deletions
|
@ -910,6 +910,10 @@ func (rwctx *remoteWriteCtx) MustStop() {
|
||||||
rwctx.rowsDroppedByRelabel = nil
|
rwctx.rowsDroppedByRelabel = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TryPush sends tss series to the configured remote write endpoint
|
||||||
|
//
|
||||||
|
// TryPush can be called concurrently for multiple remoteWriteCtx,
|
||||||
|
// so it shouldn't modify tss entries.
|
||||||
func (rwctx *remoteWriteCtx) TryPush(tss []prompbmarshal.TimeSeries) bool {
|
func (rwctx *remoteWriteCtx) TryPush(tss []prompbmarshal.TimeSeries) bool {
|
||||||
// Apply relabeling
|
// Apply relabeling
|
||||||
var rctx *relabelCtx
|
var rctx *relabelCtx
|
||||||
|
@ -949,7 +953,6 @@ func (rwctx *remoteWriteCtx) TryPush(tss []prompbmarshal.TimeSeries) bool {
|
||||||
matchIdxsPool.Put(matchIdxs)
|
matchIdxsPool.Put(matchIdxs)
|
||||||
} else if rwctx.deduplicator != nil {
|
} else if rwctx.deduplicator != nil {
|
||||||
rwctx.deduplicator.Push(tss)
|
rwctx.deduplicator.Push(tss)
|
||||||
clear(tss)
|
|
||||||
tss = tss[:0]
|
tss = tss[:0]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,9 +3,16 @@ package remotewrite
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
|
"os"
|
||||||
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
|
||||||
|
"github.com/VictoriaMetrics/metrics"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestGetLabelsHash_Distribution(t *testing.T) {
|
func TestGetLabelsHash_Distribution(t *testing.T) {
|
||||||
|
@ -46,3 +53,163 @@ func TestGetLabelsHash_Distribution(t *testing.T) {
|
||||||
f(5)
|
f(5)
|
||||||
f(10)
|
f(10)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRemoteWriteContext_TryPush_ImmutableTimeseries(t *testing.T) {
|
||||||
|
f := func(streamAggrConfig, relabelConfig string, dedupInterval time.Duration, keepInput, dropInput bool, input string) {
|
||||||
|
t.Helper()
|
||||||
|
perURLRelabel, err := promrelabel.ParseRelabelConfigsData([]byte(relabelConfig))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("cannot load relabel configs: %s", err)
|
||||||
|
}
|
||||||
|
rcs := &relabelConfigs{
|
||||||
|
perURL: []*promrelabel.ParsedConfigs{
|
||||||
|
perURLRelabel,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
allRelabelConfigs.Store(rcs)
|
||||||
|
|
||||||
|
pss := make([]*pendingSeries, 1)
|
||||||
|
pss[0] = newPendingSeries(nil, true, 0, 100)
|
||||||
|
rwctx := &remoteWriteCtx{
|
||||||
|
idx: 0,
|
||||||
|
streamAggrKeepInput: keepInput,
|
||||||
|
streamAggrDropInput: dropInput,
|
||||||
|
pss: pss,
|
||||||
|
rowsPushedAfterRelabel: metrics.GetOrCreateCounter(`foo`),
|
||||||
|
rowsDroppedByRelabel: metrics.GetOrCreateCounter(`bar`),
|
||||||
|
}
|
||||||
|
if dedupInterval > 0 {
|
||||||
|
rwctx.deduplicator = streamaggr.NewDeduplicator(nil, dedupInterval, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(streamAggrConfig) > 0 {
|
||||||
|
f := createFile(t, []byte(streamAggrConfig))
|
||||||
|
sas, err := streamaggr.LoadFromFile(f.Name(), nil, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("cannot load streamaggr configs: %s", err)
|
||||||
|
}
|
||||||
|
rwctx.sas.Store(sas)
|
||||||
|
}
|
||||||
|
|
||||||
|
inputTss := mustParsePromMetrics(input)
|
||||||
|
expectedTss := make([]prompbmarshal.TimeSeries, len(inputTss))
|
||||||
|
|
||||||
|
// copy inputTss to make sure it is not mutated during TryPush call
|
||||||
|
copy(expectedTss, inputTss)
|
||||||
|
rwctx.TryPush(inputTss)
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(expectedTss, inputTss) {
|
||||||
|
t.Fatalf("unexpected samples;\ngot\n%v\nwant\n%v", inputTss, expectedTss)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
f(`
|
||||||
|
- interval: 1m
|
||||||
|
outputs: [sum_samples]
|
||||||
|
- interval: 2m
|
||||||
|
outputs: [count_series]
|
||||||
|
`, `
|
||||||
|
- action: keep
|
||||||
|
source_labels: [env]
|
||||||
|
regex: "dev"
|
||||||
|
`, 0, false, false, `
|
||||||
|
metric{env="dev"} 10
|
||||||
|
metric{env="bar"} 20
|
||||||
|
metric{env="dev"} 15
|
||||||
|
metric{env="bar"} 25
|
||||||
|
`)
|
||||||
|
f(``, ``, time.Hour, false, false, `
|
||||||
|
metric{env="dev"} 10
|
||||||
|
metric{env="foo"} 20
|
||||||
|
metric{env="dev"} 15
|
||||||
|
metric{env="foo"} 25
|
||||||
|
`)
|
||||||
|
f(``, `
|
||||||
|
- action: keep
|
||||||
|
source_labels: [env]
|
||||||
|
regex: "dev"
|
||||||
|
`, time.Hour, false, false, `
|
||||||
|
metric{env="dev"} 10
|
||||||
|
metric{env="bar"} 20
|
||||||
|
metric{env="dev"} 15
|
||||||
|
metric{env="bar"} 25
|
||||||
|
`)
|
||||||
|
f(``, `
|
||||||
|
- action: keep
|
||||||
|
source_labels: [env]
|
||||||
|
regex: "dev"
|
||||||
|
`, time.Hour, true, false, `
|
||||||
|
metric{env="test"} 10
|
||||||
|
metric{env="dev"} 20
|
||||||
|
metric{env="foo"} 15
|
||||||
|
metric{env="dev"} 25
|
||||||
|
`)
|
||||||
|
f(``, `
|
||||||
|
- action: keep
|
||||||
|
source_labels: [env]
|
||||||
|
regex: "dev"
|
||||||
|
`, time.Hour, false, true, `
|
||||||
|
metric{env="foo"} 10
|
||||||
|
metric{env="dev"} 20
|
||||||
|
metric{env="foo"} 15
|
||||||
|
metric{env="dev"} 25
|
||||||
|
`)
|
||||||
|
f(``, `
|
||||||
|
- action: keep
|
||||||
|
source_labels: [env]
|
||||||
|
regex: "dev"
|
||||||
|
`, time.Hour, true, true, `
|
||||||
|
metric{env="dev"} 10
|
||||||
|
metric{env="test"} 20
|
||||||
|
metric{env="dev"} 15
|
||||||
|
metric{env="bar"} 25
|
||||||
|
`)
|
||||||
|
}
|
||||||
|
|
||||||
|
func mustParsePromMetrics(s string) []prompbmarshal.TimeSeries {
|
||||||
|
var rows prometheus.Rows
|
||||||
|
errLogger := func(s string) {
|
||||||
|
panic(fmt.Errorf("unexpected error when parsing Prometheus metrics: %s", s))
|
||||||
|
}
|
||||||
|
rows.UnmarshalWithErrLogger(s, errLogger)
|
||||||
|
var tss []prompbmarshal.TimeSeries
|
||||||
|
samples := make([]prompbmarshal.Sample, 0, len(rows.Rows))
|
||||||
|
for _, row := range rows.Rows {
|
||||||
|
labels := make([]prompbmarshal.Label, 0, len(row.Tags)+1)
|
||||||
|
labels = append(labels, prompbmarshal.Label{
|
||||||
|
Name: "__name__",
|
||||||
|
Value: row.Metric,
|
||||||
|
})
|
||||||
|
for _, tag := range row.Tags {
|
||||||
|
labels = append(labels, prompbmarshal.Label{
|
||||||
|
Name: tag.Key,
|
||||||
|
Value: tag.Value,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
samples = append(samples, prompbmarshal.Sample{
|
||||||
|
Value: row.Value,
|
||||||
|
Timestamp: row.Timestamp,
|
||||||
|
})
|
||||||
|
ts := prompbmarshal.TimeSeries{
|
||||||
|
Labels: labels,
|
||||||
|
Samples: samples[len(samples)-1:],
|
||||||
|
}
|
||||||
|
tss = append(tss, ts)
|
||||||
|
}
|
||||||
|
return tss
|
||||||
|
}
|
||||||
|
|
||||||
|
// createFile creates a temporary file in the default temp directory, writes
// data to it and returns the still-open file handle.
//
// The file is closed and removed automatically when the test (or subtest)
// owning t finishes, so callers do not need to clean it up themselves.
// Any failure aborts the test via t.Fatal.
func createFile(t *testing.T, data []byte) *os.File {
	t.Helper()
	f, err := os.CreateTemp("", "")
	if err != nil {
		t.Fatal(err)
	}
	// Register the cleanup right away so the descriptor and the file are
	// released even if one of the steps below fails.
	t.Cleanup(func() {
		// Best-effort: the caller may have already closed or removed the file,
		// so errors are deliberately ignored here.
		_ = f.Close()
		_ = os.Remove(f.Name())
	})
	if err := os.WriteFile(f.Name(), data, 0644); err != nil {
		t.Fatal(err)
	}
	if err := f.Sync(); err != nil {
		t.Fatal(err)
	}
	return f
}
|
||||||
|
|
|
@ -37,6 +37,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
|
||||||
* FEATURE: [dashboards/cluster](https://grafana.com/grafana/dashboards/11176): add new panel `Concurrent selects` to `vmstorage` row. The panel will show how many ongoing select queries are processed by vmstorage and should help to identify resource bottlenecks. See panel description for more details.
|
* FEATURE: [dashboards/cluster](https://grafana.com/grafana/dashboards/11176): add new panel `Concurrent selects` to `vmstorage` row. The panel will show how many ongoing select queries are processed by vmstorage and should help to identify resource bottlenecks. See panel description for more details.
|
||||||
|
|
||||||
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix bug that prevents the first query trace from expanding on click event. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6186). The issue was introduced in [v1.100.0](https://docs.victoriametrics.com/changelog/#v11000) release.
|
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix bug that prevents the first query trace from expanding on click event. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6186). The issue was introduced in [v1.100.0](https://docs.victoriametrics.com/changelog/#v11000) release.
|
||||||
|
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent/): prevent potential panic during [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) if more than one `--remoteWrite.streamAggr.dedupInterval` is configured. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6205).
|
||||||
|
|
||||||
## [v1.101.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.101.0)
|
## [v1.101.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.101.0)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue