app/{vmagent,vminsert}: follow-up for NewRelic data ingestion protocol support

This is a follow-up for f60c08a7bd

Changes:

- Make sure all the urls related to the NewRelic protocol start with `/newrelic`. Previously some urls started with `/api/v1/newrelic`

- Remove the `/api/v1` part from NewRelic urls, since it serves no purpose

- Remove the automatic transformation from CamelCase to snake_case for NewRelic labels and metric names,
  since it may complicate the transition from NewRelic to VictoriaMetrics. Preserve all the metric names and label names,
  so users can query metrics and labels by the same names as used in NewRelic.
  The automatic transformation from CamelCase to snake_case can be added later as a special action for relabeling rules if needed.
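
  For reference, the dropped transformation turned names such as `cpuUserPercent` into `cpu_user_percent`. A minimal Go sketch of this kind of conversion (illustrative only; simpler than the removed `camelToSnakeCase` helper, which also handled runs of upper-case letters):

  ```go
  package main

  import (
  	"fmt"
  	"unicode"
  )

  // camelToSnake is a simplified illustration of the dropped conversion:
  // an underscore is inserted before every upper-case rune, which is then lowered.
  func camelToSnake(s string) string {
  	out := make([]rune, 0, len(s)*2)
  	for i, r := range s {
  		if unicode.IsUpper(r) {
  			if i > 0 {
  				out = append(out, '_')
  			}
  			r = unicode.ToLower(r)
  		}
  		out = append(out, r)
  	}
  	return string(out)
  }

  func main() {
  	fmt.Println(camelToSnake("cpuUserPercent")) // cpu_user_percent
  }
  ```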

- Properly update per-tenant data ingestion stats at app/vmagent/newrelic/request_handler.go. Previously they were always zero.

- Fix NewRelic urls in vmagent when multitenant data ingestion is enabled. Previously they mistakenly started with `/`.

- Document NewRelic data ingestion url at docs/Cluster-VictoriaMetrics.md

- Remove superfluous memory allocations at lib/protoparser/newrelic

- Improve tests at lib/protoparser/newrelic/*

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3520
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4712
Aliaksandr Valialkin 2023-10-16 00:25:23 +02:00
parent ddbe713470
commit 2c334ed953

17 changed files with 804 additions and 667 deletions

README.md
View file

@@ -854,71 +854,76 @@ For example, `/api/put?extra_label=foo=bar` would add `{foo="bar"}` label to all
 ## How to send data from NewRelic agent
 
 VictoriaMetrics accepts data from [NewRelic infrastructure agent](https://docs.newrelic.com/docs/infrastructure/install-infrastructure-agent)
-at `/api/v1/newrelic/infra/v2/metrics/events/bulk` path.
-NewRelic's infrastructure agent sends so-called [Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events)
-which then transformed by VictoriaMetrics to the [Prometheus exposition format](https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format).
+at `/newrelic/infra/v2/metrics/events/bulk` HTTP path.
+VictoriaMetrics receives [Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events)
+from NewRelic agent at the given path, transforms them to [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples)
+according to [these docs](#newrelic-agent-data-mapping) before storing the raw samples to the database.
 
-NewRelic's infrastructure agent allows configuring destinations for metrics forwarding via ENV variable `COLLECTOR_URL`.
-It is also required to specify `NRIA_LICENSE_KEY`, which is available only after registration into account of the NewRelic cloud.
-
-To configure NewRelic infrastructure agent for forwarding metrics to VictoriaMetrics use the following example:
+You need passing `COLLECTOR_URL` and `NRIA_LICENSE_KEY` environment variables to NewRelic infrastructure agent in order to send the collected metrics to VictoriaMetrics.
+The `COLLECTOR_URL` must point to `/newrelic` HTTP endpoint at VictoriaMetrics, while the `NRIA_LICENSE_KEY` must contain NewRelic license key,
+which can be obtained [here](https://newrelic.com/signup).
+
+For example, if VictoriaMetrics runs at `localhost:8428`, then the following command can be used for running NewRelic infrastructure agent:
 
 ```console
-COLLECTOR_URL="http://localhost:8428/newrelic/api/v1" NRIA_LICENSE_KEY="YOUR_LICENSE_KEY" ./newrelic-infra
+COLLECTOR_URL="http://localhost:8428/newrelic" NRIA_LICENSE_KEY="NEWRELIC_LICENSE_KEY" ./newrelic-infra
 ```
 
 ### NewRelic agent data mapping
 
-As example, lets create `newrelic.json` file with the following content:
+VictoriaMetrics maps [NewRelic Events](https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events)
+to [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) in the following way:
+
+1. Every numeric field is converted into a raw sample with the corresponding name.
+1. The `eventType` and all the other fields with `string` value type are attached to every raw sample as [metric labels](https://docs.victoriametrics.com/keyConcepts.html#labels).
+1. The `timestamp` field is used as timestamp for the ingested [raw sample](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
+   The `timestamp` field may be specified either in seconds or in milliseconds since the [Unix Epoch](https://en.wikipedia.org/wiki/Unix_time).
+   If the `timestamp` field is missing, then the raw sample is stored with the current timestamp.
+
+For example, let's import the following NewRelic Events request to VictoriaMetrics:
 
 ```json
 [
   {
     "Events":[
       {
         "eventType":"SystemSample",
         "entityKey":"macbook-pro.local",
         "cpuPercent":25.056660790748904,
         "cpuUserPercent":8.687987912389374,
         "cpuSystemPercent":16.36867287835953,
         "cpuIOWaitPercent":0,
         "cpuIdlePercent":74.94333920925109,
         "cpuStealPercent":0,
         "loadAverageOneMinute":5.42333984375,
         "loadAverageFiveMinute":4.099609375,
         "loadAverageFifteenMinute":3.58203125
       }
     ]
   }
 ]
 ```
 
-Let's use cUrl to send `newrelic.json` to single-node VictoriaMetrics:
+Save this JSON into `newrelic.json` file and then use the following command in order to import it into VictoriaMetrics:
 
 ```console
-curl -X POST -H 'Content-Type: application/json' --data-binary @newrelic.json http://localhost:8428/newrelic/api/v1/infra/v2/metrics/events/bulk
+curl -X POST -H 'Content-Type: application/json' --data-binary @newrelic.json http://localhost:8428/newrelic/infra/v2/metrics/events/bulk
 ```
 
-If data was successfully ingested, you'll get `{"status":"ok"}` response. Let's fetch ingested data from VictoriaMetrics
-in vmui via query `{__name__!=""}`:
+Let's fetch the ingested data via [data export API](#how-to-export-data-in-json-line-format):
 
 ```console
-system_sample_cpu_io_wait_percent{entity_key="macbook-pro.local"} 0
-system_sample_cpu_idle_percent{entity_key="macbook-pro.local"} 74.9433392092
-system_sample_cpu_percent{entity_key="macbook-pro.local"} 25.056660790748
-system_sample_cpu_steal_percent{entity_key="macbook-pro.local"} 0
-system_sample_cpu_system_percent{entity_key="macbook-pro.local"} 16.368672878359
-system_sample_cpu_user_percent{entity_key="macbook-pro.local"} 8.687987912389
-system_sample_load_average_fifteen_minute{entity_key="macbook-pro.local"} 3.58203125
-system_sample_load_average_five_minute{entity_key="macbook-pro.local"} 4.099609375
-system_sample_load_average_one_minute{entity_key="macbook-pro.local"} 5.42333984375
+curl http://localhost:8428/api/v1/export -d 'match={eventType="SystemSample"}'
+{"metric":{"__name__":"cpuStealPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[0],"timestamps":[1697407970000]}
+{"metric":{"__name__":"loadAverageFiveMinute","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[4.099609375],"timestamps":[1697407970000]}
+{"metric":{"__name__":"cpuIOWaitPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[0],"timestamps":[1697407970000]}
+{"metric":{"__name__":"cpuSystemPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[16.368672878359],"timestamps":[1697407970000]}
+{"metric":{"__name__":"loadAverageOneMinute","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[5.42333984375],"timestamps":[1697407970000]}
+{"metric":{"__name__":"cpuUserPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[8.687987912389],"timestamps":[1697407970000]}
+{"metric":{"__name__":"cpuIdlePercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[74.9433392092],"timestamps":[1697407970000]}
+{"metric":{"__name__":"loadAverageFifteenMinute","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[3.58203125],"timestamps":[1697407970000]}
+{"metric":{"__name__":"cpuPercent","entityKey":"macbook-pro.local","eventType":"SystemSample"},"values":[25.056660790748],"timestamps":[1697407970000]}
 ```
 
-The fields in `newrelic.json` are transformed in the following way:
-1. `eventType` filed is used as prefix for all metrics in the object;
-2. `entityKey` or any other field with `string` value type is used as label attached to all metrics in the object;
-3. the rest fields with numeric values will be used as metrics;
-4. the additional field `timestamp` can be added to the payload to set the timestamp for all metrics. If omitted,
-current time is used.
-
 ## Prometheus querying API usage
 
 VictoriaMetrics supports the following handlers from [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/):
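The mapping rules documented above can be illustrated with a small standalone Go sketch (hypothetical helper, not part of the VictoriaMetrics codebase): numeric event fields become samples, string fields become labels, and a `timestamp` value below 2^32 is treated as seconds and scaled to milliseconds.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// mapEvent flattens a single NewRelic Event into labels and samples,
// following the documented mapping: numeric fields -> samples,
// string fields -> labels, "timestamp" -> sample timestamp.
func mapEvent(event map[string]any) (labels map[string]string, samples map[string]float64, tsMillis int64) {
	labels = map[string]string{}
	samples = map[string]float64{}
	tsMillis = time.Now().UnixMilli() // default when "timestamp" is missing
	for k, v := range event {
		switch val := v.(type) {
		case string:
			labels[k] = val
		case float64:
			if k == "timestamp" {
				if val < (1 << 32) { // seconds -> milliseconds
					val *= 1e3
				}
				tsMillis = int64(val)
				continue
			}
			samples[k] = val
		}
	}
	return labels, samples, tsMillis
}

func main() {
	var event map[string]any
	data := `{"eventType":"SystemSample","entityKey":"macbook-pro.local","cpuPercent":25.05,"timestamp":1690286061}`
	if err := json.Unmarshal([]byte(data), &event); err != nil {
		panic(err)
	}
	labels, samples, ts := mapEvent(event)
	fmt.Println(labels, samples, ts)
}
```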

View file

@@ -8,7 +8,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 	parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
-	parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog/stream"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
 	"github.com/VictoriaMetrics/metrics"
@@ -29,12 +29,12 @@ func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
 		return err
 	}
 	ce := req.Header.Get("Content-Encoding")
-	return stream.Parse(req.Body, ce, func(series []parser.Series) error {
+	return stream.Parse(req.Body, ce, func(series []datadog.Series) error {
 		return insertRows(at, series, extraLabels)
 	})
 }
 
-func insertRows(at *auth.Token, series []parser.Series, extraLabels []prompbmarshal.Label) error {
+func insertRows(at *auth.Token, series []datadog.Series, extraLabels []prompbmarshal.Label) error {
 	ctx := common.GetPushCtx()
 	defer common.PutPushCtx(ctx)
@@ -63,7 +63,7 @@ func insertRows(at *auth.Token, series []parser.Series, extraLabels []prompbmars
 	})
 }
 	for _, tag := range ss.Tags {
-		name, value := parser.SplitTag(tag)
+		name, value := datadog.SplitTag(tag)
 		if name == "host" {
 			name = "exported_host"
 		}

View file

@@ -320,19 +320,19 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
 		}
 		w.WriteHeader(http.StatusOK)
 		return true
-	case "/newrelic/api/v1":
+	case "/newrelic":
 		newrelicCheckRequest.Inc()
 		w.Header().Set("Content-Type", "application/json")
 		w.WriteHeader(202)
 		fmt.Fprintf(w, `{"status":"ok"}`)
 		return true
-	case "/newrelic/api/v1/inventory/deltas":
+	case "/newrelic/inventory/deltas":
 		newrelicInventoryRequests.Inc()
 		w.Header().Set("Content-Type", "application/json")
 		w.WriteHeader(202)
 		fmt.Fprintf(w, `{"payload":{"version": 1, "state": {}, "reset": "false"}}`)
 		return true
-	case "/newrelic/api/v1/infra/v2/metrics/events/bulk":
+	case "/newrelic/infra/v2/metrics/events/bulk":
 		newrelicWriteRequests.Inc()
 		if err := newrelic.InsertHandlerForHTTP(nil, r); err != nil {
 			newrelicWriteErrors.Inc()
@@ -543,19 +543,19 @@ func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path stri
 		}
 		w.WriteHeader(http.StatusOK)
 		return true
-	case "/newrelic/api/v1":
+	case "newrelic":
 		newrelicCheckRequest.Inc()
 		w.Header().Set("Content-Type", "application/json")
 		w.WriteHeader(202)
 		fmt.Fprintf(w, `{"status":"ok"}`)
 		return true
-	case "/newrelic/api/v1/inventory/deltas":
+	case "newrelic/inventory/deltas":
 		newrelicInventoryRequests.Inc()
 		w.Header().Set("Content-Type", "application/json")
 		w.WriteHeader(202)
 		fmt.Fprintf(w, `{"payload":{"version": 1, "state": {}, "reset": "false"}}`)
 		return true
-	case "/newrelic/api/v1/infra/v2/metrics/events/bulk":
+	case "newrelic/infra/v2/metrics/events/bulk":
 		newrelicWriteRequests.Inc()
 		if err := newrelic.InsertHandlerForHTTP(at, r); err != nil {
 			newrelicWriteErrors.Inc()
@@ -638,11 +638,11 @@ var (
 	opentelemetryPushRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/opentelemetry/api/v1/push", protocol="opentelemetry"}`)
 	opentelemetryPushErrors   = metrics.NewCounter(`vmagent_http_request_errors_total{path="/opentelemetry/api/v1/push", protocol="opentelemetry"}`)
 
-	newrelicWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/api/v1/infra/v2/metrics/events/bulk", protocol="newrelic"}`)
-	newrelicWriteErrors   = metrics.NewCounter(`vm_http_request_errors_total{path="/newrelic/api/v1/infra/v2/metrics/events/bulk", protocol="newrelic"}`)
-	newrelicInventoryRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/api/v1/inventory/deltas", protocol="newrelic"}`)
-	newrelicCheckRequest = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/api/v1", protocol="newrelic"}`)
+	newrelicWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/infra/v2/metrics/events/bulk", protocol="newrelic"}`)
+	newrelicWriteErrors   = metrics.NewCounter(`vm_http_request_errors_total{path="/newrelic/infra/v2/metrics/events/bulk", protocol="newrelic"}`)
+	newrelicInventoryRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/inventory/deltas", protocol="newrelic"}`)
+	newrelicCheckRequest = metrics.NewCounter(`vm_http_requests_total{path="/newrelic", protocol="newrelic"}`)
 
 	promscrapeTargetsRequests          = metrics.NewCounter(`vmagent_http_requests_total{path="/targets"}`)
 	promscrapeServiceDiscoveryRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/service-discovery"}`)

View file

@@ -8,6 +8,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 	parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic"
@@ -29,42 +30,48 @@ func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
 	}
 	ce := req.Header.Get("Content-Encoding")
 	isGzip := ce == "gzip"
-	return stream.Parse(req.Body, isGzip, func(series []newrelic.Metric) error {
-		return insertRows(at, series, extraLabels)
+	return stream.Parse(req.Body, isGzip, func(rows []newrelic.Row) error {
+		return insertRows(at, rows, extraLabels)
 	})
 }
 
-func insertRows(at *auth.Token, rows []newrelic.Metric, extraLabels []prompbmarshal.Label) error {
+func insertRows(at *auth.Token, rows []newrelic.Row, extraLabels []prompbmarshal.Label) error {
 	ctx := common.GetPushCtx()
 	defer common.PutPushCtx(ctx)
 
-	rowsTotal := 0
+	samplesCount := 0
 	tssDst := ctx.WriteRequest.Timeseries[:0]
 	labels := ctx.Labels[:0]
 	samples := ctx.Samples[:0]
 	for i := range rows {
 		r := &rows[i]
-		labelsLen := len(labels)
-		labels = append(labels, prompbmarshal.Label{
-			Name:  "__name__",
-			Value: r.Metric,
-		})
-		for j := range r.Tags {
-			tag := &r.Tags[j]
-			labels = append(labels, prompbmarshal.Label{
-				Name:  tag.Key,
-				Value: tag.Value,
-			})
-		}
-		samples = append(samples, prompbmarshal.Sample{
-			Value:     r.Value,
-			Timestamp: r.Timestamp,
-		})
-		tssDst = append(tssDst, prompbmarshal.TimeSeries{
-			Labels:  labels[labelsLen:],
-			Samples: samples[len(samples)-1:],
-		})
-		labels = append(labels, extraLabels...)
+		tags := r.Tags
+		srcSamples := r.Samples
+		for j := range srcSamples {
+			s := &srcSamples[j]
+			labelsLen := len(labels)
+			labels = append(labels, prompbmarshal.Label{
+				Name:  "__name__",
+				Value: bytesutil.ToUnsafeString(s.Name),
+			})
+			for k := range tags {
+				t := &tags[k]
+				labels = append(labels, prompbmarshal.Label{
+					Name:  bytesutil.ToUnsafeString(t.Key),
+					Value: bytesutil.ToUnsafeString(t.Value),
+				})
+			}
+			samples = append(samples, prompbmarshal.Sample{
+				Value:     s.Value,
+				Timestamp: r.Timestamp,
+			})
+			tssDst = append(tssDst, prompbmarshal.TimeSeries{
+				Labels:  labels[labelsLen:],
+				Samples: samples[len(samples)-1:],
+			})
+			labels = append(labels, extraLabels...)
+		}
+		samplesCount += len(srcSamples)
 	}
 	ctx.WriteRequest.Timeseries = tssDst
 	ctx.Labels = labels
@@ -72,8 +79,8 @@ func insertRows(at *auth.Token, rows []newrelic.Metric, extraLabels []prompbmars
 	remotewrite.Push(at, &ctx.WriteRequest)
 	rowsInserted.Add(len(rows))
 	if at != nil {
-		rowsTenantInserted.Get(at).Add(rowsTotal)
+		rowsTenantInserted.Get(at).Add(samplesCount)
 	}
-	rowsPerInsert.Update(float64(len(rows)))
+	rowsPerInsert.Update(float64(samplesCount))
 	return nil
 }

View file

@@ -222,19 +222,19 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
 		}
 		w.WriteHeader(http.StatusOK)
 		return true
-	case "/newrelic/api/v1":
+	case "/newrelic":
 		newrelicCheckRequest.Inc()
 		w.Header().Set("Content-Type", "application/json")
 		w.WriteHeader(202)
 		fmt.Fprintf(w, `{"status":"ok"}`)
 		return true
-	case "/newrelic/api/v1/inventory/deltas":
+	case "/newrelic/inventory/deltas":
 		newrelicInventoryRequests.Inc()
 		w.Header().Set("Content-Type", "application/json")
 		w.WriteHeader(202)
 		fmt.Fprintf(w, `{"payload":{"version": 1, "state": {}, "reset": "false"}}`)
 		return true
-	case "/newrelic/api/v1/infra/v2/metrics/events/bulk":
+	case "/newrelic/infra/v2/metrics/events/bulk":
 		newrelicWriteRequests.Inc()
 		if err := newrelic.InsertHandlerForHTTP(r); err != nil {
 			newrelicWriteErrors.Inc()
@@ -382,11 +382,11 @@ var (
 	opentelemetryPushRequests = metrics.NewCounter(`vm_http_requests_total{path="/opentelemetry/api/v1/push", protocol="opentelemetry"}`)
 	opentelemetryPushErrors   = metrics.NewCounter(`vm_http_request_errors_total{path="/opentelemetry/api/v1/push", protocol="opentelemetry"}`)
 
-	newrelicWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/api/v1/infra/v2/metrics/events/bulk", protocol="newrelic"}`)
-	newrelicWriteErrors   = metrics.NewCounter(`vm_http_request_errors_total{path="/newrelic/api/v1/infra/v2/metrics/events/bulk", protocol="newrelic"}`)
-	newrelicInventoryRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/api/v1/inventory/deltas", protocol="newrelic"}`)
-	newrelicCheckRequest = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/api/v1", protocol="newrelic"}`)
+	newrelicWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/infra/v2/metrics/events/bulk", protocol="newrelic"}`)
+	newrelicWriteErrors   = metrics.NewCounter(`vm_http_request_errors_total{path="/newrelic/infra/v2/metrics/events/bulk", protocol="newrelic"}`)
+	newrelicInventoryRequests = metrics.NewCounter(`vm_http_requests_total{path="/newrelic/inventory/deltas", protocol="newrelic"}`)
+	newrelicCheckRequest = metrics.NewCounter(`vm_http_requests_total{path="/newrelic", protocol="newrelic"}`)
 
 	promscrapeTargetsRequests          = metrics.NewCounter(`vm_http_requests_total{path="/targets"}`)
 	promscrapeServiceDiscoveryRequests = metrics.NewCounter(`vm_http_requests_total{path="/service-discovery"}`)

View file

@ -18,7 +18,7 @@ var (
rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="newrelic"}`) rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="newrelic"}`)
) )
// InsertHandlerForHTTP processes remote write for NewRelic POST /infra/v2/metrics/events/bulk request. // InsertHandlerForHTTP processes remote write for request to /newrelic/infra/v2/metrics/events/bulk request.
func InsertHandlerForHTTP(req *http.Request) error { func InsertHandlerForHTTP(req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req) extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil { if err != nil {
@ -26,42 +26,52 @@ func InsertHandlerForHTTP(req *http.Request) error {
} }
ce := req.Header.Get("Content-Encoding") ce := req.Header.Get("Content-Encoding")
isGzip := ce == "gzip" isGzip := ce == "gzip"
return stream.Parse(req.Body, isGzip, func(series []newrelic.Metric) error { return stream.Parse(req.Body, isGzip, func(rows []newrelic.Row) error {
return insertRows(series, extraLabels) return insertRows(rows, extraLabels)
}) })
} }
func insertRows(rows []newrelic.Metric, extraLabels []prompbmarshal.Label) error { func insertRows(rows []newrelic.Row, extraLabels []prompbmarshal.Label) error {
ctx := common.GetInsertCtx() ctx := common.GetInsertCtx()
defer common.PutInsertCtx(ctx) defer common.PutInsertCtx(ctx)
ctx.Reset(len(rows)) samplesCount := 0
for i := range rows {
samplesCount += len(rows[i].Samples)
}
ctx.Reset(samplesCount)
hasRelabeling := relabel.HasRelabeling() hasRelabeling := relabel.HasRelabeling()
for i := range rows { for i := range rows {
r := &rows[i] r := &rows[i]
ctx.Labels = ctx.Labels[:0] samples := r.Samples
ctx.AddLabel("", r.Metric) for j := range samples {
for j := range r.Tags { s := &samples[j]
tag := &r.Tags[j]
ctx.AddLabel(tag.Key, tag.Value) ctx.Labels = ctx.Labels[:0]
} ctx.AddLabelBytes(nil, s.Name)
for j := range extraLabels { for k := range r.Tags {
label := &extraLabels[j] t := &r.Tags[k]
ctx.AddLabel(label.Name, label.Value) ctx.AddLabelBytes(t.Key, t.Value)
} }
if hasRelabeling { for k := range extraLabels {
ctx.ApplyRelabeling() label := &extraLabels[k]
} ctx.AddLabel(label.Name, label.Value)
if len(ctx.Labels) == 0 { }
// Skip metric without labels. if hasRelabeling {
continue ctx.ApplyRelabeling()
} }
ctx.SortLabelsIfNeeded() if len(ctx.Labels) == 0 {
if err := ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value); err != nil { // Skip metric without labels.
return err continue
}
ctx.SortLabelsIfNeeded()
if err := ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, s.Value); err != nil {
return err
}
} }
} }
rowsInserted.Add(len(rows)) rowsInserted.Add(samplesCount)
rowsPerInsert.Update(float64(len(rows))) rowsPerInsert.Update(float64(samplesCount))
return ctx.FlushBufs() return ctx.FlushBufs()
} }
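
The accounting change in both request handlers follows from the new data model: one NewRelic Event (`Row`) now yields one sample per numeric field, so the stats must count samples rather than rows. A minimal illustration of the difference, using simplified mirrors of the parser's `Row` and `Sample` types:

```go
package main

import "fmt"

// Simplified mirrors of the parser's Row and Sample types.
type Sample struct {
	Name  string
	Value float64
}

type Row struct {
	Samples []Sample
}

func main() {
	rows := []Row{
		{Samples: []Sample{{"cpuPercent", 25.05}, {"uptime", 762376}}},
		{Samples: []Sample{{"uptime", 1236}}},
	}
	samplesCount := 0
	for i := range rows {
		samplesCount += len(rows[i].Samples)
	}
	// 2 rows, but 3 ingested samples: per-tenant stats must use samplesCount.
	fmt.Println(len(rows), samplesCount)
}
```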

View file

@@ -356,6 +356,7 @@ Check practical examples of VictoriaMetrics API [here](https://docs.victoriametr
 - `opentelemetry/api/v1/push` - for ingesting data via [OpenTelemetry protocol for metrics](https://github.com/open-telemetry/opentelemetry-specification/blob/ffddc289462dfe0c2041e3ca42a7b1df805706de/specification/metrics/data-model.md). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#sending-data-via-opentelemetry).
 - `datadog/api/v1/series` - for ingesting data with [DataDog submit metrics API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent) for details.
 - `influx/write` and `influx/api/v2/write` - for ingesting data with [InfluxDB line protocol](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for details.
+- `newrelic/infra/v2/metrics/events/bulk` - for accepting data from [NewRelic infrastructure agent](https://docs.newrelic.com/docs/infrastructure/install-infrastructure-agent). See [these docs](https://docs.victoriametrics.com/#how-to-send-data-from-newrelic-agent) for details.
 - `opentsdb/api/put` - for accepting [OpenTSDB HTTP /api/put requests](http://opentsdb.net/docs/build/html/api_http/put.html). This handler is disabled by default. It is exposed on a distinct TCP address set via `-opentsdbHTTPListenAddr` command-line flag. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#sending-opentsdb-data-via-http-apiput-requests) for details.
 - URLs for [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/): `http://<vmselect>:8481/select/<accountID>/prometheus/<suffix>`, where:
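
Assuming the standard cluster ingestion prefix `http://<vminsert>:8480/insert/<accountID>/<suffix>`, a minimal Go client for the newly documented endpoint might look like this (host and accountID are placeholders, not values from this commit):

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	// NewRelic Events payload in the same shape the infrastructure agent sends.
	payload := []byte(`[{"Events":[{"eventType":"SystemSample","cpuPercent":25.05}]}]`)

	// Hypothetical vminsert address and accountID 0; adjust for your cluster.
	url := "http://vminsert:8480/insert/0/newrelic/infra/v2/metrics/events/bulk"
	resp, err := http.Post(url, "application/json", bytes.NewReader(payload))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status)
}
```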

View file

@@ -99,11 +99,11 @@ func (ctx *pushCtx) Read() error {
 	reqLen, err := ctx.reqBuf.ReadFrom(lr)
 	if err != nil {
 		readErrors.Inc()
-		return fmt.Errorf("cannot read compressed request in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err)
+		return fmt.Errorf("cannot read request in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err)
 	}
 	if reqLen > int64(maxInsertRequestSize.N) {
 		readErrors.Inc()
-		return fmt.Errorf("too big packed request; mustn't exceed `-maxInsertRequestSize=%d` bytes", maxInsertRequestSize.N)
+		return fmt.Errorf("too big request; mustn't exceed -datadog.maxInsertRequestSize=%d bytes", maxInsertRequestSize.N)
 	}
 	return nil
 }

View file

@@ -2,231 +2,191 @@ package newrelic
 
 import (
 	"fmt"
-	"sync"
-	"unicode"
 
 	"github.com/valyala/fastjson"
 	"github.com/valyala/fastjson/fastfloat"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
 )
 
-var baseEventKeys = map[string]struct{}{
-	"timestamp": {}, "eventType": {},
-}
-
-type tagsBuffer struct {
-	tags []Tag
-}
-
-var tagsPool = sync.Pool{
-	New: func() interface{} {
-		return &tagsBuffer{tags: make([]Tag, 0)}
-	},
-}
-
-// NewRelic agent sends next struct to the collector
-// MetricPost entity item for the HTTP post to be sent to the ingest service.
-// type MetricPost struct {
-//	ExternalKeys []string          `json:"ExternalKeys,omitempty"`
-//	EntityID     uint64            `json:"EntityID,omitempty"`
-//	IsAgent      bool              `json:"IsAgent"`
-//	Events       []json.RawMessage `json:"Events"`
-//	// Entity ID of the reporting agent, which will = EntityID when IsAgent == true.
-//	// The field is required in the backend for host metadata matching of the remote entities
-//	ReportingAgentID uint64 `json:"ReportingAgentID,omitempty"`
-// }
-// We are using only Events field because it contains all needed metrics
-
-// Events represents Metrics collected from NewRelic MetricPost request
-// https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events
-type Events struct {
-	Metrics []Metric
-}
-
-// Unmarshal takes fastjson.Value and collects Metrics
-func (e *Events) Unmarshal(v []*fastjson.Value) error {
-	for _, value := range v {
-		events := value.Get("Events")
-		if events == nil {
-			return fmt.Errorf("got empty Events array from request")
-		}
-		eventsArr, err := events.Array()
-		if err != nil {
-			return fmt.Errorf("error collect events: %s", err)
-		}
-		for _, event := range eventsArr {
-			metricData, err := event.Object()
-			if err != nil {
-				return fmt.Errorf("error get metric data: %s", err)
-			}
-			var m Metric
-			metrics, err := m.unmarshal(metricData)
-			if err != nil {
-				return fmt.Errorf("error collect metrics from Newrelic json: %s", err)
-			}
-			e.Metrics = append(e.Metrics, metrics...)
-		}
-	}
-	return nil
-}
-
-// Metric represents VictoriaMetrics metrics
-type Metric struct {
-	Timestamp int64
-	Tags      []Tag
-	Metric    string
-	Value     float64
-}
-
-func (m *Metric) unmarshal(o *fastjson.Object) ([]Metric, error) {
-	m.reset()
-
-	tgsBuffer := tagsPool.Get().(*tagsBuffer)
-	defer func() {
-		tgsBuffer.tags = tgsBuffer.tags[:0]
-		tagsPool.Put(tgsBuffer)
-	}()
-
-	metrics := make([]Metric, 0, o.Len())
-	rawTs := o.Get("timestamp")
-	if rawTs != nil {
-		ts, err := getFloat64(rawTs)
-		if err != nil {
-			return nil, fmt.Errorf("invalid `timestamp` in %s: %w", o, err)
-		}
-		m.Timestamp = int64(ts * 1e3)
-	} else {
-		// Allow missing timestamp. It should be automatically populated
-		// with the current time by the caller.
-		m.Timestamp = 0
-	}
-
-	eventType := o.Get("eventType")
-	if eventType == nil {
-		return nil, fmt.Errorf("error get eventType from Events object: %s", o)
-	}
-	prefix := bytesutil.ToUnsafeString(eventType.GetStringBytes())
-	prefix = camelToSnakeCase(prefix)
-
-	o.Visit(func(key []byte, v *fastjson.Value) {
-		k := bytesutil.ToUnsafeString(key)
-		// skip base event keys which should have been parsed before this
-		if _, ok := baseEventKeys[k]; ok {
-			return
-		}
-		switch v.Type() {
-		case fastjson.TypeString:
-			// this is label-value pair
-			value := v.Get()
-			if value == nil {
-				logger.Errorf("failed to get label value from NewRelic json: %s", v)
-				return
-			}
-			name := camelToSnakeCase(k)
-			val := bytesutil.ToUnsafeString(value.GetStringBytes())
-			tgsBuffer.tags = append(tgsBuffer.tags, Tag{Key: name, Value: val})
-		case fastjson.TypeNumber:
-			// this is metric name with value
-			metricName := camelToSnakeCase(k)
-			if prefix != "" {
-				metricName = fmt.Sprintf("%s_%s", prefix, metricName)
-			}
-			f, err := getFloat64(v)
-			if err != nil {
-				logger.Errorf("failed to get value for NewRelic metric %q: %w", k, err)
-				return
-			}
-			metrics = append(metrics, Metric{Metric: metricName, Value: f})
-		default:
-			// unknown type
-			logger.Errorf("got unsupported NewRelic json %s field type: %s", v, v.Type())
-			return
-		}
-	})
-
-	for i := range metrics {
-		metrics[i].Timestamp = m.Timestamp
-		metrics[i].Tags = tgsBuffer.tags
-	}
-
-	return metrics, nil
-}
-
-func (m *Metric) reset() {
-	m.Timestamp = 0
-	m.Tags = nil
-	m.Metric = ""
-	m.Value = 0
-}
-
-// Tag is an NewRelic tag.
-type Tag struct {
-	Key   string
-	Value string
-}
-
-func camelToSnakeCase(str string) string {
-	str = promrelabel.SanitizeLabelName(str)
-	length := len(str)
-	snakeCase := make([]byte, 0, length*2)
-	tokens := make([]byte, 0, length)
-	var allTokensUpper bool
-	flush := func(tokens []byte) {
-		for _, c := range tokens {
-			snakeCase = append(snakeCase, byte(unicode.ToLower(rune(c))))
-		}
-	}
-	for i := 0; i < length; i++ {
-		char := str[i]
-		if unicode.IsUpper(rune(char)) {
-			switch {
-			case len(tokens) == 0:
-				allTokensUpper = true
-				tokens = append(tokens, char)
-			case allTokensUpper:
-				tokens = append(tokens, char)
-			default:
-				flush(tokens)
-				snakeCase = append(snakeCase, '_')
-				tokens = tokens[:0]
-				tokens = append(tokens, char)
-				allTokensUpper = true
-			}
-			continue
-		}
-		switch {
-		case len(tokens) == 1:
-			tokens = append(tokens, char)
-			allTokensUpper = false
-		case allTokensUpper:
-			tail := tokens[:len(tokens)-1]
-			last := tokens[len(tokens)-1:]
-			flush(tail)
-			snakeCase = append(snakeCase, '_')
-			tokens = tokens[:0]
-			tokens = append(tokens, last...)
-			tokens = append(tokens, char)
-			allTokensUpper = false
-		default:
-			tokens = append(tokens, char)
-		}
-	}
-	if len(tokens) > 0 {
-		flush(tokens)
-	}
-	s := bytesutil.ToUnsafeString(snakeCase)
-	return s
-}
+// Rows contains rows parsed from NewRelic Event request
+//
+// See https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events
+type Rows struct {
+	Rows []Row
+}
+
+// Reset resets r, so it can be re-used
+func (r *Rows) Reset() {
+	rows := r.Rows
+	for i := range rows {
+		rows[i].reset()
+	}
+	r.Rows = rows[:0]
+}
+
+var jsonParserPool fastjson.ParserPool
+
+// Unmarshal parses NewRelic Event request from b to r.
+//
+// b can be re-used after returning from r.
+func (r *Rows) Unmarshal(b []byte) error {
+	p := jsonParserPool.Get()
+	defer jsonParserPool.Put(p)
+
+	r.Reset()
+	v, err := p.ParseBytes(b)
+	if err != nil {
+		return err
+	}
+	metricPosts, err := v.Array()
+	if err != nil {
+		return fmt.Errorf("cannot find the top-level array of MetricPost objects: %w", err)
+	}
+	for _, mp := range metricPosts {
+		o, err := mp.Object()
+		if err != nil {
+			return fmt.Errorf("cannot find MetricPost object: %w", err)
+		}
+		rows := r.Rows
+		o.Visit(func(k []byte, v *fastjson.Value) {
+			if err != nil {
+				return
+			}
+			switch string(k) {
+			case "Events":
+				events, errLocal := v.Array()
+				if errLocal != nil {
+					err = fmt.Errorf("cannot find Events array in MetricPost object: %w", errLocal)
+					return
+				}
+				for _, e := range events {
+					eventObject, errLocal := e.Object()
+					if errLocal != nil {
+						err = fmt.Errorf("cannot find EventObject: %w", errLocal)
+						return
+					}
+					if cap(rows) > len(rows) {
+						rows = rows[:len(rows)+1]
+					} else {
+						rows = append(rows, Row{})
+					}
+					r := &rows[len(rows)-1]
+					if errLocal := r.unmarshal(eventObject); errLocal != nil {
+						err = fmt.Errorf("cannot unmarshal EventObject: %w", errLocal)
+						return
+					}
+				}
+			}
+		})
+		r.Rows = rows
+		if err != nil {
+			return fmt.Errorf("cannot parse MetricPost object: %w", err)
+		}
+	}
+	return nil
+}
+
+// Row represents parsed row
+type Row struct {
+	Tags      []Tag
+	Samples   []Sample
+	Timestamp int64
+}
+
+// Tag represents a key=value tag
+type Tag struct {
+	Key   []byte
+	Value []byte
+}
+
+// Sample represents parsed sample
+type Sample struct {
+	Name  []byte
+	Value float64
+}
+
+func (r *Row) reset() {
+	tags := r.Tags
+	for i := range tags {
+		tags[i].reset()
+	}
+	r.Tags = tags[:0]
+
+	samples := r.Samples
+	for i := range samples {
+		samples[i].reset()
+	}
+	r.Samples = samples[:0]
+
+	r.Timestamp = 0
+}
+
+func (t *Tag) reset() {
+	t.Key = t.Key[:0]
+	t.Value = t.Value[:0]
+}
+
+func (s *Sample) reset() {
+	s.Name = s.Name[:0]
+	s.Value = 0
+}
+
+func (r *Row) unmarshal(o *fastjson.Object) (err error) {
+	r.reset()
+	tags := r.Tags[:0]
+	samples := r.Samples[:0]
+	o.Visit(func(k []byte, v *fastjson.Value) {
+		if err != nil {
+			return
+		}
+		if len(k) == 0 {
+			return
+		}
+		switch v.Type() {
+		case fastjson.TypeString:
+			// Register new tag
+			valueBytes := v.GetStringBytes()
+			if len(valueBytes) == 0 {
+				return
+			}
+			if cap(tags) > len(tags) {
+				tags = tags[:len(tags)+1]
+			} else {
+				tags = append(tags, Tag{})
+			}
+			t := &tags[len(tags)-1]
+			t.Key = append(t.Key[:0], k...)
+			t.Value = append(t.Value[:0], valueBytes...)
+		case fastjson.TypeNumber:
+			if string(k) == "timestamp" {
+				// Parse timestamp
+				ts, errLocal := getFloat64(v)
+				if errLocal != nil {
+					err = fmt.Errorf("cannot parse `timestamp` field: %w", errLocal)
+					return
+				}
+				if ts < (1 << 32) {
+					// The timestamp is in seconds. Convert it to milliseconds.
+					ts *= 1e3
+				}
+				r.Timestamp = int64(ts)
+				return
+			}
+			// Register new sample
+			if cap(samples) > len(samples) {
+				samples = samples[:len(samples)+1]
+			} else {
+				samples = append(samples, Sample{})
+			}
+			s := &samples[len(samples)-1]
+			s.Name = append(s.Name[:0], k...)
+			s.Value = v.GetFloat64()
+		}
+	})
+	r.Tags = tags
+	r.Samples = samples
+	return err
+}
 
 func getFloat64(v *fastjson.Value) (float64, error) {
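
The `ts < (1 << 32)` check in the new parser distinguishes seconds from milliseconds: Unix timestamps in seconds stay below 2^32 until the year 2106, while millisecond timestamps for any recent date are far above that bound. A small standalone Go check mirroring the heuristic:

```go
package main

import "fmt"

// toMillis normalizes a NewRelic `timestamp` value to milliseconds,
// mirroring the heuristic used by the parser above.
func toMillis(ts float64) int64 {
	if ts < (1 << 32) {
		// Values below 2^32 are Unix seconds (valid until the year 2106).
		ts *= 1e3
	}
	return int64(ts)
}

func main() {
	fmt.Println(toMillis(1690286061))    // seconds -> 1690286061000
	fmt.Println(toMillis(1690286061987)) // already milliseconds -> 1690286061987
}
```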

View file

@@ -1,49 +1,103 @@
 package newrelic

 import (
+	"fmt"
 	"reflect"
 	"strings"
 	"testing"
-
-	"github.com/valyala/fastjson"
 )

-func TestEvents_Unmarshal(t *testing.T) {
-	tests := []struct {
-		name    string
-		metrics []Metric
-		json    string
-		wantErr bool
-	}{
-		{
-			name:    "empty json",
-			metrics: []Metric{},
-			json:    "",
-			wantErr: true,
-		},
-		{
-			name: "json with correct data",
-			metrics: []Metric{
-				{
-					Timestamp: 1690286061000,
-					Tags: []Tag{
-						{Key: "entity_key", Value: "macbook-pro.local"},
-						{Key: "dc", Value: "1"},
-					},
-					Metric: "system_sample_disk_writes_per_second",
-					Value:  0,
-				},
-				{
-					Timestamp: 1690286061000,
-					Tags: []Tag{
-						{Key: "entity_key", Value: "macbook-pro.local"},
-						{Key: "dc", Value: "1"},
-					},
-					Metric: "system_sample_uptime",
-					Value:  762376,
-				},
-			},
-			json: `[
+func TestRowsUnmarshalFailure(t *testing.T) {
+	f := func(data string) {
+		t.Helper()
+
+		var r Rows
+		if err := r.Unmarshal([]byte(data)); err == nil {
+			t.Fatalf("expecting non-nil error")
+		}
+	}
+
+	// Empty JSON
+	f("")
+
+	// Invalid JSON
+	f("123")
+	f("[foo]")
+	f(`{"foo":123}`)
+}
+
+func TestRowsUnmarshalSuccess(t *testing.T) {
+	f := func(data string, expectedRows []Row) {
+		t.Helper()
+
+		var r Rows
+		if err := r.Unmarshal([]byte(data)); err != nil {
+			t.Fatalf("unexpected error: %s", err)
+		}
+		if !reflect.DeepEqual(r.Rows, expectedRows) {
+			t.Fatalf("unexpected rows parsed\ngot\n%s\nwant\n%s", rowsToString(r.Rows), rowsToString(expectedRows))
+		}
+	}
+
+	// empty array
+	f(`[]`, nil)
+
+	// zero events
+	f(`[
+	{
+		"EntityID":28257883748326179,
+		"IsAgent":true,
+		"Events":[],
+		"ReportingAgentID":28257883748326179
+	}]`, nil)
+
+	// A single event
+	f(`[{
+		"EntityID":28257883748326179,
+		"IsAgent":true,
+		"Events":[
+			{
+				"eventType":"SystemSample",
+				"timestamp":1690286061,
+				"entityKey":"macbook-pro.local",
+				"dc": "1",
+				"diskWritesPerSecond":-34.21,
+				"uptime":762376
+			}
+		],
+		"ReportingAgentID":28257883748326179
+	}]`, []Row{
+		{
+			Tags: []Tag{
+				{
+					Key:   []byte("eventType"),
+					Value: []byte("SystemSample"),
+				},
+				{
+					Key:   []byte("entityKey"),
+					Value: []byte("macbook-pro.local"),
+				},
+				{
+					Key:   []byte("dc"),
+					Value: []byte("1"),
+				},
+			},
+			Samples: []Sample{
+				{
+					Name:  []byte("diskWritesPerSecond"),
+					Value: -34.21,
+				},
+				{
+					Name:  []byte("uptime"),
+					Value: 762376,
+				},
+			},
+			Timestamp: 1690286061000,
+		},
+	})
+
+	// Multiple events
+	f(`[
 	{
 		"EntityID":28257883748326179,
 		"IsAgent":true,
@@ -52,123 +106,124 @@ func TestEvents_Unmarshal(t *testing.T) {
 				"eventType":"SystemSample",
 				"timestamp":1690286061,
 				"entityKey":"macbook-pro.local",
 				"dc": "1",
-				"diskWritesPerSecond":0,
+				"diskWritesPerSecond":-34.21,
 				"uptime":762376
 			}
 		],
 		"ReportingAgentID":28257883748326179
-	}
-]`,
-			wantErr: false,
-		},
-		{
-			name:    "empty array in json",
-			metrics: []Metric{},
-			json:    `[]`,
-			wantErr: false,
-		},
-		{
-			name:    "empty events in json",
-			metrics: []Metric{},
-			json: `[
+	},
 	{
-		"EntityID":28257883748326179,
+		"EntityID":282579,
 		"IsAgent":true,
-		"Events":[],
-		"ReportingAgentID":28257883748326179
+		"Events":[
+			{
+				"eventType":"SystemSample",
+				"timestamp":1690286061,
+				"entityKey":"macbook-pro.local",
+				"diskWritesPerSecond":234.34,
+				"timestamp":1690286061.433,
+				"uptime":762376
+			},
+			{
+				"eventType":"ProcessSample",
+				"timestamp":1690286061987,
+				"uptime":1236
+			}
+		],
+		"ReportingAgentID":2879
 	}
-]`,
-			wantErr: false,
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			e := &Events{Metrics: []Metric{}}
-			value, err := fastjson.Parse(tt.json)
-			if (err != nil) != tt.wantErr {
-				t.Errorf("cannot parse json error: %s", err)
-			}
-			if value != nil {
-				v, err := value.Array()
-				if err != nil {
-					t.Errorf("cannot get array from json")
-				}
-				if err := e.Unmarshal(v); (err != nil) != tt.wantErr {
-					t.Errorf("Unmarshal() error = %v, wantErr %v", err, tt.wantErr)
-				}
-				if !reflect.DeepEqual(e.Metrics, tt.metrics) {
-					t.Errorf("got metrics => %v; expected = %v", e.Metrics, tt.metrics)
-				}
-			}
-		})
-	}
-}
-
-func Test_camelToSnakeCase(t *testing.T) {
-	tests := []struct {
-		name string
-		str  string
-		want string
-	}{
-		{
-			name: "empty string",
-			str:  "",
-			want: "",
-		},
-		{
-			name: "lowercase all chars",
-			str:  "somenewstring",
-			want: "somenewstring",
-		},
-		{
-			name: "first letter uppercase",
-			str:  "Teststring",
-			want: "teststring",
-		},
-		{
-			name: "two uppercase letters",
-			str:  "TestString",
-			want: "test_string",
-		},
-		{
-			name: "first and last uppercase letters",
-			str:  "TeststrinG",
-			want: "teststrin_g",
-		},
-		{
-			name: "three letters uppercase",
-			str:  "TestStrinG",
-			want: "test_strin_g",
-		},
-		{
-			name: "has many upper case letters",
-			str:  "ProgressIOTime",
-			want: "progress_io_time",
-		},
-		{
-			name: "last all uppercase letters",
-			str:  "ProgressTSDB",
-			want: "progress_tsdb",
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			if got := camelToSnakeCase(tt.str); got != tt.want {
-				t.Errorf("camelToSnakeCase() = %v, want %v", got, tt.want)
-			}
-		})
-	}
-}
-
-func BenchmarkCameToSnake(b *testing.B) {
-	b.ReportAllocs()
-	str := strings.Repeat("ProgressIOTime", 20)
-	b.RunParallel(func(pb *testing.PB) {
-		for pb.Next() {
-			camelToSnakeCase(str)
-		}
-	})
-}
+	]`, []Row{
+		{
+			Tags: []Tag{
+				{
+					Key:   []byte("eventType"),
+					Value: []byte("SystemSample"),
+				},
+				{
+					Key:   []byte("entityKey"),
+					Value: []byte("macbook-pro.local"),
+				},
+				{
+					Key:   []byte("dc"),
+					Value: []byte("1"),
+				},
+			},
+			Samples: []Sample{
+				{
+					Name:  []byte("diskWritesPerSecond"),
+					Value: -34.21,
+				},
+				{
+					Name:  []byte("uptime"),
+					Value: 762376,
+				},
+			},
+			Timestamp: 1690286061000,
+		},
+		{
+			Tags: []Tag{
+				{
+					Key:   []byte("eventType"),
+					Value: []byte("SystemSample"),
+				},
+				{
+					Key:   []byte("entityKey"),
+					Value: []byte("macbook-pro.local"),
+				},
+			},
+			Samples: []Sample{
+				{
+					Name:  []byte("diskWritesPerSecond"),
+					Value: 234.34,
+				},
+				{
+					Name:  []byte("uptime"),
+					Value: 762376,
+				},
+			},
+			Timestamp: 1690286061433,
+		},
+		{
+			Tags: []Tag{
+				{
+					Key:   []byte("eventType"),
+					Value: []byte("ProcessSample"),
+				},
+			},
+			Samples: []Sample{
+				{
+					Name:  []byte("uptime"),
+					Value: 1236,
+				},
+			},
+			Timestamp: 1690286061987,
+		},
+	})
+}
+
+func rowsToString(rows []Row) string {
+	var a []string
+	for _, row := range rows {
+		s := row.String()
+		a = append(a, s)
+	}
+	return strings.Join(a, "\n")
+}
+
+func (r *Row) String() string {
+	var a []string
+	for _, t := range r.Tags {
+		s := fmt.Sprintf("%s=%q", t.Key, t.Value)
+		a = append(a, s)
+	}
+	tagsString := "{" + strings.Join(a, ",") + "}"
+
+	a = a[:0]
+	for _, sample := range r.Samples {
+		s := fmt.Sprintf("[%s %f]", sample.Name, sample.Value)
+		a = append(a, s)
+	}
+	samplesString := strings.Join(a, ",")
+	return fmt.Sprintf("tags=%s, samples=%s, timestamp=%d", tagsString, samplesString, r.Timestamp)
+}
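The expected `Timestamp` values in these tests come from the unit heuristic in the parsing code above: values below `1<<32` are treated as seconds (possibly fractional) and scaled to milliseconds, while larger values are assumed to be milliseconds already. A sketch of the same arithmetic, using a hypothetical `normalizeTimestamp` helper for illustration (the real parser inlines this logic):

```go
package main

import "fmt"

// normalizeTimestamp mirrors the parser's heuristic: timestamps below 1<<32
// are treated as seconds and converted to milliseconds; anything larger is
// assumed to be in milliseconds already.
func normalizeTimestamp(ts float64) int64 {
	if ts < (1 << 32) {
		ts *= 1e3
	}
	return int64(ts)
}

func main() {
	fmt.Println(normalizeTimestamp(1690286061))     // seconds -> 1690286061000
	fmt.Println(normalizeTimestamp(1690286061.433)) // fractional seconds -> 1690286061433
	fmt.Println(normalizeTimestamp(1690286061987))  // already milliseconds -> unchanged
}
```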


@@ -1,13 +1,12 @@
 package newrelic

 import (
+	"fmt"
 	"testing"
-
-	"github.com/valyala/fastjson"
 )

-func BenchmarkRequestUnmarshal(b *testing.B) {
-	reqBody := `[
+func BenchmarkRowsUnmarshal(b *testing.B) {
+	reqBody := []byte(`[
 	{
 		"EntityID":28257883748326179,
 		"IsAgent":true,
@@ -52,25 +51,17 @@ func BenchmarkRequestUnmarshal(b *testing.B) {
 		],
 		"ReportingAgentID":28257883748326179
 	}
-]`
+]`)
 	b.SetBytes(int64(len(reqBody)))
 	b.ReportAllocs()
 	b.RunParallel(func(pb *testing.PB) {
-		value, err := fastjson.Parse(reqBody)
-		if err != nil {
-			b.Errorf("cannot parse json error: %s", err)
-		}
-		v, err := value.Array()
-		if err != nil {
-			b.Errorf("cannot get array from json")
-		}
+		var r Rows
 		for pb.Next() {
-			e := &Events{Metrics: []Metric{}}
-			if err := e.Unmarshal(v); err != nil {
-				b.Errorf("Unmarshal() error = %v", err)
-			}
-			if len(e.Metrics) == 0 {
-				b.Errorf("metrics should have at least one element")
+			if err := r.Unmarshal(reqBody); err != nil {
+				panic(fmt.Errorf("unmarshal error: %s", err))
+			}
+			if len(r.Rows) != 1 {
+				panic(fmt.Errorf("unexpected number of items unmarshaled; got %d; want %d", len(r.Rows), 1))
 			}
 		}
 	})


@@ -1,80 +0,0 @@
-package stream
-
-import (
-	"bufio"
-	"fmt"
-	"io"
-	"sync"
-
-	"github.com/VictoriaMetrics/metrics"
-
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
-)
-
-var (
-	maxInsertRequestSize = flagutil.NewBytes("newrelic.maxInsertRequestSize", 64*1024*1024, "The maximum size in bytes of a single NewRelic POST request to /infra/v2/metrics/events/bulk")
-)
-
-var (
-	readCalls       = metrics.NewCounter(`vm_protoparser_read_calls_total{type="newrelic"}`)
-	readErrors      = metrics.NewCounter(`vm_protoparser_read_errors_total{type="newrelic"}`)
-	unmarshalErrors = metrics.NewCounter(`vm_protoparser_unmarshal_errors_total{type="newrelic"}`)
-)
-
-var pushCtxPool sync.Pool
-var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())
-
-type pushCtx struct {
-	br     *bufio.Reader
-	reqBuf bytesutil.ByteBuffer
-}
-
-func (ctx *pushCtx) Read() error {
-	readCalls.Inc()
-	lr := io.LimitReader(ctx.br, maxInsertRequestSize.N+1)
-	startTime := fasttime.UnixTimestamp()
-	reqLen, err := ctx.reqBuf.ReadFrom(lr)
-	if err != nil {
-		readErrors.Inc()
-		return fmt.Errorf("cannot read compressed request in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err)
-	}
-	if reqLen > maxInsertRequestSize.N {
-		readErrors.Inc()
-		return fmt.Errorf("too big packed request; mustn't exceed `-maxInsertRequestSize=%d` bytes", maxInsertRequestSize.N)
-	}
-	return nil
-}
-
-func (ctx *pushCtx) reset() {
-	ctx.br.Reset(nil)
-	ctx.reqBuf.Reset()
-}
-
-func getPushCtx(r io.Reader) *pushCtx {
-	select {
-	case ctx := <-pushCtxPoolCh:
-		ctx.br.Reset(r)
-		return ctx
-	default:
-		if v := pushCtxPool.Get(); v != nil {
-			ctx := v.(*pushCtx)
-			ctx.br.Reset(r)
-			return ctx
-		}
-		return &pushCtx{
-			br: bufio.NewReaderSize(r, 64*1024),
-		}
-	}
-}
-
-func putPushCtx(ctx *pushCtx) {
-	ctx.reset()
-	select {
-	case pushCtxPoolCh <- ctx:
-	default:
-		pushCtxPool.Put(ctx)
-	}
-}


@@ -1,23 +1,31 @@
 package stream

 import (
+	"bufio"
 	"fmt"
 	"io"
+	"sync"

-	"github.com/valyala/fastjson"
+	"github.com/VictoriaMetrics/metrics"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
 )

-var parserPool fastjson.ParserPool
+var (
+	maxInsertRequestSize = flagutil.NewBytes("newrelic.maxInsertRequestSize", 64*1024*1024, "The maximum size in bytes of a single NewRelic request "+
+		"to /newrelic/infra/v2/metrics/events/bulk")
+)

-// Parse parses NewRelic POST request for newrelic/infra/v2/metrics/events/bulk from reader and calls callback for the parsed request.
+// Parse parses NewRelic POST request for /newrelic/infra/v2/metrics/events/bulk from r and calls callback for the parsed request.
 //
-// callback shouldn't hold series after returning.
-func Parse(r io.Reader, isGzip bool, callback func(series []newrelic.Metric) error) error {
+// callback shouldn't hold rows after returning.
+func Parse(r io.Reader, isGzip bool, callback func(rows []newrelic.Row) error) error {
 	wcr := writeconcurrencylimiter.GetReader(r)
 	defer writeconcurrencylimiter.PutReader(wcr)
 	r = wcr
@@ -25,7 +33,7 @@ func Parse(r io.Reader, isGzip bool, callback func(series []newrelic.Metric) err
 	if isGzip {
 		zr, err := common.GetGzipReader(r)
 		if err != nil {
-			return fmt.Errorf("cannot read gzipped Newrelic agent data: %w", err)
+			return fmt.Errorf("cannot read gzipped NewRelic agent data: %w", err)
 		}
 		defer common.PutGzipReader(zr)
 		r = zr
@@ -34,40 +42,104 @@ func Parse(r io.Reader, isGzip bool, callback func(series []newrelic.Metric) err
 	ctx := getPushCtx(r)
 	defer putPushCtx(ctx)
 	if err := ctx.Read(); err != nil {
-		return err
+		return fmt.Errorf("cannot read NewRelic request: %w", err)
 	}

-	p := parserPool.Get()
-	defer parserPool.Put(p)
-	v, err := p.ParseBytes(ctx.reqBuf.B)
-	if err != nil {
-		return fmt.Errorf("cannot parse NewRelic POST request with size %d bytes: %w", len(ctx.reqBuf.B), err)
-	}
-
-	metricsPost, err := v.Array()
-	if err != nil {
-		return fmt.Errorf("cannot fetch data from Newrelic POST request: %w", err)
-	}
-
-	var events newrelic.Events
-	if err := events.Unmarshal(metricsPost); err != nil {
+	rows := getRows()
+	defer putRows(rows)
+
+	if err := rows.Unmarshal(ctx.reqBuf.B); err != nil {
 		unmarshalErrors.Inc()
-		return fmt.Errorf("cannot unmarshal NewRelic POST request: %w", err)
+		return fmt.Errorf("cannot unmarshal NewRelic request: %w", err)
 	}

 	// Fill in missing timestamps
 	currentTimestamp := int64(fasttime.UnixTimestamp())
-	for i := range events.Metrics {
-		m := &events.Metrics[i]
-		if m.Timestamp == 0 {
-			m.Timestamp = currentTimestamp * 1e3
+	for i := range rows.Rows {
+		r := &rows.Rows[i]
+		if r.Timestamp == 0 {
+			r.Timestamp = currentTimestamp * 1e3
 		}
 	}

-	if err := callback(events.Metrics); err != nil {
+	if err := callback(rows.Rows); err != nil {
 		return fmt.Errorf("error when processing imported data: %w", err)
 	}
 	return nil
 }
+
+func getRows() *newrelic.Rows {
+	v := rowsPool.Get()
+	if v == nil {
+		return &newrelic.Rows{}
+	}
+	return v.(*newrelic.Rows)
+}
+
+func putRows(rows *newrelic.Rows) {
+	rows.Reset()
+	rowsPool.Put(rows)
+}
+
+var rowsPool sync.Pool
+
+var (
+	readCalls       = metrics.NewCounter(`vm_protoparser_read_calls_total{type="newrelic"}`)
+	readErrors      = metrics.NewCounter(`vm_protoparser_read_errors_total{type="newrelic"}`)
+	unmarshalErrors = metrics.NewCounter(`vm_protoparser_unmarshal_errors_total{type="newrelic"}`)
+)
+
+var pushCtxPool sync.Pool
+var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())
+
+type pushCtx struct {
+	br     *bufio.Reader
+	reqBuf bytesutil.ByteBuffer
+}
+
+func (ctx *pushCtx) Read() error {
+	readCalls.Inc()
+	lr := io.LimitReader(ctx.br, maxInsertRequestSize.N+1)
+	startTime := fasttime.UnixTimestamp()
+	reqLen, err := ctx.reqBuf.ReadFrom(lr)
+	if err != nil {
+		readErrors.Inc()
+		return fmt.Errorf("cannot read request in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err)
+	}
+	if reqLen > maxInsertRequestSize.N {
+		readErrors.Inc()
+		return fmt.Errorf("too big request; mustn't exceed -newrelic.maxInsertRequestSize=%d bytes", maxInsertRequestSize.N)
+	}
+	return nil
+}
+
+func (ctx *pushCtx) reset() {
+	ctx.br.Reset(nil)
+	ctx.reqBuf.Reset()
+}
+
+func getPushCtx(r io.Reader) *pushCtx {
+	select {
+	case ctx := <-pushCtxPoolCh:
+		ctx.br.Reset(r)
+		return ctx
+	default:
+		if v := pushCtxPool.Get(); v != nil {
+			ctx := v.(*pushCtx)
+			ctx.br.Reset(r)
+			return ctx
+		}
+		return &pushCtx{
+			br: bufio.NewReaderSize(r, 64*1024),
+		}
+	}
+}
+
+func putPushCtx(ctx *pushCtx) {
+	ctx.reset()
+	select {
+	case pushCtxPoolCh <- ctx:
+	default:
+		pushCtxPool.Put(ctx)
+	}
+}
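A minimal sketch of driving the stream parser directly (a hypothetical caller; the real callers are the vminsert/vmagent request handlers, and the `stream` import path is assumed from the package layout shown in the tests below):

```go
package main

import (
	"bytes"
	"log"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic/stream"
)

func main() {
	body := bytes.NewReader([]byte(`[{"Events":[{"eventType":"SystemSample","uptime":762376}]}]`))
	err := stream.Parse(body, false, func(rows []newrelic.Row) error {
		// Process rows here; they must not be retained after the callback
		// returns, since the backing buffers are pooled and reused.
		log.Printf("parsed %d rows", len(rows))
		return nil
	})
	if err != nil {
		log.Fatalf("cannot parse NewRelic request: %s", err)
	}
}
```

The `pushCtx` pool above follows the pattern used by the other parsers in `lib/protoparser`: a channel bounded by the CPU count keeps the hot contexts, while `sync.Pool` absorbs bursts and lets the GC reclaim contexts under low load.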


@@ -0,0 +1,106 @@
+package stream
+
+import (
+	"bytes"
+	"compress/gzip"
+	"fmt"
+	"reflect"
+	"testing"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic"
+)
+
+func TestParseFailure(t *testing.T) {
+	f := func(req string) {
+		t.Helper()
+
+		callback := func(rows []newrelic.Row) error {
+			panic(fmt.Errorf("unexpected call into callback"))
+		}
+		r := bytes.NewReader([]byte(req))
+		if err := Parse(r, false, callback); err == nil {
+			t.Fatalf("expecting non-empty error")
+		}
+	}
+
+	f("")
+	f("foo")
+	f("{}")
+	f("[1,2,3]")
+}
+
+func TestParseSuccess(t *testing.T) {
+	f := func(req string, expectedRows []newrelic.Row) {
+		t.Helper()
+
+		callback := func(rows []newrelic.Row) error {
+			if !reflect.DeepEqual(rows, expectedRows) {
+				return fmt.Errorf("unexpected rows\ngot\n%v\nwant\n%v", rows, expectedRows)
+			}
+			return nil
+		}
+
+		// Parse from uncompressed reader
+		r := bytes.NewReader([]byte(req))
+		if err := Parse(r, false, callback); err != nil {
+			t.Fatalf("unexpected error when parsing uncompressed request: %s", err)
+		}
+
+		var bb bytes.Buffer
+		zw := gzip.NewWriter(&bb)
+		if _, err := zw.Write([]byte(req)); err != nil {
+			t.Fatalf("cannot compress request: %s", err)
+		}
+		if err := zw.Close(); err != nil {
+			t.Fatalf("cannot close compressed writer: %s", err)
+		}
+		if err := Parse(&bb, true, callback); err != nil {
+			t.Fatalf("unexpected error when parsing compressed request: %s", err)
+		}
+	}
+
+	f("[]", nil)
+	f(`[{"Events":[]}]`, nil)
+	f(`[{
+		"EntityID":28257883748326179,
+		"IsAgent":true,
+		"Events":[
+			{
+				"eventType":"SystemSample",
+				"timestamp":1690286061,
+				"entityKey":"macbook-pro.local",
+				"dc": "1",
+				"diskWritesPerSecond":-34.21,
+				"uptime":762376
+			}
+		],
+		"ReportingAgentID":28257883748326179
+	}]`, []newrelic.Row{
+		{
+			Tags: []newrelic.Tag{
+				{
+					Key:   []byte("eventType"),
+					Value: []byte("SystemSample"),
+				},
+				{
+					Key:   []byte("entityKey"),
+					Value: []byte("macbook-pro.local"),
+				},
+				{
+					Key:   []byte("dc"),
+					Value: []byte("1"),
+				},
+			},
+			Samples: []newrelic.Sample{
+				{
+					Name:  []byte("diskWritesPerSecond"),
+					Value: -34.21,
+				},
+				{
+					Name:  []byte("uptime"),
+					Value: 762376,
+				},
+			},
+			Timestamp: 1690286061000,
+		},
+	})
+}
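For completeness, a client-side sketch of pushing a gzipped events payload to the endpoint this parser serves. This is a hypothetical example: the `:8428` address is the usual single-node listen address and is an assumption here, as is the server detecting compression via the `Content-Encoding: gzip` header.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"log"
	"net/http"
)

func main() {
	payload := []byte(`[{"Events":[{"eventType":"SystemSample","uptime":762376}]}]`)

	// Gzip the payload, mimicking what the NewRelic infrastructure agent sends.
	var bb bytes.Buffer
	zw := gzip.NewWriter(&bb)
	if _, err := zw.Write(payload); err != nil {
		log.Fatalf("cannot compress payload: %s", err)
	}
	if err := zw.Close(); err != nil {
		log.Fatalf("cannot close gzip writer: %s", err)
	}

	// Assumed local single-node VictoriaMetrics address.
	req, err := http.NewRequest("POST", "http://localhost:8428/newrelic/infra/v2/metrics/events/bulk", &bb)
	if err != nil {
		log.Fatalf("cannot create request: %s", err)
	}
	req.Header.Set("Content-Encoding", "gzip")
	req.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatalf("request failed: %s", err)
	}
	defer resp.Body.Close()
	log.Printf("status: %s", resp.Status)
}
```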


@@ -96,7 +96,7 @@ func (ctx *pushCtx) Read() error {
 	}
 	if reqLen > int64(maxInsertRequestSize.N) {
 		readErrors.Inc()
-		return fmt.Errorf("too big packed request; mustn't exceed `-maxInsertRequestSize=%d` bytes", maxInsertRequestSize.N)
+		return fmt.Errorf("too big packed request; mustn't exceed -maxInsertRequestSize=%d bytes", maxInsertRequestSize.N)
 	}
 	return nil
 }