mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/streamaggr: properly reference slice with labels (#5406)
* lib/streamaggr: properly reference slice with labels by limiting slice capacity. It must fix issues with slice modification, in case of append new slice will be allocated, instead of modifying refrenced slice https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5402 * Reduce memory allocations when output_relabel_configs adds new labels to output samples --------- Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
parent
7ca783dee9
commit
41f7940f97
4 changed files with 57 additions and 15 deletions
|
@ -37,6 +37,7 @@ The sandbox cluster installation is running under the constant load generated by
|
|||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): prevent from `FATAL: cannot flush metainfo` panic when [`-remoteWrite.multitenantURL`](https://docs.victoriametrics.com/vmagent.html#multitenancy) command-line flag is set. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5357).
|
||||
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly decode zstd-encoded data blocks received via [VictoriaMetrics remote_write protocol](https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol). See [this issue comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5301#issuecomment-1815871992).
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly add new labels at `output_relabel_configs` during [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html). Previously this could lead to corrupted labels in output samples. Thanks to @ChengChung for providing [detailed report for this bug](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5402).
|
||||
|
||||
|
||||
## [v1.95.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.95.1)
|
||||
|
|
|
@ -70,11 +70,15 @@ func (prc *parsedRelabelConfig) String() string {
|
|||
//
|
||||
// It returns DebugStep list - one entry per each applied relabeling step.
|
||||
func (pcs *ParsedConfigs) ApplyDebug(labels []prompbmarshal.Label) ([]prompbmarshal.Label, []DebugStep) {
|
||||
labels, dss := pcs.applyInternal(labels, 0, true)
|
||||
// Protect from overwriting labels between len(labels) and cap(labels) by limiting labels capacity to its length.
|
||||
labels, dss := pcs.applyInternal(labels[:len(labels):len(labels)], 0, true)
|
||||
return labels, dss
|
||||
}
|
||||
|
||||
// Apply applies pcs to labels starting from the labelsOffset.
|
||||
//
|
||||
// Apply() may add additional labels after the len(labels), so make sure it doesn't corrupt in-use labels
|
||||
// stored between len(labels) and cap(labels).
|
||||
func (pcs *ParsedConfigs) Apply(labels []prompbmarshal.Label, labelsOffset int) []prompbmarshal.Label {
|
||||
labels, _ = pcs.applyInternal(labels, labelsOffset, false)
|
||||
return labels
|
||||
|
|
|
@ -471,22 +471,30 @@ func (a *aggregator) flush() {
|
|||
|
||||
tss := ctx.tss
|
||||
|
||||
// Apply output relabeling
|
||||
if a.outputRelabeling != nil {
|
||||
dst := tss[:0]
|
||||
for _, ts := range tss {
|
||||
ts.Labels = a.outputRelabeling.Apply(ts.Labels, 0)
|
||||
if len(ts.Labels) == 0 {
|
||||
// The metric has been deleted by the relabeling
|
||||
continue
|
||||
}
|
||||
dst = append(dst, ts)
|
||||
}
|
||||
tss = dst
|
||||
if a.outputRelabeling == nil {
|
||||
// Fast path - push the output metrics.
|
||||
a.pushFunc(tss)
|
||||
continue
|
||||
}
|
||||
|
||||
// Push the output metrics.
|
||||
a.pushFunc(tss)
|
||||
// Slower path - apply output relabeling and then push the output metrics.
|
||||
auxLabels := promutils.GetLabels()
|
||||
dstLabels := auxLabels.Labels[:0]
|
||||
dst := tss[:0]
|
||||
for _, ts := range tss {
|
||||
dstLabelsLen := len(dstLabels)
|
||||
dstLabels = append(dstLabels, ts.Labels...)
|
||||
dstLabels = a.outputRelabeling.Apply(dstLabels, dstLabelsLen)
|
||||
if len(dstLabels) == dstLabelsLen {
|
||||
// The metric has been deleted by the relabeling
|
||||
continue
|
||||
}
|
||||
ts.Labels = dstLabels[dstLabelsLen:]
|
||||
dst = append(dst, ts)
|
||||
}
|
||||
a.pushFunc(dst)
|
||||
auxLabels.Labels = dstLabels
|
||||
promutils.PutLabels(auxLabels)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -686,6 +686,35 @@ cpu_usage{cpu="2"} 90
|
|||
cpu_usage:1m_without_cpu_quantiles{quantile="0.5"} 13.3
|
||||
cpu_usage:1m_without_cpu_quantiles{quantile="1"} 90
|
||||
`, "1111111")
|
||||
|
||||
// append additional label
|
||||
f(`
|
||||
- interval: 1m
|
||||
without: [abc]
|
||||
outputs: [count_samples, sum_samples, count_series]
|
||||
output_relabel_configs:
|
||||
- action: replace_all
|
||||
source_labels: [__name__]
|
||||
regex: ":|_"
|
||||
replacement: "-"
|
||||
target_label: __name__
|
||||
- action: drop
|
||||
source_labels: [de]
|
||||
regex: fg
|
||||
- target_label: new_label
|
||||
replacement: must_keep_metric_name
|
||||
`, `
|
||||
foo{abc="123"} 4
|
||||
bar 5
|
||||
foo{abc="123"} 8.5
|
||||
foo{abc="456",de="fg"} 8
|
||||
`, `bar-1m-without-abc-count-samples{new_label="must_keep_metric_name"} 1
|
||||
bar-1m-without-abc-count-series{new_label="must_keep_metric_name"} 1
|
||||
bar-1m-without-abc-sum-samples{new_label="must_keep_metric_name"} 5
|
||||
foo-1m-without-abc-count-samples{new_label="must_keep_metric_name"} 2
|
||||
foo-1m-without-abc-count-series{new_label="must_keep_metric_name"} 1
|
||||
foo-1m-without-abc-sum-samples{new_label="must_keep_metric_name"} 12.5
|
||||
`, "1111")
|
||||
}
|
||||
|
||||
func TestAggregatorsWithDedupInterval(t *testing.T) {
|
||||
|
|
Loading…
Reference in a new issue