From 61db92cdc7630cfa1d741a023e6958d23e819ae4 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin <valyala@victoriametrics.com>
Date: Tue, 5 Dec 2023 02:29:00 +0200
Subject: [PATCH] Revert "lib/protoparser/datadog: follow-up after
 543f218fe96574b9b2189c8350bb09afa349e3bb"

This reverts commit 73d18fbc7a39a87c04456aac7ff0303e0784cd19.

Reason for revert: https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5094#issuecomment-1839789080
---
 docs/CHANGELOG.md                             |  1 -
 lib/protoparser/datadog/api/series/v1/api.go  |  4 +-
 .../datadog/api/series/v1/api_test.go         | 47 +++++---------
 .../datadog/api/series/v2/api_test.go         | 53 +++++-----------
 .../datadog/api/sketches/beta/api.go          |  4 +-
 .../datadog/stream/streamparser.go            | 61 +++++--------------
 6 files changed, 50 insertions(+), 120 deletions(-)

diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 0b82d1b684..1b7778f8e1 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -34,7 +34,6 @@ The sandbox cluster installation is running under the constant load generated by
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-enableMultitenantHandlers` command-line flag, which allows receiving data via [VictoriaMetrics cluster urls](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format) at `vmagent` and converting [tenant ids](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy) to (`vm_account_id`, `vm_project_id`) labels before sending the data to the configured `-remoteWrite.url`. See [these docs](https://docs.victoriametrics.com/vmagent.html#multitenancy) for details.
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-remoteWrite.disableOnDiskQueue` command-line flag, which can be used for disabling data queueing to disk when the remote storage cannot keep up with the data ingestion rate. See [these docs](https://docs.victoriametrics.com/vmagent.html#disabling-on-disk-persistence) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2110).
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for reading and writing samples via [Google PubSub](https://cloud.google.com/pubsub). See [these docs](https://docs.victoriametrics.com/vmagent.html#google-pubsub-integration).
-* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for Datadog `/api/v2/series` and `/api/beta/sketches` ingestion protocols to vmagent/vminsert components. See this [doc](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) for examples. Thanks to @AndrewChubatiuk for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5094).
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): show all the dropped targets together with the reason why they are dropped at `http://vmagent:8429/service-discovery` page. Previously targets, which were dropped because of [target sharding](https://docs.victoriametrics.com/vmagent.html#scraping-big-number-of-targets) weren't displayed on this page. This could complicate service discovery debugging. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5389).
 * FEATURE: reduce the default value for `-import.maxLineLen` command-line flag from 100MB to 10MB in order to prevent excessive memory usage during data import via [/api/v1/import](https://docs.victoriametrics.com/#how-to-import-data-in-json-line-format).
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `keep_if_contains` and `drop_if_contains` relabeling actions. See [these docs](https://docs.victoriametrics.com/vmagent.html#relabeling-enhancements) for details.
diff --git a/lib/protoparser/datadog/api/series/v1/api.go b/lib/protoparser/datadog/api/series/v1/api.go
index cecf98f0a8..135b911934 100644
--- a/lib/protoparser/datadog/api/series/v1/api.go
+++ b/lib/protoparser/datadog/api/series/v1/api.go
@@ -30,10 +30,10 @@ func (r *Request) Extract(fn func(prompbmarshal.TimeSeries) error, sanitizeFn fu
 			if ts <= 0 {
 				ts = float64(currentTimestamp)
 			}
-			samples = append(samples, prompbmarshal.Sample{
+			samples[j] = prompbmarshal.Sample{
 				Timestamp: int64(ts * 1000),
 				Value:     val,
-			})
+			}
 		}
 		ts := prompbmarshal.TimeSeries{
 			Samples: samples,
diff --git a/lib/protoparser/datadog/api/series/v1/api_test.go b/lib/protoparser/datadog/api/series/v1/api_test.go
index b218e0665a..7573ccd428 100644
--- a/lib/protoparser/datadog/api/series/v1/api_test.go
+++ b/lib/protoparser/datadog/api/series/v1/api_test.go
@@ -3,8 +3,6 @@ package datadog
 import (
 	"reflect"
 	"testing"
-
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 )
 
 func TestRequestUnmarshalFailure(t *testing.T) {
@@ -22,37 +20,22 @@ func TestRequestUnmarshalFailure(t *testing.T) {
 	f(`[]`)
 }
 
-func TestRequestExtract(t *testing.T) {
-	fn := func(s []byte, reqExpected *Request, samplesExp int) {
-		t.Helper()
-		req := new(Request)
-		if err := req.Unmarshal(s); err != nil {
-			t.Fatalf("unexpected error in Unmarshal(%q): %s", s, err)
-		}
-		if !reflect.DeepEqual(req, reqExpected) {
-			t.Fatalf("unexpected row;\ngot\n%+v\nwant\n%+v", req, reqExpected)
-		}
-
-		var samplesTotal int
-		cb := func(ts prompbmarshal.TimeSeries) error {
-			samplesTotal += len(ts.Samples)
-			return nil
-		}
-		sanitizeFn := func(name string) string {
-			return name
-		}
-		if err := req.Extract(cb, sanitizeFn); err != nil {
-			t.Fatalf("error when extracting data: %s", err)
-		}
-
-		if samplesTotal != samplesExp {
-			t.Fatalf("expected to extract %d samples; got %d", samplesExp, samplesTotal)
-		}
-
+func unmarshalRequestValidator(t *testing.T, s []byte, reqExpected *Request) {
+	t.Helper()
+	req := new(Request)
+	if err := req.Unmarshal(s); err != nil {
+		t.Fatalf("unexpected error in Unmarshal(%q): %s", s, err)
 	}
+	if !reflect.DeepEqual(req, reqExpected) {
+		t.Fatalf("unexpected row;\ngot\n%+v\nwant\n%+v", req, reqExpected)
+	}
+}
 
-	fn([]byte("{}"), new(Request), 0)
-	fn([]byte(`
+func TestRequestUnmarshalSuccess(t *testing.T) {
+	unmarshalRequestValidator(
+		t, []byte("{}"), new(Request),
+	)
+	unmarshalRequestValidator(t, []byte(`
 {
   "series": [
     {
@@ -84,5 +67,5 @@ func TestRequestExtract(t *testing.T) {
 				"environment:test",
 			},
 		}},
-	}, 1)
+	})
 }
diff --git a/lib/protoparser/datadog/api/series/v2/api_test.go b/lib/protoparser/datadog/api/series/v2/api_test.go
index 4655d2e834..a40a4776ad 100644
--- a/lib/protoparser/datadog/api/series/v2/api_test.go
+++ b/lib/protoparser/datadog/api/series/v2/api_test.go
@@ -3,8 +3,6 @@ package datadog
 import (
 	"reflect"
 	"testing"
-
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 )
 
 func TestRequestUnmarshalFailure(t *testing.T) {
@@ -22,37 +20,22 @@ func TestRequestUnmarshalFailure(t *testing.T) {
 	f(`[]`)
 }
 
-func TestRequestExtract(t *testing.T) {
-	fn := func(s []byte, reqExpected *Request, samplesExp int) {
-		t.Helper()
-		req := new(Request)
-		if err := req.Unmarshal(s); err != nil {
-			t.Fatalf("unexpected error in Unmarshal(%q): %s", s, err)
-		}
-		if !reflect.DeepEqual(req, reqExpected) {
-			t.Fatalf("unexpected row;\ngot\n%+v\nwant\n%+v", req, reqExpected)
-		}
-
-		var samplesTotal int
-		cb := func(ts prompbmarshal.TimeSeries) error {
-			samplesTotal += len(ts.Samples)
-			return nil
-		}
-		sanitizeFn := func(name string) string {
-			return name
-		}
-		if err := req.Extract(cb, sanitizeFn); err != nil {
-			t.Fatalf("error when extracting data: %s", err)
-		}
-
-		if samplesTotal != samplesExp {
-			t.Fatalf("expected to extract %d samples; got %d", samplesExp, samplesTotal)
-		}
-
+func unmarshalRequestValidator(t *testing.T, s []byte, reqExpected *Request) {
+	t.Helper()
+	req := new(Request)
+	if err := req.Unmarshal(s); err != nil {
+		t.Fatalf("unexpected error in Unmarshal(%q): %s", s, err)
 	}
-	fn([]byte("{}"), new(Request), 0)
+	if !reflect.DeepEqual(req, reqExpected) {
+		t.Fatalf("unexpected row;\ngot\n%+v\nwant\n%+v", req, reqExpected)
+	}
+}
 
-	fn([]byte(`
+func TestRequestUnmarshalSuccess(t *testing.T) {
+	unmarshalRequestValidator(
+		t, []byte("{}"), new(Request),
+	)
+	unmarshalRequestValidator(t, []byte(`
 {
   "series": [
     {
@@ -70,9 +53,6 @@ func TestRequestExtract(t *testing.T) {
 			"points": [{
 				"timestamp": 1575317847,
 				"value": 0.5
-			},{
-				"timestamp": 1575317848,
-				"value": 0.6
 			}],
       "tags": [
         "environment:test"
@@ -94,13 +74,10 @@ func TestRequestExtract(t *testing.T) {
 			Points: []point{{
 				Timestamp: 1575317847,
 				Value:     0.5,
-			}, {
-				Timestamp: 1575317848,
-				Value:     0.6,
 			}},
 			Tags: []string{
 				"environment:test",
 			},
 		}},
-	}, 2)
+	})
 }
diff --git a/lib/protoparser/datadog/api/sketches/beta/api.go b/lib/protoparser/datadog/api/sketches/beta/api.go
index 4c857871d0..5577a6c9c2 100644
--- a/lib/protoparser/datadog/api/sketches/beta/api.go
+++ b/lib/protoparser/datadog/api/sketches/beta/api.go
@@ -25,7 +25,7 @@ func (r *Request) Extract(fn func(prompbmarshal.TimeSeries) error, sanitizeFn fu
 	for _, sketch := range r.SketchPayload.Sketches {
 		sketchSeries := make([]prompbmarshal.TimeSeries, 5)
 		for _, point := range sketch.Dogsketches {
-			timestamp := point.Ts * 1000
+			timestamp := int64(point.Ts * 1000)
 			updateSeries(sketchSeries, sanitizeFn(sketch.Metric), timestamp, map[string]float64{
 				"max": point.Max,
 				"min": point.Min,
@@ -35,7 +35,7 @@ func (r *Request) Extract(fn func(prompbmarshal.TimeSeries) error, sanitizeFn fu
 			})
 		}
 		for _, point := range sketch.Distributions {
-			timestamp := point.Ts * 1000
+			timestamp := int64(point.Ts * 1000)
 			updateSeries(sketchSeries, sanitizeFn(sketch.Metric), timestamp, map[string]float64{
 				"max": point.Max,
 				"min": point.Min,
diff --git a/lib/protoparser/datadog/stream/streamparser.go b/lib/protoparser/datadog/stream/streamparser.go
index 24beb06814..1279829b75 100644
--- a/lib/protoparser/datadog/stream/streamparser.go
+++ b/lib/protoparser/datadog/stream/streamparser.go
@@ -70,36 +70,35 @@ func Parse(req *http.Request, callback func(prompbmarshal.TimeSeries) error) err
 	apiVersion := insertApisVersionRegex.ReplaceAllString(req.URL.Path, "${version}")
 	apiKind := insertApisVersionRegex.ReplaceAllString(req.URL.Path, "${kind}")
 
-	var ddReq datadog.Request
+	ddReq := getRequest()
+	defer putRequest(ddReq)
+
 	switch apiKind {
 	case "series":
 		switch apiVersion {
 		case "v1":
-			ddReq = getSeriesV1Request()
-			defer putSeriesV1Request(ddReq)
+			ddReq = new(apiSeriesV1.Request)
 		case "v2":
-			ddReq = getSeriesV2Request()
-			defer putSeriesV2Request(ddReq)
+			ddReq = new(apiSeriesV2.Request)
 		default:
 			return fmt.Errorf(
-				"API version %q of DataDog series endpoint is not supported",
+				"API version %q of Datadog series endpoint is not supported",
 				apiVersion,
 			)
 		}
 	case "sketches":
 		switch apiVersion {
 		case "beta":
-			ddReq = getSketchesBetaRequest()
-			defer putSketchesBetaRequest(ddReq)
+			ddReq = new(apiSketchesBeta.Request)
 		default:
 			return fmt.Errorf(
-				"API version %q of DataDog sketches endpoint is not supported",
+				"API version %q of Datadog sketches endpoint is not supported",
 				apiVersion,
 			)
 		}
 	default:
 		return fmt.Errorf(
-			"API kind %q of DataDog API is not supported",
+			"API kind %q of Datadog API is not supported",
 			apiKind,
 		)
 	}
@@ -183,47 +182,19 @@ func putPushCtx(ctx *pushCtx) {
 var pushCtxPool sync.Pool
 var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())
 
-func getSeriesV1Request() *apiSeriesV1.Request {
-	v := seriesV1RequestPool.Get()
+func getRequest() datadog.Request {
+	v := requestPool.Get()
 	if v == nil {
-		return &apiSeriesV1.Request{}
+		return nil
 	}
-	return v.(*apiSeriesV1.Request)
+	return v.(datadog.Request)
 }
 
-func putSeriesV1Request(req datadog.Request) {
-	seriesV1RequestPool.Put(req)
+func putRequest(req datadog.Request) {
+	requestPool.Put(req)
 }
 
-var seriesV1RequestPool sync.Pool
-
-func getSeriesV2Request() *apiSeriesV2.Request {
-	v := seriesV2RequestPool.Get()
-	if v == nil {
-		return &apiSeriesV2.Request{}
-	}
-	return v.(*apiSeriesV2.Request)
-}
-
-func putSeriesV2Request(req datadog.Request) {
-	seriesV2RequestPool.Put(req)
-}
-
-var seriesV2RequestPool sync.Pool
-
-func getSketchesBetaRequest() *apiSketchesBeta.Request {
-	v := sketchesBetaRequestPool.Get()
-	if v == nil {
-		return &apiSketchesBeta.Request{}
-	}
-	return v.(*apiSketchesBeta.Request)
-}
-
-func putSketchesBetaRequest(req datadog.Request) {
-	sketchesBetaRequestPool.Put(req)
-}
-
-var sketchesBetaRequestPool sync.Pool
+var requestPool sync.Pool
 
 // sanitizeName performs DataDog-compatible sanitizing for metric names
 //