From 9eb0c1fd86d6af4906d8581146eaf72412fbac6f Mon Sep 17 00:00:00 2001
From: Andrii Chubatiuk
Date: Fri, 11 Oct 2024 14:44:52 +0300
Subject: [PATCH] lib/protoparser/opentelemetry: added exponential histograms support (#6354)

### Describe Your Changes

Added OpenTelemetry exponential histogram support. Such histograms are automatically converted into VictoriaMetrics histograms with `vmrange` buckets.

### Checklist

The following checks are **mandatory**:

- [ ] My change adheres to [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/).

---------

Signed-off-by: hagen1778
Co-authored-by: hagen1778
---
 docs/README.md | 7 +-
 docs/changelog/CHANGELOG.md | 1 +
 lib/protoparser/opentelemetry/pb/metrics.go | 265 +++++++++++++++++-
 .../opentelemetry/stream/streamparser.go | 63 ++++-
 .../opentelemetry/stream/streamparser_test.go | 39 +++
 5 files changed, 364 insertions(+), 11 deletions(-)

diff --git a/docs/README.md b/docs/README.md
index 4f91e2dce..1dc32b45f 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -1538,10 +1538,15 @@ VictoriaMetrics supports data ingestion via [OpenTelemetry protocol for metrics]
VictoriaMetrics expects `protobuf`-encoded requests at `/opentelemetry/v1/metrics`.
Set HTTP request header `Content-Encoding: gzip` when sending gzip-compressed data to `/opentelemetry/v1/metrics`.
+VictoriaMetrics supports only [cumulative temporality](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#temporality)
+for received measurements. The number of dropped unsupported samples is exposed via the `vm_protoparser_rows_dropped_total{type="opentelemetry"}` metric.
+
VictoriaMetrics stores the ingested OpenTelemetry [raw samples](https://docs.victoriametrics.com/keyconcepts/#raw-samples) as is without any transformations.
Pass `-opentelemetry.usePrometheusNaming` command-line flag to VictoriaMetrics for automatic conversion of metric names and labels into Prometheus-compatible format.
+OpenTelemetry [exponential histograms](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram) are automatically converted
+to [VictoriaMetrics histogram format](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350).

-Using the following exporter configuration in the opentelemetry collector will allow you to send metrics into VictoriaMetrics:
+Using the following exporter configuration in the OpenTelemetry collector will allow you to send metrics into VictoriaMetrics:

```yaml
exporters:
diff --git a/docs/changelog/CHANGELOG.md b/docs/changelog/CHANGELOG.md
index 2d412503e..ee34c8c28 100644
--- a/docs/changelog/CHANGELOG.md
+++ b/docs/changelog/CHANGELOG.md
@@ -21,6 +21,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* FEATURE: add Darwin binaries for [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/) to the release flow. The binaries will be available in the new release.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): allow using HTTP/2 client for Kubernetes service discovery if `-promscrape.kubernetes.useHTTP2Client` cmd-line flag is set. This could help to reduce the amount of opened connections to the Kubernetes API server. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5971) for the details.
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): `-rule` cmd-line flag now supports multi-document YAML files.
This could be useful when rules are retrieved via HTTP, where multiple rule files are merged together in one response. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6753). Thanks to @Irene-123 for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6995).
+* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/) and [Single-node VictoriaMetrics](https://docs.victoriametrics.com/): add support for [exponential histograms](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram) ingested via [OpenTelemetry protocol for metrics](https://docs.victoriametrics.com/#sending-data-via-opentelemetry). Such histograms will be automatically converted to [VictoriaMetrics histogram format](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350). See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6354).

## [v1.104.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.104.0)

diff --git a/lib/protoparser/opentelemetry/pb/metrics.go b/lib/protoparser/opentelemetry/pb/metrics.go
index c6f525abb..9e33251f5 100644
--- a/lib/protoparser/opentelemetry/pb/metrics.go
+++ b/lib/protoparser/opentelemetry/pb/metrics.go
@@ -151,12 +151,13 @@ func (sm *ScopeMetrics) unmarshalProtobuf(src []byte) (err error) {
// Metric represents the corresponding OTEL protobuf message
type Metric struct {
-	Name      string
-	Unit      string
-	Gauge     *Gauge
-	Sum       *Sum
-	Histogram *Histogram
-	Summary   *Summary
+	Name                 string
+	Unit                 string
+	Gauge                *Gauge
+	Sum                  *Sum
+	Histogram            *Histogram
+	ExponentialHistogram *ExponentialHistogram
+	Summary              *Summary
}

func (m *Metric) marshalProtobuf(mm *easyproto.MessageMarshaler) {
@@ -169,6 +170,8 @@ func (m *Metric) marshalProtobuf(mm *easyproto.MessageMarshaler) {
		m.Sum.marshalProtobuf(mm.AppendMessage(7))
	case m.Histogram != nil:
		m.Histogram.marshalProtobuf(mm.AppendMessage(9))
+	case m.ExponentialHistogram != nil:
+		m.ExponentialHistogram.marshalProtobuf(mm.AppendMessage(10))
	case m.Summary != nil:
		m.Summary.marshalProtobuf(mm.AppendMessage(11))
	}
@@ -182,6 +185,7 @@ func (m *Metric) unmarshalProtobuf(src []byte) (err error) {
	// Gauge gauge = 5;
	// Sum sum = 7;
	// Histogram histogram = 9;
+	// ExponentialHistogram exponential_histogram = 10;
	// Summary summary = 11;
	// }
	// }
@@ -231,6 +235,15 @@ func (m *Metric) unmarshalProtobuf(src []byte) (err error) {
			if err := m.Histogram.unmarshalProtobuf(data); err != nil {
				return fmt.Errorf("cannot unmarshal Histogram: %w", err)
			}
+		case 10:
+			data, ok := fc.MessageData()
+			if !ok {
+				return fmt.Errorf("cannot read ExponentialHistogram data")
+			}
+			m.ExponentialHistogram = &ExponentialHistogram{}
+			if err := m.ExponentialHistogram.unmarshalProtobuf(data); err != nil {
+				return fmt.Errorf("cannot unmarshal ExponentialHistogram: %w", err)
+			}
		case 11:
			data, ok := fc.MessageData()
			if !ok {
@@ -473,6 +486,52 @@ func (h *Histogram) unmarshalProtobuf(src []byte) (err error) {
	return nil
}

+// ExponentialHistogram represents the corresponding OTEL protobuf message
+type ExponentialHistogram struct {
+	DataPoints             []*ExponentialHistogramDataPoint
+	AggregationTemporality AggregationTemporality
+}
+
+func (h *ExponentialHistogram) marshalProtobuf(mm *easyproto.MessageMarshaler) {
+	for _, dp := range h.DataPoints {
+		dp.marshalProtobuf(mm.AppendMessage(1))
+	}
+	mm.AppendInt64(2, int64(h.AggregationTemporality))
+}
+
+func (h *ExponentialHistogram) unmarshalProtobuf(src []byte) (err error) {
+	//
message ExponentialHistogram { + // repeated ExponentialHistogramDataPoint data_points = 1; + // AggregationTemporality aggregation_temporality = 2; + // } + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in ExponentialHistogram: %w", err) + } + switch fc.FieldNum { + case 1: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot read DataPoint") + } + h.DataPoints = append(h.DataPoints, &ExponentialHistogramDataPoint{}) + dp := h.DataPoints[len(h.DataPoints)-1] + if err := dp.unmarshalProtobuf(data); err != nil { + return fmt.Errorf("cannot unmarshal DataPoint: %w", err) + } + case 2: + at, ok := fc.Int64() + if !ok { + return fmt.Errorf("cannot read AggregationTemporality") + } + h.AggregationTemporality = AggregationTemporality(at) + } + } + return nil +} + // Summary represents the corresponding OTEL protobuf message type Summary struct { DataPoints []*SummaryDataPoint @@ -603,6 +662,200 @@ func (dp *HistogramDataPoint) unmarshalProtobuf(src []byte) (err error) { return nil } +// ExponentialHistogramDataPoint represents the corresponding OTEL protobuf message +type ExponentialHistogramDataPoint struct { + Attributes []*KeyValue + TimeUnixNano uint64 + Count uint64 + Sum *float64 + Scale int32 + ZeroCount uint64 + Positive *Buckets + Negative *Buckets + Flags uint32 + Min *float64 + Max *float64 + ZeroThreshold float64 +} + +func (dp *ExponentialHistogramDataPoint) marshalProtobuf(mm *easyproto.MessageMarshaler) { + for _, a := range dp.Attributes { + a.marshalProtobuf(mm.AppendMessage(1)) + } + mm.AppendFixed64(3, dp.TimeUnixNano) + mm.AppendFixed64(4, dp.Count) + if dp.Sum != nil { + mm.AppendDouble(5, *dp.Sum) + } + mm.AppendSint32(6, dp.Scale) + mm.AppendFixed64(7, dp.ZeroCount) + if dp.Positive != nil { + dp.Positive.marshalProtobuf(mm.AppendMessage(8)) + } + if dp.Negative != nil { + dp.Negative.marshalProtobuf(mm.AppendMessage(9)) + } + mm.AppendUint32(10, dp.Flags) + if dp.Min != nil { + mm.AppendDouble(12, *dp.Min) + } + if dp.Max != nil { + mm.AppendDouble(13, *dp.Max) + } + mm.AppendDouble(14, dp.ZeroThreshold) +} + +func (dp *ExponentialHistogramDataPoint) unmarshalProtobuf(src []byte) (err error) { + // message ExponentialHistogramDataPoint { + // repeated KeyValue attributes = 1; + // fixed64 time_unix_nano = 3; + // fixed64 count = 4; + // optional double sum = 5; + // sint32 scale = 6; + // fixed64 zero_count = 7; + // Buckets positive = 8; + // Buckets negative = 9; + // uint32 flags = 10; + // optional double min = 12; + // optional double max = 13; + // double zero_threshold = 14; + // } + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in ExponentialHistogramDataPoint: %w", err) + } + switch fc.FieldNum { + case 1: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot read Attribute") + } + dp.Attributes = append(dp.Attributes, &KeyValue{}) + a := dp.Attributes[len(dp.Attributes)-1] + if err := a.unmarshalProtobuf(data); err != nil { + return fmt.Errorf("cannot unmarshal Attribute: %w", err) + } + case 3: + timeUnixNano, ok := fc.Fixed64() + if !ok { + return fmt.Errorf("cannot read TimeUnixNano") + } + dp.TimeUnixNano = timeUnixNano + case 4: + count, ok := fc.Fixed64() + if !ok { + return fmt.Errorf("cannot read Count") + } + dp.Count = count + case 5: + sum, ok := fc.Double() + if !ok { + return fmt.Errorf("cannot read Sum") + } + dp.Sum = &sum 
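+		// Scale sets the bucket resolution of the exponential histogram:
+		// bucket boundaries are powers of base = 2^(2^-scale). This is the
+		// value used by appendSamplesFromExponentialHistogram below to
+		// derive the `vmrange` bucket bounds.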
+ case 6: + scale, ok := fc.Sint32() + if !ok { + return fmt.Errorf("cannot read Scale") + } + dp.Scale = scale + case 7: + zeroCount, ok := fc.Fixed64() + if !ok { + return fmt.Errorf("cannot read ZeroCount") + } + dp.ZeroCount = zeroCount + case 8: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot read Positive") + } + dp.Positive = &Buckets{} + if err := dp.Positive.unmarshalProtobuf(data); err != nil { + return fmt.Errorf("cannot unmarshal Positive: %w", err) + } + case 9: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot read Negative") + } + dp.Negative = &Buckets{} + if err := dp.Negative.unmarshalProtobuf(data); err != nil { + return fmt.Errorf("cannot unmarshal Negative: %w", err) + } + case 10: + flags, ok := fc.Uint32() + if !ok { + return fmt.Errorf("cannot read Flags") + } + dp.Flags = flags + case 12: + min, ok := fc.Double() + if !ok { + return fmt.Errorf("cannot read Min") + } + dp.Min = &min + case 13: + max, ok := fc.Double() + if !ok { + return fmt.Errorf("cannot read Max") + } + dp.Max = &max + case 14: + zeroThreshold, ok := fc.Double() + if !ok { + return fmt.Errorf("cannot read ZeroThreshold") + } + dp.ZeroThreshold = zeroThreshold + } + } + return nil +} + +// Buckets represents the corresponding OTEL protobuf message +type Buckets struct { + Offset int32 + BucketCounts []uint64 +} + +func (b *Buckets) marshalProtobuf(mm *easyproto.MessageMarshaler) { + mm.AppendSint32(1, b.Offset) + for _, bc := range b.BucketCounts { + mm.AppendUint64(2, bc) + } +} + +func (b *Buckets) unmarshalProtobuf(src []byte) (err error) { + // message Buckets { + // sint32 offset = 1; + // repeated uint64 bucket_counts = 2; + // } + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in HistogramDataPoint: %w", err) + } + switch fc.FieldNum { + case 1: + offset, ok := fc.Sint32() + if !ok { + return fmt.Errorf("cannot read Offset") + } + b.Offset = offset + case 2: + bucketCounts, ok := fc.UnpackUint64s(b.BucketCounts) + if !ok { + return fmt.Errorf("cannot read BucketCounts") + } + b.BucketCounts = bucketCounts + } + } + return nil +} + // SummaryDataPoint represents the corresponding OTEL protobuf message type SummaryDataPoint struct { Attributes []*KeyValue diff --git a/lib/protoparser/opentelemetry/stream/streamparser.go b/lib/protoparser/opentelemetry/stream/streamparser.go index f2e60a820..f36291c86 100644 --- a/lib/protoparser/opentelemetry/stream/streamparser.go +++ b/lib/protoparser/opentelemetry/stream/streamparser.go @@ -3,6 +3,7 @@ package stream import ( "fmt" "io" + "math" "strconv" "sync" @@ -84,6 +85,14 @@ func (wr *writeContext) appendSamplesFromScopeMetrics(sc *pb.ScopeMetrics) { for _, p := range m.Histogram.DataPoints { wr.appendSamplesFromHistogram(metricName, p) } + case m.ExponentialHistogram != nil: + if m.ExponentialHistogram.AggregationTemporality != pb.AggregationTemporalityCumulative { + rowsDroppedUnsupportedExponentialHistogram.Inc() + continue + } + for _, p := range m.ExponentialHistogram.DataPoints { + wr.appendSamplesFromExponentialHistogram(metricName, p) + } default: rowsDroppedUnsupportedMetricType.Inc() logger.Warnf("unsupported type for metric %q", metricName) @@ -158,6 +167,51 @@ func (wr *writeContext) appendSamplesFromHistogram(metricName string, p *pb.Hist wr.appendSampleWithExtraLabel(metricName+"_bucket", "le", "+Inf", t, float64(cumulative), isStale) } +// appendSamplesFromExponentialHistogram appends histogram p to 
wr.tss +func (wr *writeContext) appendSamplesFromExponentialHistogram(metricName string, p *pb.ExponentialHistogramDataPoint) { + t := int64(p.TimeUnixNano / 1e6) + isStale := (p.Flags)&uint32(1) != 0 + wr.pointLabels = appendAttributesToPromLabels(wr.pointLabels[:0], p.Attributes) + wr.appendSample(metricName+"_count", t, float64(p.Count), isStale) + if p.Sum == nil { + // fast path, convert metric as simple counter. + // given buckets cannot be used for histogram functions. + // Negative threshold buckets MAY be used, but then the Histogram MetricPoint MUST NOT contain a sum value as it would no longer be a counter semantically. + // https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#histogram + return + } + + wr.appendSample(metricName+"_sum", t, *p.Sum, isStale) + if p.ZeroCount > 0 { + vmRange := fmt.Sprintf("%.3e...%.3e", 0.0, p.ZeroThreshold) + wr.appendSampleWithExtraLabel(metricName+"_bucket", "vmrange", vmRange, t, float64(p.ZeroCount), isStale) + } + ratio := math.Pow(2, -float64(p.Scale)) + base := math.Pow(2, ratio) + if p.Positive != nil { + bound := math.Pow(2, float64(p.Positive.Offset)*ratio) + for i, s := range p.Positive.BucketCounts { + if s > 0 { + lowerBound := bound * math.Pow(base, float64(i)) + upperBound := lowerBound * base + vmRange := fmt.Sprintf("%.3e...%.3e", lowerBound, upperBound) + wr.appendSampleWithExtraLabel(metricName+"_bucket", "vmrange", vmRange, t, float64(s), isStale) + } + } + } + if p.Negative != nil { + bound := math.Pow(2, -float64(p.Negative.Offset)*ratio) + for i, s := range p.Negative.BucketCounts { + if s > 0 { + upperBound := bound * math.Pow(base, float64(i)) + lowerBound := upperBound / base + vmRange := fmt.Sprintf("%.3e...%.3e", lowerBound, upperBound) + wr.appendSampleWithExtraLabel(metricName+"_bucket", "vmrange", vmRange, t, float64(s), isStale) + } + } + } +} + // appendSample appends sample with the given metricName to wr.tss func (wr *writeContext) appendSample(metricName string, t int64, v float64, isStale bool) { wr.appendSampleWithExtraLabel(metricName, "", "", t, v, isStale) @@ -300,8 +354,9 @@ func putWriteContext(wr *writeContext) { } var ( - rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="opentelemetry"}`) - rowsDroppedUnsupportedHistogram = metrics.NewCounter(`vm_protoparser_rows_dropped_total{type="opentelemetry",reason="unsupported_histogram_aggregation"}`) - rowsDroppedUnsupportedSum = metrics.NewCounter(`vm_protoparser_rows_dropped_total{type="opentelemetry",reason="unsupported_sum_aggregation"}`) - rowsDroppedUnsupportedMetricType = metrics.NewCounter(`vm_protoparser_rows_dropped_total{type="opentelemetry",reason="unsupported_metric_type"}`) + rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="opentelemetry"}`) + rowsDroppedUnsupportedHistogram = metrics.NewCounter(`vm_protoparser_rows_dropped_total{type="opentelemetry",reason="unsupported_histogram_aggregation"}`) + rowsDroppedUnsupportedExponentialHistogram = metrics.NewCounter(`vm_protoparser_rows_dropped_total{type="opentelemetry",reason="unsupported_exponential_histogram_aggregation"}`) + rowsDroppedUnsupportedSum = metrics.NewCounter(`vm_protoparser_rows_dropped_total{type="opentelemetry",reason="unsupported_sum_aggregation"}`) + rowsDroppedUnsupportedMetricType = metrics.NewCounter(`vm_protoparser_rows_dropped_total{type="opentelemetry",reason="unsupported_metric_type"}`) ) diff --git a/lib/protoparser/opentelemetry/stream/streamparser_test.go 
b/lib/protoparser/opentelemetry/stream/streamparser_test.go index 4d5dd253e..f7f96b24e 100644 --- a/lib/protoparser/opentelemetry/stream/streamparser_test.go +++ b/lib/protoparser/opentelemetry/stream/streamparser_test.go @@ -192,6 +192,21 @@ func TestParseStream(t *testing.T) { }, true, ) + + // Test exponential histograms + f( + []*pb.Metric{ + generateExpHistogram("test-histogram", "m/s"), + }, + []prompbmarshal.TimeSeries{ + newPromPBTs("test_histogram_meters_per_second_bucket", 15000, 5.0, jobLabelValue, kvLabel("label1", "value1"), kvLabel("vmrange", "1.061e+00...1.067e+00")), + newPromPBTs("test_histogram_meters_per_second_bucket", 15000, 10.0, jobLabelValue, kvLabel("label1", "value1"), kvLabel("vmrange", "1.067e+00...1.073e+00")), + newPromPBTs("test_histogram_meters_per_second_bucket", 15000, 1.0, jobLabelValue, kvLabel("label1", "value1"), kvLabel("vmrange", "1.085e+00...1.091e+00")), + newPromPBTs("test_histogram_meters_per_second_count", 15000, 20.0, jobLabelValue, kvLabel("label1", "value1")), + newPromPBTs("test_histogram_meters_per_second_sum", 15000, 4578.0, jobLabelValue, kvLabel("label1", "value1")), + }, + true, + ) } func checkParseStream(data []byte, checkSeries func(tss []prompbmarshal.TimeSeries) error) error { @@ -227,6 +242,30 @@ func attributesFromKV(k, v string) []*pb.KeyValue { } } +func generateExpHistogram(name, unit string) *pb.Metric { + sum := float64(4578) + return &pb.Metric{ + Name: name, + Unit: unit, + ExponentialHistogram: &pb.ExponentialHistogram{ + AggregationTemporality: pb.AggregationTemporalityCumulative, + DataPoints: []*pb.ExponentialHistogramDataPoint{ + { + Attributes: attributesFromKV("label1", "value1"), + TimeUnixNano: uint64(15 * time.Second), + Count: 20, + Sum: &sum, + Scale: 7, + Positive: &pb.Buckets{ + Offset: 7, + BucketCounts: []uint64{0, 0, 0, 0, 5, 10, 0, 0, 1}, + }, + }, + }, + }, + } +} + func generateGauge(name, unit string) *pb.Metric { n := int64(15) points := []*pb.NumberDataPoint{
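For reference, here is a minimal standalone sketch (not part of the patch; the function name is illustrative) showing how the `vmrange` values expected by the test above follow from the `generateExpHistogram` parameters (scale=7, positive bucket offset=7): bucket boundaries are powers of base = 2^(2^-scale), and the bucket at array index i covers (base^(offset+i), base^(offset+i+1)], mirroring the conversion in `appendSamplesFromExponentialHistogram`.

```go
package main

import (
	"fmt"
	"math"
)

// vmrangeBounds returns the (lower, upper] boundaries of bucket index i for an
// OpenTelemetry exponential histogram with the given scale and bucket offset.
// Bucket boundaries are powers of base = 2^(2^-scale).
func vmrangeBounds(scale, offset int32, i int) (lower, upper float64) {
	ratio := math.Pow(2, -float64(scale))
	base := math.Pow(2, ratio)
	lower = math.Pow(2, float64(offset)*ratio) * math.Pow(base, float64(i))
	return lower, lower * base
}

func main() {
	// Parameters taken from generateExpHistogram above: scale=7, positive offset=7.
	// The non-zero buckets sit at indexes 4, 5 and 8 of BucketCounts.
	for _, i := range []int{4, 5, 8} {
		lower, upper := vmrangeBounds(7, 7, i)
		fmt.Printf("vmrange=%.3e...%.3e\n", lower, upper)
	}
	// Output:
	// vmrange=1.061e+00...1.067e+00
	// vmrange=1.067e+00...1.073e+00
	// vmrange=1.085e+00...1.091e+00
}
```

The `%.3e...%.3e` formatting matches how the parser renders `vmrange` boundaries, which is why the printed ranges line up with the label values asserted in the test.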