fixed applying remoteWrite.label for pushed metrics (#4247) (#4824)

vmagent: properly add extra labels before sending data to remote storage

labels from `remoteWrite.label` are now added to sent metrics just before they
 are pushed to `remoteWrite.url` after all relabelings, including stream aggregation relabelings (#4247)

https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4247

Signed-off-by: Alexander Marshalov <_@marshalov.org>
Co-authored-by: Roman Khavronenko <roman@victoriametrics.com>
This commit is contained in:
Alexander Marshalov 2023-08-15 13:47:48 +02:00 committed by Aliaksandr Valialkin
parent cdf2eaf688
commit e5e50504db
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
5 changed files with 80 additions and 23 deletions

View file

@ -87,8 +87,8 @@ func initLabelsGlobal() {
}
}
func (rctx *relabelCtx) applyRelabeling(tss []prompbmarshal.TimeSeries, extraLabels []prompbmarshal.Label, pcs *promrelabel.ParsedConfigs) []prompbmarshal.TimeSeries {
if len(extraLabels) == 0 && pcs.Len() == 0 && !*usePromCompatibleNaming {
func (rctx *relabelCtx) applyRelabeling(tss []prompbmarshal.TimeSeries, pcs *promrelabel.ParsedConfigs) []prompbmarshal.TimeSeries {
if pcs.Len() == 0 && !*usePromCompatibleNaming {
// Nothing to change.
return tss
}
@ -98,16 +98,6 @@ func (rctx *relabelCtx) applyRelabeling(tss []prompbmarshal.TimeSeries, extraLab
ts := &tss[i]
labelsLen := len(labels)
labels = append(labels, ts.Labels...)
// extraLabels must be added before applying relabeling according to https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write
for j := range extraLabels {
extraLabel := &extraLabels[j]
tmp := promrelabel.GetLabelByName(labels[labelsLen:], extraLabel.Name)
if tmp != nil {
tmp.Value = extraLabel.Value
} else {
labels = append(labels, *extraLabel)
}
}
if *usePromCompatibleNaming {
// Replace unsupported Prometheus chars in label names and metric names with underscores.
tmpLabels := labels[labelsLen:]
@ -135,6 +125,38 @@ func (rctx *relabelCtx) applyRelabeling(tss []prompbmarshal.TimeSeries, extraLab
return tssDst
}
func (rctx *relabelCtx) appendExtraLabels(tss []prompbmarshal.TimeSeries, extraLabels []prompbmarshal.Label) []prompbmarshal.TimeSeries {
if len(extraLabels) == 0 {
return tss
}
tssDst := tss[:0]
labels := rctx.labels[:0]
for i := range tss {
ts := &tss[i]
labelsLen := len(labels)
labels = append(labels, ts.Labels...)
for j := range extraLabels {
extraLabel := extraLabels[j]
if *usePromCompatibleNaming {
extraLabel.Name = promrelabel.SanitizeLabelName(extraLabel.Name)
}
tmp := promrelabel.GetLabelByName(labels[labelsLen:], extraLabel.Name)
if tmp != nil {
tmp.Value = extraLabel.Value
} else {
labels = append(labels, extraLabel)
}
}
labels = promrelabel.FinalizeLabels(labels[:labelsLen], labels[labelsLen:])
tssDst = append(tssDst, prompbmarshal.TimeSeries{
Labels: labels[labelsLen:],
Samples: ts.Samples,
})
}
rctx.labels = labels
return tssDst
}
type relabelCtx struct {
// pool for labels, which are used during the relabeling.
labels []prompbmarshal.Label

View file

@ -10,18 +10,16 @@ import (
)
func TestApplyRelabeling(t *testing.T) {
f := func(extraLabels []prompbmarshal.Label, pcs *promrelabel.ParsedConfigs, sTss, sExpTss string) {
f := func(pcs *promrelabel.ParsedConfigs, sTss, sExpTss string) {
rctx := &relabelCtx{}
tss, expTss := parseSeries(sTss), parseSeries(sExpTss)
gotTss := rctx.applyRelabeling(tss, extraLabels, pcs)
gotTss := rctx.applyRelabeling(tss, pcs)
if !reflect.DeepEqual(gotTss, expTss) {
t.Fatalf("expected to have: \n%v;\ngot: \n%v", expTss, gotTss)
}
}
f(nil, nil, "up", "up")
f([]prompbmarshal.Label{{Name: "foo", Value: "bar"}}, nil, "up", `up{foo="bar"}`)
f([]prompbmarshal.Label{{Name: "foo", Value: "bar"}}, nil, `up{foo="baz"}`, `up{foo="bar"}`)
f(nil, "up", "up")
pcs, err := promrelabel.ParseRelabelConfigsData([]byte(`
- target_label: "foo"
@ -32,11 +30,32 @@ func TestApplyRelabeling(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
f(nil, pcs, `up{foo="baz", env="prod"}`, `up{foo="aaa"}`)
f(pcs, `up{foo="baz", env="prod"}`, `up{foo="aaa"}`)
oldVal := *usePromCompatibleNaming
*usePromCompatibleNaming = true
f(nil, nil, `foo.bar`, `foo_bar`)
f(nil, `foo.bar`, `foo_bar`)
*usePromCompatibleNaming = oldVal
}
func TestAppendExtraLabels(t *testing.T) {
f := func(extraLabels []prompbmarshal.Label, sTss, sExpTss string) {
rctx := &relabelCtx{}
tss, expTss := parseSeries(sTss), parseSeries(sExpTss)
gotTss := rctx.appendExtraLabels(tss, extraLabels)
if !reflect.DeepEqual(gotTss, expTss) {
t.Fatalf("expected to have: \n%v;\ngot: \n%v", expTss, gotTss)
}
}
f(nil, "up", "up")
f([]prompbmarshal.Label{{Name: "foo", Value: "bar"}}, "up", `up{foo="bar"}`)
f([]prompbmarshal.Label{{Name: "foo", Value: "bar"}}, `up{foo="baz"}`, `up{foo="bar"}`)
f([]prompbmarshal.Label{{Name: "baz", Value: "qux"}}, `up{foo="baz"}`, `up{foo="baz",baz="qux"}`)
oldVal := *usePromCompatibleNaming
*usePromCompatibleNaming = true
f([]prompbmarshal.Label{{Name: "foo.bar", Value: "baz"}}, "up", `up{foo_bar="baz"}`)
*usePromCompatibleNaming = oldVal
}

View file

@ -386,7 +386,7 @@ func Push(at *auth.Token, wr *prompbmarshal.WriteRequest) {
}
if rctx != nil {
rowsCountBeforeRelabel := getRowsCount(tssBlock)
tssBlock = rctx.applyRelabeling(tssBlock, labelsGlobal, pcsGlobal)
tssBlock = rctx.applyRelabeling(tssBlock, pcsGlobal)
rowsCountAfterRelabel := getRowsCount(tssBlock)
rowsDroppedByGlobalRelabel.Add(rowsCountBeforeRelabel - rowsCountAfterRelabel)
}
@ -668,7 +668,7 @@ func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) {
v = tssPool.Get().(*[]prompbmarshal.TimeSeries)
tss = append(*v, tss...)
rowsCountBeforeRelabel := getRowsCount(tss)
tss = rctx.applyRelabeling(tss, nil, pcs)
tss = rctx.applyRelabeling(tss, pcs)
rowsCountAfterRelabel := getRowsCount(tss)
rwctx.rowsDroppedByRelabel.Add(rowsCountBeforeRelabel - rowsCountAfterRelabel)
}
@ -719,6 +719,11 @@ func dropAggregatedSeries(src []prompbmarshal.TimeSeries, matchIdxs []byte, drop
}
func (rwctx *remoteWriteCtx) pushInternal(tss []prompbmarshal.TimeSeries) {
if len(labelsGlobal) > 0 {
rctx := getRelabelCtx()
tss = rctx.appendExtraLabels(tss, labelsGlobal)
putRelabelCtx(rctx)
}
pss := rwctx.pss
idx := atomic.AddUint64(&rwctx.pssNextIdx, 1) % uint64(len(pss))
pss[idx].Push(tss)

View file

@ -25,6 +25,7 @@ The following `tip` changes can be tested by building VictoriaMetrics components
## v1.93.x long-time support release (LTS)
* BUGFIX: do not allow starting VictoriaMetrics components with improperly set boolean command-line flags in the form `-boolFlagName value`, since this leads to silent incomplete flags' parsing. This form should be replaced with `-boolFlagName=value`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4845).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): according to [the docs](https://docs.victoriametrics.com/vmagent.html#adding-labels-to-metrics) labels from `-remoteWrite.label` cmd-line flag are now added to the sent metrics just before they are pushed to the `-remoteWrite.url` (after all relabeling, including stream aggregation relabeling). In addition, it allows adding labels for identifying vmagent instances when using streaming aggregation in vmagents [cluster mode](https://docs.victoriametrics.com/vmagent.html#scraping-big-number-of-targets). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4247) and [these docs](https://docs.victoriametrics.com/stream-aggregation.html#cluster-mode) for more details.
* BUGFIX: remove `DEBUG` logging when parsing `if` filters inside [relabeling rules](https://docs.victoriametrics.com/vmagent.html#relabeling-enhancements) and when parsing `match` filters inside [stream aggregation rules](https://docs.victoriametrics.com/stream-aggregation.html).
* BUGFIX: properly replace `:` chars in label names with `_` when `-usePromCompatibleNaming` command-line flag is passed to `vmagent`, `vminsert` or single-node VictoriaMetrics. This addresses [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3113#issuecomment-1275077071).
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): correctly check if specified `-dst` belongs to specified `-storageDataPath`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4837).

View file

@ -1,7 +1,7 @@
---
sort: 98
weight: 98
title: streaming aggregation
title: Streaming aggregation
menu:
docs:
parent: "victoriametrics"
@ -10,7 +10,7 @@ aliases:
- /stream-aggregation.html
---
# streaming aggregation
# Streaming aggregation
[vmagent](https://docs.victoriametrics.com/vmagent.html) and [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html)
can aggregate incoming [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) in streaming mode by time and by labels before data is written to remote storage.
@ -673,3 +673,13 @@ support the following approaches for hot reloading stream aggregation configs fr
```
* By sending HTTP request to `/-/reload` endpoint (e.g. `http://vmagent:8429/-/reload` or `http://victoria-metrics:8428/-/reload).
## Cluster mode
If you use [vmagent in cluster mode](https://docs.victoriametrics.com/vmagent.html#scraping-big-number-of-targets) for streaming aggregation
(with `-promscrape.cluster.*` parameters or with `VMAgent.spec.shardCount > 1` for [vmoperator](https://docs.victoriametrics.com/operator))
then be careful when aggregating metrics via `by`, `without` or modifying via`*_relabel_configs` parameters, as incorrect usage may result in duplicates and data collision. For example, if more than one vmagent calculates `increase` for metric `http_requests_total` with `by: [path]` directive, then the resulting time series written to the remote database will be indistinguishable, as there is no way to tell to which `instance` the aggregation belongs. The proper fix would be to add a unique aggregation dimension: `by: [instance, path]`. With this change, aggregates between `instances` won't collide.
If adding a new aggregation dimension isn't feasible (due to cardinality reduction purposes), then it is worth at least differentiating by which vmagent the aggregation was performed. You can do it with `remoteWrite.label` [parameter in vmagent](https://docs.victoriametrics.com/vmagent.html#adding-labels-to-metrics).
For example, for running in docker or k8s you can use `remoteWrite.label` with `POD_NAME` or `HOSTNAME` environment variable: `remoteWrite.label='vmagent=%{HOSTNAME}'`.
See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4247#issue-1692894073) for details.