app/{vminsert,vmagent}: allow adding extra labels when importing data via Prometheus, CSV and JSON line formats

Extra labels may be added to the imported data by passing `extra_label=name=value` query args.
Multiple query args may be passed in order to add multiple extra labels.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/719
This commit is contained in:
Aliaksandr Valialkin 2020-09-02 19:41:12 +03:00
parent 038358b777
commit f41b36bb9a
9 changed files with 114 additions and 12 deletions

View file

@ -516,6 +516,9 @@ The following response should be returned:
{"metric":{"__name__":"ask","market":"NYSE","ticker":"GOOG"},"values":[1.23],"timestamps":[1583865146495]}
```
Extra labels may be added to all the imported lines by passing `extra_label=name=value` query args.
For example, `/api/v1/import/csv?extra_label=foo=bar` would add `"foo":"bar"` label to all the imported lines.
Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for detail.
@ -540,6 +543,9 @@ It should return somethins like the following:
{"metric":{"__name__":"foo","bar":"baz"},"values":[123],"timestamps":[1594370496905]}
```
Extra labels may be added to all the imported metrics by passing `extra_label=name=value` query args.
For example, `/api/v1/import/prometheus?extra_label=foo=bar` would add `{foo="bar"}` label to all the imported metrics.
VictoriaMetrics accepts arbitrary number of lines in a single request to `/api/v1/import/prometheus`, i.e. it supports data streaming.
VictoriaMetrics also may scrape Prometheus targets - see [these docs](#how-to-scrape-prometheus-exporters-such-as-node-exporter).
@ -762,6 +768,9 @@ curl -H 'Accept-Encoding: gzip' http://source-victoriametrics:8428/api/v1/export
curl -X POST -H 'Content-Encoding: gzip' http://destination-victoriametrics:8428/api/v1/import -T exported_data.jsonl.gz
```
Extra labels may be added to all the imported time series by passing `extra_label=name=value` query args.
For example, `/api/v1/import?extra_label=foo=bar` would add `"foo":"bar"` label to all the imported time series.
Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for detail.
Each request to `/api/v1/import` can load up to a single vCPU core on VictoriaMetrics. Import speed can be improved by splitting the original file into smaller parts

View file

@ -6,6 +6,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
@ -18,12 +19,18 @@ var (
// InsertHandler processes csv data from req.
func InsertHandler(req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, insertRows)
return parser.ParseStream(req, func(rows []parser.Row) error {
return insertRows(rows, extraLabels)
})
})
}
func insertRows(rows []parser.Row) error {
func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)
@ -44,6 +51,7 @@ func insertRows(rows []parser.Row) error {
Value: tag.Value,
})
}
labels = append(labels, extraLabels...)
samples = append(samples, prompbmarshal.Sample{
Value: r.Value,
Timestamp: r.Timestamp,

View file

@ -6,6 +6,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
@ -18,13 +19,19 @@ var (
// InsertHandler processes `/api/v1/import/prometheus` request.
func InsertHandler(req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error {
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
return parser.ParseStream(req.Body, isGzipped, insertRows)
return parser.ParseStream(req.Body, isGzipped, func(rows []parser.Row) error {
return insertRows(rows, extraLabels)
})
})
}
func insertRows(rows []parser.Row) error {
func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)
@ -45,6 +52,7 @@ func insertRows(rows []parser.Row) error {
Value: tag.Value,
})
}
labels = append(labels, extraLabels...)
samples = append(samples, prompbmarshal.Sample{
Value: r.Value,
Timestamp: r.Timestamp,

View file

@ -7,6 +7,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
@ -21,12 +22,18 @@ var (
//
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6
func InsertHandler(req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, insertRows)
return parser.ParseStream(req, func(rows []parser.Row) error {
return insertRows(rows, extraLabels)
})
})
}
func insertRows(rows []parser.Row) error {
func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)
@ -44,6 +51,7 @@ func insertRows(rows []parser.Row) error {
Value: bytesutil.ToUnsafeString(tag.Value),
})
}
labels = append(labels, extraLabels...)
values := r.Values
timestamps := r.Timestamps
_ = timestamps[len(values)-1]

View file

@ -5,6 +5,8 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
@ -17,14 +19,18 @@ var (
// InsertHandler processes /api/v1/import/csv requests.
func InsertHandler(req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, func(rows []parser.Row) error {
return insertRows(rows)
return insertRows(rows, extraLabels)
})
})
}
func insertRows(rows []parser.Row) error {
func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
ctx := common.GetInsertCtx()
defer common.PutInsertCtx(ctx)
@ -38,6 +44,10 @@ func insertRows(rows []parser.Row) error {
tag := &r.Tags[j]
ctx.AddLabel(tag.Key, tag.Value)
}
for j := range extraLabels {
label := &extraLabels[j]
ctx.AddLabel(label.Name, label.Value)
}
if hasRelabeling {
ctx.ApplyRelabeling()
}

View file

@ -5,6 +5,8 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
@ -17,13 +19,19 @@ var (
// InsertHandler processes `/api/v1/import/prometheus` request.
func InsertHandler(req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error {
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
return parser.ParseStream(req.Body, isGzipped, insertRows)
return parser.ParseStream(req.Body, isGzipped, func(rows []parser.Row) error {
return insertRows(rows, extraLabels)
})
})
}
func insertRows(rows []parser.Row) error {
func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
ctx := common.GetInsertCtx()
defer common.PutInsertCtx(ctx)
@ -37,6 +45,10 @@ func insertRows(rows []parser.Row) error {
tag := &r.Tags[j]
ctx.AddLabel(tag.Key, tag.Value)
}
for j := range extraLabels {
label := &extraLabels[j]
ctx.AddLabel(label.Name, label.Value)
}
if hasRelabeling {
ctx.ApplyRelabeling()
}

View file

@ -7,6 +7,8 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
@ -22,12 +24,18 @@ var (
//
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6
func InsertHandler(req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, insertRows)
return parser.ParseStream(req, func(rows []parser.Row) error {
return insertRows(rows, extraLabels)
})
})
}
func insertRows(rows []parser.Row) error {
func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
ctx := getPushCtx()
defer putPushCtx(ctx)
@ -46,6 +54,10 @@ func insertRows(rows []parser.Row) error {
tag := &r.Tags[j]
ic.AddLabelBytes(tag.Key, tag.Value)
}
for j := range extraLabels {
label := &extraLabels[j]
ic.AddLabel(label.Name, label.Value)
}
if hasRelabeling {
ic.ApplyRelabeling()
}

View file

@ -516,6 +516,9 @@ The following response should be returned:
{"metric":{"__name__":"ask","market":"NYSE","ticker":"GOOG"},"values":[1.23],"timestamps":[1583865146495]}
```
Extra labels may be added to all the imported lines by passing `extra_label=name=value` query args.
For example, `/api/v1/import/csv?extra_label=foo=bar` would add `"foo":"bar"` label to all the imported lines.
Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for detail.
@ -540,6 +543,9 @@ It should return somethins like the following:
{"metric":{"__name__":"foo","bar":"baz"},"values":[123],"timestamps":[1594370496905]}
```
Extra labels may be added to all the imported metrics by passing `extra_label=name=value` query args.
For example, `/api/v1/import/prometheus?extra_label=foo=bar` would add `{foo="bar"}` label to all the imported metrics.
VictoriaMetrics accepts arbitrary number of lines in a single request to `/api/v1/import/prometheus`, i.e. it supports data streaming.
VictoriaMetrics also may scrape Prometheus targets - see [these docs](#how-to-scrape-prometheus-exporters-such-as-node-exporter).
@ -762,6 +768,9 @@ curl -H 'Accept-Encoding: gzip' http://source-victoriametrics:8428/api/v1/export
curl -X POST -H 'Content-Encoding: gzip' http://destination-victoriametrics:8428/api/v1/import -T exported_data.jsonl.gz
```
Extra labels may be added to all the imported time series by passing `extra_label=name=value` query args.
For example, `/api/v1/import?extra_label=foo=bar` would add `"foo":"bar"` label to all the imported time series.
Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for detail.
Each request to `/api/v1/import` can load up to a single vCPU core on VictoriaMetrics. Import speed can be improved by splitting the original file into smaller parts

View file

@ -0,0 +1,26 @@
package common
import (
"fmt"
"net/http"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
// GetExtraLabels extracts name:value labels from `extra_label=name=value` query args from req.
func GetExtraLabels(req *http.Request) ([]prompbmarshal.Label, error) {
q := req.URL.Query()
var result []prompbmarshal.Label
for _, label := range q["extra_label"] {
tmp := strings.SplitN(label, "=", 2)
if len(tmp) != 2 {
return nil, fmt.Errorf("`extra_label` query arg must have the format `name=value`; got %q", label)
}
result = append(result, prompbmarshal.Label{
Name: tmp[0],
Value: tmp[1],
})
}
return result, nil
}