app/vminsert: add ability to apply relabeling to all the incoming metrics if -relabelConfig command-line arg points to a file with a list of relabel_config entries

See https://victoriametrics.github.io/#relabeling
Aliaksandr Valialkin 2020-07-02 19:42:12 +03:00
parent a45856570b
commit 6ebac3ab63
13 changed files with 213 additions and 12 deletions

View file

@ -16,7 +16,7 @@ var (
unparsedLabelsGlobal = flagutil.NewArray("remoteWrite.label", "Optional label in the form 'name=value' to add to all the metrics before sending them to -remoteWrite.url. "+
"Pass multiple -remoteWrite.label flags in order to add multiple flags to metrics before sending them to remote storage")
relabelConfigPathGlobal = flag.String("remoteWrite.relabelConfig", "", "Optional path to file with relabel_config entries. These entries are applied to all the metrics "+
"before sending them to -remoteWrite.url. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config for details")
"before sending them to -remoteWrite.url. See https://victoriametrics.github.io/vmagent.html#relabeling for details")
relabelConfigPaths = flagutil.NewArray("remoteWrite.urlRelabelConfig", "Optional path to relabel config for the corresponding -remoteWrite.url")
)

View file

@ -38,6 +38,11 @@ func insertRows(at *auth.Token, rows []parser.Row) error {
tag := &r.Tags[j]
ctx.AddLabel(tag.Key, tag.Value)
}
ctx.ApplyRelabeling()
if len(ctx.Labels) == 0 {
// Skip metric without labels.
continue
}
if err := ctx.WriteDataPoint(at, ctx.Labels, r.Timestamp, r.Value); err != nil {
return err
}

View file

@ -52,6 +52,11 @@ func insertRows(at *auth.Token, rows []parser.Row) error {
}
ctx.AddLabel(tag.Key, tag.Value)
}
ctx.ApplyRelabeling()
if len(ctx.Labels) == 0 {
// Skip metric without labels.
continue
}
if err := ctx.WriteDataPoint(&atCopy, ctx.Labels, r.Timestamp, r.Value); err != nil {
return err
}

View file

@ -8,8 +8,10 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
@ -63,6 +65,7 @@ func insertRows(at *auth.Token, db string, rows []parser.Row, mayOverrideAccount
ic.Reset() // This line is required for initializing ic internals.
rowsTotal := 0
atCopy := *at
hasRelabeling := relabel.HasRelabeling()
for i := range rows {
r := &rows[i]
ic.Labels = ic.Labels[:0]
@ -86,25 +89,41 @@ func insertRows(at *auth.Token, db string, rows []parser.Row, mayOverrideAccount
if len(db) > 0 && !hasDBLabel {
ic.AddLabel("db", db)
}
ic.MetricNameBuf = storage.MarshalMetricNameRaw(ic.MetricNameBuf[:0], atCopy.AccountID, atCopy.ProjectID, ic.Labels)
metricNameBufLen := len(ic.MetricNameBuf)
ctx.metricGroupBuf = append(ctx.metricGroupBuf[:0], r.Measurement...)
skipFieldKey := len(r.Fields) == 1 && *skipSingleField
if len(ctx.metricGroupBuf) > 0 && !skipFieldKey {
ctx.metricGroupBuf = append(ctx.metricGroupBuf, *measurementFieldSeparator...)
}
metricGroupPrefixLen := len(ctx.metricGroupBuf)
ic.AddLabel("", "placeholder")
placeholderLabel := &ic.Labels[len(ic.Labels)-1]
var labels []prompb.Label
if !hasRelabeling {
labels = ic.Labels
}
ic.MetricNameBuf = storage.MarshalMetricNameRaw(ic.MetricNameBuf[:0], atCopy.AccountID, atCopy.ProjectID, labels)
metricNameBufLen := len(ic.MetricNameBuf)
labelsLen := len(ic.Labels)
for j := range r.Fields {
f := &r.Fields[j]
if !skipFieldKey {
ctx.metricGroupBuf = append(ctx.metricGroupBuf[:metricGroupPrefixLen], f.Key...)
}
metricGroup := bytesutil.ToUnsafeString(ctx.metricGroupBuf)
ic.Labels = ic.Labels[:len(ic.Labels)-1]
ic.Labels = ic.Labels[:labelsLen]
ic.AddLabel("", metricGroup)
ic.MetricNameBuf = storage.MarshalMetricLabelRaw(ic.MetricNameBuf[:metricNameBufLen], placeholderLabel)
if hasRelabeling {
ic.ApplyRelabeling()
labels = ic.Labels
if len(labels) == 0 {
// Skip metric without labels.
continue
}
} else {
labels = ic.Labels[labelsLen : labelsLen+1]
}
ic.MetricNameBuf = ic.MetricNameBuf[:metricNameBufLen]
for i := range labels {
ic.MetricNameBuf = storage.MarshalMetricLabelRaw(ic.MetricNameBuf, &labels[i])
}
storageNodeIdx := ic.GetStorageNodeIdx(&atCopy, ic.Labels)
if err := ic.WriteDataPointExt(&atCopy, storageNodeIdx, ic.MetricNameBuf, r.Timestamp, f.Value); err != nil {
return err

View file

@ -16,6 +16,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdb"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdbhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/promremotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/vmimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
@ -68,6 +69,7 @@ func main() {
netstorage.InitStorageNodes(*storageNodes)
logger.Infof("successfully initialized netstorage in %.3f seconds", time.Since(startTime).Seconds())
relabel.Init()
storage.SetMaxLabelsPerTimeseries(*maxLabelsPerTimeseries)
writeconcurrencylimiter.Init()

View file

@ -4,6 +4,7 @@ import (
"fmt"
"net/http"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
@ -23,6 +24,8 @@ type InsertCtx struct {
bufRowss []bufRows
labelsBuf []byte
relabelCtx relabel.Ctx
}
type bufRows struct {
@ -50,7 +53,8 @@ func (br *bufRows) pushTo(sn *storageNode) error {
// Reset resets ctx.
func (ctx *InsertCtx) Reset() {
for _, label := range ctx.Labels {
for i := range ctx.Labels {
label := &ctx.Labels[i]
label.Name = nil
label.Value = nil
}
@ -64,6 +68,7 @@ func (ctx *InsertCtx) Reset() {
ctx.bufRowss[i].reset()
}
ctx.labelsBuf = ctx.labelsBuf[:0]
ctx.relabelCtx.Reset()
}
// AddLabelBytes adds (name, value) label to ctx.Labels.
@ -106,6 +111,11 @@ func (ctx *InsertCtx) AddLabel(name, value string) {
ctx.Labels = labels
}
// ApplyRelabeling applies relabeling to ctx.Labels.
func (ctx *InsertCtx) ApplyRelabeling() {
ctx.Labels = ctx.relabelCtx.ApplyRelabeling(ctx.Labels)
}
// WriteDataPoint writes (timestamp, value) data point with the given at and labels to ctx buffer.
func (ctx *InsertCtx) WriteDataPoint(at *auth.Token, labels []prompb.Label, timestamp int64, value float64) error {
ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], at.AccountID, at.ProjectID, labels)

View file

@ -52,6 +52,11 @@ func insertRows(at *auth.Token, rows []parser.Row) error {
}
ctx.AddLabel(tag.Key, tag.Value)
}
ctx.ApplyRelabeling()
if len(ctx.Labels) == 0 {
// Skip metric without labels.
continue
}
if err := ctx.WriteDataPoint(&atCopy, ctx.Labels, r.Timestamp, r.Value); err != nil {
return err
}

View file

@ -59,6 +59,11 @@ func insertRows(at *auth.Token, rows []parser.Row) error {
tag := &r.Tags[j]
ctx.AddLabel(tag.Key, tag.Value)
}
ctx.ApplyRelabeling()
if len(ctx.Labels) == 0 {
// Skip metric without labels.
continue
}
if err := ctx.WriteDataPoint(at, ctx.Labels, r.Timestamp, r.Value); err != nil {
return err
}

View file

@ -35,12 +35,19 @@ func insertRows(at *auth.Token, timeseries []prompb.TimeSeries) error {
rowsTotal := 0
for i := range timeseries {
ts := &timeseries[i]
storageNodeIdx := ctx.GetStorageNodeIdx(at, ts.Labels)
// Make a shallow copy of ts.Labels before calling ctx.ApplyRelabeling, since ctx.ApplyRelabeling may modify labels.
ctx.Labels = append(ctx.Labels[:0], ts.Labels...)
ctx.ApplyRelabeling()
if len(ctx.Labels) == 0 {
// Skip metric without labels.
continue
}
storageNodeIdx := ctx.GetStorageNodeIdx(at, ctx.Labels)
ctx.MetricNameBuf = ctx.MetricNameBuf[:0]
for i := range ts.Samples {
r := &ts.Samples[i]
if len(ctx.MetricNameBuf) == 0 {
ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], at.AccountID, at.ProjectID, ts.Labels)
ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], at.AccountID, at.ProjectID, ctx.Labels)
}
if err := ctx.WriteDataPointExt(at, storageNodeIdx, ctx.MetricNameBuf, r.Timestamp, r.Value); err != nil {
return err

View file

@ -0,0 +1,120 @@
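// Package relabel applies global relabeling to all the metrics ingested by vminsert, according to the -relabelConfig command-line flag.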
package relabel
import (
"flag"
"fmt"
"sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
)
var relabelConfig = flag.String("relabelConfig", "", "Optional path to a file with relabeling rules, which are applied to all the ingested metrics. "+
"See https://victoriametrics.github.io/#relabeling for details")
// Init must be called after flag.Parse and before using the relabel package.
func Init() {
prcs, err := loadRelabelConfig()
if err != nil {
logger.Fatalf("cannot load relabelConfig: %s", err)
}
prcsGlobal.Store(&prcs)
if len(*relabelConfig) == 0 {
return
}
sighupCh := procutil.NewSighupChan()
go func() {
for range sighupCh {
logger.Infof("received SIGHUP; reloading -relabelConfig=%q...", *relabelConfig)
prcs, err := loadRelabelConfig()
if err != nil {
logger.Errorf("cannot load the updated relabelConfig: %s; preserving the previous config", err)
continue
}
prcsGlobal.Store(&prcs)
logger.Infof("successfully reloaded -relabelConfig=%q", *relabelConfig)
}
}()
}
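// prcsGlobal holds the currently active relabel configs. It is set by Init and is atomically replaced when -relabelConfig is reloaded on SIGHUP.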
var prcsGlobal atomic.Value
func loadRelabelConfig() ([]promrelabel.ParsedRelabelConfig, error) {
if len(*relabelConfig) == 0 {
return nil, nil
}
prcs, err := promrelabel.LoadRelabelConfigs(*relabelConfig)
if err != nil {
return nil, fmt.Errorf("error when reading -relabelConfig=%q: %w", *relabelConfig, err)
}
return prcs, nil
}
// HasRelabeling returns true if there is global relabeling.
func HasRelabeling() bool {
prcs := prcsGlobal.Load().(*[]promrelabel.ParsedRelabelConfig)
return len(*prcs) > 0
}
// Ctx holds relabeling context.
type Ctx struct {
// tmpLabels is used during ApplyRelabeling calls.
tmpLabels []prompbmarshal.Label
}
// Reset resets ctx.
func (ctx *Ctx) Reset() {
labels := ctx.tmpLabels
for i := range labels {
label := &labels[i]
label.Name = ""
label.Value = ""
}
ctx.tmpLabels = ctx.tmpLabels[:0]
}
// ApplyRelabeling applies relabeling to the given labels and returns the result.
//
// The returned labels are valid until the next call to ApplyRelabeling.
func (ctx *Ctx) ApplyRelabeling(labels []prompb.Label) []prompb.Label {
prcs := prcsGlobal.Load().(*[]promrelabel.ParsedRelabelConfig)
if len(*prcs) == 0 {
return labels
}
// Convert labels to prompbmarshal.Label format suitable for relabeling.
tmpLabels := ctx.tmpLabels[:0]
for _, label := range labels {
name := bytesutil.ToUnsafeString(label.Name)
if len(name) == 0 {
name = "__name__"
}
value := bytesutil.ToUnsafeString(label.Value)
tmpLabels = append(tmpLabels, prompbmarshal.Label{
Name: name,
Value: value,
})
}
// Apply relabeling
tmpLabels = promrelabel.ApplyRelabelConfigs(tmpLabels, 0, *prcs, true)
ctx.tmpLabels = tmpLabels
// Convert the relabeled labels back to prompb.Label format.
dst := labels[:0]
for _, label := range tmpLabels {
name := bytesutil.ToUnsafeBytes(label.Name)
if label.Name == "__name__" {
name = nil
}
value := bytesutil.ToUnsafeBytes(label.Value)
dst = append(dst, prompb.Label{
Name: name,
Value: value,
})
}
return dst
}

View file

@ -41,6 +41,11 @@ func insertRows(at *auth.Token, rows []parser.Row) error {
tag := &r.Tags[j]
ctx.AddLabelBytes(tag.Key, tag.Value)
}
ctx.ApplyRelabeling()
if len(ctx.Labels) == 0 {
// Skip metric without labels.
continue
}
ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], at.AccountID, at.ProjectID, ctx.Labels)
storageNodeIdx := ctx.GetStorageNodeIdx(at, ctx.Labels)
values := r.Values

View file

@ -79,6 +79,7 @@ See [features available for enterprise customers](https://github.com/VictoriaMet
* [HTTP OpenTSDB /api/put requests](#sending-opentsdb-data-via-http-apiput-requests) if `-opentsdbHTTPListenAddr` is set.
* [/api/v1/import](#how-to-import-time-series-data).
* [Arbitrary CSV data](#how-to-import-csv-data).
* Supports metrics' relabeling. See [these docs](#relabeling) for details.
* Ideally works with big amounts of time series data from Kubernetes, IoT sensors, connected cars, industrial telemetry, financial data and various Enterprise workloads.
* Has open source [cluster version](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/cluster).
* See also technical [Articles about VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/Articles).
@ -111,6 +112,7 @@ See [features available for enterprise customers](https://github.com/VictoriaMet
* [How to delete time series](#how-to-delete-time-series)
* [How to export time series](#how-to-export-time-series)
* [How to import time series data](#how-to-import-time-series-data)
* [Relabeling](#relabeling)
* [Federation](#federation)
* [Capacity planning](#capacity-planning)
* [High availability](#high-availability)
@ -650,7 +652,7 @@ The delete API is intended mainly for the following cases:
It isn't recommended to use the delete API for the following cases, since it brings non-zero overhead:
* Regular cleanups for unneeded data. Just prevent writing unneeded data into VictoriaMetrics.
This can be done with relabeling in [vmagent](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/app/vmagent/README.md).
This can be done with [relabeling](#relabeling).
See [this article](https://www.robustperception.io/relabelling-can-discard-targets-timeseries-and-alerts) for details.
* Reducing disk space usage by deleting unneeded time series. This doesn't work as expected, since the deleted
time series occupy disk space until the next merge operation, which can never occur when deleting too old data.
@ -724,6 +726,22 @@ Note that it could be required to flush response cache after importing historica
Each request to `/api/v1/import` can load up to a single vCPU core on VictoriaMetrics. Import speed can be improved by splitting the original file into smaller parts
and importing them concurrently. Note that the original file must be split on newlines.
### Relabeling
VictoriaMetrics supports Prometheus-compatible relabeling for all the ingested metrics if the `-relabelConfig` command-line flag points
to a file containing a list of [relabel_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config) entries.
Additionally, VictoriaMetrics provides the following extra actions for relabeling rules (see the example configuration below):
* `replace_all`: replaces all the occurrences of `regex` in the values of `source_labels` with the `replacement` and stores the result in the `target_label`.
* `labelmap_all`: replaces all the occurrences of `regex` in all the label names with the `replacement`.
* `keep_if_equal`: keeps the entry if all the label values from `source_labels` are equal.
* `drop_if_equal`: drops the entry if all the label values from `source_labels` are equal.
See also [relabeling in vmagent](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/app/vmagent/README.md#relabeling).
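For example, the following `-relabelConfig` file combines a standard `drop` rule with the extra actions described above (the metric and label names here are purely illustrative):

```yml
# Drop all the metrics whose names start with "temp".
- action: drop
  source_labels: [__name__]
  regex: "temp.*"

# Keep the sample only if the "instance" and "host" label values are identical.
- action: keep_if_equal
  source_labels: [instance, host]

# Replace all the "-" chars in the "env" label value with "_".
- action: replace_all
  source_labels: [env]
  regex: "-"
  replacement: "_"
  target_label: env
```

The relabeling rules are re-read from the file when VictoriaMetrics receives a SIGHUP signal, so they can be updated without restarting the process.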
### Federation
VictoriaMetrics exports [Prometheus-compatible federation data](https://prometheus.io/docs/prometheus/latest/federation/)

View file

@ -90,7 +90,7 @@ func parseRelabelConfig(dst []ParsedRelabelConfig, rc *RelabelConfig) ([]ParsedR
return dst, fmt.Errorf("missing `source_labels` for `action=replace_all`")
}
if targetLabel == "" {
return dst, fmt.Errorf("missing `target_label` for `action=replace`")
return dst, fmt.Errorf("missing `target_label` for `action=replace_all`")
}
case "keep_if_equal":
if len(sourceLabels) < 2 {