mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/{vminsert,vmagent}: follow-up after 53a63c6c4c
Extend /api/v1/import/prometheus with the support for Pushgateway way of specifying additional labels. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1415
This commit is contained in:
parent
53a63c6c4c
commit
58d459e8a8
9 changed files with 221 additions and 194 deletions
25
README.md
25
README.md
|
@ -1048,7 +1048,8 @@ Time series data can be imported into VictoriaMetrics via any supported data ing
|
|||
* `/api/v1/import/native` for importing data obtained from [/api/v1/export/native](#how-to-export-data-in-native-format).
|
||||
See [these docs](#how-to-import-data-in-native-format) for details.
|
||||
* `/api/v1/import/csv` for importing arbitrary CSV data. See [these docs](#how-to-import-csv-data) for details.
|
||||
* `/api/v1/import/prometheus` for importing data in Prometheus exposition format. See [these docs](#how-to-import-data-in-prometheus-exposition-format) for details.
|
||||
* `/api/v1/import/prometheus` for importing data in Prometheus exposition format and in [Pushgateway format](https://github.com/prometheus/pushgateway#url).
|
||||
See [these docs](#how-to-import-data-in-prometheus-exposition-format) for details.
|
||||
|
||||
### How to import data in JSON line format
|
||||
|
||||
|
@ -1153,9 +1154,11 @@ Note that it could be required to flush response cache after importing historica
|
|||
|
||||
### How to import data in Prometheus exposition format
|
||||
|
||||
VictoriaMetrics accepts data in [Prometheus exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format)
|
||||
and in [OpenMetrics format](https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md)
|
||||
via `/api/v1/import/prometheus` path. For example, the following line imports a single line in Prometheus exposition format into VictoriaMetrics:
|
||||
VictoriaMetrics accepts data in [Prometheus exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format),
|
||||
in [OpenMetrics format](https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md)
|
||||
and in [Pushgateway format](https://github.com/prometheus/pushgateway#url) via `/api/v1/import/prometheus` path.
|
||||
|
||||
For example, the following command imports a single line in Prometheus exposition format into VictoriaMetrics:
|
||||
|
||||
<div class="with-copy" markdown="1">
|
||||
|
||||
|
@ -1181,6 +1184,16 @@ It should return something like the following:
|
|||
{"metric":{"__name__":"foo","bar":"baz"},"values":[123],"timestamps":[1594370496905]}
|
||||
```
|
||||
|
||||
The following command imports a single metric via [Pushgateway format](https://github.com/prometheus/pushgateway#url) with `{job="my_app",instance="host123"}` labels:
|
||||
|
||||
<div class="with-copy" markdown="1">
|
||||
|
||||
```console
|
||||
curl -d 'metric{label="abc"} 123' -X POST 'http://localhost:8428/api/v1/import/prometheus/metrics/job/my_app/instance/host123'
|
||||
```
|
||||
|
||||
</div>
|
||||
|
||||
Pass `Content-Encoding: gzip` HTTP request header to `/api/v1/import/prometheus` for importing gzipped data:
|
||||
|
||||
<div class="with-copy" markdown="1">
|
||||
|
@ -1192,8 +1205,8 @@ curl -X POST -H 'Content-Encoding: gzip' http://destination-victoriametrics:8428
|
|||
|
||||
</div>
|
||||
|
||||
Extra labels may be added to all the imported metrics by passing `extra_label=name=value` query args.
|
||||
For example, `/api/v1/import/prometheus?extra_label=foo=bar` would add `{foo="bar"}` label to all the imported metrics.
|
||||
Extra labels may be added to all the imported metrics either via [Pushgateway format](https://github.com/prometheus/pushgateway#url)
|
||||
or by passing `extra_label=name=value` query args. For example, `/api/v1/import/prometheus?extra_label=foo=bar` would add `{foo="bar"}` label to all the imported metrics.
|
||||
|
||||
If timestamp is missing in `<metric> <value> <timestamp>` Prometheus exposition format line, then the current timestamp is used during data ingestion.
|
||||
It can be overridden by passing unix timestamp in *milliseconds* via `timestamp` query arg. For example, `/api/v1/import/prometheus?timestamp=1594370496905`.
|
||||
|
|
|
@ -20,7 +20,6 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/opentsdbhttp"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/prometheusimport"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/promremotewrite"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/pushgateway"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/vmimport"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
|
@ -218,6 +217,16 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
|||
}
|
||||
|
||||
path := strings.Replace(r.URL.Path, "//", "/", -1)
|
||||
if strings.HasPrefix(path, "/prometheus/api/v1/import/prometheus") || strings.HasPrefix(path, "/api/v1/import/prometheus") {
|
||||
prometheusimportRequests.Inc()
|
||||
if err := prometheusimport.InsertHandler(nil, r); err != nil {
|
||||
prometheusimportErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return true
|
||||
}
|
||||
if strings.HasPrefix(path, "datadog/") {
|
||||
// Trim suffix from paths starting from /datadog/ in order to support legacy DataDog agent.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2670
|
||||
|
@ -251,15 +260,6 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
|||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return true
|
||||
case "/prometheus/api/v1/import/prometheus", "/api/v1/import/prometheus":
|
||||
prometheusimportRequests.Inc()
|
||||
if err := prometheusimport.InsertHandler(nil, r); err != nil {
|
||||
prometheusimportErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return true
|
||||
case "/prometheus/api/v1/import/native", "/api/v1/import/native":
|
||||
nativeimportRequests.Inc()
|
||||
if err := native.InsertHandler(nil, r); err != nil {
|
||||
|
@ -388,16 +388,6 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
|||
staticServer.ServeHTTP(w, r)
|
||||
return true
|
||||
}
|
||||
if strings.HasPrefix(r.URL.Path, "/api/v1/pushgateway") {
|
||||
pushgatewayRequests.Inc()
|
||||
if err := pushgateway.InsertHandler(nil, r); err != nil {
|
||||
pushgatewayErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return true
|
||||
}
|
||||
if remotewrite.MultitenancyEnabled() {
|
||||
return processMultitenantRequest(w, r, path)
|
||||
}
|
||||
|
@ -420,6 +410,16 @@ func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path stri
|
|||
httpserver.Errorf(w, r, "cannot obtain auth token: %s", err)
|
||||
return true
|
||||
}
|
||||
if strings.HasPrefix(p.Suffix, "prometheus/api/v1/import/prometheus") {
|
||||
prometheusimportRequests.Inc()
|
||||
if err := prometheusimport.InsertHandler(at, r); err != nil {
|
||||
prometheusimportErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return true
|
||||
}
|
||||
if strings.HasPrefix(p.Suffix, "datadog/") {
|
||||
// Trim suffix from paths starting from /datadog/ in order to support legacy DataDog agent.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2670
|
||||
|
@ -453,15 +453,6 @@ func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path stri
|
|||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return true
|
||||
case "prometheus/api/v1/import/prometheus":
|
||||
prometheusimportRequests.Inc()
|
||||
if err := prometheusimport.InsertHandler(at, r); err != nil {
|
||||
prometheusimportErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return true
|
||||
case "prometheus/api/v1/import/native":
|
||||
nativeimportRequests.Inc()
|
||||
if err := native.InsertHandler(at, r); err != nil {
|
||||
|
@ -519,16 +510,6 @@ func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path stri
|
|||
fmt.Fprintf(w, `{}`)
|
||||
return true
|
||||
default:
|
||||
if strings.HasPrefix(r.URL.Path, "/api/v1/pushgateway") {
|
||||
pushgatewayRequests.Inc()
|
||||
if err := pushgateway.InsertHandler(nil, r); err != nil {
|
||||
pushgatewayErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return true
|
||||
}
|
||||
httpserver.Errorf(w, r, "unsupported multitenant path suffix: %q", p.Suffix)
|
||||
return true
|
||||
}
|
||||
|
@ -547,9 +528,6 @@ var (
|
|||
prometheusimportRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/import/prometheus", protocol="prometheusimport"}`)
|
||||
prometheusimportErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/import/prometheus", protocol="prometheusimport"}`)
|
||||
|
||||
pushgatewayRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/pushgateway", protocol="pushgateway"}`)
|
||||
pushgatewayErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/pushgateway", protocol="pushgateway"}`)
|
||||
|
||||
nativeimportRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/import/native", protocol="nativeimport"}`)
|
||||
nativeimportErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/import/native", protocol="nativeimport"}`)
|
||||
|
||||
|
|
|
@ -1,127 +0,0 @@
|
|||
package pushgateway
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="pushgateway"}`)
|
||||
rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="pushgateway"}`)
|
||||
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="pushgateway"}`)
|
||||
)
|
||||
|
||||
// InsertHandler processes `/api/v1/pushgateway` request.
|
||||
func InsertHandler(at *auth.Token, req *http.Request) error {
|
||||
// Pushgateway endpoint is of the style: /metrics/job/<JOB_NAME>{/<LABEL_NAME>/<LABEL_VALUE>}
|
||||
// Source:https://github.com/prometheus/pushgateway#url
|
||||
pushgatewayPath := strings.TrimSuffix(strings.Replace(req.URL.Path, "/api/v1/pushgateway", "", 1), "/")
|
||||
pathLabels, err := extractLabelsFromPath(pushgatewayPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
extraLabels, err := parserCommon.GetExtraLabels(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defaultTimestamp, err := parserCommon.GetTimestamp(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return writeconcurrencylimiter.Do(func() error {
|
||||
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
|
||||
return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error {
|
||||
return insertRows(at, rows, append(pathLabels, extraLabels...))
|
||||
}, nil)
|
||||
})
|
||||
}
|
||||
|
||||
func extractLabelsFromPath(pushgatewayPath string) ([]prompbmarshal.Label, error) {
|
||||
// Parsing Pushgateway path which is of the style: /metrics/job/<JOB_NAME>{/<LABEL_NAME>/<LABEL_VALUE>}
|
||||
// With an arbitrary number of /<LABEL_NAME>/<LABEL_VALUE> pairs
|
||||
// Source:https://github.com/prometheus/pushgateway#url
|
||||
var result []prompbmarshal.Label
|
||||
if !strings.HasPrefix(pushgatewayPath, "/metrics/job/") {
|
||||
return nil, fmt.Errorf("pushgateway endpoint format is incorrect. Expected /metrics/job/<JOB_NAME>{/<LABEL_NAME>/<LABEL_VALUE>}, got %q ", pushgatewayPath)
|
||||
}
|
||||
labelsString := strings.Replace(pushgatewayPath, "/metrics/job/", "", 1)
|
||||
labelsSlice := strings.Split(labelsString, "/")
|
||||
if len(labelsSlice) == 1 && labelsSlice[0] == "" {
|
||||
return nil, fmt.Errorf("pushgateway path has to contain a job name after /job/. Expected /metrics/job/<JOB_NAME>{/<LABEL_NAME>/<LABEL_VALUE>}, got %q ", pushgatewayPath)
|
||||
}
|
||||
|
||||
//The first value that comes after /metrics/job/JOB_NAME gives origin to a label with key "job" and value "JOB_NAME"
|
||||
result = append(result, prompbmarshal.Label{
|
||||
Name: "job",
|
||||
Value: labelsSlice[0],
|
||||
})
|
||||
|
||||
// We expect the number of items to be odd.
|
||||
// The first item is the job label and after that is key/value pairs
|
||||
if len(labelsSlice)%2 == 0 {
|
||||
return nil, fmt.Errorf("number of label key/pair passed via pushgateway endpoint format does not match")
|
||||
}
|
||||
|
||||
// We start at 1, since index 0 was the job label value, and we jump every 2 - first item is the key, second is the value.
|
||||
for i := 1; i < len(labelsSlice); i = i + 2 {
|
||||
result = append(result, prompbmarshal.Label{
|
||||
Name: labelsSlice[i],
|
||||
Value: labelsSlice[i+1],
|
||||
})
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
|
||||
ctx := common.GetPushCtx()
|
||||
defer common.PutPushCtx(ctx)
|
||||
|
||||
tssDst := ctx.WriteRequest.Timeseries[:0]
|
||||
labels := ctx.Labels[:0]
|
||||
samples := ctx.Samples[:0]
|
||||
for i := range rows {
|
||||
r := &rows[i]
|
||||
labelsLen := len(labels)
|
||||
labels = append(labels, prompbmarshal.Label{
|
||||
Name: "__name__",
|
||||
Value: r.Metric,
|
||||
})
|
||||
for j := range r.Tags {
|
||||
tag := &r.Tags[j]
|
||||
labels = append(labels, prompbmarshal.Label{
|
||||
Name: tag.Key,
|
||||
Value: tag.Value,
|
||||
})
|
||||
}
|
||||
labels = append(labels, extraLabels...)
|
||||
samples = append(samples, prompbmarshal.Sample{
|
||||
Value: r.Value,
|
||||
Timestamp: r.Timestamp,
|
||||
})
|
||||
tssDst = append(tssDst, prompbmarshal.TimeSeries{
|
||||
Labels: labels[labelsLen:],
|
||||
Samples: samples[len(samples)-1:],
|
||||
})
|
||||
}
|
||||
ctx.WriteRequest.Timeseries = tssDst
|
||||
ctx.Labels = labels
|
||||
ctx.Samples = samples
|
||||
remotewrite.Push(at, &ctx.WriteRequest)
|
||||
rowsInserted.Add(len(rows))
|
||||
if at != nil {
|
||||
rowsTenantInserted.Get(at).Add(len(rows))
|
||||
}
|
||||
rowsPerInsert.Update(float64(len(rows)))
|
||||
return nil
|
||||
}
|
|
@ -120,6 +120,16 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
|
|||
staticServer.ServeHTTP(w, r)
|
||||
return true
|
||||
}
|
||||
if strings.HasPrefix(path, "/prometheus/api/v1/import/prometheus") || strings.HasPrefix(path, "/api/v1/import/prometheus") {
|
||||
prometheusimportRequests.Inc()
|
||||
if err := prometheusimport.InsertHandler(r); err != nil {
|
||||
prometheusimportErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return true
|
||||
}
|
||||
if strings.HasPrefix(path, "/datadog/") {
|
||||
// Trim suffix from paths starting from /datadog/ in order to support legacy DataDog agent.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2670
|
||||
|
@ -153,15 +163,6 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
|
|||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return true
|
||||
case "/prometheus/api/v1/import/prometheus", "/api/v1/import/prometheus":
|
||||
prometheusimportRequests.Inc()
|
||||
if err := prometheusimport.InsertHandler(r); err != nil {
|
||||
prometheusimportErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return true
|
||||
case "/prometheus/api/v1/import/native", "/api/v1/import/native":
|
||||
nativeimportRequests.Inc()
|
||||
if err := native.InsertHandler(r); err != nil {
|
||||
|
|
|
@ -15,6 +15,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
|||
|
||||
## tip
|
||||
|
||||
* FEATURE: add support for [Pushgateway data import format](https://github.com/prometheus/pushgateway#url) via `/api/v1/import/prometheus` url. See [these docs](https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1415). Thanks to @PerGon for [the intial implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3360).
|
||||
* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): add `http://<vmselect>:8481/admin/tenants` API endpoint for returning a list of registered tenants. See [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format) for details.
|
||||
* FEATURE: [VictoriaMetrics enterprise](https://docs.victoriametrics.com/enterprise.html): add `-storageNode.filter` command-line flag for filtering the [discovered vmstorage nodes](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#automatic-vmstorage-discovery) with arbitrary regular expressions. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3353).
|
||||
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): allow using numeric values with `K`, `Ki`, `M`, `Mi`, `G`, `Gi`, `T` and `Ti` suffixes inside MetricsQL queries. For example `8Ki` equals to `8*1024`, while `8.2M` equals to `8.2*1000*1000`.
|
||||
|
|
|
@ -292,7 +292,7 @@ See [troubleshooting docs](https://docs.victoriametrics.com/Troubleshooting.html
|
|||
- `prometheus/api/v1/import` - for importing data obtained via `api/v1/export` at `vmselect` (see below).
|
||||
- `prometheus/api/v1/import/native` - for importing data obtained via `api/v1/export/native` on `vmselect` (see below).
|
||||
- `prometheus/api/v1/import/csv` - for importing arbitrary CSV data. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-import-csv-data) for details.
|
||||
- `prometheus/api/v1/import/prometheus` - for importing data in [Prometheus text exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format) and in [OpenMetrics format](https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-import-data-in-prometheus-exposition-format) for details.
|
||||
- `prometheus/api/v1/import/prometheus` - for importing data in [Prometheus text exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format) and in [OpenMetrics format](https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md). This endpoint also supports [Pushgateway protocol](https://github.com/prometheus/pushgateway#url). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-import-data-in-prometheus-exposition-format) for details.
|
||||
|
||||
- URLs for [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/): `http://<vmselect>:8481/select/<accountID>/prometheus/<suffix>`, where:
|
||||
- `<accountID>` is an arbitrary number identifying data namespace for the query (aka tenant)
|
||||
|
|
|
@ -1052,7 +1052,8 @@ Time series data can be imported into VictoriaMetrics via any supported data ing
|
|||
* `/api/v1/import/native` for importing data obtained from [/api/v1/export/native](#how-to-export-data-in-native-format).
|
||||
See [these docs](#how-to-import-data-in-native-format) for details.
|
||||
* `/api/v1/import/csv` for importing arbitrary CSV data. See [these docs](#how-to-import-csv-data) for details.
|
||||
* `/api/v1/import/prometheus` for importing data in Prometheus exposition format. See [these docs](#how-to-import-data-in-prometheus-exposition-format) for details.
|
||||
* `/api/v1/import/prometheus` for importing data in Prometheus exposition format and in [Pushgateway format](https://github.com/prometheus/pushgateway#url).
|
||||
See [these docs](#how-to-import-data-in-prometheus-exposition-format) for details.
|
||||
|
||||
### How to import data in JSON line format
|
||||
|
||||
|
@ -1157,9 +1158,11 @@ Note that it could be required to flush response cache after importing historica
|
|||
|
||||
### How to import data in Prometheus exposition format
|
||||
|
||||
VictoriaMetrics accepts data in [Prometheus exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format)
|
||||
and in [OpenMetrics format](https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md)
|
||||
via `/api/v1/import/prometheus` path. For example, the following line imports a single line in Prometheus exposition format into VictoriaMetrics:
|
||||
VictoriaMetrics accepts data in [Prometheus exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format),
|
||||
in [OpenMetrics format](https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md)
|
||||
and in [Pushgateway format](https://github.com/prometheus/pushgateway#url) via `/api/v1/import/prometheus` path.
|
||||
|
||||
For example, the following command imports a single line in Prometheus exposition format into VictoriaMetrics:
|
||||
|
||||
<div class="with-copy" markdown="1">
|
||||
|
||||
|
@ -1185,6 +1188,16 @@ It should return something like the following:
|
|||
{"metric":{"__name__":"foo","bar":"baz"},"values":[123],"timestamps":[1594370496905]}
|
||||
```
|
||||
|
||||
The following command imports a single metric via [Pushgateway format](https://github.com/prometheus/pushgateway#url) with `{job="my_app",instance="host123"}` labels:
|
||||
|
||||
<div class="with-copy" markdown="1">
|
||||
|
||||
```console
|
||||
curl -d 'metric{label="abc"} 123' -X POST 'http://localhost:8428/api/v1/import/prometheus/metrics/job/my_app/instance/host123'
|
||||
```
|
||||
|
||||
</div>
|
||||
|
||||
Pass `Content-Encoding: gzip` HTTP request header to `/api/v1/import/prometheus` for importing gzipped data:
|
||||
|
||||
<div class="with-copy" markdown="1">
|
||||
|
@ -1196,8 +1209,8 @@ curl -X POST -H 'Content-Encoding: gzip' http://destination-victoriametrics:8428
|
|||
|
||||
</div>
|
||||
|
||||
Extra labels may be added to all the imported metrics by passing `extra_label=name=value` query args.
|
||||
For example, `/api/v1/import/prometheus?extra_label=foo=bar` would add `{foo="bar"}` label to all the imported metrics.
|
||||
Extra labels may be added to all the imported metrics either via [Pushgateway format](https://github.com/prometheus/pushgateway#url)
|
||||
or by passing `extra_label=name=value` query args. For example, `/api/v1/import/prometheus?extra_label=foo=bar` would add `{foo="bar"}` label to all the imported metrics.
|
||||
|
||||
If timestamp is missing in `<metric> <value> <timestamp>` Prometheus exposition format line, then the current timestamp is used during data ingestion.
|
||||
It can be overridden by passing unix timestamp in *milliseconds* via `timestamp` query arg. For example, `/api/v1/import/prometheus?timestamp=1594370496905`.
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package common
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
@ -9,18 +10,74 @@ import (
|
|||
)
|
||||
|
||||
// GetExtraLabels extracts name:value labels from `extra_label=name=value` query args from req.
|
||||
//
|
||||
// It also extracts Pushgateways-compatible extra labels from req.URL.Path
|
||||
// according to https://github.com/prometheus/pushgateway#url .
|
||||
func GetExtraLabels(req *http.Request) ([]prompbmarshal.Label, error) {
|
||||
labels, err := getPushgatewayLabels(req.URL.Path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse pushgateway-style labels from %q: %w", req.URL.Path, err)
|
||||
}
|
||||
q := req.URL.Query()
|
||||
var result []prompbmarshal.Label
|
||||
for _, label := range q["extra_label"] {
|
||||
tmp := strings.SplitN(label, "=", 2)
|
||||
if len(tmp) != 2 {
|
||||
return nil, fmt.Errorf("`extra_label` query arg must have the format `name=value`; got %q", label)
|
||||
}
|
||||
result = append(result, prompbmarshal.Label{
|
||||
labels = append(labels, prompbmarshal.Label{
|
||||
Name: tmp[0],
|
||||
Value: tmp[1],
|
||||
})
|
||||
}
|
||||
return result, nil
|
||||
return labels, nil
|
||||
}
|
||||
|
||||
func getPushgatewayLabels(path string) ([]prompbmarshal.Label, error) {
|
||||
n := strings.Index(path, "/metrics/job")
|
||||
if n < 0 {
|
||||
return nil, nil
|
||||
}
|
||||
s := path[n+len("/metrics/"):]
|
||||
if !strings.HasPrefix(s, "job/") && !strings.HasPrefix(s, "job@base64/") {
|
||||
return nil, nil
|
||||
}
|
||||
labelsCount := (strings.Count(s, "/") + 1) / 2
|
||||
labels := make([]prompbmarshal.Label, 0, labelsCount)
|
||||
for len(s) > 0 {
|
||||
n := strings.IndexByte(s, '/')
|
||||
if n < 0 {
|
||||
return nil, fmt.Errorf("missing value for label %q", s)
|
||||
}
|
||||
name := s[:n]
|
||||
s = s[n+1:]
|
||||
isBase64 := strings.HasSuffix(name, "@base64")
|
||||
if isBase64 {
|
||||
name = name[:len(name)-len("@base64")]
|
||||
}
|
||||
var value string
|
||||
n = strings.IndexByte(s, '/')
|
||||
if n < 0 {
|
||||
value = s
|
||||
s = ""
|
||||
} else {
|
||||
value = s[:n]
|
||||
s = s[n+1:]
|
||||
}
|
||||
if isBase64 {
|
||||
data, err := base64.URLEncoding.DecodeString(value)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot base64-decode value=%q for label=%q: %w", value, name, err)
|
||||
}
|
||||
value = string(data)
|
||||
}
|
||||
if len(value) == 0 {
|
||||
// Skip labels with empty values
|
||||
continue
|
||||
}
|
||||
labels = append(labels, prompbmarshal.Label{
|
||||
Name: name,
|
||||
Value: value,
|
||||
})
|
||||
}
|
||||
return labels, nil
|
||||
}
|
||||
|
|
91
lib/protoparser/common/extra_labels_test.go
Normal file
91
lib/protoparser/common/extra_labels_test.go
Normal file
|
@ -0,0 +1,91 @@
|
|||
package common
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sort"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
)
|
||||
|
||||
func TestGetExtraLabelsSuccess(t *testing.T) {
|
||||
f := func(requestURI, expectedLabels string) {
|
||||
t.Helper()
|
||||
fullURL := "http://fobar" + requestURI
|
||||
req, err := http.NewRequest("GET", fullURL, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot parse %q: %s", fullURL, err)
|
||||
}
|
||||
extraLabels, err := GetExtraLabels(req)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
labelsStr := getLabelsString(extraLabels)
|
||||
if labelsStr != expectedLabels {
|
||||
t.Fatalf("unexpected labels;\ngot\n%s\nwant\n%s", labelsStr, expectedLabels)
|
||||
}
|
||||
}
|
||||
f("", `{}`)
|
||||
f("/foo/bar", `{}`)
|
||||
f("/foo?extra_label=foo=bar", `{foo="bar"}`)
|
||||
f("/foo?extra_label=a=x&extra_label=b=y", `{a="x",b="y"}`)
|
||||
f("/metrics/job/foo", `{job="foo"}`)
|
||||
f("/metrics/job/foo?extra_label=a=b", `{a="b",job="foo"}`)
|
||||
f("/metrics/job/foo/b/bcd?extra_label=a=b&extra_label=qwe=rty", `{a="b",b="bcd",job="foo",qwe="rty"}`)
|
||||
f("/metrics/job/titan/name/%CE%A0%CF%81%CE%BF%CE%BC%CE%B7%CE%B8%CE%B5%CF%8D%CF%82", `{job="titan",name="Προμηθεύς"}`)
|
||||
f("/metrics/job/titan/name@base64/zqDPgc6_zrzOt864zrXPjc-C", `{job="titan",name="Προμηθεύς"}`)
|
||||
}
|
||||
|
||||
func TestGetPushgatewayLabelsSuccess(t *testing.T) {
|
||||
f := func(path, expectedLabels string) {
|
||||
t.Helper()
|
||||
labels, err := getPushgatewayLabels(path)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error in getPushgatewayLabels(%q): %s", path, err)
|
||||
}
|
||||
labelsStr := getLabelsString(labels)
|
||||
if labelsStr != expectedLabels {
|
||||
t.Fatalf("unexpected labels returned from getPushgatewayLabels(%q);\ngot\n%s\nwant\n%s", path, labelsStr, expectedLabels)
|
||||
}
|
||||
}
|
||||
f("", "{}")
|
||||
f("/foo/bar", "{}")
|
||||
f("/metrics/foo/bar", "{}")
|
||||
f("/metrics/job", "{}")
|
||||
f("/metrics/job@base64", "{}")
|
||||
f("/metrics/job/", "{}")
|
||||
f("/metrics/job/foo", `{job="foo"}`)
|
||||
f("/foo/metrics/job/foo", `{job="foo"}`)
|
||||
f("/api/v1/import/prometheus/metrics/job/foo", `{job="foo"}`)
|
||||
f("/foo/metrics/job@base64/Zm9v", `{job="foo"}`)
|
||||
f("/foo/metrics/job/x/a/foo/aaa/bar", `{a="foo",aaa="bar",job="x"}`)
|
||||
f("/foo/metrics/job/x/a@base64/Zm9v", `{a="foo",job="x"}`)
|
||||
}
|
||||
|
||||
func TestGetPushgatewayLabelsFailure(t *testing.T) {
|
||||
f := func(path string) {
|
||||
t.Helper()
|
||||
labels, err := getPushgatewayLabels(path)
|
||||
if err == nil {
|
||||
labelsStr := getLabelsString(labels)
|
||||
t.Fatalf("expecting non-nil error for getPushgatewayLabels(%q); got labels %s", path, labelsStr)
|
||||
}
|
||||
}
|
||||
// missing bar value
|
||||
f("/metrics/job/foo/bar")
|
||||
// invalid base64 encoding for job
|
||||
f("/metrics/job@base64/#$%")
|
||||
// invalid base64 encoding for non-job label
|
||||
f("/metrics/job/foo/bar@base64/#$%")
|
||||
}
|
||||
|
||||
func getLabelsString(labels []prompbmarshal.Label) string {
|
||||
a := make([]string, len(labels))
|
||||
for i, label := range labels {
|
||||
a[i] = fmt.Sprintf("%s=%q", label.Name, label.Value)
|
||||
}
|
||||
sort.Strings(a)
|
||||
return "{" + strings.Join(a, ",") + "}"
|
||||
}
|
Loading…
Reference in a new issue