mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
a27c2f3773
vmagent: properly add extra labels before sending data to remote storage labels from `remoteWrite.label` are now added to sent metrics just before they are pushed to `remoteWrite.url` after all relabelings, including stream aggregation relabelings (#4247) https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4247 Signed-off-by: Alexander Marshalov <_@marshalov.org> Co-authored-by: Roman Khavronenko <roman@victoriametrics.com>
183 lines
6.1 KiB
Go
183 lines
6.1 KiB
Go
package remotewrite
|
|
|
|
import (
|
|
"flag"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
|
)
|
|
|
|
var (
|
|
unparsedLabelsGlobal = flagutil.NewArrayString("remoteWrite.label", "Optional label in the form 'name=value' to add to all the metrics before sending them to -remoteWrite.url. "+
|
|
"Pass multiple -remoteWrite.label flags in order to add multiple labels to metrics before sending them to remote storage")
|
|
relabelConfigPathGlobal = flag.String("remoteWrite.relabelConfig", "", "Optional path to file with relabeling configs, which are applied "+
|
|
"to all the metrics before sending them to -remoteWrite.url. See also -remoteWrite.urlRelabelConfig. "+
|
|
"The path can point either to local file or to http url. "+
|
|
"See https://docs.victoriametrics.com/vmagent.html#relabeling")
|
|
relabelConfigPaths = flagutil.NewArrayString("remoteWrite.urlRelabelConfig", "Optional path to relabel configs for the corresponding -remoteWrite.url. "+
|
|
"See also -remoteWrite.relabelConfig. The path can point either to local file or to http url. "+
|
|
"See https://docs.victoriametrics.com/vmagent.html#relabeling")
|
|
|
|
usePromCompatibleNaming = flag.Bool("usePromCompatibleNaming", false, "Whether to replace characters unsupported by Prometheus with underscores "+
|
|
"in the ingested metric names and label names. For example, foo.bar{a.b='c'} is transformed into foo_bar{a_b='c'} during data ingestion if this flag is set. "+
|
|
"See https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels")
|
|
)
|
|
|
|
var labelsGlobal []prompbmarshal.Label
|
|
|
|
// CheckRelabelConfigs checks -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig.
|
|
func CheckRelabelConfigs() error {
|
|
_, err := loadRelabelConfigs()
|
|
return err
|
|
}
|
|
|
|
func loadRelabelConfigs() (*relabelConfigs, error) {
|
|
var rcs relabelConfigs
|
|
if *relabelConfigPathGlobal != "" {
|
|
global, err := promrelabel.LoadRelabelConfigs(*relabelConfigPathGlobal)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot load -remoteWrite.relabelConfig=%q: %w", *relabelConfigPathGlobal, err)
|
|
}
|
|
rcs.global = global
|
|
}
|
|
if len(*relabelConfigPaths) > (len(*remoteWriteURLs) + len(*remoteWriteMultitenantURLs)) {
|
|
return nil, fmt.Errorf("too many -remoteWrite.urlRelabelConfig args: %d; it mustn't exceed the number of -remoteWrite.url or -remoteWrite.multitenantURL args: %d",
|
|
len(*relabelConfigPaths), (len(*remoteWriteURLs) + len(*remoteWriteMultitenantURLs)))
|
|
}
|
|
rcs.perURL = make([]*promrelabel.ParsedConfigs, (len(*remoteWriteURLs) + len(*remoteWriteMultitenantURLs)))
|
|
for i, path := range *relabelConfigPaths {
|
|
if len(path) == 0 {
|
|
// Skip empty relabel config.
|
|
continue
|
|
}
|
|
prc, err := promrelabel.LoadRelabelConfigs(path)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot load relabel configs from -remoteWrite.urlRelabelConfig=%q: %w", path, err)
|
|
}
|
|
rcs.perURL[i] = prc
|
|
}
|
|
return &rcs, nil
|
|
}
|
|
|
|
type relabelConfigs struct {
|
|
global *promrelabel.ParsedConfigs
|
|
perURL []*promrelabel.ParsedConfigs
|
|
}
|
|
|
|
// initLabelsGlobal must be called after parsing command-line flags.
|
|
func initLabelsGlobal() {
|
|
labelsGlobal = nil
|
|
for _, s := range *unparsedLabelsGlobal {
|
|
if len(s) == 0 {
|
|
continue
|
|
}
|
|
n := strings.IndexByte(s, '=')
|
|
if n < 0 {
|
|
logger.Fatalf("missing '=' in `-remoteWrite.label`. It must contain label in the form `name=value`; got %q", s)
|
|
}
|
|
labelsGlobal = append(labelsGlobal, prompbmarshal.Label{
|
|
Name: s[:n],
|
|
Value: s[n+1:],
|
|
})
|
|
}
|
|
}
|
|
|
|
func (rctx *relabelCtx) applyRelabeling(tss []prompbmarshal.TimeSeries, pcs *promrelabel.ParsedConfigs) []prompbmarshal.TimeSeries {
|
|
if pcs.Len() == 0 && !*usePromCompatibleNaming {
|
|
// Nothing to change.
|
|
return tss
|
|
}
|
|
tssDst := tss[:0]
|
|
labels := rctx.labels[:0]
|
|
for i := range tss {
|
|
ts := &tss[i]
|
|
labelsLen := len(labels)
|
|
labels = append(labels, ts.Labels...)
|
|
if *usePromCompatibleNaming {
|
|
// Replace unsupported Prometheus chars in label names and metric names with underscores.
|
|
tmpLabels := labels[labelsLen:]
|
|
for j := range tmpLabels {
|
|
label := &tmpLabels[j]
|
|
if label.Name == "__name__" {
|
|
label.Value = promrelabel.SanitizeMetricName(label.Value)
|
|
} else {
|
|
label.Name = promrelabel.SanitizeLabelName(label.Name)
|
|
}
|
|
}
|
|
}
|
|
labels = pcs.Apply(labels, labelsLen)
|
|
labels = promrelabel.FinalizeLabels(labels[:labelsLen], labels[labelsLen:])
|
|
if len(labels) == labelsLen {
|
|
// Drop the current time series, since relabeling removed all the labels.
|
|
continue
|
|
}
|
|
tssDst = append(tssDst, prompbmarshal.TimeSeries{
|
|
Labels: labels[labelsLen:],
|
|
Samples: ts.Samples,
|
|
})
|
|
}
|
|
rctx.labels = labels
|
|
return tssDst
|
|
}
|
|
|
|
func (rctx *relabelCtx) appendExtraLabels(tss []prompbmarshal.TimeSeries, extraLabels []prompbmarshal.Label) []prompbmarshal.TimeSeries {
|
|
if len(extraLabels) == 0 {
|
|
return tss
|
|
}
|
|
tssDst := tss[:0]
|
|
labels := rctx.labels[:0]
|
|
for i := range tss {
|
|
ts := &tss[i]
|
|
labelsLen := len(labels)
|
|
labels = append(labels, ts.Labels...)
|
|
for j := range extraLabels {
|
|
extraLabel := extraLabels[j]
|
|
if *usePromCompatibleNaming {
|
|
extraLabel.Name = promrelabel.SanitizeLabelName(extraLabel.Name)
|
|
}
|
|
tmp := promrelabel.GetLabelByName(labels[labelsLen:], extraLabel.Name)
|
|
if tmp != nil {
|
|
tmp.Value = extraLabel.Value
|
|
} else {
|
|
labels = append(labels, extraLabel)
|
|
}
|
|
}
|
|
labels = promrelabel.FinalizeLabels(labels[:labelsLen], labels[labelsLen:])
|
|
tssDst = append(tssDst, prompbmarshal.TimeSeries{
|
|
Labels: labels[labelsLen:],
|
|
Samples: ts.Samples,
|
|
})
|
|
}
|
|
rctx.labels = labels
|
|
return tssDst
|
|
}
|
|
|
|
type relabelCtx struct {
|
|
// pool for labels, which are used during the relabeling.
|
|
labels []prompbmarshal.Label
|
|
}
|
|
|
|
func (rctx *relabelCtx) reset() {
|
|
promrelabel.CleanLabels(rctx.labels)
|
|
rctx.labels = rctx.labels[:0]
|
|
}
|
|
|
|
var relabelCtxPool = &sync.Pool{
|
|
New: func() interface{} {
|
|
return &relabelCtx{}
|
|
},
|
|
}
|
|
|
|
func getRelabelCtx() *relabelCtx {
|
|
return relabelCtxPool.Get().(*relabelCtx)
|
|
}
|
|
|
|
func putRelabelCtx(rctx *relabelCtx) {
|
|
rctx.labels = rctx.labels[:0]
|
|
relabelCtxPool.Put(rctx)
|
|
}
|