diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index b32eabd4d6..e3969e1fe8 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -37,6 +37,7 @@ The sandbox cluster installation is running under the constant load generated by * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): prevent from `FATAL: cannot flush metainfo` panic when [`-remoteWrite.multitenantURL`](https://docs.victoriametrics.com/vmagent.html#multitenancy) command-line flag is set. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5357). * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly decode zstd-encoded data blocks received via [VictoriaMetrics remote_write protocol](https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol). See [this issue comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5301#issuecomment-1815871992). +* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly add new labels at `output_relabel_configs` during [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html). Previously this could lead to corrupted labels in output samples. Thanks to @ChengChung for providing [detailed report for this bug](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5402). ## [v1.95.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.95.1) diff --git a/lib/promrelabel/relabel.go b/lib/promrelabel/relabel.go index e2d5d68da1..f1e0a6ce75 100644 --- a/lib/promrelabel/relabel.go +++ b/lib/promrelabel/relabel.go @@ -70,11 +70,15 @@ func (prc *parsedRelabelConfig) String() string { // // It returns DebugStep list - one entry per each applied relabeling step. func (pcs *ParsedConfigs) ApplyDebug(labels []prompbmarshal.Label) ([]prompbmarshal.Label, []DebugStep) { - labels, dss := pcs.applyInternal(labels, 0, true) + // Protect from overwriting labels between len(labels) and cap(labels) by limiting labels capacity to its length. + labels, dss := pcs.applyInternal(labels[:len(labels):len(labels)], 0, true) return labels, dss } // Apply applies pcs to labels starting from the labelsOffset. +// +// Apply() may add additional labels after the len(labels), so make sure it doesn't corrupt in-use labels +// stored between len(labels) and cap(labels). func (pcs *ParsedConfigs) Apply(labels []prompbmarshal.Label, labelsOffset int) []prompbmarshal.Label { labels, _ = pcs.applyInternal(labels, labelsOffset, false) return labels diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 705709dc7f..c3d7bacddb 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -471,22 +471,30 @@ func (a *aggregator) flush() { tss := ctx.tss - // Apply output relabeling - if a.outputRelabeling != nil { - dst := tss[:0] - for _, ts := range tss { - ts.Labels = a.outputRelabeling.Apply(ts.Labels, 0) - if len(ts.Labels) == 0 { - // The metric has been deleted by the relabeling - continue - } - dst = append(dst, ts) - } - tss = dst + if a.outputRelabeling == nil { + // Fast path - push the output metrics. + a.pushFunc(tss) + continue } - // Push the output metrics. - a.pushFunc(tss) + // Slower path - apply output relabeling and then push the output metrics. + auxLabels := promutils.GetLabels() + dstLabels := auxLabels.Labels[:0] + dst := tss[:0] + for _, ts := range tss { + dstLabelsLen := len(dstLabels) + dstLabels = append(dstLabels, ts.Labels...) + dstLabels = a.outputRelabeling.Apply(dstLabels, dstLabelsLen) + if len(dstLabels) == dstLabelsLen { + // The metric has been deleted by the relabeling + continue + } + ts.Labels = dstLabels[dstLabelsLen:] + dst = append(dst, ts) + } + a.pushFunc(dst) + auxLabels.Labels = dstLabels + promutils.PutLabels(auxLabels) } } diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go index 91ced47e1c..33924c8e21 100644 --- a/lib/streamaggr/streamaggr_test.go +++ b/lib/streamaggr/streamaggr_test.go @@ -686,6 +686,35 @@ cpu_usage{cpu="2"} 90 cpu_usage:1m_without_cpu_quantiles{quantile="0.5"} 13.3 cpu_usage:1m_without_cpu_quantiles{quantile="1"} 90 `, "1111111") + + // append additional label + f(` +- interval: 1m + without: [abc] + outputs: [count_samples, sum_samples, count_series] + output_relabel_configs: + - action: replace_all + source_labels: [__name__] + regex: ":|_" + replacement: "-" + target_label: __name__ + - action: drop + source_labels: [de] + regex: fg + - target_label: new_label + replacement: must_keep_metric_name +`, ` +foo{abc="123"} 4 +bar 5 +foo{abc="123"} 8.5 +foo{abc="456",de="fg"} 8 +`, `bar-1m-without-abc-count-samples{new_label="must_keep_metric_name"} 1 +bar-1m-without-abc-count-series{new_label="must_keep_metric_name"} 1 +bar-1m-without-abc-sum-samples{new_label="must_keep_metric_name"} 5 +foo-1m-without-abc-count-samples{new_label="must_keep_metric_name"} 2 +foo-1m-without-abc-count-series{new_label="must_keep_metric_name"} 1 +foo-1m-without-abc-sum-samples{new_label="must_keep_metric_name"} 12.5 +`, "1111") } func TestAggregatorsWithDedupInterval(t *testing.T) {