From 9505d48070aefbb91ac6d2a5cf951592e22fa5e1 Mon Sep 17 00:00:00 2001 From: Nikolay Date: Wed, 29 Nov 2023 09:03:04 +0100 Subject: [PATCH] lib/streamaggr: properly reference slice with labels (#5406) * lib/streamaggr: properly reference slice with labels by limiting slice capacity. It must fix issues with slice modification, in case of append new slice will be allocated, instead of modifying refrenced slice https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5402 * Reduce memory allocations when output_relabel_configs adds new labels to output samples --------- Co-authored-by: Aliaksandr Valialkin (cherry picked from commit 41f7940f973404a7b93f446f2f9055842d72c103) --- docs/CHANGELOG.md | 1 + lib/promrelabel/relabel.go | 6 +++++- lib/streamaggr/streamaggr.go | 36 +++++++++++++++++++------------ lib/streamaggr/streamaggr_test.go | 29 +++++++++++++++++++++++++ 4 files changed, 57 insertions(+), 15 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index b32eabd4d6..e3969e1fe8 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -37,6 +37,7 @@ The sandbox cluster installation is running under the constant load generated by * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): prevent from `FATAL: cannot flush metainfo` panic when [`-remoteWrite.multitenantURL`](https://docs.victoriametrics.com/vmagent.html#multitenancy) command-line flag is set. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5357). * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly decode zstd-encoded data blocks received via [VictoriaMetrics remote_write protocol](https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol). See [this issue comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5301#issuecomment-1815871992). +* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly add new labels at `output_relabel_configs` during [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html). Previously this could lead to corrupted labels in output samples. Thanks to @ChengChung for providing [detailed report for this bug](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5402). ## [v1.95.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.95.1) diff --git a/lib/promrelabel/relabel.go b/lib/promrelabel/relabel.go index e2d5d68da1..f1e0a6ce75 100644 --- a/lib/promrelabel/relabel.go +++ b/lib/promrelabel/relabel.go @@ -70,11 +70,15 @@ func (prc *parsedRelabelConfig) String() string { // // It returns DebugStep list - one entry per each applied relabeling step. func (pcs *ParsedConfigs) ApplyDebug(labels []prompbmarshal.Label) ([]prompbmarshal.Label, []DebugStep) { - labels, dss := pcs.applyInternal(labels, 0, true) + // Protect from overwriting labels between len(labels) and cap(labels) by limiting labels capacity to its length. + labels, dss := pcs.applyInternal(labels[:len(labels):len(labels)], 0, true) return labels, dss } // Apply applies pcs to labels starting from the labelsOffset. +// +// Apply() may add additional labels after the len(labels), so make sure it doesn't corrupt in-use labels +// stored between len(labels) and cap(labels). func (pcs *ParsedConfigs) Apply(labels []prompbmarshal.Label, labelsOffset int) []prompbmarshal.Label { labels, _ = pcs.applyInternal(labels, labelsOffset, false) return labels diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 705709dc7f..c3d7bacddb 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -471,22 +471,30 @@ func (a *aggregator) flush() { tss := ctx.tss - // Apply output relabeling - if a.outputRelabeling != nil { - dst := tss[:0] - for _, ts := range tss { - ts.Labels = a.outputRelabeling.Apply(ts.Labels, 0) - if len(ts.Labels) == 0 { - // The metric has been deleted by the relabeling - continue - } - dst = append(dst, ts) - } - tss = dst + if a.outputRelabeling == nil { + // Fast path - push the output metrics. + a.pushFunc(tss) + continue } - // Push the output metrics. - a.pushFunc(tss) + // Slower path - apply output relabeling and then push the output metrics. + auxLabels := promutils.GetLabels() + dstLabels := auxLabels.Labels[:0] + dst := tss[:0] + for _, ts := range tss { + dstLabelsLen := len(dstLabels) + dstLabels = append(dstLabels, ts.Labels...) + dstLabels = a.outputRelabeling.Apply(dstLabels, dstLabelsLen) + if len(dstLabels) == dstLabelsLen { + // The metric has been deleted by the relabeling + continue + } + ts.Labels = dstLabels[dstLabelsLen:] + dst = append(dst, ts) + } + a.pushFunc(dst) + auxLabels.Labels = dstLabels + promutils.PutLabels(auxLabels) } } diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go index 91ced47e1c..33924c8e21 100644 --- a/lib/streamaggr/streamaggr_test.go +++ b/lib/streamaggr/streamaggr_test.go @@ -686,6 +686,35 @@ cpu_usage{cpu="2"} 90 cpu_usage:1m_without_cpu_quantiles{quantile="0.5"} 13.3 cpu_usage:1m_without_cpu_quantiles{quantile="1"} 90 `, "1111111") + + // append additional label + f(` +- interval: 1m + without: [abc] + outputs: [count_samples, sum_samples, count_series] + output_relabel_configs: + - action: replace_all + source_labels: [__name__] + regex: ":|_" + replacement: "-" + target_label: __name__ + - action: drop + source_labels: [de] + regex: fg + - target_label: new_label + replacement: must_keep_metric_name +`, ` +foo{abc="123"} 4 +bar 5 +foo{abc="123"} 8.5 +foo{abc="456",de="fg"} 8 +`, `bar-1m-without-abc-count-samples{new_label="must_keep_metric_name"} 1 +bar-1m-without-abc-count-series{new_label="must_keep_metric_name"} 1 +bar-1m-without-abc-sum-samples{new_label="must_keep_metric_name"} 5 +foo-1m-without-abc-count-samples{new_label="must_keep_metric_name"} 2 +foo-1m-without-abc-count-series{new_label="must_keep_metric_name"} 1 +foo-1m-without-abc-sum-samples{new_label="must_keep_metric_name"} 12.5 +`, "1111") } func TestAggregatorsWithDedupInterval(t *testing.T) {