lib/streamaggr: properly reference slice with labels (#5406)

* lib/streamaggr: properly reference slice with labels
by limiting slice capacity. It must fix issues with slice modification, in case of append new slice will be allocated, instead of modifying refrenced slice
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5402

* Reduce memory allocations when output_relabel_configs adds new labels to output samples

---------

Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
(cherry picked from commit 41f7940f97)
This commit is contained in:
Nikolay 2023-11-29 09:03:04 +01:00 committed by hagen1778
parent 2a0717c8db
commit 9505d48070
No known key found for this signature in database
GPG key ID: 3BF75F3741CA9640
4 changed files with 57 additions and 15 deletions

View file

@ -37,6 +37,7 @@ The sandbox cluster installation is running under the constant load generated by
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): prevent from `FATAL: cannot flush metainfo` panic when [`-remoteWrite.multitenantURL`](https://docs.victoriametrics.com/vmagent.html#multitenancy) command-line flag is set. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5357).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly decode zstd-encoded data blocks received via [VictoriaMetrics remote_write protocol](https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol). See [this issue comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5301#issuecomment-1815871992).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly add new labels at `output_relabel_configs` during [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html). Previously this could lead to corrupted labels in output samples. Thanks to @ChengChung for providing [detailed report for this bug](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5402).
## [v1.95.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.95.1)

View file

@ -70,11 +70,15 @@ func (prc *parsedRelabelConfig) String() string {
//
// It returns DebugStep list - one entry per each applied relabeling step.
func (pcs *ParsedConfigs) ApplyDebug(labels []prompbmarshal.Label) ([]prompbmarshal.Label, []DebugStep) {
labels, dss := pcs.applyInternal(labels, 0, true)
// Protect from overwriting labels between len(labels) and cap(labels) by limiting labels capacity to its length.
labels, dss := pcs.applyInternal(labels[:len(labels):len(labels)], 0, true)
return labels, dss
}
// Apply applies pcs to labels starting from the labelsOffset.
//
// Apply() may add additional labels after the len(labels), so make sure it doesn't corrupt in-use labels
// stored between len(labels) and cap(labels).
func (pcs *ParsedConfigs) Apply(labels []prompbmarshal.Label, labelsOffset int) []prompbmarshal.Label {
labels, _ = pcs.applyInternal(labels, labelsOffset, false)
return labels

View file

@ -471,22 +471,30 @@ func (a *aggregator) flush() {
tss := ctx.tss
// Apply output relabeling
if a.outputRelabeling != nil {
dst := tss[:0]
for _, ts := range tss {
ts.Labels = a.outputRelabeling.Apply(ts.Labels, 0)
if len(ts.Labels) == 0 {
// The metric has been deleted by the relabeling
continue
}
dst = append(dst, ts)
}
tss = dst
if a.outputRelabeling == nil {
// Fast path - push the output metrics.
a.pushFunc(tss)
continue
}
// Push the output metrics.
a.pushFunc(tss)
// Slower path - apply output relabeling and then push the output metrics.
auxLabels := promutils.GetLabels()
dstLabels := auxLabels.Labels[:0]
dst := tss[:0]
for _, ts := range tss {
dstLabelsLen := len(dstLabels)
dstLabels = append(dstLabels, ts.Labels...)
dstLabels = a.outputRelabeling.Apply(dstLabels, dstLabelsLen)
if len(dstLabels) == dstLabelsLen {
// The metric has been deleted by the relabeling
continue
}
ts.Labels = dstLabels[dstLabelsLen:]
dst = append(dst, ts)
}
a.pushFunc(dst)
auxLabels.Labels = dstLabels
promutils.PutLabels(auxLabels)
}
}

View file

@ -686,6 +686,35 @@ cpu_usage{cpu="2"} 90
cpu_usage:1m_without_cpu_quantiles{quantile="0.5"} 13.3
cpu_usage:1m_without_cpu_quantiles{quantile="1"} 90
`, "1111111")
// append additional label
f(`
- interval: 1m
without: [abc]
outputs: [count_samples, sum_samples, count_series]
output_relabel_configs:
- action: replace_all
source_labels: [__name__]
regex: ":|_"
replacement: "-"
target_label: __name__
- action: drop
source_labels: [de]
regex: fg
- target_label: new_label
replacement: must_keep_metric_name
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar-1m-without-abc-count-samples{new_label="must_keep_metric_name"} 1
bar-1m-without-abc-count-series{new_label="must_keep_metric_name"} 1
bar-1m-without-abc-sum-samples{new_label="must_keep_metric_name"} 5
foo-1m-without-abc-count-samples{new_label="must_keep_metric_name"} 2
foo-1m-without-abc-count-series{new_label="must_keep_metric_name"} 1
foo-1m-without-abc-sum-samples{new_label="must_keep_metric_name"} 12.5
`, "1111")
}
func TestAggregatorsWithDedupInterval(t *testing.T) {