mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
adds extra_label to all import apis (#1007)
* adds extra_label to all import apis, changes priority for extra_label - now it has priority over original labels * Update README.md Co-authored-by: Aliaksandr Valialkin <valyala@gmail.com> * Update README.md Co-authored-by: Aliaksandr Valialkin <valyala@gmail.com> * adds extra labels to vmagent import api changes order for adding labels, now its added after user values * adds tests for extra_label * import fix Co-authored-by: Aliaksandr Valialkin <valyala@gmail.com>
This commit is contained in:
parent
bc8b38daca
commit
821492bc0b
7 changed files with 75 additions and 15 deletions
|
@ -12,6 +12,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
||||
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
@ -33,7 +34,9 @@ var (
|
|||
// See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/
|
||||
func InsertHandlerForReader(r io.Reader) error {
|
||||
return writeconcurrencylimiter.Do(func() error {
|
||||
return parser.ParseStream(r, false, "", "", insertRows)
|
||||
return parser.ParseStream(r, false, "", "", func(db string, rows []parser.Row) error {
|
||||
return insertRows(db, rows, nil)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -41,17 +44,23 @@ func InsertHandlerForReader(r io.Reader) error {
|
|||
//
|
||||
// See https://github.com/influxdata/influxdb/blob/4cbdc197b8117fee648d62e2e5be75c6575352f0/tsdb/README.md
|
||||
func InsertHandlerForHTTP(req *http.Request) error {
|
||||
extraLabels, err := parserCommon.GetExtraLabels(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return writeconcurrencylimiter.Do(func() error {
|
||||
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
|
||||
q := req.URL.Query()
|
||||
precision := q.Get("precision")
|
||||
// Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint
|
||||
db := q.Get("db")
|
||||
return parser.ParseStream(req.Body, isGzipped, precision, db, insertRows)
|
||||
return parser.ParseStream(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error {
|
||||
return insertRows(db, rows, extraLabels)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func insertRows(db string, rows []parser.Row) error {
|
||||
func insertRows(db string, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
|
||||
ctx := getPushCtx()
|
||||
defer putPushCtx(ctx)
|
||||
|
||||
|
@ -82,6 +91,7 @@ func insertRows(db string, rows []parser.Row) error {
|
|||
Value: db,
|
||||
})
|
||||
}
|
||||
commonLabels = append(commonLabels, extraLabels...)
|
||||
ctx.metricGroupBuf = ctx.metricGroupBuf[:0]
|
||||
if !*skipMeasurement {
|
||||
ctx.metricGroupBuf = append(ctx.metricGroupBuf, r.Measurement...)
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
@ -19,12 +20,18 @@ var (
|
|||
// InsertHandler processes HTTP OpenTSDB put requests.
|
||||
// See http://opentsdb.net/docs/build/html/api_http/put.html
|
||||
func InsertHandler(req *http.Request) error {
|
||||
extraLabels, err := parserCommon.GetExtraLabels(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return writeconcurrencylimiter.Do(func() error {
|
||||
return parser.ParseStream(req, insertRows)
|
||||
return parser.ParseStream(req, func(rows []parser.Row) error {
|
||||
return insertRows(rows, extraLabels)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func insertRows(rows []parser.Row) error {
|
||||
func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
|
||||
ctx := common.GetPushCtx()
|
||||
defer common.PutPushCtx(ctx)
|
||||
|
||||
|
@ -45,6 +52,7 @@ func insertRows(rows []parser.Row) error {
|
|||
Value: tag.Value,
|
||||
})
|
||||
}
|
||||
labels = append(labels, extraLabels...)
|
||||
samples = append(samples, prompbmarshal.Sample{
|
||||
Value: r.Value,
|
||||
Timestamp: r.Timestamp,
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/promremotewrite"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
@ -20,12 +21,18 @@ var (
|
|||
|
||||
// InsertHandler processes remote write for prometheus.
|
||||
func InsertHandler(req *http.Request) error {
|
||||
extraLabels, err := parserCommon.GetExtraLabels(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return writeconcurrencylimiter.Do(func() error {
|
||||
return parser.ParseStream(req, insertRows)
|
||||
return parser.ParseStream(req, func(tss []prompb.TimeSeries) error {
|
||||
return insertRows(tss, extraLabels)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func insertRows(timeseries []prompb.TimeSeries) error {
|
||||
func insertRows(timeseries []prompb.TimeSeries, extraLabels []prompbmarshal.Label) error {
|
||||
ctx := common.GetPushCtx()
|
||||
defer common.PutPushCtx(ctx)
|
||||
|
||||
|
@ -44,6 +51,7 @@ func insertRows(timeseries []prompb.TimeSeries) error {
|
|||
Value: bytesutil.ToUnsafeString(label.Value),
|
||||
})
|
||||
}
|
||||
labels = append(labels, extraLabels...)
|
||||
samplesLen := len(samples)
|
||||
for i := range ts.Samples {
|
||||
sample := &ts.Samples[i]
|
||||
|
|
|
@ -12,6 +12,8 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
|
||||
|
@ -37,7 +39,7 @@ var (
|
|||
func InsertHandlerForReader(at *auth.Token, r io.Reader) error {
|
||||
return writeconcurrencylimiter.Do(func() error {
|
||||
return parser.ParseStream(r, false, "", "", func(db string, rows []parser.Row) error {
|
||||
return insertRows(at, db, rows, true)
|
||||
return insertRows(at, db, rows, nil, true)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
@ -46,6 +48,10 @@ func InsertHandlerForReader(at *auth.Token, r io.Reader) error {
|
|||
//
|
||||
// See https://github.com/influxdata/influxdb/blob/4cbdc197b8117fee648d62e2e5be75c6575352f0/tsdb/README.md
|
||||
func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
|
||||
extraLabels, err := parserCommon.GetExtraLabels(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return writeconcurrencylimiter.Do(func() error {
|
||||
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
|
||||
q := req.URL.Query()
|
||||
|
@ -53,12 +59,12 @@ func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
|
|||
// Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint
|
||||
db := q.Get("db")
|
||||
return parser.ParseStream(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error {
|
||||
return insertRows(at, db, rows, false)
|
||||
return insertRows(at, db, rows, extraLabels, false)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func insertRows(at *auth.Token, db string, rows []parser.Row, mayOverrideAccountProjectID bool) error {
|
||||
func insertRows(at *auth.Token, db string, rows []parser.Row, extraLabels []prompbmarshal.Label, mayOverrideAccountProjectID bool) error {
|
||||
ctx := getPushCtx()
|
||||
defer putPushCtx(ctx)
|
||||
|
||||
|
@ -91,6 +97,10 @@ func insertRows(at *auth.Token, db string, rows []parser.Row, mayOverrideAccount
|
|||
if !hasDBKey {
|
||||
ic.AddLabel("db", db)
|
||||
}
|
||||
for j := range extraLabels {
|
||||
label := &extraLabels[j]
|
||||
ic.AddLabel(label.Name, label.Value)
|
||||
}
|
||||
ctx.metricGroupBuf = ctx.metricGroupBuf[:0]
|
||||
if !*skipMeasurement {
|
||||
ctx.metricGroupBuf = append(ctx.metricGroupBuf, r.Measurement...)
|
||||
|
|
|
@ -8,6 +8,8 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
||||
|
@ -37,9 +39,13 @@ func InsertHandler(req *http.Request) error {
|
|||
}
|
||||
switch p.Suffix {
|
||||
case "api/put", "opentsdb/api/put":
|
||||
extraLabels, err := parserCommon.GetExtraLabels(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return writeconcurrencylimiter.Do(func() error {
|
||||
return parser.ParseStream(req, func(rows []parser.Row) error {
|
||||
return insertRows(at, rows)
|
||||
return insertRows(at, rows, extraLabels)
|
||||
})
|
||||
})
|
||||
default:
|
||||
|
@ -47,7 +53,7 @@ func InsertHandler(req *http.Request) error {
|
|||
}
|
||||
}
|
||||
|
||||
func insertRows(at *auth.Token, rows []parser.Row) error {
|
||||
func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
|
||||
ctx := netstorage.GetInsertCtx()
|
||||
defer netstorage.PutInsertCtx(ctx)
|
||||
|
||||
|
@ -61,6 +67,10 @@ func insertRows(at *auth.Token, rows []parser.Row) error {
|
|||
tag := &r.Tags[j]
|
||||
ctx.AddLabel(tag.Key, tag.Value)
|
||||
}
|
||||
for j := range extraLabels {
|
||||
label := &extraLabels[j]
|
||||
ctx.AddLabel(label.Name, label.Value)
|
||||
}
|
||||
if hasRelabeling {
|
||||
ctx.ApplyRelabeling()
|
||||
}
|
||||
|
|
|
@ -7,6 +7,8 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/promremotewrite"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
|
||||
|
@ -21,14 +23,18 @@ var (
|
|||
|
||||
// InsertHandler processes remote write for prometheus.
|
||||
func InsertHandler(at *auth.Token, req *http.Request) error {
|
||||
extraLabels, err := parserCommon.GetExtraLabels(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return writeconcurrencylimiter.Do(func() error {
|
||||
return parser.ParseStream(req, func(timeseries []prompb.TimeSeries) error {
|
||||
return insertRows(at, timeseries)
|
||||
return parser.ParseStream(req, func(tss []prompb.TimeSeries) error {
|
||||
return insertRows(at, tss, extraLabels)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func insertRows(at *auth.Token, timeseries []prompb.TimeSeries) error {
|
||||
func insertRows(at *auth.Token, timeseries []prompb.TimeSeries, extraLabels []prompbmarshal.Label) error {
|
||||
ctx := netstorage.GetInsertCtx()
|
||||
defer netstorage.PutInsertCtx(ctx)
|
||||
|
||||
|
@ -43,6 +49,10 @@ func insertRows(at *auth.Token, timeseries []prompb.TimeSeries) error {
|
|||
for _, srcLabel := range srcLabels {
|
||||
ctx.AddLabelBytes(srcLabel.Name, srcLabel.Value)
|
||||
}
|
||||
for j := range extraLabels {
|
||||
label := &extraLabels[j]
|
||||
ctx.AddLabel(label.Name, label.Value)
|
||||
}
|
||||
if hasRelabeling {
|
||||
ctx.ApplyRelabeling()
|
||||
}
|
||||
|
|
|
@ -409,6 +409,8 @@ The `/api/v1/export` endpoint should return the following response:
|
|||
Note that Influx line protocol expects [timestamps in *nanoseconds* by default](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/#timestamp),
|
||||
while VictoriaMetrics stores them with *milliseconds* precision.
|
||||
|
||||
Extra labels may be added to all the written time series by passing `extra_label=name=value` query args.
|
||||
For example, `/write?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics.
|
||||
|
||||
## How to send data from Graphite-compatible agents such as [StatsD](https://github.com/etsy/statsd)
|
||||
|
||||
|
@ -524,6 +526,8 @@ The `/api/v1/export` endpoint should return the following response:
|
|||
{"metric":{"__name__":"x.y.z","t1":"v1","t2":"v2"},"values":[45.34],"timestamps":[1566464763000]}
|
||||
```
|
||||
|
||||
Extra labels may be added to all the imported time series by passing `extra_label=name=value` query args.
|
||||
For example, `/api/put?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics.
|
||||
|
||||
## Prometheus querying API usage
|
||||
|
||||
|
|
Loading…
Reference in a new issue