app/{vminsert,vmagent}: preliminary support for /api/v2/series ingestion from new versions of DataDog Agent

This commit adds only JSON support (see https://docs.datadoghq.com/api/latest/metrics/#submit-metrics), while recent versions of DataDog Agent send data to /api/v2/series in an undocumented Protobuf format.
Support for the Protobuf format will be added later.

Thanks to @AndrewChubatiuk for the initial implementation at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5094

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4451
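
For reference, a minimal sketch of pushing a v2 JSON payload to the new endpoint. The payload mirrors the example added to docs/url-examples.md in this commit; the localhost:8428 address assumes a default single-node VictoriaMetrics setup.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	// Sample "submit metrics" v2 payload (JSON), as in the docs example added below.
	payload := []byte(`{
  "series": [
    {
      "metric": "system.load.1",
      "type": 0,
      "points": [{"timestamp": 0, "value": 0.7}],
      "resources": [{"name": "dummyhost", "type": "host"}],
      "tags": ["environment:test"]
    }
  ]
}`)
	// The address assumes a single-node VictoriaMetrics listening on the default port.
	resp, err := http.Post("http://localhost:8428/datadog/api/v2/series", "application/json", bytes.NewReader(payload))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status) // the handlers below reply with 202 and {"status":"ok"} on success
}
```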
Aliaksandr Valialkin 2023-12-21 18:29:10 +02:00
parent 8c1dcf4743
commit fb90a56de2
22 changed files with 870 additions and 184 deletions


@@ -516,10 +516,8 @@ See also [vmagent](https://docs.victoriametrics.com/vmagent.html), which can be
 ## How to send data from DataDog agent
-VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/)
-or [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/)
-via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics)
-at `/datadog/api/v1/series` path.
+VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/) or [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/)
+via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) at `/datadog/api/v2/series` path.
 ### Sending metrics to VictoriaMetrics
@@ -531,12 +529,11 @@ or via [configuration file](https://docs.datadoghq.com/agent/guide/agent-configu
 </p>
 To configure DataDog agent via ENV variable add the following prefix:
+<div class="with-copy" markdown="1">
 
-<div class="with-copy" markdown="1">
 ```
 DD_DD_URL=http://victoriametrics:8428/datadog
 ```
 </div>
 _Choose correct URL for VictoriaMetrics [here](https://docs.victoriametrics.com/url-examples.html#datadog)._
@@ -545,14 +542,12 @@ To configure DataDog agent via [configuration file](https://github.com/DataDog/d
 add the following line:
 <div class="with-copy" markdown="1">
 ```
 dd_url: http://victoriametrics:8428/datadog
 ```
 </div>
-vmagent also can accept Datadog metrics format. Depending on where vmagent will forward data,
+[vmagent](https://docs.victoriametrics.com/vmagent.html) also can accept Datadog metrics format. Depending on where vmagent will forward data,
 pick [single-node or cluster URL](https://docs.victoriametrics.com/url-examples.html#datadog) formats.
 ### Sending metrics to Datadog and VictoriaMetrics
@@ -567,12 +562,10 @@ sending via ENV variable `DD_ADDITIONAL_ENDPOINTS` or via configuration file `ad
 Run DataDog using the following ENV variable with VictoriaMetrics as additional metrics receiver:
 <div class="with-copy" markdown="1">
 ```
 DD_ADDITIONAL_ENDPOINTS='{\"http://victoriametrics:8428/datadog\": [\"apikey\"]}'
 ```
 </div>
 _Choose correct URL for VictoriaMetrics [here](https://docs.victoriametrics.com/url-examples.html#datadog)._
@@ -582,19 +575,16 @@ To configure DataDog Dual Shipping via [configuration file](https://docs.datadog
 add the following line:
 <div class="with-copy" markdown="1">
 ```
 additional_endpoints:
 "http://victoriametrics:8428/datadog":
 - apikey
 ```
 </div>
 ### Send via cURL
-See how to send data to VictoriaMetrics via
-[DataDog "submit metrics"](https://docs.victoriametrics.com/url-examples.html#datadogapiv1series) from command line.
+See how to send data to VictoriaMetrics via DataDog "submit metrics" API [here](https://docs.victoriametrics.com/url-examples.html#datadogapiv2series).
 The imported data can be read via [export API](https://docs.victoriametrics.com/url-examples.html#apiv1export).
@@ -605,7 +595,7 @@ according to [DataDog metric naming recommendations](https://docs.datadoghq.com/
 If you need accepting metric names as is without sanitizing, then pass `-datadog.sanitizeMetricName=false` command-line flag to VictoriaMetrics.
 Extra labels may be added to all the written time series by passing `extra_label=name=value` query args.
-For example, `/datadog/api/v1/series?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics.
+For example, `/datadog/api/v2/series?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics.
 DataDog agent sends the [configured tags](https://docs.datadoghq.com/getting_started/tagging/) to
 undocumented endpoint - `/datadog/intake`. This endpoint isn't supported by VictoriaMetrics yet.
@@ -2580,7 +2570,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
 -csvTrimTimestamp duration
 Trim timestamps when importing csv data to this duration. Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data (default 1ms)
 -datadog.maxInsertRequestSize size
-The maximum size in bytes of a single DataDog POST request to /api/v1/series
+The maximum size in bytes of a single DataDog POST request to /datadog/api/v2/series
 Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864)
 -datadog.sanitizeMetricName
 Sanitize metric names for the ingested DataDog data to comply with DataDog behaviour described at https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics (default true)


@@ -1,4 +1,4 @@
-package datadog
+package datadogv1
 import (
 "net/http"
@@ -8,33 +8,32 @@ import (
 "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
 "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
-"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog"
-"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog/stream"
+"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils"
+"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv1"
+"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv1/stream"
 "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
 "github.com/VictoriaMetrics/metrics"
 )
 var (
-rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="datadog"}`)
-rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="datadog"}`)
-rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="datadog"}`)
+rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="datadogv1"}`)
+rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="datadogv1"}`)
+rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="datadogv1"}`)
 )
 // InsertHandlerForHTTP processes remote write for DataDog POST /api/v1/series request.
-//
-// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
 func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
 extraLabels, err := parserCommon.GetExtraLabels(req)
 if err != nil {
 return err
 }
 ce := req.Header.Get("Content-Encoding")
-return stream.Parse(req.Body, ce, func(series []datadog.Series) error {
+return stream.Parse(req.Body, ce, func(series []datadogv1.Series) error {
 return insertRows(at, series, extraLabels)
 })
 }
-func insertRows(at *auth.Token, series []datadog.Series, extraLabels []prompbmarshal.Label) error {
+func insertRows(at *auth.Token, series []datadogv1.Series, extraLabels []prompbmarshal.Label) error {
 ctx := common.GetPushCtx()
 defer common.PutPushCtx(ctx)
@@ -63,7 +62,7 @@ func insertRows(at *auth.Token, series []datadog.Series, extraLabels []prompbmar
 })
 }
 for _, tag := range ss.Tags {
-name, value := datadog.SplitTag(tag)
+name, value := datadogutils.SplitTag(tag)
 if name == "host" {
 name = "exported_host"
 }


@@ -0,0 +1,96 @@
package datadogv2
import (
"net/http"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv2/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics"
)
var (
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="datadogv2"}`)
rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="datadogv2"}`)
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="datadogv2"}`)
)
// InsertHandlerForHTTP processes remote write for DataDog POST /api/v2/series request.
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
ct := req.Header.Get("Content-Type")
ce := req.Header.Get("Content-Encoding")
return stream.Parse(req.Body, ce, ct, func(series []datadogv2.Series) error {
return insertRows(at, series, extraLabels)
})
}
func insertRows(at *auth.Token, series []datadogv2.Series, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)
rowsTotal := 0
tssDst := ctx.WriteRequest.Timeseries[:0]
labels := ctx.Labels[:0]
samples := ctx.Samples[:0]
for i := range series {
ss := &series[i]
rowsTotal += len(ss.Points)
labelsLen := len(labels)
labels = append(labels, prompbmarshal.Label{
Name: "__name__",
Value: ss.Metric,
})
for _, rs := range ss.Resources {
labels = append(labels, prompbmarshal.Label{
Name: rs.Type,
Value: rs.Name,
})
}
for _, tag := range ss.Tags {
name, value := datadogutils.SplitTag(tag)
if name == "host" {
name = "exported_host"
}
labels = append(labels, prompbmarshal.Label{
Name: name,
Value: value,
})
}
labels = append(labels, extraLabels...)
samplesLen := len(samples)
for _, pt := range ss.Points {
samples = append(samples, prompbmarshal.Sample{
Timestamp: pt.Timestamp * 1000,
Value: pt.Value,
})
}
tssDst = append(tssDst, prompbmarshal.TimeSeries{
Labels: labels[labelsLen:],
Samples: samples[samplesLen:],
})
}
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
if !remotewrite.TryPush(at, &ctx.WriteRequest) {
return remotewrite.ErrQueueFullHTTPRetry
}
rowsInserted.Add(rowsTotal)
if at != nil {
rowsTenantInserted.Get(at).Add(rowsTotal)
}
rowsPerInsert.Update(float64(rowsTotal))
return nil
}
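
As a rough illustration of the mapping implemented above (a sketch, reusing the sample payload from docs/url-examples.md in this commit): the metric name becomes `__name__`, each resource becomes a label named after its resource type, tags are split on `:` by `datadogutils.SplitTag`, only a tag literally named `host` is renamed to `exported_host`, and point timestamps are converted from seconds to milliseconds.

```go
package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)

func main() {
	// Labels produced for the sample payload
	// {"metric":"system.load.1","resources":[{"name":"dummyhost","type":"host"}],"tags":["environment:test"]}:
	labels := []prompbmarshal.Label{
		{Name: "__name__", Value: "system.load.1"}, // from ss.Metric
		{Name: "host", Value: "dummyhost"},         // from a resource: Name=rs.Type, Value=rs.Name
		{Name: "environment", Value: "test"},       // from the tag "environment:test"
	}
	// A point {"timestamp": 1575317847, "value": 0.7} becomes a sample with
	// Timestamp=1575317847000 (milliseconds) and Value=0.7.
	fmt.Println(labels)
}
```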


@@ -12,7 +12,8 @@ import (
 "time"
 "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/csvimport"
-"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/datadog"
+"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/datadogv1"
+"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/datadogv2"
 "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/graphite"
 "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/influx"
 "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/native"
@@ -343,9 +344,20 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
 fmt.Fprintf(w, `{"status":"ok"}`)
 return true
 case "/datadog/api/v1/series":
-datadogWriteRequests.Inc()
-if err := datadog.InsertHandlerForHTTP(nil, r); err != nil {
-datadogWriteErrors.Inc()
+datadogv1WriteRequests.Inc()
+if err := datadogv1.InsertHandlerForHTTP(nil, r); err != nil {
+datadogv1WriteErrors.Inc()
+httpserver.Errorf(w, r, "%s", err)
+return true
+}
+w.Header().Set("Content-Type", "application/json")
+w.WriteHeader(202)
+fmt.Fprintf(w, `{"status":"ok"}`)
+return true
+case "/datadog/api/v2/series":
+datadogv2WriteRequests.Inc()
+if err := datadogv2.InsertHandlerForHTTP(nil, r); err != nil {
+datadogv2WriteErrors.Inc()
 httpserver.Errorf(w, r, "%s", err)
 return true
 }
@@ -566,9 +578,19 @@ func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path stri
 fmt.Fprintf(w, `{"status":"ok"}`)
 return true
 case "datadog/api/v1/series":
-datadogWriteRequests.Inc()
-if err := datadog.InsertHandlerForHTTP(at, r); err != nil {
-datadogWriteErrors.Inc()
+datadogv1WriteRequests.Inc()
+if err := datadogv1.InsertHandlerForHTTP(at, r); err != nil {
+datadogv1WriteErrors.Inc()
+httpserver.Errorf(w, r, "%s", err)
+return true
+}
+w.WriteHeader(202)
+fmt.Fprintf(w, `{"status":"ok"}`)
+return true
+case "datadog/api/v2/series":
+datadogv2WriteRequests.Inc()
+if err := datadogv2.InsertHandlerForHTTP(at, r); err != nil {
+datadogv2WriteErrors.Inc()
 httpserver.Errorf(w, r, "%s", err)
 return true
 }
@@ -626,8 +648,11 @@ var (
 influxQueryRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/influx/query", protocol="influx"}`)
-datadogWriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/series", protocol="datadog"}`)
-datadogWriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/datadog/api/v1/series", protocol="datadog"}`)
+datadogv1WriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/series", protocol="datadog"}`)
+datadogv1WriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/datadog/api/v1/series", protocol="datadog"}`)
+datadogv2WriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v2/series", protocol="datadog"}`)
+datadogv2WriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/datadog/api/v2/series", protocol="datadog"}`)
 datadogValidateRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/validate", protocol="datadog"}`)
 datadogCheckRunRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/check_run", protocol="datadog"}`)


@@ -1,4 +1,4 @@
-package datadog
+package datadogv1
 import (
 "net/http"
@@ -7,31 +7,30 @@ import (
 "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
 "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
-parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog"
-"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog/stream"
+"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils"
+"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv1"
+"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv1/stream"
 "github.com/VictoriaMetrics/metrics"
 )
 var (
-rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="datadog"}`)
-rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="datadog"}`)
+rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="datadogv1"}`)
+rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="datadogv1"}`)
 )
 // InsertHandlerForHTTP processes remote write for DataDog POST /api/v1/series request.
-//
-// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
 func InsertHandlerForHTTP(req *http.Request) error {
 extraLabels, err := parserCommon.GetExtraLabels(req)
 if err != nil {
 return err
 }
 ce := req.Header.Get("Content-Encoding")
-return stream.Parse(req.Body, ce, func(series []parser.Series) error {
+return stream.Parse(req.Body, ce, func(series []datadogv1.Series) error {
 return insertRows(series, extraLabels)
 })
 }
-func insertRows(series []parser.Series, extraLabels []prompbmarshal.Label) error {
+func insertRows(series []datadogv1.Series, extraLabels []prompbmarshal.Label) error {
 ctx := common.GetInsertCtx()
 defer common.PutInsertCtx(ctx)
@@ -54,7 +53,7 @@ func insertRows(series []parser.Series, extraLabels []prompbmarshal.Label) error
 ctx.AddLabel("device", ss.Device)
 }
 for _, tag := range ss.Tags {
-name, value := parser.SplitTag(tag)
+name, value := datadogutils.SplitTag(tag)
 if name == "host" {
 name = "exported_host"
 }


@@ -0,0 +1,88 @@
package datadogv2
import (
"net/http"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv2/stream"
"github.com/VictoriaMetrics/metrics"
)
var (
rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="datadogv2"}`)
rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="datadogv2"}`)
)
// InsertHandlerForHTTP processes remote write for DataDog POST /api/v2/series request.
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
func InsertHandlerForHTTP(req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
ct := req.Header.Get("Content-Type")
ce := req.Header.Get("Content-Encoding")
return stream.Parse(req.Body, ce, ct, func(series []datadogv2.Series) error {
return insertRows(series, extraLabels)
})
}
func insertRows(series []datadogv2.Series, extraLabels []prompbmarshal.Label) error {
ctx := common.GetInsertCtx()
defer common.PutInsertCtx(ctx)
rowsLen := 0
for i := range series {
rowsLen += len(series[i].Points)
}
ctx.Reset(rowsLen)
rowsTotal := 0
hasRelabeling := relabel.HasRelabeling()
for i := range series {
ss := &series[i]
rowsTotal += len(ss.Points)
ctx.Labels = ctx.Labels[:0]
ctx.AddLabel("", ss.Metric)
for _, rs := range ss.Resources {
ctx.AddLabel(rs.Type, rs.Name)
}
for _, tag := range ss.Tags {
name, value := datadogutils.SplitTag(tag)
if name == "host" {
name = "exported_host"
}
ctx.AddLabel(name, value)
}
for j := range extraLabels {
label := &extraLabels[j]
ctx.AddLabel(label.Name, label.Value)
}
if hasRelabeling {
ctx.ApplyRelabeling()
}
if len(ctx.Labels) == 0 {
// Skip metric without labels.
continue
}
ctx.SortLabelsIfNeeded()
var metricNameRaw []byte
var err error
for _, pt := range ss.Points {
timestamp := pt.Timestamp * 1000
value := pt.Value
metricNameRaw, err = ctx.WriteDataPointExt(metricNameRaw, ctx.Labels, timestamp, value)
if err != nil {
return err
}
}
}
rowsInserted.Add(rowsTotal)
rowsPerInsert.Update(float64(rowsTotal))
return ctx.FlushBufs()
}


@@ -13,7 +13,8 @@ import (
 vminsertCommon "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
 "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/csvimport"
-"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/datadog"
+"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/datadogv1"
+"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/datadogv2"
 "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/graphite"
 "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/influx"
 "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/native"
@@ -246,9 +247,20 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
 fmt.Fprintf(w, `{"status":"ok"}`)
 return true
 case "/datadog/api/v1/series":
-datadogWriteRequests.Inc()
-if err := datadog.InsertHandlerForHTTP(r); err != nil {
-datadogWriteErrors.Inc()
+datadogv1WriteRequests.Inc()
+if err := datadogv1.InsertHandlerForHTTP(r); err != nil {
+datadogv1WriteErrors.Inc()
+httpserver.Errorf(w, r, "%s", err)
+return true
+}
+w.Header().Set("Content-Type", "application/json")
+w.WriteHeader(202)
+fmt.Fprintf(w, `{"status":"ok"}`)
+return true
+case "/datadog/api/v2/series":
+datadogv2WriteRequests.Inc()
+if err := datadogv2.InsertHandlerForHTTP(r); err != nil {
+datadogv2WriteErrors.Inc()
 httpserver.Errorf(w, r, "%s", err)
 return true
 }
@@ -371,8 +383,11 @@ var (
 influxQueryRequests = metrics.NewCounter(`vm_http_requests_total{path="/influx/query", protocol="influx"}`)
-datadogWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/api/v1/series", protocol="datadog"}`)
-datadogWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/datadog/api/v1/series", protocol="datadog"}`)
+datadogv1WriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/api/v1/series", protocol="datadog"}`)
+datadogv1WriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/datadog/api/v1/series", protocol="datadog"}`)
+datadogv2WriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/api/v2/series", protocol="datadog"}`)
+datadogv2WriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/datadog/api/v2/series", protocol="datadog"}`)
 datadogValidateRequests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/api/v1/validate", protocol="datadog"}`)
 datadogCheckRunRequests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/api/v1/check_run", protocol="datadog"}`)


@@ -28,6 +28,7 @@ The sandbox cluster installation is running under the constant load generated by
 ## tip
+* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [DataDog v2 data ingestion protocol](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). JSON protocol is supported right now. Protobuf protocol will be supported later. See [these docs](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4451).
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): expose ability to set OAuth2 endpoint parameters per each `-remoteWrite.url` via the command-line flag `-remoteWrite.oauth2.endpointParams`. See [these docs](https://docs.victoriametrics.com/vmagent.html#advanced-usage). Thanks to @mhill-holoplot for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5427).
 * FEATURE: [vmalert](https://docs.victoriametrics.com/vmagent.html): expose ability to set OAuth2 endpoint parameters via the following command-line flags:
 - `-datasource.oauth2.endpointParams` for `-datasource.url`


@@ -363,7 +363,8 @@ Check practical examples of VictoriaMetrics API [here](https://docs.victoriametr
 - `prometheus/api/v1/import/csv` - for importing arbitrary CSV data. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-import-csv-data) for details.
 - `prometheus/api/v1/import/prometheus` - for importing data in [Prometheus text exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format) and in [OpenMetrics format](https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md). This endpoint also supports [Pushgateway protocol](https://github.com/prometheus/pushgateway#url). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-import-data-in-prometheus-exposition-format) for details.
 - `opentelemetry/api/v1/push` - for ingesting data via [OpenTelemetry protocol for metrics](https://github.com/open-telemetry/opentelemetry-specification/blob/ffddc289462dfe0c2041e3ca42a7b1df805706de/specification/metrics/data-model.md). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#sending-data-via-opentelemetry).
-- `datadog/api/v1/series` - for ingesting data with [DataDog submit metrics API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent) for details.
+- `datadog/api/v1/series` - for ingesting data with DataDog submit metrics API v1. See [these docs](https://docs.victoriametrics.com/url-examples.html#datadogapiv1series) for details.
+- `datadog/api/v2/series` - for ingesting data with [DataDog submit metrics API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent) for details.
 - `influx/write` and `influx/api/v2/write` - for ingesting data with [InfluxDB line protocol](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for details.
 - `newrelic/infra/v2/metrics/events/bulk` - for accepting data from [NewRelic infrastructure agent](https://docs.newrelic.com/docs/infrastructure/install-infrastructure-agent). See [these docs](https://docs.victoriametrics.com/#how-to-send-data-from-newrelic-agent) for details.
 - `opentsdb/api/put` - for accepting [OpenTSDB HTTP /api/put requests](http://opentsdb.net/docs/build/html/api_http/put.html). This handler is disabled by default. It is exposed on a distinct TCP address set via `-opentsdbHTTPListenAddr` command-line flag. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#sending-opentsdb-data-via-http-apiput-requests) for details.


@@ -519,10 +519,8 @@ See also [vmagent](https://docs.victoriametrics.com/vmagent.html), which can be
 ## How to send data from DataDog agent
-VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/)
-or [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/)
-via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics)
-at `/datadog/api/v1/series` path.
+VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/) or [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/)
+via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) at `/datadog/api/v2/series` path.
 ### Sending metrics to VictoriaMetrics
@@ -534,12 +532,11 @@ or via [configuration file](https://docs.datadoghq.com/agent/guide/agent-configu
 </p>
 To configure DataDog agent via ENV variable add the following prefix:
+<div class="with-copy" markdown="1">
 
-<div class="with-copy" markdown="1">
 ```
 DD_DD_URL=http://victoriametrics:8428/datadog
 ```
 </div>
 _Choose correct URL for VictoriaMetrics [here](https://docs.victoriametrics.com/url-examples.html#datadog)._
@@ -548,14 +545,12 @@ To configure DataDog agent via [configuration file](https://github.com/DataDog/d
 add the following line:
 <div class="with-copy" markdown="1">
 ```
 dd_url: http://victoriametrics:8428/datadog
 ```
 </div>
-vmagent also can accept Datadog metrics format. Depending on where vmagent will forward data,
+[vmagent](https://docs.victoriametrics.com/vmagent.html) also can accept Datadog metrics format. Depending on where vmagent will forward data,
 pick [single-node or cluster URL](https://docs.victoriametrics.com/url-examples.html#datadog) formats.
 ### Sending metrics to Datadog and VictoriaMetrics
@@ -570,12 +565,10 @@ sending via ENV variable `DD_ADDITIONAL_ENDPOINTS` or via configuration file `ad
 Run DataDog using the following ENV variable with VictoriaMetrics as additional metrics receiver:
 <div class="with-copy" markdown="1">
 ```
 DD_ADDITIONAL_ENDPOINTS='{\"http://victoriametrics:8428/datadog\": [\"apikey\"]}'
 ```
 </div>
 _Choose correct URL for VictoriaMetrics [here](https://docs.victoriametrics.com/url-examples.html#datadog)._
@@ -585,19 +578,16 @@ To configure DataDog Dual Shipping via [configuration file](https://docs.datadog
 add the following line:
 <div class="with-copy" markdown="1">
 ```
 additional_endpoints:
 "http://victoriametrics:8428/datadog":
 - apikey
 ```
 </div>
 ### Send via cURL
-See how to send data to VictoriaMetrics via
-[DataDog "submit metrics"](https://docs.victoriametrics.com/url-examples.html#datadogapiv1series) from command line.
+See how to send data to VictoriaMetrics via DataDog "submit metrics" API [here](https://docs.victoriametrics.com/url-examples.html#datadogapiv2series).
 The imported data can be read via [export API](https://docs.victoriametrics.com/url-examples.html#apiv1export).
@@ -608,7 +598,7 @@ according to [DataDog metric naming recommendations](https://docs.datadoghq.com/
 If you need accepting metric names as is without sanitizing, then pass `-datadog.sanitizeMetricName=false` command-line flag to VictoriaMetrics.
 Extra labels may be added to all the written time series by passing `extra_label=name=value` query args.
-For example, `/datadog/api/v1/series?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics.
+For example, `/datadog/api/v2/series?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics.
 DataDog agent sends the [configured tags](https://docs.datadoghq.com/getting_started/tagging/) to
 undocumented endpoint - `/datadog/intake`. This endpoint isn't supported by VictoriaMetrics yet.
@@ -2583,7 +2573,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
 -csvTrimTimestamp duration
 Trim timestamps when importing csv data to this duration. Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data (default 1ms)
 -datadog.maxInsertRequestSize size
-The maximum size in bytes of a single DataDog POST request to /api/v1/series
+The maximum size in bytes of a single DataDog POST request to /datadog/api/v2/series
 Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864)
 -datadog.sanitizeMetricName
 Sanitize metric names for the ingested DataDog data to comply with DataDog behaviour described at https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics (default true)


@@ -527,10 +527,8 @@ See also [vmagent](https://docs.victoriametrics.com/vmagent.html), which can be
 ## How to send data from DataDog agent
-VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/)
-or [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/)
-via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics)
-at `/datadog/api/v1/series` path.
+VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/) or [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/)
+via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) at `/datadog/api/v2/series` path.
 ### Sending metrics to VictoriaMetrics
@@ -542,12 +540,11 @@ or via [configuration file](https://docs.datadoghq.com/agent/guide/agent-configu
 </p>
 To configure DataDog agent via ENV variable add the following prefix:
+<div class="with-copy" markdown="1">
 
-<div class="with-copy" markdown="1">
 ```
 DD_DD_URL=http://victoriametrics:8428/datadog
 ```
 </div>
 _Choose correct URL for VictoriaMetrics [here](https://docs.victoriametrics.com/url-examples.html#datadog)._
@@ -556,14 +553,12 @@ To configure DataDog agent via [configuration file](https://github.com/DataDog/d
 add the following line:
 <div class="with-copy" markdown="1">
 ```
 dd_url: http://victoriametrics:8428/datadog
 ```
 </div>
-vmagent also can accept Datadog metrics format. Depending on where vmagent will forward data,
+[vmagent](https://docs.victoriametrics.com/vmagent.html) also can accept Datadog metrics format. Depending on where vmagent will forward data,
 pick [single-node or cluster URL](https://docs.victoriametrics.com/url-examples.html#datadog) formats.
 ### Sending metrics to Datadog and VictoriaMetrics
@@ -578,12 +573,10 @@ sending via ENV variable `DD_ADDITIONAL_ENDPOINTS` or via configuration file `ad
 Run DataDog using the following ENV variable with VictoriaMetrics as additional metrics receiver:
 <div class="with-copy" markdown="1">
 ```
 DD_ADDITIONAL_ENDPOINTS='{\"http://victoriametrics:8428/datadog\": [\"apikey\"]}'
 ```
 </div>
 _Choose correct URL for VictoriaMetrics [here](https://docs.victoriametrics.com/url-examples.html#datadog)._
@@ -593,19 +586,16 @@ To configure DataDog Dual Shipping via [configuration file](https://docs.datadog
 add the following line:
 <div class="with-copy" markdown="1">
 ```
 additional_endpoints:
 "http://victoriametrics:8428/datadog":
 - apikey
 ```
 </div>
 ### Send via cURL
-See how to send data to VictoriaMetrics via
-[DataDog "submit metrics"](https://docs.victoriametrics.com/url-examples.html#datadogapiv1series) from command line.
+See how to send data to VictoriaMetrics via DataDog "submit metrics" API [here](https://docs.victoriametrics.com/url-examples.html#datadogapiv2series).
 The imported data can be read via [export API](https://docs.victoriametrics.com/url-examples.html#apiv1export).
@@ -616,7 +606,7 @@ according to [DataDog metric naming recommendations](https://docs.datadoghq.com/
 If you need accepting metric names as is without sanitizing, then pass `-datadog.sanitizeMetricName=false` command-line flag to VictoriaMetrics.
 Extra labels may be added to all the written time series by passing `extra_label=name=value` query args.
-For example, `/datadog/api/v1/series?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics.
+For example, `/datadog/api/v2/series?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics.
 DataDog agent sends the [configured tags](https://docs.datadoghq.com/getting_started/tagging/) to
 undocumented endpoint - `/datadog/intake`. This endpoint isn't supported by VictoriaMetrics yet.
@@ -2591,7 +2581,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
 -csvTrimTimestamp duration
 Trim timestamps when importing csv data to this duration. Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data (default 1ms)
 -datadog.maxInsertRequestSize size
-The maximum size in bytes of a single DataDog POST request to /api/v1/series
+The maximum size in bytes of a single DataDog POST request to /datadog/api/v2/series
 Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864)
 -datadog.sanitizeMetricName
 Sanitize metric names for the ingested DataDog data to comply with DataDog behaviour described at https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics (default true)


@@ -473,7 +473,7 @@ http://vminsert:8480/insert/0/datadog
 ### /datadog/api/v1/series
-**Imports data in DataDog format into VictoriaMetrics**
+**Imports data in DataDog v1 format into VictoriaMetrics**
 Single-node VictoriaMetrics:
 <div class="with-copy" markdown="1">
@@ -531,7 +531,79 @@ echo '
 Additional information:
-* [How to send data from datadog agent](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent)
+* [How to send data from DataDog agent](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent)
+* [URL format for VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format)
### /datadog/api/v2/series
**Imports data in [DataDog v2](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) format into VictoriaMetrics**
Single-node VictoriaMetrics:
<div class="with-copy" markdown="1">
```console
echo '
{
"series": [
{
"metric": "system.load.1",
"type": 0,
"points": [
{
"timestamp": 0,
"value": 0.7
}
],
"resources": [
{
"name": "dummyhost",
"type": "host"
}
],
"tags": ["environment:test"]
}
]
}
' | curl -X POST -H 'Content-Type: application/json' --data-binary @- http://localhost:8428/datadog/api/v2/series
```
</div>
Cluster version of VictoriaMetrics:
<div class="with-copy" markdown="1">
```console
echo '
{
"series": [
{
"metric": "system.load.1",
"type": 0,
"points": [
{
"timestamp": 0,
"value": 0.7
}
],
"resources": [
{
"name": "dummyhost",
"type": "host"
}
],
"tags": ["environment:test"]
}
]
}
' | curl -X POST -H 'Content-Type: application/json' --data-binary @- 'http://<vminsert>:8480/insert/0/datadog/api/v2/series'
```
</div>
Additional information:
* [How to send data from DataDog agent](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent)
 * [URL format for VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format)
 ### /federate


@@ -0,0 +1,57 @@
package datadogutils
import (
"flag"
"regexp"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
)
var (
// MaxInsertRequestSize is the maximum request size, as defined at https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
MaxInsertRequestSize = flagutil.NewBytes("datadog.maxInsertRequestSize", 64*1024*1024, "The maximum size in bytes of a single DataDog POST request to /api/v2/series")
// SanitizeMetricName controls sanitizing metric names ingested via DataDog protocols.
//
// If all metrics in Datadog have the same naming schema as custom metrics, then the following rules apply:
// https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics
// But there's some hidden behaviour. In addition to what it states in the docs, the following is also done:
// - Consecutive underscores are replaced with just one underscore
// - Underscore immediately before or after a dot are removed
SanitizeMetricName = flag.Bool("datadog.sanitizeMetricName", true, "Sanitize metric names for the ingested DataDog data to comply with DataDog behaviour described at "+
"https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics")
)
// SplitTag splits DataDog tag into tag name and value.
//
// See https://docs.datadoghq.com/getting_started/tagging/#define-tags
func SplitTag(tag string) (string, string) {
n := strings.IndexByte(tag, ':')
if n < 0 {
// No tag value.
return tag, "no_label_value"
}
return tag[:n], tag[n+1:]
}
// SanitizeName performs DataDog-compatible sanitizing for metric names
//
// See https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics
func SanitizeName(name string) string {
return namesSanitizer.Transform(name)
}
var namesSanitizer = bytesutil.NewFastStringTransformer(func(s string) string {
s = unsupportedDatadogChars.ReplaceAllString(s, "_")
s = multiUnderscores.ReplaceAllString(s, "_")
s = underscoresWithDots.ReplaceAllString(s, ".")
return s
})
var (
unsupportedDatadogChars = regexp.MustCompile(`[^0-9a-zA-Z_\.]+`)
multiUnderscores = regexp.MustCompile(`_+`)
underscoresWithDots = regexp.MustCompile(`_?\._?`)
)
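
For illustration, a small usage sketch of the helpers above (the expected outputs follow from `SplitTag` and the three regexps):

```go
package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils"
)

func main() {
	// Tag splitting: "environment:test" -> ("environment", "test");
	// a tag without ':' gets the placeholder value "no_label_value".
	name, value := datadogutils.SplitTag("environment:test")
	fmt.Println(name, value) // environment test

	// Metric name sanitizing: unsupported chars become '_', runs of '_' collapse into one,
	// and an underscore next to a dot is dropped.
	fmt.Println(datadogutils.SanitizeName("cpu!!usage__total._pct")) // cpu_usage_total.pct
}
```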

View file

@@ -1,13 +1,30 @@
-package stream
+package datadogutils
 import (
 "testing"
 )
+func TestSplitTag(t *testing.T) {
+f := func(s, nameExpected, valueExpected string) {
+t.Helper()
+name, value := SplitTag(s)
+if name != nameExpected {
+t.Fatalf("unexpected name obtained from %q; got %q; want %q", s, name, nameExpected)
+}
+if value != valueExpected {
+t.Fatalf("unexpected value obtained from %q; got %q; want %q", s, value, valueExpected)
+}
+}
+f("", "", "no_label_value")
+f("foo", "foo", "no_label_value")
+f("foo:bar", "foo", "bar")
+f(":bar", "", "bar")
+}
 func TestSanitizeName(t *testing.T) {
 f := func(s, resultExpected string) {
 t.Helper()
-result := sanitizeName(s)
+result := SanitizeName(s)
 if result != resultExpected {
 t.Fatalf("unexpected result for sanitizeName(%q); got\n%q\nwant\n%q", s, result, resultExpected)
 }


@@ -1,28 +1,13 @@
-package datadog
+package datadogv1
 import (
 "encoding/json"
 "fmt"
-"strings"
 "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
 )
-// SplitTag splits DataDog tag into tag name and value.
-//
-// See https://docs.datadoghq.com/getting_started/tagging/#define-tags
-func SplitTag(tag string) (string, string) {
-n := strings.IndexByte(tag, ':')
-if n < 0 {
-// No tag value.
-return tag, "no_label_value"
-}
-return tag[:n], tag[n+1:]
-}
 // Request represents DataDog POST request to /api/v1/series
-//
-// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
 type Request struct {
 Series []Series `json:"series"`
 }
@@ -41,8 +26,6 @@ func (req *Request) reset() {
 // Unmarshal unmarshals DataDog /api/v1/series request body from b to req.
 //
-// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
-//
 // b shouldn't be modified when req is in use.
 func (req *Request) Unmarshal(b []byte) error {
 req.reset()
@@ -64,8 +47,6 @@ func (req *Request) Unmarshal(b []byte) error {
 }
 // Series represents a series item from DataDog POST request to /api/v1/series
-//
-// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
 type Series struct {
 Metric string `json:"metric"`
 Host string `json:"host"`


@@ -1,27 +1,10 @@
-package datadog
+package datadogv1
 import (
 "reflect"
 "testing"
 )
-func TestSplitTag(t *testing.T) {
-f := func(s, nameExpected, valueExpected string) {
-t.Helper()
-name, value := SplitTag(s)
-if name != nameExpected {
-t.Fatalf("unexpected name obtained from %q; got %q; want %q", s, name, nameExpected)
-}
-if value != valueExpected {
-t.Fatalf("unexpected value obtained from %q; got %q; want %q", s, value, valueExpected)
-}
-}
-f("", "", "no_label_value")
-f("foo", "foo", "no_label_value")
-f("foo:bar", "foo", "bar")
-f(":bar", "", "bar")
-}
 func TestRequestUnmarshalMissingHost(t *testing.T) {
 // This tests https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3432
 req := Request{


@@ -1,4 +1,4 @@
-package datadog
+package datadogv1
 import (
 "fmt"
@@ -12,10 +12,10 @@ func BenchmarkRequestUnmarshal(b *testing.B) {
 "host": "test.example.com",
 "interval": 20,
 "metric": "system.load.1",
-"points": [
+"points": [[
 1575317847,
 0.5
-],
+]],
 "tags": [
 "environment:test"
 ],


@@ -2,39 +2,24 @@ package stream
 
 import (
 	"bufio"
-	"flag"
 	"fmt"
 	"io"
-	"regexp"
 	"sync"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv1"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
 	"github.com/VictoriaMetrics/metrics"
 )
 
-var (
-	// The maximum request size is defined at https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
-	maxInsertRequestSize = flagutil.NewBytes("datadog.maxInsertRequestSize", 64*1024*1024, "The maximum size in bytes of a single DataDog POST request to /api/v1/series")
-
-	// If all metrics in Datadog have the same naming schema as custom metrics, then the following rules apply:
-	// https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics
-	// But there's some hidden behaviour. In addition to what it states in the docs, the following is also done:
-	// - Consecutive underscores are replaced with just one underscore
-	// - Underscore immediately before or after a dot are removed
-	sanitizeMetricName = flag.Bool("datadog.sanitizeMetricName", true, "Sanitize metric names for the ingested DataDog data to comply with DataDog behaviour described at "+
-		"https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics")
-)
-
 // Parse parses DataDog POST request for /api/v1/series from reader and calls callback for the parsed request.
 //
 // callback shouldn't hold series after returning.
-func Parse(r io.Reader, contentEncoding string, callback func(series []datadog.Series) error) error {
+func Parse(r io.Reader, contentEncoding string, callback func(series []datadogv1.Series) error) error {
 	wcr := writeconcurrencylimiter.GetReader(r)
 	defer writeconcurrencylimiter.PutReader(wcr)
 	r = wcr
@@ -70,8 +55,8 @@ func Parse(r io.Reader, contentEncoding string, callback func(series []datadog.S
 	series := req.Series
 	for i := range series {
 		rows += len(series[i].Points)
-		if *sanitizeMetricName {
-			series[i].Metric = sanitizeName(series[i].Metric)
+		if *datadogutils.SanitizeMetricName {
+			series[i].Metric = datadogutils.SanitizeName(series[i].Metric)
 		}
 	}
 	rowsRead.Add(rows)
@@ -94,25 +79,25 @@ func (ctx *pushCtx) reset() {
 
 func (ctx *pushCtx) Read() error {
 	readCalls.Inc()
-	lr := io.LimitReader(ctx.br, int64(maxInsertRequestSize.N)+1)
+	lr := io.LimitReader(ctx.br, int64(datadogutils.MaxInsertRequestSize.N)+1)
 	startTime := fasttime.UnixTimestamp()
 	reqLen, err := ctx.reqBuf.ReadFrom(lr)
 	if err != nil {
 		readErrors.Inc()
 		return fmt.Errorf("cannot read request in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err)
 	}
-	if reqLen > int64(maxInsertRequestSize.N) {
+	if reqLen > int64(datadogutils.MaxInsertRequestSize.N) {
 		readErrors.Inc()
-		return fmt.Errorf("too big request; mustn't exceed -datadog.maxInsertRequestSize=%d bytes", maxInsertRequestSize.N)
+		return fmt.Errorf("too big request; mustn't exceed -datadog.maxInsertRequestSize=%d bytes", datadogutils.MaxInsertRequestSize.N)
 	}
 	return nil
 }
 
 var (
-	readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="datadog"}`)
-	readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="datadog"}`)
-	rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="datadog"}`)
-	unmarshalErrors = metrics.NewCounter(`vm_protoparser_unmarshal_errors_total{type="datadog"}`)
+	readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="datadogv1"}`)
+	readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="datadogv1"}`)
+	rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="datadogv1"}`)
+	unmarshalErrors = metrics.NewCounter(`vm_protoparser_unmarshal_errors_total{type="datadogv1"}`)
 )
 
 func getPushCtx(r io.Reader) *pushCtx {
@@ -144,36 +129,16 @@ func putPushCtx(ctx *pushCtx) {
 var pushCtxPool sync.Pool
 var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())
 
-func getRequest() *datadog.Request {
+func getRequest() *datadogv1.Request {
 	v := requestPool.Get()
 	if v == nil {
-		return &datadog.Request{}
+		return &datadogv1.Request{}
 	}
-	return v.(*datadog.Request)
+	return v.(*datadogv1.Request)
 }
 
-func putRequest(req *datadog.Request) {
+func putRequest(req *datadogv1.Request) {
 	requestPool.Put(req)
 }
 
 var requestPool sync.Pool
-
-// sanitizeName performs DataDog-compatible sanitizing for metric names
-//
-// See https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics
-func sanitizeName(name string) string {
-	return namesSanitizer.Transform(name)
-}
-
-var namesSanitizer = bytesutil.NewFastStringTransformer(func(s string) string {
-	s = unsupportedDatadogChars.ReplaceAllString(s, "_")
-	s = multiUnderscores.ReplaceAllString(s, "_")
-	s = underscoresWithDots.ReplaceAllString(s, ".")
-	return s
-})
-
-var (
-	unsupportedDatadogChars = regexp.MustCompile(`[^0-9a-zA-Z_\.]+`)
-	multiUnderscores = regexp.MustCompile(`_+`)
-	underscoresWithDots = regexp.MustCompile(`_?\._?`)
-)
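The comments removed above spell out the DataDog-compatible sanitization rules: characters outside `[0-9a-zA-Z_.]` become `_`, runs of underscores collapse to a single one, and underscores adjacent to a dot are dropped. This logic now sits behind `datadogutils.SanitizeMetricName` (the `-datadog.sanitizeMetricName` flag) and `datadogutils.SanitizeName`. The self-contained sketch below reproduces the regexp transform from the removed code and is handy for checking what a given metric name turns into.

```go
package main

import (
	"fmt"
	"regexp"
)

// The three regexps are copied verbatim from the removed v1 code above.
var (
	unsupportedDatadogChars = regexp.MustCompile(`[^0-9a-zA-Z_\.]+`)
	multiUnderscores        = regexp.MustCompile(`_+`)
	underscoresWithDots     = regexp.MustCompile(`_?\._?`)
)

// sanitizeName applies the same three-step transform:
// unsupported characters -> "_", collapse "_" runs, strip "_" next to ".".
func sanitizeName(s string) string {
	s = unsupportedDatadogChars.ReplaceAllString(s, "_")
	s = multiUnderscores.ReplaceAllString(s, "_")
	s = underscoresWithDots.ReplaceAllString(s, ".")
	return s
}

func main() {
	fmt.Println(sanitizeName("http__requests_.count / sec")) // http_requests.count_sec
}
```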


@@ -0,0 +1,143 @@
package datadogv2
import (
"encoding/json"
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// Request represents DataDog POST request to /api/v2/series
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
type Request struct {
Series []Series `json:"series"`
}
func (req *Request) reset() {
// recursively reset all the fields in req in order to avoid field value
// re-use in json.Unmarshal() when the corresponding field is missing
// in the unmarshaled JSON.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3432
series := req.Series
for i := range series {
series[i].reset()
}
req.Series = series[:0]
}
// UnmarshalJSON unmarshals JSON DataDog /api/v2/series request body from b to req.
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
//
// b shouldn't be modified when req is in use.
func UnmarshalJSON(req *Request, b []byte) error {
req.reset()
if err := json.Unmarshal(b, req); err != nil {
return fmt.Errorf("cannot unmarshal %q: %w", b, err)
}
// Set missing timestamps to the current time.
currentTimestamp := int64(fasttime.UnixTimestamp())
series := req.Series
for i := range series {
points := series[i].Points
for j := range points {
pt := &points[j]
if pt.Timestamp <= 0 {
pt.Timestamp = currentTimestamp
}
}
}
return nil
}
// UnmarshalProtobuf unmarshals protobuf DataDog /api/v2/series request body from b to req.
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
//
// b shouldn't be modified when req is in use.
func UnmarshalProtobuf(req *Request, b []byte) error {
req.reset()
_ = b
return fmt.Errorf("unimplemented")
}
// Series represents a series item from DataDog POST request to /api/v2/series
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
type Series struct {
// Do not decode Interval, since it isn't used by VictoriaMetrics
// Interval int64 `json:"interval"`
// Do not decode Metadata, since it isn't used by VictoriaMetrics
// Metadata Metadata `json:"metadata"`
// Metric is the name of the metric
Metric string `json:"metric"`
// Points is the list of points for the given metric
Points []Point `json:"points"`
Resources []Resource `json:"resources"`
// Do not decode SourceTypeName, since it isn't used by VictoriaMetrics
// SourceTypeName string `json:"source_type_name"`
Tags []string
// Do not decode Type, since it isn't used by VictoriaMetrics
// Type int `json:"type"`
// Do not decode Unit, since it isn't used by VictoriaMetrics
// Unit string
}
func (s *Series) reset() {
s.Metric = ""
points := s.Points
for i := range points {
points[i].reset()
}
s.Points = points[:0]
resources := s.Resources
for i := range resources {
resources[i].reset()
}
s.Resources = resources[:0]
tags := s.Tags
for i := range tags {
tags[i] = ""
}
s.Tags = tags[:0]
}
// Point represents a point from DataDog POST request to /api/v2/series
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
type Point struct {
// Timestamp is point timestamp in seconds
Timestamp int64 `json:"timestamp"`
// Value is point value
Value float64 `json:"value"`
}
func (pt *Point) reset() {
pt.Timestamp = 0
pt.Value = 0
}
// Resource is series resource from DataDog POST request to /api/v2/series
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
type Resource struct {
Name string `json:"name"`
Type string `json:"type"`
}
func (r *Resource) reset() {
r.Name = ""
r.Type = ""
}
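Compared to v1, where each point is a `[timestamp, value]` array (see the `[[...]]` fix in the benchmark above), a v2 point is an object with explicit `timestamp` and `value` fields, and the host is carried in `resources`. A minimal sketch of decoding such a payload with `UnmarshalJSON` from this package; the payload values are illustrative.

```go
package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv2"
)

func main() {
	// A minimal /api/v2/series body with a single series.
	body := []byte(`{
		"series": [
			{
				"metric": "system.load.1",
				"points": [{"timestamp": 1636629071, "value": 0.7}],
				"resources": [{"name": "dummyhost", "type": "host"}],
				"tags": ["environment:test"]
			}
		]
	}`)
	var req datadogv2.Request
	if err := datadogv2.UnmarshalJSON(&req, body); err != nil {
		panic(err)
	}
	s := req.Series[0]
	fmt.Println(s.Metric, s.Points[0].Value, s.Resources[0].Name, s.Tags)
}
```

Points with a zero or missing `timestamp` are set to the current time by `UnmarshalJSON`, as implemented above.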


@@ -0,0 +1,77 @@
package datadogv2
import (
"reflect"
"testing"
)
func TestRequestUnmarshalJSONFailure(t *testing.T) {
f := func(s string) {
t.Helper()
var req Request
if err := UnmarshalJSON(&req, []byte(s)); err == nil {
t.Fatalf("expecting non-nil error for Unmarshal(%q)", s)
}
}
f("")
f("foobar")
f(`{"series":123`)
f(`1234`)
f(`[]`)
}
func TestRequestUnmarshalJSONSuccess(t *testing.T) {
f := func(s string, reqExpected *Request) {
t.Helper()
var req Request
if err := UnmarshalJSON(&req, []byte(s)); err != nil {
t.Fatalf("unexpected error in Unmarshal(%q): %s", s, err)
}
if !reflect.DeepEqual(&req, reqExpected) {
t.Fatalf("unexpected row;\ngot\n%+v\nwant\n%+v", &req, reqExpected)
}
}
f("{}", &Request{})
f(`
{
"series": [
{
"metric": "system.load.1",
"type": 0,
"points": [
{
"timestamp": 1636629071,
"value": 0.7
}
],
"resources": [
{
"name": "dummyhost",
"type": "host"
}
],
"tags": ["environment:test"]
}
]
}
`, &Request{
Series: []Series{{
Metric: "system.load.1",
Points: []Point{
{
Timestamp: 1636629071,
Value: 0.7,
},
},
Resources: []Resource{
{
Name: "dummyhost",
Type: "host",
},
},
Tags: []string{
"environment:test",
},
}},
})
}


@@ -0,0 +1,43 @@
package datadogv2
import (
"fmt"
"testing"
)
func BenchmarkRequestUnmarshalJSON(b *testing.B) {
reqBody := []byte(`{
"series": [
{
"metric": "system.load.1",
"type": 0,
"points": [
{
"timestamp": 1636629071,
"value": 0.7
}
],
"resources": [
{
"name": "dummyhost",
"type": "host"
}
],
"tags": ["environment:test"]
}
]
}`)
b.SetBytes(int64(len(reqBody)))
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
var req Request
for pb.Next() {
if err := UnmarshalJSON(&req, reqBody); err != nil {
panic(fmt.Errorf("unexpected error: %w", err))
}
if len(req.Series) != 1 {
panic(fmt.Errorf("unexpected number of series unmarshaled: got %d; want 4", len(req.Series)))
}
}
})
}


@@ -0,0 +1,154 @@
package stream
import (
"bufio"
"fmt"
"io"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
// Parse parses DataDog POST request for /api/v2/series from reader and calls callback for the parsed request.
//
// callback shouldn't hold series after returning.
func Parse(r io.Reader, contentEncoding, contentType string, callback func(series []datadogv2.Series) error) error {
wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr)
r = wcr
switch contentEncoding {
case "gzip":
zr, err := common.GetGzipReader(r)
if err != nil {
return fmt.Errorf("cannot read gzipped DataDog data: %w", err)
}
defer common.PutGzipReader(zr)
r = zr
case "deflate":
zlr, err := common.GetZlibReader(r)
if err != nil {
return fmt.Errorf("cannot read deflated DataDog data: %w", err)
}
defer common.PutZlibReader(zlr)
r = zlr
}
ctx := getPushCtx(r)
defer putPushCtx(ctx)
if err := ctx.Read(); err != nil {
return err
}
req := getRequest()
defer putRequest(req)
var err error
switch contentType {
case "application/x-protobuf":
err = datadogv2.UnmarshalProtobuf(req, ctx.reqBuf.B)
default:
err = datadogv2.UnmarshalJSON(req, ctx.reqBuf.B)
}
if err != nil {
unmarshalErrors.Inc()
return fmt.Errorf("cannot unmarshal DataDog %s request with size %d bytes: %w", contentType, len(ctx.reqBuf.B), err)
}
rows := 0
series := req.Series
for i := range series {
rows += len(series[i].Points)
if *datadogutils.SanitizeMetricName {
series[i].Metric = datadogutils.SanitizeName(series[i].Metric)
}
}
rowsRead.Add(rows)
if err := callback(series); err != nil {
return fmt.Errorf("error when processing imported data: %w", err)
}
return nil
}
type pushCtx struct {
br *bufio.Reader
reqBuf bytesutil.ByteBuffer
}
func (ctx *pushCtx) reset() {
ctx.br.Reset(nil)
ctx.reqBuf.Reset()
}
func (ctx *pushCtx) Read() error {
readCalls.Inc()
lr := io.LimitReader(ctx.br, int64(datadogutils.MaxInsertRequestSize.N)+1)
startTime := fasttime.UnixTimestamp()
reqLen, err := ctx.reqBuf.ReadFrom(lr)
if err != nil {
readErrors.Inc()
return fmt.Errorf("cannot read request in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err)
}
if reqLen > int64(datadogutils.MaxInsertRequestSize.N) {
readErrors.Inc()
return fmt.Errorf("too big request; mustn't exceed -datadog.maxInsertRequestSize=%d bytes", datadogutils.MaxInsertRequestSize.N)
}
return nil
}
var (
readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="datadogv2"}`)
readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="datadogv2"}`)
rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="datadogv2"}`)
unmarshalErrors = metrics.NewCounter(`vm_protoparser_unmarshal_errors_total{type="datadogv2"}`)
)
func getPushCtx(r io.Reader) *pushCtx {
select {
case ctx := <-pushCtxPoolCh:
ctx.br.Reset(r)
return ctx
default:
if v := pushCtxPool.Get(); v != nil {
ctx := v.(*pushCtx)
ctx.br.Reset(r)
return ctx
}
return &pushCtx{
br: bufio.NewReaderSize(r, 64*1024),
}
}
}
func putPushCtx(ctx *pushCtx) {
ctx.reset()
select {
case pushCtxPoolCh <- ctx:
default:
pushCtxPool.Put(ctx)
}
}
var pushCtxPool sync.Pool
var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())
func getRequest() *datadogv2.Request {
v := requestPool.Get()
if v == nil {
return &datadogv2.Request{}
}
return v.(*datadogv2.Request)
}
func putRequest(req *datadogv2.Request) {
requestPool.Put(req)
}
var requestPool sync.Pool
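For completeness, this is roughly how an HTTP handler would drive the v2 stream parser: pass the request body together with the `Content-Encoding` and `Content-Type` headers plus a callback that consumes the parsed series. The sketch below is illustrative only; the `lib/protoparser/datadogv2/stream` import path and the handler wiring are assumptions, not the actual vminsert code.

```go
package main

import (
	"fmt"
	"net/http"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv2"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv2/stream"
)

// handler sketches an /api/v2/series endpoint on top of stream.Parse.
// The stream import path above is assumed from the package layout in this commit.
func handler(w http.ResponseWriter, r *http.Request) {
	ce := r.Header.Get("Content-Encoding") // "gzip", "deflate" or empty
	ct := r.Header.Get("Content-Type")     // "application/x-protobuf" is not implemented yet
	err := stream.Parse(r.Body, ce, ct, func(series []datadogv2.Series) error {
		// The callback must not hold series after returning.
		for _, s := range series {
			fmt.Printf("metric=%q points=%d resources=%d tags=%v\n",
				s.Metric, len(s.Points), len(s.Resources), s.Tags)
		}
		return nil
	})
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	w.WriteHeader(http.StatusAccepted)
}

func main() {
	http.HandleFunc("/datadog/api/v2/series", handler)
	_ = http.ListenAndServe(":8428", nil)
}
```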