From 0799834aaa9cc37790b13d7c676a1cffb84bf239 Mon Sep 17 00:00:00 2001 From: Andrii Chubatiuk Date: Fri, 29 Mar 2024 14:51:24 +0200 Subject: [PATCH] opentelemetry: added cmd flag to sanitize metric names (#6035) --- docs/Single-server-VictoriaMetrics.md | 3 + lib/promrelabel/relabel.go | 5 + lib/protoparser/opentelemetry/pb/pb.go | 17 ++ .../opentelemetry/stream/streamparser.go | 145 +++++++++++++++++- .../opentelemetry/stream/streamparser_test.go | 97 ++++++++++-- .../stream/streamparser_timing_test.go | 8 +- 6 files changed, 254 insertions(+), 21 deletions(-) diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index 9508806c0..de59e0e3d 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -1557,6 +1557,9 @@ VictoriaMetrics supports data ingestion via [OpenTelemetry protocol for metrics] VictoriaMetrics expects `protobuf`-encoded requests at `/opentelemetry/v1/metrics`. Set HTTP request header `Content-Encoding: gzip` when sending gzip-compressed data to `/opentelemetry/v1/metrics`. +VictoriaMetrics automatically does not sanitize metric names for the data ingested via OpenTelemetry protocol +If you need accepting metric and label names as is with sanitizing, then pass `-opentelemetry.sanitizeMetrics=true` command-line flag to VictoriaMetrics. + See [How to use OpenTelemetry metrics with VictoriaMetrics](https://docs.victoriametrics.com/guides/getting-started-with-opentelemetry/). ## JSON line format diff --git a/lib/promrelabel/relabel.go b/lib/promrelabel/relabel.go index ba0a896ab..2e306fe70 100644 --- a/lib/promrelabel/relabel.go +++ b/lib/promrelabel/relabel.go @@ -663,6 +663,11 @@ func SanitizeLabelName(name string) string { return labelNameSanitizer.Transform(name) } +// SanitizeLabelNameParts returns label name slice generated from metric name divided by unsupported characters +func SanitizeLabelNameParts(name string) []string { + return unsupportedLabelNameChars.Split(name, -1) +} + var labelNameSanitizer = bytesutil.NewFastStringTransformer(func(s string) string { return unsupportedLabelNameChars.ReplaceAllString(s, "_") }) diff --git a/lib/protoparser/opentelemetry/pb/pb.go b/lib/protoparser/opentelemetry/pb/pb.go index 11d9c8291..883950e9f 100644 --- a/lib/protoparser/opentelemetry/pb/pb.go +++ b/lib/protoparser/opentelemetry/pb/pb.go @@ -190,6 +190,7 @@ func (sm *ScopeMetrics) unmarshalProtobuf(src []byte) (err error) { // Metric represents the corresponding OTEL protobuf message type Metric struct { Name string + Unit string Gauge *Gauge Sum *Sum Histogram *Histogram @@ -198,6 +199,7 @@ type Metric struct { func (m *Metric) marshalProtobuf(mm *easyproto.MessageMarshaler) { mm.AppendString(1, m.Name) + mm.AppendString(3, m.Unit) switch { case m.Gauge != nil: m.Gauge.marshalProtobuf(mm.AppendMessage(5)) @@ -213,6 +215,7 @@ func (m *Metric) marshalProtobuf(mm *easyproto.MessageMarshaler) { func (m *Metric) unmarshalProtobuf(src []byte) (err error) { // message Metric { // string name = 1; + // string unit = 3; // oneof data { // Gauge gauge = 5; // Sum sum = 7; @@ -233,6 +236,12 @@ func (m *Metric) unmarshalProtobuf(src []byte) (err error) { return fmt.Errorf("cannot read metric name") } m.Name = strings.Clone(name) + case 3: + unit, ok := fc.String() + if !ok { + return fmt.Errorf("cannot read metric unit") + } + m.Unit = strings.Clone(unit) case 5: data, ok := fc.MessageData() if !ok { @@ -617,6 +626,7 @@ func (ndp *NumberDataPoint) unmarshalProtobuf(src []byte) (err error) { type Sum struct { DataPoints []*NumberDataPoint AggregationTemporality AggregationTemporality + IsMonotonic bool } // AggregationTemporality represents the corresponding OTEL protobuf enum @@ -636,6 +646,7 @@ func (s *Sum) marshalProtobuf(mm *easyproto.MessageMarshaler) { dp.marshalProtobuf(mm.AppendMessage(1)) } mm.AppendInt64(2, int64(s.AggregationTemporality)) + mm.AppendBool(3, s.IsMonotonic) } func (s *Sum) unmarshalProtobuf(src []byte) (err error) { @@ -666,6 +677,12 @@ func (s *Sum) unmarshalProtobuf(src []byte) (err error) { return fmt.Errorf("cannot read AggregationTemporality") } s.AggregationTemporality = AggregationTemporality(at) + case 3: + im, ok := fc.Bool() + if !ok { + return fmt.Errorf("cannot read IsMonotonic") + } + s.IsMonotonic = im } } return nil diff --git a/lib/protoparser/opentelemetry/stream/streamparser.go b/lib/protoparser/opentelemetry/stream/streamparser.go index d0b18c7ce..92aeab67a 100644 --- a/lib/protoparser/opentelemetry/stream/streamparser.go +++ b/lib/protoparser/opentelemetry/stream/streamparser.go @@ -1,10 +1,13 @@ package stream import ( + "flag" "fmt" "io" "strconv" + "strings" "sync" + "unicode" "github.com/VictoriaMetrics/metrics" @@ -13,11 +16,72 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/pb" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" ) +var ( + // sanitizeMetrics controls sanitizing metric and label names ingested via OpenTelemetry protocol. + sanitizeMetrics = flag.Bool("opentelemetry.sanitizeMetrics", false, "Sanitize metric and label names for the ingested OpenTelemetry data") +) + +// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/b8655058501bed61a06bb660869051491f46840b/pkg/translator/prometheus/normalize_name.go#L19 +var unitMap = []struct { + prefix string + units map[string]string +}{ + { + units: map[string]string{ + // Time + "d": "days", + "h": "hours", + "min": "minutes", + "s": "seconds", + "ms": "milliseconds", + "us": "microseconds", + "ns": "nanoseconds", + + // Bytes + "By": "bytes", + "KiBy": "kibibytes", + "MiBy": "mebibytes", + "GiBy": "gibibytes", + "TiBy": "tibibytes", + "KBy": "kilobytes", + "MBy": "megabytes", + "GBy": "gigabytes", + "TBy": "terabytes", + + // SI + "m": "meters", + "V": "volts", + "A": "amperes", + "J": "joules", + "W": "watts", + "g": "grams", + + // Misc + "Cel": "celsius", + "Hz": "hertz", + "1": "", + "%": "percent", + }, + }, { + prefix: "per", + units: map[string]string{ + "s": "second", + "m": "minute", + "h": "hour", + "d": "day", + "w": "week", + "mo": "month", + "y": "year", + }, + }, +} + // ParseStream parses OpenTelemetry protobuf or json data from r and calls callback for the parsed rows. // // callback shouldn't hold tss items after returning. @@ -58,10 +122,11 @@ func (wr *writeContext) appendSamplesFromScopeMetrics(sc *pb.ScopeMetrics) { // skip metrics without names continue } + metricName := sanitizeMetricName(m) switch { case m.Gauge != nil: for _, p := range m.Gauge.DataPoints { - wr.appendSampleFromNumericPoint(m.Name, p) + wr.appendSampleFromNumericPoint(metricName, p) } case m.Sum != nil: if m.Sum.AggregationTemporality != pb.AggregationTemporalityCumulative { @@ -69,11 +134,11 @@ func (wr *writeContext) appendSamplesFromScopeMetrics(sc *pb.ScopeMetrics) { continue } for _, p := range m.Sum.DataPoints { - wr.appendSampleFromNumericPoint(m.Name, p) + wr.appendSampleFromNumericPoint(metricName, p) } case m.Summary != nil: for _, p := range m.Summary.DataPoints { - wr.appendSamplesFromSummary(m.Name, p) + wr.appendSamplesFromSummary(metricName, p) } case m.Histogram != nil: if m.Histogram.AggregationTemporality != pb.AggregationTemporalityCumulative { @@ -81,11 +146,11 @@ func (wr *writeContext) appendSamplesFromScopeMetrics(sc *pb.ScopeMetrics) { continue } for _, p := range m.Histogram.DataPoints { - wr.appendSamplesFromHistogram(m.Name, p) + wr.appendSamplesFromHistogram(metricName, p) } default: rowsDroppedUnsupportedMetricType.Inc() - logger.Warnf("unsupported type for metric %q", m.Name) + logger.Warnf("unsupported type for metric %q", metricName) } } } @@ -209,7 +274,7 @@ func (wr *writeContext) appendSampleWithExtraLabel(metricName, labelName, labelV func appendAttributesToPromLabels(dst []prompbmarshal.Label, attributes []*pb.KeyValue) []prompbmarshal.Label { for _, at := range attributes { dst = append(dst, prompbmarshal.Label{ - Name: at.Key, + Name: sanitizeLabelName(at.Key), Value: at.Value.FormatString(), }) } @@ -290,6 +355,74 @@ func (wr *writeContext) parseRequestToTss(req *pb.ExportMetricsServiceRequest) { } } +// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/b8655058501bed61a06bb660869051491f46840b/pkg/translator/prometheus/normalize_label.go#L26 +func sanitizeLabelName(labelName string) string { + if !*sanitizeMetrics { + return labelName + } + if len(labelName) == 0 { + return labelName + } + labelName = promrelabel.SanitizeLabelName(labelName) + if unicode.IsDigit(rune(labelName[0])) { + return "key_" + labelName + } else if strings.HasPrefix(labelName, "_") && !strings.HasPrefix(labelName, "__") { + return "key" + labelName + } + return labelName +} + +// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/b8655058501bed61a06bb660869051491f46840b/pkg/translator/prometheus/normalize_name.go#L83 +func sanitizeMetricName(metric *pb.Metric) string { + if !*sanitizeMetrics { + return metric.Name + } + nameTokens := promrelabel.SanitizeLabelNameParts(metric.Name) + unitTokens := strings.SplitN(metric.Unit, "/", len(unitMap)) + for i, u := range unitTokens { + unitToken := strings.TrimSpace(u) + if unitToken == "" || strings.ContainsAny(unitToken, "{}") { + continue + } + if unit, ok := unitMap[i].units[unitToken]; ok { + unitToken = unit + } + if unitToken != "" && !containsToken(nameTokens, unitToken) { + unitPrefix := unitMap[i].prefix + if unitPrefix != "" { + nameTokens = append(nameTokens, unitPrefix, unitToken) + } else { + nameTokens = append(nameTokens, unitToken) + } + } + } + if metric.Sum != nil && metric.Sum.IsMonotonic { + nameTokens = moveOrAppend(nameTokens, "total") + } else if metric.Unit == "1" && metric.Gauge != nil { + nameTokens = moveOrAppend(nameTokens, "ratio") + } + return strings.Join(nameTokens, "_") +} + +func containsToken(tokens []string, value string) bool { + for _, token := range tokens { + if token == value { + return true + } + } + return false +} + +func moveOrAppend(tokens []string, value string) []string { + for t := range tokens { + if tokens[t] == value { + tokens = append(tokens[:t], tokens[t+1:]...) + break + } + } + return append(tokens, value) +} + var wrPool sync.Pool func getWriteContext() *writeContext { diff --git a/lib/protoparser/opentelemetry/stream/streamparser_test.go b/lib/protoparser/opentelemetry/stream/streamparser_test.go index 1d5281d4a..242cb39f3 100644 --- a/lib/protoparser/opentelemetry/stream/streamparser_test.go +++ b/lib/protoparser/opentelemetry/stream/streamparser_test.go @@ -15,8 +15,9 @@ import ( ) func TestParseStream(t *testing.T) { - f := func(samples []*pb.Metric, tssExpected []prompbmarshal.TimeSeries) { + f := func(samples []*pb.Metric, tssExpected []prompbmarshal.TimeSeries, sanitize bool) { t.Helper() + *sanitizeMetrics = sanitize checkSeries := func(tss []prompbmarshal.TimeSeries) error { if len(tss) != len(tssExpected) { @@ -86,10 +87,10 @@ func TestParseStream(t *testing.T) { // Test all metric types f( []*pb.Metric{ - generateGauge("my-gauge"), - generateHistogram("my-histogram"), - generateSum("my-sum"), - generateSummary("my-summary"), + generateGauge("my-gauge", ""), + generateHistogram("my-histogram", ""), + generateSum("my-sum", "", false), + generateSummary("my-summary", ""), }, []prompbmarshal.TimeSeries{ newPromPBTs("my-gauge", 15000, 15.0, jobLabelValue, kvLabel("label1", "value1")), @@ -106,16 +107,85 @@ func TestParseStream(t *testing.T) { newPromPBTs("my-summary", 35000, 7.5, jobLabelValue, kvLabel("label6", "value6"), kvLabel("quantile", "0.1")), newPromPBTs("my-summary", 35000, 10.0, jobLabelValue, kvLabel("label6", "value6"), kvLabel("quantile", "0.5")), newPromPBTs("my-summary", 35000, 15.0, jobLabelValue, kvLabel("label6", "value6"), kvLabel("quantile", "1")), - }) + }, + false, + ) // Test gauge f( []*pb.Metric{ - generateGauge("my-gauge"), + generateGauge("my-gauge", ""), }, []prompbmarshal.TimeSeries{ newPromPBTs("my-gauge", 15000, 15.0, jobLabelValue, kvLabel("label1", "value1")), }, + false, + ) + + // Test gauge with unit and sanitization + f( + []*pb.Metric{ + generateGauge("my-gauge", "ms"), + }, + []prompbmarshal.TimeSeries{ + newPromPBTs("my_gauge_milliseconds", 15000, 15.0, jobLabelValue, kvLabel("label1", "value1")), + }, + true, + ) + + // Test gauge with unit inside metric + f( + []*pb.Metric{ + generateGauge("my-gauge-milliseconds", "ms"), + }, + []prompbmarshal.TimeSeries{ + newPromPBTs("my_gauge_milliseconds", 15000, 15.0, jobLabelValue, kvLabel("label1", "value1")), + }, + true, + ) + + // Test gauge with ratio suffix + f( + []*pb.Metric{ + generateGauge("my-gauge-milliseconds", "1"), + }, + []prompbmarshal.TimeSeries{ + newPromPBTs("my_gauge_milliseconds_ratio", 15000, 15.0, jobLabelValue, kvLabel("label1", "value1")), + }, + true, + ) + + // Test sum with total suffix + f( + []*pb.Metric{ + generateSum("my-sum", "ms", true), + }, + []prompbmarshal.TimeSeries{ + newPromPBTs("my_sum_milliseconds_total", 150000, 15.5, jobLabelValue, kvLabel("label5", "value5")), + }, + true, + ) + + // Test sum with total suffix, which exists in a metric name + f( + []*pb.Metric{ + generateSum("my-total-sum", "ms", true), + }, + []prompbmarshal.TimeSeries{ + newPromPBTs("my_sum_milliseconds_total", 150000, 15.5, jobLabelValue, kvLabel("label5", "value5")), + }, + true, + ) + + // Test sum with total and complex suffix + f( + []*pb.Metric{ + generateSum("my-total-sum", "m/s", true), + }, + []prompbmarshal.TimeSeries{ + newPromPBTs("my_sum_meters_per_second_total", 150000, 15.5, jobLabelValue, kvLabel("label5", "value5")), + }, + true, ) } @@ -152,7 +222,7 @@ func attributesFromKV(k, v string) []*pb.KeyValue { } } -func generateGauge(name string) *pb.Metric { +func generateGauge(name, unit string) *pb.Metric { n := int64(15) points := []*pb.NumberDataPoint{ { @@ -163,13 +233,14 @@ func generateGauge(name string) *pb.Metric { } return &pb.Metric{ Name: name, + Unit: unit, Gauge: &pb.Gauge{ DataPoints: points, }, } } -func generateHistogram(name string) *pb.Metric { +func generateHistogram(name, unit string) *pb.Metric { points := []*pb.HistogramDataPoint{ { @@ -183,6 +254,7 @@ func generateHistogram(name string) *pb.Metric { } return &pb.Metric{ Name: name, + Unit: unit, Histogram: &pb.Histogram{ AggregationTemporality: pb.AggregationTemporalityCumulative, DataPoints: points, @@ -190,7 +262,7 @@ func generateHistogram(name string) *pb.Metric { } } -func generateSum(name string) *pb.Metric { +func generateSum(name, unit string, isMonotonic bool) *pb.Metric { d := float64(15.5) points := []*pb.NumberDataPoint{ { @@ -201,14 +273,16 @@ func generateSum(name string) *pb.Metric { } return &pb.Metric{ Name: name, + Unit: unit, Sum: &pb.Sum{ AggregationTemporality: pb.AggregationTemporalityCumulative, DataPoints: points, + IsMonotonic: isMonotonic, }, } } -func generateSummary(name string) *pb.Metric { +func generateSummary(name, unit string) *pb.Metric { points := []*pb.SummaryDataPoint{ { Attributes: attributesFromKV("label6", "value6"), @@ -233,6 +307,7 @@ func generateSummary(name string) *pb.Metric { } return &pb.Metric{ Name: name, + Unit: unit, Summary: &pb.Summary{ DataPoints: points, }, diff --git a/lib/protoparser/opentelemetry/stream/streamparser_timing_test.go b/lib/protoparser/opentelemetry/stream/streamparser_timing_test.go index e1f73aaf1..2915786cd 100644 --- a/lib/protoparser/opentelemetry/stream/streamparser_timing_test.go +++ b/lib/protoparser/opentelemetry/stream/streamparser_timing_test.go @@ -10,10 +10,10 @@ import ( func BenchmarkParseStream(b *testing.B) { samples := []*pb.Metric{ - generateGauge("my-gauge"), - generateHistogram("my-histogram"), - generateSum("my-sum"), - generateSummary("my-summary"), + generateGauge("my-gauge", ""), + generateHistogram("my-histogram", ""), + generateSum("my-sum", "", false), + generateSummary("my-summary", ""), } b.SetBytes(1) b.ReportAllocs()