app/{vminsert,vmagent}: add ability to ingest data via DataDog "submit metrics" API

See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/206
This commit is contained in:
Aliaksandr Valialkin 2021-09-28 22:47:45 +03:00
parent 80df31b2ee
commit 4e65bfcc00
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
16 changed files with 751 additions and 17 deletions

View file

@ -187,12 +187,11 @@ It is recommended setting up alerts in [vmalert](https://docs.victoriametrics.co
- `<accountID>` is an arbitrary 32-bit integer identifying namespace for data ingestion (aka tenant). It is possible to set it as `accountID:projectID`,
where `projectID` is also arbitrary 32-bit integer. If `projectID` isn't set, then it equals to `0`.
- `<suffix>` may have the following values:
- `prometheus` and `prometheus/api/v1/write` - for inserting data with [Prometheus remote write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write)
- `influx/write` and `influx/api/v2/write` - for inserting data with [InfluxDB line protocol](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/).
- `opentsdb/api/put` - for accepting [OpenTSDB HTTP /api/put requests](http://opentsdb.net/docs/build/html/api_http/put.html).
This handler is disabled by default. It is exposed on a distinct TCP address set via `-opentsdbHTTPListenAddr` command-line flag.
See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#sending-opentsdb-data-via-http-apiput-requests) for details.
- `prometheus/api/v1/import` - for importing data obtained via `api/v1/export` on `vmselect` (see below).
- `prometheus` and `prometheus/api/v1/write` - for inserting data with [Prometheus remote write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write).
- `datadog/api/v1/series` - for inserting data with [DataDog submit metrics API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent) for details.
- `influx/write` and `influx/api/v2/write` - for inserting data with [InfluxDB line protocol](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for details.
- `opentsdb/api/put` - for accepting [OpenTSDB HTTP /api/put requests](http://opentsdb.net/docs/build/html/api_http/put.html). This handler is disabled by default. It is exposed on a distinct TCP address set via `-opentsdbHTTPListenAddr` command-line flag. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#sending-opentsdb-data-via-http-apiput-requests) for details.
- `prometheus/api/v1/import` - for importing data obtained via `api/v1/export` at `vmselect` (see below).
- `prometheus/api/v1/import/native` - for importing data obtained via `api/v1/export/native` on `vmselect` (see below).
- `prometheus/api/v1/import/csv` - for importing arbitrary CSV data. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-import-csv-data) for details.
- `prometheus/api/v1/import/prometheus` - for importing data in [Prometheus text exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format) and in [OpenMetrics format](https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-import-data-in-prometheus-exposition-format) for details.

View file

@ -21,6 +21,7 @@ to `vmagent` such as the ability to push metrics instead of pulling them. We did
See [Quick Start](#quick-start) for details.
* Can add, remove and modify labels (aka tags) via Prometheus relabeling. Can filter data before sending it to remote storage. See [these docs](#relabeling) for details.
* Accepts data via all ingestion protocols supported by VictoriaMetrics:
* DataDog "submit metrics" API. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent).
* InfluxDB line protocol via `http://<vmagent>:8429/write`. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf).
* Graphite plaintext protocol if `-graphiteListenAddr` command-line flag is set. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-graphite-compatible-agents-such-as-statsd).
* OpenTSDB telnet and http protocols if `-opentsdbListenAddr` command-line flag is set. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-opentsdb-compatible-agents).

View file

@ -0,0 +1,99 @@
package datadog
import (
"fmt"
"net/http"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
var (
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="datadog"}`)
rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="datadog"}`)
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="datadog"}`)
)
// InsertHandlerForHTTP processes remote write for DataDog POST /api/v1/series request.
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error {
ce := req.Header.Get("Content-Encoding")
return parser.ParseStream(req.Body, ce, func(series []parser.Series) error {
return insertRows(at, series, extraLabels)
})
})
}
func insertRows(at *auth.Token, series []parser.Series, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)
rowsTotal := 0
tssDst := ctx.WriteRequest.Timeseries[:0]
labels := ctx.Labels[:0]
samples := ctx.Samples[:0]
for i := range series {
ss := &series[i]
rowsTotal += len(ss.Points)
labelsLen := len(labels)
labels = append(labels, prompbmarshal.Label{
Name: "__name__",
Value: ss.Metric,
})
labels = append(labels, prompbmarshal.Label{
Name: "host",
Value: ss.Host,
})
for _, tag := range ss.Tags {
n := strings.IndexByte(tag, ':')
if n < 0 {
return fmt.Errorf("cannot find ':' in tag %q", tag)
}
name := tag[:n]
value := tag[n+1:]
if name == "host" {
name = "exported_host"
}
labels = append(labels, prompbmarshal.Label{
Name: name,
Value: value,
})
}
labels = append(labels, extraLabels...)
samplesLen := len(samples)
for _, pt := range ss.Points {
samples = append(samples, prompbmarshal.Sample{
Timestamp: pt.Timestamp(),
Value: pt.Value(),
})
}
tssDst = append(tssDst, prompbmarshal.TimeSeries{
Labels: labels[labelsLen:],
Samples: samples[samplesLen:],
})
}
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
remotewrite.PushWithAuthToken(at, &ctx.WriteRequest)
rowsInserted.Add(rowsTotal)
if at != nil {
rowsTenantInserted.Get(at).Add(rowsTotal)
}
rowsPerInsert.Update(float64(rowsTotal))
return nil
}

View file

@ -11,6 +11,7 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/datadog"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/native"
@ -224,6 +225,36 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
influxQueryRequests.Inc()
influxutils.WriteDatabaseNames(w)
return true
case "/datadog/api/v1/series":
datadogWriteRequests.Inc()
if err := datadog.InsertHandlerForHTTP(nil, r); err != nil {
datadogWriteErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(202)
fmt.Fprintf(w, `{"status":"ok"}`)
return true
case "/datadog/api/v1/validate":
datadogValidateRequests.Inc()
// See https://docs.datadoghq.com/api/latest/authentication/#validate-api-key
w.Header().Set("Content-Type", "application/json; charset=utf-8")
fmt.Fprintf(w, `{"valid":true}`)
return true
case "/datadog/api/v1/check_run":
datadogCheckRunRequests.Inc()
// See https://docs.datadoghq.com/api/latest/service-checks/#submit-a-service-check
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(202)
fmt.Fprintf(w, `{"status":"ok"}`)
return true
case "/datadog/intake/":
datadogIntakeRequests.Inc()
w.Header().Set("Content-Type", "application/json; charset=utf-8")
fmt.Fprintf(w, `{}`)
return true
case "/targets":
promscrapeTargetsRequests.Inc()
promscrape.WriteHumanReadableTargetsStatus(w, r)
@ -330,6 +361,35 @@ func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path stri
influxQueryRequests.Inc()
influxutils.WriteDatabaseNames(w)
return true
case "datadog/api/v1/series":
datadogWriteRequests.Inc()
if err := datadog.InsertHandlerForHTTP(at, r); err != nil {
datadogWriteErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
w.WriteHeader(202)
fmt.Fprintf(w, `{"status":"ok"}`)
return true
case "datadog/api/v1/validate":
datadogValidateRequests.Inc()
// See https://docs.datadoghq.com/api/latest/authentication/#validate-api-key
w.Header().Set("Content-Type", "application/json; charset=utf-8")
fmt.Fprintf(w, `{"valid":true}`)
return true
case "datadog/api/v1/check_run":
datadogCheckRunRequests.Inc()
// See https://docs.datadoghq.com/api/latest/service-checks/#submit-a-service-check
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(202)
fmt.Fprintf(w, `{"status":"ok"}`)
return true
case "datadog/intake/":
datadogIntakeRequests.Inc()
w.Header().Set("Content-Type", "application/json; charset=utf-8")
fmt.Fprintf(w, `{}`)
return true
default:
httpserver.Errorf(w, r, "unsupported multitenant path suffix: %q", p.Suffix)
return true
@ -352,10 +412,17 @@ var (
nativeimportRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/import/native", protocol="nativeimport"}`)
nativeimportErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/import/native", protocol="nativeimport"}`)
influxWriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/write", protocol="influx"}`)
influxWriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/write", protocol="influx"}`)
influxWriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/influx/write", protocol="influx"}`)
influxWriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/influx/write", protocol="influx"}`)
influxQueryRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/query", protocol="influx"}`)
influxQueryRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/influx/query", protocol="influx"}`)
datadogWriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/series", protocol="datadog"}`)
datadogWriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/datadog/api/v1/series", protocol="datadog"}`)
datadogValidateRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/validate", protocol="datadog"}`)
datadogCheckRunRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/check_run", protocol="datadog"}`)
datadogIntakeRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/intake/", protocol="datadog"}`)
promscrapeTargetsRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/targets"}`)
promscrapeAPIV1TargetsRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/targets"}`)

View file

@ -0,0 +1,97 @@
package datadog
import (
"fmt"
"net/http"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
var (
rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="datadog"}`)
rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="datadog"}`)
rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="datadog"}`)
)
// InsertHandlerForHTTP processes remote write for DataDog POST /api/v1/series request.
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error {
ce := req.Header.Get("Content-Encoding")
err := parser.ParseStream(req.Body, ce, func(series []parser.Series) error {
return insertRows(at, series, extraLabels)
})
if err != nil {
return fmt.Errorf("headers: %q; err: %w", req.Header, err)
}
return nil
})
}
func insertRows(at *auth.Token, series []parser.Series, extraLabels []prompbmarshal.Label) error {
ctx := netstorage.GetInsertCtx()
defer netstorage.PutInsertCtx(ctx)
ctx.Reset()
rowsTotal := 0
hasRelabeling := relabel.HasRelabeling()
for i := range series {
ss := &series[i]
rowsTotal += len(ss.Points)
ctx.Labels = ctx.Labels[:0]
ctx.AddLabel("", ss.Metric)
ctx.AddLabel("host", ss.Host)
for _, tag := range ss.Tags {
n := strings.IndexByte(tag, ':')
if n < 0 {
return fmt.Errorf("cannot find ':' in tag %q", tag)
}
name := tag[:n]
value := tag[n+1:]
if name == "host" {
name = "exported_host"
}
ctx.AddLabel(name, value)
}
for j := range extraLabels {
label := &extraLabels[j]
ctx.AddLabel(label.Name, label.Value)
}
if hasRelabeling {
ctx.ApplyRelabeling()
}
if len(ctx.Labels) == 0 {
// Skip metric without labels.
continue
}
ctx.SortLabelsIfNeeded()
ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], at.AccountID, at.ProjectID, ctx.Labels)
storageNodeIdx := ctx.GetStorageNodeIdx(at, ctx.Labels)
for _, pt := range ss.Points {
timestamp := pt.Timestamp()
value := pt.Value()
if err := ctx.WriteDataPointExt(at, storageNodeIdx, ctx.MetricNameBuf, timestamp, value); err != nil {
return err
}
}
}
rowsInserted.Add(rowsTotal)
rowsTenantInserted.Get(at).Add(rowsTotal)
rowsPerInsert.Update(float64(rowsTotal))
return ctx.FlushBufs()
}

View file

@ -12,6 +12,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/clusternative"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/datadog"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/native"
@ -238,6 +239,36 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
influxQueryRequests.Inc()
influxutils.WriteDatabaseNames(w)
return true
case "datadog/api/v1/series":
datadogWriteRequests.Inc()
if err := datadog.InsertHandlerForHTTP(at, r); err != nil {
datadogWriteErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(202)
fmt.Fprintf(w, `{"status":"ok"}`)
return true
case "datadog/api/v1/validate":
datadogValidateRequests.Inc()
// See https://docs.datadoghq.com/api/latest/authentication/#validate-api-key
w.Header().Set("Content-Type", "application/json; charset=utf-8")
fmt.Fprintf(w, `{"valid":true}`)
return true
case "datadog/api/v1/check_run":
datadogCheckRunRequests.Inc()
// See https://docs.datadoghq.com/api/latest/service-checks/#submit-a-service-check
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(202)
fmt.Fprintf(w, `{"status":"ok"}`)
return true
case "datadog/intake/":
datadogIntakeRequests.Inc()
w.Header().Set("Content-Type", "application/json; charset=utf-8")
fmt.Fprintf(w, `{}`)
return true
default:
// This is not our link
return false
@ -262,11 +293,18 @@ var (
nativeimportRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/prometheus/api/v1/import/native", protocol="nativeimport"}`)
nativeimportErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/insert/{}/prometheus/api/v1/import/native", protocol="nativeimport"}`)
influxWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/influx/", protocol="influx"}`)
influxWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/insert/{}/influx/", protocol="influx"}`)
influxWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/influx/write", protocol="influx"}`)
influxWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/insert/{}/influx/write", protocol="influx"}`)
influxQueryRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/influx/query", protocol="influx"}`)
datadogWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/datadog/api/v1/series", protocol="datadog"}`)
datadogWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/insert/{}/datadog/api/v1/series", protocol="datadog"}`)
datadogValidateRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/datadog/api/v1/validate", protocol="datadog"}`)
datadogCheckRunRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/datadog/api/v1/check_run", protocol="datadog"}`)
datadogIntakeRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/datadog/intake/", protocol="datadog"}`)
_ = metrics.NewGauge(`vm_metrics_with_dropped_labels_total`, func() float64 {
return float64(atomic.LoadUint64(&storage.MetricsWithDroppedLabels))
})

View file

@ -6,6 +6,7 @@ sort: 15
## tip
* FEATURE: add ability to accept metrics from [DataDog agent](https://docs.datadoghq.com/agent/) and [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent). This option simplifies the migration path from DataDog to VictoriaMetrics. See also [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/206).
* FEATURE: vmagent [enterprise](https://victoriametrics.com/enterprise.html): add support for data reading from [Apache Kafka](https://kafka.apache.org/).
* FEATURE: calculate quantiles in the same way as Prometheus does in such functions as [quantile_over_time](https://docs.victoriametrics.com/MetricsQL.html#quantile_over_time) and [quantile](https://docs.victoriametrics.com/MetricsQL.html#quantile). Previously results from VictoriaMetrics could be slightly different than results from Prometheus. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1625) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1612) issues.
* FEATURE: add `rollup_scrape_interval(m[d])` function to [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html), which returns `min`, `max` and `avg` values for the interval between samples for `m` on the given lookbehind window `d`.

View file

@ -191,12 +191,11 @@ It is recommended setting up alerts in [vmalert](https://docs.victoriametrics.co
- `<accountID>` is an arbitrary 32-bit integer identifying namespace for data ingestion (aka tenant). It is possible to set it as `accountID:projectID`,
where `projectID` is also arbitrary 32-bit integer. If `projectID` isn't set, then it equals to `0`.
- `<suffix>` may have the following values:
- `prometheus` and `prometheus/api/v1/write` - for inserting data with [Prometheus remote write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write)
- `influx/write` and `influx/api/v2/write` - for inserting data with [InfluxDB line protocol](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/).
- `opentsdb/api/put` - for accepting [OpenTSDB HTTP /api/put requests](http://opentsdb.net/docs/build/html/api_http/put.html).
This handler is disabled by default. It is exposed on a distinct TCP address set via `-opentsdbHTTPListenAddr` command-line flag.
See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#sending-opentsdb-data-via-http-apiput-requests) for details.
- `prometheus/api/v1/import` - for importing data obtained via `api/v1/export` on `vmselect` (see below).
- `prometheus` and `prometheus/api/v1/write` - for inserting data with [Prometheus remote write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write).
- `datadog/api/v1/series` - for inserting data with [DataDog submit metrics API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent) for details.
- `influx/write` and `influx/api/v2/write` - for inserting data with [InfluxDB line protocol](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for details.
- `opentsdb/api/put` - for accepting [OpenTSDB HTTP /api/put requests](http://opentsdb.net/docs/build/html/api_http/put.html). This handler is disabled by default. It is exposed on a distinct TCP address set via `-opentsdbHTTPListenAddr` command-line flag. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#sending-opentsdb-data-via-http-apiput-requests) for details.
- `prometheus/api/v1/import` - for importing data obtained via `api/v1/export` at `vmselect` (see below).
- `prometheus/api/v1/import/native` - for importing data obtained via `api/v1/export/native` on `vmselect` (see below).
- `prometheus/api/v1/import/csv` - for importing arbitrary CSV data. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-import-csv-data) for details.
- `prometheus/api/v1/import/prometheus` - for importing data in [Prometheus text exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format) and in [OpenMetrics format](https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-import-data-in-prometheus-exposition-format) for details.

View file

@ -265,6 +265,52 @@ VictoriaMetrics also supports [importing data in Prometheus exposition format](#
See also [vmagent](https://docs.victoriametrics.com/vmagent.html), which can be used as drop-in replacement for Prometheus.
## How to send data from DataDog agent
VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/) or [DogStatsD]() via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) at `/datadog/api/v1/series` path.
Run DataDog agent with `DD_DD_URL=http://victoriametrics-host:8428/datadog` environment variable in order to write data to VictoriaMetrics at `victoriametrics-host` host. Another option is to set `dd_url` param at [DataDog agent configuration file](https://docs.datadoghq.com/agent/guide/agent-configuration-files/) to `http://victoriametrics-host:8428/datadog`.
Example on how to send data to VictoriaMetrics via DataDog "submit metrics" API from command line:
```bash
echo '
{
"series": [
{
"host": "test.example.com",
"interval": 20,
"metric": "system.load.1",
"points": [[
0,
0.5
]],
"tags": [
"environment:test"
],
"type": "rate"
}
]
}
' | curl -X POST --data-binary @- http://localhost:8428/datadog/api/v1/series
```
The imported data can be read via [export API](https://docs.victoriametrics.com/#how-to-export-data-in-json-line-format):
```bash
curl http://localhost:8428/api/v1/export -d 'match[]=system.load.1'
```
This command should return the following output if everything is OK:
```
{"metric":{"__name__":"system.load.1","environment":"test","host":"test.example.com"},"values":[0.5],"timestamps":[1632833641000]}
```
Extra labels may be added to all the written time series by passing `extra_label=name=value` query args.
For example, `/datadog/api/v1/series?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics.
## How to send data from InfluxDB-compatible agents such as [Telegraf](https://www.influxdata.com/time-series-platform/telegraf/)
Use `http://<victoriametric-addr>:8428` url instead of InfluxDB url in agents' configs.
@ -790,6 +836,7 @@ The exported CSV data can be imported to VictoriaMetrics via [/api/v1/import/csv
Time series data can be imported via any supported ingestion protocol:
* [Prometheus remote_write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write). See [these docs](#prometheus-setup) for details.
* DataDog `submit metrics` API. See [these docs](#how-to-send-data-from-datadog-agent) for details.
* InfluxDB line protocol. See [these docs](#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for details.
* Graphite plaintext protocol. See [these docs](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) for details.
* OpenTSDB telnet put protocol. See [these docs](#sending-data-via-telnet-put-protocol) for details.

View file

@ -269,6 +269,52 @@ VictoriaMetrics also supports [importing data in Prometheus exposition format](#
See also [vmagent](https://docs.victoriametrics.com/vmagent.html), which can be used as drop-in replacement for Prometheus.
## How to send data from DataDog agent
VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/) or [DogStatsD]() via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) at `/datadog/api/v1/series` path.
Run DataDog agent with `DD_DD_URL=http://victoriametrics-host:8428/datadog` environment variable in order to write data to VictoriaMetrics at `victoriametrics-host` host. Another option is to set `dd_url` param at [DataDog agent configuration file](https://docs.datadoghq.com/agent/guide/agent-configuration-files/) to `http://victoriametrics-host:8428/datadog`.
Example on how to send data to VictoriaMetrics via DataDog "submit metrics" API from command line:
```bash
echo '
{
"series": [
{
"host": "test.example.com",
"interval": 20,
"metric": "system.load.1",
"points": [[
0,
0.5
]],
"tags": [
"environment:test"
],
"type": "rate"
}
]
}
' | curl -X POST --data-binary @- http://localhost:8428/datadog/api/v1/series
```
The imported data can be read via [export API](https://docs.victoriametrics.com/#how-to-export-data-in-json-line-format):
```bash
curl http://localhost:8428/api/v1/export -d 'match[]=system.load.1'
```
This command should return the following output if everything is OK:
```
{"metric":{"__name__":"system.load.1","environment":"test","host":"test.example.com"},"values":[0.5],"timestamps":[1632833641000]}
```
Extra labels may be added to all the written time series by passing `extra_label=name=value` query args.
For example, `/datadog/api/v1/series?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics.
## How to send data from InfluxDB-compatible agents such as [Telegraf](https://www.influxdata.com/time-series-platform/telegraf/)
Use `http://<victoriametric-addr>:8428` url instead of InfluxDB url in agents' configs.
@ -794,6 +840,7 @@ The exported CSV data can be imported to VictoriaMetrics via [/api/v1/import/csv
Time series data can be imported via any supported ingestion protocol:
* [Prometheus remote_write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write). See [these docs](#prometheus-setup) for details.
* DataDog `submit metrics` API. See [these docs](#how-to-send-data-from-datadog-agent) for details.
* InfluxDB line protocol. See [these docs](#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for details.
* Graphite plaintext protocol. See [these docs](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) for details.
* OpenTSDB telnet put protocol. See [these docs](#sending-data-via-telnet-put-protocol) for details.

View file

@ -25,6 +25,7 @@ to `vmagent` such as the ability to push metrics instead of pulling them. We did
See [Quick Start](#quick-start) for details.
* Can add, remove and modify labels (aka tags) via Prometheus relabeling. Can filter data before sending it to remote storage. See [these docs](#relabeling) for details.
* Accepts data via all ingestion protocols supported by VictoriaMetrics:
* DataDog "submit metrics" API. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent).
* InfluxDB line protocol via `http://<vmagent>:8429/write`. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf).
* Graphite plaintext protocol if `-graphiteListenAddr` command-line flag is set. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-graphite-compatible-agents-such-as-statsd).
* OpenTSDB telnet and http protocols if `-opentsdbListenAddr` command-line flag is set. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-opentsdb-compatible-agents).

View file

@ -5,6 +5,7 @@ import (
"sync"
"github.com/klauspost/compress/gzip"
"github.com/klauspost/compress/zlib"
)
// GetGzipReader returns new gzip reader from the pool.
@ -29,3 +30,24 @@ func PutGzipReader(zr *gzip.Reader) {
}
var gzipReaderPool sync.Pool
// GetZlibReader returns zlib reader.
func GetZlibReader(r io.Reader) (io.ReadCloser, error) {
v := zlibReaderPool.Get()
if v == nil {
return zlib.NewReader(r)
}
zr := v.(io.ReadCloser)
if err := zr.(zlib.Resetter).Reset(r, nil); err != nil {
return nil, err
}
return zr, nil
}
// PutZlibReader returns back zlib reader obtained via GetZlibReader.
func PutZlibReader(zr io.ReadCloser) {
_ = zr.Close()
zlibReaderPool.Put(zr)
}
var zlibReaderPool sync.Pool

View file

@ -0,0 +1,73 @@
package datadog
import (
"encoding/json"
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// Request represents DataDog POST request to /api/v1/series
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
type Request struct {
Series []Series `json:"series"`
}
func (req *Request) reset() {
req.Series = req.Series[:0]
}
// Unmarshal unmarshals DataDog /api/v1/series request body from b to req.
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
//
// b shouldn't be modified when req is in use.
func (req *Request) Unmarshal(b []byte) error {
req.reset()
if err := json.Unmarshal(b, req); err != nil {
return fmt.Errorf("cannot unmarshal %q: %w", b, err)
}
// Set missing timestamps to the current time.
currentTimestamp := float64(fasttime.UnixTimestamp())
series := req.Series
for i := range series {
points := series[i].Points
for j := range points {
if points[j][0] <= 0 {
points[j][0] = currentTimestamp
}
}
}
return nil
}
// Series represents a series item from DataDog POST request to /api/v1/series
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
type Series struct {
Host string `json:"host"`
// Do not decode Interval, since it isn't used by VictoriaMetrics
// Interval int64 `json:"interval"`
Metric string `json:"metric"`
Points []Point `json:"points"`
Tags []string `json:"tags"`
// Do not decode Type, since it isn't used by VictoriaMetrics
// Type string `json:"type"`
}
// Point represents a point from DataDog POST request to /api/v1/series
type Point [2]float64
// Timestamp returns timestamp in milliseconds from the given pt.
func (pt *Point) Timestamp() int64 {
return int64(pt[0] * 1000)
}
// Value returns value from the given pt.
func (pt *Point) Value() float64 {
return pt[1]
}

View file

@ -0,0 +1,66 @@
package datadog
import (
"reflect"
"testing"
)
func TestRequestUnmarshalFailure(t *testing.T) {
f := func(s string) {
t.Helper()
var req Request
if err := req.Unmarshal([]byte(s)); err == nil {
t.Fatalf("expecting non-nil error for Unmarshal(%q)", s)
}
}
f("")
f("foobar")
f(`{"series":123`)
f(`1234`)
f(`[]`)
}
func TestRequestUnmarshalSuccess(t *testing.T) {
f := func(s string, reqExpected *Request) {
t.Helper()
var req Request
if err := req.Unmarshal([]byte(s)); err != nil {
t.Fatalf("unexpected error in Unmarshal(%q): %s", s, err)
}
if !reflect.DeepEqual(&req, reqExpected) {
t.Fatalf("unexpected row;\ngot\n%+v\nwant\n%+v", &req, reqExpected)
}
}
f("{}", &Request{})
f(`
{
"series": [
{
"host": "test.example.com",
"interval": 20,
"metric": "system.load.1",
"points": [[
1575317847,
0.5
]],
"tags": [
"environment:test"
],
"type": "rate"
}
]
}
`, &Request{
Series: []Series{{
Host: "test.example.com",
Metric: "system.load.1",
Points: []Point{{
1575317847,
0.5,
}},
Tags: []string{
"environment:test",
},
}},
})
}

View file

@ -0,0 +1,39 @@
package datadog
import (
"fmt"
"testing"
)
func BenchmarkRequestUnmarshal(b *testing.B) {
reqBody := []byte(`{
"series": [
{
"host": "test.example.com",
"interval": 20,
"metric": "system.load.1",
"points": [
1575317847,
0.5
],
"tags": [
"environment:test"
],
"type": "rate"
}
]
}`)
b.SetBytes(int64(len(reqBody)))
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
var req Request
for pb.Next() {
if err := req.Unmarshal(reqBody); err != nil {
panic(fmt.Errorf("unexpected error: %w", err))
}
if len(req.Series) != 1 {
panic(fmt.Errorf("unexpected number of series unmarshaled: got %d; want 4", len(req.Series)))
}
}
})
}

View file

@ -0,0 +1,138 @@
package datadog
import (
"bufio"
"fmt"
"io"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/metrics"
)
// The maximum request size is defined at https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
var maxInsertRequestSize = flagutil.NewBytes("datadog.maxInsertRequestSize", 64*1024*1024, "The maximum size in bytes of a single DataDog POST request to /api/v1/series")
// ParseStream parses DataDog POST request for /api/v1/series from reader and calls callback for the parsed request.
//
// callback shouldn't hold series after returning.
func ParseStream(r io.Reader, contentEncoding string, callback func(series []Series) error) error {
switch contentEncoding {
case "gzip":
zr, err := common.GetGzipReader(r)
if err != nil {
return fmt.Errorf("cannot read gzipped DataDog data: %w", err)
}
defer common.PutGzipReader(zr)
r = zr
case "deflate":
zlr, err := common.GetZlibReader(r)
if err != nil {
return fmt.Errorf("cannot read deflated DataDog data: %w", err)
}
defer common.PutZlibReader(zlr)
r = zlr
}
ctx := getPushCtx(r)
defer putPushCtx(ctx)
if err := ctx.Read(); err != nil {
return err
}
req := getRequest()
defer putRequest(req)
if err := req.Unmarshal(ctx.reqBuf.B); err != nil {
unmarshalErrors.Inc()
return fmt.Errorf("cannot unmarshal DataDog POST request with size %d bytes: %s", len(ctx.reqBuf.B), err)
}
rows := 0
series := req.Series
for i := range series {
rows += len(series[i].Points)
}
rowsRead.Add(rows)
if err := callback(series); err != nil {
return fmt.Errorf("error when processing imported data: %w", err)
}
return nil
}
type pushCtx struct {
br *bufio.Reader
reqBuf bytesutil.ByteBuffer
}
func (ctx *pushCtx) reset() {
ctx.br.Reset(nil)
ctx.reqBuf.Reset()
}
func (ctx *pushCtx) Read() error {
readCalls.Inc()
lr := io.LimitReader(ctx.br, int64(maxInsertRequestSize.N)+1)
startTime := fasttime.UnixTimestamp()
reqLen, err := ctx.reqBuf.ReadFrom(lr)
if err != nil {
readErrors.Inc()
return fmt.Errorf("cannot read compressed request in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err)
}
if reqLen > int64(maxInsertRequestSize.N) {
readErrors.Inc()
return fmt.Errorf("too big packed request; mustn't exceed `-maxInsertRequestSize=%d` bytes", maxInsertRequestSize.N)
}
return nil
}
var (
readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="datadog"}`)
readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="datadog"}`)
rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="datadog"}`)
unmarshalErrors = metrics.NewCounter(`vm_protoparser_unmarshal_errors_total{type="datadog"}`)
)
func getPushCtx(r io.Reader) *pushCtx {
select {
case ctx := <-pushCtxPoolCh:
ctx.br.Reset(r)
return ctx
default:
if v := pushCtxPool.Get(); v != nil {
ctx := v.(*pushCtx)
ctx.br.Reset(r)
return ctx
}
return &pushCtx{
br: bufio.NewReaderSize(r, 64*1024),
}
}
}
func putPushCtx(ctx *pushCtx) {
ctx.reset()
select {
case pushCtxPoolCh <- ctx:
default:
pushCtxPool.Put(ctx)
}
}
var pushCtxPool sync.Pool
var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())
func getRequest() *Request {
v := requestPool.Get()
if v == nil {
return &Request{}
}
return v.(*Request)
}
func putRequest(req *Request) {
requestPool.Put(req)
}
var requestPool sync.Pool