app/{vmagent,vminsert}: follow-up after a1d1ccd6f2

- Document the change at docs/CHANGELOG.md
- Copy changes from docs/Single-server-VictoriaMetrics.md to README.md
- Add missing handler for processing multitenant requests ( https://docs.victoriametrics.com/vmagent/#multitenancy )
- Substitute github.com/stretchr/testify dependency with 3 lines of code in the added tests
- Comment unclear code at lib/protoparser/datadogsketches/parser.go , so @AndrewChubatiuk could update it
  and add permalinks to the original source code there.
- Various code cleanups

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5584
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3091
This commit is contained in:
Aliaksandr Valialkin 2024-02-07 01:15:25 +02:00
parent 4a31bd9661
commit 541b644d3d
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
12 changed files with 207 additions and 133 deletions

View file

@ -509,8 +509,9 @@ See also [vmagent](https://docs.victoriametrics.com/vmagent.html), which can be
## How to send data from DataDog agent ## How to send data from DataDog agent
VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/) or [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/) VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/), [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/) and
via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) at `/datadog/api/v2/series` path. [DataDog Lambda Extension](https://docs.datadoghq.com/serverless/libraries_integrations/extension/)
via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) at `/datadog/api/v2/series` or via "sketches" API at `/datadog/api/beta/sketches`.
### Sending metrics to VictoriaMetrics ### Sending metrics to VictoriaMetrics
@ -536,10 +537,10 @@ add the following line:
dd_url: http://victoriametrics:8428/datadog dd_url: http://victoriametrics:8428/datadog
``` ```
[vmagent](https://docs.victoriametrics.com/vmagent.html) also can accept Datadog metrics format. Depending on where vmagent will forward data, [vmagent](https://docs.victoriametrics.com/vmagent.html) also can accept DataDog metrics format. Depending on where vmagent will forward data,
pick [single-node or cluster URL](https://docs.victoriametrics.com/url-examples.html#datadog) formats. pick [single-node or cluster URL](https://docs.victoriametrics.com/url-examples.html#datadog) formats.
### Sending metrics to Datadog and VictoriaMetrics ### Sending metrics to DataDog and VictoriaMetrics
DataDog allows configuring [Dual Shipping](https://docs.datadoghq.com/agent/guide/dual-shipping/) for metrics DataDog allows configuring [Dual Shipping](https://docs.datadoghq.com/agent/guide/dual-shipping/) for metrics
sending via ENV variable `DD_ADDITIONAL_ENDPOINTS` or via configuration file `additional_endpoints`. sending via ENV variable `DD_ADDITIONAL_ENDPOINTS` or via configuration file `additional_endpoints`.
@ -564,6 +565,19 @@ additional_endpoints:
- apikey - apikey
``` ```
### Send metrics via Serverless DataDog plugin
Disable logs (logs ingestion is not supported by VictoriaMetrics) and set a custom endpoint in `serverless.yaml`:
```
custom:
datadog:
enableDDLogs: false # Disabled not supported DD logs
apiKey: fakekey # Set any key, otherwise plugin fails
provider:
environment:
DD_DD_URL: <<vm-url>>/datadog # VictoriaMetrics endpoint for DataDog
```
### Send via cURL ### Send via cURL

View file

@ -41,19 +41,15 @@ func insertRows(at *auth.Token, sketches []*datadogsketches.Sketch, extraLabels
tssDst := ctx.WriteRequest.Timeseries[:0] tssDst := ctx.WriteRequest.Timeseries[:0]
labels := ctx.Labels[:0] labels := ctx.Labels[:0]
samples := ctx.Samples[:0] samples := ctx.Samples[:0]
for i := range sketches { for _, sketch := range sketches {
sketch := sketches[i] ms := sketch.ToSummary()
metrics := sketch.ToHistogram() for _, m := range ms {
rowsTotal += sketch.RowsCount()
for m := range metrics {
metric := metrics[m]
labelsLen := len(labels) labelsLen := len(labels)
labels = append(labels, prompbmarshal.Label{ labels = append(labels, prompbmarshal.Label{
Name: "__name__", Name: "__name__",
Value: metric.Name, Value: m.Name,
}) })
for l := range metric.Labels { for _, label := range m.Labels {
label := metric.Labels[l]
labels = append(labels, prompbmarshal.Label{ labels = append(labels, prompbmarshal.Label{
Name: label.Name, Name: label.Name,
Value: label.Value, Value: label.Value,
@ -71,13 +67,13 @@ func insertRows(at *auth.Token, sketches []*datadogsketches.Sketch, extraLabels
} }
labels = append(labels, extraLabels...) labels = append(labels, extraLabels...)
samplesLen := len(samples) samplesLen := len(samples)
for p := range metric.Points { for _, p := range m.Points {
point := metric.Points[p]
samples = append(samples, prompbmarshal.Sample{ samples = append(samples, prompbmarshal.Sample{
Timestamp: sketch.Dogsketches[p].Ts * 1000, Timestamp: p.Timestamp,
Value: point, Value: p.Value,
}) })
} }
rowsTotal += len(m.Points)
tssDst = append(tssDst, prompbmarshal.TimeSeries{ tssDst = append(tssDst, prompbmarshal.TimeSeries{
Labels: labels[labelsLen:], Labels: labels[labelsLen:],
Samples: samples[samplesLen:], Samples: samples[samplesLen:],

View file

@ -613,6 +613,15 @@ func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path stri
w.WriteHeader(202) w.WriteHeader(202)
fmt.Fprintf(w, `{"status":"ok"}`) fmt.Fprintf(w, `{"status":"ok"}`)
return true return true
case "datadog/api/beta/sketches":
datadogsketchesWriteRequests.Inc()
if err := datadogsketches.InsertHandlerForHTTP(at, r); err != nil {
datadogsketchesWriteErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
w.WriteHeader(202)
return true
case "datadog/api/v1/validate": case "datadog/api/v1/validate":
datadogValidateRequests.Inc() datadogValidateRequests.Inc()
// See https://docs.datadoghq.com/api/latest/authentication/#validate-api-key // See https://docs.datadoghq.com/api/latest/authentication/#validate-api-key

View file

@ -35,23 +35,18 @@ func insertRows(sketches []*datadogsketches.Sketch, extraLabels []prompbmarshal.
defer common.PutInsertCtx(ctx) defer common.PutInsertCtx(ctx)
rowsLen := 0 rowsLen := 0
for i := range sketches { for _, sketch := range sketches {
sketch := sketches[i]
rowsLen += sketch.RowsCount() rowsLen += sketch.RowsCount()
} }
ctx.Reset(rowsLen) ctx.Reset(rowsLen)
rowsTotal := 0 rowsTotal := 0
hasRelabeling := relabel.HasRelabeling() hasRelabeling := relabel.HasRelabeling()
for i := range sketches { for _, sketch := range sketches {
sketch := sketches[i] ms := sketch.ToSummary()
metrics := sketch.ToHistogram() for _, m := range ms {
rowsTotal += sketch.RowsCount()
for m := range metrics {
metric := metrics[m]
ctx.Labels = ctx.Labels[:0] ctx.Labels = ctx.Labels[:0]
ctx.AddLabel("", metric.Name) ctx.AddLabel("", m.Name)
for l := range metric.Labels { for _, label := range m.Labels {
label := metric.Labels[l]
ctx.AddLabel(label.Name, label.Value) ctx.AddLabel(label.Name, label.Value)
} }
for _, tag := range sketch.Tags { for _, tag := range sketch.Tags {
@ -75,14 +70,13 @@ func insertRows(sketches []*datadogsketches.Sketch, extraLabels []prompbmarshal.
ctx.SortLabelsIfNeeded() ctx.SortLabelsIfNeeded()
var metricNameRaw []byte var metricNameRaw []byte
var err error var err error
for p := range metric.Points { for _, p := range m.Points {
value := metric.Points[p] metricNameRaw, err = ctx.WriteDataPointExt(metricNameRaw, ctx.Labels, p.Timestamp, p.Value)
timestamp := sketch.Dogsketches[p].Ts * 1000
metricNameRaw, err = ctx.WriteDataPointExt(metricNameRaw, ctx.Labels, timestamp, value)
if err != nil { if err != nil {
return err return err
} }
} }
rowsTotal += len(m.Points)
} }
} }
rowsInserted.Add(rowsTotal) rowsInserted.Add(rowsTotal)

View file

@ -29,6 +29,7 @@ The sandbox cluster installation is running under the constant load generated by
## tip ## tip
* FEATURE: all VictoriaMetrics components: add support for TLS client certificate verification at `-httpListenAddr` (aka [mTLS](https://en.wikipedia.org/wiki/Mutual_authentication)). See [these docs](https://docs.victoriametrics.com/#mtls-protection). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5458). * FEATURE: all VictoriaMetrics components: add support for TLS client certificate verification at `-httpListenAddr` (aka [mTLS](https://en.wikipedia.org/wiki/Mutual_authentication)). See [these docs](https://docs.victoriametrics.com/#mtls-protection). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5458).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html) and [single-node VictoriaMetrics](https://docs.victoriametrics.com): add support for data ingestion via [DataDog lambda extension](https://docs.datadoghq.com/serverless/libraries_integrations/extension/) aka `/api/beta/sketches` endpoint. See [these docs](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3091). Thanks to @AndrewChubatiuk for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5584).
* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): add `-disableReroutingOnUnavailable` command-line flag, which can be used for reducing resource usage spikes at `vmstorage` nodes during rolling restart. Thanks to @Muxa1L for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5713). * FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): add `-disableReroutingOnUnavailable` command-line flag, which can be used for reducing resource usage spikes at `vmstorage` nodes during rolling restart. Thanks to @Muxa1L for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5713).
* FEATURE: [dashboards/vmagent](https://grafana.com/grafana/dashboards/12683): add `Targets scraped/s` stat panel showing the number of targets scraped by the vmagent per-second. * FEATURE: [dashboards/vmagent](https://grafana.com/grafana/dashboards/12683): add `Targets scraped/s` stat panel showing the number of targets scraped by the vmagent per-second.
* FEATURE: [dashboards/all](https://grafana.com/orgs/victoriametrics): add new panel `CPU spent on GC`. It should help identifying cases when too much CPU is spent on garbage collection, and advice users on how this can be addressed. * FEATURE: [dashboards/all](https://grafana.com/orgs/victoriametrics): add new panel `CPU spent on GC`. It should help identifying cases when too much CPU is spent on garbage collection, and advice users on how this can be addressed.

View file

@ -512,8 +512,9 @@ See also [vmagent](https://docs.victoriametrics.com/vmagent.html), which can be
## How to send data from DataDog agent ## How to send data from DataDog agent
VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/) or [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/) VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/), [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/) and
via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) at `/datadog/api/v2/series` path. [DataDog Lambda Extension](https://docs.datadoghq.com/serverless/libraries_integrations/extension/)
via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) at `/datadog/api/v2/series` or via "sketches" API at `/datadog/api/beta/sketches`.
### Sending metrics to VictoriaMetrics ### Sending metrics to VictoriaMetrics
@ -539,10 +540,10 @@ add the following line:
dd_url: http://victoriametrics:8428/datadog dd_url: http://victoriametrics:8428/datadog
``` ```
[vmagent](https://docs.victoriametrics.com/vmagent.html) also can accept Datadog metrics format. Depending on where vmagent will forward data, [vmagent](https://docs.victoriametrics.com/vmagent.html) also can accept DataDog metrics format. Depending on where vmagent will forward data,
pick [single-node or cluster URL](https://docs.victoriametrics.com/url-examples.html#datadog) formats. pick [single-node or cluster URL](https://docs.victoriametrics.com/url-examples.html#datadog) formats.
### Sending metrics to Datadog and VictoriaMetrics ### Sending metrics to DataDog and VictoriaMetrics
DataDog allows configuring [Dual Shipping](https://docs.datadoghq.com/agent/guide/dual-shipping/) for metrics DataDog allows configuring [Dual Shipping](https://docs.datadoghq.com/agent/guide/dual-shipping/) for metrics
sending via ENV variable `DD_ADDITIONAL_ENDPOINTS` or via configuration file `additional_endpoints`. sending via ENV variable `DD_ADDITIONAL_ENDPOINTS` or via configuration file `additional_endpoints`.
@ -567,6 +568,19 @@ additional_endpoints:
- apikey - apikey
``` ```
### Send metrics via Serverless DataDog plugin
Disable logs (logs ingestion is not supported by VictoriaMetrics) and set a custom endpoint in `serverless.yaml`:
```
custom:
datadog:
enableDDLogs: false # Disabled not supported DD logs
apiKey: fakekey # Set any key, otherwise plugin fails
provider:
environment:
DD_DD_URL: <<vm-url>>/datadog # VictoriaMetrics endpoint for DataDog
```
### Send via cURL ### Send via cURL

View file

@ -520,8 +520,9 @@ See also [vmagent](https://docs.victoriametrics.com/vmagent.html), which can be
## How to send data from DataDog agent ## How to send data from DataDog agent
VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/), [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/) or [DataDog Lambda Extension](https://docs.datadoghq.com/serverless/libraries_integrations/extension/) VictoriaMetrics accepts data from [DataDog agent](https://docs.datadoghq.com/agent/), [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/) and
via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) at `/datadog/api/v2/series` path or via "sketches" API at `/datadog/api/beta/sketches`. [DataDog Lambda Extension](https://docs.datadoghq.com/serverless/libraries_integrations/extension/)
via ["submit metrics" API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics) at `/datadog/api/v2/series` or via "sketches" API at `/datadog/api/beta/sketches`.
### Sending metrics to VictoriaMetrics ### Sending metrics to VictoriaMetrics
@ -575,10 +576,10 @@ additional_endpoints:
- apikey - apikey
``` ```
### Send metrics via Serverless DataDog plugin
### Send via Serverless DataDog plugin Disable logs (logs ingestion is not supported by VictoriaMetrics) and set a custom endpoint in `serverless.yaml`:
Disable logs (logs ingestion is not supported by Victoria Metrics) and set a custom endpoint in serverless.yaml
``` ```
custom: custom:
datadog: datadog:

3
go.mod
View file

@ -37,8 +37,6 @@ require (
gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v2 v2.4.0
) )
require github.com/stretchr/testify v1.8.4
require ( require (
cloud.google.com/go v0.112.0 // indirect cloud.google.com/go v0.112.0 // indirect
cloud.google.com/go/compute v1.23.4 // indirect cloud.google.com/go/compute v1.23.4 // indirect
@ -104,6 +102,7 @@ require (
github.com/prometheus/procfs v0.12.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect
github.com/rivo/uniseg v0.4.6 // indirect github.com/rivo/uniseg v0.4.6 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/stretchr/testify v1.8.4 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/xrash/smetrics v0.0.0-20231213231151-1d8dd44e695e // indirect github.com/xrash/smetrics v0.0.0-20231213231151-1d8dd44e695e // indirect
go.opencensus.io v0.24.0 // indirect go.opencensus.io v0.24.0 // indirect

View file

@ -9,6 +9,7 @@ import (
) )
var ( var (
// TODO: @AndrewChubatiuk, please provide a permalink for the original source code where these constants were extracted
epsillon = 1.0 / 128 epsillon = 1.0 / 128
gamma = 1 + 2*epsillon gamma = 1 + 2*epsillon
gammaLn = math.Log(gamma) gammaLn = math.Log(gamma)
@ -17,7 +18,16 @@ var (
quantiles = []float64{0.5, 0.75, 0.9, 0.95, 0.99} quantiles = []float64{0.5, 0.75, 0.9, 0.95, 0.99}
) )
type label struct { var quantilesStr = func() []string {
a := make([]string, len(quantiles))
for i, q := range quantiles {
a[i] = strconv.FormatFloat(q, 'g', 3, 64)
}
return a
}()
// Label is a single label for Metric
type Label struct {
Name string Name string
Value string Value string
} }
@ -25,8 +35,14 @@ type label struct {
// Metric stores metrics extracted from sketches // Metric stores metrics extracted from sketches
type Metric struct { type Metric struct {
Name string Name string
Labels []label Labels []Label
Points []float64 Points []Point
}
// Point stores a single point extracted from sketches
type Point struct {
Value float64
Timestamp int64
} }
// SketchPayload stores sketches extracted from /api/beta/sketches endpoint // SketchPayload stores sketches extracted from /api/beta/sketches endpoint
@ -34,13 +50,15 @@ type Metric struct {
// message SketchPayload { // message SketchPayload {
// repeated Sketch sketches = 1 // repeated Sketch sketches = 1
// } // }
//
// See https://github.com/DataDog/agent-payload/blob/38db68d9641c8a0bd2e1eac53b9d54793448850f/proto/metrics/agent_payload.proto#L90
type SketchPayload struct { type SketchPayload struct {
Sketches []*Sketch Sketches []*Sketch
} }
// UnmarshalProtobuf decodes byte array to SketchPayload struct // UnmarshalProtobuf decodes src to SketchPayload struct
func (sp *SketchPayload) UnmarshalProtobuf(src []byte) (err error) { func (sp *SketchPayload) UnmarshalProtobuf(src []byte) (err error) {
sp.Sketches = sp.Sketches[:0] sp.Sketches = nil
var fc easyproto.FieldContext var fc easyproto.FieldContext
for len(src) > 0 { for len(src) > 0 {
src, err = fc.NextField(src) src, err = fc.NextField(src)
@ -51,13 +69,13 @@ func (sp *SketchPayload) UnmarshalProtobuf(src []byte) (err error) {
case 1: case 1:
data, ok := fc.MessageData() data, ok := fc.MessageData()
if !ok { if !ok {
return fmt.Errorf("cannot read SketchPayload sketches data") return fmt.Errorf("cannot read Sketch data")
} }
sp.Sketches = append(sp.Sketches, &Sketch{}) var s Sketch
s := sp.Sketches[len(sp.Sketches)-1]
if err := s.unmarshalProtobuf(data); err != nil { if err := s.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal sketch: %w", err) return fmt.Errorf("cannot unmarshal Sketch: %w", err)
} }
sp.Sketches = append(sp.Sketches, &s)
} }
} }
return nil return nil
@ -71,6 +89,8 @@ func (sp *SketchPayload) UnmarshalProtobuf(src []byte) (err error) {
// repeated string tags = 4; // repeated string tags = 4;
// repeated Dogsketch dogsketches = 7 // repeated Dogsketch dogsketches = 7
// } // }
//
// See https://github.com/DataDog/agent-payload/blob/38db68d9641c8a0bd2e1eac53b9d54793448850f/proto/metrics/agent_payload.proto#L91
type Sketch struct { type Sketch struct {
Metric string Metric string
Host string Host string
@ -78,12 +98,12 @@ type Sketch struct {
Dogsketches []*Dogsketch Dogsketches []*Dogsketch
} }
// unmarshalProtobuf decodes byte array to Sketch struct // unmarshalProtobuf decodes src to Sketch struct
func (s *Sketch) unmarshalProtobuf(src []byte) (err error) { func (s *Sketch) unmarshalProtobuf(src []byte) (err error) {
s.Metric = "" s.Metric = ""
s.Host = "" s.Host = ""
s.Tags = s.Tags[:0] s.Tags = nil
s.Dogsketches = s.Dogsketches[:0] s.Dogsketches = nil
var fc easyproto.FieldContext var fc easyproto.FieldContext
for len(src) > 0 { for len(src) > 0 {
@ -95,78 +115,87 @@ func (s *Sketch) unmarshalProtobuf(src []byte) (err error) {
case 1: case 1:
metric, ok := fc.String() metric, ok := fc.String()
if !ok { if !ok {
return fmt.Errorf("cannot read Sketch metric") return fmt.Errorf("cannot read metric")
} }
s.Metric = metric s.Metric = metric
case 2: case 2:
host, ok := fc.String() host, ok := fc.String()
if !ok { if !ok {
return fmt.Errorf("cannot read Sketch host") return fmt.Errorf("cannot read host")
} }
s.Host = host s.Host = host
case 4: case 4:
tag, ok := fc.String() tag, ok := fc.String()
if !ok { if !ok {
return fmt.Errorf("cannot read Sketch tag") return fmt.Errorf("cannot read tag")
} }
s.Tags = append(s.Tags, tag) s.Tags = append(s.Tags, tag)
case 7: case 7:
data, ok := fc.MessageData() data, ok := fc.MessageData()
if !ok { if !ok {
return fmt.Errorf("cannot read Sketch dogsketches data") return fmt.Errorf("cannot read Dogsketch data")
} }
s.Dogsketches = append(s.Dogsketches, &Dogsketch{}) var d Dogsketch
d := s.Dogsketches[len(s.Dogsketches)-1]
if err := d.unmarshalProtobuf(data); err != nil { if err := d.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal dogsketch: %w", err) return fmt.Errorf("cannot unmarshal Dogsketch: %w", err)
} }
s.Dogsketches = append(s.Dogsketches, &d)
} }
} }
return nil return nil
} }
// RowsCount calculates generated rows num from sketch // RowsCount returns the number of samples s generates.
func (s *Sketch) RowsCount() int { func (s *Sketch) RowsCount() int {
return (len(quantiles) + len(s.extractAggr())) * len(s.Dogsketches) // The sketch contains len(quantiles) plus *_sum and *_count metrics
// per each Dogsketch in s.Dogsketches.
return (len(quantiles) + 2) * len(s.Dogsketches)
} }
func (s *Sketch) extractAggr() []*Metric { // ToSummary generates Prometheus summary from the given s.
return []*Metric{ func (s *Sketch) ToSummary() []*Metric {
{ metrics := make([]*Metric, len(quantiles)+2)
Name: s.Metric + "_sum",
Labels: []label{},
Points: make([]float64, len(s.Dogsketches)),
}, {
Name: s.Metric + "_count",
Labels: []label{},
Points: make([]float64, len(s.Dogsketches)),
},
}
}
// ToHistogram generates histogram metrics
func (s *Sketch) ToHistogram() []*Metric {
dogsketches := s.Dogsketches dogsketches := s.Dogsketches
aggr := s.extractAggr()
metrics := make([]*Metric, len(quantiles)) sumPoints := make([]Point, len(dogsketches))
for q := range quantiles { countPoints := make([]Point, len(dogsketches))
quantile := quantiles[q] metrics[len(metrics)-2] = &Metric{
metrics[q] = &Metric{ Name: s.Metric + "_sum",
Points: sumPoints,
}
metrics[len(metrics)-1] = &Metric{
Name: s.Metric + "_count",
Points: countPoints,
}
for i, q := range quantiles {
points := make([]Point, len(dogsketches))
for j, d := range dogsketches {
timestamp := d.Ts * 1000
points[j] = Point{
Timestamp: timestamp,
Value: d.valueForQuantile(q),
}
sumPoints[j] = Point{
Timestamp: timestamp,
Value: d.Sum,
}
countPoints[j] = Point{
Timestamp: timestamp,
Value: float64(d.Cnt),
}
}
metrics[i] = &Metric{
Name: s.Metric, Name: s.Metric,
Labels: []label{{ Labels: []Label{{
Name: "quantile", Name: "quantile",
Value: strconv.FormatFloat(quantile, 'g', 3, 64), Value: quantilesStr[i],
}}, }},
Points: make([]float64, len(dogsketches)), Points: points,
}
for d := range dogsketches {
dogsketch := dogsketches[d]
aggr[0].Points[d] = dogsketch.Sum
aggr[1].Points[d] = float64(dogsketch.Cnt)
metrics[q].Points[d] = dogsketch.pointForQuantile(quantile)
} }
} }
return append(metrics, aggr...)
return metrics
} }
// Dogsketch proto struct // Dogsketch proto struct
@ -180,6 +209,8 @@ func (s *Sketch) ToHistogram() []*Metric {
// repeated sint32 k = 7; // repeated sint32 k = 7;
// repeated uint32 n = 8; // repeated uint32 n = 8;
// } // }
//
// See https://github.com/DataDog/agent-payload/blob/38db68d9641c8a0bd2e1eac53b9d54793448850f/proto/metrics/agent_payload.proto#L104
type Dogsketch struct { type Dogsketch struct {
Ts int64 Ts int64
Cnt int64 Cnt int64
@ -190,91 +221,102 @@ type Dogsketch struct {
N []uint32 N []uint32
} }
// unmarshalProtobuf decodes byte array to Dogsketch struct // unmarshalProtobuf decodes src to Dogsketch struct
func (d *Dogsketch) unmarshalProtobuf(src []byte) (err error) { func (d *Dogsketch) unmarshalProtobuf(src []byte) (err error) {
d.Ts = 0 d.Ts = 0
d.Cnt = 0 d.Cnt = 0
d.Min = 0.0 d.Min = 0.0
d.Max = 0.0 d.Max = 0.0
d.Sum = 0.0 d.Sum = 0.0
d.K = d.K[:0] d.K = nil
d.N = d.N[:0] d.N = nil
var fc easyproto.FieldContext var fc easyproto.FieldContext
for len(src) > 0 { for len(src) > 0 {
src, err = fc.NextField(src) src, err = fc.NextField(src)
if err != nil { if err != nil {
return fmt.Errorf("cannot read next field in Dogsketch message, %w", err) return fmt.Errorf("cannot read next field in Dogsketch message: %w", err)
} }
switch fc.FieldNum { switch fc.FieldNum {
case 1: case 1:
ts, ok := fc.Int64() ts, ok := fc.Int64()
if !ok { if !ok {
return fmt.Errorf("cannot read Dogsketch timestamp") return fmt.Errorf("cannot read timestamp")
} }
d.Ts = ts d.Ts = ts
case 2: case 2:
cnt, ok := fc.Int64() cnt, ok := fc.Int64()
if !ok { if !ok {
return fmt.Errorf("cannot read Dogsketch count") return fmt.Errorf("cannot read count")
} }
d.Cnt = cnt d.Cnt = cnt
case 3: case 3:
min, ok := fc.Double() min, ok := fc.Double()
if !ok { if !ok {
return fmt.Errorf("cannot read Dogsketch min") return fmt.Errorf("cannot read min")
} }
d.Min = min d.Min = min
case 4: case 4:
max, ok := fc.Double() max, ok := fc.Double()
if !ok { if !ok {
return fmt.Errorf("cannot read Dogsketch max") return fmt.Errorf("cannot read max")
} }
d.Max = max d.Max = max
case 6: case 6:
sum, ok := fc.Double() sum, ok := fc.Double()
if !ok { if !ok {
return fmt.Errorf("cannot read Dogsketch sum") return fmt.Errorf("cannot read sum")
} }
d.Sum = sum d.Sum = sum
case 7: case 7:
var ok bool var ok bool
d.K, ok = fc.UnpackSint32s(d.K) d.K, ok = fc.UnpackSint32s(d.K)
if !ok { if !ok {
return fmt.Errorf("cannot read Dogsketch k") return fmt.Errorf("cannot read k")
} }
case 8: case 8:
var ok bool var ok bool
d.N, ok = fc.UnpackUint32s(d.N) d.N, ok = fc.UnpackUint32s(d.N)
if !ok { if !ok {
return fmt.Errorf("cannot read Dogsketch n") return fmt.Errorf("cannot read n")
} }
} }
} }
return nil return nil
} }
func (d *Dogsketch) pointForQuantile(quantile float64) float64 { func (d *Dogsketch) valueForQuantile(q float64) float64 {
switch { switch {
case d.Cnt == 0: case d.Cnt == 0:
return 0 return 0
case quantile <= 0: case q <= 0:
return d.Min return d.Min
case quantile >= 1: case q >= 1:
return d.Max return d.Max
} }
rank := quantile * float64(d.Cnt-1) ns := d.N
nLen := len(d.N) ks := d.K
for cnt, i := 0.0, 0; i < nLen; i++ { if len(ns) != len(ks) {
cnt += float64(d.N[i]) // Avoid index out of range panic in the loop below.
return 0
}
rank := q * float64(d.Cnt-1)
cnt := float64(0)
for i, n := range ns {
cnt += float64(n)
if cnt <= rank { if cnt <= rank {
continue continue
} }
weight := (cnt - rank) / float64(d.N[i]) weight := (cnt - rank) / float64(n)
vLow := f64(d.K[i]) vLow := f64(ks[i])
vHigh := vLow * gamma vHigh := vLow * gamma
switch i { switch i {
case nLen: // TODO: I'm unsure this code is correct. i cannot equal len(ns) in this loop.
// @AndrewChubatiuk, please add a permalink to the original source code, which was used
// for writing this code, in the comments to this function.
case len(ns):
vHigh = d.Max vHigh = d.Max
case 0: case 0:
vLow = d.Min vLow = d.Min
@ -288,6 +330,8 @@ func f64(k int32) float64 {
switch { switch {
case k < 0: case k < 0:
return -f64(-k) return -f64(-k)
// TODO: I'm unsure this logic is correct, since k can be smaller than math.MinInt16 and bigger than math.MaxInt16
// @AndrewChubatiuk, please add a permalink to the original source code, which was used for writing this code.
case k == math.MaxInt16 || k == math.MinInt16: case k == math.MaxInt16 || k == math.MinInt16:
return math.Inf(int(k)) return math.Inf(int(k))
case k == 0: case k == 0:

View file

@ -1,16 +1,17 @@
package datadogsketches package datadogsketches
import ( import (
"math"
"testing" "testing"
"github.com/stretchr/testify/assert"
) )
func TestPointsForQuantile(t *testing.T) { func TestPointsForQuantile(t *testing.T) {
f := func(d *Dogsketch, quantile float64, expected float64) { f := func(d *Dogsketch, q float64, vExpected float64) {
t.Helper() t.Helper()
point := d.pointForQuantile(quantile) v := d.valueForQuantile(q)
assert.InDelta(t, point, expected, 0.5) if math.Abs(v-vExpected) > 0.4 {
t.Fatalf("unexpected value; got %v; want %v", v, vExpected)
}
} }
sketches := &Dogsketch{ sketches := &Dogsketch{
Min: 8.0, Min: 8.0,

View file

@ -20,7 +20,6 @@ import (
// //
// callback shouldn't hold series after returning. // callback shouldn't hold series after returning.
func Parse(r io.Reader, contentEncoding string, callback func(series []*datadogsketches.Sketch) error) error { func Parse(r io.Reader, contentEncoding string, callback func(series []*datadogsketches.Sketch) error) error {
var err error
wcr := writeconcurrencylimiter.GetReader(r) wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr) defer writeconcurrencylimiter.PutReader(wcr)
r = wcr r = wcr
@ -50,18 +49,17 @@ func Parse(r io.Reader, contentEncoding string, callback func(series []*datadogs
req := getRequest() req := getRequest()
defer putRequest(req) defer putRequest(req)
err = req.UnmarshalProtobuf(ctx.reqBuf.B) if err := req.UnmarshalProtobuf(ctx.reqBuf.B); err != nil {
if err != nil {
unmarshalErrors.Inc() unmarshalErrors.Inc()
return fmt.Errorf("cannot unmarshal DataDog request with size %d bytes: %w", len(ctx.reqBuf.B), err) return fmt.Errorf("cannot unmarshal DataDog Sketches request with size %d bytes: %w", len(ctx.reqBuf.B), err)
} }
rows := 0 rows := 0
sketches := req.Sketches sketches := req.Sketches
for i := range sketches { for _, sketch := range sketches {
rows += len(sketches[i].Dogsketches) rows += sketch.RowsCount()
if *datadogutils.SanitizeMetricName { if *datadogutils.SanitizeMetricName {
sketches[i].Metric = datadogutils.SanitizeName(sketches[i].Metric) sketch.Metric = datadogutils.SanitizeName(sketch.Metric)
} }
} }
rowsRead.Add(rows) rowsRead.Add(rows)

View file

@ -40,6 +40,7 @@ func Parse(r io.Reader, contentEncoding string, callback func(series []datadogv1
defer common.PutZlibReader(zlr) defer common.PutZlibReader(zlr)
r = zlr r = zlr
} }
ctx := getPushCtx(r) ctx := getPushCtx(r)
defer putPushCtx(ctx) defer putPushCtx(ctx)
if err := ctx.Read(); err != nil { if err := ctx.Read(); err != nil {
@ -47,10 +48,12 @@ func Parse(r io.Reader, contentEncoding string, callback func(series []datadogv1
} }
req := getRequest() req := getRequest()
defer putRequest(req) defer putRequest(req)
if err := req.Unmarshal(ctx.reqBuf.B); err != nil { if err := req.Unmarshal(ctx.reqBuf.B); err != nil {
unmarshalErrors.Inc() unmarshalErrors.Inc()
return fmt.Errorf("cannot unmarshal DataDog POST request with size %d bytes: %w", len(ctx.reqBuf.B), err) return fmt.Errorf("cannot unmarshal DataDog POST request with size %d bytes: %w", len(ctx.reqBuf.B), err)
} }
rows := 0 rows := 0
series := req.Series series := req.Series
for i := range series { for i := range series {