From fb90a56de2a11b01e0b4f04fe0bf5a020bc1577a Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 21 Dec 2023 18:29:10 +0200 Subject: [PATCH] app/{vminsert,vmagent}: preliminary support for /api/v2/series ingestion from new versions of DataDog Agent This commit adds only JSON support - https://docs.datadoghq.com/api/latest/metrics/#submit-metrics , while recent versions of DataDog Agent send data to /api/v2/series in undocumented Protobuf format. The support for this format will be added later. Thanks to @AndrewChubatiuk for the initial implementation at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5094 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4451 --- README.md | 24 +-- .../{datadog => datadogv1}/request_handler.go | 21 ++- app/vmagent/datadogv2/request_handler.go | 96 +++++++++++ app/vmagent/main.go | 43 ++++- .../{datadog => datadogv1}/request_handler.go | 19 +-- app/vminsert/datadogv2/request_handler.go | 88 ++++++++++ app/vminsert/main.go | 27 ++- docs/CHANGELOG.md | 1 + docs/Cluster-VictoriaMetrics.md | 3 +- docs/README.md | 24 +-- docs/Single-server-VictoriaMetrics.md | 24 +-- docs/url-examples.md | 76 ++++++++- lib/protoparser/datadogutils/datadogutils.go | 57 +++++++ .../datadogutils_test.go} | 21 ++- .../{datadog => datadogv1}/parser.go | 21 +-- .../{datadog => datadogv1}/parser_test.go | 19 +-- .../parser_timing_test.go | 6 +- .../stream/streamparser.go | 67 ++------ lib/protoparser/datadogv2/parser.go | 143 ++++++++++++++++ lib/protoparser/datadogv2/parser_test.go | 77 +++++++++ .../datadogv2/parser_timing_test.go | 43 +++++ .../datadogv2/stream/streamparser.go | 154 ++++++++++++++++++ 22 files changed, 870 insertions(+), 184 deletions(-) rename app/vmagent/{datadog => datadogv1}/request_handler.go (85%) create mode 100644 app/vmagent/datadogv2/request_handler.go rename app/vminsert/{datadog => datadogv1}/request_handler.go (83%) create mode 100644 app/vminsert/datadogv2/request_handler.go create mode 
100644 lib/protoparser/datadogutils/datadogutils.go rename lib/protoparser/{datadog/stream/streamparser_test.go => datadogutils/datadogutils_test.go} (61%) rename lib/protoparser/{datadog => datadogv1}/parser.go (82%) rename lib/protoparser/{datadog => datadogv1}/parser_test.go (79%) rename lib/protoparser/{datadog => datadogv1}/parser_timing_test.go (93%) rename lib/protoparser/{datadog => datadogv1}/stream/streamparser.go (58%) create mode 100644 lib/protoparser/datadogv2/parser.go create mode 100644 lib/protoparser/datadogv2/parser_test.go create mode 100644 lib/protoparser/datadogv2/parser_timing_test.go create mode 100644 lib/protoparser/datadogv2/stream/streamparser.go diff --git a/README.md b/README.md index e81401aee..ca62eff47 100644 --- a/README.md +++ b/README.md @@ -516,10 +516,8 @@ See also [vmagent](https://docs.victoriametrics.com/vmagent.html), which can be ## How to send data from DataDog agent -VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/) -or [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/) -via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) -at `/datadog/api/v1/series` path. +VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/) or [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/) +via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) at `/datadog/api/v2/series` path. ### Sending metrics to VictoriaMetrics @@ -531,12 +529,11 @@ or via [configuration file](https://docs.datadoghq.com/agent/guide/agent-configu

To configure DataDog agent via ENV variable add the following prefix: -
+
``` DD_DD_URL=http://victoriametrics:8428/datadog ``` -
_Choose correct URL for VictoriaMetrics [here](https://docs.victoriametrics.com/url-examples.html#datadog)._ @@ -545,14 +542,12 @@ To configure DataDog agent via [configuration file](https://github.com/DataDog/d add the following line:
- ``` dd_url: http://victoriametrics:8428/datadog ``` -
-vmagent also can accept Datadog metrics format. Depending on where vmagent will forward data, +[vmagent](https://docs.victoriametrics.com/vmagent.html) also can accept Datadog metrics format. Depending on where vmagent will forward data, pick [single-node or cluster URL](https://docs.victoriametrics.com/url-examples.html#datadog) formats. ### Sending metrics to Datadog and VictoriaMetrics @@ -567,12 +562,10 @@ sending via ENV variable `DD_ADDITIONAL_ENDPOINTS` or via configuration file `ad Run DataDog using the following ENV variable with VictoriaMetrics as additional metrics receiver:
- ``` DD_ADDITIONAL_ENDPOINTS='{\"http://victoriametrics:8428/datadog\": [\"apikey\"]}' ``` -
_Choose correct URL for VictoriaMetrics [here](https://docs.victoriametrics.com/url-examples.html#datadog)._ @@ -582,19 +575,16 @@ To configure DataDog Dual Shipping via [configuration file](https://docs.datadog add the following line:
- ``` additional_endpoints: "http://victoriametrics:8428/datadog": - apikey ``` -
### Send via cURL -See how to send data to VictoriaMetrics via -[DataDog "submit metrics"](https://docs.victoriametrics.com/url-examples.html#datadogapiv1series) from command line. +See how to send data to VictoriaMetrics via DataDog "submit metrics" API [here](https://docs.victoriametrics.com/url-examples.html#datadogapiv2series). The imported data can be read via [export API](https://docs.victoriametrics.com/url-examples.html#apiv1export). @@ -605,7 +595,7 @@ according to [DataDog metric naming recommendations](https://docs.datadoghq.com/ If you need accepting metric names as is without sanitizing, then pass `-datadog.sanitizeMetricName=false` command-line flag to VictoriaMetrics. Extra labels may be added to all the written time series by passing `extra_label=name=value` query args. -For example, `/datadog/api/v1/series?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics. +For example, `/datadog/api/v2/series?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics. DataDog agent sends the [configured tags](https://docs.datadoghq.com/getting_started/tagging/) to undocumented endpoint - `/datadog/intake`. This endpoint isn't supported by VictoriaMetrics yet. @@ -2580,7 +2570,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li -csvTrimTimestamp duration Trim timestamps when importing csv data to this duration. Minimum practical duration is 1ms. Higher duration (i.e. 
1s) may be used for reducing disk space usage for timestamp data (default 1ms) -datadog.maxInsertRequestSize size - The maximum size in bytes of a single DataDog POST request to /api/v1/series + The maximum size in bytes of a single DataDog POST request to /datadog/api/v2/series Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864) -datadog.sanitizeMetricName Sanitize metric names for the ingested DataDog data to comply with DataDog behaviour described at https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics (default true) diff --git a/app/vmagent/datadog/request_handler.go b/app/vmagent/datadogv1/request_handler.go similarity index 85% rename from app/vmagent/datadog/request_handler.go rename to app/vmagent/datadogv1/request_handler.go index 4cdfe1093..722cdffb3 100644 --- a/app/vmagent/datadog/request_handler.go +++ b/app/vmagent/datadogv1/request_handler.go @@ -1,4 +1,4 @@ -package datadog +package datadogv1 import ( "net/http" @@ -8,33 +8,32 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog/stream" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv1" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv1/stream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/metrics" ) var ( - rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="datadog"}`) - rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="datadog"}`) - rowsPerInsert = 
metrics.NewHistogram(`vmagent_rows_per_insert{type="datadog"}`) + rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="datadogv1"}`) + rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="datadogv1"}`) + rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="datadogv1"}`) ) // InsertHandlerForHTTP processes remote write for DataDog POST /api/v1/series request. -// -// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error { extraLabels, err := parserCommon.GetExtraLabels(req) if err != nil { return err } ce := req.Header.Get("Content-Encoding") - return stream.Parse(req.Body, ce, func(series []datadog.Series) error { + return stream.Parse(req.Body, ce, func(series []datadogv1.Series) error { return insertRows(at, series, extraLabels) }) } -func insertRows(at *auth.Token, series []datadog.Series, extraLabels []prompbmarshal.Label) error { +func insertRows(at *auth.Token, series []datadogv1.Series, extraLabels []prompbmarshal.Label) error { ctx := common.GetPushCtx() defer common.PutPushCtx(ctx) @@ -63,7 +62,7 @@ func insertRows(at *auth.Token, series []datadog.Series, extraLabels []prompbmar }) } for _, tag := range ss.Tags { - name, value := datadog.SplitTag(tag) + name, value := datadogutils.SplitTag(tag) if name == "host" { name = "exported_host" } diff --git a/app/vmagent/datadogv2/request_handler.go b/app/vmagent/datadogv2/request_handler.go new file mode 100644 index 000000000..00502c174 --- /dev/null +++ b/app/vmagent/datadogv2/request_handler.go @@ -0,0 +1,96 @@ +package datadogv2 + +import ( + "net/http" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + parserCommon 
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv2" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv2/stream" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" + "github.com/VictoriaMetrics/metrics" +) + +var ( + rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="datadogv2"}`) + rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="datadogv2"}`) + rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="datadogv2"}`) +) + +// InsertHandlerForHTTP processes remote write for DataDog POST /api/v2/series request. +// +// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics +func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error { + extraLabels, err := parserCommon.GetExtraLabels(req) + if err != nil { + return err + } + ct := req.Header.Get("Content-Type") + ce := req.Header.Get("Content-Encoding") + return stream.Parse(req.Body, ce, ct, func(series []datadogv2.Series) error { + return insertRows(at, series, extraLabels) + }) +} + +func insertRows(at *auth.Token, series []datadogv2.Series, extraLabels []prompbmarshal.Label) error { + ctx := common.GetPushCtx() + defer common.PutPushCtx(ctx) + + rowsTotal := 0 + tssDst := ctx.WriteRequest.Timeseries[:0] + labels := ctx.Labels[:0] + samples := ctx.Samples[:0] + for i := range series { + ss := &series[i] + rowsTotal += len(ss.Points) + labelsLen := len(labels) + labels = append(labels, prompbmarshal.Label{ + Name: "__name__", + Value: ss.Metric, + }) + for _, rs := range ss.Resources { + labels = append(labels, prompbmarshal.Label{ + Name: rs.Type, + Value: rs.Name, + }) + } + for _, tag := range ss.Tags { + name, value := datadogutils.SplitTag(tag) + if name == "host" { + name = "exported_host" + } + labels = append(labels, 
prompbmarshal.Label{ + Name: name, + Value: value, + }) + } + labels = append(labels, extraLabels...) + samplesLen := len(samples) + for _, pt := range ss.Points { + samples = append(samples, prompbmarshal.Sample{ + Timestamp: pt.Timestamp * 1000, + Value: pt.Value, + }) + } + tssDst = append(tssDst, prompbmarshal.TimeSeries{ + Labels: labels[labelsLen:], + Samples: samples[samplesLen:], + }) + } + ctx.WriteRequest.Timeseries = tssDst + ctx.Labels = labels + ctx.Samples = samples + if !remotewrite.TryPush(at, &ctx.WriteRequest) { + return remotewrite.ErrQueueFullHTTPRetry + } + rowsInserted.Add(rowsTotal) + if at != nil { + rowsTenantInserted.Get(at).Add(rowsTotal) + } + rowsPerInsert.Update(float64(rowsTotal)) + return nil +} diff --git a/app/vmagent/main.go b/app/vmagent/main.go index 7aa82de5a..9f66d54e8 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -12,7 +12,8 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/csvimport" - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/datadog" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/datadogv1" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/datadogv2" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/graphite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/influx" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/native" @@ -343,9 +344,20 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { fmt.Fprintf(w, `{"status":"ok"}`) return true case "/datadog/api/v1/series": - datadogWriteRequests.Inc() - if err := datadog.InsertHandlerForHTTP(nil, r); err != nil { - datadogWriteErrors.Inc() + datadogv1WriteRequests.Inc() + if err := datadogv1.InsertHandlerForHTTP(nil, r); err != nil { + datadogv1WriteErrors.Inc() + httpserver.Errorf(w, r, "%s", err) + return true + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(202) + fmt.Fprintf(w, `{"status":"ok"}`) + return true + case "/datadog/api/v2/series": 
+ datadogv2WriteRequests.Inc() + if err := datadogv2.InsertHandlerForHTTP(nil, r); err != nil { + datadogv2WriteErrors.Inc() httpserver.Errorf(w, r, "%s", err) return true } @@ -566,9 +578,19 @@ func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path stri fmt.Fprintf(w, `{"status":"ok"}`) return true case "datadog/api/v1/series": - datadogWriteRequests.Inc() - if err := datadog.InsertHandlerForHTTP(at, r); err != nil { - datadogWriteErrors.Inc() + datadogv1WriteRequests.Inc() + if err := datadogv1.InsertHandlerForHTTP(at, r); err != nil { + datadogv1WriteErrors.Inc() + httpserver.Errorf(w, r, "%s", err) + return true + } + w.WriteHeader(202) + fmt.Fprintf(w, `{"status":"ok"}`) + return true + case "datadog/api/v2/series": + datadogv2WriteRequests.Inc() + if err := datadogv2.InsertHandlerForHTTP(at, r); err != nil { + datadogv2WriteErrors.Inc() httpserver.Errorf(w, r, "%s", err) return true } @@ -626,8 +648,11 @@ var ( influxQueryRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/influx/query", protocol="influx"}`) - datadogWriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/series", protocol="datadog"}`) - datadogWriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/datadog/api/v1/series", protocol="datadog"}`) + datadogv1WriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/series", protocol="datadog"}`) + datadogv1WriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/datadog/api/v1/series", protocol="datadog"}`) + + datadogv2WriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v2/series", protocol="datadog"}`) + datadogv2WriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/datadog/api/v2/series", protocol="datadog"}`) datadogValidateRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/validate", protocol="datadog"}`) datadogCheckRunRequests = 
metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/check_run", protocol="datadog"}`) diff --git a/app/vminsert/datadog/request_handler.go b/app/vminsert/datadogv1/request_handler.go similarity index 83% rename from app/vminsert/datadog/request_handler.go rename to app/vminsert/datadogv1/request_handler.go index 9717b7ef5..792a73879 100644 --- a/app/vminsert/datadog/request_handler.go +++ b/app/vminsert/datadogv1/request_handler.go @@ -1,4 +1,4 @@ -package datadog +package datadogv1 import ( "net/http" @@ -7,31 +7,30 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" - parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog/stream" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv1" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv1/stream" "github.com/VictoriaMetrics/metrics" ) var ( - rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="datadog"}`) - rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="datadog"}`) + rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="datadogv1"}`) + rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="datadogv1"}`) ) // InsertHandlerForHTTP processes remote write for DataDog POST /api/v1/series request. 
-// -// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics func InsertHandlerForHTTP(req *http.Request) error { extraLabels, err := parserCommon.GetExtraLabels(req) if err != nil { return err } ce := req.Header.Get("Content-Encoding") - return stream.Parse(req.Body, ce, func(series []parser.Series) error { + return stream.Parse(req.Body, ce, func(series []datadogv1.Series) error { return insertRows(series, extraLabels) }) } -func insertRows(series []parser.Series, extraLabels []prompbmarshal.Label) error { +func insertRows(series []datadogv1.Series, extraLabels []prompbmarshal.Label) error { ctx := common.GetInsertCtx() defer common.PutInsertCtx(ctx) @@ -54,7 +53,7 @@ func insertRows(series []parser.Series, extraLabels []prompbmarshal.Label) error ctx.AddLabel("device", ss.Device) } for _, tag := range ss.Tags { - name, value := parser.SplitTag(tag) + name, value := datadogutils.SplitTag(tag) if name == "host" { name = "exported_host" } diff --git a/app/vminsert/datadogv2/request_handler.go b/app/vminsert/datadogv2/request_handler.go new file mode 100644 index 000000000..80449b47b --- /dev/null +++ b/app/vminsert/datadogv2/request_handler.go @@ -0,0 +1,88 @@ +package datadogv2 + +import ( + "net/http" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv2" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv2/stream" + "github.com/VictoriaMetrics/metrics" +) + +var ( + rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="datadogv2"}`) + rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="datadogv2"}`) +) + +// InsertHandlerForHTTP processes 
remote write for DataDog POST /api/v2/series request. +// +// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics +func InsertHandlerForHTTP(req *http.Request) error { + extraLabels, err := parserCommon.GetExtraLabels(req) + if err != nil { + return err + } + ct := req.Header.Get("Content-Type") + ce := req.Header.Get("Content-Encoding") + return stream.Parse(req.Body, ce, ct, func(series []datadogv2.Series) error { + return insertRows(series, extraLabels) + }) +} + +func insertRows(series []datadogv2.Series, extraLabels []prompbmarshal.Label) error { + ctx := common.GetInsertCtx() + defer common.PutInsertCtx(ctx) + + rowsLen := 0 + for i := range series { + rowsLen += len(series[i].Points) + } + ctx.Reset(rowsLen) + rowsTotal := 0 + hasRelabeling := relabel.HasRelabeling() + for i := range series { + ss := &series[i] + rowsTotal += len(ss.Points) + ctx.Labels = ctx.Labels[:0] + ctx.AddLabel("", ss.Metric) + for _, rs := range ss.Resources { + ctx.AddLabel(rs.Type, rs.Name) + } + for _, tag := range ss.Tags { + name, value := datadogutils.SplitTag(tag) + if name == "host" { + name = "exported_host" + } + ctx.AddLabel(name, value) + } + for j := range extraLabels { + label := &extraLabels[j] + ctx.AddLabel(label.Name, label.Value) + } + if hasRelabeling { + ctx.ApplyRelabeling() + } + if len(ctx.Labels) == 0 { + // Skip metric without labels. 
+ continue + } + ctx.SortLabelsIfNeeded() + var metricNameRaw []byte + var err error + for _, pt := range ss.Points { + timestamp := pt.Timestamp * 1000 + value := pt.Value + metricNameRaw, err = ctx.WriteDataPointExt(metricNameRaw, ctx.Labels, timestamp, value) + if err != nil { + return err + } + } + } + rowsInserted.Add(rowsTotal) + rowsPerInsert.Update(float64(rowsTotal)) + return ctx.FlushBufs() +} diff --git a/app/vminsert/main.go b/app/vminsert/main.go index a53f8216d..c32ea402d 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -13,7 +13,8 @@ import ( vminsertCommon "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/csvimport" - "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/datadog" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/datadogv1" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/datadogv2" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/graphite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/influx" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/native" @@ -246,9 +247,20 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { fmt.Fprintf(w, `{"status":"ok"}`) return true case "/datadog/api/v1/series": - datadogWriteRequests.Inc() - if err := datadog.InsertHandlerForHTTP(r); err != nil { - datadogWriteErrors.Inc() + datadogv1WriteRequests.Inc() + if err := datadogv1.InsertHandlerForHTTP(r); err != nil { + datadogv1WriteErrors.Inc() + httpserver.Errorf(w, r, "%s", err) + return true + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(202) + fmt.Fprintf(w, `{"status":"ok"}`) + return true + case "/datadog/api/v2/series": + datadogv2WriteRequests.Inc() + if err := datadogv2.InsertHandlerForHTTP(r); err != nil { + datadogv2WriteErrors.Inc() httpserver.Errorf(w, r, "%s", err) return true } @@ -371,8 +383,11 @@ var ( influxQueryRequests = 
metrics.NewCounter(`vm_http_requests_total{path="/influx/query", protocol="influx"}`) - datadogWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/api/v1/series", protocol="datadog"}`) - datadogWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/datadog/api/v1/series", protocol="datadog"}`) + datadogv1WriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/api/v1/series", protocol="datadog"}`) + datadogv1WriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/datadog/api/v1/series", protocol="datadog"}`) + + datadogv2WriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/api/v2/series", protocol="datadog"}`) + datadogv2WriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/datadog/api/v2/series", protocol="datadog"}`) datadogValidateRequests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/api/v1/validate", protocol="datadog"}`) datadogCheckRunRequests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/api/v1/check_run", protocol="datadog"}`) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 95d865a7c..6e0ca7541 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -28,6 +28,7 @@ The sandbox cluster installation is running under the constant load generated by ## tip +* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [DataDog v2 data ingestion protocol](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). JSON protocol is supported right now. Protobuf protocol will be supported later. See [these docs](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4451). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): expose ability to set OAuth2 endpoint parameters per each `-remoteWrite.url` via the command-line flag `-remoteWrite.oauth2.endpointParams`.
See [these docs](https://docs.victoriametrics.com/vmagent.html#advanced-usage). Thanks to @mhill-holoplot for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5427). * FEATURE: [vmalert](https://docs.victoriametrics.com/vmagent.html): expose ability to set OAuth2 endpoint parameters via the following command-line flags: - `-datasource.oauth2.endpointParams` for `-datasource.url` diff --git a/docs/Cluster-VictoriaMetrics.md b/docs/Cluster-VictoriaMetrics.md index 83fd78292..c9fb35bd7 100644 --- a/docs/Cluster-VictoriaMetrics.md +++ b/docs/Cluster-VictoriaMetrics.md @@ -363,7 +363,8 @@ Check practical examples of VictoriaMetrics API [here](https://docs.victoriametr - `prometheus/api/v1/import/csv` - for importing arbitrary CSV data. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-import-csv-data) for details. - `prometheus/api/v1/import/prometheus` - for importing data in [Prometheus text exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format) and in [OpenMetrics format](https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md). This endpoint also supports [Pushgateway protocol](https://github.com/prometheus/pushgateway#url). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-import-data-in-prometheus-exposition-format) for details. - `opentelemetry/api/v1/push` - for ingesting data via [OpenTelemetry protocol for metrics](https://github.com/open-telemetry/opentelemetry-specification/blob/ffddc289462dfe0c2041e3ca42a7b1df805706de/specification/metrics/data-model.md). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#sending-data-via-opentelemetry). - - `datadog/api/v1/series` - for ingesting data with [DataDog submit metrics API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). 
See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent) for details. + - `datadog/api/v1/series` - for ingesting data with DataDog submit metrics API v1. See [these docs](https://docs.victoriametrics.com/url-examples.html#datadogapiv1series) for details. + - `datadog/api/v2/series` - for ingesting data with [DataDog submit metrics API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent) for details. - `influx/write` and `influx/api/v2/write` - for ingesting data with [InfluxDB line protocol](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for details. - `newrelic/infra/v2/metrics/events/bulk` - for accepting data from [NewRelic infrastructure agent](https://docs.newrelic.com/docs/infrastructure/install-infrastructure-agent). See [these docs](https://docs.victoriametrics.com/#how-to-send-data-from-newrelic-agent) for details. - `opentsdb/api/put` - for accepting [OpenTSDB HTTP /api/put requests](http://opentsdb.net/docs/build/html/api_http/put.html). This handler is disabled by default. It is exposed on a distinct TCP address set via `-opentsdbHTTPListenAddr` command-line flag. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#sending-opentsdb-data-via-http-apiput-requests) for details. 
diff --git a/docs/README.md b/docs/README.md index 0a48307a5..758ae2274 100644 --- a/docs/README.md +++ b/docs/README.md @@ -519,10 +519,8 @@ See also [vmagent](https://docs.victoriametrics.com/vmagent.html), which can be ## How to send data from DataDog agent -VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/) -or [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/) -via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) -at `/datadog/api/v1/series` path. +VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/) or [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/) +via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) at `/datadog/api/v2/series` path. ### Sending metrics to VictoriaMetrics @@ -534,12 +532,11 @@ or via [configuration file](https://docs.datadoghq.com/agent/guide/agent-configu

To configure DataDog agent via ENV variable add the following prefix: -
+
``` DD_DD_URL=http://victoriametrics:8428/datadog ``` -
_Choose correct URL for VictoriaMetrics [here](https://docs.victoriametrics.com/url-examples.html#datadog)._ @@ -548,14 +545,12 @@ To configure DataDog agent via [configuration file](https://github.com/DataDog/d add the following line:
- ``` dd_url: http://victoriametrics:8428/datadog ``` -
-vmagent also can accept Datadog metrics format. Depending on where vmagent will forward data, +[vmagent](https://docs.victoriametrics.com/vmagent.html) also can accept Datadog metrics format. Depending on where vmagent will forward data, pick [single-node or cluster URL](https://docs.victoriametrics.com/url-examples.html#datadog) formats. ### Sending metrics to Datadog and VictoriaMetrics @@ -570,12 +565,10 @@ sending via ENV variable `DD_ADDITIONAL_ENDPOINTS` or via configuration file `ad Run DataDog using the following ENV variable with VictoriaMetrics as additional metrics receiver:
- ``` DD_ADDITIONAL_ENDPOINTS='{\"http://victoriametrics:8428/datadog\": [\"apikey\"]}' ``` -
_Choose correct URL for VictoriaMetrics [here](https://docs.victoriametrics.com/url-examples.html#datadog)._ @@ -585,19 +578,16 @@ To configure DataDog Dual Shipping via [configuration file](https://docs.datadog add the following line:
- ``` additional_endpoints: "http://victoriametrics:8428/datadog": - apikey ``` -
### Send via cURL -See how to send data to VictoriaMetrics via -[DataDog "submit metrics"](https://docs.victoriametrics.com/url-examples.html#datadogapiv1series) from command line. +See how to send data to VictoriaMetrics via DataDog "submit metrics" API [here](https://docs.victoriametrics.com/url-examples.html#datadogapiv2series). The imported data can be read via [export API](https://docs.victoriametrics.com/url-examples.html#apiv1export). @@ -608,7 +598,7 @@ according to [DataDog metric naming recommendations](https://docs.datadoghq.com/ If you need accepting metric names as is without sanitizing, then pass `-datadog.sanitizeMetricName=false` command-line flag to VictoriaMetrics. Extra labels may be added to all the written time series by passing `extra_label=name=value` query args. -For example, `/datadog/api/v1/series?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics. +For example, `/datadog/api/v2/series?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics. DataDog agent sends the [configured tags](https://docs.datadoghq.com/getting_started/tagging/) to undocumented endpoint - `/datadog/intake`. This endpoint isn't supported by VictoriaMetrics yet. @@ -2583,7 +2573,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li -csvTrimTimestamp duration Trim timestamps when importing csv data to this duration. Minimum practical duration is 1ms. Higher duration (i.e. 
1s) may be used for reducing disk space usage for timestamp data (default 1ms) -datadog.maxInsertRequestSize size - The maximum size in bytes of a single DataDog POST request to /api/v1/series + The maximum size in bytes of a single DataDog POST request to /datadog/api/v2/series Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864) -datadog.sanitizeMetricName Sanitize metric names for the ingested DataDog data to comply with DataDog behaviour described at https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics (default true) diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index fed6433f4..5e7bbcd0d 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -527,10 +527,8 @@ See also [vmagent](https://docs.victoriametrics.com/vmagent.html), which can be ## How to send data from DataDog agent -VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/) -or [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/) -via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) -at `/datadog/api/v1/series` path. +VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/) or [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/) +via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) at `/datadog/api/v2/series` path. ### Sending metrics to VictoriaMetrics @@ -542,12 +540,11 @@ or via [configuration file](https://docs.datadoghq.com/agent/guide/agent-configu

To configure DataDog agent via ENV variable add the following prefix: -
+
``` DD_DD_URL=http://victoriametrics:8428/datadog ``` -
_Choose correct URL for VictoriaMetrics [here](https://docs.victoriametrics.com/url-examples.html#datadog)._ @@ -556,14 +553,12 @@ To configure DataDog agent via [configuration file](https://github.com/DataDog/d add the following line:
- ``` dd_url: http://victoriametrics:8428/datadog ``` -
-vmagent also can accept Datadog metrics format. Depending on where vmagent will forward data, +[vmagent](https://docs.victoriametrics.com/vmagent.html) also can accept Datadog metrics format. Depending on where vmagent will forward data, pick [single-node or cluster URL](https://docs.victoriametrics.com/url-examples.html#datadog) formats. ### Sending metrics to Datadog and VictoriaMetrics @@ -578,12 +573,10 @@ sending via ENV variable `DD_ADDITIONAL_ENDPOINTS` or via configuration file `ad Run DataDog using the following ENV variable with VictoriaMetrics as additional metrics receiver:
- ``` DD_ADDITIONAL_ENDPOINTS='{\"http://victoriametrics:8428/datadog\": [\"apikey\"]}' ``` -
_Choose correct URL for VictoriaMetrics [here](https://docs.victoriametrics.com/url-examples.html#datadog)._ @@ -593,19 +586,16 @@ To configure DataDog Dual Shipping via [configuration file](https://docs.datadog add the following line:
- ``` additional_endpoints: "http://victoriametrics:8428/datadog": - apikey ``` -
### Send via cURL -See how to send data to VictoriaMetrics via -[DataDog "submit metrics"](https://docs.victoriametrics.com/url-examples.html#datadogapiv1series) from command line. +See how to send data to VictoriaMetrics via DataDog "submit metrics" API [here](https://docs.victoriametrics.com/url-examples.html#datadogapiv2series). The imported data can be read via [export API](https://docs.victoriametrics.com/url-examples.html#apiv1export). @@ -616,7 +606,7 @@ according to [DataDog metric naming recommendations](https://docs.datadoghq.com/ If you need accepting metric names as is without sanitizing, then pass `-datadog.sanitizeMetricName=false` command-line flag to VictoriaMetrics. Extra labels may be added to all the written time series by passing `extra_label=name=value` query args. -For example, `/datadog/api/v1/series?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics. +For example, `/datadog/api/v2/series?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics. DataDog agent sends the [configured tags](https://docs.datadoghq.com/getting_started/tagging/) to undocumented endpoint - `/datadog/intake`. This endpoint isn't supported by VictoriaMetrics yet. @@ -2591,7 +2581,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li -csvTrimTimestamp duration Trim timestamps when importing csv data to this duration. Minimum practical duration is 1ms. Higher duration (i.e. 
1s) may be used for reducing disk space usage for timestamp data (default 1ms) -datadog.maxInsertRequestSize size - The maximum size in bytes of a single DataDog POST request to /api/v1/series + The maximum size in bytes of a single DataDog POST request to /datadog/api/v2/series Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864) -datadog.sanitizeMetricName Sanitize metric names for the ingested DataDog data to comply with DataDog behaviour described at https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics (default true) diff --git a/docs/url-examples.md b/docs/url-examples.md index bc1a9983e..35d3f55fe 100644 --- a/docs/url-examples.md +++ b/docs/url-examples.md @@ -473,7 +473,7 @@ http://vminsert:8480/insert/0/datadog ### /datadog/api/v1/series -**Imports data in DataDog format into VictoriaMetrics** +**Imports data in DataDog v1 format into VictoriaMetrics** Single-node VictoriaMetrics:
@@ -531,7 +531,79 @@ echo ' Additional information: -* [How to send data from datadog agent](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) +* [How to send data from DataDog agent](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) +* [URL format for VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format) + + +### /datadog/api/v2/series + +**Imports data in [DataDog v2](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) format into VictoriaMetrics** + +Single-node VictoriaMetrics: +
+ +```console +echo ' +{ + "series": [ + { + "metric": "system.load.1", + "type": 0, + "points": [ + { + "timestamp": 0, + "value": 0.7 + } + ], + "resources": [ + { + "name": "dummyhost", + "type": "host" + } + ], + "tags": ["environment:test"] + } + ] +} +' | curl -X POST -H 'Content-Type: application/json' --data-binary @- http://localhost:8428/datadog/api/v2/series +``` + +
+ +Cluster version of VictoriaMetrics: +
+ +```console +echo ' +{ + "series": [ + { + "metric": "system.load.1", + "type": 0, + "points": [ + { + "timestamp": 0, + "value": 0.7 + } + ], + "resources": [ + { + "name": "dummyhost", + "type": "host" + } + ], + "tags": ["environment:test"] + } + ] +} +' | curl -X POST -H 'Content-Type: application/json' --data-binary @- 'http://:8480/insert/0/datadog/api/v2/series' +``` + +
+
+Additional information:
+
+* [How to send data from DataDog agent](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent)
+* [URL format for VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format)
 
 ### /federate
diff --git a/lib/protoparser/datadogutils/datadogutils.go b/lib/protoparser/datadogutils/datadogutils.go
new file mode 100644
index 000000000..f3a71a425
--- /dev/null
+++ b/lib/protoparser/datadogutils/datadogutils.go
@@ -0,0 +1,57 @@
+package datadogutils
+
+import (
+	"flag"
+	"regexp"
+	"strings"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
+)
+
+var (
+	// MaxInsertRequestSize is the maximum request size defined at https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
+	MaxInsertRequestSize = flagutil.NewBytes("datadog.maxInsertRequestSize", 64*1024*1024, "The maximum size in bytes of a single DataDog POST request to /datadog/api/v2/series")
+
+	// SanitizeMetricName controls sanitizing metric names ingested via DataDog protocols.
+	//
+	// If all metrics in Datadog have the same naming schema as custom metrics, then the following rules apply:
+	// https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics
+	// But there's some hidden behaviour. In addition to what it states in the docs, the following is also done:
+	// - Consecutive underscores are replaced with just one underscore
+	// - Underscore immediately before or after a dot are removed
+	SanitizeMetricName = flag.Bool("datadog.sanitizeMetricName", true, "Sanitize metric names for the ingested DataDog data to comply with DataDog behaviour described at "+
+		"https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics")
+)
+
+// SplitTag splits DataDog tag into tag name and value. 
+// +// See https://docs.datadoghq.com/getting_started/tagging/#define-tags +func SplitTag(tag string) (string, string) { + n := strings.IndexByte(tag, ':') + if n < 0 { + // No tag value. + return tag, "no_label_value" + } + return tag[:n], tag[n+1:] +} + +// SanitizeName performs DataDog-compatible sanitizing for metric names +// +// See https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics +func SanitizeName(name string) string { + return namesSanitizer.Transform(name) +} + +var namesSanitizer = bytesutil.NewFastStringTransformer(func(s string) string { + s = unsupportedDatadogChars.ReplaceAllString(s, "_") + s = multiUnderscores.ReplaceAllString(s, "_") + s = underscoresWithDots.ReplaceAllString(s, ".") + return s +}) + +var ( + unsupportedDatadogChars = regexp.MustCompile(`[^0-9a-zA-Z_\.]+`) + multiUnderscores = regexp.MustCompile(`_+`) + underscoresWithDots = regexp.MustCompile(`_?\._?`) +) diff --git a/lib/protoparser/datadog/stream/streamparser_test.go b/lib/protoparser/datadogutils/datadogutils_test.go similarity index 61% rename from lib/protoparser/datadog/stream/streamparser_test.go rename to lib/protoparser/datadogutils/datadogutils_test.go index 19a52edf4..05575530a 100644 --- a/lib/protoparser/datadog/stream/streamparser_test.go +++ b/lib/protoparser/datadogutils/datadogutils_test.go @@ -1,13 +1,30 @@ -package stream +package datadogutils import ( "testing" ) +func TestSplitTag(t *testing.T) { + f := func(s, nameExpected, valueExpected string) { + t.Helper() + name, value := SplitTag(s) + if name != nameExpected { + t.Fatalf("unexpected name obtained from %q; got %q; want %q", s, name, nameExpected) + } + if value != valueExpected { + t.Fatalf("unexpected value obtained from %q; got %q; want %q", s, value, valueExpected) + } + } + f("", "", "no_label_value") + f("foo", "foo", "no_label_value") + f("foo:bar", "foo", "bar") + f(":bar", "", "bar") +} + func TestSanitizeName(t *testing.T) { f := func(s, resultExpected string) { 
t.Helper() - result := sanitizeName(s) + result := SanitizeName(s) if result != resultExpected { t.Fatalf("unexpected result for sanitizeName(%q); got\n%q\nwant\n%q", s, result, resultExpected) } diff --git a/lib/protoparser/datadog/parser.go b/lib/protoparser/datadogv1/parser.go similarity index 82% rename from lib/protoparser/datadog/parser.go rename to lib/protoparser/datadogv1/parser.go index c32a9c99d..ba0d69071 100644 --- a/lib/protoparser/datadog/parser.go +++ b/lib/protoparser/datadogv1/parser.go @@ -1,28 +1,13 @@ -package datadog +package datadogv1 import ( "encoding/json" "fmt" - "strings" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) -// SplitTag splits DataDog tag into tag name and value. -// -// See https://docs.datadoghq.com/getting_started/tagging/#define-tags -func SplitTag(tag string) (string, string) { - n := strings.IndexByte(tag, ':') - if n < 0 { - // No tag value. - return tag, "no_label_value" - } - return tag[:n], tag[n+1:] -} - // Request represents DataDog POST request to /api/v1/series -// -// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics type Request struct { Series []Series `json:"series"` } @@ -41,8 +26,6 @@ func (req *Request) reset() { // Unmarshal unmarshals DataDog /api/v1/series request body from b to req. // -// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics -// // b shouldn't be modified when req is in use. 
func (req *Request) Unmarshal(b []byte) error { req.reset() @@ -64,8 +47,6 @@ func (req *Request) Unmarshal(b []byte) error { } // Series represents a series item from DataDog POST request to /api/v1/series -// -// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics type Series struct { Metric string `json:"metric"` Host string `json:"host"` diff --git a/lib/protoparser/datadog/parser_test.go b/lib/protoparser/datadogv1/parser_test.go similarity index 79% rename from lib/protoparser/datadog/parser_test.go rename to lib/protoparser/datadogv1/parser_test.go index 5ea720331..d359f616a 100644 --- a/lib/protoparser/datadog/parser_test.go +++ b/lib/protoparser/datadogv1/parser_test.go @@ -1,27 +1,10 @@ -package datadog +package datadogv1 import ( "reflect" "testing" ) -func TestSplitTag(t *testing.T) { - f := func(s, nameExpected, valueExpected string) { - t.Helper() - name, value := SplitTag(s) - if name != nameExpected { - t.Fatalf("unexpected name obtained from %q; got %q; want %q", s, name, nameExpected) - } - if value != valueExpected { - t.Fatalf("unexpected value obtained from %q; got %q; want %q", s, value, valueExpected) - } - } - f("", "", "no_label_value") - f("foo", "foo", "no_label_value") - f("foo:bar", "foo", "bar") - f(":bar", "", "bar") -} - func TestRequestUnmarshalMissingHost(t *testing.T) { // This tests https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3432 req := Request{ diff --git a/lib/protoparser/datadog/parser_timing_test.go b/lib/protoparser/datadogv1/parser_timing_test.go similarity index 93% rename from lib/protoparser/datadog/parser_timing_test.go rename to lib/protoparser/datadogv1/parser_timing_test.go index f3c2b568a..6964305dc 100644 --- a/lib/protoparser/datadog/parser_timing_test.go +++ b/lib/protoparser/datadogv1/parser_timing_test.go @@ -1,4 +1,4 @@ -package datadog +package datadogv1 import ( "fmt" @@ -12,10 +12,10 @@ func BenchmarkRequestUnmarshal(b *testing.B) { "host": "test.example.com", "interval": 20, 
"metric": "system.load.1", - "points": [ + "points": [[ 1575317847, 0.5 - ], + ]], "tags": [ "environment:test" ], diff --git a/lib/protoparser/datadog/stream/streamparser.go b/lib/protoparser/datadogv1/stream/streamparser.go similarity index 58% rename from lib/protoparser/datadog/stream/streamparser.go rename to lib/protoparser/datadogv1/stream/streamparser.go index d79f0b84a..d1ce9f7d4 100644 --- a/lib/protoparser/datadog/stream/streamparser.go +++ b/lib/protoparser/datadogv1/stream/streamparser.go @@ -2,39 +2,24 @@ package stream import ( "bufio" - "flag" "fmt" "io" - "regexp" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv1" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) -var ( - // The maximum request size is defined at https://docs.datadoghq.com/api/latest/metrics/#submit-metrics - maxInsertRequestSize = flagutil.NewBytes("datadog.maxInsertRequestSize", 64*1024*1024, "The maximum size in bytes of a single DataDog POST request to /api/v1/series") - - // If all metrics in Datadog have the same naming schema as custom metrics, then the following rules apply: - // https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics - // But there's some hidden behaviour. 
In addition to what it states in the docs, the following is also done: - // - Consecutive underscores are replaced with just one underscore - // - Underscore immediately before or after a dot are removed - sanitizeMetricName = flag.Bool("datadog.sanitizeMetricName", true, "Sanitize metric names for the ingested DataDog data to comply with DataDog behaviour described at "+ - "https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics") -) - // Parse parses DataDog POST request for /api/v1/series from reader and calls callback for the parsed request. // // callback shouldn't hold series after returning. -func Parse(r io.Reader, contentEncoding string, callback func(series []datadog.Series) error) error { +func Parse(r io.Reader, contentEncoding string, callback func(series []datadogv1.Series) error) error { wcr := writeconcurrencylimiter.GetReader(r) defer writeconcurrencylimiter.PutReader(wcr) r = wcr @@ -70,8 +55,8 @@ func Parse(r io.Reader, contentEncoding string, callback func(series []datadog.S series := req.Series for i := range series { rows += len(series[i].Points) - if *sanitizeMetricName { - series[i].Metric = sanitizeName(series[i].Metric) + if *datadogutils.SanitizeMetricName { + series[i].Metric = datadogutils.SanitizeName(series[i].Metric) } } rowsRead.Add(rows) @@ -94,25 +79,25 @@ func (ctx *pushCtx) reset() { func (ctx *pushCtx) Read() error { readCalls.Inc() - lr := io.LimitReader(ctx.br, int64(maxInsertRequestSize.N)+1) + lr := io.LimitReader(ctx.br, int64(datadogutils.MaxInsertRequestSize.N)+1) startTime := fasttime.UnixTimestamp() reqLen, err := ctx.reqBuf.ReadFrom(lr) if err != nil { readErrors.Inc() return fmt.Errorf("cannot read request in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err) } - if reqLen > int64(maxInsertRequestSize.N) { + if reqLen > int64(datadogutils.MaxInsertRequestSize.N) { readErrors.Inc() - return fmt.Errorf("too big request; mustn't exceed -datadog.maxInsertRequestSize=%d bytes", 
maxInsertRequestSize.N) + return fmt.Errorf("too big request; mustn't exceed -datadog.maxInsertRequestSize=%d bytes", datadogutils.MaxInsertRequestSize.N) } return nil } var ( - readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="datadog"}`) - readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="datadog"}`) - rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="datadog"}`) - unmarshalErrors = metrics.NewCounter(`vm_protoparser_unmarshal_errors_total{type="datadog"}`) + readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="datadogv1"}`) + readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="datadogv1"}`) + rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="datadogv1"}`) + unmarshalErrors = metrics.NewCounter(`vm_protoparser_unmarshal_errors_total{type="datadogv1"}`) ) func getPushCtx(r io.Reader) *pushCtx { @@ -144,36 +129,16 @@ func putPushCtx(ctx *pushCtx) { var pushCtxPool sync.Pool var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) -func getRequest() *datadog.Request { +func getRequest() *datadogv1.Request { v := requestPool.Get() if v == nil { - return &datadog.Request{} + return &datadogv1.Request{} } - return v.(*datadog.Request) + return v.(*datadogv1.Request) } -func putRequest(req *datadog.Request) { +func putRequest(req *datadogv1.Request) { requestPool.Put(req) } var requestPool sync.Pool - -// sanitizeName performs DataDog-compatible sanitizing for metric names -// -// See https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics -func sanitizeName(name string) string { - return namesSanitizer.Transform(name) -} - -var namesSanitizer = bytesutil.NewFastStringTransformer(func(s string) string { - s = unsupportedDatadogChars.ReplaceAllString(s, "_") - s = multiUnderscores.ReplaceAllString(s, "_") - s = underscoresWithDots.ReplaceAllString(s, ".") - return s -}) - -var ( - unsupportedDatadogChars = 
regexp.MustCompile(`[^0-9a-zA-Z_\.]+`) - multiUnderscores = regexp.MustCompile(`_+`) - underscoresWithDots = regexp.MustCompile(`_?\._?`) -) diff --git a/lib/protoparser/datadogv2/parser.go b/lib/protoparser/datadogv2/parser.go new file mode 100644 index 000000000..1f9b6e2f6 --- /dev/null +++ b/lib/protoparser/datadogv2/parser.go @@ -0,0 +1,143 @@ +package datadogv2 + +import ( + "encoding/json" + "fmt" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" +) + +// Request represents DataDog POST request to /api/v2/series +// +// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics +type Request struct { + Series []Series `json:"series"` +} + +func (req *Request) reset() { + // recursively reset all the fields in req in order to avoid field value + // re-use in json.Unmarshal() when the corresponding field is missing + // in the unmarshaled JSON. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3432 + series := req.Series + for i := range series { + series[i].reset() + } + req.Series = series[:0] +} + +// UnmarshalJSON unmarshals JSON DataDog /api/v2/series request body from b to req. +// +// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics +// +// b shouldn't be modified when req is in use. +func UnmarshalJSON(req *Request, b []byte) error { + req.reset() + if err := json.Unmarshal(b, req); err != nil { + return fmt.Errorf("cannot unmarshal %q: %w", b, err) + } + // Set missing timestamps to the current time. + currentTimestamp := int64(fasttime.UnixTimestamp()) + series := req.Series + for i := range series { + points := series[i].Points + for j := range points { + pt := &points[j] + if pt.Timestamp <= 0 { + pt.Timestamp = currentTimestamp + } + } + } + return nil +} + +// UnmarshalProtobuf unmarshals protobuf DataDog /api/v2/series request body from b to req. +// +// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics +// +// b shouldn't be modified when req is in use. 
+func UnmarshalProtobuf(req *Request, b []byte) error { + req.reset() + _ = b + return fmt.Errorf("unimplemented") +} + +// Series represents a series item from DataDog POST request to /api/v2/series +// +// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics +type Series struct { + // Do not decode Interval, since it isn't used by VictoriaMetrics + // Interval int64 `json:"interval"` + + // Do not decode Metadata, since it isn't used by VictoriaMetrics + // Metadata Metadata `json:"metadata"` + + // Metric is the name of the metric + Metric string `json:"metric"` + + // Points points for the given metric + Points []Point `json:"points"` + Resources []Resource `json:"resources"` + + // Do not decode SourceTypeName, since it isn't used by VictoriaMetrics + // SourceTypeName string `json:"source_type_name"` + + Tags []string + + // Do not decode Type, since it isn't used by VictoriaMetrics + // Type int `json:"type"` + + // Do not decode Unit, since it isn't used by VictoriaMetrics + // Unit string +} + +func (s *Series) reset() { + s.Metric = "" + + points := s.Points + for i := range points { + points[i].reset() + } + s.Points = points[:0] + + resources := s.Resources + for i := range resources { + resources[i].reset() + } + s.Resources = resources[:0] + + tags := s.Tags + for i := range tags { + tags[i] = "" + } + s.Tags = tags[:0] +} + +// Point represents a point from DataDog POST request to /api/v2/series +// +// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics +type Point struct { + // Timestamp is point timestamp in seconds + Timestamp int64 `json:"timestamp"` + + // Value is point value + Value float64 `json:"value"` +} + +func (pt *Point) reset() { + pt.Timestamp = 0 + pt.Value = 0 +} + +// Resource is series resource from DataDog POST request to /api/v2/series +// +// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics +type Resource struct { + Name string `json:"name"` + Type string `json:"type"` +} + +func (r 
*Resource) reset() { + r.Name = "" + r.Type = "" +} diff --git a/lib/protoparser/datadogv2/parser_test.go b/lib/protoparser/datadogv2/parser_test.go new file mode 100644 index 000000000..c63accd0b --- /dev/null +++ b/lib/protoparser/datadogv2/parser_test.go @@ -0,0 +1,77 @@ +package datadogv2 + +import ( + "reflect" + "testing" +) + +func TestRequestUnmarshalJSONFailure(t *testing.T) { + f := func(s string) { + t.Helper() + var req Request + if err := UnmarshalJSON(&req, []byte(s)); err == nil { + t.Fatalf("expecting non-nil error for Unmarshal(%q)", s) + } + } + f("") + f("foobar") + f(`{"series":123`) + f(`1234`) + f(`[]`) +} + +func TestRequestUnmarshalJSONSuccess(t *testing.T) { + f := func(s string, reqExpected *Request) { + t.Helper() + var req Request + if err := UnmarshalJSON(&req, []byte(s)); err != nil { + t.Fatalf("unexpected error in Unmarshal(%q): %s", s, err) + } + if !reflect.DeepEqual(&req, reqExpected) { + t.Fatalf("unexpected row;\ngot\n%+v\nwant\n%+v", &req, reqExpected) + } + } + f("{}", &Request{}) + f(` +{ + "series": [ + { + "metric": "system.load.1", + "type": 0, + "points": [ + { + "timestamp": 1636629071, + "value": 0.7 + } + ], + "resources": [ + { + "name": "dummyhost", + "type": "host" + } + ], + "tags": ["environment:test"] + } + ] +} +`, &Request{ + Series: []Series{{ + Metric: "system.load.1", + Points: []Point{ + { + Timestamp: 1636629071, + Value: 0.7, + }, + }, + Resources: []Resource{ + { + Name: "dummyhost", + Type: "host", + }, + }, + Tags: []string{ + "environment:test", + }, + }}, + }) +} diff --git a/lib/protoparser/datadogv2/parser_timing_test.go b/lib/protoparser/datadogv2/parser_timing_test.go new file mode 100644 index 000000000..83b2c02c6 --- /dev/null +++ b/lib/protoparser/datadogv2/parser_timing_test.go @@ -0,0 +1,43 @@ +package datadogv2 + +import ( + "fmt" + "testing" +) + +func BenchmarkRequestUnmarshalJSON(b *testing.B) { + reqBody := []byte(`{ + "series": [ + { + "metric": "system.load.1", + "type": 0, + 
      "points": [
+        {
+          "timestamp": 1636629071,
+          "value": 0.7
+        }
+      ],
+      "resources": [
+        {
+          "name": "dummyhost",
+          "type": "host"
+        }
+      ],
+      "tags": ["environment:test"]
+    }
+  ]
+}`)
+	b.SetBytes(int64(len(reqBody)))
+	b.ReportAllocs()
+	b.RunParallel(func(pb *testing.PB) {
+		var req Request
+		for pb.Next() {
+			if err := UnmarshalJSON(&req, reqBody); err != nil {
+				panic(fmt.Errorf("unexpected error: %w", err))
+			}
+			if len(req.Series) != 1 {
+				panic(fmt.Errorf("unexpected number of series unmarshaled: got %d; want 1", len(req.Series)))
+			}
+		}
+	})
+}
diff --git a/lib/protoparser/datadogv2/stream/streamparser.go b/lib/protoparser/datadogv2/stream/streamparser.go
new file mode 100644
index 000000000..0fe06269d
--- /dev/null
+++ b/lib/protoparser/datadogv2/stream/streamparser.go
@@ -0,0 +1,154 @@
+package stream
+
+import (
+	"bufio"
+	"fmt"
+	"io"
+	"sync"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv2"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
+	"github.com/VictoriaMetrics/metrics"
+)
+
+// Parse parses DataDog POST request for /api/v2/series from reader and calls callback for the parsed request.
+//
+// callback shouldn't hold series after returning. 
+func Parse(r io.Reader, contentEncoding, contentType string, callback func(series []datadogv2.Series) error) error { + wcr := writeconcurrencylimiter.GetReader(r) + defer writeconcurrencylimiter.PutReader(wcr) + r = wcr + + switch contentEncoding { + case "gzip": + zr, err := common.GetGzipReader(r) + if err != nil { + return fmt.Errorf("cannot read gzipped DataDog data: %w", err) + } + defer common.PutGzipReader(zr) + r = zr + case "deflate": + zlr, err := common.GetZlibReader(r) + if err != nil { + return fmt.Errorf("cannot read deflated DataDog data: %w", err) + } + defer common.PutZlibReader(zlr) + r = zlr + } + + ctx := getPushCtx(r) + defer putPushCtx(ctx) + if err := ctx.Read(); err != nil { + return err + } + req := getRequest() + defer putRequest(req) + + var err error + switch contentType { + case "application/x-protobuf": + err = datadogv2.UnmarshalProtobuf(req, ctx.reqBuf.B) + default: + err = datadogv2.UnmarshalJSON(req, ctx.reqBuf.B) + } + if err != nil { + unmarshalErrors.Inc() + return fmt.Errorf("cannot unmarshal DataDog %s request with size %d bytes: %w", contentType, len(ctx.reqBuf.B), err) + } + + rows := 0 + series := req.Series + for i := range series { + rows += len(series[i].Points) + if *datadogutils.SanitizeMetricName { + series[i].Metric = datadogutils.SanitizeName(series[i].Metric) + } + } + rowsRead.Add(rows) + + if err := callback(series); err != nil { + return fmt.Errorf("error when processing imported data: %w", err) + } + return nil +} + +type pushCtx struct { + br *bufio.Reader + reqBuf bytesutil.ByteBuffer +} + +func (ctx *pushCtx) reset() { + ctx.br.Reset(nil) + ctx.reqBuf.Reset() +} + +func (ctx *pushCtx) Read() error { + readCalls.Inc() + lr := io.LimitReader(ctx.br, int64(datadogutils.MaxInsertRequestSize.N)+1) + startTime := fasttime.UnixTimestamp() + reqLen, err := ctx.reqBuf.ReadFrom(lr) + if err != nil { + readErrors.Inc() + return fmt.Errorf("cannot read request in %d seconds: %w", fasttime.UnixTimestamp()-startTime, 
err) + } + if reqLen > int64(datadogutils.MaxInsertRequestSize.N) { + readErrors.Inc() + return fmt.Errorf("too big request; mustn't exceed -datadog.maxInsertRequestSize=%d bytes", datadogutils.MaxInsertRequestSize.N) + } + return nil +} + +var ( + readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="datadogv2"}`) + readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="datadogv2"}`) + rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="datadogv2"}`) + unmarshalErrors = metrics.NewCounter(`vm_protoparser_unmarshal_errors_total{type="datadogv2"}`) +) + +func getPushCtx(r io.Reader) *pushCtx { + select { + case ctx := <-pushCtxPoolCh: + ctx.br.Reset(r) + return ctx + default: + if v := pushCtxPool.Get(); v != nil { + ctx := v.(*pushCtx) + ctx.br.Reset(r) + return ctx + } + return &pushCtx{ + br: bufio.NewReaderSize(r, 64*1024), + } + } +} + +func putPushCtx(ctx *pushCtx) { + ctx.reset() + select { + case pushCtxPoolCh <- ctx: + default: + pushCtxPool.Put(ctx) + } +} + +var pushCtxPool sync.Pool +var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) + +func getRequest() *datadogv2.Request { + v := requestPool.Get() + if v == nil { + return &datadogv2.Request{} + } + return v.(*datadogv2.Request) +} + +func putRequest(req *datadogv2.Request) { + requestPool.Put(req) +} + +var requestPool sync.Pool