From 2c334ed95374762292c8dc2508bce8db56505021 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 16 Oct 2023 00:25:23 +0200 Subject: [PATCH] app/{vmagent,vminsert}: follow-up for NewRelic data ingestion protocol support This is a follow-up for f60c08a7bdcc87570d1076a3f4520637d3a6ea0f Changes: - Make sure all the urls related to NewRelic protocol start from /newrelic . Previously some urls were started from /api/v1/newrelic - Remove /api/v1 part from NewRelic urls, since it has no sense - Remove automatic transformation from CamelCase to snake_case for NewRelic labels and metric names, since it may complicate the transition from NewRelic to VictoriaMetrics. Preserve all the metric names and label names, so users could query metrics and labels by the same names which are used in NewRelic. The automatic transformation from CamelCase to snake_case can be added later as a special action for relabeling rules if needed. - Properly update per-tenant data ingestion stats at app/vmagent/newrelic/request_handler.go . Previously it was always zero. - Fix NewRelic urls in vmagent when multitenant data ingestion is enabled. Previously they were mistakenly started from `/`. 
- Document NewRelic data ingestion url at docs/Cluster-VictoriaMetrics.md - Remove superfluous memory allocations at lib/protoparser/newrelic - Improve tests at lib/protoparser/newrelic/* Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3520 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4712 --- README.md | 101 +++--- app/vmagent/datadog/request_handler.go | 8 +- app/vmagent/main.go | 20 +- app/vmagent/newrelic/request_handler.go | 55 +-- app/vminsert/main.go | 14 +- app/vminsert/newrelic/request_handler.go | 64 ++-- docs/Cluster-VictoriaMetrics.md | 1 + docs/README.md | 101 +++--- docs/Single-server-VictoriaMetrics.md | 101 +++--- .../datadog/stream/streamparser.go | 4 +- lib/protoparser/newrelic/parser.go | 330 ++++++++---------- lib/protoparser/newrelic/parser_test.go | 329 +++++++++-------- .../newrelic/parser_timing_test.go | 27 +- .../newrelic/stream/push_context.go | 80 ----- .../newrelic/stream/streamparser.go | 128 +++++-- .../newrelic/stream/streamparser_test.go | 106 ++++++ .../promremotewrite/stream/streamparser.go | 2 +- 17 files changed, 804 insertions(+), 667 deletions(-) delete mode 100644 lib/protoparser/newrelic/stream/push_context.go create mode 100644 lib/protoparser/newrelic/stream/streamparser_test.go diff --git a/README.md b/README.md index 8610f19ddf..aee7f29822 100644 --- a/README.md +++ b/README.md @@ -854,71 +854,76 @@ For example, `/api/put?extra_label=foo=bar` would add `{foo="bar"}` label to all ## How to send data from NewRelic agent VictoriaMetrics accepts data from [NewRelic infrastructure agent](https://docs.newrelic.com/docs/infrastructure/install-infrastructure-agent) -at `/api/v1/newrelic/infra/v2/metrics/events/bulk` path. 
-NewRelic's infrastructure agent sends so-called [Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events) -which then transformed by VictoriaMetrics to the [Prometheus exposition format](https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format). +at `/newrelic/infra/v2/metrics/events/bulk` HTTP path. +VictoriaMetrics receives [Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events) +from NewRelic agent at the given path, transforms them to [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) +according to [these docs](#newrelic-agent-data-mapping) before storing the raw samples to the database. -NewRelic's infrastructure agent allows configuring destinations for metrics forwarding via ENV variable `COLLECTOR_URL`. -It is also required to specify `NRIA_LICENSE_KEY`, which is available only after registration into account of the NewRelic cloud. +You need to pass `COLLECTOR_URL` and `NRIA_LICENSE_KEY` environment variables to NewRelic infrastructure agent in order to send the collected metrics to VictoriaMetrics. +The `COLLECTOR_URL` must point to `/newrelic` HTTP endpoint at VictoriaMetrics, while the `NRIA_LICENSE_KEY` must contain NewRelic license key, +which can be obtained [here](https://newrelic.com/signup). 
+For example, if VictoriaMetrics runs at `localhost:8428`, then the following command can be used for running NewRelic infrastructure agent: -To configure NewRelic infrastructure agent for forwarding metrics to VictoriaMetrics use the following example: ```console -COLLECTOR_URL="http://localhost:8428/newrelic/api/v1" NRIA_LICENSE_KEY="YOUR_LICENSE_KEY" ./newrelic-infra +COLLECTOR_URL="http://localhost:8428/newrelic" NRIA_LICENSE_KEY="NEWRELIC_LICENSE_KEY" ./newrelic-infra ``` -### NewRelic agent data mapping +### NewRelic agent data mapping + +VictoriaMetrics maps [NewRelic Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events) +to [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) in the following way: + +1. Every numeric field is converted into a raw sample with the corresponding name. +1. The `eventType` and all the other fields with `string` value type are attached to every raw sample as [metric labels](https://docs.victoriametrics.com/keyConcepts.html#labels). +1. The `timestamp` field is used as timestamp for the ingested [raw sample](https://docs.victoriametrics.com/keyConcepts.html#raw-samples). + The `timestamp` field may be specified either in seconds or in milliseconds since the [Unix Epoch](https://en.wikipedia.org/wiki/Unix_time). + If the `timestamp` field is missing, then the raw sample is stored with the current timestamp. 
+ +For example, let's import the following NewRelic Events request to VictoriaMetrics: -As example, lets create `newrelic.json` file with the following content: ```json [ - { - "Events":[ - { - "eventType":"SystemSample", - "entityKey":"macbook-pro.local", - "cpuPercent":25.056660790748904, - "cpuUserPercent":8.687987912389374, - "cpuSystemPercent":16.36867287835953, - "cpuIOWaitPercent":0, - "cpuIdlePercent":74.94333920925109, - "cpuStealPercent":0, - "loadAverageOneMinute":5.42333984375, - "loadAverageFiveMinute":4.099609375, - "loadAverageFifteenMinute":3.58203125 - } - ] - } - ] + { + "Events":[ + { + "eventType":"SystemSample", + "entityKey":"macbook-pro.local", + "cpuPercent":25.056660790748904, + "cpuUserPercent":8.687987912389374, + "cpuSystemPercent":16.36867287835953, + "cpuIOWaitPercent":0, + "cpuIdlePercent":74.94333920925109, + "cpuStealPercent":0, + "loadAverageOneMinute":5.42333984375, + "loadAverageFiveMinute":4.099609375, + "loadAverageFifteenMinute":3.58203125 + } + ] + } +] ``` -Let's use cUrl to send `newrelic.json` to single-node VictoriaMetrics: +Save this JSON into `newrelic.json` file and then use the following command in order to import it into VictoriaMetrics: ```console -curl -X POST -H 'Content-Type: application/json' --data-binary @newrelic.json http://localhost:8428/newrelic/api/v1/infra/v2/metrics/events/bulk +curl -X POST -H 'Content-Type: application/json' --data-binary @newrelic.json http://localhost:8428/newrelic/infra/v2/metrics/events/bulk ``` -If data was successfully ingested, you'll get `{"status":"ok"}` response. 
Let's fetch ingested data from VictoriaMetrics -in vmui via query `{__name__!=""}`: +Let's fetch the ingested data via [data export API](#how-to-export-data-in-json-line-format): + ```console -system_sample_cpu_io_wait_percent{entity_key="macbook-pro.local"} 0 -system_sample_cpu_idle_percent{entity_key="macbook-pro.local"} 74.9433392092 -system_sample_cpu_percent{entity_key="macbook-pro.local"} 25.056660790748 -system_sample_cpu_steal_percent{entity_key="macbook-pro.local"} 0 -system_sample_cpu_system_percent{entity_key="macbook-pro.local"} 16.368672878359 -system_sample_cpu_user_percent{entity_key="macbook-pro.local"} 8.687987912389 -system_sample_load_average_fifteen_minute{entity_key="macbook-pro.local"} 3.58203125 -system_sample_load_average_five_minute{entity_key="macbook-pro.local"} 4.099609375 -system_sample_load_average_one_minute{entity_key="macbook-pro.local"} 5.42333984375 +curl http://localhost:8428/api/v1/export -d 'match={eventType="SystemSample"}' +{"metric":{"__name__":"cpuStealPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[0],"timestamps":[1697407970000]} +{"metric":{"__name__":"loadAverageFiveMinute","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[4.099609375],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuIOWaitPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[0],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuSystemPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[16.368672878359],"timestamps":[1697407970000]} +{"metric":{"__name__":"loadAverageOneMinute","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[5.42333984375],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuUserPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[8.687987912389],"timestamps":[1697407970000]} 
+{"metric":{"__name__":"cpuIdlePercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[74.9433392092],"timestamps":[1697407970000]} +{"metric":{"__name__":"loadAverageFifteenMinute","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[3.58203125],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[25.056660790748],"timestamps":[1697407970000]} ``` -The fields in `newrelic.json` are transformed in the following way: -1. `eventType` filed is used as prefix for all metrics in the object; -2. `entityKey` or any other field with `string` value type is used as label attached to all metrics in the object; -3. the rest fields with numeric values will be used as metrics; -4. the additional field `timestamp` can be added to the payload to set the timestamp for all metrics. If omitted, -current time is used. - - ## Prometheus querying API usage VictoriaMetrics supports the following handlers from [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/): diff --git a/app/vmagent/datadog/request_handler.go b/app/vmagent/datadog/request_handler.go index 056d3bcb0b..b7cb1a2dc9 100644 --- a/app/vmagent/datadog/request_handler.go +++ b/app/vmagent/datadog/request_handler.go @@ -8,7 +8,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" - parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog/stream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/metrics" @@ -29,12 +29,12 @@ func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error { return err } ce := 
req.Header.Get("Content-Encoding") - return stream.Parse(req.Body, ce, func(series []parser.Series) error { + return stream.Parse(req.Body, ce, func(series []datadog.Series) error { return insertRows(at, series, extraLabels) }) } -func insertRows(at *auth.Token, series []parser.Series, extraLabels []prompbmarshal.Label) error { +func insertRows(at *auth.Token, series []datadog.Series, extraLabels []prompbmarshal.Label) error { ctx := common.GetPushCtx() defer common.PutPushCtx(ctx) @@ -63,7 +63,7 @@ func insertRows(at *auth.Token, series []parser.Series, extraLabels []prompbmars }) } for _, tag := range ss.Tags { - name, value := parser.SplitTag(tag) + name, value := datadog.SplitTag(tag) if name == "host" { name = "exported_host" } diff --git a/app/vmagent/main.go b/app/vmagent/main.go index 5be243ac53..6b042f10ab 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -320,19 +320,19 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { } w.WriteHeader(http.StatusOK) return true - case "/newrelic/api/v1": + case "/newrelic": newrelicCheckRequest.Inc() w.Header().Set("Content-Type", "application/json") w.WriteHeader(202) fmt.Fprintf(w, `{"status":"ok"}`) return true - case "/newrelic/api/v1/inventory/deltas": + case "/newrelic/inventory/deltas": newrelicInventoryRequests.Inc() w.Header().Set("Content-Type", "application/json") w.WriteHeader(202) fmt.Fprintf(w, `{"payload":{"version": 1, "state": {}, "reset": "false"}}`) return true - case "/newrelic/api/v1/infra/v2/metrics/events/bulk": + case "/newrelic/infra/v2/metrics/events/bulk": newrelicWriteRequests.Inc() if err := newrelic.InsertHandlerForHTTP(nil, r); err != nil { newrelicWriteErrors.Inc() @@ -543,19 +543,19 @@ func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path stri } w.WriteHeader(http.StatusOK) return true - case "/newrelic/api/v1": + case "newrelic": newrelicCheckRequest.Inc() w.Header().Set("Content-Type", "application/json") w.WriteHeader(202) 
fmt.Fprintf(w, `{"status":"ok"}`) return true - case "/newrelic/api/v1/inventory/deltas": + case "newrelic/inventory/deltas": newrelicInventoryRequests.Inc() w.Header().Set("Content-Type", "application/json") w.WriteHeader(202) fmt.Fprintf(w, `{"payload":{"version": 1, "state": {}, "reset": "false"}}`) return true - case "/newrelic/api/v1/infra/v2/metrics/events/bulk": + case "newrelic/infra/v2/metrics/events/bulk": newrelicWriteRequests.Inc() if err := newrelic.InsertHandlerForHTTP(at, r); err != nil { newrelicWriteErrors.Inc() @@ -638,11 +638,11 @@ var ( opentelemetryPushRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/opentelemetry/api/v1/push", protocol="opentelemetry"}`) opentelemetryPushErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/opentelemetry/api/v1/push", protocol="opentelemetry"}`) - newrelicWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/api/v1/infra/v2/metrics/events/bulk", protocol="newrelic"}`) - newrelicWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/newrelic/api/v1/infra/v2/metrics/events/bulk", protocol="newrelic"}`) + newrelicWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/infra/v2/metrics/events/bulk", protocol="newrelic"}`) + newrelicWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/newrelic/infra/v2/metrics/events/bulk", protocol="newrelic"}`) - newrelicInventoryRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/api/v1/inventory/deltas", protocol="newrelic"}`) - newrelicCheckRequest = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/api/v1", protocol="newrelic"}`) + newrelicInventoryRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/inventory/deltas", protocol="newrelic"}`) + newrelicCheckRequest = metrics.NewCounter(`vm_http_requests_total{path="/newrelic", protocol="newrelic"}`) promscrapeTargetsRequests = 
metrics.NewCounter(`vmagent_http_requests_total{path="/targets"}`) promscrapeServiceDiscoveryRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/service-discovery"}`) diff --git a/app/vmagent/newrelic/request_handler.go b/app/vmagent/newrelic/request_handler.go index 3784764055..b164fc9733 100644 --- a/app/vmagent/newrelic/request_handler.go +++ b/app/vmagent/newrelic/request_handler.go @@ -8,6 +8,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic" @@ -29,42 +30,48 @@ func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error { } ce := req.Header.Get("Content-Encoding") isGzip := ce == "gzip" - return stream.Parse(req.Body, isGzip, func(series []newrelic.Metric) error { - return insertRows(at, series, extraLabels) + return stream.Parse(req.Body, isGzip, func(rows []newrelic.Row) error { + return insertRows(at, rows, extraLabels) }) } -func insertRows(at *auth.Token, rows []newrelic.Metric, extraLabels []prompbmarshal.Label) error { +func insertRows(at *auth.Token, rows []newrelic.Row, extraLabels []prompbmarshal.Label) error { ctx := common.GetPushCtx() defer common.PutPushCtx(ctx) - rowsTotal := 0 + samplesCount := 0 tssDst := ctx.WriteRequest.Timeseries[:0] labels := ctx.Labels[:0] samples := ctx.Samples[:0] for i := range rows { r := &rows[i] - labelsLen := len(labels) - labels = append(labels, prompbmarshal.Label{ - Name: "__name__", - Value: r.Metric, - }) - for j := range r.Tags { - tag := &r.Tags[j] + tags := r.Tags + srcSamples := r.Samples + for j := range srcSamples { + s := &srcSamples[j] + labelsLen := len(labels) 
labels = append(labels, prompbmarshal.Label{ - Name: tag.Key, - Value: tag.Value, + Name: "__name__", + Value: bytesutil.ToUnsafeString(s.Name), }) + for k := range tags { + t := &tags[k] + labels = append(labels, prompbmarshal.Label{ + Name: bytesutil.ToUnsafeString(t.Key), + Value: bytesutil.ToUnsafeString(t.Value), + }) + } + samples = append(samples, prompbmarshal.Sample{ + Value: s.Value, + Timestamp: r.Timestamp, + }) + tssDst = append(tssDst, prompbmarshal.TimeSeries{ + Labels: labels[labelsLen:], + Samples: samples[len(samples)-1:], + }) + labels = append(labels, extraLabels...) } - samples = append(samples, prompbmarshal.Sample{ - Value: r.Value, - Timestamp: r.Timestamp, - }) - tssDst = append(tssDst, prompbmarshal.TimeSeries{ - Labels: labels[labelsLen:], - Samples: samples[len(samples)-1:], - }) - labels = append(labels, extraLabels...) + samplesCount += len(srcSamples) } ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels @@ -72,8 +79,8 @@ func insertRows(at *auth.Token, rows []newrelic.Metric, extraLabels []prompbmars remotewrite.Push(at, &ctx.WriteRequest) rowsInserted.Add(len(rows)) if at != nil { - rowsTenantInserted.Get(at).Add(rowsTotal) + rowsTenantInserted.Get(at).Add(samplesCount) } - rowsPerInsert.Update(float64(len(rows))) + rowsPerInsert.Update(float64(samplesCount)) return nil } diff --git a/app/vminsert/main.go b/app/vminsert/main.go index 1e993ae379..a53f8216d6 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -222,19 +222,19 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { } w.WriteHeader(http.StatusOK) return true - case "/newrelic/api/v1": + case "/newrelic": newrelicCheckRequest.Inc() w.Header().Set("Content-Type", "application/json") w.WriteHeader(202) fmt.Fprintf(w, `{"status":"ok"}`) return true - case "/newrelic/api/v1/inventory/deltas": + case "/newrelic/inventory/deltas": newrelicInventoryRequests.Inc() w.Header().Set("Content-Type", "application/json") w.WriteHeader(202) fmt.Fprintf(w, 
`{"payload":{"version": 1, "state": {}, "reset": "false"}}`) return true - case "/newrelic/api/v1/infra/v2/metrics/events/bulk": + case "/newrelic/infra/v2/metrics/events/bulk": newrelicWriteRequests.Inc() if err := newrelic.InsertHandlerForHTTP(r); err != nil { newrelicWriteErrors.Inc() @@ -382,11 +382,11 @@ var ( opentelemetryPushRequests = metrics.NewCounter(`vm_http_requests_total{path="/opentelemetry/api/v1/push", protocol="opentelemetry"}`) opentelemetryPushErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/opentelemetry/api/v1/push", protocol="opentelemetry"}`) - newrelicWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/api/v1/infra/v2/metrics/events/bulk", protocol="newrelic"}`) - newrelicWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/newrelic/api/v1/infra/v2/metrics/events/bulk", protocol="newrelic"}`) + newrelicWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/infra/v2/metrics/events/bulk", protocol="newrelic"}`) + newrelicWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/newrelic/infra/v2/metrics/events/bulk", protocol="newrelic"}`) - newrelicInventoryRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/api/v1/inventory/deltas", protocol="newrelic"}`) - newrelicCheckRequest = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/api/v1", protocol="newrelic"}`) + newrelicInventoryRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/inventory/deltas", protocol="newrelic"}`) + newrelicCheckRequest = metrics.NewCounter(`vm_http_requests_total{path="/newrelic", protocol="newrelic"}`) promscrapeTargetsRequests = metrics.NewCounter(`vm_http_requests_total{path="/targets"}`) promscrapeServiceDiscoveryRequests = metrics.NewCounter(`vm_http_requests_total{path="/service-discovery"}`) diff --git a/app/vminsert/newrelic/request_handler.go b/app/vminsert/newrelic/request_handler.go index ae8530d56a..226e97075e 100644 --- 
a/app/vminsert/newrelic/request_handler.go +++ b/app/vminsert/newrelic/request_handler.go @@ -18,7 +18,7 @@ var ( rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="newrelic"}`) ) -// InsertHandlerForHTTP processes remote write for NewRelic POST /infra/v2/metrics/events/bulk request. +// InsertHandlerForHTTP processes remote write for request to /newrelic/infra/v2/metrics/events/bulk request. func InsertHandlerForHTTP(req *http.Request) error { extraLabels, err := parserCommon.GetExtraLabels(req) if err != nil { @@ -26,42 +26,52 @@ func InsertHandlerForHTTP(req *http.Request) error { } ce := req.Header.Get("Content-Encoding") isGzip := ce == "gzip" - return stream.Parse(req.Body, isGzip, func(series []newrelic.Metric) error { - return insertRows(series, extraLabels) + return stream.Parse(req.Body, isGzip, func(rows []newrelic.Row) error { + return insertRows(rows, extraLabels) }) } -func insertRows(rows []newrelic.Metric, extraLabels []prompbmarshal.Label) error { +func insertRows(rows []newrelic.Row, extraLabels []prompbmarshal.Label) error { ctx := common.GetInsertCtx() defer common.PutInsertCtx(ctx) - ctx.Reset(len(rows)) + samplesCount := 0 + for i := range rows { + samplesCount += len(rows[i].Samples) + } + ctx.Reset(samplesCount) + hasRelabeling := relabel.HasRelabeling() for i := range rows { r := &rows[i] - ctx.Labels = ctx.Labels[:0] - ctx.AddLabel("", r.Metric) - for j := range r.Tags { - tag := &r.Tags[j] - ctx.AddLabel(tag.Key, tag.Value) - } - for j := range extraLabels { - label := &extraLabels[j] - ctx.AddLabel(label.Name, label.Value) - } - if hasRelabeling { - ctx.ApplyRelabeling() - } - if len(ctx.Labels) == 0 { - // Skip metric without labels. 
- continue - } - ctx.SortLabelsIfNeeded() - if err := ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value); err != nil { - return err + samples := r.Samples + for j := range samples { + s := &samples[j] + + ctx.Labels = ctx.Labels[:0] + ctx.AddLabelBytes(nil, s.Name) + for k := range r.Tags { + t := &r.Tags[k] + ctx.AddLabelBytes(t.Key, t.Value) + } + for k := range extraLabels { + label := &extraLabels[k] + ctx.AddLabel(label.Name, label.Value) + } + if hasRelabeling { + ctx.ApplyRelabeling() + } + if len(ctx.Labels) == 0 { + // Skip metric without labels. + continue + } + ctx.SortLabelsIfNeeded() + if err := ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, s.Value); err != nil { + return err + } } } - rowsInserted.Add(len(rows)) - rowsPerInsert.Update(float64(len(rows))) + rowsInserted.Add(samplesCount) + rowsPerInsert.Update(float64(samplesCount)) return ctx.FlushBufs() } diff --git a/docs/Cluster-VictoriaMetrics.md b/docs/Cluster-VictoriaMetrics.md index 64cbcbd17b..3e836f06db 100644 --- a/docs/Cluster-VictoriaMetrics.md +++ b/docs/Cluster-VictoriaMetrics.md @@ -356,6 +356,7 @@ Check practical examples of VictoriaMetrics API [here](https://docs.victoriametr - `opentelemetry/api/v1/push` - for ingesting data via [OpenTelemetry protocol for metrics](https://github.com/open-telemetry/opentelemetry-specification/blob/ffddc289462dfe0c2041e3ca42a7b1df805706de/specification/metrics/data-model.md). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#sending-data-via-opentelemetry). - `datadog/api/v1/series` - for ingesting data with [DataDog submit metrics API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent) for details. - `influx/write` and `influx/api/v2/write` - for ingesting data with [InfluxDB line protocol](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/). 
See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for details. + - `newrelic/infra/v2/metrics/events/bulk` - for accepting data from [NewRelic infrastructure agent](https://docs.newrelic.com/docs/infrastructure/install-infrastructure-agent). See [these docs](https://docs.victoriametrics.com/#how-to-send-data-from-newrelic-agent) for details. - `opentsdb/api/put` - for accepting [OpenTSDB HTTP /api/put requests](http://opentsdb.net/docs/build/html/api_http/put.html). This handler is disabled by default. It is exposed on a distinct TCP address set via `-opentsdbHTTPListenAddr` command-line flag. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#sending-opentsdb-data-via-http-apiput-requests) for details. - URLs for [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/): `http://:8481/select//prometheus/`, where: diff --git a/docs/README.md b/docs/README.md index 399d426ac9..a889e61e48 100644 --- a/docs/README.md +++ b/docs/README.md @@ -857,71 +857,76 @@ For example, `/api/put?extra_label=foo=bar` would add `{foo="bar"}` label to all ## How to send data from NewRelic agent VictoriaMetrics accepts data from [NewRelic infrastructure agent](https://docs.newrelic.com/docs/infrastructure/install-infrastructure-agent) -at `/api/v1/newrelic/infra/v2/metrics/events/bulk` path. -NewRelic's infrastructure agent sends so-called [Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events) -which then transformed by VictoriaMetrics to the [Prometheus exposition format](https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format). +at `/newrelic/infra/v2/metrics/events/bulk` HTTP path. 
+VictoriaMetrics receives [Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events) +from NewRelic agent at the given path, transforms them to [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) +according to [these docs](#newrelic-agent-data-mapping) before storing the raw samples to the database. -NewRelic's infrastructure agent allows configuring destinations for metrics forwarding via ENV variable `COLLECTOR_URL`. -It is also required to specify `NRIA_LICENSE_KEY`, which is available only after registration into account of the NewRelic cloud. +You need to pass `COLLECTOR_URL` and `NRIA_LICENSE_KEY` environment variables to NewRelic infrastructure agent in order to send the collected metrics to VictoriaMetrics. +The `COLLECTOR_URL` must point to `/newrelic` HTTP endpoint at VictoriaMetrics, while the `NRIA_LICENSE_KEY` must contain NewRelic license key, +which can be obtained [here](https://newrelic.com/signup). +For example, if VictoriaMetrics runs at `localhost:8428`, then the following command can be used for running NewRelic infrastructure agent: -To configure NewRelic infrastructure agent for forwarding metrics to VictoriaMetrics use the following example: ```console -COLLECTOR_URL="http://localhost:8428/newrelic/api/v1" NRIA_LICENSE_KEY="YOUR_LICENSE_KEY" ./newrelic-infra +COLLECTOR_URL="http://localhost:8428/newrelic" NRIA_LICENSE_KEY="NEWRELIC_LICENSE_KEY" ./newrelic-infra ``` -### NewRelic agent data mapping +### NewRelic agent data mapping + +VictoriaMetrics maps [NewRelic Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events) +to [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) in the following way: + +1. Every numeric field is converted into a raw sample with the corresponding name. +1. 
The `eventType` and all the other fields with `string` value type are attached to every raw sample as [metric labels](https://docs.victoriametrics.com/keyConcepts.html#labels). +1. The `timestamp` field is used as timestamp for the ingested [raw sample](https://docs.victoriametrics.com/keyConcepts.html#raw-samples). + The `timestamp` field may be specified either in seconds or in milliseconds since the [Unix Epoch](https://en.wikipedia.org/wiki/Unix_time). + If the `timestamp` field is missing, then the raw sample is stored with the current timestamp. + +For example, let's import the following NewRelic Events request to VictoriaMetrics: -As example, lets create `newrelic.json` file with the following content: ```json [ - { - "Events":[ - { - "eventType":"SystemSample", - "entityKey":"macbook-pro.local", - "cpuPercent":25.056660790748904, - "cpuUserPercent":8.687987912389374, - "cpuSystemPercent":16.36867287835953, - "cpuIOWaitPercent":0, - "cpuIdlePercent":74.94333920925109, - "cpuStealPercent":0, - "loadAverageOneMinute":5.42333984375, - "loadAverageFiveMinute":4.099609375, - "loadAverageFifteenMinute":3.58203125 - } - ] - } - ] + { + "Events":[ + { + "eventType":"SystemSample", + "entityKey":"macbook-pro.local", + "cpuPercent":25.056660790748904, + "cpuUserPercent":8.687987912389374, + "cpuSystemPercent":16.36867287835953, + "cpuIOWaitPercent":0, + "cpuIdlePercent":74.94333920925109, + "cpuStealPercent":0, + "loadAverageOneMinute":5.42333984375, + "loadAverageFiveMinute":4.099609375, + "loadAverageFifteenMinute":3.58203125 + } + ] + } +] ``` -Let's use cUrl to send `newrelic.json` to single-node VictoriaMetrics: +Save this JSON into `newrelic.json` file and then use the following command in order to import it into VictoriaMetrics: ```console -curl -X POST -H 'Content-Type: application/json' --data-binary @newrelic.json http://localhost:8428/newrelic/api/v1/infra/v2/metrics/events/bulk +curl -X POST -H 'Content-Type: application/json' --data-binary @newrelic.json 
http://localhost:8428/newrelic/infra/v2/metrics/events/bulk ``` -If data was successfully ingested, you'll get `{"status":"ok"}` response. Let's fetch ingested data from VictoriaMetrics -in vmui via query `{__name__!=""}`: +Let's fetch the ingested data via [data export API](#how-to-export-data-in-json-line-format): + ```console -system_sample_cpu_io_wait_percent{entity_key="macbook-pro.local"} 0 -system_sample_cpu_idle_percent{entity_key="macbook-pro.local"} 74.9433392092 -system_sample_cpu_percent{entity_key="macbook-pro.local"} 25.056660790748 -system_sample_cpu_steal_percent{entity_key="macbook-pro.local"} 0 -system_sample_cpu_system_percent{entity_key="macbook-pro.local"} 16.368672878359 -system_sample_cpu_user_percent{entity_key="macbook-pro.local"} 8.687987912389 -system_sample_load_average_fifteen_minute{entity_key="macbook-pro.local"} 3.58203125 -system_sample_load_average_five_minute{entity_key="macbook-pro.local"} 4.099609375 -system_sample_load_average_one_minute{entity_key="macbook-pro.local"} 5.42333984375 +curl http://localhost:8428/api/v1/export -d 'match={eventType="SystemSample"}' +{"metric":{"__name__":"cpuStealPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[0],"timestamps":[1697407970000]} +{"metric":{"__name__":"loadAverageFiveMinute","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[4.099609375],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuIOWaitPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[0],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuSystemPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[16.368672878359],"timestamps":[1697407970000]} +{"metric":{"__name__":"loadAverageOneMinute","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[5.42333984375],"timestamps":[1697407970000]} 
+{"metric":{"__name__":"cpuUserPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[8.687987912389],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuIdlePercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[74.9433392092],"timestamps":[1697407970000]} +{"metric":{"__name__":"loadAverageFifteenMinute","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[3.58203125],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[25.056660790748],"timestamps":[1697407970000]} ``` -The fields in `newrelic.json` are transformed in the following way: -1. `eventType` filed is used as prefix for all metrics in the object; -2. `entityKey` or any other field with `string` value type is used as label attached to all metrics in the object; -3. the rest fields with numeric values will be used as metrics; -4. the additional field `timestamp` can be added to the payload to set the timestamp for all metrics. If omitted, -current time is used. - - ## Prometheus querying API usage VictoriaMetrics supports the following handlers from [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/): diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index d036f0ebc4..d725de0a4a 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -865,71 +865,76 @@ For example, `/api/put?extra_label=foo=bar` would add `{foo="bar"}` label to all ## How to send data from NewRelic agent VictoriaMetrics accepts data from [NewRelic infrastructure agent](https://docs.newrelic.com/docs/infrastructure/install-infrastructure-agent) -at `/api/v1/newrelic/infra/v2/metrics/events/bulk` path. 
-NewRelic's infrastructure agent sends so-called [Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events)
-which then transformed by VictoriaMetrics to the [Prometheus exposition format](https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format).
+at `/newrelic/infra/v2/metrics/events/bulk` HTTP path.
+VictoriaMetrics receives [Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events)
+from NewRelic agent at the given path, transforms them to [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples)
+according to [these docs](#newrelic-agent-data-mapping) before storing the raw samples to the database.
-NewRelic's infrastructure agent allows configuring destinations for metrics forwarding via ENV variable `COLLECTOR_URL`.
-It is also required to specify `NRIA_LICENSE_KEY`, which is available only after registration into account of the NewRelic cloud.
+You need to pass the `COLLECTOR_URL` and `NRIA_LICENSE_KEY` environment variables to the NewRelic infrastructure agent in order to send the collected metrics to VictoriaMetrics.
+The `COLLECTOR_URL` must point to the `/newrelic` HTTP endpoint at VictoriaMetrics, while the `NRIA_LICENSE_KEY` must contain the NewRelic license key,
+which can be obtained [here](https://newrelic.com/signup).
+For example, if VictoriaMetrics runs at `localhost:8428`, then the following command can be used for running NewRelic infrastructure agent: -To configure NewRelic infrastructure agent for forwarding metrics to VictoriaMetrics use the following example: ```console -COLLECTOR_URL="http://localhost:8428/newrelic/api/v1" NRIA_LICENSE_KEY="YOUR_LICENSE_KEY" ./newrelic-infra +COLLECTOR_URL="http://localhost:8428/newrelic" NRIA_LICENSE_KEY="NEWRELIC_LICENSE_KEY" ./newrelic-infra ``` -### NewRelic agent data mapping +### NewRelic agent data mapping + +VictoriaMetrics maps [NewRelic Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events) +to [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) in the following way: + +1. Every numeric field is converted into a raw sample with the corresponding name. +1. The `eventType` and all the other fields with `string` value type are attached to every raw sample as [metric labels](https://docs.victoriametrics.com/keyConcepts.html#labels). +1. The `timestamp` field is used as timestamp for the ingested [raw sample](https://docs.victoriametrics.com/keyConcepts.html#raw-samples). + The `timestamp` field may be specified either in seconds or in milliseconds since the [Unix Epoch](https://en.wikipedia.org/wiki/Unix_time). + If the `timestamp` field is missing, then the raw sample is stored with the current timestamp. 
+ +For example, let's import the following NewRelic Events request to VictoriaMetrics: -As example, lets create `newrelic.json` file with the following content: ```json [ - { - "Events":[ - { - "eventType":"SystemSample", - "entityKey":"macbook-pro.local", - "cpuPercent":25.056660790748904, - "cpuUserPercent":8.687987912389374, - "cpuSystemPercent":16.36867287835953, - "cpuIOWaitPercent":0, - "cpuIdlePercent":74.94333920925109, - "cpuStealPercent":0, - "loadAverageOneMinute":5.42333984375, - "loadAverageFiveMinute":4.099609375, - "loadAverageFifteenMinute":3.58203125 - } - ] - } - ] + { + "Events":[ + { + "eventType":"SystemSample", + "entityKey":"macbook-pro.local", + "cpuPercent":25.056660790748904, + "cpuUserPercent":8.687987912389374, + "cpuSystemPercent":16.36867287835953, + "cpuIOWaitPercent":0, + "cpuIdlePercent":74.94333920925109, + "cpuStealPercent":0, + "loadAverageOneMinute":5.42333984375, + "loadAverageFiveMinute":4.099609375, + "loadAverageFifteenMinute":3.58203125 + } + ] + } +] ``` -Let's use cUrl to send `newrelic.json` to single-node VictoriaMetrics: +Save this JSON into `newrelic.json` file and then use the following command in order to import it into VictoriaMetrics: ```console -curl -X POST -H 'Content-Type: application/json' --data-binary @newrelic.json http://localhost:8428/newrelic/api/v1/infra/v2/metrics/events/bulk +curl -X POST -H 'Content-Type: application/json' --data-binary @newrelic.json http://localhost:8428/newrelic/infra/v2/metrics/events/bulk ``` -If data was successfully ingested, you'll get `{"status":"ok"}` response. 
Let's fetch ingested data from VictoriaMetrics -in vmui via query `{__name__!=""}`: +Let's fetch the ingested data via [data export API](#how-to-export-data-in-json-line-format): + ```console -system_sample_cpu_io_wait_percent{entity_key="macbook-pro.local"} 0 -system_sample_cpu_idle_percent{entity_key="macbook-pro.local"} 74.9433392092 -system_sample_cpu_percent{entity_key="macbook-pro.local"} 25.056660790748 -system_sample_cpu_steal_percent{entity_key="macbook-pro.local"} 0 -system_sample_cpu_system_percent{entity_key="macbook-pro.local"} 16.368672878359 -system_sample_cpu_user_percent{entity_key="macbook-pro.local"} 8.687987912389 -system_sample_load_average_fifteen_minute{entity_key="macbook-pro.local"} 3.58203125 -system_sample_load_average_five_minute{entity_key="macbook-pro.local"} 4.099609375 -system_sample_load_average_one_minute{entity_key="macbook-pro.local"} 5.42333984375 +curl http://localhost:8428/api/v1/export -d 'match={eventType="SystemSample"}' +{"metric":{"__name__":"cpuStealPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[0],"timestamps":[1697407970000]} +{"metric":{"__name__":"loadAverageFiveMinute","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[4.099609375],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuIOWaitPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[0],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuSystemPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[16.368672878359],"timestamps":[1697407970000]} +{"metric":{"__name__":"loadAverageOneMinute","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[5.42333984375],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuUserPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[8.687987912389],"timestamps":[1697407970000]} 
+{"metric":{"__name__":"cpuIdlePercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[74.9433392092],"timestamps":[1697407970000]} +{"metric":{"__name__":"loadAverageFifteenMinute","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[3.58203125],"timestamps":[1697407970000]} +{"metric":{"__name__":"cpuPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[25.056660790748],"timestamps":[1697407970000]} ``` -The fields in `newrelic.json` are transformed in the following way: -1. `eventType` filed is used as prefix for all metrics in the object; -2. `entityKey` or any other field with `string` value type is used as label attached to all metrics in the object; -3. the rest fields with numeric values will be used as metrics; -4. the additional field `timestamp` can be added to the payload to set the timestamp for all metrics. If omitted, -current time is used. - - ## Prometheus querying API usage VictoriaMetrics supports the following handlers from [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/): diff --git a/lib/protoparser/datadog/stream/streamparser.go b/lib/protoparser/datadog/stream/streamparser.go index a657c56189..5a6c7e04f7 100644 --- a/lib/protoparser/datadog/stream/streamparser.go +++ b/lib/protoparser/datadog/stream/streamparser.go @@ -99,11 +99,11 @@ func (ctx *pushCtx) Read() error { reqLen, err := ctx.reqBuf.ReadFrom(lr) if err != nil { readErrors.Inc() - return fmt.Errorf("cannot read compressed request in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err) + return fmt.Errorf("cannot read request in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err) } if reqLen > int64(maxInsertRequestSize.N) { readErrors.Inc() - return fmt.Errorf("too big packed request; mustn't exceed `-maxInsertRequestSize=%d` bytes", maxInsertRequestSize.N) + return fmt.Errorf("too big request; mustn't exceed -datadog.maxInsertRequestSize=%d bytes", 
maxInsertRequestSize.N) } return nil } diff --git a/lib/protoparser/newrelic/parser.go b/lib/protoparser/newrelic/parser.go index e6901949fa..41706880ca 100644 --- a/lib/protoparser/newrelic/parser.go +++ b/lib/protoparser/newrelic/parser.go @@ -2,231 +2,191 @@ package newrelic import ( "fmt" - "sync" - "unicode" "github.com/valyala/fastjson" "github.com/valyala/fastjson/fastfloat" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" ) -var baseEventKeys = map[string]struct{}{ - "timestamp": {}, "eventType": {}, +// Rows contains rows parsed from NewRelic Event request +// +// See https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events +type Rows struct { + Rows []Row } -type tagsBuffer struct { - tags []Tag +// Reset resets r, so it can be re-used +func (r *Rows) Reset() { + rows := r.Rows + for i := range rows { + rows[i].reset() + } + r.Rows = rows[:0] } -var tagsPool = sync.Pool{ - New: func() interface{} { - return &tagsBuffer{tags: make([]Tag, 0)} - }, -} +var jsonParserPool fastjson.ParserPool -// NewRelic agent sends next struct to the collector -// MetricPost entity item for the HTTP post to be sent to the ingest service. -// type MetricPost struct { -// ExternalKeys []string `json:"ExternalKeys,omitempty"` -// EntityID uint64 `json:"EntityID,omitempty"` -// IsAgent bool `json:"IsAgent"` -// Events []json.RawMessage `json:"Events"` -// // Entity ID of the reporting agent, which will = EntityID when IsAgent == true. -// // The field is required in the backend for host metadata matching of the remote entities -// ReportingAgentID uint64 `json:"ReportingAgentID,omitempty"` -// } -// We are using only Events field because it contains all needed metrics +// Unmarshal parses NewRelic Event request from b to r. 
+// +// b can be re-used after returning from r. +func (r *Rows) Unmarshal(b []byte) error { + p := jsonParserPool.Get() + defer jsonParserPool.Put(p) -// Events represents Metrics collected from NewRelic MetricPost request -// https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events -type Events struct { - Metrics []Metric -} - -// Unmarshal takes fastjson.Value and collects Metrics -func (e *Events) Unmarshal(v []*fastjson.Value) error { - for _, value := range v { - events := value.Get("Events") - if events == nil { - return fmt.Errorf("got empty Events array from request") - } - eventsArr, err := events.Array() + r.Reset() + v, err := p.ParseBytes(b) + if err != nil { + return err + } + metricPosts, err := v.Array() + if err != nil { + return fmt.Errorf("cannot find the top-level array of MetricPost objects: %w", err) + } + for _, mp := range metricPosts { + o, err := mp.Object() if err != nil { - return fmt.Errorf("error collect events: %s", err) + return fmt.Errorf("cannot find MetricPost object: %w", err) } - - for _, event := range eventsArr { - metricData, err := event.Object() + rows := r.Rows + o.Visit(func(k []byte, v *fastjson.Value) { if err != nil { - return fmt.Errorf("error get metric data: %s", err) + return } - var m Metric - metrics, err := m.unmarshal(metricData) - if err != nil { - return fmt.Errorf("error collect metrics from Newrelic json: %s", err) + switch string(k) { + case "Events": + events, errLocal := v.Array() + if errLocal != nil { + err = fmt.Errorf("cannot find Events array in MetricPost object: %w", errLocal) + return + } + for _, e := range events { + eventObject, errLocal := e.Object() + if errLocal != nil { + err = fmt.Errorf("cannot find EventObject: %w", errLocal) + return + } + if cap(rows) > len(rows) { + rows = rows[:len(rows)+1] + } else { + rows = append(rows, Row{}) + } + r := &rows[len(rows)-1] + if errLocal := 
r.unmarshal(eventObject); errLocal != nil { + err = fmt.Errorf("cannot unmarshal EventObject: %w", errLocal) + return + } + } } - e.Metrics = append(e.Metrics, metrics...) + }) + r.Rows = rows + if err != nil { + return fmt.Errorf("cannot parse MetricPost object: %w", err) } } - return nil } -// Metric represents VictoriaMetrics metrics -type Metric struct { - Timestamp int64 +// Row represents parsed row +type Row struct { Tags []Tag - Metric string - Value float64 + Samples []Sample + Timestamp int64 } -func (m *Metric) unmarshal(o *fastjson.Object) ([]Metric, error) { - m.reset() +// Tag represents a key=value tag +type Tag struct { + Key []byte + Value []byte +} - tgsBuffer := tagsPool.Get().(*tagsBuffer) - defer func() { - tgsBuffer.tags = tgsBuffer.tags[:0] - tagsPool.Put(tgsBuffer) - }() +// Sample represents parsed sample +type Sample struct { + Name []byte + Value float64 +} - metrics := make([]Metric, 0, o.Len()) - rawTs := o.Get("timestamp") - if rawTs != nil { - ts, err := getFloat64(rawTs) +func (r *Row) reset() { + tags := r.Tags + for i := range tags { + tags[i].reset() + } + r.Tags = tags[:0] + + samples := r.Samples + for i := range samples { + samples[i].reset() + } + r.Samples = samples[:0] + + r.Timestamp = 0 +} + +func (t *Tag) reset() { + t.Key = t.Key[:0] + t.Value = t.Value[:0] +} + +func (s *Sample) reset() { + s.Name = s.Name[:0] + s.Value = 0 +} + +func (r *Row) unmarshal(o *fastjson.Object) (err error) { + r.reset() + tags := r.Tags[:0] + samples := r.Samples[:0] + o.Visit(func(k []byte, v *fastjson.Value) { if err != nil { - return nil, fmt.Errorf("invalid `timestamp` in %s: %w", o, err) - } - m.Timestamp = int64(ts * 1e3) - } else { - // Allow missing timestamp. It should be automatically populated - // with the current time by the caller. 
- m.Timestamp = 0 - } - - eventType := o.Get("eventType") - if eventType == nil { - return nil, fmt.Errorf("error get eventType from Events object: %s", o) - } - prefix := bytesutil.ToUnsafeString(eventType.GetStringBytes()) - prefix = camelToSnakeCase(prefix) - - o.Visit(func(key []byte, v *fastjson.Value) { - - k := bytesutil.ToUnsafeString(key) - // skip base event keys which should have been parsed before this - if _, ok := baseEventKeys[k]; ok { return } - + if len(k) == 0 { + return + } switch v.Type() { case fastjson.TypeString: - // this is label-value pair - value := v.Get() - if value == nil { - logger.Errorf("failed to get label value from NewRelic json: %s", v) + // Register new tag + valueBytes := v.GetStringBytes() + if len(valueBytes) == 0 { return } - name := camelToSnakeCase(k) - val := bytesutil.ToUnsafeString(value.GetStringBytes()) - tgsBuffer.tags = append(tgsBuffer.tags, Tag{Key: name, Value: val}) + if cap(tags) > len(tags) { + tags = tags[:len(tags)+1] + } else { + tags = append(tags, Tag{}) + } + t := &tags[len(tags)-1] + t.Key = append(t.Key[:0], k...) + t.Value = append(t.Value[:0], valueBytes...) case fastjson.TypeNumber: - // this is metric name with value - metricName := camelToSnakeCase(k) - if prefix != "" { - metricName = fmt.Sprintf("%s_%s", prefix, metricName) - } - f, err := getFloat64(v) - if err != nil { - logger.Errorf("failed to get value for NewRelic metric %q: %w", k, err) + if string(k) == "timestamp" { + // Parse timestamp + ts, errLocal := getFloat64(v) + if errLocal != nil { + err = fmt.Errorf("cannot parse `timestamp` field: %w", errLocal) + return + } + if ts < (1 << 32) { + // The timestamp is in seconds. Convert it to milliseconds. 
+ ts *= 1e3 + } + r.Timestamp = int64(ts) return } - metrics = append(metrics, Metric{Metric: metricName, Value: f}) - default: - // unknown type - logger.Errorf("got unsupported NewRelic json %s field type: %s", v, v.Type()) - return + // Register new sample + if cap(samples) > len(samples) { + samples = samples[:len(samples)+1] + } else { + samples = append(samples, Sample{}) + } + s := &samples[len(samples)-1] + s.Name = append(s.Name[:0], k...) + s.Value = v.GetFloat64() } }) - - for i := range metrics { - metrics[i].Timestamp = m.Timestamp - metrics[i].Tags = tgsBuffer.tags - } - - return metrics, nil -} - -func (m *Metric) reset() { - m.Timestamp = 0 - m.Tags = nil - m.Metric = "" - m.Value = 0 -} - -// Tag is an NewRelic tag. -type Tag struct { - Key string - Value string -} - -func camelToSnakeCase(str string) string { - str = promrelabel.SanitizeLabelName(str) - length := len(str) - snakeCase := make([]byte, 0, length*2) - tokens := make([]byte, 0, length) - var allTokensUpper bool - - flush := func(tokens []byte) { - for _, c := range tokens { - snakeCase = append(snakeCase, byte(unicode.ToLower(rune(c)))) - } - } - - for i := 0; i < length; i++ { - char := str[i] - if unicode.IsUpper(rune(char)) { - switch { - case len(tokens) == 0: - allTokensUpper = true - tokens = append(tokens, char) - case allTokensUpper: - tokens = append(tokens, char) - default: - flush(tokens) - snakeCase = append(snakeCase, '_') - tokens = tokens[:0] - tokens = append(tokens, char) - allTokensUpper = true - } - continue - } - - switch { - case len(tokens) == 1: - tokens = append(tokens, char) - allTokensUpper = false - case allTokensUpper: - tail := tokens[:len(tokens)-1] - last := tokens[len(tokens)-1:] - flush(tail) - snakeCase = append(snakeCase, '_') - tokens = tokens[:0] - tokens = append(tokens, last...) 
- tokens = append(tokens, char) - allTokensUpper = false - default: - tokens = append(tokens, char) - } - } - - if len(tokens) > 0 { - flush(tokens) - } - s := bytesutil.ToUnsafeString(snakeCase) - return s + r.Tags = tags + r.Samples = samples + return err } func getFloat64(v *fastjson.Value) (float64, error) { diff --git a/lib/protoparser/newrelic/parser_test.go b/lib/protoparser/newrelic/parser_test.go index 2daded2dd8..f3f3e499b4 100644 --- a/lib/protoparser/newrelic/parser_test.go +++ b/lib/protoparser/newrelic/parser_test.go @@ -1,49 +1,103 @@ package newrelic import ( + "fmt" "reflect" "strings" "testing" - - "github.com/valyala/fastjson" ) -func TestEvents_Unmarshal(t *testing.T) { - tests := []struct { - name string - metrics []Metric - json string - wantErr bool - }{ +func TestRowsUnmarshalFailure(t *testing.T) { + f := func(data string) { + t.Helper() + + var r Rows + if err := r.Unmarshal([]byte(data)); err == nil { + t.Fatalf("expecting non-nil error") + } + } + + // Empty JSON + f("") + + // Invalid JSON + f("123") + f("[foo]") + f(`{"foo":123}`) +} + +func TestRowsUnmarshalSuccess(t *testing.T) { + f := func(data string, expectedRows []Row) { + t.Helper() + + var r Rows + if err := r.Unmarshal([]byte(data)); err != nil { + t.Fatalf("unexpected error: %s", err) + } + if !reflect.DeepEqual(r.Rows, expectedRows) { + t.Fatalf("unexpected rows parsed\ngot\n%s\nwant\n%s", rowsToString(r.Rows), rowsToString(expectedRows)) + } + } + + // empty array + f(`[]`, nil) + + // zero events + f(`[ + { + "EntityID":28257883748326179, + "IsAgent":true, + "Events":[], + "ReportingAgentID":28257883748326179 + }]`, nil) + + // A single event + f(`[{ + "EntityID":28257883748326179, + "IsAgent":true, + "Events":[ + { + "eventType":"SystemSample", + "timestamp":1690286061, + "entityKey":"macbook-pro.local", + "dc": "1", + "diskWritesPerSecond":-34.21, + "uptime":762376 + } + ], + "ReportingAgentID":28257883748326179 + }]`, []Row{ { - name: "empty json", - metrics: 
[]Metric{}, - json: "", - wantErr: true, - }, - { - name: "json with correct data", - metrics: []Metric{ + Tags: []Tag{ { - Timestamp: 1690286061000, - Tags: []Tag{ - {Key: "entity_key", Value: "macbook-pro.local"}, - {Key: "dc", Value: "1"}, - }, - Metric: "system_sample_disk_writes_per_second", - Value: 0, + Key: []byte("eventType"), + Value: []byte("SystemSample"), }, { - Timestamp: 1690286061000, - Tags: []Tag{ - {Key: "entity_key", Value: "macbook-pro.local"}, - {Key: "dc", Value: "1"}, - }, - Metric: "system_sample_uptime", - Value: 762376, + Key: []byte("entityKey"), + Value: []byte("macbook-pro.local"), + }, + { + Key: []byte("dc"), + Value: []byte("1"), }, }, - json: `[ + Samples: []Sample{ + { + Name: []byte("diskWritesPerSecond"), + Value: -34.21, + }, + { + Name: []byte("uptime"), + Value: 762376, + }, + }, + Timestamp: 1690286061000, + }, + }) + + // Multiple events + f(`[ { "EntityID":28257883748326179, "IsAgent":true, @@ -52,123 +106,124 @@ func TestEvents_Unmarshal(t *testing.T) { "eventType":"SystemSample", "timestamp":1690286061, "entityKey":"macbook-pro.local", - "dc": "1", - "diskWritesPerSecond":0, + "dc": "1", + "diskWritesPerSecond":-34.21, "uptime":762376 } ], "ReportingAgentID":28257883748326179 - } - ]`, - wantErr: false, - }, - { - name: "empty array in json", - metrics: []Metric{}, - json: `[]`, - wantErr: false, - }, - { - name: "empty events in json", - metrics: []Metric{}, - json: `[ + }, { - "EntityID":28257883748326179, + "EntityID":282579, "IsAgent":true, - "Events":[], - "ReportingAgentID":28257883748326179 + "Events":[ + { + "eventType":"SystemSample", + "timestamp":1690286061, + "entityKey":"macbook-pro.local", + "diskWritesPerSecond":234.34, + "timestamp":1690286061.433, + "uptime":762376 + }, + { + "eventType":"ProcessSample", + "timestamp":1690286061987, + "uptime":1236 + } + ], + "ReportingAgentID":2879 } - ]`, - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - e := 
&Events{Metrics: []Metric{}} - - value, err := fastjson.Parse(tt.json) - if (err != nil) != tt.wantErr { - t.Errorf("cannot parse json error: %s", err) - } - - if value != nil { - v, err := value.Array() - if err != nil { - t.Errorf("cannot get array from json") - } - if err := e.Unmarshal(v); (err != nil) != tt.wantErr { - t.Errorf("Unmarshal() error = %v, wantErr %v", err, tt.wantErr) - } - if !reflect.DeepEqual(e.Metrics, tt.metrics) { - t.Errorf("got metrics => %v; expected = %v", e.Metrics, tt.metrics) - } - } - }) - } -} - -func Test_camelToSnakeCase(t *testing.T) { - tests := []struct { - name string - str string - want string - }{ + ]`, []Row{ { - name: "empty string", - str: "", - want: "", + Tags: []Tag{ + { + Key: []byte("eventType"), + Value: []byte("SystemSample"), + }, + { + Key: []byte("entityKey"), + Value: []byte("macbook-pro.local"), + }, + { + Key: []byte("dc"), + Value: []byte("1"), + }, + }, + Samples: []Sample{ + { + Name: []byte("diskWritesPerSecond"), + Value: -34.21, + }, + { + Name: []byte("uptime"), + Value: 762376, + }, + }, + Timestamp: 1690286061000, }, { - name: "lowercase all chars", - str: "somenewstring", - want: "somenewstring", + Tags: []Tag{ + { + Key: []byte("eventType"), + Value: []byte("SystemSample"), + }, + { + Key: []byte("entityKey"), + Value: []byte("macbook-pro.local"), + }, + }, + Samples: []Sample{ + { + Name: []byte("diskWritesPerSecond"), + Value: 234.34, + }, + { + Name: []byte("uptime"), + Value: 762376, + }, + }, + Timestamp: 1690286061433, }, { - name: "first letter uppercase", - str: "Teststring", - want: "teststring", + Tags: []Tag{ + { + Key: []byte("eventType"), + Value: []byte("ProcessSample"), + }, + }, + Samples: []Sample{ + { + Name: []byte("uptime"), + Value: 1236, + }, + }, + Timestamp: 1690286061987, }, - { - name: "two uppercase letters", - str: "TestString", - want: "test_string", - }, - { - name: "first and last uppercase letters", - str: "TeststrinG", - want: "teststrin_g", - }, - { - name: "three 
letters uppercase", - str: "TestStrinG", - want: "test_strin_g", - }, - { - name: "has many upper case letters", - str: "ProgressIOTime", - want: "progress_io_time", - }, - { - name: "last all uppercase letters", - str: "ProgressTSDB", - want: "progress_tsdb", - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := camelToSnakeCase(tt.str); got != tt.want { - t.Errorf("camelToSnakeCase() = %v, want %v", got, tt.want) - } - }) - } -} - -func BenchmarkCameToSnake(b *testing.B) { - b.ReportAllocs() - str := strings.Repeat("ProgressIOTime", 20) - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - camelToSnakeCase(str) - } }) + +} + +func rowsToString(rows []Row) string { + var a []string + for _, row := range rows { + s := row.String() + a = append(a, s) + } + return strings.Join(a, "\n") +} + +func (r *Row) String() string { + var a []string + for _, t := range r.Tags { + s := fmt.Sprintf("%s=%q", t.Key, t.Value) + a = append(a, s) + } + tagsString := "{" + strings.Join(a, ",") + "}" + a = a[:0] + for _, sample := range r.Samples { + s := fmt.Sprintf("[%s %f]", sample.Name, sample.Value) + a = append(a, s) + } + samplesString := strings.Join(a, ",") + return fmt.Sprintf("tags=%s, samples=%s, timestamp=%d", tagsString, samplesString, r.Timestamp) } diff --git a/lib/protoparser/newrelic/parser_timing_test.go b/lib/protoparser/newrelic/parser_timing_test.go index 94e168a66d..7a56412715 100644 --- a/lib/protoparser/newrelic/parser_timing_test.go +++ b/lib/protoparser/newrelic/parser_timing_test.go @@ -1,13 +1,12 @@ package newrelic import ( + "fmt" "testing" - - "github.com/valyala/fastjson" ) -func BenchmarkRequestUnmarshal(b *testing.B) { - reqBody := `[ +func BenchmarkRowsUnmarshal(b *testing.B) { + reqBody := []byte(`[ { "EntityID":28257883748326179, "IsAgent":true, @@ -52,25 +51,17 @@ func BenchmarkRequestUnmarshal(b *testing.B) { ], "ReportingAgentID":28257883748326179 } - ]` + ]`) b.SetBytes(int64(len(reqBody))) 
b.ReportAllocs() b.RunParallel(func(pb *testing.PB) { - value, err := fastjson.Parse(reqBody) - if err != nil { - b.Errorf("cannot parse json error: %s", err) - } - v, err := value.Array() - if err != nil { - b.Errorf("cannot get array from json") - } + var r Rows for pb.Next() { - e := &Events{Metrics: []Metric{}} - if err := e.Unmarshal(v); err != nil { - b.Errorf("Unmarshal() error = %v", err) + if err := r.Unmarshal(reqBody); err != nil { + panic(fmt.Errorf("unmarshal error: %s", err)) } - if len(e.Metrics) == 0 { - b.Errorf("metrics should have at least one element") + if len(r.Rows) != 1 { + panic(fmt.Errorf("unexpected number of items unmarshaled; got %d; want %d", len(r.Rows), 1)) } } }) diff --git a/lib/protoparser/newrelic/stream/push_context.go b/lib/protoparser/newrelic/stream/push_context.go deleted file mode 100644 index 4dbb7707cf..0000000000 --- a/lib/protoparser/newrelic/stream/push_context.go +++ /dev/null @@ -1,80 +0,0 @@ -package stream - -import ( - "bufio" - "fmt" - "io" - "sync" - - "github.com/VictoriaMetrics/metrics" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" -) - -var ( - maxInsertRequestSize = flagutil.NewBytes("newrelic.maxInsertRequestSize", 64*1024*1024, "The maximum size in bytes of a single NewRelic POST request to /infra/v2/metrics/events/bulk") -) - -var ( - readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="newrelic"}`) - readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="newrelic"}`) - unmarshalErrors = metrics.NewCounter(`vm_protoparser_unmarshal_errors_total{type="newrelic"}`) -) - -var pushCtxPool sync.Pool -var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) - -type pushCtx struct { - br *bufio.Reader - reqBuf bytesutil.ByteBuffer -} - -func (ctx *pushCtx) Read() error { - 
readCalls.Inc() - lr := io.LimitReader(ctx.br, maxInsertRequestSize.N+1) - startTime := fasttime.UnixTimestamp() - reqLen, err := ctx.reqBuf.ReadFrom(lr) - if err != nil { - readErrors.Inc() - return fmt.Errorf("cannot read compressed request in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err) - } - if reqLen > maxInsertRequestSize.N { - readErrors.Inc() - return fmt.Errorf("too big packed request; mustn't exceed `-maxInsertRequestSize=%d` bytes", maxInsertRequestSize.N) - } - return nil -} - -func (ctx *pushCtx) reset() { - ctx.br.Reset(nil) - ctx.reqBuf.Reset() -} - -func getPushCtx(r io.Reader) *pushCtx { - select { - case ctx := <-pushCtxPoolCh: - ctx.br.Reset(r) - return ctx - default: - if v := pushCtxPool.Get(); v != nil { - ctx := v.(*pushCtx) - ctx.br.Reset(r) - return ctx - } - return &pushCtx{ - br: bufio.NewReaderSize(r, 64*1024), - } - } -} - -func putPushCtx(ctx *pushCtx) { - ctx.reset() - select { - case pushCtxPoolCh <- ctx: - default: - pushCtxPool.Put(ctx) - } -} diff --git a/lib/protoparser/newrelic/stream/streamparser.go b/lib/protoparser/newrelic/stream/streamparser.go index 8802124443..ee28878295 100644 --- a/lib/protoparser/newrelic/stream/streamparser.go +++ b/lib/protoparser/newrelic/stream/streamparser.go @@ -1,23 +1,31 @@ package stream import ( + "bufio" "fmt" "io" + "sync" - "github.com/valyala/fastjson" + "github.com/VictoriaMetrics/metrics" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" ) -var parserPool fastjson.ParserPool +var ( + maxInsertRequestSize = flagutil.NewBytes("newrelic.maxInsertRequestSize", 64*1024*1024, "The maximum 
size in bytes of a single NewRelic request "+ + "to /newrelic/infra/v2/metrics/events/bulk") +) -// Parse parses NewRelic POST request for newrelic/infra/v2/metrics/events/bulk from reader and calls callback for the parsed request. +// Parse parses NewRelic POST request for /newrelic/infra/v2/metrics/events/bulk from r and calls callback for the parsed request. // -// callback shouldn't hold series after returning. -func Parse(r io.Reader, isGzip bool, callback func(series []newrelic.Metric) error) error { +// callback shouldn't hold rows after returning. +func Parse(r io.Reader, isGzip bool, callback func(rows []newrelic.Row) error) error { wcr := writeconcurrencylimiter.GetReader(r) defer writeconcurrencylimiter.PutReader(wcr) r = wcr @@ -25,7 +33,7 @@ func Parse(r io.Reader, isGzip bool, callback func(series []newrelic.Metric) err if isGzip { zr, err := common.GetGzipReader(r) if err != nil { - return fmt.Errorf("cannot read gzipped Newrelic agent data: %w", err) + return fmt.Errorf("cannot read gzipped NewRelic agent data: %w", err) } defer common.PutGzipReader(zr) r = zr @@ -34,40 +42,104 @@ func Parse(r io.Reader, isGzip bool, callback func(series []newrelic.Metric) err ctx := getPushCtx(r) defer putPushCtx(ctx) if err := ctx.Read(); err != nil { - return err + return fmt.Errorf("cannot read NewRelic request: %w", err) } - p := parserPool.Get() - defer parserPool.Put(p) + rows := getRows() + defer putRows(rows) - v, err := p.ParseBytes(ctx.reqBuf.B) - if err != nil { - return fmt.Errorf("cannot parse NewRelic POST request with size %d bytes: %w", len(ctx.reqBuf.B), err) - } - - metricsPost, err := v.Array() - if err != nil { - return fmt.Errorf("cannot fetch data from Newrelic POST request: %w", err) - } - - var events newrelic.Events - - if err := events.Unmarshal(metricsPost); err != nil { + if err := rows.Unmarshal(ctx.reqBuf.B); err != nil { unmarshalErrors.Inc() - return fmt.Errorf("cannot unmarshal NewRelic POST request: %w", err) + return 
fmt.Errorf("cannot unmarshal NewRelic request: %w", err) } // Fill in missing timestamps currentTimestamp := int64(fasttime.UnixTimestamp()) - for i := range events.Metrics { - m := &events.Metrics[i] - if m.Timestamp == 0 { - m.Timestamp = currentTimestamp * 1e3 + for i := range rows.Rows { + r := &rows.Rows[i] + if r.Timestamp == 0 { + r.Timestamp = currentTimestamp * 1e3 } } - if err := callback(events.Metrics); err != nil { + if err := callback(rows.Rows); err != nil { return fmt.Errorf("error when processing imported data: %w", err) } return nil } + +func getRows() *newrelic.Rows { + v := rowsPool.Get() + if v == nil { + return &newrelic.Rows{} + } + return v.(*newrelic.Rows) +} + +func putRows(rows *newrelic.Rows) { + rows.Reset() + rowsPool.Put(rows) +} + +var rowsPool sync.Pool + +var ( + readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="newrelic"}`) + readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="newrelic"}`) + unmarshalErrors = metrics.NewCounter(`vm_protoparser_unmarshal_errors_total{type="newrelic"}`) +) + +var pushCtxPool sync.Pool +var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) + +type pushCtx struct { + br *bufio.Reader + reqBuf bytesutil.ByteBuffer +} + +func (ctx *pushCtx) Read() error { + readCalls.Inc() + lr := io.LimitReader(ctx.br, maxInsertRequestSize.N+1) + startTime := fasttime.UnixTimestamp() + reqLen, err := ctx.reqBuf.ReadFrom(lr) + if err != nil { + readErrors.Inc() + return fmt.Errorf("cannot read request in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err) + } + if reqLen > maxInsertRequestSize.N { + readErrors.Inc() + return fmt.Errorf("too big request; mustn't exceed -newrelic.maxInsertRequestSize=%d bytes", maxInsertRequestSize.N) + } + return nil +} + +func (ctx *pushCtx) reset() { + ctx.br.Reset(nil) + ctx.reqBuf.Reset() +} + +func getPushCtx(r io.Reader) *pushCtx { + select { + case ctx := <-pushCtxPoolCh: + ctx.br.Reset(r) + return ctx + default: + if v := 
pushCtxPool.Get(); v != nil { + ctx := v.(*pushCtx) + ctx.br.Reset(r) + return ctx + } + return &pushCtx{ + br: bufio.NewReaderSize(r, 64*1024), + } + } +} + +func putPushCtx(ctx *pushCtx) { + ctx.reset() + select { + case pushCtxPoolCh <- ctx: + default: + pushCtxPool.Put(ctx) + } +} diff --git a/lib/protoparser/newrelic/stream/streamparser_test.go b/lib/protoparser/newrelic/stream/streamparser_test.go new file mode 100644 index 0000000000..4f9b3ab97c --- /dev/null +++ b/lib/protoparser/newrelic/stream/streamparser_test.go @@ -0,0 +1,106 @@ +package stream + +import ( + "bytes" + "compress/gzip" + "fmt" + "reflect" + "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic" +) + +func TestParseFailure(t *testing.T) { + f := func(req string) { + t.Helper() + + callback := func(rows []newrelic.Row) error { + panic(fmt.Errorf("unexpected call into callback")) + } + r := bytes.NewReader([]byte(req)) + if err := Parse(r, false, callback); err == nil { + t.Fatalf("expecting non-empty error") + } + } + f("") + f("foo") + f("{}") + f("[1,2,3]") +} + +func TestParseSuccess(t *testing.T) { + f := func(req string, expectedRows []newrelic.Row) { + t.Helper() + + callback := func(rows []newrelic.Row) error { + if !reflect.DeepEqual(rows, expectedRows) { + return fmt.Errorf("unexpected rows\ngot\n%v\nwant\n%v", rows, expectedRows) + } + return nil + } + + // Parse from uncompressed reader + r := bytes.NewReader([]byte(req)) + if err := Parse(r, false, callback); err != nil { + t.Fatalf("unexpected error when parsing uncompressed request: %s", err) + } + + var bb bytes.Buffer + zw := gzip.NewWriter(&bb) + if _, err := zw.Write([]byte(req)); err != nil { + t.Fatalf("cannot compress request: %s", err) + } + if err := zw.Close(); err != nil { + t.Fatalf("cannot close compressed writer: %s", err) + } + if err := Parse(&bb, true, callback); err != nil { + t.Fatalf("unexpected error when parsing compressed request: %s", err) + } + } + + f("[]", nil) + 
f(`[{"Events":[]}]`, nil) + f(`[{ + "EntityID":28257883748326179, + "IsAgent":true, + "Events":[ + { + "eventType":"SystemSample", + "timestamp":1690286061, + "entityKey":"macbook-pro.local", + "dc": "1", + "diskWritesPerSecond":-34.21, + "uptime":762376 + } + ], + "ReportingAgentID":28257883748326179 +}]`, []newrelic.Row{ + { + Tags: []newrelic.Tag{ + { + Key: []byte("eventType"), + Value: []byte("SystemSample"), + }, + { + Key: []byte("entityKey"), + Value: []byte("macbook-pro.local"), + }, + { + Key: []byte("dc"), + Value: []byte("1"), + }, + }, + Samples: []newrelic.Sample{ + { + Name: []byte("diskWritesPerSecond"), + Value: -34.21, + }, + { + Name: []byte("uptime"), + Value: 762376, + }, + }, + Timestamp: 1690286061000, + }, + }) +} diff --git a/lib/protoparser/promremotewrite/stream/streamparser.go b/lib/protoparser/promremotewrite/stream/streamparser.go index ee01b76818..4b751b1ad5 100644 --- a/lib/protoparser/promremotewrite/stream/streamparser.go +++ b/lib/protoparser/promremotewrite/stream/streamparser.go @@ -96,7 +96,7 @@ func (ctx *pushCtx) Read() error { } if reqLen > int64(maxInsertRequestSize.N) { readErrors.Inc() - return fmt.Errorf("too big packed request; mustn't exceed `-maxInsertRequestSize=%d` bytes", maxInsertRequestSize.N) + return fmt.Errorf("too big packed request; mustn't exceed -maxInsertRequestSize=%d bytes", maxInsertRequestSize.N) } return nil }