diff --git a/app/vminsert/datadog/request_handler.go b/app/vminsert/datadog/request_handler.go index c8dcea745..14c13bdf4 100644 --- a/app/vminsert/datadog/request_handler.go +++ b/app/vminsert/datadog/request_handler.go @@ -51,10 +51,10 @@ func insertRows(series prompbmarshal.TimeSeries) error { return nil } ctx.SortLabelsIfNeeded() - for _, sample := range series.Samples { - _, err := ctx.WriteDataPointExt(nil, ctx.Labels, sample.Timestamp, sample.Value) - if err != nil { + if _, err := ctx.WriteDataPointExt( + []byte{}, ctx.Labels, sample.Timestamp, sample.Value, + ); err != nil { return err } } diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index eda6d9952..174083791 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -34,7 +34,6 @@ The sandbox cluster installation is running under the constant load generated by * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-enableMultitenantHandlers` command-line flag, which allows receiving data via [VictoriaMetrics cluster urls](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format) at `vmagent` and converting [tenant ids](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy) to (`vm_account_id`, `vm_project_id`) labels before sending the data to the configured `-remoteWrite.url`. See [these docs](https://docs.victoriametrics.com/vmagent.html#multitenancy) for details. * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-remoteWrite.disableOnDiskQueue` command-line flag, which can be used for disabling data queueing to disk when the remote storage cannot keep up with the data ingestion rate. See [these docs](https://docs.victoriametrics.com/vmagent.html#disabling-on-disk-persistence) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2110). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for reading and writing samples via [Google PubSub](https://cloud.google.com/pubsub). See [these docs](https://docs.victoriametrics.com/vmagent.html#google-pubsub-integration). -* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for Datadog `/api/v2/series` and `/api/beta/sketches` ingestion protocols to vmagent/vminsert components. See this [doc](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) for examples. Thanks to @AndrewChubatiuk for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5094). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): show all the dropped targets together with the reason why they are dropped at `http://vmagent:8429/service-discovery` page. Previously targets, which were dropped because of [target sharding](https://docs.victoriametrics.com/vmagent.html#scraping-big-number-of-targets) weren't displayed on this page. This could complicate service discovery debugging. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5389). * FEATURE: reduce the default value for `-import.maxLineLen` command-line flag from 100MB to 10MB in order to prevent excessive memory usage during data import via [/api/v1/import](https://docs.victoriametrics.com/#how-to-import-data-in-json-line-format). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `keep_if_contains` and `drop_if_contains` relabeling actions. See [these docs](https://docs.victoriametrics.com/vmagent.html#relabeling-enhancements) for details. diff --git a/lib/protoparser/datadog/api/series/v1/api.go b/lib/protoparser/datadog/api/series/v1/api.go index cecf98f0a..135b91193 100644 --- a/lib/protoparser/datadog/api/series/v1/api.go +++ b/lib/protoparser/datadog/api/series/v1/api.go @@ -30,10 +30,10 @@ func (r *Request) Extract(fn func(prompbmarshal.TimeSeries) error, sanitizeFn fu if ts <= 0 { ts = float64(currentTimestamp) } - samples = append(samples, prompbmarshal.Sample{ + samples[j] = prompbmarshal.Sample{ Timestamp: int64(ts * 1000), Value: val, - }) + } } ts := prompbmarshal.TimeSeries{ Samples: samples, diff --git a/lib/protoparser/datadog/api/series/v1/api_test.go b/lib/protoparser/datadog/api/series/v1/api_test.go index b218e0665..7573ccd42 100644 --- a/lib/protoparser/datadog/api/series/v1/api_test.go +++ b/lib/protoparser/datadog/api/series/v1/api_test.go @@ -3,8 +3,6 @@ package datadog import ( "reflect" "testing" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" ) func TestRequestUnmarshalFailure(t *testing.T) { @@ -22,37 +20,22 @@ func TestRequestUnmarshalFailure(t *testing.T) { f(`[]`) } -func TestRequestExtract(t *testing.T) { - fn := func(s []byte, reqExpected *Request, samplesExp int) { - t.Helper() - req := new(Request) - if err := req.Unmarshal(s); err != nil { - t.Fatalf("unexpected error in Unmarshal(%q): %s", s, err) - } - if !reflect.DeepEqual(req, reqExpected) { - t.Fatalf("unexpected row;\ngot\n%+v\nwant\n%+v", req, reqExpected) - } - - var samplesTotal int - cb := func(ts prompbmarshal.TimeSeries) error { - samplesTotal += len(ts.Samples) - return nil - } - sanitizeFn := func(name string) string { - return name - } - if err := req.Extract(cb, sanitizeFn); err != nil { - t.Fatalf("error when extracting data: %s", err) - } - - if samplesTotal != samplesExp { - t.Fatalf("expected to extract %d samples; got %d", samplesExp, samplesTotal) - } - +func unmarshalRequestValidator(t *testing.T, s []byte, reqExpected *Request) { + t.Helper() + req := new(Request) + if err := req.Unmarshal(s); err != nil { + t.Fatalf("unexpected error in Unmarshal(%q): %s", s, err) } + if !reflect.DeepEqual(req, reqExpected) { + t.Fatalf("unexpected row;\ngot\n%+v\nwant\n%+v", req, reqExpected) + } +} - fn([]byte("{}"), new(Request), 0) - fn([]byte(` +func TestRequestUnmarshalSuccess(t *testing.T) { + unmarshalRequestValidator( + t, []byte("{}"), new(Request), + ) + unmarshalRequestValidator(t, []byte(` { "series": [ { @@ -84,5 +67,5 @@ func TestRequestExtract(t *testing.T) { "environment:test", }, }}, - }, 1) + }) } diff --git a/lib/protoparser/datadog/api/series/v2/api_test.go b/lib/protoparser/datadog/api/series/v2/api_test.go index 4655d2e83..a40a4776a 100644 --- a/lib/protoparser/datadog/api/series/v2/api_test.go +++ b/lib/protoparser/datadog/api/series/v2/api_test.go @@ -3,8 +3,6 @@ package datadog import ( "reflect" "testing" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" ) func TestRequestUnmarshalFailure(t *testing.T) { @@ -22,37 +20,22 @@ func TestRequestUnmarshalFailure(t *testing.T) { f(`[]`) } -func TestRequestExtract(t *testing.T) { - fn := func(s []byte, reqExpected *Request, samplesExp int) { - t.Helper() - req := new(Request) - if err := req.Unmarshal(s); err != nil { - t.Fatalf("unexpected error in Unmarshal(%q): %s", s, err) - } - if !reflect.DeepEqual(req, reqExpected) { - t.Fatalf("unexpected row;\ngot\n%+v\nwant\n%+v", req, reqExpected) - } - - var samplesTotal int - cb := func(ts prompbmarshal.TimeSeries) error { - samplesTotal += len(ts.Samples) - return nil - } - sanitizeFn := func(name string) string { - return name - } - if err := req.Extract(cb, sanitizeFn); err != nil { - t.Fatalf("error when extracting data: %s", err) - } - - if samplesTotal != samplesExp { - t.Fatalf("expected to extract %d samples; got %d", samplesExp, samplesTotal) - } - +func unmarshalRequestValidator(t *testing.T, s []byte, reqExpected *Request) { + t.Helper() + req := new(Request) + if err := req.Unmarshal(s); err != nil { + t.Fatalf("unexpected error in Unmarshal(%q): %s", s, err) } - fn([]byte("{}"), new(Request), 0) + if !reflect.DeepEqual(req, reqExpected) { + t.Fatalf("unexpected row;\ngot\n%+v\nwant\n%+v", req, reqExpected) + } +} - fn([]byte(` +func TestRequestUnmarshalSuccess(t *testing.T) { + unmarshalRequestValidator( + t, []byte("{}"), new(Request), + ) + unmarshalRequestValidator(t, []byte(` { "series": [ { @@ -70,9 +53,6 @@ func TestRequestExtract(t *testing.T) { "points": [{ "timestamp": 1575317847, "value": 0.5 - },{ - "timestamp": 1575317848, - "value": 0.6 }], "tags": [ "environment:test" @@ -94,13 +74,10 @@ func TestRequestExtract(t *testing.T) { Points: []point{{ Timestamp: 1575317847, Value: 0.5, - }, { - Timestamp: 1575317848, - Value: 0.6, }}, Tags: []string{ "environment:test", }, }}, - }, 2) + }) } diff --git a/lib/protoparser/datadog/api/sketches/beta/api.go b/lib/protoparser/datadog/api/sketches/beta/api.go index 4c857871d..5577a6c9c 100644 --- a/lib/protoparser/datadog/api/sketches/beta/api.go +++ b/lib/protoparser/datadog/api/sketches/beta/api.go @@ -25,7 +25,7 @@ func (r *Request) Extract(fn func(prompbmarshal.TimeSeries) error, sanitizeFn fu for _, sketch := range r.SketchPayload.Sketches { sketchSeries := make([]prompbmarshal.TimeSeries, 5) for _, point := range sketch.Dogsketches { - timestamp := point.Ts * 1000 + timestamp := int64(point.Ts * 1000) updateSeries(sketchSeries, sanitizeFn(sketch.Metric), timestamp, map[string]float64{ "max": point.Max, "min": point.Min, @@ -35,7 +35,7 @@ func (r *Request) Extract(fn func(prompbmarshal.TimeSeries) error, sanitizeFn fu }) } for _, point := range sketch.Distributions { - timestamp := point.Ts * 1000 + timestamp := int64(point.Ts * 1000) updateSeries(sketchSeries, sanitizeFn(sketch.Metric), timestamp, map[string]float64{ "max": point.Max, "min": point.Min, diff --git a/lib/protoparser/datadog/stream/streamparser.go b/lib/protoparser/datadog/stream/streamparser.go index 24beb0681..1279829b7 100644 --- a/lib/protoparser/datadog/stream/streamparser.go +++ b/lib/protoparser/datadog/stream/streamparser.go @@ -70,36 +70,35 @@ func Parse(req *http.Request, callback func(prompbmarshal.TimeSeries) error) err apiVersion := insertApisVersionRegex.ReplaceAllString(req.URL.Path, "${version}") apiKind := insertApisVersionRegex.ReplaceAllString(req.URL.Path, "${kind}") - var ddReq datadog.Request + ddReq := getRequest() + defer putRequest(ddReq) + switch apiKind { case "series": switch apiVersion { case "v1": - ddReq = getSeriesV1Request() - defer putSeriesV1Request(ddReq) + ddReq = new(apiSeriesV1.Request) case "v2": - ddReq = getSeriesV2Request() - defer putSeriesV2Request(ddReq) + ddReq = new(apiSeriesV2.Request) default: return fmt.Errorf( - "API version %q of DataDog series endpoint is not supported", + "API version %q of Datadog series endpoint is not supported", apiVersion, ) } case "sketches": switch apiVersion { case "beta": - ddReq = getSketchesBetaRequest() - defer putSketchesBetaRequest(ddReq) + ddReq = new(apiSketchesBeta.Request) default: return fmt.Errorf( - "API version %q of DataDog sketches endpoint is not supported", + "API version %q of Datadog sketches endpoint is not supported", apiVersion, ) } default: return fmt.Errorf( - "API kind %q of DataDog API is not supported", + "API kind %q of Datadog API is not supported", apiKind, ) } @@ -183,47 +182,19 @@ func putPushCtx(ctx *pushCtx) { var pushCtxPool sync.Pool var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) -func getSeriesV1Request() *apiSeriesV1.Request { - v := seriesV1RequestPool.Get() +func getRequest() datadog.Request { + v := requestPool.Get() if v == nil { - return &apiSeriesV1.Request{} + return nil } - return v.(*apiSeriesV1.Request) + return v.(datadog.Request) } -func putSeriesV1Request(req datadog.Request) { - seriesV1RequestPool.Put(req) +func putRequest(req datadog.Request) { + requestPool.Put(req) } -var seriesV1RequestPool sync.Pool - -func getSeriesV2Request() *apiSeriesV2.Request { - v := seriesV2RequestPool.Get() - if v == nil { - return &apiSeriesV2.Request{} - } - return v.(*apiSeriesV2.Request) -} - -func putSeriesV2Request(req datadog.Request) { - seriesV2RequestPool.Put(req) -} - -var seriesV2RequestPool sync.Pool - -func getSketchesBetaRequest() *apiSketchesBeta.Request { - v := sketchesBetaRequestPool.Get() - if v == nil { - return &apiSketchesBeta.Request{} - } - return v.(*apiSketchesBeta.Request) -} - -func putSketchesBetaRequest(req datadog.Request) { - sketchesBetaRequestPool.Put(req) -} - -var sketchesBetaRequestPool sync.Pool +var requestPool sync.Pool // sanitizeName performs DataDog-compatible sanitizing for metric names //