VictoriaMetrics/lib/protoparser/newrelic/parser.go

207 lines
4.3 KiB
Go
Raw Permalink Normal View History

package newrelic
import (
"fmt"
"github.com/valyala/fastjson"
"github.com/valyala/fastjson/fastfloat"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
// Rows contains rows parsed from NewRelic Event request
//
// See https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events
type Rows struct {
Rows []Row
}
// Reset resets r, so it can be re-used
func (r *Rows) Reset() {
rows := r.Rows
for i := range rows {
rows[i].reset()
}
r.Rows = rows[:0]
}
var jsonParserPool fastjson.ParserPool
// Unmarshal parses NewRelic Event request from b to r.
//
// b can be re-used after returning from r.
func (r *Rows) Unmarshal(b []byte) error {
p := jsonParserPool.Get()
defer jsonParserPool.Put(p)
r.Reset()
v, err := p.ParseBytes(b)
if err != nil {
return err
}
metricPosts, err := v.Array()
if err != nil {
return fmt.Errorf("cannot find the top-level array of MetricPost objects: %w", err)
}
for _, mp := range metricPosts {
o, err := mp.Object()
if err != nil {
return fmt.Errorf("cannot find MetricPost object: %w", err)
}
rows := r.Rows
o.Visit(func(k []byte, v *fastjson.Value) {
if err != nil {
return
}
switch string(k) {
case "Events":
events, errLocal := v.Array()
if errLocal != nil {
err = fmt.Errorf("cannot find Events array in MetricPost object: %w", errLocal)
return
}
for _, e := range events {
eventObject, errLocal := e.Object()
if errLocal != nil {
err = fmt.Errorf("cannot find EventObject: %w", errLocal)
return
}
if cap(rows) > len(rows) {
rows = rows[:len(rows)+1]
} else {
rows = append(rows, Row{})
}
r := &rows[len(rows)-1]
if errLocal := r.unmarshal(eventObject); errLocal != nil {
err = fmt.Errorf("cannot unmarshal EventObject: %w", errLocal)
return
}
}
}
})
r.Rows = rows
if err != nil {
return fmt.Errorf("cannot parse MetricPost object: %w", err)
}
}
return nil
}
// Row represents parsed row
type Row struct {
Tags []Tag
Samples []Sample
Timestamp int64
}
// Tag represents a key=value tag
type Tag struct {
Key []byte
Value []byte
}
// Sample represents parsed sample
type Sample struct {
Name []byte
Value float64
}
func (r *Row) reset() {
tags := r.Tags
for i := range tags {
tags[i].reset()
}
r.Tags = tags[:0]
samples := r.Samples
for i := range samples {
samples[i].reset()
}
r.Samples = samples[:0]
r.Timestamp = 0
}
func (t *Tag) reset() {
t.Key = t.Key[:0]
t.Value = t.Value[:0]
}
func (s *Sample) reset() {
s.Name = s.Name[:0]
s.Value = 0
}
func (r *Row) unmarshal(o *fastjson.Object) (err error) {
r.reset()
tags := r.Tags[:0]
samples := r.Samples[:0]
o.Visit(func(k []byte, v *fastjson.Value) {
if err != nil {
return
}
if len(k) == 0 {
return
}
switch v.Type() {
case fastjson.TypeString:
// Register new tag
valueBytes := v.GetStringBytes()
if len(valueBytes) == 0 {
return
}
if cap(tags) > len(tags) {
tags = tags[:len(tags)+1]
} else {
tags = append(tags, Tag{})
}
t := &tags[len(tags)-1]
t.Key = append(t.Key[:0], k...)
t.Value = append(t.Value[:0], valueBytes...)
case fastjson.TypeNumber:
if string(k) == "timestamp" {
// Parse timestamp
ts, errLocal := getFloat64(v)
if errLocal != nil {
err = fmt.Errorf("cannot parse `timestamp` field: %w", errLocal)
return
}
if ts < (1 << 32) {
// The timestamp is in seconds. Convert it to milliseconds.
ts *= 1e3
}
r.Timestamp = int64(ts)
return
}
// Register new sample
if cap(samples) > len(samples) {
samples = samples[:len(samples)+1]
} else {
samples = append(samples, Sample{})
}
s := &samples[len(samples)-1]
s.Name = append(s.Name[:0], k...)
s.Value = v.GetFloat64()
}
})
r.Tags = tags
r.Samples = samples
return err
}
func getFloat64(v *fastjson.Value) (float64, error) {
switch v.Type() {
case fastjson.TypeNumber:
return v.Float64()
case fastjson.TypeString:
vStr, _ := v.StringBytes()
vFloat, err := fastfloat.Parse(bytesutil.ToUnsafeString(vStr))
if err != nil {
return 0, fmt.Errorf("cannot parse value %q: %w", vStr, err)
}
return vFloat, nil
default:
return 0, fmt.Errorf("value doesn't contain float64; it contains %s", v.Type())
}
}