VictoriaMetrics/lib/protoparser/newrelic/parser.go
Aliaksandr Valialkin 815e9bf892
app/{vmagent,vminsert}: follow-up for NewRelic data ingestion protocol support
This is a follow-up for f60c08a7bd

Changes:

- Make sure all the urls related to NewRelic protocol start with /newrelic . Previously some urls started with /api/v1/newrelic

- Remove the /api/v1 part from NewRelic urls, since it makes no sense

- Remove automatic transformation from CamelCase to snake_case for NewRelic labels and metric names,
  since it may complicate the transition from NewRelic to VictoriaMetrics. Preserve all the metric names and label names,
  so users could query metrics and labels by the same names which are used in NewRelic.
  The automatic transformation from CamelCase to snake_case can be added later as a special action for relabeling rules if needed.

- Properly update per-tenant data ingestion stats at app/vmagent/newrelic/request_handler.go . Previously it was always zero.

- Fix NewRelic urls in vmagent when multitenant data ingestion is enabled. Previously they mistakenly started with `/`.

- Document NewRelic data ingestion url at docs/Cluster-VictoriaMetrics.md

- Remove superfluous memory allocations at lib/protoparser/newrelic

- Improve tests at lib/protoparser/newrelic/*

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3520
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4712
2023-10-16 13:55:04 +02:00

206 lines
4.3 KiB
Go

package newrelic
import (
"fmt"
"github.com/valyala/fastjson"
"github.com/valyala/fastjson/fastfloat"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
// Rows holds the rows parsed from a NewRelic Event request.
//
// See https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events
type Rows struct {
	// Rows is the list of parsed rows. Unmarshal adds one Row per event object
	// found in the request and Reset truncates the list while keeping its storage.
	Rows []Row
}
// Reset clears every row held by r and truncates r.Rows to zero length,
// so r and its underlying storage can be re-used for the next request.
func (r *Rows) Reset() {
	for i := range r.Rows {
		r.Rows[i].reset()
	}
	r.Rows = r.Rows[:0]
}
// jsonParserPool provides the JSON parsers which Rows.Unmarshal obtains
// at its start and returns when it finishes.
var jsonParserPool fastjson.ParserPool
// Unmarshal parses NewRelic Event request from b to r.
//
// The request must be a JSON array of MetricPost objects. Every object found
// in the "Events" array of a MetricPost object becomes one Row in r.Rows;
// other MetricPost keys are ignored. Rows parsed by a previous call are
// dropped via Reset before parsing starts.
//
// b can be re-used after Unmarshal returns.
func (r *Rows) Unmarshal(b []byte) error {
	parser := jsonParserPool.Get()
	defer jsonParserPool.Put(parser)

	r.Reset()

	root, err := parser.ParseBytes(b)
	if err != nil {
		return err
	}
	posts, err := root.Array()
	if err != nil {
		return fmt.Errorf("cannot find the top-level array of MetricPost objects: %w", err)
	}

	for _, post := range posts {
		postObject, err := post.Object()
		if err != nil {
			return fmt.Errorf("cannot find MetricPost object: %w", err)
		}

		dst := r.Rows
		postObject.Visit(func(key []byte, value *fastjson.Value) {
			// Stop doing work after the first error; only the "Events" key is of interest.
			if err != nil || string(key) != "Events" {
				return
			}
			events, arrErr := value.Array()
			if arrErr != nil {
				err = fmt.Errorf("cannot find Events array in MetricPost object: %w", arrErr)
				return
			}
			for _, event := range events {
				eventObject, objErr := event.Object()
				if objErr != nil {
					err = fmt.Errorf("cannot find EventObject: %w", objErr)
					return
				}
				// Re-use a previously allocated Row when spare capacity is available.
				if len(dst) < cap(dst) {
					dst = dst[:len(dst)+1]
				} else {
					dst = append(dst, Row{})
				}
				if rowErr := dst[len(dst)-1].unmarshal(eventObject); rowErr != nil {
					err = fmt.Errorf("cannot unmarshal EventObject: %w", rowErr)
					return
				}
			}
		})
		r.Rows = dst

		if err != nil {
			return fmt.Errorf("cannot parse MetricPost object: %w", err)
		}
	}
	return nil
}
// Row is a single parsed event.
type Row struct {
	// Tags holds the string-valued fields of the event.
	Tags []Tag

	// Samples holds the numeric fields of the event except of `timestamp`.
	Samples []Sample

	// Timestamp is taken from the numeric `timestamp` field of the event.
	// Values below 1<<32 are treated as seconds and converted to milliseconds.
	Timestamp int64
}
// Tag is a single key=value pair attached to a Row.
type Tag struct {
	Key, Value []byte
}
// Sample is a single named numeric value attached to a Row.
type Sample struct {
	// Name is the name of the sample.
	Name []byte

	// Value is the numeric value of the sample.
	Value float64
}
// reset clears every tag and sample held by r, truncates both slices
// to zero length while keeping their storage, and zeroes the timestamp.
func (r *Row) reset() {
	for i := range r.Tags {
		r.Tags[i].reset()
	}
	r.Tags = r.Tags[:0]

	for i := range r.Samples {
		r.Samples[i].reset()
	}
	r.Samples = r.Samples[:0]

	r.Timestamp = 0
}
// reset truncates the key and the value to zero length while keeping their storage.
func (t *Tag) reset() {
	t.Key, t.Value = t.Key[:0], t.Value[:0]
}
// reset zeroes the value and truncates the name to zero length while keeping its storage.
func (s *Sample) reset() {
	s.Value = 0
	s.Name = s.Name[:0]
}
// unmarshal fills r from the given event object.
//
// r is reset first. Then every field with non-empty key is handled by its JSON type:
//
//   - a string with non-empty value becomes a Tag;
//   - the number named `timestamp` becomes r.Timestamp;
//   - any other number becomes a Sample.
//
// Fields of other types are ignored. Keys and string values are copied into r.
func (r *Row) unmarshal(o *fastjson.Object) (err error) {
	r.reset()

	tags, samples := r.Tags[:0], r.Samples[:0]
	o.Visit(func(key []byte, value *fastjson.Value) {
		// Stop doing work after the first error; fields with empty keys are skipped.
		if err != nil || len(key) == 0 {
			return
		}

		switch value.Type() {
		case fastjson.TypeString:
			str := value.GetStringBytes()
			if len(str) == 0 {
				return
			}
			// Re-use a previously allocated Tag when spare capacity is available.
			if len(tags) < cap(tags) {
				tags = tags[:len(tags)+1]
			} else {
				tags = append(tags, Tag{})
			}
			tag := &tags[len(tags)-1]
			tag.Key = append(tag.Key[:0], key...)
			tag.Value = append(tag.Value[:0], str...)

		case fastjson.TypeNumber:
			if string(key) != "timestamp" {
				// Re-use a previously allocated Sample when spare capacity is available.
				if len(samples) < cap(samples) {
					samples = samples[:len(samples)+1]
				} else {
					samples = append(samples, Sample{})
				}
				sample := &samples[len(samples)-1]
				sample.Name = append(sample.Name[:0], key...)
				sample.Value = value.GetFloat64()
				return
			}

			ts, tsErr := getFloat64(value)
			if tsErr != nil {
				err = fmt.Errorf("cannot parse `timestamp` field: %w", tsErr)
				return
			}
			if ts < (1 << 32) {
				// Such a small value is a timestamp in seconds. Convert it to milliseconds.
				ts *= 1e3
			}
			r.Timestamp = int64(ts)
		}
	})
	r.Tags, r.Samples = tags, samples

	return err
}
// getFloat64 returns the float64 value stored in v.
//
// A JSON number is returned as is, a JSON string is parsed as a float.
// An error is returned for any other JSON type or for a string that cannot be parsed.
func getFloat64(v *fastjson.Value) (float64, error) {
	typ := v.Type()
	if typ == fastjson.TypeNumber {
		return v.Float64()
	}
	if typ != fastjson.TypeString {
		return 0, fmt.Errorf("value doesn't contain float64; it contains %s", typ)
	}

	// The error is ignored, since the type has been already verified to be a string above.
	raw, _ := v.StringBytes()
	f, err := fastfloat.Parse(bytesutil.ToUnsafeString(raw))
	if err != nil {
		return 0, fmt.Errorf("cannot parse value %q: %w", raw, err)
	}
	return f, nil
}