diff --git a/app/vmagent/datadogsketches/request_handler.go b/app/vmagent/datadogsketches/request_handler.go new file mode 100644 index 000000000..42a8665ff --- /dev/null +++ b/app/vmagent/datadogsketches/request_handler.go @@ -0,0 +1,99 @@ +package datadogsketches + +import ( + "net/http" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogsketches" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogsketches/stream" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" + "github.com/VictoriaMetrics/metrics" +) + +var ( + rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="datadogsketches"}`) + rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="datadogsketches"}`) + rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="datadogsketches"}`) +) + +// InsertHandlerForHTTP processes remote write for DataDog POST /api/beta/sketches request. +func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error { + extraLabels, err := parserCommon.GetExtraLabels(req) + if err != nil { + return err + } + ce := req.Header.Get("Content-Encoding") + return stream.Parse(req.Body, ce, func(sketches []*datadogsketches.Sketch) error { + return insertRows(at, sketches, extraLabels) + }) +} + +func insertRows(at *auth.Token, sketches []*datadogsketches.Sketch, extraLabels []prompbmarshal.Label) error { + ctx := common.GetPushCtx() + defer common.PutPushCtx(ctx) + + rowsTotal := 0 + tssDst := ctx.WriteRequest.Timeseries[:0] + labels := ctx.Labels[:0] + samples := ctx.Samples[:0] + for i := range sketches { + sketch := sketches[i] + metrics := sketch.ToHistogram() + rowsTotal += sketch.RowsCount() + for m := range metrics { + metric := metrics[m] + labelsLen := len(labels) + labels = append(labels, prompbmarshal.Label{ + Name: "__name__", + Value: metric.Name, + }) + for l := range metric.Labels { + label := metric.Labels[l] + labels = append(labels, prompbmarshal.Label{ + Name: label.Name, + Value: label.Value, + }) + } + for _, tag := range sketch.Tags { + name, value := datadogutils.SplitTag(tag) + if name == "host" { + name = "exported_host" + } + labels = append(labels, prompbmarshal.Label{ + Name: name, + Value: value, + }) + } + labels = append(labels, extraLabels...) + samplesLen := len(samples) + for p := range metric.Points { + point := metric.Points[p] + samples = append(samples, prompbmarshal.Sample{ + Timestamp: sketch.Dogsketches[p].Ts * 1000, + Value: point, + }) + } + tssDst = append(tssDst, prompbmarshal.TimeSeries{ + Labels: labels[labelsLen:], + Samples: samples[samplesLen:], + }) + } + } + ctx.WriteRequest.Timeseries = tssDst + ctx.Labels = labels + ctx.Samples = samples + if !remotewrite.TryPush(at, &ctx.WriteRequest) { + return remotewrite.ErrQueueFullHTTPRetry + } + rowsInserted.Add(rowsTotal) + if at != nil { + rowsTenantInserted.Get(at).Add(rowsTotal) + } + rowsPerInsert.Update(float64(rowsTotal)) + return nil +} diff --git a/app/vmagent/main.go b/app/vmagent/main.go index aaf7cf7c8..913742265 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -12,6 +12,7 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/csvimport" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/datadogsketches" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/datadogv1" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/datadogv2" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/graphite" @@ -368,6 +369,15 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { w.WriteHeader(202) fmt.Fprintf(w, `{"status":"ok"}`) return true + case "/datadog/api/beta/sketches": + datadogsketchesWriteRequests.Inc() + if err := datadogsketches.InsertHandlerForHTTP(nil, r); err != nil { + datadogsketchesWriteErrors.Inc() + httpserver.Errorf(w, r, "%s", err) + return true + } + w.WriteHeader(202) + return true case "/datadog/api/v1/validate": datadogValidateRequests.Inc() // See https://docs.datadoghq.com/api/latest/authentication/#validate-api-key @@ -659,6 +669,9 @@ var ( datadogv2WriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v2/series", protocol="datadog"}`) datadogv2WriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/datadog/api/v2/series", protocol="datadog"}`) + datadogsketchesWriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/beta/sketches", protocol="datadog"}`) + datadogsketchesWriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/datadog/api/beta/sketches", protocol="datadog"}`) + datadogValidateRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/validate", protocol="datadog"}`) datadogCheckRunRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/check_run", protocol="datadog"}`) datadogIntakeRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/intake", protocol="datadog"}`) diff --git a/app/vminsert/datadogsketches/request_handler.go b/app/vminsert/datadogsketches/request_handler.go new file mode 100644 index 000000000..b00f885aa --- /dev/null +++ b/app/vminsert/datadogsketches/request_handler.go @@ -0,0 +1,91 @@ +package datadogsketches + +import ( + "net/http" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogsketches" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogsketches/stream" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils" + "github.com/VictoriaMetrics/metrics" +) + +var ( + rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="datadogsketches"}`) + rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="datadogsketches"}`) +) + +// InsertHandlerForHTTP processes remote write for DataDog POST /api/beta/sketches request. +func InsertHandlerForHTTP(req *http.Request) error { + extraLabels, err := parserCommon.GetExtraLabels(req) + if err != nil { + return err + } + ce := req.Header.Get("Content-Encoding") + return stream.Parse(req.Body, ce, func(sketches []*datadogsketches.Sketch) error { + return insertRows(sketches, extraLabels) + }) +} + +func insertRows(sketches []*datadogsketches.Sketch, extraLabels []prompbmarshal.Label) error { + ctx := common.GetInsertCtx() + defer common.PutInsertCtx(ctx) + + rowsLen := 0 + for i := range sketches { + sketch := sketches[i] + rowsLen += sketch.RowsCount() + } + ctx.Reset(rowsLen) + rowsTotal := 0 + hasRelabeling := relabel.HasRelabeling() + for i := range sketches { + sketch := sketches[i] + metrics := sketch.ToHistogram() + rowsTotal += sketch.RowsCount() + for m := range metrics { + metric := metrics[m] + ctx.Labels = ctx.Labels[:0] + ctx.AddLabel("", metric.Name) + for l := range metric.Labels { + label := metric.Labels[l] + ctx.AddLabel(label.Name, label.Value) + } + for _, tag := range sketch.Tags { + name, value := datadogutils.SplitTag(tag) + if name == "host" { + name = "exported_host" + } + ctx.AddLabel(name, value) + } + for j := range extraLabels { + label := &extraLabels[j] + ctx.AddLabel(label.Name, label.Value) + } + if hasRelabeling { + ctx.ApplyRelabeling() + } + if len(ctx.Labels) == 0 { + // Skip metric without labels. + continue + } + ctx.SortLabelsIfNeeded() + var metricNameRaw []byte + var err error + for p := range metric.Points { + value := metric.Points[p] + timestamp := sketch.Dogsketches[p].Ts * 1000 + metricNameRaw, err = ctx.WriteDataPointExt(metricNameRaw, ctx.Labels, timestamp, value) + if err != nil { + return err + } + } + } + } + rowsInserted.Add(rowsTotal) + rowsPerInsert.Update(float64(rowsTotal)) + return ctx.FlushBufs() +} diff --git a/app/vminsert/main.go b/app/vminsert/main.go index 91fe14fa9..f63d7d4d1 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -13,6 +13,7 @@ import ( vminsertCommon "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/csvimport" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/datadogsketches" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/datadogv1" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/datadogv2" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/graphite" @@ -271,6 +272,15 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { w.WriteHeader(202) fmt.Fprintf(w, `{"status":"ok"}`) return true + case "/datadog/api/beta/sketches": + datadogsketchesWriteRequests.Inc() + if err := datadogsketches.InsertHandlerForHTTP(r); err != nil { + datadogsketchesWriteErrors.Inc() + httpserver.Errorf(w, r, "%s", err) + return true + } + w.WriteHeader(202) + return true case "/datadog/api/v1/validate": datadogValidateRequests.Inc() // See https://docs.datadoghq.com/api/latest/authentication/#validate-api-key @@ -394,6 +404,9 @@ var ( datadogv2WriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/api/v2/series", protocol="datadog"}`) datadogv2WriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/datadog/api/v2/series", protocol="datadog"}`) + datadogsketchesWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/api/beta/sketches", protocol="datadog"}`) + datadogsketchesWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/datadog/api/beta/sketches", protocol="datadog"}`) + datadogValidateRequests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/api/v1/validate", protocol="datadog"}`) datadogCheckRunRequests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/api/v1/check_run", protocol="datadog"}`) datadogIntakeRequests = metrics.NewCounter(`vm_http_requests_total{path="/datadog/intake", protocol="datadog"}`) diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index 286cedb5c..4c88059e9 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -520,8 +520,8 @@ See also [vmagent](https://docs.victoriametrics.com/vmagent.html), which can be ## How to send data from DataDog agent -VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/) or [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/) -via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) at `/datadog/api/v2/series` path. +VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/), [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/) or [DataDog Lambda Extension](https://docs.datadoghq.com/serverless/libraries_integrations/extension/) +via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) at `/datadog/api/v2/series` path or via "sketches" API at `/datadog/api/beta/sketches`. ### Sending metrics to VictoriaMetrics @@ -547,10 +547,10 @@ add the following line: dd_url: http://victoriametrics:8428/datadog ``` -[vmagent](https://docs.victoriametrics.com/vmagent.html) also can accept Datadog metrics format. Depending on where vmagent will forward data, +[vmagent](https://docs.victoriametrics.com/vmagent.html) also can accept DataDog metrics format. Depending on where vmagent will forward data, pick [single-node or cluster URL](https://docs.victoriametrics.com/url-examples.html#datadog) formats. -### Sending metrics to Datadog and VictoriaMetrics +### Sending metrics to DataDog and VictoriaMetrics DataDog allows configuring [Dual Shipping](https://docs.datadoghq.com/agent/guide/dual-shipping/) for metrics sending via ENV variable `DD_ADDITIONAL_ENDPOINTS` or via configuration file `additional_endpoints`. @@ -576,6 +576,19 @@ additional_endpoints: ``` +### Send via Serverless DataDog plugin + +Disable logs (logs ingestion is not supported by Victoria Metrics) and set a custom endpoint in serverless.yaml +``` +custom: + datadog: + enableDDLogs: false # Disabled not supported DD logs + apiKey: fakekey # Set any key, otherwise plugin fails +provider: + environment: + DD_DD_URL: <>/datadog # Victoria Metrics endpoint for DataDog +``` + ### Send via cURL See how to send data to VictoriaMetrics via DataDog "submit metrics" API [here](https://docs.victoriametrics.com/url-examples.html#datadogapiv2series). diff --git a/go.mod b/go.mod index 712de6a25..a17325da1 100644 --- a/go.mod +++ b/go.mod @@ -37,6 +37,8 @@ require ( gopkg.in/yaml.v2 v2.4.0 ) +require github.com/stretchr/testify v1.8.4 + require ( cloud.google.com/go v0.112.0 // indirect cloud.google.com/go/compute v1.23.4 // indirect @@ -102,7 +104,6 @@ require ( github.com/prometheus/procfs v0.12.0 // indirect github.com/rivo/uniseg v0.4.6 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect - github.com/stretchr/testify v1.8.4 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/xrash/smetrics v0.0.0-20231213231151-1d8dd44e695e // indirect go.opencensus.io v0.24.0 // indirect diff --git a/lib/protoparser/datadogsketches/parser.go b/lib/protoparser/datadogsketches/parser.go new file mode 100644 index 000000000..b04dd4616 --- /dev/null +++ b/lib/protoparser/datadogsketches/parser.go @@ -0,0 +1,298 @@ +package datadogsketches + +import ( + "fmt" + "math" + "strconv" + + "github.com/VictoriaMetrics/easyproto" +) + +var ( + epsillon = 1.0 / 128 + gamma = 1 + 2*epsillon + gammaLn = math.Log(gamma) + defaultMin = 0.981e-9 + bias = 1 - int(math.Floor(math.Log(defaultMin)/gammaLn)) + quantiles = []float64{0.5, 0.75, 0.9, 0.95, 0.99} +) + +type label struct { + Name string + Value string +} + +// Metric stores metrics extracted from sketches +type Metric struct { + Name string + Labels []label + Points []float64 +} + +// SketchPayload stores sketches extracted from /api/beta/sketches endpoint +// +// message SketchPayload { +// repeated Sketch sketches = 1 +// } +type SketchPayload struct { + Sketches []*Sketch +} + +// UnmarshalProtobuf decodes byte array to SketchPayload struct +func (sp *SketchPayload) UnmarshalProtobuf(src []byte) (err error) { + sp.Sketches = sp.Sketches[:0] + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in SketchPayload message: %w", err) + } + switch fc.FieldNum { + case 1: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot read SketchPayload sketches data") + } + sp.Sketches = append(sp.Sketches, &Sketch{}) + s := sp.Sketches[len(sp.Sketches)-1] + if err := s.unmarshalProtobuf(data); err != nil { + return fmt.Errorf("cannot unmarshal sketch: %w", err) + } + } + } + return nil +} + +// Sketch proto struct +// +// message Sketch { +// string metric = 1; +// string host = 2; +// repeated string tags = 4; +// repeated Dogsketch dogsketches = 7 +// } +type Sketch struct { + Metric string + Host string + Tags []string + Dogsketches []*Dogsketch +} + +// unmarshalProtobuf decodes byte array to Sketch struct +func (s *Sketch) unmarshalProtobuf(src []byte) (err error) { + s.Metric = "" + s.Host = "" + s.Tags = s.Tags[:0] + s.Dogsketches = s.Dogsketches[:0] + + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in Sketch message: %w", err) + } + switch fc.FieldNum { + case 1: + metric, ok := fc.String() + if !ok { + return fmt.Errorf("cannot read Sketch metric") + } + s.Metric = metric + case 2: + host, ok := fc.String() + if !ok { + return fmt.Errorf("cannot read Sketch host") + } + s.Host = host + case 4: + tag, ok := fc.String() + if !ok { + return fmt.Errorf("cannot read Sketch tag") + } + s.Tags = append(s.Tags, tag) + case 7: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot read Sketch dogsketches data") + } + s.Dogsketches = append(s.Dogsketches, &Dogsketch{}) + d := s.Dogsketches[len(s.Dogsketches)-1] + if err := d.unmarshalProtobuf(data); err != nil { + return fmt.Errorf("cannot unmarshal dogsketch: %w", err) + } + } + } + return nil +} + +// RowsCount calculates generated rows num from sketch +func (s *Sketch) RowsCount() int { + return (len(quantiles) + len(s.extractAggr())) * len(s.Dogsketches) +} + +func (s *Sketch) extractAggr() []*Metric { + return []*Metric{ + { + Name: s.Metric + "_sum", + Labels: []label{}, + Points: make([]float64, len(s.Dogsketches)), + }, { + Name: s.Metric + "_count", + Labels: []label{}, + Points: make([]float64, len(s.Dogsketches)), + }, + } +} + +// ToHistogram generates histogram metrics +func (s *Sketch) ToHistogram() []*Metric { + dogsketches := s.Dogsketches + aggr := s.extractAggr() + metrics := make([]*Metric, len(quantiles)) + for q := range quantiles { + quantile := quantiles[q] + metrics[q] = &Metric{ + Name: s.Metric, + Labels: []label{{ + Name: "quantile", + Value: strconv.FormatFloat(quantile, 'g', 3, 64), + }}, + Points: make([]float64, len(dogsketches)), + } + for d := range dogsketches { + dogsketch := dogsketches[d] + aggr[0].Points[d] = dogsketch.Sum + aggr[1].Points[d] = float64(dogsketch.Cnt) + metrics[q].Points[d] = dogsketch.pointForQuantile(quantile) + } + } + return append(metrics, aggr...) +} + +// Dogsketch proto struct +// +// message Dogsketch { +// int64 ts = 1; +// int64 cnt = 2; +// double min = 3; +// double max = 4; +// double sum = 6; +// repeated sint32 k = 7; +// repeated uint32 n = 8; +// } +type Dogsketch struct { + Ts int64 + Cnt int64 + Min float64 + Max float64 + Sum float64 + K []int32 + N []uint32 +} + +// unmarshalProtobuf decodes byte array to Dogsketch struct +func (d *Dogsketch) unmarshalProtobuf(src []byte) (err error) { + d.Ts = 0 + d.Cnt = 0 + d.Min = 0.0 + d.Max = 0.0 + d.Sum = 0.0 + d.K = d.K[:0] + d.N = d.N[:0] + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in Dogsketch message, %w", err) + } + switch fc.FieldNum { + case 1: + ts, ok := fc.Int64() + if !ok { + return fmt.Errorf("cannot read Dogsketch timestamp") + } + d.Ts = ts + case 2: + cnt, ok := fc.Int64() + if !ok { + return fmt.Errorf("cannot read Dogsketch count") + } + d.Cnt = cnt + case 3: + min, ok := fc.Double() + if !ok { + return fmt.Errorf("cannot read Dogsketch min") + } + d.Min = min + case 4: + max, ok := fc.Double() + if !ok { + return fmt.Errorf("cannot read Dogsketch max") + } + d.Max = max + case 6: + sum, ok := fc.Double() + if !ok { + return fmt.Errorf("cannot read Dogsketch sum") + } + d.Sum = sum + case 7: + var ok bool + d.K, ok = fc.UnpackSint32s(d.K) + if !ok { + return fmt.Errorf("cannot read Dogsketch k") + } + case 8: + var ok bool + d.N, ok = fc.UnpackUint32s(d.N) + if !ok { + return fmt.Errorf("cannot read Dogsketch n") + } + } + } + return nil +} + +func (d *Dogsketch) pointForQuantile(quantile float64) float64 { + switch { + case d.Cnt == 0: + return 0 + case quantile <= 0: + return d.Min + case quantile >= 1: + return d.Max + } + + rank := quantile * float64(d.Cnt-1) + nLen := len(d.N) + for cnt, i := 0.0, 0; i < nLen; i++ { + cnt += float64(d.N[i]) + if cnt <= rank { + continue + } + weight := (cnt - rank) / float64(d.N[i]) + vLow := f64(d.K[i]) + vHigh := vLow * gamma + switch i { + case nLen: + vHigh = d.Max + case 0: + vLow = d.Min + } + return vLow*weight + vHigh*(1-weight) + } + return d.Max +} + +func f64(k int32) float64 { + switch { + case k < 0: + return -f64(-k) + case k == math.MaxInt16 || k == math.MinInt16: + return math.Inf(int(k)) + case k == 0: + return 0 + } + exp := float64(int(k) - bias) + return math.Pow(gamma, exp) +} diff --git a/lib/protoparser/datadogsketches/parser_test.go b/lib/protoparser/datadogsketches/parser_test.go new file mode 100644 index 000000000..ac52efb98 --- /dev/null +++ b/lib/protoparser/datadogsketches/parser_test.go @@ -0,0 +1,28 @@ +package datadogsketches + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestPointsForQuantile(t *testing.T) { + f := func(d *Dogsketch, quantile float64, expected float64) { + t.Helper() + point := d.pointForQuantile(quantile) + assert.InDelta(t, point, expected, 0.5) + } + sketches := &Dogsketch{ + Min: 8.0, + Max: 20.0, + Cnt: 17, + N: []uint32{0x0, 0x0, 0x1, 0x0, 0x1, 0x4, 0x6, 0x1, 0x2, 0x0, 0x1, 0x0, 0x1}, + K: []int32{0, 1472, 1473, 1479, 1480, 1503, 1504, 1512, 1513, 1514, 1515, 1531, 1532}, + } + f(sketches, 0.1, 8.96) + f(sketches, 0.5, 13.01) + f(sketches, 0.75, 14.96) + f(sketches, 0.9, 14.96) + f(sketches, 0.95, 15.43) + f(sketches, 0.99, 15.43) +} diff --git a/lib/protoparser/datadogsketches/stream/streamparser.go b/lib/protoparser/datadogsketches/stream/streamparser.go new file mode 100644 index 000000000..177741663 --- /dev/null +++ b/lib/protoparser/datadogsketches/stream/streamparser.go @@ -0,0 +1,149 @@ +package stream + +import ( + "bufio" + "fmt" + "io" + "sync" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogsketches" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" + "github.com/VictoriaMetrics/metrics" +) + +// Parse parses DataDog POST request for /api/beta/sketches from reader and calls callback for the parsed request. +// +// callback shouldn't hold series after returning. +func Parse(r io.Reader, contentEncoding string, callback func(series []*datadogsketches.Sketch) error) error { + var err error + wcr := writeconcurrencylimiter.GetReader(r) + defer writeconcurrencylimiter.PutReader(wcr) + r = wcr + + switch contentEncoding { + case "gzip": + zr, err := common.GetGzipReader(r) + if err != nil { + return fmt.Errorf("cannot read gzipped DataDog data: %w", err) + } + defer common.PutGzipReader(zr) + r = zr + case "deflate": + zlr, err := common.GetZlibReader(r) + if err != nil { + return fmt.Errorf("cannot read deflated DataDog data: %w", err) + } + defer common.PutZlibReader(zlr) + r = zlr + } + + ctx := getPushCtx(r) + defer putPushCtx(ctx) + if err := ctx.Read(); err != nil { + return err + } + req := getRequest() + defer putRequest(req) + + err = req.UnmarshalProtobuf(ctx.reqBuf.B) + if err != nil { + unmarshalErrors.Inc() + return fmt.Errorf("cannot unmarshal DataDog request with size %d bytes: %w", len(ctx.reqBuf.B), err) + } + + rows := 0 + sketches := req.Sketches + for i := range sketches { + rows += len(sketches[i].Dogsketches) + if *datadogutils.SanitizeMetricName { + sketches[i].Metric = datadogutils.SanitizeName(sketches[i].Metric) + } + } + rowsRead.Add(rows) + + if err := callback(sketches); err != nil { + return fmt.Errorf("error when processing imported data: %w", err) + } + return nil +} + +type pushCtx struct { + br *bufio.Reader + reqBuf bytesutil.ByteBuffer +} + +func (ctx *pushCtx) reset() { + ctx.br.Reset(nil) + ctx.reqBuf.Reset() +} + +func (ctx *pushCtx) Read() error { + readCalls.Inc() + lr := io.LimitReader(ctx.br, int64(datadogutils.MaxInsertRequestSize.N)+1) + startTime := fasttime.UnixTimestamp() + reqLen, err := ctx.reqBuf.ReadFrom(lr) + if err != nil { + readErrors.Inc() + return fmt.Errorf("cannot read request in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err) + } + if reqLen > int64(datadogutils.MaxInsertRequestSize.N) { + readErrors.Inc() + return fmt.Errorf("too big request; mustn't exceed -datadog.maxInsertRequestSize=%d bytes", datadogutils.MaxInsertRequestSize.N) + } + return nil +} + +var ( + readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="datadogsketches"}`) + readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="datadogsketches"}`) + rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="datadogsketches"}`) + unmarshalErrors = metrics.NewCounter(`vm_protoparser_unmarshal_errors_total{type="datadogsketches"}`) +) + +func getPushCtx(r io.Reader) *pushCtx { + select { + case ctx := <-pushCtxPoolCh: + ctx.br.Reset(r) + return ctx + default: + if v := pushCtxPool.Get(); v != nil { + ctx := v.(*pushCtx) + ctx.br.Reset(r) + return ctx + } + return &pushCtx{ + br: bufio.NewReaderSize(r, 64*1024), + } + } +} + +func putPushCtx(ctx *pushCtx) { + ctx.reset() + select { + case pushCtxPoolCh <- ctx: + default: + pushCtxPool.Put(ctx) + } +} + +var pushCtxPool sync.Pool +var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) + +func getRequest() *datadogsketches.SketchPayload { + v := requestPool.Get() + if v == nil { + return &datadogsketches.SketchPayload{} + } + return v.(*datadogsketches.SketchPayload) +} + +func putRequest(req *datadogsketches.SketchPayload) { + requestPool.Put(req) +} + +var requestPool sync.Pool