mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-01 14:47:38 +00:00
a89cf4ce12
Signed-off-by: hagen1778 <roman@victoriametrics.com>
Co-authored-by: hagen1778 <roman@victoriametrics.com>
(cherry picked from commit f60c08a7bd
)
Signed-off-by: hagen1778 <roman@victoriametrics.com>
246 lines
6.2 KiB
Go
246 lines
6.2 KiB
Go
package newrelic
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
"unicode"
|
|
|
|
"github.com/valyala/fastjson"
|
|
"github.com/valyala/fastjson/fastfloat"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
|
)
|
|
|
|
var baseEventKeys = map[string]struct{}{
|
|
"timestamp": {}, "eventType": {},
|
|
}
|
|
|
|
type tagsBuffer struct {
|
|
tags []Tag
|
|
}
|
|
|
|
var tagsPool = sync.Pool{
|
|
New: func() interface{} {
|
|
return &tagsBuffer{tags: make([]Tag, 0)}
|
|
},
|
|
}
|
|
|
|
// NewRelic agent sends next struct to the collector
|
|
// MetricPost entity item for the HTTP post to be sent to the ingest service.
|
|
// type MetricPost struct {
|
|
// ExternalKeys []string `json:"ExternalKeys,omitempty"`
|
|
// EntityID uint64 `json:"EntityID,omitempty"`
|
|
// IsAgent bool `json:"IsAgent"`
|
|
// Events []json.RawMessage `json:"Events"`
|
|
// // Entity ID of the reporting agent, which will = EntityID when IsAgent == true.
|
|
// // The field is required in the backend for host metadata matching of the remote entities
|
|
// ReportingAgentID uint64 `json:"ReportingAgentID,omitempty"`
|
|
// }
|
|
// We are using only Events field because it contains all needed metrics
|
|
|
|
// Events represents Metrics collected from NewRelic MetricPost request
|
|
// https://docs.newrelic.com/docs/infrastructure/manage-your-data/data-instrumentation/default-infrastructure-monitoring-data/#infrastructure-events
|
|
type Events struct {
|
|
Metrics []Metric
|
|
}
|
|
|
|
// Unmarshal takes fastjson.Value and collects Metrics
|
|
func (e *Events) Unmarshal(v []*fastjson.Value) error {
|
|
for _, value := range v {
|
|
events := value.Get("Events")
|
|
if events == nil {
|
|
return fmt.Errorf("got empty Events array from request")
|
|
}
|
|
eventsArr, err := events.Array()
|
|
if err != nil {
|
|
return fmt.Errorf("error collect events: %s", err)
|
|
}
|
|
|
|
for _, event := range eventsArr {
|
|
metricData, err := event.Object()
|
|
if err != nil {
|
|
return fmt.Errorf("error get metric data: %s", err)
|
|
}
|
|
var m Metric
|
|
metrics, err := m.unmarshal(metricData)
|
|
if err != nil {
|
|
return fmt.Errorf("error collect metrics from Newrelic json: %s", err)
|
|
}
|
|
e.Metrics = append(e.Metrics, metrics...)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Metric represents VictoriaMetrics metrics
|
|
type Metric struct {
|
|
Timestamp int64
|
|
Tags []Tag
|
|
Metric string
|
|
Value float64
|
|
}
|
|
|
|
func (m *Metric) unmarshal(o *fastjson.Object) ([]Metric, error) {
|
|
m.reset()
|
|
|
|
tgsBuffer := tagsPool.Get().(*tagsBuffer)
|
|
defer func() {
|
|
tgsBuffer.tags = tgsBuffer.tags[:0]
|
|
tagsPool.Put(tgsBuffer)
|
|
}()
|
|
|
|
metrics := make([]Metric, 0, o.Len())
|
|
rawTs := o.Get("timestamp")
|
|
if rawTs != nil {
|
|
ts, err := getFloat64(rawTs)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("invalid `timestamp` in %s: %w", o, err)
|
|
}
|
|
m.Timestamp = int64(ts * 1e3)
|
|
} else {
|
|
// Allow missing timestamp. It should be automatically populated
|
|
// with the current time by the caller.
|
|
m.Timestamp = 0
|
|
}
|
|
|
|
eventType := o.Get("eventType")
|
|
if eventType == nil {
|
|
return nil, fmt.Errorf("error get eventType from Events object: %s", o)
|
|
}
|
|
prefix := bytesutil.ToUnsafeString(eventType.GetStringBytes())
|
|
prefix = camelToSnakeCase(prefix)
|
|
|
|
o.Visit(func(key []byte, v *fastjson.Value) {
|
|
|
|
k := bytesutil.ToUnsafeString(key)
|
|
// skip base event keys which should have been parsed before this
|
|
if _, ok := baseEventKeys[k]; ok {
|
|
return
|
|
}
|
|
|
|
switch v.Type() {
|
|
case fastjson.TypeString:
|
|
// this is label-value pair
|
|
value := v.Get()
|
|
if value == nil {
|
|
logger.Errorf("failed to get label value from NewRelic json: %s", v)
|
|
return
|
|
}
|
|
name := camelToSnakeCase(k)
|
|
val := bytesutil.ToUnsafeString(value.GetStringBytes())
|
|
tgsBuffer.tags = append(tgsBuffer.tags, Tag{Key: name, Value: val})
|
|
case fastjson.TypeNumber:
|
|
// this is metric name with value
|
|
metricName := camelToSnakeCase(k)
|
|
if prefix != "" {
|
|
metricName = fmt.Sprintf("%s_%s", prefix, metricName)
|
|
}
|
|
f, err := getFloat64(v)
|
|
if err != nil {
|
|
logger.Errorf("failed to get value for NewRelic metric %q: %w", k, err)
|
|
return
|
|
}
|
|
metrics = append(metrics, Metric{Metric: metricName, Value: f})
|
|
default:
|
|
// unknown type
|
|
logger.Errorf("got unsupported NewRelic json %s field type: %s", v, v.Type())
|
|
return
|
|
}
|
|
})
|
|
|
|
for i := range metrics {
|
|
metrics[i].Timestamp = m.Timestamp
|
|
metrics[i].Tags = tgsBuffer.tags
|
|
}
|
|
|
|
return metrics, nil
|
|
}
|
|
|
|
func (m *Metric) reset() {
|
|
m.Timestamp = 0
|
|
m.Tags = nil
|
|
m.Metric = ""
|
|
m.Value = 0
|
|
}
|
|
|
|
// Tag is an NewRelic tag.
|
|
type Tag struct {
|
|
Key string
|
|
Value string
|
|
}
|
|
|
|
func camelToSnakeCase(str string) string {
|
|
str = promrelabel.SanitizeLabelName(str)
|
|
length := len(str)
|
|
snakeCase := make([]byte, 0, length*2)
|
|
tokens := make([]byte, 0, length)
|
|
var allTokensUpper bool
|
|
|
|
flush := func(tokens []byte) {
|
|
for _, c := range tokens {
|
|
snakeCase = append(snakeCase, byte(unicode.ToLower(rune(c))))
|
|
}
|
|
}
|
|
|
|
for i := 0; i < length; i++ {
|
|
char := str[i]
|
|
if unicode.IsUpper(rune(char)) {
|
|
switch {
|
|
case len(tokens) == 0:
|
|
allTokensUpper = true
|
|
tokens = append(tokens, char)
|
|
case allTokensUpper:
|
|
tokens = append(tokens, char)
|
|
default:
|
|
flush(tokens)
|
|
snakeCase = append(snakeCase, '_')
|
|
tokens = tokens[:0]
|
|
tokens = append(tokens, char)
|
|
allTokensUpper = true
|
|
}
|
|
continue
|
|
}
|
|
|
|
switch {
|
|
case len(tokens) == 1:
|
|
tokens = append(tokens, char)
|
|
allTokensUpper = false
|
|
case allTokensUpper:
|
|
tail := tokens[:len(tokens)-1]
|
|
last := tokens[len(tokens)-1:]
|
|
flush(tail)
|
|
snakeCase = append(snakeCase, '_')
|
|
tokens = tokens[:0]
|
|
tokens = append(tokens, last...)
|
|
tokens = append(tokens, char)
|
|
allTokensUpper = false
|
|
default:
|
|
tokens = append(tokens, char)
|
|
}
|
|
}
|
|
|
|
if len(tokens) > 0 {
|
|
flush(tokens)
|
|
}
|
|
s := bytesutil.ToUnsafeString(snakeCase)
|
|
return s
|
|
}
|
|
|
|
func getFloat64(v *fastjson.Value) (float64, error) {
|
|
switch v.Type() {
|
|
case fastjson.TypeNumber:
|
|
return v.Float64()
|
|
case fastjson.TypeString:
|
|
vStr, _ := v.StringBytes()
|
|
vFloat, err := fastfloat.Parse(bytesutil.ToUnsafeString(vStr))
|
|
if err != nil {
|
|
return 0, fmt.Errorf("cannot parse value %q: %w", vStr, err)
|
|
}
|
|
return vFloat, nil
|
|
default:
|
|
return 0, fmt.Errorf("value doesn't contain float64; it contains %s", v.Type())
|
|
}
|
|
}
|