// VictoriaMetrics/lib/protoparser/opentelemetry/stream/streamparser.go
// (446 lines, 13 KiB, Go)

package stream
import (
"flag"
"fmt"
"io"
"strconv"
"strings"
"sync"
"unicode"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/pb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
)
var (
	// sanitizeMetrics controls sanitizing metric and label names ingested via OpenTelemetry protocol.
	//
	// When enabled, names are normalized to Prometheus conventions by
	// sanitizeMetricName and sanitizeLabelName below; by default the
	// original OpenTelemetry names are kept as-is.
	sanitizeMetrics = flag.Bool("opentelemetry.sanitizeMetrics", false, "Sanitize metric and label names for the ingested OpenTelemetry data")
)
// unitMap translates OpenTelemetry unit tokens into Prometheus-style unit
// name suffixes.
//
// The slice has two entries matching the two components of a "numerator/denominator"
// unit string (sanitizeMetricName splits the unit on "/" with SplitN(..., len(unitMap))):
// entry 0 maps plain units ("ms" -> "milliseconds"), entry 1 maps the
// denominator and carries the "per" prefix ("s" -> "per_second").
//
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/b8655058501bed61a06bb660869051491f46840b/pkg/translator/prometheus/normalize_name.go#L19
var unitMap = []struct {
	prefix string
	units  map[string]string
}{
	{
		units: map[string]string{
			// Time
			"d":   "days",
			"h":   "hours",
			"min": "minutes",
			"s":   "seconds",
			"ms":  "milliseconds",
			"us":  "microseconds",
			"ns":  "nanoseconds",
			// Bytes
			"By":   "bytes",
			"KiBy": "kibibytes",
			"MiBy": "mebibytes",
			"GiBy": "gibibytes",
			"TiBy": "tibibytes",
			"KBy":  "kilobytes",
			"MBy":  "megabytes",
			"GBy":  "gigabytes",
			"TBy":  "terabytes",
			// SI
			"m": "meters",
			"V": "volts",
			"A": "amperes",
			"J": "joules",
			"W": "watts",
			"g": "grams",
			// Misc
			"Cel": "celsius",
			"Hz":  "hertz",
			"1":   "",
			"%":   "percent",
		},
	}, {
		prefix: "per",
		units: map[string]string{
			"s":  "second",
			"m":  "minute",
			"h":  "hour",
			"d":  "day",
			"w":  "week",
			"mo": "month",
			"y":  "year",
		},
	},
}
// ParseStream parses OpenTelemetry protobuf or json data from r and calls callback for the parsed rows.
//
// callback shouldn't hold tss items after returning.
//
// The optional processBody callback may pre-process the read request body from r
// before it is parsed in OpenTelemetry format.
func ParseStream(r io.Reader, isGzipped bool, processBody func([]byte) ([]byte, error), callback func(tss []prompbmarshal.TimeSeries) error) error {
	// Limit the number of concurrently processed requests.
	limitedReader := writeconcurrencylimiter.GetReader(r)
	defer writeconcurrencylimiter.PutReader(limitedReader)
	r = limitedReader

	if isGzipped {
		zr, err := common.GetGzipReader(r)
		if err != nil {
			return fmt.Errorf("cannot read gzip-compressed OpenTelemetry protocol data: %w", err)
		}
		defer common.PutGzipReader(zr)
		r = zr
	}

	wr := getWriteContext()
	defer putWriteContext(wr)

	req, err := wr.readAndUnpackRequest(r, processBody)
	if err != nil {
		return fmt.Errorf("cannot unpack OpenTelemetry metrics: %w", err)
	}
	wr.parseRequestToTss(req)

	if err := callback(wr.tss); err != nil {
		return fmt.Errorf("error when processing OpenTelemetry samples: %w", err)
	}
	return nil
}
// appendSamplesFromScopeMetrics converts all supported metrics in sc into
// Prometheus-style samples appended to wr.tss.
func (wr *writeContext) appendSamplesFromScopeMetrics(sc *pb.ScopeMetrics) {
	for _, m := range sc.Metrics {
		if m.Name == "" {
			// skip metrics without names
			continue
		}
		metricName := sanitizeMetricName(m)
		if m.Gauge != nil {
			for _, dp := range m.Gauge.DataPoints {
				wr.appendSampleFromNumericPoint(metricName, dp)
			}
			continue
		}
		if m.Sum != nil {
			// Only cumulative sums map cleanly onto Prometheus counters.
			if m.Sum.AggregationTemporality != pb.AggregationTemporalityCumulative {
				rowsDroppedUnsupportedSum.Inc()
				continue
			}
			for _, dp := range m.Sum.DataPoints {
				wr.appendSampleFromNumericPoint(metricName, dp)
			}
			continue
		}
		if m.Summary != nil {
			for _, dp := range m.Summary.DataPoints {
				wr.appendSamplesFromSummary(metricName, dp)
			}
			continue
		}
		if m.Histogram != nil {
			// Only cumulative histograms are supported.
			if m.Histogram.AggregationTemporality != pb.AggregationTemporalityCumulative {
				rowsDroppedUnsupportedHistogram.Inc()
				continue
			}
			for _, dp := range m.Histogram.DataPoints {
				wr.appendSamplesFromHistogram(metricName, dp)
			}
			continue
		}
		rowsDroppedUnsupportedMetricType.Inc()
		logger.Warnf("unsupported type for metric %q", metricName)
	}
}
// appendSampleFromNumericPoint appends p to wr.tss
func (wr *writeContext) appendSampleFromNumericPoint(metricName string, p *pb.NumberDataPoint) {
	// A NumberDataPoint carries either an int or a double value;
	// missing both leaves the sample at 0.
	var value float64
	if p.IntValue != nil {
		value = float64(*p.IntValue)
	} else if p.DoubleValue != nil {
		value = *p.DoubleValue
	}
	// OpenTelemetry timestamps are in nanoseconds; Prometheus uses milliseconds.
	ts := int64(p.TimeUnixNano / 1e6)
	// Bit 0 of Flags marks the point as a staleness marker.
	stale := p.Flags&uint32(1) != 0
	wr.pointLabels = appendAttributesToPromLabels(wr.pointLabels[:0], p.Attributes)
	wr.appendSample(metricName, ts, value, stale)
}
// appendSamplesFromSummary appends summary p to wr.tss
func (wr *writeContext) appendSamplesFromSummary(metricName string, p *pb.SummaryDataPoint) {
	// Nanoseconds -> milliseconds; bit 0 of Flags marks staleness.
	ts := int64(p.TimeUnixNano / 1e6)
	stale := p.Flags&uint32(1) != 0
	wr.pointLabels = appendAttributesToPromLabels(wr.pointLabels[:0], p.Attributes)
	// A summary is exposed as <name>_sum, <name>_count plus one series per quantile.
	wr.appendSample(metricName+"_sum", ts, p.Sum, stale)
	wr.appendSample(metricName+"_count", ts, float64(p.Count), stale)
	for _, qv := range p.QuantileValues {
		q := strconv.FormatFloat(qv.Quantile, 'f', -1, 64)
		wr.appendSampleWithExtraLabel(metricName, "quantile", q, ts, qv.Value, stale)
	}
}
// appendSamplesFromHistogram appends histogram p to wr.tss
func (wr *writeContext) appendSamplesFromHistogram(metricName string, p *pb.HistogramDataPoint) {
	bucketCounts := p.BucketCounts
	if len(bucketCounts) == 0 {
		// nothing to append
		return
	}
	if len(bucketCounts) != len(p.ExplicitBounds)+1 {
		// fast path, broken data format: there must be exactly one more
		// bucket than explicit bounds (the trailing +Inf bucket).
		logger.Warnf("opentelemetry bad histogram format: %q, size of buckets: %d, size of bounds: %d", metricName, len(p.BucketCounts), len(p.ExplicitBounds))
		return
	}
	// Nanoseconds -> milliseconds; bit 0 of Flags marks staleness.
	ts := int64(p.TimeUnixNano / 1e6)
	stale := p.Flags&uint32(1) != 0
	wr.pointLabels = appendAttributesToPromLabels(wr.pointLabels[:0], p.Attributes)
	wr.appendSample(metricName+"_count", ts, float64(p.Count), stale)
	if p.Sum == nil {
		// fast path, convert metric as simple counter.
		// given buckets cannot be used for histogram functions.
		// Negative threshold buckets MAY be used, but then the Histogram MetricPoint MUST NOT contain a sum value as it would no longer be a counter semantically.
		// https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#histogram
		return
	}
	wr.appendSample(metricName+"_sum", ts, *p.Sum, stale)
	// OpenTelemetry buckets hold per-bucket counts; Prometheus "le" buckets
	// are cumulative, so accumulate while iterating the bounds.
	var running uint64
	for i, bound := range p.ExplicitBounds {
		running += bucketCounts[i]
		le := strconv.FormatFloat(bound, 'f', -1, 64)
		wr.appendSampleWithExtraLabel(metricName+"_bucket", "le", le, ts, float64(running), stale)
	}
	running += bucketCounts[len(bucketCounts)-1]
	wr.appendSampleWithExtraLabel(metricName+"_bucket", "le", "+Inf", ts, float64(running), stale)
}
// appendSample appends sample with the given metricName to wr.tss
//
// It is a convenience wrapper around appendSampleWithExtraLabel with no
// extra label (empty labelName/labelValue are skipped there).
func (wr *writeContext) appendSample(metricName string, t int64, v float64, isStale bool) {
	wr.appendSampleWithExtraLabel(metricName, "", "", t, v, isStale)
}
// appendSampleWithExtraLabel appends sample with the given metricName and the given (labelName=labelValue) extra label to wr.tss
//
// The label set is __name__ plus wr.baseLabels (resource attributes),
// wr.pointLabels (data point attributes) and the optional extra label.
func (wr *writeContext) appendSampleWithExtraLabel(metricName, labelName, labelValue string, t int64, v float64, isStale bool) {
	if isStale {
		// Replace the value with the Prometheus staleness marker.
		v = decimal.StaleNaN
	}
	if t <= 0 {
		// Set the current timestamp if t isn't set.
		t = int64(fasttime.UnixTimestamp()) * 1000
	}
	// Labels and samples are appended to shared pools and the new time series
	// references the freshly appended tails, avoiding per-series allocations.
	// NOTE(review): wr.tss therefore aliases labelsPool/samplesPool memory;
	// the pools must not be reset while wr.tss is still in use.
	labelsPool := wr.labelsPool
	labelsLen := len(labelsPool)
	labelsPool = append(labelsPool, prompbmarshal.Label{
		Name:  "__name__",
		Value: metricName,
	})
	labelsPool = append(labelsPool, wr.baseLabels...)
	labelsPool = append(labelsPool, wr.pointLabels...)
	// The extra label is only added when both its name and value are non-empty.
	if labelName != "" && labelValue != "" {
		labelsPool = append(labelsPool, prompbmarshal.Label{
			Name:  labelName,
			Value: labelValue,
		})
	}
	samplesPool := wr.samplesPool
	samplesLen := len(samplesPool)
	samplesPool = append(samplesPool, prompbmarshal.Sample{
		Timestamp: t,
		Value:     v,
	})
	wr.tss = append(wr.tss, prompbmarshal.TimeSeries{
		Labels:  labelsPool[labelsLen:],
		Samples: samplesPool[samplesLen:],
	})
	wr.labelsPool = labelsPool
	wr.samplesPool = samplesPool
	rowsRead.Inc()
}
// appendAttributesToPromLabels appends attributes to dst and returns the result.
func appendAttributesToPromLabels(dst []prompbmarshal.Label, attributes []*pb.KeyValue) []prompbmarshal.Label {
	for i := range attributes {
		kv := attributes[i]
		dst = append(dst, prompbmarshal.Label{
			Name:  sanitizeLabelName(kv.Key),
			Value: kv.Value.FormatString(),
		})
	}
	return dst
}
// writeContext holds the per-request parsing state. It is pooled via wrPool
// and reset between uses.
type writeContext struct {
	// bb holds the original data (json or protobuf), which must be parsed.
	bb bytesutil.ByteBuffer

	// tss holds parsed time series
	tss []prompbmarshal.TimeSeries

	// baseLabels are labels, which must be added to all the ingested samples
	// (derived from resource attributes).
	baseLabels []prompbmarshal.Label

	// pointLabels are labels, which must be added to the ingested OpenTelemetry points
	// (derived from per-point attributes).
	pointLabels []prompbmarshal.Label

	// pools are used for reducing memory allocations when parsing time series.
	// NOTE(review): tss entries reference slices of these pools — see
	// appendSampleWithExtraLabel.
	labelsPool  []prompbmarshal.Label
	samplesPool []prompbmarshal.Sample
}
// reset clears wr for reuse, dropping references to parsed data so the
// garbage collector can reclaim it while keeping the backing storage.
func (wr *writeContext) reset() {
	wr.bb.Reset()
	// Nil out per-series slices before truncating so pooled contexts don't
	// keep the label/sample memory reachable.
	for i := range wr.tss {
		wr.tss[i].Labels = nil
		wr.tss[i].Samples = nil
	}
	wr.tss = wr.tss[:0]
	wr.baseLabels = resetLabels(wr.baseLabels)
	wr.pointLabels = resetLabels(wr.pointLabels)
	wr.labelsPool = resetLabels(wr.labelsPool)
	wr.samplesPool = wr.samplesPool[:0]
}
// resetLabels zeroes every entry of labels (releasing the referenced strings
// to the garbage collector) and returns the empty slice with capacity kept.
func resetLabels(labels []prompbmarshal.Label) []prompbmarshal.Label {
	for i := 0; i < len(labels); i++ {
		labels[i] = prompbmarshal.Label{}
	}
	return labels[:0]
}
// readAndUnpackRequest reads the whole body from r into wr.bb, optionally
// pre-processes it with processBody, and unmarshals it as an
// ExportMetricsServiceRequest protobuf message.
func (wr *writeContext) readAndUnpackRequest(r io.Reader, processBody func([]byte) ([]byte, error)) (*pb.ExportMetricsServiceRequest, error) {
	if _, err := wr.bb.ReadFrom(r); err != nil {
		return nil, fmt.Errorf("cannot read request: %w", err)
	}
	if processBody != nil {
		data, err := processBody(wr.bb.B)
		if err != nil {
			return nil, fmt.Errorf("cannot process request body: %w", err)
		}
		// Keep the (possibly replaced) body in wr.bb so its memory is reused.
		wr.bb.B = append(wr.bb.B[:0], data...)
	}
	var req pb.ExportMetricsServiceRequest
	if err := req.UnmarshalProtobuf(wr.bb.B); err != nil {
		return nil, fmt.Errorf("cannot unmarshal request from %d bytes: %w", len(wr.bb.B), err)
	}
	return &req, nil
}
// parseRequestToTss converts req into time series appended to wr.tss.
// Resource attributes become base labels shared by all samples of that resource.
func (wr *writeContext) parseRequestToTss(req *pb.ExportMetricsServiceRequest) {
	for _, rm := range req.ResourceMetrics {
		var attrs []*pb.KeyValue
		if res := rm.Resource; res != nil {
			attrs = res.Attributes
		}
		wr.baseLabels = appendAttributesToPromLabels(wr.baseLabels[:0], attrs)
		for _, sc := range rm.ScopeMetrics {
			wr.appendSamplesFromScopeMetrics(sc)
		}
	}
}
// sanitizeLabelName normalizes labelName for Prometheus when the
// -opentelemetry.sanitizeMetrics flag is set; otherwise it is returned as-is.
//
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/b8655058501bed61a06bb660869051491f46840b/pkg/translator/prometheus/normalize_label.go#L26
func sanitizeLabelName(labelName string) string {
	if !*sanitizeMetrics || labelName == "" {
		return labelName
	}
	labelName = promrelabel.SanitizeLabelName(labelName)
	switch {
	case unicode.IsDigit(rune(labelName[0])):
		// Label names cannot start with a digit.
		return "key_" + labelName
	case strings.HasPrefix(labelName, "_") && !strings.HasPrefix(labelName, "__"):
		// A single leading underscore is reserved; "__" prefixes are kept
		// since they denote internal labels.
		return "key" + labelName
	default:
		return labelName
	}
}
// sanitizeMetricName normalizes the metric name for Prometheus when the
// -opentelemetry.sanitizeMetrics flag is set: it appends unit-derived name
// tokens and the "total"/"ratio" suffixes per the OpenTelemetry collector rules.
//
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/b8655058501bed61a06bb660869051491f46840b/pkg/translator/prometheus/normalize_name.go#L83
func sanitizeMetricName(metric *pb.Metric) string {
	if !*sanitizeMetrics {
		return metric.Name
	}
	tokens := promrelabel.SanitizeLabelNameParts(metric.Name)
	// The unit has at most two components: "numerator/denominator".
	unitParts := strings.SplitN(metric.Unit, "/", len(unitMap))
	for i, part := range unitParts {
		token := strings.TrimSpace(part)
		if token == "" || strings.ContainsAny(token, "{}") {
			// Skip empty and annotation-style units such as "{requests}".
			continue
		}
		if known, ok := unitMap[i].units[token]; ok {
			token = known
		}
		if token == "" || containsToken(tokens, token) {
			// Nothing to add, or the name already mentions the unit.
			continue
		}
		if prefix := unitMap[i].prefix; prefix != "" {
			tokens = append(tokens, prefix, token)
		} else {
			tokens = append(tokens, token)
		}
	}
	if metric.Sum != nil && metric.Sum.IsMonotonic {
		// Monotonic sums are counters and get the "_total" suffix.
		tokens = moveOrAppend(tokens, "total")
	} else if metric.Unit == "1" && metric.Gauge != nil {
		// Dimensionless gauges get the "_ratio" suffix.
		tokens = moveOrAppend(tokens, "ratio")
	}
	return strings.Join(tokens, "_")
}
// containsToken reports whether value is present in tokens.
func containsToken(tokens []string, value string) bool {
	for i := range tokens {
		if tokens[i] == value {
			return true
		}
	}
	return false
}
// moveOrAppend moves the first occurrence of value to the end of tokens,
// or appends value if it is absent. tokens is modified in place.
func moveOrAppend(tokens []string, value string) []string {
	for i, token := range tokens {
		if token == value {
			// Drop the first occurrence; it is re-added at the end below.
			tokens = append(tokens[:i], tokens[i+1:]...)
			break
		}
	}
	return append(tokens, value)
}
// wrPool recycles writeContext instances across requests in order to reduce
// memory allocations.
var wrPool = sync.Pool{
	New: func() interface{} {
		return &writeContext{}
	},
}

// getWriteContext obtains a writeContext from the pool.
func getWriteContext() *writeContext {
	return wrPool.Get().(*writeContext)
}

// putWriteContext resets wr and returns it to the pool for reuse.
func putWriteContext(wr *writeContext) {
	wr.reset()
	wrPool.Put(wr)
}
var (
	// rowsRead counts the samples successfully appended from OpenTelemetry requests.
	rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="opentelemetry"}`)

	// rowsDroppedUnsupportedHistogram counts histograms dropped because their
	// aggregation temporality isn't cumulative.
	rowsDroppedUnsupportedHistogram = metrics.NewCounter(`vm_protoparser_rows_dropped_total{type="opentelemetry",reason="unsupported_histogram_aggregation"}`)

	// rowsDroppedUnsupportedSum counts sums dropped because their
	// aggregation temporality isn't cumulative.
	rowsDroppedUnsupportedSum = metrics.NewCounter(`vm_protoparser_rows_dropped_total{type="opentelemetry",reason="unsupported_sum_aggregation"}`)

	// rowsDroppedUnsupportedMetricType counts metrics dropped because they are
	// neither gauge, sum, summary nor histogram.
	rowsDroppedUnsupportedMetricType = metrics.NewCounter(`vm_protoparser_rows_dropped_total{type="opentelemetry",reason="unsupported_metric_type"}`)
)