diff --git a/app/vmagent/remotewrite/relabel.go b/app/vmagent/remotewrite/relabel.go index 146930f8d..b79021c64 100644 --- a/app/vmagent/remotewrite/relabel.go +++ b/app/vmagent/remotewrite/relabel.go @@ -87,8 +87,8 @@ func initLabelsGlobal() { } } -func (rctx *relabelCtx) applyRelabeling(tss []prompbmarshal.TimeSeries, extraLabels []prompbmarshal.Label, pcs *promrelabel.ParsedConfigs) []prompbmarshal.TimeSeries { - if len(extraLabels) == 0 && pcs.Len() == 0 && !*usePromCompatibleNaming { +func (rctx *relabelCtx) applyRelabeling(tss []prompbmarshal.TimeSeries, pcs *promrelabel.ParsedConfigs) []prompbmarshal.TimeSeries { + if pcs.Len() == 0 && !*usePromCompatibleNaming { // Nothing to change. return tss } @@ -98,16 +98,6 @@ func (rctx *relabelCtx) applyRelabeling(tss []prompbmarshal.TimeSeries, extraLab ts := &tss[i] labelsLen := len(labels) labels = append(labels, ts.Labels...) - // extraLabels must be added before applying relabeling according to https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write - for j := range extraLabels { - extraLabel := &extraLabels[j] - tmp := promrelabel.GetLabelByName(labels[labelsLen:], extraLabel.Name) - if tmp != nil { - tmp.Value = extraLabel.Value - } else { - labels = append(labels, *extraLabel) - } - } if *usePromCompatibleNaming { // Replace unsupported Prometheus chars in label names and metric names with underscores. tmpLabels := labels[labelsLen:] @@ -135,6 +125,38 @@ func (rctx *relabelCtx) applyRelabeling(tss []prompbmarshal.TimeSeries, extraLab return tssDst } +func (rctx *relabelCtx) appendExtraLabels(tss []prompbmarshal.TimeSeries, extraLabels []prompbmarshal.Label) []prompbmarshal.TimeSeries { + if len(extraLabels) == 0 { + return tss + } + tssDst := tss[:0] + labels := rctx.labels[:0] + for i := range tss { + ts := &tss[i] + labelsLen := len(labels) + labels = append(labels, ts.Labels...) + for j := range extraLabels { + extraLabel := extraLabels[j] + if *usePromCompatibleNaming { + extraLabel.Name = promrelabel.SanitizeLabelName(extraLabel.Name) + } + tmp := promrelabel.GetLabelByName(labels[labelsLen:], extraLabel.Name) + if tmp != nil { + tmp.Value = extraLabel.Value + } else { + labels = append(labels, extraLabel) + } + } + labels = promrelabel.FinalizeLabels(labels[:labelsLen], labels[labelsLen:]) + tssDst = append(tssDst, prompbmarshal.TimeSeries{ + Labels: labels[labelsLen:], + Samples: ts.Samples, + }) + } + rctx.labels = labels + return tssDst +} + type relabelCtx struct { // pool for labels, which are used during the relabeling. labels []prompbmarshal.Label diff --git a/app/vmagent/remotewrite/relabel_test.go b/app/vmagent/remotewrite/relabel_test.go index eb3540002..441283f23 100644 --- a/app/vmagent/remotewrite/relabel_test.go +++ b/app/vmagent/remotewrite/relabel_test.go @@ -10,18 +10,16 @@ import ( ) func TestApplyRelabeling(t *testing.T) { - f := func(extraLabels []prompbmarshal.Label, pcs *promrelabel.ParsedConfigs, sTss, sExpTss string) { + f := func(pcs *promrelabel.ParsedConfigs, sTss, sExpTss string) { rctx := &relabelCtx{} tss, expTss := parseSeries(sTss), parseSeries(sExpTss) - gotTss := rctx.applyRelabeling(tss, extraLabels, pcs) + gotTss := rctx.applyRelabeling(tss, pcs) if !reflect.DeepEqual(gotTss, expTss) { t.Fatalf("expected to have: \n%v;\ngot: \n%v", expTss, gotTss) } } - f(nil, nil, "up", "up") - f([]prompbmarshal.Label{{Name: "foo", Value: "bar"}}, nil, "up", `up{foo="bar"}`) - f([]prompbmarshal.Label{{Name: "foo", Value: "bar"}}, nil, `up{foo="baz"}`, `up{foo="bar"}`) + f(nil, "up", "up") pcs, err := promrelabel.ParseRelabelConfigsData([]byte(` - target_label: "foo" @@ -32,11 +30,32 @@ func TestApplyRelabeling(t *testing.T) { if err != nil { t.Fatalf("unexpected error: %s", err) } - f(nil, pcs, `up{foo="baz", env="prod"}`, `up{foo="aaa"}`) + f(pcs, `up{foo="baz", env="prod"}`, `up{foo="aaa"}`) oldVal := *usePromCompatibleNaming *usePromCompatibleNaming = true - f(nil, nil, `foo.bar`, `foo_bar`) + f(nil, `foo.bar`, `foo_bar`) + *usePromCompatibleNaming = oldVal +} + +func TestAppendExtraLabels(t *testing.T) { + f := func(extraLabels []prompbmarshal.Label, sTss, sExpTss string) { + rctx := &relabelCtx{} + tss, expTss := parseSeries(sTss), parseSeries(sExpTss) + gotTss := rctx.appendExtraLabels(tss, extraLabels) + if !reflect.DeepEqual(gotTss, expTss) { + t.Fatalf("expected to have: \n%v;\ngot: \n%v", expTss, gotTss) + } + } + + f(nil, "up", "up") + f([]prompbmarshal.Label{{Name: "foo", Value: "bar"}}, "up", `up{foo="bar"}`) + f([]prompbmarshal.Label{{Name: "foo", Value: "bar"}}, `up{foo="baz"}`, `up{foo="bar"}`) + f([]prompbmarshal.Label{{Name: "baz", Value: "qux"}}, `up{foo="baz"}`, `up{foo="baz",baz="qux"}`) + + oldVal := *usePromCompatibleNaming + *usePromCompatibleNaming = true + f([]prompbmarshal.Label{{Name: "foo.bar", Value: "baz"}}, "up", `up{foo_bar="baz"}`) *usePromCompatibleNaming = oldVal } diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index 29099e114..ac5681aa4 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -386,7 +386,7 @@ func Push(at *auth.Token, wr *prompbmarshal.WriteRequest) { } if rctx != nil { rowsCountBeforeRelabel := getRowsCount(tssBlock) - tssBlock = rctx.applyRelabeling(tssBlock, labelsGlobal, pcsGlobal) + tssBlock = rctx.applyRelabeling(tssBlock, pcsGlobal) rowsCountAfterRelabel := getRowsCount(tssBlock) rowsDroppedByGlobalRelabel.Add(rowsCountBeforeRelabel - rowsCountAfterRelabel) } @@ -668,7 +668,7 @@ func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) { v = tssPool.Get().(*[]prompbmarshal.TimeSeries) tss = append(*v, tss...) rowsCountBeforeRelabel := getRowsCount(tss) - tss = rctx.applyRelabeling(tss, nil, pcs) + tss = rctx.applyRelabeling(tss, pcs) rowsCountAfterRelabel := getRowsCount(tss) rwctx.rowsDroppedByRelabel.Add(rowsCountBeforeRelabel - rowsCountAfterRelabel) } @@ -719,6 +719,11 @@ func dropAggregatedSeries(src []prompbmarshal.TimeSeries, matchIdxs []byte, drop } func (rwctx *remoteWriteCtx) pushInternal(tss []prompbmarshal.TimeSeries) { + if len(labelsGlobal) > 0 { + rctx := getRelabelCtx() + tss = rctx.appendExtraLabels(tss, labelsGlobal) + putRelabelCtx(rctx) + } pss := rwctx.pss idx := atomic.AddUint64(&rwctx.pssNextIdx, 1) % uint64(len(pss)) pss[idx].Push(tss) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 55dc470d3..ae0934ab6 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -25,6 +25,7 @@ The following `tip` changes can be tested by building VictoriaMetrics components ## v1.93.x long-time support release (LTS) * BUGFIX: do not allow starting VictoriaMetrics components with improperly set boolean command-line flags in the form `-boolFlagName value`, since this leads to silent incomplete flags' parsing. This form should be replaced with `-boolFlagName=value`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4845). +* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): according to [the docs](https://docs.victoriametrics.com/vmagent.html#adding-labels-to-metrics) labels from `-remoteWrite.label` cmd-line flag are now added to the sent metrics just before they are pushed to the `-remoteWrite.url` (after all relabeling, including stream aggregation relabeling). In addition, it allows adding labels for identifying vmagent instances when using streaming aggregation in vmagents [cluster mode](https://docs.victoriametrics.com/vmagent.html#scraping-big-number-of-targets). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4247) and [these docs](https://docs.victoriametrics.com/stream-aggregation.html#cluster-mode) for more details. * BUGFIX: remove `DEBUG` logging when parsing `if` filters inside [relabeling rules](https://docs.victoriametrics.com/vmagent.html#relabeling-enhancements) and when parsing `match` filters inside [stream aggregation rules](https://docs.victoriametrics.com/stream-aggregation.html). * BUGFIX: properly replace `:` chars in label names with `_` when `-usePromCompatibleNaming` command-line flag is passed to `vmagent`, `vminsert` or single-node VictoriaMetrics. This addresses [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3113#issuecomment-1275077071). * BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): correctly check if specified `-dst` belongs to specified `-storageDataPath`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4837). diff --git a/docs/stream-aggregation.md b/docs/stream-aggregation.md index 9be338bac..eb01c9f64 100644 --- a/docs/stream-aggregation.md +++ b/docs/stream-aggregation.md @@ -1,7 +1,7 @@ --- sort: 98 weight: 98 -title: streaming aggregation +title: Streaming aggregation menu: docs: parent: "victoriametrics" @@ -10,7 +10,7 @@ aliases: - /stream-aggregation.html --- -# streaming aggregation +# Streaming aggregation [vmagent](https://docs.victoriametrics.com/vmagent.html) and [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html) can aggregate incoming [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) in streaming mode by time and by labels before data is written to remote storage. @@ -673,3 +673,13 @@ support the following approaches for hot reloading stream aggregation configs fr ``` * By sending HTTP request to `/-/reload` endpoint (e.g. `http://vmagent:8429/-/reload` or `http://victoria-metrics:8428/-/reload). + +## Cluster mode + +If you use [vmagent in cluster mode](https://docs.victoriametrics.com/vmagent.html#scraping-big-number-of-targets) for streaming aggregation +(with `-promscrape.cluster.*` parameters or with `VMAgent.spec.shardCount > 1` for [vmoperator](https://docs.victoriametrics.com/operator)) +then be careful when aggregating metrics via `by`, `without` or modifying via`*_relabel_configs` parameters, as incorrect usage may result in duplicates and data collision. For example, if more than one vmagent calculates `increase` for metric `http_requests_total` with `by: [path]` directive, then the resulting time series written to the remote database will be indistinguishable, as there is no way to tell to which `instance` the aggregation belongs. The proper fix would be to add a unique aggregation dimension: `by: [instance, path]`. With this change, aggregates between `instances` won't collide. + +If adding a new aggregation dimension isn't feasible (due to cardinality reduction purposes), then it is worth at least differentiating by which vmagent the aggregation was performed. You can do it with `remoteWrite.label` [parameter in vmagent](https://docs.victoriametrics.com/vmagent.html#adding-labels-to-metrics). +For example, for running in docker or k8s you can use `remoteWrite.label` with `POD_NAME` or `HOSTNAME` environment variable: `remoteWrite.label='vmagent=%{HOSTNAME}'`. +See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4247#issue-1692894073) for details.