app/vmagent/remotewrite: follow-up after a27c2f3773

- Fix Prometheus-compatible naming after applying the relabeling if -usePromCompatibleNaming command-line flag is set.
  This should prevent from possible Prometheus-incompatible metric names and label names generated by the relabeling.
- Do not return anything from relabelCtx.appendExtraLabels() function, since it cannot change the number of time series
  passed to it. Append labels for the passed time series in-place.
- Remove promrelabel.FinalizeLabels() call after adding extra labels to time series, since this call has been already
  made at relabelCtx.applyRelabeling(). It is user's responsibility if he passes labels with double underscore prefixes
  to -remoteWrite.label.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4247
This commit is contained in:
Aliaksandr Valialkin 2023-08-17 14:35:26 +02:00
parent 67dd975be5
commit 50820ab1aa
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
3 changed files with 23 additions and 26 deletions

View file

@ -98,24 +98,15 @@ func (rctx *relabelCtx) applyRelabeling(tss []prompbmarshal.TimeSeries, pcs *pro
ts := &tss[i]
labelsLen := len(labels)
labels = append(labels, ts.Labels...)
if *usePromCompatibleNaming {
// Replace unsupported Prometheus chars in label names and metric names with underscores.
tmpLabels := labels[labelsLen:]
for j := range tmpLabels {
label := &tmpLabels[j]
if label.Name == "__name__" {
label.Value = promrelabel.SanitizeMetricName(label.Value)
} else {
label.Name = promrelabel.SanitizeLabelName(label.Name)
}
}
}
labels = pcs.Apply(labels, labelsLen)
labels = promrelabel.FinalizeLabels(labels[:labelsLen], labels[labelsLen:])
if len(labels) == labelsLen {
// Drop the current time series, since relabeling removed all the labels.
continue
}
if *usePromCompatibleNaming {
fixPromCompatibleNaming(labels[labelsLen:])
}
tssDst = append(tssDst, prompbmarshal.TimeSeries{
Labels: labels[labelsLen:],
Samples: ts.Samples,
@ -125,11 +116,10 @@ func (rctx *relabelCtx) applyRelabeling(tss []prompbmarshal.TimeSeries, pcs *pro
return tssDst
}
func (rctx *relabelCtx) appendExtraLabels(tss []prompbmarshal.TimeSeries, extraLabels []prompbmarshal.Label) []prompbmarshal.TimeSeries {
func (rctx *relabelCtx) appendExtraLabels(tss []prompbmarshal.TimeSeries, extraLabels []prompbmarshal.Label) {
if len(extraLabels) == 0 {
return tss
return
}
tssDst := tss[:0]
labels := rctx.labels[:0]
for i := range tss {
ts := &tss[i]
@ -147,14 +137,9 @@ func (rctx *relabelCtx) appendExtraLabels(tss []prompbmarshal.TimeSeries, extraL
labels = append(labels, extraLabel)
}
}
labels = promrelabel.FinalizeLabels(labels[:labelsLen], labels[labelsLen:])
tssDst = append(tssDst, prompbmarshal.TimeSeries{
Labels: labels[labelsLen:],
Samples: ts.Samples,
})
ts.Labels = labels[labelsLen:]
}
rctx.labels = labels
return tssDst
}
type relabelCtx struct {
@ -181,3 +166,15 @@ func putRelabelCtx(rctx *relabelCtx) {
rctx.labels = rctx.labels[:0]
relabelCtxPool.Put(rctx)
}
func fixPromCompatibleNaming(labels []prompbmarshal.Label) {
// Replace unsupported Prometheus chars in label names and metric names with underscores.
for i := range labels {
label := &labels[i]
if label.Name == "__name__" {
label.Value = promrelabel.SanitizeMetricName(label.Value)
} else {
label.Name = promrelabel.SanitizeLabelName(label.Name)
}
}
}

View file

@ -42,9 +42,9 @@ func TestAppendExtraLabels(t *testing.T) {
f := func(extraLabels []prompbmarshal.Label, sTss, sExpTss string) {
rctx := &relabelCtx{}
tss, expTss := parseSeries(sTss), parseSeries(sExpTss)
gotTss := rctx.appendExtraLabels(tss, extraLabels)
if !reflect.DeepEqual(gotTss, expTss) {
t.Fatalf("expected to have: \n%v;\ngot: \n%v", expTss, gotTss)
rctx.appendExtraLabels(tss, extraLabels)
if !reflect.DeepEqual(tss, expTss) {
t.Fatalf("expected to have: \n%v;\ngot: \n%v", expTss, tss)
}
}

View file

@ -355,7 +355,7 @@ func Push(at *auth.Token, wr *prompbmarshal.WriteRequest) {
var rctx *relabelCtx
rcs := allRelabelConfigs.Load()
pcsGlobal := rcs.global
if pcsGlobal.Len() > 0 || len(labelsGlobal) > 0 {
if pcsGlobal.Len() > 0 {
rctx = getRelabelCtx()
}
tss := wr.Timeseries
@ -722,7 +722,7 @@ func (rwctx *remoteWriteCtx) pushInternal(tss []prompbmarshal.TimeSeries) {
if len(labelsGlobal) > 0 {
rctx := getRelabelCtx()
defer putRelabelCtx(rctx)
tss = rctx.appendExtraLabels(tss, labelsGlobal)
rctx.appendExtraLabels(tss, labelsGlobal)
}
pss := rwctx.pss