From 6ebac3ab63551d3b7d0cc43d3070213b04a7ad83 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 2 Jul 2020 19:42:12 +0300 Subject: [PATCH] app/vminsert: add ability to apply relabeling to all the incoming metrics if `-relabelConfig` command-line arg points to a file with a list of `relabel_config` entries See https://victoriametrics.github.io/#relabeling --- app/vmagent/remotewrite/relabel.go | 2 +- app/vminsert/csvimport/request_handler.go | 5 + app/vminsert/graphite/request_handler.go | 5 + app/vminsert/influx/request_handler.go | 31 ++++- app/vminsert/main.go | 2 + app/vminsert/netstorage/insert_ctx.go | 12 +- app/vminsert/opentsdb/request_handler.go | 5 + app/vminsert/opentsdbhttp/request_handler.go | 5 + .../promremotewrite/request_handler.go | 11 +- app/vminsert/relabel/relabel.go | 120 ++++++++++++++++++ app/vminsert/vmimport/request_handler.go | 5 + docs/Single-server-VictoriaMetrics.md | 20 ++- lib/promrelabel/config.go | 2 +- 13 files changed, 213 insertions(+), 12 deletions(-) create mode 100644 app/vminsert/relabel/relabel.go diff --git a/app/vmagent/remotewrite/relabel.go b/app/vmagent/remotewrite/relabel.go index 7b739a71c..6adede560 100644 --- a/app/vmagent/remotewrite/relabel.go +++ b/app/vmagent/remotewrite/relabel.go @@ -16,7 +16,7 @@ var ( unparsedLabelsGlobal = flagutil.NewArray("remoteWrite.label", "Optional label in the form 'name=value' to add to all the metrics before sending them to -remoteWrite.url. "+ "Pass multiple -remoteWrite.label flags in order to add multiple flags to metrics before sending them to remote storage") relabelConfigPathGlobal = flag.String("remoteWrite.relabelConfig", "", "Optional path to file with relabel_config entries. These entries are applied to all the metrics "+ - "before sending them to -remoteWrite.url. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config for details") + "before sending them to -remoteWrite.url. 
See https://victoriametrics.github.io/vmagent.html#relabeling for details") relabelConfigPaths = flagutil.NewArray("remoteWrite.urlRelabelConfig", "Optional path to relabel config for the corresponding -remoteWrite.url") ) diff --git a/app/vminsert/csvimport/request_handler.go b/app/vminsert/csvimport/request_handler.go index 8d7df4ee7..472bec359 100644 --- a/app/vminsert/csvimport/request_handler.go +++ b/app/vminsert/csvimport/request_handler.go @@ -38,6 +38,11 @@ func insertRows(at *auth.Token, rows []parser.Row) error { tag := &r.Tags[j] ctx.AddLabel(tag.Key, tag.Value) } + ctx.ApplyRelabeling() + if len(ctx.Labels) == 0 { + // Skip metric without labels. + continue + } if err := ctx.WriteDataPoint(at, ctx.Labels, r.Timestamp, r.Value); err != nil { return err } diff --git a/app/vminsert/graphite/request_handler.go b/app/vminsert/graphite/request_handler.go index 14a557fba..8760c2342 100644 --- a/app/vminsert/graphite/request_handler.go +++ b/app/vminsert/graphite/request_handler.go @@ -52,6 +52,11 @@ func insertRows(at *auth.Token, rows []parser.Row) error { } ctx.AddLabel(tag.Key, tag.Value) } + ctx.ApplyRelabeling() + if len(ctx.Labels) == 0 { + // Skip metric without labels. 
+ continue + } if err := ctx.WriteDataPoint(&atCopy, ctx.Labels, r.Timestamp, r.Value); err != nil { return err } diff --git a/app/vminsert/influx/request_handler.go b/app/vminsert/influx/request_handler.go index b046f1b73..1cf44929b 100644 --- a/app/vminsert/influx/request_handler.go +++ b/app/vminsert/influx/request_handler.go @@ -8,8 +8,10 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" @@ -63,6 +65,7 @@ func insertRows(at *auth.Token, db string, rows []parser.Row, mayOverrideAccount ic.Reset() // This line is required for initializing ic internals. rowsTotal := 0 atCopy := *at + hasRelabeling := relabel.HasRelabeling() for i := range rows { r := &rows[i] ic.Labels = ic.Labels[:0] @@ -86,25 +89,41 @@ func insertRows(at *auth.Token, db string, rows []parser.Row, mayOverrideAccount if len(db) > 0 && !hasDBLabel { ic.AddLabel("db", db) } - ic.MetricNameBuf = storage.MarshalMetricNameRaw(ic.MetricNameBuf[:0], atCopy.AccountID, atCopy.ProjectID, ic.Labels) - metricNameBufLen := len(ic.MetricNameBuf) ctx.metricGroupBuf = append(ctx.metricGroupBuf[:0], r.Measurement...) skipFieldKey := len(r.Fields) == 1 && *skipSingleField if len(ctx.metricGroupBuf) > 0 && !skipFieldKey { ctx.metricGroupBuf = append(ctx.metricGroupBuf, *measurementFieldSeparator...) 
} metricGroupPrefixLen := len(ctx.metricGroupBuf) - ic.AddLabel("", "placeholder") - placeholderLabel := &ic.Labels[len(ic.Labels)-1] + var labels []prompb.Label + if !hasRelabeling { + labels = ic.Labels + } + ic.MetricNameBuf = storage.MarshalMetricNameRaw(ic.MetricNameBuf[:0], atCopy.AccountID, atCopy.ProjectID, labels) + metricNameBufLen := len(ic.MetricNameBuf) + labelsLen := len(ic.Labels) for j := range r.Fields { f := &r.Fields[j] if !skipFieldKey { ctx.metricGroupBuf = append(ctx.metricGroupBuf[:metricGroupPrefixLen], f.Key...) } metricGroup := bytesutil.ToUnsafeString(ctx.metricGroupBuf) - ic.Labels = ic.Labels[:len(ic.Labels)-1] + ic.Labels = ic.Labels[:labelsLen] ic.AddLabel("", metricGroup) - ic.MetricNameBuf = storage.MarshalMetricLabelRaw(ic.MetricNameBuf[:metricNameBufLen], placeholderLabel) + if hasRelabeling { + ic.ApplyRelabeling() + labels = ic.Labels + if len(labels) == 0 { + // Skip metric without labels. + continue + } + } else { + labels = ic.Labels[labelsLen : labelsLen+1] + } + ic.MetricNameBuf = ic.MetricNameBuf[:metricNameBufLen] + for i := range labels { + ic.MetricNameBuf = storage.MarshalMetricLabelRaw(ic.MetricNameBuf, &labels[i]) + } storageNodeIdx := ic.GetStorageNodeIdx(&atCopy, ic.Labels) if err := ic.WriteDataPointExt(&atCopy, storageNodeIdx, ic.MetricNameBuf, r.Timestamp, f.Value); err != nil { return err diff --git a/app/vminsert/main.go b/app/vminsert/main.go index e31e7d8aa..3daf7d8ba 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -16,6 +16,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdb" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdbhttp" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/promremotewrite" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/vmimport" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" @@ 
-68,6 +69,7 @@ func main() { netstorage.InitStorageNodes(*storageNodes) logger.Infof("successfully initialized netstorage in %.3f seconds", time.Since(startTime).Seconds()) + relabel.Init() storage.SetMaxLabelsPerTimeseries(*maxLabelsPerTimeseries) writeconcurrencylimiter.Init() diff --git a/app/vminsert/netstorage/insert_ctx.go b/app/vminsert/netstorage/insert_ctx.go index c9f09b19c..9973254e8 100644 --- a/app/vminsert/netstorage/insert_ctx.go +++ b/app/vminsert/netstorage/insert_ctx.go @@ -4,6 +4,7 @@ import ( "fmt" "net/http" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" @@ -23,6 +24,8 @@ type InsertCtx struct { bufRowss []bufRows labelsBuf []byte + + relabelCtx relabel.Ctx } type bufRows struct { @@ -50,7 +53,8 @@ func (br *bufRows) pushTo(sn *storageNode) error { // Reset resets ctx. func (ctx *InsertCtx) Reset() { - for _, label := range ctx.Labels { + for i := range ctx.Labels { + label := &ctx.Labels[i] label.Name = nil label.Value = nil } @@ -64,6 +68,7 @@ func (ctx *InsertCtx) Reset() { ctx.bufRowss[i].reset() } ctx.labelsBuf = ctx.labelsBuf[:0] + ctx.relabelCtx.Reset() } // AddLabelBytes adds (name, value) label to ctx.Labels. @@ -106,6 +111,11 @@ func (ctx *InsertCtx) AddLabel(name, value string) { ctx.Labels = labels } +// ApplyRelabeling applies relabeling to ctx.Labels. +func (ctx *InsertCtx) ApplyRelabeling() { + ctx.Labels = ctx.relabelCtx.ApplyRelabeling(ctx.Labels) +} + // WriteDataPoint writes (timestamp, value) data point with the given at and labels to ctx buffer. 
func (ctx *InsertCtx) WriteDataPoint(at *auth.Token, labels []prompb.Label, timestamp int64, value float64) error { ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], at.AccountID, at.ProjectID, labels) diff --git a/app/vminsert/opentsdb/request_handler.go b/app/vminsert/opentsdb/request_handler.go index 8d4ce0ba2..42ae8a538 100644 --- a/app/vminsert/opentsdb/request_handler.go +++ b/app/vminsert/opentsdb/request_handler.go @@ -52,6 +52,11 @@ func insertRows(at *auth.Token, rows []parser.Row) error { } ctx.AddLabel(tag.Key, tag.Value) } + ctx.ApplyRelabeling() + if len(ctx.Labels) == 0 { + // Skip metric without labels. + continue + } if err := ctx.WriteDataPoint(&atCopy, ctx.Labels, r.Timestamp, r.Value); err != nil { return err } diff --git a/app/vminsert/opentsdbhttp/request_handler.go b/app/vminsert/opentsdbhttp/request_handler.go index adc4cfd5c..580a89e8f 100644 --- a/app/vminsert/opentsdbhttp/request_handler.go +++ b/app/vminsert/opentsdbhttp/request_handler.go @@ -59,6 +59,11 @@ func insertRows(at *auth.Token, rows []parser.Row) error { tag := &r.Tags[j] ctx.AddLabel(tag.Key, tag.Value) } + ctx.ApplyRelabeling() + if len(ctx.Labels) == 0 { + // Skip metric without labels. + continue + } if err := ctx.WriteDataPoint(at, ctx.Labels, r.Timestamp, r.Value); err != nil { return err } diff --git a/app/vminsert/promremotewrite/request_handler.go b/app/vminsert/promremotewrite/request_handler.go index 04e29acfb..085aad2fd 100644 --- a/app/vminsert/promremotewrite/request_handler.go +++ b/app/vminsert/promremotewrite/request_handler.go @@ -35,12 +35,19 @@ func insertRows(at *auth.Token, timeseries []prompb.TimeSeries) error { rowsTotal := 0 for i := range timeseries { ts := &timeseries[i] - storageNodeIdx := ctx.GetStorageNodeIdx(at, ts.Labels) + // Make a shallow copy of ts.Labels before calling ctx.ApplyRelabeling, since ctx.ApplyRelabeling may modify labels. + ctx.Labels = append(ctx.Labels[:0], ts.Labels...) 
+ ctx.ApplyRelabeling() + if len(ctx.Labels) == 0 { + // Skip metric without labels. + continue + } + storageNodeIdx := ctx.GetStorageNodeIdx(at, ctx.Labels) ctx.MetricNameBuf = ctx.MetricNameBuf[:0] for i := range ts.Samples { r := &ts.Samples[i] if len(ctx.MetricNameBuf) == 0 { - ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], at.AccountID, at.ProjectID, ts.Labels) + ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], at.AccountID, at.ProjectID, ctx.Labels) } if err := ctx.WriteDataPointExt(at, storageNodeIdx, ctx.MetricNameBuf, r.Timestamp, r.Value); err != nil { return err diff --git a/app/vminsert/relabel/relabel.go b/app/vminsert/relabel/relabel.go new file mode 100644 index 000000000..0bcc2d4c6 --- /dev/null +++ b/app/vminsert/relabel/relabel.go @@ -0,0 +1,120 @@ +package relabel + +import ( + "flag" + "fmt" + "sync/atomic" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" +) + +var relabelConfig = flag.String("relabelConfig", "", "Optional path to a file with relabeling rules, which are applied to all the ingested metrics. "+ + "See https://victoriametrics.github.io/#relabeling for details") + +// Init must be called after flag.Parse and before using the relabel package. 
+func Init() { + prcs, err := loadRelabelConfig() + if err != nil { + logger.Fatalf("cannot load relabelConfig: %s", err) + } + prcsGlobal.Store(&prcs) + if len(*relabelConfig) == 0 { + return + } + sighupCh := procutil.NewSighupChan() + go func() { + for range sighupCh { + logger.Infof("received SIGHUP; reloading -relabelConfig=%q...", *relabelConfig) + prcs, err := loadRelabelConfig() + if err != nil { + logger.Errorf("cannot load the updated relabelConfig: %s; preserving the previous config", err) + continue + } + prcsGlobal.Store(&prcs) + logger.Infof("successfully reloaded -relabelConfig=%q", *relabelConfig) + } + }() +} + +var prcsGlobal atomic.Value + +func loadRelabelConfig() ([]promrelabel.ParsedRelabelConfig, error) { + if len(*relabelConfig) == 0 { + return nil, nil + } + prcs, err := promrelabel.LoadRelabelConfigs(*relabelConfig) + if err != nil { + return nil, fmt.Errorf("error when reading -relabelConfig=%q: %w", *relabelConfig, err) + } + return prcs, nil +} + +// HasRelabeling returns true if there is global relabeling. +func HasRelabeling() bool { + prcs := prcsGlobal.Load().(*[]promrelabel.ParsedRelabelConfig) + return len(*prcs) > 0 +} + +// Ctx holds relabeling context. +type Ctx struct { + // tmpLabels is used during ApplyRelabeling call. + tmpLabels []prompbmarshal.Label +} + +// Reset resets ctx. +func (ctx *Ctx) Reset() { + labels := ctx.tmpLabels + for i := range labels { + label := &labels[i] + label.Name = "" + label.Value = "" + } + ctx.tmpLabels = ctx.tmpLabels[:0] +} + +// ApplyRelabeling applies relabeling to the given labels and returns the result. +// +// The returned labels are valid until the next call to ApplyRelabeling. +func (ctx *Ctx) ApplyRelabeling(labels []prompb.Label) []prompb.Label { + prcs := prcsGlobal.Load().(*[]promrelabel.ParsedRelabelConfig) + if len(*prcs) == 0 { + return labels + } + // Convert src to prompbmarshal.Label format suitable for relabeling. 
+ tmpLabels := ctx.tmpLabels[:0] + for _, label := range labels { + name := bytesutil.ToUnsafeString(label.Name) + if len(name) == 0 { + name = "__name__" + } + value := bytesutil.ToUnsafeString(label.Value) + tmpLabels = append(tmpLabels, prompbmarshal.Label{ + Name: name, + Value: value, + }) + } + + // Apply relabeling + tmpLabels = promrelabel.ApplyRelabelConfigs(tmpLabels, 0, *prcs, true) + ctx.tmpLabels = tmpLabels + + // Return back labels to the desired format. + dst := labels[:0] + for _, label := range tmpLabels { + name := bytesutil.ToUnsafeBytes(label.Name) + if label.Name == "__name__" { + name = nil + } + value := bytesutil.ToUnsafeBytes(label.Value) + dst = append(dst, prompb.Label{ + Name: name, + Value: value, + }) + } + return dst +} diff --git a/app/vminsert/vmimport/request_handler.go b/app/vminsert/vmimport/request_handler.go index 377f22979..1489dfe48 100644 --- a/app/vminsert/vmimport/request_handler.go +++ b/app/vminsert/vmimport/request_handler.go @@ -41,6 +41,11 @@ func insertRows(at *auth.Token, rows []parser.Row) error { tag := &r.Tags[j] ctx.AddLabelBytes(tag.Key, tag.Value) } + ctx.ApplyRelabeling() + if len(ctx.Labels) == 0 { + // Skip metric without labels. + continue + } ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], at.AccountID, at.ProjectID, ctx.Labels) storageNodeIdx := ctx.GetStorageNodeIdx(at, ctx.Labels) values := r.Values diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index e453360a8..e15addbf5 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -79,6 +79,7 @@ See [features available for enterprise customers](https://github.com/VictoriaMet * [HTTP OpenTSDB /api/put requests](#sending-opentsdb-data-via-http-apiput-requests) if `-opentsdbHTTPListenAddr` is set. * [/api/v1/import](#how-to-import-time-series-data). * [Arbitrary CSV data](#how-to-import-csv-data). +* Supports metrics' relabeling. 
See [these docs](#relabeling) for details. * Ideally works with big amounts of time series data from Kubernetes, IoT sensors, connected cars, industrial telemetry, financial data and various Enterprise workloads. * Has open source [cluster version](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/cluster). * See also technical [Articles about VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/Articles). @@ -111,6 +112,7 @@ See [features available for enterprise customers](https://github.com/VictoriaMet * [How to delete time series](#how-to-delete-time-series) * [How to export time series](#how-to-export-time-series) * [How to import time series data](#how-to-import-time-series-data) +* [Relabeling](#relabeling) * [Federation](#federation) * [Capacity planning](#capacity-planning) * [High availability](#high-availability) @@ -650,7 +652,7 @@ The delete API is intended mainly for the following cases: It isn't recommended using delete API for the following cases, since it brings non-zero overhead: * Regular cleanups for unneeded data. Just prevent writing unneeded data into VictoriaMetrics. - This can be done with relabeling in [vmagent](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/app/vmagent/README.md). + This can be done with [relabeling](#relabeling). See [this article](https://www.robustperception.io/relabelling-can-discard-targets-timeseries-and-alerts) for details. * Reducing disk space usage by deleting unneeded time series. This doesn't work as expected, since the deleted time series occupy disk space until the next merge operation, which can never occur when deleting too old data. @@ -724,6 +726,22 @@ Note that it could be required to flush response cache after importing historica Each request to `/api/v1/import` can load up to a single vCPU core on VictoriaMetrics. Import speed can be improved by splitting the original file into smaller parts and importing them concurrently. 
Note that the original file must be split on newlines. + +### Relabeling + +VictoriaMetrics supports Prometheus-compatible relabeling for all the ingested metrics if `-relabelConfig` command-line flag points +to a file containing a list of [relabel_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config) entries. + +Additionally VictoriaMetrics provides the following extra actions for relabeling rules: + +* `replace_all`: replaces all the occurrences of `regex` in the values of `source_labels` with the `replacement` and stores the result in the `target_label`. +* `labelmap_all`: replaces all the occurrences of `regex` in all the label names with the `replacement`. +* `keep_if_equal`: keeps the entry if all label values from `source_labels` are equal. +* `drop_if_equal`: drops the entry if all the label values from `source_labels` are equal. + +See also [relabeling in vmagent](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/app/vmagent/README.md#relabeling). + + ### Federation VictoriaMetrics exports [Prometheus-compatible federation data](https://prometheus.io/docs/prometheus/latest/federation/) diff --git a/lib/promrelabel/config.go b/lib/promrelabel/config.go index f0134c0ac..8904ee033 100644 --- a/lib/promrelabel/config.go +++ b/lib/promrelabel/config.go @@ -90,7 +90,7 @@ func parseRelabelConfig(dst []ParsedRelabelConfig, rc *RelabelConfig) ([]ParsedR return dst, fmt.Errorf("missing `source_labels` for `action=replace_all`") } if targetLabel == "" { - return dst, fmt.Errorf("missing `target_label` for `action=replace`") + return dst, fmt.Errorf("missing `target_label` for `action=replace_all`") } case "keep_if_equal": if len(sourceLabels) < 2 {