diff --git a/README.md b/README.md index 8610f19dd..aee7f2982 100644 --- a/README.md +++ b/README.md @@ -854,71 +854,76 @@ For example, `/api/put?extra_label=foo=bar` would add `{foo="bar"}` label to all ## How to send data from NewRelic agent VictoriaMetrics accepts data from [NewRelic infrastructure agent](https://docs.newrelic.com/docs/infrastructure/install-infrastructure-agent) -at `/api/v1/newrelic/infra/v2/metrics/events/bulk` path. -NewRelic's infrastructure agent sends so-called [Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events) -which then transformed by VictoriaMetrics to the [Prometheus exposition format](https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format). +at `/newrelic/infra/v2/metrics/events/bulk` HTTP path. +VictoriaMetrics receives [Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events) +from NewRelic agent at the given path, transforms them to [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) +according to [these docs](#newrelic-agent-data-mapping) before storing the raw samples to the database. -NewRelic's infrastructure agent allows configuring destinations for metrics forwarding via ENV variable `COLLECTOR_URL`. -It is also required to specify `NRIA_LICENSE_KEY`, which is available only after registration into account of the NewRelic cloud. +You need passing `COLLECTOR_URL` and `NRIA_LICENSE_KEY` environment variables to NewRelic infrastructure agent in order to send the collected metrics to VictoriaMetrics. +The `COLLECTOR_URL` must point to `/newrelic` HTTP endpoint at VictoriaMetrics, while the `NRIA_LICENSE_KEY` must contain NewRelic license key, +which can be obtained [here](https://newrelic.com/signup). +For example, if VictoriaMetrics runs at `localhost:8428`, then the following command can be used for running NewRelic infrastructure agent: -To configure NewRelic infrastructure agent for forwarding metrics to VictoriaMetrics use the following example: ```console -COLLECTOR_URL="http://localhost:8428/newrelic/api/v1" NRIA_LICENSE_KEY="YOUR_LICENSE_KEY" ./newrelic-infra +COLLECTOR_URL="http://localhost:8428/newrelic" NRIA_LICENSE_KEY="NEWRELIC_LICENSE_KEY" ./newrelic-infra ``` -### NewRelic agent data mapping +### NewRelic agent data mapping + +VictoriaMetrics maps [NewRelic Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events) +to [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) in the following way: + +1. Every numeric field is converted into a raw sample with the corresponding name. +1. The `eventType` and all the other fields with `string` value type are attached to every raw sample as [metric labels](https://docs.victoriametrics.com/keyConcepts.html#labels). +1. The `timestamp` field is used as timestamp for the ingested [raw sample](https://docs.victoriametrics.com/keyConcepts.html#raw-samples). + The `timestamp` field may be specified either in seconds or in milliseconds since the [Unix Epoch](https://en.wikipedia.org/wiki/Unix_time). + If the `timestamp` field is missing, then the raw sample is stored with the current timestamp. + +For example, let's import the following NewRelic Events request to VictoriaMetrics: -As example, lets create `newrelic.json` file with the following content: ```json [ - { - "Events":[ - { - "eventType":"SystemSample", - "entityKey":"macbook-pro.local", - "cpuPercent":25.056660790748904, - "cpuUserPercent":8.687987912389374, - "cpuSystemPercent":16.36867287835953, - "cpuIOWaitPercent":0, - "cpuIdlePercent":74.94333920925109, - "cpuStealPercent":0, - "loadAverageOneMinute":5.42333984375, - "loadAverageFiveMinute":4.099609375, - "loadAverageFifteenMinute":3.58203125 - } - ] - } - ] + { + "Events":[ + { + "eventType":"SystemSample", + "entityKey":"macbook-pro.local", + "cpuPercent":25.056660790748904, + "cpuUserPercent":8.687987912389374, + "cpuSystemPercent":16.36867287835953, + "cpuIOWaitPercent":0, + "cpuIdlePercent":74.94333920925109, + "cpuStealPercent":0, + "loadAverageOneMinute":5.42333984375, + "loadAverageFiveMinute":4.099609375, + "loadAverageFifteenMinute":3.58203125 + } + ] + } +] ``` -Let's use cUrl to send `newrelic.json` to single-node VictoriaMetrics: +Save this JSON into `newrelic.json` file and then use the following command in order to import it into VictoriaMetrics: ```console -curl -X POST -H 'Content-Type: application/json' --data-binary @newrelic.json http://localhost:8428/newrelic/api/v1/infra/v2/metrics/events/bulk +curl -X POST -H 'Content-Type: application/json' --data-binary @newrelic.json http://localhost:8428/newrelic/infra/v2/metrics/events/bulk ``` -If data was successfully ingested, you'll get `{"status":"ok"}` response. Let's fetch ingested data from VictoriaMetrics -in vmui via query `{__name__!=""}`: +Let's fetch the ingested data via [data export API](#how-to-export-data-in-json-line-format): + ```console -system_sample_cpu_io_wait_percent{entity_key="macbook-pro.local"} 0 -system_sample_cpu_idle_percent{entity_key="macbook-pro.local"} 74.9433392092 -system_sample_cpu_percent{entity_key="macbook-pro.local"} 25.056660790748 -system_sample_cpu_steal_percent{entity_key="macbook-pro.local"} 0 -system_sample_cpu_system_percent{entity_key="macbook-pro.local"} 16.368672878359 -system_sample_cpu_user_percent{entity_key="macbook-pro.local"} 8.687987912389 -system_sample_load_average_fifteen_minute{entity_key="macbook-pro.local"} 3.58203125 -system_sample_load_average_five_minute{entity_key="macbook-pro.local"} 4.099609375 -system_sample_load_average_one_minute{entity_key="macbook-pro.local"} 5.42333984375 +curl http://localhost:8428/api/v1/export -d 'match={eventType="SystemSample"}' +{"metric":{"__name__":"cpuStealPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[0],"timestamps":[1697407970000]} +{"metric":{"__name__":"loadAverageFiveMinute","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[4.099609375],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuIOWaitPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[0],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuSystemPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[16.368672878359],"timestamps":[1697407970000]} +{"metric":{"__name__":"loadAverageOneMinute","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[5.42333984375],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuUserPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[8.687987912389],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuIdlePercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[74.9433392092],"timestamps":[1697407970000]} +{"metric":{"__name__":"loadAverageFifteenMinute","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[3.58203125],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[25.056660790748],"timestamps":[1697407970000]} ``` -The fields in `newrelic.json` are transformed in the following way: -1. `eventType` filed is used as prefix for all metrics in the object; -2. `entityKey` or any other field with `string` value type is used as label attached to all metrics in the object; -3. the rest fields with numeric values will be used as metrics; -4. the additional field `timestamp` can be added to the payload to set the timestamp for all metrics. If omitted, -current time is used. - - ## Prometheus querying API usage VictoriaMetrics supports the following handlers from [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/): diff --git a/app/vmagent/datadog/request_handler.go b/app/vmagent/datadog/request_handler.go index 056d3bcb0..b7cb1a2dc 100644 --- a/app/vmagent/datadog/request_handler.go +++ b/app/vmagent/datadog/request_handler.go @@ -8,7 +8,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" - parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog/stream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/metrics" @@ -29,12 +29,12 @@ func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error { return err } ce := req.Header.Get("Content-Encoding") - return stream.Parse(req.Body, ce, func(series []parser.Series) error { + return stream.Parse(req.Body, ce, func(series []datadog.Series) error { return insertRows(at, series, extraLabels) }) } -func insertRows(at *auth.Token, series []parser.Series, extraLabels []prompbmarshal.Label) error { +func insertRows(at *auth.Token, series []datadog.Series, extraLabels []prompbmarshal.Label) error { ctx := common.GetPushCtx() defer common.PutPushCtx(ctx) @@ -63,7 +63,7 @@ func insertRows(at *auth.Token, series []parser.Series, extraLabels []prompbmars }) } for _, tag := range ss.Tags { - name, value := parser.SplitTag(tag) + name, value := datadog.SplitTag(tag) if name == "host" { name = "exported_host" } diff --git a/app/vmagent/main.go b/app/vmagent/main.go index 5be243ac5..6b042f10a 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -320,19 +320,19 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { } w.WriteHeader(http.StatusOK) return true - case "/newrelic/api/v1": + case "/newrelic": newrelicCheckRequest.Inc() w.Header().Set("Content-Type", "application/json") w.WriteHeader(202) fmt.Fprintf(w, `{"status":"ok"}`) return true - case "/newrelic/api/v1/inventory/deltas": + case "/newrelic/inventory/deltas": newrelicInventoryRequests.Inc() w.Header().Set("Content-Type", "application/json") w.WriteHeader(202) fmt.Fprintf(w, `{"payload":{"version": 1, "state": {}, "reset": "false"}}`) return true - case "/newrelic/api/v1/infra/v2/metrics/events/bulk": + case "/newrelic/infra/v2/metrics/events/bulk": newrelicWriteRequests.Inc() if err := newrelic.InsertHandlerForHTTP(nil, r); err != nil { newrelicWriteErrors.Inc() @@ -543,19 +543,19 @@ func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path stri } w.WriteHeader(http.StatusOK) return true - case "/newrelic/api/v1": + case "newrelic": newrelicCheckRequest.Inc() w.Header().Set("Content-Type", "application/json") w.WriteHeader(202) fmt.Fprintf(w, `{"status":"ok"}`) return true - case "/newrelic/api/v1/inventory/deltas": + case "newrelic/inventory/deltas": newrelicInventoryRequests.Inc() w.Header().Set("Content-Type", "application/json") w.WriteHeader(202) fmt.Fprintf(w, `{"payload":{"version": 1, "state": {}, "reset": "false"}}`) return true - case "/newrelic/api/v1/infra/v2/metrics/events/bulk": + case "newrelic/infra/v2/metrics/events/bulk": newrelicWriteRequests.Inc() if err := newrelic.InsertHandlerForHTTP(at, r); err != nil { newrelicWriteErrors.Inc() @@ -638,11 +638,11 @@ var ( opentelemetryPushRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/opentelemetry/api/v1/push", protocol="opentelemetry"}`) opentelemetryPushErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/opentelemetry/api/v1/push", protocol="opentelemetry"}`) - newrelicWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/api/v1/infra/v2/metrics/events/bulk", protocol="newrelic"}`) - newrelicWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/newrelic/api/v1/infra/v2/metrics/events/bulk", protocol="newrelic"}`) + newrelicWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/infra/v2/metrics/events/bulk", protocol="newrelic"}`) + newrelicWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/newrelic/infra/v2/metrics/events/bulk", protocol="newrelic"}`) - newrelicInventoryRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/api/v1/inventory/deltas", protocol="newrelic"}`) - newrelicCheckRequest = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/api/v1", protocol="newrelic"}`) + newrelicInventoryRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/inventory/deltas", protocol="newrelic"}`) + newrelicCheckRequest = metrics.NewCounter(`vm_http_requests_total{path="/newrelic", protocol="newrelic"}`) promscrapeTargetsRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/targets"}`) promscrapeServiceDiscoveryRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/service-discovery"}`) diff --git a/app/vmagent/newrelic/request_handler.go b/app/vmagent/newrelic/request_handler.go index 378476405..b164fc973 100644 --- a/app/vmagent/newrelic/request_handler.go +++ b/app/vmagent/newrelic/request_handler.go @@ -8,6 +8,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic" @@ -29,42 +30,48 @@ func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error { } ce := req.Header.Get("Content-Encoding") isGzip := ce == "gzip" - return stream.Parse(req.Body, isGzip, func(series []newrelic.Metric) error { - return insertRows(at, series, extraLabels) + return stream.Parse(req.Body, isGzip, func(rows []newrelic.Row) error { + return insertRows(at, rows, extraLabels) }) } -func insertRows(at *auth.Token, rows []newrelic.Metric, extraLabels []prompbmarshal.Label) error { +func insertRows(at *auth.Token, rows []newrelic.Row, extraLabels []prompbmarshal.Label) error { ctx := common.GetPushCtx() defer common.PutPushCtx(ctx) - rowsTotal := 0 + samplesCount := 0 tssDst := ctx.WriteRequest.Timeseries[:0] labels := ctx.Labels[:0] samples := ctx.Samples[:0] for i := range rows { r := &rows[i] - labelsLen := len(labels) - labels = append(labels, prompbmarshal.Label{ - Name: "__name__", - Value: r.Metric, - }) - for j := range r.Tags { - tag := &r.Tags[j] + tags := r.Tags + srcSamples := r.Samples + for j := range srcSamples { + s := &srcSamples[j] + labelsLen := len(labels) labels = append(labels, prompbmarshal.Label{ - Name: tag.Key, - Value: tag.Value, + Name: "__name__", + Value: bytesutil.ToUnsafeString(s.Name), }) + for k := range tags { + t := &tags[k] + labels = append(labels, prompbmarshal.Label{ + Name: bytesutil.ToUnsafeString(t.Key), + Value: bytesutil.ToUnsafeString(t.Value), + }) + } + samples = append(samples, prompbmarshal.Sample{ + Value: s.Value, + Timestamp: r.Timestamp, + }) + tssDst = append(tssDst, prompbmarshal.TimeSeries{ + Labels: labels[labelsLen:], + Samples: samples[len(samples)-1:], + }) + labels = append(labels, extraLabels...) } - samples = append(samples, prompbmarshal.Sample{ - Value: r.Value, - Timestamp: r.Timestamp, - }) - tssDst = append(tssDst, prompbmarshal.TimeSeries{ - Labels: labels[labelsLen:], - Samples: samples[len(samples)-1:], - }) - labels = append(labels, extraLabels...) + samplesCount += len(srcSamples) } ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels @@ -72,8 +79,8 @@ func insertRows(at *auth.Token, rows []newrelic.Metric, extraLabels []prompbmars remotewrite.Push(at, &ctx.WriteRequest) rowsInserted.Add(len(rows)) if at != nil { - rowsTenantInserted.Get(at).Add(rowsTotal) + rowsTenantInserted.Get(at).Add(samplesCount) } - rowsPerInsert.Update(float64(len(rows))) + rowsPerInsert.Update(float64(samplesCount)) return nil } diff --git a/app/vminsert/main.go b/app/vminsert/main.go index 1e993ae37..a53f8216d 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -222,19 +222,19 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { } w.WriteHeader(http.StatusOK) return true - case "/newrelic/api/v1": + case "/newrelic": newrelicCheckRequest.Inc() w.Header().Set("Content-Type", "application/json") w.WriteHeader(202) fmt.Fprintf(w, `{"status":"ok"}`) return true - case "/newrelic/api/v1/inventory/deltas": + case "/newrelic/inventory/deltas": newrelicInventoryRequests.Inc() w.Header().Set("Content-Type", "application/json") w.WriteHeader(202) fmt.Fprintf(w, `{"payload":{"version": 1, "state": {}, "reset": "false"}}`) return true - case "/newrelic/api/v1/infra/v2/metrics/events/bulk": + case "/newrelic/infra/v2/metrics/events/bulk": newrelicWriteRequests.Inc() if err := newrelic.InsertHandlerForHTTP(r); err != nil { newrelicWriteErrors.Inc() @@ -382,11 +382,11 @@ var ( opentelemetryPushRequests = metrics.NewCounter(`vm_http_requests_total{path="/opentelemetry/api/v1/push", protocol="opentelemetry"}`) opentelemetryPushErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/opentelemetry/api/v1/push", protocol="opentelemetry"}`) - newrelicWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/api/v1/infra/v2/metrics/events/bulk", protocol="newrelic"}`) - newrelicWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/newrelic/api/v1/infra/v2/metrics/events/bulk", protocol="newrelic"}`) + newrelicWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/infra/v2/metrics/events/bulk", protocol="newrelic"}`) + newrelicWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/newrelic/infra/v2/metrics/events/bulk", protocol="newrelic"}`) - newrelicInventoryRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/api/v1/inventory/deltas", protocol="newrelic"}`) - newrelicCheckRequest = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/api/v1", protocol="newrelic"}`) + newrelicInventoryRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/inventory/deltas", protocol="newrelic"}`) + newrelicCheckRequest = metrics.NewCounter(`vm_http_requests_total{path="/newrelic", protocol="newrelic"}`) promscrapeTargetsRequests = metrics.NewCounter(`vm_http_requests_total{path="/targets"}`) promscrapeServiceDiscoveryRequests = metrics.NewCounter(`vm_http_requests_total{path="/service-discovery"}`) diff --git a/app/vminsert/newrelic/request_handler.go b/app/vminsert/newrelic/request_handler.go index ae8530d56..226e97075 100644 --- a/app/vminsert/newrelic/request_handler.go +++ b/app/vminsert/newrelic/request_handler.go @@ -18,7 +18,7 @@ var ( rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="newrelic"}`) ) -// InsertHandlerForHTTP processes remote write for NewRelic POST /infra/v2/metrics/events/bulk request. +// InsertHandlerForHTTP processes remote write for request to /newrelic/infra/v2/metrics/events/bulk request. func InsertHandlerForHTTP(req *http.Request) error { extraLabels, err := parserCommon.GetExtraLabels(req) if err != nil { @@ -26,42 +26,52 @@ func InsertHandlerForHTTP(req *http.Request) error { } ce := req.Header.Get("Content-Encoding") isGzip := ce == "gzip" - return stream.Parse(req.Body, isGzip, func(series []newrelic.Metric) error { - return insertRows(series, extraLabels) + return stream.Parse(req.Body, isGzip, func(rows []newrelic.Row) error { + return insertRows(rows, extraLabels) }) } -func insertRows(rows []newrelic.Metric, extraLabels []prompbmarshal.Label) error { +func insertRows(rows []newrelic.Row, extraLabels []prompbmarshal.Label) error { ctx := common.GetInsertCtx() defer common.PutInsertCtx(ctx) - ctx.Reset(len(rows)) + samplesCount := 0 + for i := range rows { + samplesCount += len(rows[i].Samples) + } + ctx.Reset(samplesCount) + hasRelabeling := relabel.HasRelabeling() for i := range rows { r := &rows[i] - ctx.Labels = ctx.Labels[:0] - ctx.AddLabel("", r.Metric) - for j := range r.Tags { - tag := &r.Tags[j] - ctx.AddLabel(tag.Key, tag.Value) - } - for j := range extraLabels { - label := &extraLabels[j] - ctx.AddLabel(label.Name, label.Value) - } - if hasRelabeling { - ctx.ApplyRelabeling() - } - if len(ctx.Labels) == 0 { - // Skip metric without labels. - continue - } - ctx.SortLabelsIfNeeded() - if err := ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value); err != nil { - return err + samples := r.Samples + for j := range samples { + s := &samples[j] + + ctx.Labels = ctx.Labels[:0] + ctx.AddLabelBytes(nil, s.Name) + for k := range r.Tags { + t := &r.Tags[k] + ctx.AddLabelBytes(t.Key, t.Value) + } + for k := range extraLabels { + label := &extraLabels[k] + ctx.AddLabel(label.Name, label.Value) + } + if hasRelabeling { + ctx.ApplyRelabeling() + } + if len(ctx.Labels) == 0 { + // Skip metric without labels. + continue + } + ctx.SortLabelsIfNeeded() + if err := ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, s.Value); err != nil { + return err + } } } - rowsInserted.Add(len(rows)) - rowsPerInsert.Update(float64(len(rows))) + rowsInserted.Add(samplesCount) + rowsPerInsert.Update(float64(samplesCount)) return ctx.FlushBufs() } diff --git a/docs/Cluster-VictoriaMetrics.md b/docs/Cluster-VictoriaMetrics.md index 64cbcbd17..3e836f06d 100644 --- a/docs/Cluster-VictoriaMetrics.md +++ b/docs/Cluster-VictoriaMetrics.md @@ -356,6 +356,7 @@ Check practical examples of VictoriaMetrics API [here](https://docs.victoriametr - `opentelemetry/api/v1/push` - for ingesting data via [OpenTelemetry protocol for metrics](https://github.com/open-telemetry/opentelemetry-specification/blob/ffddc289462dfe0c2041e3ca42a7b1df805706de/specification/metrics/data-model.md). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#sending-data-via-opentelemetry). - `datadog/api/v1/series` - for ingesting data with [DataDog submit metrics API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent) for details. - `influx/write` and `influx/api/v2/write` - for ingesting data with [InfluxDB line protocol](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for details. + - `newrelic/infra/v2/metrics/events/bulk` - for accepting data from [NewRelic infrastructure agent](https://docs.newrelic.com/docs/infrastructure/install-infrastructure-agent). See [these docs](https://docs.victoriametrics.com/#how-to-send-data-from-newrelic-agent) for details. - `opentsdb/api/put` - for accepting [OpenTSDB HTTP /api/put requests](http://opentsdb.net/docs/build/html/api_http/put.html). This handler is disabled by default. It is exposed on a distinct TCP address set via `-opentsdbHTTPListenAddr` command-line flag. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#sending-opentsdb-data-via-http-apiput-requests) for details. - URLs for [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/): `http://:8481/select//prometheus/`, where: diff --git a/docs/README.md b/docs/README.md index 399d426ac..a889e61e4 100644 --- a/docs/README.md +++ b/docs/README.md @@ -857,71 +857,76 @@ For example, `/api/put?extra_label=foo=bar` would add `{foo="bar"}` label to all ## How to send data from NewRelic agent VictoriaMetrics accepts data from [NewRelic infrastructure agent](https://docs.newrelic.com/docs/infrastructure/install-infrastructure-agent) -at `/api/v1/newrelic/infra/v2/metrics/events/bulk` path. -NewRelic's infrastructure agent sends so-called [Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events) -which then transformed by VictoriaMetrics to the [Prometheus exposition format](https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format). +at `/newrelic/infra/v2/metrics/events/bulk` HTTP path. +VictoriaMetrics receives [Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events) +from NewRelic agent at the given path, transforms them to [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) +according to [these docs](#newrelic-agent-data-mapping) before storing the raw samples to the database. -NewRelic's infrastructure agent allows configuring destinations for metrics forwarding via ENV variable `COLLECTOR_URL`. -It is also required to specify `NRIA_LICENSE_KEY`, which is available only after registration into account of the NewRelic cloud. +You need passing `COLLECTOR_URL` and `NRIA_LICENSE_KEY` environment variables to NewRelic infrastructure agent in order to send the collected metrics to VictoriaMetrics. +The `COLLECTOR_URL` must point to `/newrelic` HTTP endpoint at VictoriaMetrics, while the `NRIA_LICENSE_KEY` must contain NewRelic license key, +which can be obtained [here](https://newrelic.com/signup). +For example, if VictoriaMetrics runs at `localhost:8428`, then the following command can be used for running NewRelic infrastructure agent: -To configure NewRelic infrastructure agent for forwarding metrics to VictoriaMetrics use the following example: ```console -COLLECTOR_URL="http://localhost:8428/newrelic/api/v1" NRIA_LICENSE_KEY="YOUR_LICENSE_KEY" ./newrelic-infra +COLLECTOR_URL="http://localhost:8428/newrelic" NRIA_LICENSE_KEY="NEWRELIC_LICENSE_KEY" ./newrelic-infra ``` -### NewRelic agent data mapping +### NewRelic agent data mapping + +VictoriaMetrics maps [NewRelic Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events) +to [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) in the following way: + +1. Every numeric field is converted into a raw sample with the corresponding name. +1. The `eventType` and all the other fields with `string` value type are attached to every raw sample as [metric labels](https://docs.victoriametrics.com/keyConcepts.html#labels). +1. The `timestamp` field is used as timestamp for the ingested [raw sample](https://docs.victoriametrics.com/keyConcepts.html#raw-samples). + The `timestamp` field may be specified either in seconds or in milliseconds since the [Unix Epoch](https://en.wikipedia.org/wiki/Unix_time). + If the `timestamp` field is missing, then the raw sample is stored with the current timestamp. + +For example, let's import the following NewRelic Events request to VictoriaMetrics: -As example, lets create `newrelic.json` file with the following content: ```json [ - { - "Events":[ - { - "eventType":"SystemSample", - "entityKey":"macbook-pro.local", - "cpuPercent":25.056660790748904, - "cpuUserPercent":8.687987912389374, - "cpuSystemPercent":16.36867287835953, - "cpuIOWaitPercent":0, - "cpuIdlePercent":74.94333920925109, - "cpuStealPercent":0, - "loadAverageOneMinute":5.42333984375, - "loadAverageFiveMinute":4.099609375, - "loadAverageFifteenMinute":3.58203125 - } - ] - } - ] + { + "Events":[ + { + "eventType":"SystemSample", + "entityKey":"macbook-pro.local", + "cpuPercent":25.056660790748904, + "cpuUserPercent":8.687987912389374, + "cpuSystemPercent":16.36867287835953, + "cpuIOWaitPercent":0, + "cpuIdlePercent":74.94333920925109, + "cpuStealPercent":0, + "loadAverageOneMinute":5.42333984375, + "loadAverageFiveMinute":4.099609375, + "loadAverageFifteenMinute":3.58203125 + } + ] + } +] ``` -Let's use cUrl to send `newrelic.json` to single-node VictoriaMetrics: +Save this JSON into `newrelic.json` file and then use the following command in order to import it into VictoriaMetrics: ```console -curl -X POST -H 'Content-Type: application/json' --data-binary @newrelic.json http://localhost:8428/newrelic/api/v1/infra/v2/metrics/events/bulk +curl -X POST -H 'Content-Type: application/json' --data-binary @newrelic.json http://localhost:8428/newrelic/infra/v2/metrics/events/bulk ``` -If data was successfully ingested, you'll get `{"status":"ok"}` response. Let's fetch ingested data from VictoriaMetrics -in vmui via query `{__name__!=""}`: +Let's fetch the ingested data via [data export API](#how-to-export-data-in-json-line-format): + ```console -system_sample_cpu_io_wait_percent{entity_key="macbook-pro.local"} 0 -system_sample_cpu_idle_percent{entity_key="macbook-pro.local"} 74.9433392092 -system_sample_cpu_percent{entity_key="macbook-pro.local"} 25.056660790748 -system_sample_cpu_steal_percent{entity_key="macbook-pro.local"} 0 -system_sample_cpu_system_percent{entity_key="macbook-pro.local"} 16.368672878359 -system_sample_cpu_user_percent{entity_key="macbook-pro.local"} 8.687987912389 -system_sample_load_average_fifteen_minute{entity_key="macbook-pro.local"} 3.58203125 -system_sample_load_average_five_minute{entity_key="macbook-pro.local"} 4.099609375 -system_sample_load_average_one_minute{entity_key="macbook-pro.local"} 5.42333984375 +curl http://localhost:8428/api/v1/export -d 'match={eventType="SystemSample"}' +{"metric":{"__name__":"cpuStealPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[0],"timestamps":[1697407970000]} +{"metric":{"__name__":"loadAverageFiveMinute","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[4.099609375],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuIOWaitPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[0],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuSystemPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[16.368672878359],"timestamps":[1697407970000]} +{"metric":{"__name__":"loadAverageOneMinute","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[5.42333984375],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuUserPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[8.687987912389],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuIdlePercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[74.9433392092],"timestamps":[1697407970000]} +{"metric":{"__name__":"loadAverageFifteenMinute","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[3.58203125],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[25.056660790748],"timestamps":[1697407970000]} ``` -The fields in `newrelic.json` are transformed in the following way: -1. `eventType` filed is used as prefix for all metrics in the object; -2. `entityKey` or any other field with `string` value type is used as label attached to all metrics in the object; -3. the rest fields with numeric values will be used as metrics; -4. the additional field `timestamp` can be added to the payload to set the timestamp for all metrics. If omitted, -current time is used. - - ## Prometheus querying API usage VictoriaMetrics supports the following handlers from [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/): diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index d036f0ebc..d725de0a4 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -865,71 +865,76 @@ For example, `/api/put?extra_label=foo=bar` would add `{foo="bar"}` label to all ## How to send data from NewRelic agent VictoriaMetrics accepts data from [NewRelic infrastructure agent](https://docs.newrelic.com/docs/infrastructure/install-infrastructure-agent) -at `/api/v1/newrelic/infra/v2/metrics/events/bulk` path. -NewRelic's infrastructure agent sends so-called [Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events) -which then transformed by VictoriaMetrics to the [Prometheus exposition format](https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format). +at `/newrelic/infra/v2/metrics/events/bulk` HTTP path. +VictoriaMetrics receives [Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events) +from NewRelic agent at the given path, transforms them to [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) +according to [these docs](#newrelic-agent-data-mapping) before storing the raw samples to the database. -NewRelic's infrastructure agent allows configuring destinations for metrics forwarding via ENV variable `COLLECTOR_URL`. -It is also required to specify `NRIA_LICENSE_KEY`, which is available only after registration into account of the NewRelic cloud. +You need passing `COLLECTOR_URL` and `NRIA_LICENSE_KEY` environment variables to NewRelic infrastructure agent in order to send the collected metrics to VictoriaMetrics. +The `COLLECTOR_URL` must point to `/newrelic` HTTP endpoint at VictoriaMetrics, while the `NRIA_LICENSE_KEY` must contain NewRelic license key, +which can be obtained [here](https://newrelic.com/signup). +For example, if VictoriaMetrics runs at `localhost:8428`, then the following command can be used for running NewRelic infrastructure agent: -To configure NewRelic infrastructure agent for forwarding metrics to VictoriaMetrics use the following example: ```console -COLLECTOR_URL="http://localhost:8428/newrelic/api/v1" NRIA_LICENSE_KEY="YOUR_LICENSE_KEY" ./newrelic-infra +COLLECTOR_URL="http://localhost:8428/newrelic" NRIA_LICENSE_KEY="NEWRELIC_LICENSE_KEY" ./newrelic-infra ``` -### NewRelic agent data mapping +### NewRelic agent data mapping + +VictoriaMetrics maps [NewRelic Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events) +to [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) in the following way: + +1. Every numeric field is converted into a raw sample with the corresponding name. +1. The `eventType` and all the other fields with `string` value type are attached to every raw sample as [metric labels](https://docs.victoriametrics.com/keyConcepts.html#labels). +1. The `timestamp` field is used as timestamp for the ingested [raw sample](https://docs.victoriametrics.com/keyConcepts.html#raw-samples). + The `timestamp` field may be specified either in seconds or in milliseconds since the [Unix Epoch](https://en.wikipedia.org/wiki/Unix_time). + If the `timestamp` field is missing, then the raw sample is stored with the current timestamp. + +For example, let's import the following NewRelic Events request to VictoriaMetrics: -As example, lets create `newrelic.json` file with the following content: ```json [ - { - "Events":[ - { - "eventType":"SystemSample", - "entityKey":"macbook-pro.local", - "cpuPercent":25.056660790748904, - "cpuUserPercent":8.687987912389374, - "cpuSystemPercent":16.36867287835953, - "cpuIOWaitPercent":0, - "cpuIdlePercent":74.94333920925109, - "cpuStealPercent":0, - "loadAverageOneMinute":5.42333984375, - "loadAverageFiveMinute":4.099609375, - "loadAverageFifteenMinute":3.58203125 - } - ] - } - ] + { + "Events":[ + { + "eventType":"SystemSample", + "entityKey":"macbook-pro.local", + "cpuPercent":25.056660790748904, + "cpuUserPercent":8.687987912389374, + "cpuSystemPercent":16.36867287835953, + "cpuIOWaitPercent":0, + "cpuIdlePercent":74.94333920925109, + "cpuStealPercent":0, + "loadAverageOneMinute":5.42333984375, + "loadAverageFiveMinute":4.099609375, + "loadAverageFifteenMinute":3.58203125 + } + ] + } +] ``` -Let's use cUrl to send `newrelic.json` to single-node VictoriaMetrics: +Save this JSON into `newrelic.json` file and then use the following command in order to import it into VictoriaMetrics: ```console -curl -X POST -H 'Content-Type: application/json' --data-binary @newrelic.json http://localhost:8428/newrelic/api/v1/infra/v2/metrics/events/bulk +curl -X POST -H 'Content-Type: application/json' --data-binary @newrelic.json http://localhost:8428/newrelic/infra/v2/metrics/events/bulk ``` -If data was successfully ingested, you'll get `{"status":"ok"}` response. Let's fetch ingested data from VictoriaMetrics -in vmui via query `{__name__!=""}`: +Let's fetch the ingested data via [data export API](#how-to-export-data-in-json-line-format): + ```console -system_sample_cpu_io_wait_percent{entity_key="macbook-pro.local"} 0 -system_sample_cpu_idle_percent{entity_key="macbook-pro.local"} 74.9433392092 -system_sample_cpu_percent{entity_key="macbook-pro.local"} 25.056660790748 -system_sample_cpu_steal_percent{entity_key="macbook-pro.local"} 0 -system_sample_cpu_system_percent{entity_key="macbook-pro.local"} 16.368672878359 -system_sample_cpu_user_percent{entity_key="macbook-pro.local"} 8.687987912389 -system_sample_load_average_fifteen_minute{entity_key="macbook-pro.local"} 3.58203125 -system_sample_load_average_five_minute{entity_key="macbook-pro.local"} 4.099609375 -system_sample_load_average_one_minute{entity_key="macbook-pro.local"} 5.42333984375 +curl http://localhost:8428/api/v1/export -d 'match={eventType="SystemSample"}' +{"metric":{"__name__":"cpuStealPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[0],"timestamps":[1697407970000]} +{"metric":{"__name__":"loadAverageFiveMinute","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[4.099609375],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuIOWaitPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[0],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuSystemPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[16.368672878359],"timestamps":[1697407970000]} +{"metric":{"__name__":"loadAverageOneMinute","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[5.42333984375],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuUserPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[8.687987912389],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuIdlePercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[74.9433392092],"timestamps":[1697407970000]} +{"metric":{"__name__":"loadAverageFifteenMinute","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[3.58203125],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[25.056660790748],"timestamps":[1697407970000]} ``` -The fields in `newrelic.json` are transformed in the following way: -1. `eventType` filed is used as prefix for all metrics in the object; -2. `entityKey` or any other field with `string` value type is used as label attached to all metrics in the object; -3. the rest fields with numeric values will be used as metrics; -4. the additional field `timestamp` can be added to the payload to set the timestamp for all metrics. If omitted, -current time is used. - - ## Prometheus querying API usage VictoriaMetrics supports the following handlers from [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/): diff --git a/lib/protoparser/datadog/stream/streamparser.go b/lib/protoparser/datadog/stream/streamparser.go index a657c5618..5a6c7e04f 100644 --- a/lib/protoparser/datadog/stream/streamparser.go +++ b/lib/protoparser/datadog/stream/streamparser.go @@ -99,11 +99,11 @@ func (ctx *pushCtx) Read() error { reqLen, err := ctx.reqBuf.ReadFrom(lr) if err != nil { readErrors.Inc() - return fmt.Errorf("cannot read compressed request in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err) + return fmt.Errorf("cannot read request in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err) } if reqLen > int64(maxInsertRequestSize.N) { readErrors.Inc() - return fmt.Errorf("too big packed request; mustn't exceed `-maxInsertRequestSize=%d` bytes", maxInsertRequestSize.N) + return fmt.Errorf("too big request; mustn't exceed -datadog.maxInsertRequestSize=%d bytes", maxInsertRequestSize.N) } return nil } diff --git a/lib/protoparser/newrelic/parser.go b/lib/protoparser/newrelic/parser.go index e6901949f..41706880c 100644 --- a/lib/protoparser/newrelic/parser.go +++ b/lib/protoparser/newrelic/parser.go @@ -2,231 +2,191 @@ package newrelic import ( "fmt" - "sync" - "unicode" "github.com/valyala/fastjson" "github.com/valyala/fastjson/fastfloat" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" ) -var baseEventKeys = map[string]struct{}{ - "timestamp": {}, "eventType": {}, +// Rows contains rows parsed from NewRelic Event request +// +// See https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events +type Rows struct { + Rows []Row } -type tagsBuffer struct { - tags []Tag +// Reset resets r, so it can be re-used +func (r *Rows) Reset() { + rows := r.Rows + for i := range rows { + rows[i].reset() + } + r.Rows = rows[:0] } -var tagsPool = sync.Pool{ - New: func() interface{} { - return &tagsBuffer{tags: make([]Tag, 0)} - }, -} +var jsonParserPool fastjson.ParserPool -// NewRelic agent sends next struct to the collector -// MetricPost entity item for the HTTP post to be sent to the ingest service. -// type MetricPost struct { -// ExternalKeys []string `json:"ExternalKeys,omitempty"` -// EntityID uint64 `json:"EntityID,omitempty"` -// IsAgent bool `json:"IsAgent"` -// Events []json.RawMessage `json:"Events"` -// // Entity ID of the reporting agent, which will = EntityID when IsAgent == true. -// // The field is required in the backend for host metadata matching of the remote entities -// ReportingAgentID uint64 `json:"ReportingAgentID,omitempty"` -// } -// We are using only Events field because it contains all needed metrics +// Unmarshal parses NewRelic Event request from b to r. +// +// b can be re-used after returning from r. +func (r *Rows) Unmarshal(b []byte) error { + p := jsonParserPool.Get() + defer jsonParserPool.Put(p) -// Events represents Metrics collected from NewRelic MetricPost request -// https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events -type Events struct { - Metrics []Metric -} - -// Unmarshal takes fastjson.Value and collects Metrics -func (e *Events) Unmarshal(v []*fastjson.Value) error { - for _, value := range v { - events := value.Get("Events") - if events == nil { - return fmt.Errorf("got empty Events array from request") - } - eventsArr, err := events.Array() + r.Reset() + v, err := p.ParseBytes(b) + if err != nil { + return err + } + metricPosts, err := v.Array() + if err != nil { + return fmt.Errorf("cannot find the top-level array of MetricPost objects: %w", err) + } + for _, mp := range metricPosts { + o, err := mp.Object() if err != nil { - return fmt.Errorf("error collect events: %s", err) + return fmt.Errorf("cannot find MetricPost object: %w", err) } - - for _, event := range eventsArr { - metricData, err := event.Object() + rows := r.Rows + o.Visit(func(k []byte, v *fastjson.Value) { if err != nil { - return fmt.Errorf("error get metric data: %s", err) + return } - var m Metric - metrics, err := m.unmarshal(metricData) - if err != nil { - return fmt.Errorf("error collect metrics from Newrelic json: %s", err) + switch string(k) { + case "Events": + events, errLocal := v.Array() + if errLocal != nil { + err = fmt.Errorf("cannot find Events array in MetricPost object: %w", errLocal) + return + } + for _, e := range events { + eventObject, errLocal := e.Object() + if errLocal != nil { + err = fmt.Errorf("cannot find EventObject: %w", errLocal) + return + } + if cap(rows) > len(rows) { + rows = rows[:len(rows)+1] + } else { + rows = append(rows, Row{}) + } + r := &rows[len(rows)-1] + if errLocal := r.unmarshal(eventObject); errLocal != nil { + err = fmt.Errorf("cannot unmarshal EventObject: %w", errLocal) + return + } + } } - e.Metrics = append(e.Metrics, metrics...) + }) + r.Rows = rows + if err != nil { + return fmt.Errorf("cannot parse MetricPost object: %w", err) } } - return nil } -// Metric represents VictoriaMetrics metrics -type Metric struct { - Timestamp int64 +// Row represents parsed row +type Row struct { Tags []Tag - Metric string - Value float64 + Samples []Sample + Timestamp int64 } -func (m *Metric) unmarshal(o *fastjson.Object) ([]Metric, error) { - m.reset() +// Tag represents a key=value tag +type Tag struct { + Key []byte + Value []byte +} - tgsBuffer := tagsPool.Get().(*tagsBuffer) - defer func() { - tgsBuffer.tags = tgsBuffer.tags[:0] - tagsPool.Put(tgsBuffer) - }() +// Sample represents parsed sample +type Sample struct { + Name []byte + Value float64 +} - metrics := make([]Metric, 0, o.Len()) - rawTs := o.Get("timestamp") - if rawTs != nil { - ts, err := getFloat64(rawTs) +func (r *Row) reset() { + tags := r.Tags + for i := range tags { + tags[i].reset() + } + r.Tags = tags[:0] + + samples := r.Samples + for i := range samples { + samples[i].reset() + } + r.Samples = samples[:0] + + r.Timestamp = 0 +} + +func (t *Tag) reset() { + t.Key = t.Key[:0] + t.Value = t.Value[:0] +} + +func (s *Sample) reset() { + s.Name = s.Name[:0] + s.Value = 0 +} + +func (r *Row) unmarshal(o *fastjson.Object) (err error) { + r.reset() + tags := r.Tags[:0] + samples := r.Samples[:0] + o.Visit(func(k []byte, v *fastjson.Value) { if err != nil { - return nil, fmt.Errorf("invalid `timestamp` in %s: %w", o, err) - } - m.Timestamp = int64(ts * 1e3) - } else { - // Allow missing timestamp. It should be automatically populated - // with the current time by the caller. - m.Timestamp = 0 - } - - eventType := o.Get("eventType") - if eventType == nil { - return nil, fmt.Errorf("error get eventType from Events object: %s", o) - } - prefix := bytesutil.ToUnsafeString(eventType.GetStringBytes()) - prefix = camelToSnakeCase(prefix) - - o.Visit(func(key []byte, v *fastjson.Value) { - - k := bytesutil.ToUnsafeString(key) - // skip base event keys which should have been parsed before this - if _, ok := baseEventKeys[k]; ok { return } - + if len(k) == 0 { + return + } switch v.Type() { case fastjson.TypeString: - // this is label-value pair - value := v.Get() - if value == nil { - logger.Errorf("failed to get label value from NewRelic json: %s", v) + // Register new tag + valueBytes := v.GetStringBytes() + if len(valueBytes) == 0 { return } - name := camelToSnakeCase(k) - val := bytesutil.ToUnsafeString(value.GetStringBytes()) - tgsBuffer.tags = append(tgsBuffer.tags, Tag{Key: name, Value: val}) + if cap(tags) > len(tags) { + tags = tags[:len(tags)+1] + } else { + tags = append(tags, Tag{}) + } + t := &tags[len(tags)-1] + t.Key = append(t.Key[:0], k...) + t.Value = append(t.Value[:0], valueBytes...) case fastjson.TypeNumber: - // this is metric name with value - metricName := camelToSnakeCase(k) - if prefix != "" { - metricName = fmt.Sprintf("%s_%s", prefix, metricName) - } - f, err := getFloat64(v) - if err != nil { - logger.Errorf("failed to get value for NewRelic metric %q: %w", k, err) + if string(k) == "timestamp" { + // Parse timestamp + ts, errLocal := getFloat64(v) + if errLocal != nil { + err = fmt.Errorf("cannot parse `timestamp` field: %w", errLocal) + return + } + if ts < (1 << 32) { + // The timestamp is in seconds. Convert it to milliseconds. + ts *= 1e3 + } + r.Timestamp = int64(ts) return } - metrics = append(metrics, Metric{Metric: metricName, Value: f}) - default: - // unknown type - logger.Errorf("got unsupported NewRelic json %s field type: %s", v, v.Type()) - return + // Register new sample + if cap(samples) > len(samples) { + samples = samples[:len(samples)+1] + } else { + samples = append(samples, Sample{}) + } + s := &samples[len(samples)-1] + s.Name = append(s.Name[:0], k...) + s.Value = v.GetFloat64() } }) - - for i := range metrics { - metrics[i].Timestamp = m.Timestamp - metrics[i].Tags = tgsBuffer.tags - } - - return metrics, nil -} - -func (m *Metric) reset() { - m.Timestamp = 0 - m.Tags = nil - m.Metric = "" - m.Value = 0 -} - -// Tag is an NewRelic tag. -type Tag struct { - Key string - Value string -} - -func camelToSnakeCase(str string) string { - str = promrelabel.SanitizeLabelName(str) - length := len(str) - snakeCase := make([]byte, 0, length*2) - tokens := make([]byte, 0, length) - var allTokensUpper bool - - flush := func(tokens []byte) { - for _, c := range tokens { - snakeCase = append(snakeCase, byte(unicode.ToLower(rune(c)))) - } - } - - for i := 0; i < length; i++ { - char := str[i] - if unicode.IsUpper(rune(char)) { - switch { - case len(tokens) == 0: - allTokensUpper = true - tokens = append(tokens, char) - case allTokensUpper: - tokens = append(tokens, char) - default: - flush(tokens) - snakeCase = append(snakeCase, '_') - tokens = tokens[:0] - tokens = append(tokens, char) - allTokensUpper = true - } - continue - } - - switch { - case len(tokens) == 1: - tokens = append(tokens, char) - allTokensUpper = false - case allTokensUpper: - tail := tokens[:len(tokens)-1] - last := tokens[len(tokens)-1:] - flush(tail) - snakeCase = append(snakeCase, '_') - tokens = tokens[:0] - tokens = append(tokens, last...) - tokens = append(tokens, char) - allTokensUpper = false - default: - tokens = append(tokens, char) - } - } - - if len(tokens) > 0 { - flush(tokens) - } - s := bytesutil.ToUnsafeString(snakeCase) - return s + r.Tags = tags + r.Samples = samples + return err } func getFloat64(v *fastjson.Value) (float64, error) { diff --git a/lib/protoparser/newrelic/parser_test.go b/lib/protoparser/newrelic/parser_test.go index 2daded2dd..f3f3e499b 100644 --- a/lib/protoparser/newrelic/parser_test.go +++ b/lib/protoparser/newrelic/parser_test.go @@ -1,49 +1,103 @@ package newrelic import ( + "fmt" "reflect" "strings" "testing" - - "github.com/valyala/fastjson" ) -func TestEvents_Unmarshal(t *testing.T) { - tests := []struct { - name string - metrics []Metric - json string - wantErr bool - }{ +func TestRowsUnmarshalFailure(t *testing.T) { + f := func(data string) { + t.Helper() + + var r Rows + if err := r.Unmarshal([]byte(data)); err == nil { + t.Fatalf("expecting non-nil error") + } + } + + // Empty JSON + f("") + + // Invalid JSON + f("123") + f("[foo]") + f(`{"foo":123}`) +} + +func TestRowsUnmarshalSuccess(t *testing.T) { + f := func(data string, expectedRows []Row) { + t.Helper() + + var r Rows + if err := r.Unmarshal([]byte(data)); err != nil { + t.Fatalf("unexpected error: %s", err) + } + if !reflect.DeepEqual(r.Rows, expectedRows) { + t.Fatalf("unexpected rows parsed\ngot\n%s\nwant\n%s", rowsToString(r.Rows), rowsToString(expectedRows)) + } + } + + // empty array + f(`[]`, nil) + + // zero events + f(`[ + { + "EntityID":28257883748326179, + "IsAgent":true, + "Events":[], + "ReportingAgentID":28257883748326179 + }]`, nil) + + // A single event + f(`[{ + "EntityID":28257883748326179, + "IsAgent":true, + "Events":[ + { + "eventType":"SystemSample", + "timestamp":1690286061, + "entityKey":"macbook-pro.local", + "dc": "1", + "diskWritesPerSecond":-34.21, + "uptime":762376 + } + ], + "ReportingAgentID":28257883748326179 + }]`, []Row{ { - name: "empty json", - metrics: []Metric{}, - json: "", - wantErr: true, - }, - { - name: "json with correct data", - metrics: []Metric{ + Tags: []Tag{ { - Timestamp: 1690286061000, - Tags: []Tag{ - {Key: "entity_key", Value: "macbook-pro.local"}, - {Key: "dc", Value: "1"}, - }, - Metric: "system_sample_disk_writes_per_second", - Value: 0, + Key: []byte("eventType"), + Value: []byte("SystemSample"), }, { - Timestamp: 1690286061000, - Tags: []Tag{ - {Key: "entity_key", Value: "macbook-pro.local"}, - {Key: "dc", Value: "1"}, - }, - Metric: "system_sample_uptime", - Value: 762376, + Key: []byte("entityKey"), + Value: []byte("macbook-pro.local"), + }, + { + Key: []byte("dc"), + Value: []byte("1"), }, }, - json: `[ + Samples: []Sample{ + { + Name: []byte("diskWritesPerSecond"), + Value: -34.21, + }, + { + Name: []byte("uptime"), + Value: 762376, + }, + }, + Timestamp: 1690286061000, + }, + }) + + // Multiple events + f(`[ { "EntityID":28257883748326179, "IsAgent":true, @@ -52,123 +106,124 @@ func TestEvents_Unmarshal(t *testing.T) { "eventType":"SystemSample", "timestamp":1690286061, "entityKey":"macbook-pro.local", - "dc": "1", - "diskWritesPerSecond":0, + "dc": "1", + "diskWritesPerSecond":-34.21, "uptime":762376 } ], "ReportingAgentID":28257883748326179 - } - ]`, - wantErr: false, - }, - { - name: "empty array in json", - metrics: []Metric{}, - json: `[]`, - wantErr: false, - }, - { - name: "empty events in json", - metrics: []Metric{}, - json: `[ + }, { - "EntityID":28257883748326179, + "EntityID":282579, "IsAgent":true, - "Events":[], - "ReportingAgentID":28257883748326179 + "Events":[ + { + "eventType":"SystemSample", + "timestamp":1690286061, + "entityKey":"macbook-pro.local", + "diskWritesPerSecond":234.34, + "timestamp":1690286061.433, + "uptime":762376 + }, + { + "eventType":"ProcessSample", + "timestamp":1690286061987, + "uptime":1236 + } + ], + "ReportingAgentID":2879 } - ]`, - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - e := &Events{Metrics: []Metric{}} - - value, err := fastjson.Parse(tt.json) - if (err != nil) != tt.wantErr { - t.Errorf("cannot parse json error: %s", err) - } - - if value != nil { - v, err := value.Array() - if err != nil { - t.Errorf("cannot get array from json") - } - if err := e.Unmarshal(v); (err != nil) != tt.wantErr { - t.Errorf("Unmarshal() error = %v, wantErr %v", err, tt.wantErr) - } - if !reflect.DeepEqual(e.Metrics, tt.metrics) { - t.Errorf("got metrics => %v; expected = %v", e.Metrics, tt.metrics) - } - } - }) - } -} - -func Test_camelToSnakeCase(t *testing.T) { - tests := []struct { - name string - str string - want string - }{ + ]`, []Row{ { - name: "empty string", - str: "", - want: "", + Tags: []Tag{ + { + Key: []byte("eventType"), + Value: []byte("SystemSample"), + }, + { + Key: []byte("entityKey"), + Value: []byte("macbook-pro.local"), + }, + { + Key: []byte("dc"), + Value: []byte("1"), + }, + }, + Samples: []Sample{ + { + Name: []byte("diskWritesPerSecond"), + Value: -34.21, + }, + { + Name: []byte("uptime"), + Value: 762376, + }, + }, + Timestamp: 1690286061000, }, { - name: "lowercase all chars", - str: "somenewstring", - want: "somenewstring", + Tags: []Tag{ + { + Key: []byte("eventType"), + Value: []byte("SystemSample"), + }, + { + Key: []byte("entityKey"), + Value: []byte("macbook-pro.local"), + }, + }, + Samples: []Sample{ + { + Name: []byte("diskWritesPerSecond"), + Value: 234.34, + }, + { + Name: []byte("uptime"), + Value: 762376, + }, + }, + Timestamp: 1690286061433, }, { - name: "first letter uppercase", - str: "Teststring", - want: "teststring", + Tags: []Tag{ + { + Key: []byte("eventType"), + Value: []byte("ProcessSample"), + }, + }, + Samples: []Sample{ + { + Name: []byte("uptime"), + Value: 1236, + }, + }, + Timestamp: 1690286061987, }, - { - name: "two uppercase letters", - str: "TestString", - want: "test_string", - }, - { - name: "first and last uppercase letters", - str: "TeststrinG", - want: "teststrin_g", - }, - { - name: "three letters uppercase", - str: "TestStrinG", - want: "test_strin_g", - }, - { - name: "has many upper case letters", - str: "ProgressIOTime", - want: "progress_io_time", - }, - { - name: "last all uppercase letters", - str: "ProgressTSDB", - want: "progress_tsdb", - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := camelToSnakeCase(tt.str); got != tt.want { - t.Errorf("camelToSnakeCase() = %v, want %v", got, tt.want) - } - }) - } -} - -func BenchmarkCameToSnake(b *testing.B) { - b.ReportAllocs() - str := strings.Repeat("ProgressIOTime", 20) - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - camelToSnakeCase(str) - } }) + +} + +func rowsToString(rows []Row) string { + var a []string + for _, row := range rows { + s := row.String() + a = append(a, s) + } + return strings.Join(a, "\n") +} + +func (r *Row) String() string { + var a []string + for _, t := range r.Tags { + s := fmt.Sprintf("%s=%q", t.Key, t.Value) + a = append(a, s) + } + tagsString := "{" + strings.Join(a, ",") + "}" + a = a[:0] + for _, sample := range r.Samples { + s := fmt.Sprintf("[%s %f]", sample.Name, sample.Value) + a = append(a, s) + } + samplesString := strings.Join(a, ",") + return fmt.Sprintf("tags=%s, samples=%s, timestamp=%d", tagsString, samplesString, r.Timestamp) } diff --git a/lib/protoparser/newrelic/parser_timing_test.go b/lib/protoparser/newrelic/parser_timing_test.go index 94e168a66..7a5641271 100644 --- a/lib/protoparser/newrelic/parser_timing_test.go +++ b/lib/protoparser/newrelic/parser_timing_test.go @@ -1,13 +1,12 @@ package newrelic import ( + "fmt" "testing" - - "github.com/valyala/fastjson" ) -func BenchmarkRequestUnmarshal(b *testing.B) { - reqBody := `[ +func BenchmarkRowsUnmarshal(b *testing.B) { + reqBody := []byte(`[ { "EntityID":28257883748326179, "IsAgent":true, @@ -52,25 +51,17 @@ func BenchmarkRequestUnmarshal(b *testing.B) { ], "ReportingAgentID":28257883748326179 } - ]` + ]`) b.SetBytes(int64(len(reqBody))) b.ReportAllocs() b.RunParallel(func(pb *testing.PB) { - value, err := fastjson.Parse(reqBody) - if err != nil { - b.Errorf("cannot parse json error: %s", err) - } - v, err := value.Array() - if err != nil { - b.Errorf("cannot get array from json") - } + var r Rows for pb.Next() { - e := &Events{Metrics: []Metric{}} - if err := e.Unmarshal(v); err != nil { - b.Errorf("Unmarshal() error = %v", err) + if err := r.Unmarshal(reqBody); err != nil { + panic(fmt.Errorf("unmarshal error: %s", err)) } - if len(e.Metrics) == 0 { - b.Errorf("metrics should have at least one element") + if len(r.Rows) != 1 { + panic(fmt.Errorf("unexpected number of items unmarshaled; got %d; want %d", len(r.Rows), 1)) } } }) diff --git a/lib/protoparser/newrelic/stream/push_context.go b/lib/protoparser/newrelic/stream/push_context.go deleted file mode 100644 index 4dbb7707c..000000000 --- a/lib/protoparser/newrelic/stream/push_context.go +++ /dev/null @@ -1,80 +0,0 @@ -package stream - -import ( - "bufio" - "fmt" - "io" - "sync" - - "github.com/VictoriaMetrics/metrics" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" -) - -var ( - maxInsertRequestSize = flagutil.NewBytes("newrelic.maxInsertRequestSize", 64*1024*1024, "The maximum size in bytes of a single NewRelic POST request to /infra/v2/metrics/events/bulk") -) - -var ( - readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="newrelic"}`) - readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="newrelic"}`) - unmarshalErrors = metrics.NewCounter(`vm_protoparser_unmarshal_errors_total{type="newrelic"}`) -) - -var pushCtxPool sync.Pool -var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) - -type pushCtx struct { - br *bufio.Reader - reqBuf bytesutil.ByteBuffer -} - -func (ctx *pushCtx) Read() error { - readCalls.Inc() - lr := io.LimitReader(ctx.br, maxInsertRequestSize.N+1) - startTime := fasttime.UnixTimestamp() - reqLen, err := ctx.reqBuf.ReadFrom(lr) - if err != nil { - readErrors.Inc() - return fmt.Errorf("cannot read compressed request in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err) - } - if reqLen > maxInsertRequestSize.N { - readErrors.Inc() - return fmt.Errorf("too big packed request; mustn't exceed `-maxInsertRequestSize=%d` bytes", maxInsertRequestSize.N) - } - return nil -} - -func (ctx *pushCtx) reset() { - ctx.br.Reset(nil) - ctx.reqBuf.Reset() -} - -func getPushCtx(r io.Reader) *pushCtx { - select { - case ctx := <-pushCtxPoolCh: - ctx.br.Reset(r) - return ctx - default: - if v := pushCtxPool.Get(); v != nil { - ctx := v.(*pushCtx) - ctx.br.Reset(r) - return ctx - } - return &pushCtx{ - br: bufio.NewReaderSize(r, 64*1024), - } - } -} - -func putPushCtx(ctx *pushCtx) { - ctx.reset() - select { - case pushCtxPoolCh <- ctx: - default: - pushCtxPool.Put(ctx) - } -} diff --git a/lib/protoparser/newrelic/stream/streamparser.go b/lib/protoparser/newrelic/stream/streamparser.go index 880212444..ee2887829 100644 --- a/lib/protoparser/newrelic/stream/streamparser.go +++ b/lib/protoparser/newrelic/stream/streamparser.go @@ -1,23 +1,31 @@ package stream import ( + "bufio" "fmt" "io" + "sync" - "github.com/valyala/fastjson" + "github.com/VictoriaMetrics/metrics" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" ) -var parserPool fastjson.ParserPool +var ( + maxInsertRequestSize = flagutil.NewBytes("newrelic.maxInsertRequestSize", 64*1024*1024, "The maximum size in bytes of a single NewRelic request "+ + "to /newrelic/infra/v2/metrics/events/bulk") +) -// Parse parses NewRelic POST request for newrelic/infra/v2/metrics/events/bulk from reader and calls callback for the parsed request. +// Parse parses NewRelic POST request for /newrelic/infra/v2/metrics/events/bulk from r and calls callback for the parsed request. // -// callback shouldn't hold series after returning. -func Parse(r io.Reader, isGzip bool, callback func(series []newrelic.Metric) error) error { +// callback shouldn't hold rows after returning. +func Parse(r io.Reader, isGzip bool, callback func(rows []newrelic.Row) error) error { wcr := writeconcurrencylimiter.GetReader(r) defer writeconcurrencylimiter.PutReader(wcr) r = wcr @@ -25,7 +33,7 @@ func Parse(r io.Reader, isGzip bool, callback func(series []newrelic.Metric) err if isGzip { zr, err := common.GetGzipReader(r) if err != nil { - return fmt.Errorf("cannot read gzipped Newrelic agent data: %w", err) + return fmt.Errorf("cannot read gzipped NewRelic agent data: %w", err) } defer common.PutGzipReader(zr) r = zr @@ -34,40 +42,104 @@ func Parse(r io.Reader, isGzip bool, callback func(series []newrelic.Metric) err ctx := getPushCtx(r) defer putPushCtx(ctx) if err := ctx.Read(); err != nil { - return err + return fmt.Errorf("cannot read NewRelic request: %w", err) } - p := parserPool.Get() - defer parserPool.Put(p) + rows := getRows() + defer putRows(rows) - v, err := p.ParseBytes(ctx.reqBuf.B) - if err != nil { - return fmt.Errorf("cannot parse NewRelic POST request with size %d bytes: %w", len(ctx.reqBuf.B), err) - } - - metricsPost, err := v.Array() - if err != nil { - return fmt.Errorf("cannot fetch data from Newrelic POST request: %w", err) - } - - var events newrelic.Events - - if err := events.Unmarshal(metricsPost); err != nil { + if err := rows.Unmarshal(ctx.reqBuf.B); err != nil { unmarshalErrors.Inc() - return fmt.Errorf("cannot unmarshal NewRelic POST request: %w", err) + return fmt.Errorf("cannot unmarshal NewRelic request: %w", err) } // Fill in missing timestamps currentTimestamp := int64(fasttime.UnixTimestamp()) - for i := range events.Metrics { - m := &events.Metrics[i] - if m.Timestamp == 0 { - m.Timestamp = currentTimestamp * 1e3 + for i := range rows.Rows { + r := &rows.Rows[i] + if r.Timestamp == 0 { + r.Timestamp = currentTimestamp * 1e3 } } - if err := callback(events.Metrics); err != nil { + if err := callback(rows.Rows); err != nil { return fmt.Errorf("error when processing imported data: %w", err) } return nil } + +func getRows() *newrelic.Rows { + v := rowsPool.Get() + if v == nil { + return &newrelic.Rows{} + } + return v.(*newrelic.Rows) +} + +func putRows(rows *newrelic.Rows) { + rows.Reset() + rowsPool.Put(rows) +} + +var rowsPool sync.Pool + +var ( + readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="newrelic"}`) + readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="newrelic"}`) + unmarshalErrors = metrics.NewCounter(`vm_protoparser_unmarshal_errors_total{type="newrelic"}`) +) + +var pushCtxPool sync.Pool +var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) + +type pushCtx struct { + br *bufio.Reader + reqBuf bytesutil.ByteBuffer +} + +func (ctx *pushCtx) Read() error { + readCalls.Inc() + lr := io.LimitReader(ctx.br, maxInsertRequestSize.N+1) + startTime := fasttime.UnixTimestamp() + reqLen, err := ctx.reqBuf.ReadFrom(lr) + if err != nil { + readErrors.Inc() + return fmt.Errorf("cannot read request in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err) + } + if reqLen > maxInsertRequestSize.N { + readErrors.Inc() + return fmt.Errorf("too big request; mustn't exceed -newrelic.maxInsertRequestSize=%d bytes", maxInsertRequestSize.N) + } + return nil +} + +func (ctx *pushCtx) reset() { + ctx.br.Reset(nil) + ctx.reqBuf.Reset() +} + +func getPushCtx(r io.Reader) *pushCtx { + select { + case ctx := <-pushCtxPoolCh: + ctx.br.Reset(r) + return ctx + default: + if v := pushCtxPool.Get(); v != nil { + ctx := v.(*pushCtx) + ctx.br.Reset(r) + return ctx + } + return &pushCtx{ + br: bufio.NewReaderSize(r, 64*1024), + } + } +} + +func putPushCtx(ctx *pushCtx) { + ctx.reset() + select { + case pushCtxPoolCh <- ctx: + default: + pushCtxPool.Put(ctx) + } +} diff --git a/lib/protoparser/newrelic/stream/streamparser_test.go b/lib/protoparser/newrelic/stream/streamparser_test.go new file mode 100644 index 000000000..4f9b3ab97 --- /dev/null +++ b/lib/protoparser/newrelic/stream/streamparser_test.go @@ -0,0 +1,106 @@ +package stream + +import ( + "bytes" + "compress/gzip" + "fmt" + "reflect" + "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic" +) + +func TestParseFailure(t *testing.T) { + f := func(req string) { + t.Helper() + + callback := func(rows []newrelic.Row) error { + panic(fmt.Errorf("unexpected call into callback")) + } + r := bytes.NewReader([]byte(req)) + if err := Parse(r, false, callback); err == nil { + t.Fatalf("expecting non-empty error") + } + } + f("") + f("foo") + f("{}") + f("[1,2,3]") +} + +func TestParseSuccess(t *testing.T) { + f := func(req string, expectedRows []newrelic.Row) { + t.Helper() + + callback := func(rows []newrelic.Row) error { + if !reflect.DeepEqual(rows, expectedRows) { + return fmt.Errorf("unexpected rows\ngot\n%v\nwant\n%v", rows, expectedRows) + } + return nil + } + + // Parse from uncompressed reader + r := bytes.NewReader([]byte(req)) + if err := Parse(r, false, callback); err != nil { + t.Fatalf("unexpected error when parsing uncompressed request: %s", err) + } + + var bb bytes.Buffer + zw := gzip.NewWriter(&bb) + if _, err := zw.Write([]byte(req)); err != nil { + t.Fatalf("cannot compress request: %s", err) + } + if err := zw.Close(); err != nil { + t.Fatalf("cannot close compressed writer: %s", err) + } + if err := Parse(&bb, true, callback); err != nil { + t.Fatalf("unexpected error when parsing compressed request: %s", err) + } + } + + f("[]", nil) + f(`[{"Events":[]}]`, nil) + f(`[{ + "EntityID":28257883748326179, + "IsAgent":true, + "Events":[ + { + "eventType":"SystemSample", + "timestamp":1690286061, + "entityKey":"macbook-pro.local", + "dc": "1", + "diskWritesPerSecond":-34.21, + "uptime":762376 + } + ], + "ReportingAgentID":28257883748326179 +}]`, []newrelic.Row{ + { + Tags: []newrelic.Tag{ + { + Key: []byte("eventType"), + Value: []byte("SystemSample"), + }, + { + Key: []byte("entityKey"), + Value: []byte("macbook-pro.local"), + }, + { + Key: []byte("dc"), + Value: []byte("1"), + }, + }, + Samples: []newrelic.Sample{ + { + Name: []byte("diskWritesPerSecond"), + Value: -34.21, + }, + { + Name: []byte("uptime"), + Value: 762376, + }, + }, + Timestamp: 1690286061000, + }, + }) +} diff --git a/lib/protoparser/promremotewrite/stream/streamparser.go b/lib/protoparser/promremotewrite/stream/streamparser.go index ee01b7681..4b751b1ad 100644 --- a/lib/protoparser/promremotewrite/stream/streamparser.go +++ b/lib/protoparser/promremotewrite/stream/streamparser.go @@ -96,7 +96,7 @@ func (ctx *pushCtx) Read() error { } if reqLen > int64(maxInsertRequestSize.N) { readErrors.Inc() - return fmt.Errorf("too big packed request; mustn't exceed `-maxInsertRequestSize=%d` bytes", maxInsertRequestSize.N) + return fmt.Errorf("too big packed request; mustn't exceed -maxInsertRequestSize=%d bytes", maxInsertRequestSize.N) } return nil }