From 821492bc0b79499d140420f3be40ea222491c482 Mon Sep 17 00:00:00 2001 From: Nikolay Date: Wed, 13 Jan 2021 01:52:50 +0300 Subject: [PATCH] adds extra_label to all import apis (#1007) * adds extra_label to all import apis, changes priority for extra_label - now it has priority over original labels * Update README.md Co-authored-by: Aliaksandr Valialkin * Update README.md Co-authored-by: Aliaksandr Valialkin * adds extra labels to vmagent import api changes order for adding labels, now its added after user values * adds tests for extra_label * import fix Co-authored-by: Aliaksandr Valialkin --- app/vmagent/influx/request_handler.go | 16 +++++++++++++--- app/vmagent/opentsdbhttp/request_handler.go | 12 ++++++++++-- app/vmagent/promremotewrite/request_handler.go | 12 ++++++++++-- app/vminsert/influx/request_handler.go | 16 +++++++++++++--- app/vminsert/opentsdbhttp/request_handler.go | 14 ++++++++++++-- app/vminsert/promremotewrite/request_handler.go | 16 +++++++++++++--- docs/Single-server-VictoriaMetrics.md | 4 ++++ 7 files changed, 75 insertions(+), 15 deletions(-) diff --git a/app/vmagent/influx/request_handler.go b/app/vmagent/influx/request_handler.go index 6235e61f9..b39d365f3 100644 --- a/app/vmagent/influx/request_handler.go +++ b/app/vmagent/influx/request_handler.go @@ -12,6 +12,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" + parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" @@ -33,7 +34,9 @@ var ( // See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/ func InsertHandlerForReader(r io.Reader) error { return writeconcurrencylimiter.Do(func() error { - return parser.ParseStream(r, false, "", "", insertRows) + return parser.ParseStream(r, false, "", "", func(db string, rows []parser.Row) error { + return insertRows(db, rows, nil) + }) }) } @@ -41,17 +44,23 @@ func InsertHandlerForReader(r io.Reader) error { // // See https://github.com/influxdata/influxdb/blob/4cbdc197b8117fee648d62e2e5be75c6575352f0/tsdb/README.md func InsertHandlerForHTTP(req *http.Request) error { + extraLabels, err := parserCommon.GetExtraLabels(req) + if err != nil { + return err + } return writeconcurrencylimiter.Do(func() error { isGzipped := req.Header.Get("Content-Encoding") == "gzip" q := req.URL.Query() precision := q.Get("precision") // Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint db := q.Get("db") - return parser.ParseStream(req.Body, isGzipped, precision, db, insertRows) + return parser.ParseStream(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error { + return insertRows(db, rows, extraLabels) + }) }) } -func insertRows(db string, rows []parser.Row) error { +func insertRows(db string, rows []parser.Row, extraLabels []prompbmarshal.Label) error { ctx := getPushCtx() defer putPushCtx(ctx) @@ -82,6 +91,7 @@ func insertRows(db string, rows []parser.Row) error { Value: db, }) } + commonLabels = append(commonLabels, extraLabels...) ctx.metricGroupBuf = ctx.metricGroupBuf[:0] if !*skipMeasurement { ctx.metricGroupBuf = append(ctx.metricGroupBuf, r.Measurement...) diff --git a/app/vmagent/opentsdbhttp/request_handler.go b/app/vmagent/opentsdbhttp/request_handler.go index 365cbe0aa..b3026ab86 100644 --- a/app/vmagent/opentsdbhttp/request_handler.go +++ b/app/vmagent/opentsdbhttp/request_handler.go @@ -6,6 +6,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" @@ -19,12 +20,18 @@ var ( // InsertHandler processes HTTP OpenTSDB put requests. // See http://opentsdb.net/docs/build/html/api_http/put.html func InsertHandler(req *http.Request) error { + extraLabels, err := parserCommon.GetExtraLabels(req) + if err != nil { + return err + } return writeconcurrencylimiter.Do(func() error { - return parser.ParseStream(req, insertRows) + return parser.ParseStream(req, func(rows []parser.Row) error { + return insertRows(rows, extraLabels) + }) }) } -func insertRows(rows []parser.Row) error { +func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { ctx := common.GetPushCtx() defer common.PutPushCtx(ctx) @@ -45,6 +52,7 @@ func insertRows(rows []parser.Row) error { Value: tag.Value, }) } + labels = append(labels, extraLabels...) samples = append(samples, prompbmarshal.Sample{ Value: r.Value, Timestamp: r.Timestamp, diff --git a/app/vmagent/promremotewrite/request_handler.go b/app/vmagent/promremotewrite/request_handler.go index 00dfcd614..0039e4ef4 100644 --- a/app/vmagent/promremotewrite/request_handler.go +++ b/app/vmagent/promremotewrite/request_handler.go @@ -8,6 +8,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/promremotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" @@ -20,12 +21,18 @@ var ( // InsertHandler processes remote write for prometheus. func InsertHandler(req *http.Request) error { + extraLabels, err := parserCommon.GetExtraLabels(req) + if err != nil { + return err + } return writeconcurrencylimiter.Do(func() error { - return parser.ParseStream(req, insertRows) + return parser.ParseStream(req, func(tss []prompb.TimeSeries) error { + return insertRows(tss, extraLabels) + }) }) } -func insertRows(timeseries []prompb.TimeSeries) error { +func insertRows(timeseries []prompb.TimeSeries, extraLabels []prompbmarshal.Label) error { ctx := common.GetPushCtx() defer common.PutPushCtx(ctx) @@ -44,6 +51,7 @@ func insertRows(timeseries []prompb.TimeSeries) error { Value: bytesutil.ToUnsafeString(label.Value), }) } + labels = append(labels, extraLabels...) samplesLen := len(samples) for i := range ts.Samples { sample := &ts.Samples[i] diff --git a/app/vminsert/influx/request_handler.go b/app/vminsert/influx/request_handler.go index 87dc9ed45..d563b085d 100644 --- a/app/vminsert/influx/request_handler.go +++ b/app/vminsert/influx/request_handler.go @@ -12,6 +12,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" @@ -37,7 +39,7 @@ var ( func InsertHandlerForReader(at *auth.Token, r io.Reader) error { return writeconcurrencylimiter.Do(func() error { return parser.ParseStream(r, false, "", "", func(db string, rows []parser.Row) error { - return insertRows(at, db, rows, true) + return insertRows(at, db, rows, nil, true) }) }) } @@ -46,6 +48,10 @@ func InsertHandlerForReader(at *auth.Token, r io.Reader) error { // // See https://github.com/influxdata/influxdb/blob/4cbdc197b8117fee648d62e2e5be75c6575352f0/tsdb/README.md func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error { + extraLabels, err := parserCommon.GetExtraLabels(req) + if err != nil { + return err + } return writeconcurrencylimiter.Do(func() error { isGzipped := req.Header.Get("Content-Encoding") == "gzip" q := req.URL.Query() @@ -53,12 +59,12 @@ func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error { // Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint db := q.Get("db") return parser.ParseStream(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error { - return insertRows(at, db, rows, false) + return insertRows(at, db, rows, extraLabels, false) }) }) } -func insertRows(at *auth.Token, db string, rows []parser.Row, mayOverrideAccountProjectID bool) error { +func insertRows(at *auth.Token, db string, rows []parser.Row, extraLabels []prompbmarshal.Label, mayOverrideAccountProjectID bool) error { ctx := getPushCtx() defer putPushCtx(ctx) @@ -91,6 +97,10 @@ func insertRows(at *auth.Token, db string, rows []parser.Row, mayOverrideAccount if !hasDBKey { ic.AddLabel("db", db) } + for j := range extraLabels { + label := &extraLabels[j] + ic.AddLabel(label.Name, label.Value) + } ctx.metricGroupBuf = ctx.metricGroupBuf[:0] if !*skipMeasurement { ctx.metricGroupBuf = append(ctx.metricGroupBuf, r.Measurement...) diff --git a/app/vminsert/opentsdbhttp/request_handler.go b/app/vminsert/opentsdbhttp/request_handler.go index 44812120e..494c85603 100644 --- a/app/vminsert/opentsdbhttp/request_handler.go +++ b/app/vminsert/opentsdbhttp/request_handler.go @@ -8,6 +8,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" @@ -37,9 +39,13 @@ func InsertHandler(req *http.Request) error { } switch p.Suffix { case "api/put", "opentsdb/api/put": + extraLabels, err := parserCommon.GetExtraLabels(req) + if err != nil { + return err + } return writeconcurrencylimiter.Do(func() error { return parser.ParseStream(req, func(rows []parser.Row) error { - return insertRows(at, rows) + return insertRows(at, rows, extraLabels) }) }) default: @@ -47,7 +53,7 @@ func InsertHandler(req *http.Request) error { } } -func insertRows(at *auth.Token, rows []parser.Row) error { +func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error { ctx := netstorage.GetInsertCtx() defer netstorage.PutInsertCtx(ctx) @@ -61,6 +67,10 @@ func insertRows(at *auth.Token, rows []parser.Row) error { tag := &r.Tags[j] ctx.AddLabel(tag.Key, tag.Value) } + for j := range extraLabels { + label := &extraLabels[j] + ctx.AddLabel(label.Name, label.Value) + } if hasRelabeling { ctx.ApplyRelabeling() } diff --git a/app/vminsert/promremotewrite/request_handler.go b/app/vminsert/promremotewrite/request_handler.go index 8282faa4f..cbc90dfc2 100644 --- a/app/vminsert/promremotewrite/request_handler.go +++ b/app/vminsert/promremotewrite/request_handler.go @@ -7,6 +7,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/promremotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" @@ -21,14 +23,18 @@ var ( // InsertHandler processes remote write for prometheus. func InsertHandler(at *auth.Token, req *http.Request) error { + extraLabels, err := parserCommon.GetExtraLabels(req) + if err != nil { + return err + } return writeconcurrencylimiter.Do(func() error { - return parser.ParseStream(req, func(timeseries []prompb.TimeSeries) error { - return insertRows(at, timeseries) + return parser.ParseStream(req, func(tss []prompb.TimeSeries) error { + return insertRows(at, tss, extraLabels) }) }) } -func insertRows(at *auth.Token, timeseries []prompb.TimeSeries) error { +func insertRows(at *auth.Token, timeseries []prompb.TimeSeries, extraLabels []prompbmarshal.Label) error { ctx := netstorage.GetInsertCtx() defer netstorage.PutInsertCtx(ctx) @@ -43,6 +49,10 @@ func insertRows(at *auth.Token, timeseries []prompb.TimeSeries) error { for _, srcLabel := range srcLabels { ctx.AddLabelBytes(srcLabel.Name, srcLabel.Value) } + for j := range extraLabels { + label := &extraLabels[j] + ctx.AddLabel(label.Name, label.Value) + } if hasRelabeling { ctx.ApplyRelabeling() } diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index f4b64cc22..e60d13534 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -409,6 +409,8 @@ The `/api/v1/export` endpoint should return the following response: Note that Influx line protocol expects [timestamps in *nanoseconds* by default](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/#timestamp), while VictoriaMetrics stores them with *milliseconds* precision. +Extra labels may be added to all the written time series by passing `extra_label=name=value` query args. +For example, `/write?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics. ## How to send data from Graphite-compatible agents such as [StatsD](https://github.com/etsy/statsd) @@ -524,6 +526,8 @@ The `/api/v1/export` endpoint should return the following response: {"metric":{"__name__":"x.y.z","t1":"v1","t2":"v2"},"values":[45.34],"timestamps":[1566464763000]} ``` +Extra labels may be added to all the imported time series by passing `extra_label=name=value` query args. +For example, `/api/put?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics. ## Prometheus querying API usage