{lib/streamaggr,vmagent/remotewrite}: breaking change for keepInput flag (#4575)

* {lib/streamaggr,vmagent/remotewrite}: breaking change for keepInput flag

Changes default behaviour of keepInput flag to write series which did not match any aggregators to the remote write.
See: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4243

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* Update app/vmagent/remotewrite/remotewrite.go

Co-authored-by: Roman Khavronenko <roman@victoriametrics.com>

---------

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
Co-authored-by: Roman Khavronenko <roman@victoriametrics.com>
Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
Zakhar Bessarab 2023-07-25 03:33:30 +04:00 committed by GitHub
parent b6ae325763
commit 736197179e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 160 additions and 40 deletions

View file

@ -620,7 +620,7 @@ func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) {
// from affecting time series for other remoteWrite.url configs.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/599
v = tssRelabelPool.Get().(*[]prompbmarshal.TimeSeries)
v = tssPool.Get().(*[]prompbmarshal.TimeSeries)
tss = append(*v, tss...)
rowsCountBeforeRelabel := getRowsCount(tss)
tss = rctx.applyRelabeling(tss, nil, pcs)
@ -630,20 +630,41 @@ func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) {
rowsCount := getRowsCount(tss)
rwctx.rowsPushedAfterRelabel.Add(rowsCount)
// Apply stream aggregation if any
sas := rwctx.sas.Load()
sas.Push(tss)
if sas == nil || rwctx.streamAggrKeepInput {
// Push samples to the remote storage
rwctx.pushInternal(tss)
defer func() {
// Return back relabeling contexts to the pool
if rctx != nil {
*v = prompbmarshal.ResetTimeSeries(tss)
tssPool.Put(v)
putRelabelCtx(rctx)
}
}
// Return back relabeling contexts to the pool
if rctx != nil {
*v = prompbmarshal.ResetTimeSeries(tss)
tssRelabelPool.Put(v)
putRelabelCtx(rctx)
// Load stream aggregagation config
sas := rwctx.sas.Load()
// Fast path, no need to track used series
if sas == nil || rwctx.streamAggrKeepInput {
// Apply stream aggregation to the input samples
// it's safe to call sas.Push with sas == nil
sas.Push(tss, nil)
// Push all samples to the remote storage
rwctx.pushInternal(tss)
return
}
// Track series which were used for stream aggregation.
ut := streamaggr.NewTssUsageTracker(len(tss))
sas.Push(tss, ut.Matched)
unmatchedSeries := tssPool.Get().(*[]prompbmarshal.TimeSeries)
// Push only unmatched series to the remote storage
*unmatchedSeries = ut.GetUnmatched(tss, *unmatchedSeries)
rwctx.pushInternal(*unmatchedSeries)
*unmatchedSeries = prompbmarshal.ResetTimeSeries(*unmatchedSeries)
tssPool.Put(unmatchedSeries)
}
func (rwctx *remoteWriteCtx) pushInternal(tss []prompbmarshal.TimeSeries) {
@ -682,7 +703,7 @@ func (rwctx *remoteWriteCtx) reinitStreamAggr() {
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp())
}
var tssRelabelPool = &sync.Pool{
var tssPool = &sync.Pool{
New: func() interface{} {
a := []prompbmarshal.TimeSeries{}
return &a

View file

@ -164,7 +164,7 @@ func (ctx *streamAggrCtx) push(mrs []storage.MetricRow) {
ts.Labels = labels
ts.Samples = samples
sas.Push(tss)
sas.Push(tss, nil)
}
}

View file

@ -24,6 +24,10 @@ The following `tip` changes can be tested by building VictoriaMetrics components
## tip
**Update notes:** release contains breaking change to [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) `-remoteWrite.streamAggr.keepInput` command-line flag.
Default behaviour has changed to keep metrics which were not matched by any aggregation rule when `-remoteWrite.streamAggr.keepInput` is set to false (default value).
See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4243) for details.
* SECURITY: upgrade base docker image (alpine) from 3.18.0 to 3.18.2. See [alpine 3.18.2 release notes](https://alpinelinux.org/posts/Alpine-3.15.9-3.16.6-3.17.4-3.18.2-released.html).
* SECURITY: upgrade Go builder from Go1.20.5 to Go1.20.6. See [the list of issues addressed in Go1.20.6](https://github.com/golang/go/issues?q=milestone%3AGo1.20.6+label%3ACherryPickApproved).

View file

@ -10,6 +10,8 @@ import (
"sync"
"time"
"gopkg.in/yaml.v2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
@ -18,7 +20,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
"gopkg.in/yaml.v2"
)
var supportedOutputs = []string{
@ -194,12 +195,12 @@ func (a *Aggregators) Equal(b *Aggregators) bool {
}
// Push pushes tss to a.
func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries) {
func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries, matched func(id int)) {
if a == nil {
return
}
for _, aggr := range a.as {
aggr.Push(tss)
aggr.Push(tss, matched)
}
}
@ -449,7 +450,7 @@ func (a *aggregator) dedupFlush() {
skipAggrSuffix: true,
}
a.dedupAggr.appendSeriesForFlush(ctx)
a.push(ctx.tss, false)
a.push(ctx.tss, nil)
}
func (a *aggregator) flush() {
@ -498,9 +499,9 @@ func (a *aggregator) MustStop() {
}
// Push pushes tss to a.
func (a *aggregator) Push(tss []prompbmarshal.TimeSeries) {
func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matched func(id int)) {
if a.dedupAggr == nil {
a.push(tss, true)
a.push(tss, matched)
return
}
@ -509,20 +510,14 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries) {
pushSample := a.dedupAggr.pushSample
inputKey := ""
bb := bbPool.Get()
labels := promutils.GetLabels()
for _, ts := range tss {
for idx, ts := range tss {
if !a.match.Match(ts.Labels) {
continue
}
labels.Labels = append(labels.Labels[:0], ts.Labels...)
labels.Labels = a.inputRelabeling.Apply(labels.Labels, 0)
if len(labels.Labels) == 0 {
// The metric has been deleted by the relabeling
continue
if matched != nil {
matched(idx)
}
labels.Sort()
bb.B = marshalLabelsFast(bb.B[:0], labels.Labels)
bb.B = marshalLabelsFast(bb.B[:0], ts.Labels)
outputKey := bytesutil.InternBytes(bb.B)
for _, sample := range ts.Samples {
pushSample(inputKey, outputKey, sample.Value)
@ -532,14 +527,17 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries) {
bbPool.Put(bb)
}
func (a *aggregator) push(tss []prompbmarshal.TimeSeries, applyFilters bool) {
func (a *aggregator) push(tss []prompbmarshal.TimeSeries, tracker func(id int)) {
labels := promutils.GetLabels()
tmpLabels := promutils.GetLabels()
bb := bbPool.Get()
for _, ts := range tss {
if applyFilters && !a.match.Match(ts.Labels) {
for idx, ts := range tss {
if !a.match.Match(ts.Labels) {
continue
}
if tracker != nil {
tracker(idx)
}
labels.Labels = append(labels.Labels[:0], ts.Labels...)
if applyFilters {
labels.Labels = a.inputRelabeling.Apply(labels.Labels, 0)
@ -804,3 +802,30 @@ func sortAndRemoveDuplicates(a []string) []string {
}
return dst
}
// TssUsageTracker tracks used series for streaming aggregation.
type TssUsageTracker struct {
usedSeries map[int]struct{}
}
// NewTssUsageTracker returns new TssUsageTracker.
func NewTssUsageTracker(totalSeries int) *TssUsageTracker {
return &TssUsageTracker{usedSeries: make(map[int]struct{}, totalSeries)}
}
// Matched marks series with id as used.
// Not safe for concurrent use. The caller must
// ensure that there are no concurrent calls to Matched.
func (tut *TssUsageTracker) Matched(id int) {
tut.usedSeries[id] = struct{}{}
}
// GetUnmatched returns unused series from tss.
func (tut *TssUsageTracker) GetUnmatched(tss, dst []prompbmarshal.TimeSeries) []prompbmarshal.TimeSeries {
for k := range tss {
if _, ok := tut.usedSeries[k]; !ok {
dst = append(dst, tss[k])
}
}
return dst
}

View file

@ -183,7 +183,7 @@ func TestAggregatorsSuccess(t *testing.T) {
// Push the inputMetrics to Aggregators
tssInput := mustParsePromMetrics(inputMetrics)
a.Push(tssInput)
a.Push(tssInput, nil)
a.MustStop()
// Verify the tssOutput contains the expected metrics
@ -703,7 +703,7 @@ func TestAggregatorsWithDedupInterval(t *testing.T) {
// Push the inputMetrics to Aggregators
tssInput := mustParsePromMetrics(inputMetrics)
a.Push(tssInput)
a.Push(tssInput, nil)
if a != nil {
for _, aggr := range a.as {
aggr.dedupFlush()

View file

@ -37,8 +37,13 @@ func benchmarkAggregatorsPush(b *testing.B, output string) {
without: [job]
outputs: [%q]
`, output)
i := 0
pushFunc := func(tss []prompbmarshal.TimeSeries) {
panic(fmt.Errorf("unexpected pushFunc call"))
i++
if i > 1 {
// pushFunc is expected to be called exactly once at MustStop
panic(fmt.Errorf("unexpected pushFunc call"))
}
}
a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0)
if err != nil {
@ -50,16 +55,78 @@ func benchmarkAggregatorsPush(b *testing.B, output string) {
b.SetBytes(int64(len(benchSeries)))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
a.Push(benchSeries)
a.Push(benchSeries, nil)
}
})
}
func newBenchSeries(seriesCount, samplesPerSeries int) []prompbmarshal.TimeSeries {
func BenchmarkAggregatorsPushWithSeriesTracker(b *testing.B) {
config := fmt.Sprintf(`
- match: http_requests_total
interval: 24h
without: [job]
outputs: [%q]
`, "total")
i := 0
pushFunc := func(tss []prompbmarshal.TimeSeries) {
i++
if i > 1 {
// pushFunc is expected to be called exactly once at MustStop
panic(fmt.Errorf("unexpected pushFunc call"))
}
}
a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0)
if err != nil {
b.Fatalf("unexpected error when initializing aggregators: %s", err)
}
defer a.MustStop()
tests := []struct {
name string
series []prompbmarshal.TimeSeries
}{
{
name: "all matches",
series: benchSeries,
},
{
name: "no matches",
series: benchSeriesWithRandomNames100,
},
{
name: "50% matches",
series: benchSeriesWithRandomNames50,
},
{
name: "10% matches",
series: benchSeriesWithRandomNames10,
},
}
for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
b.ReportAllocs()
b.SetBytes(int64(len(tt.series)))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
ut := NewTssUsageTracker(len(tt.series))
a.Push(tt.series, ut.Matched)
}
})
})
}
}
func newBenchSeries(seriesCount, samplesPerSeries int, randomNameFraction float64) []prompbmarshal.TimeSeries {
a := make([]string, seriesCount*samplesPerSeries)
for i := 0; i < samplesPerSeries; i++ {
for j := 0; j < seriesCount; j++ {
s := fmt.Sprintf(`http_requests_total{path="/foo/%d",job="foo",instance="bar"} %d`, j, i*10)
metricName := "http_requests_total"
if randomNameFraction > 0 && j%int(1/randomNameFraction) == 0 {
metricName = fmt.Sprintf("random_other_name_%d", j)
}
s := fmt.Sprintf(`%s{path="/foo/%d",job="foo",instance="bar"} %d`, metricName, j, i*10)
a = append(a, s)
}
}
@ -70,4 +137,7 @@ func newBenchSeries(seriesCount, samplesPerSeries int) []prompbmarshal.TimeSerie
const seriesCount = 10000
const samplesPerSeries = 10
var benchSeries = newBenchSeries(seriesCount, samplesPerSeries)
var benchSeries = newBenchSeries(seriesCount, samplesPerSeries, 0)
var benchSeriesWithRandomNames10 = newBenchSeries(seriesCount, samplesPerSeries, 0.1)
var benchSeriesWithRandomNames50 = newBenchSeries(seriesCount, samplesPerSeries, 0.5)
var benchSeriesWithRandomNames100 = newBenchSeries(seriesCount, samplesPerSeries, 1)