lib/protoparser/opentelemetry: added exponential histograms support (#6354)

### Describe Your Changes

added OpenTelemetry exponential histogram support. Such histograms are automatically converted into
VictoriaMetrics histograms with `vmrange` buckets.

### Checklist

The following checks are **mandatory**:

- [ ] My change adheres to [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/contributing/).

---------

Signed-off-by: hagen1778 <roman@victoriametrics.com>
Co-authored-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
Andrii Chubatiuk 2024-10-11 14:44:52 +03:00 committed by GitHub
parent 8fe41b2b08
commit 9eb0c1fd86
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 364 additions and 11 deletions

View file

@ -1538,10 +1538,15 @@ VictoriaMetrics supports data ingestion via [OpenTelemetry protocol for metrics]
VictoriaMetrics expects `protobuf`-encoded requests at `/opentelemetry/v1/metrics`.
Set HTTP request header `Content-Encoding: gzip` when sending gzip-compressed data to `/opentelemetry/v1/metrics`.
VictoriaMetrics supports only [cumulative temporality](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#temporality)
for received measurements. The number of dropped unsupported samples is exposed via `vm_protoparser_rows_dropped_total{type="opentelemetry"}` metric.
VictoriaMetrics stores the ingested OpenTelemetry [raw samples](https://docs.victoriametrics.com/keyconcepts/#raw-samples) as is without any transformations.
Pass `-opentelemetry.usePrometheusNaming` command-line flag to VictoriaMetrics for automatic conversion of metric names and labels into Prometheus-compatible format.
OpenTelemetry [exponential histogram](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram) is automatically converted
to [VictoriaMetrics histogram format](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350).
Using the following exporter configuration in the OpenTelemetry collector will allow you to send metrics into VictoriaMetrics:
```yaml
exporters:

View file

@ -21,6 +21,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* FEATURE: add Darwin binaries for [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/) to the release flow. The binaries will be available in the new release.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): allow using HTTP/2 client for Kubernetes service discovery if `-promscrape.kubernetes.useHTTP2Client` cmd-line flag is set. This could help to reduce the amount of opened connections to the Kubernetes API server. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5971) for the details.
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): `-rule` cmd-line flag now supports multi-document YAML files. This could be useful when rules are retrieved via HTTP where multiple rule files were merged together in one response. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6753). Thanks to @Irene-123 for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6995).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/) and [Single-node VictoriaMetrics](https://docs.victoriametrics.com/): add support of [exponential histograms](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram) ingested via [OpenTelemetry protocol for metrics](https://docs.victoriametrics.com/#sending-data-via-opentelemetry). Such histograms will be automatically converted to [VictoriaMetrics histogram format](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350). See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6354).
## [v1.104.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.104.0)

View file

@ -151,12 +151,13 @@ func (sm *ScopeMetrics) unmarshalProtobuf(src []byte) (err error) {
// Metric represents the corresponding OTEL protobuf message
type Metric struct {
Name string
Unit string
Gauge *Gauge
Sum *Sum
Histogram *Histogram
Summary *Summary
Name string
Unit string
Gauge *Gauge
Sum *Sum
Histogram *Histogram
ExponentialHistogram *ExponentialHistogram
Summary *Summary
}
func (m *Metric) marshalProtobuf(mm *easyproto.MessageMarshaler) {
@ -169,6 +170,8 @@ func (m *Metric) marshalProtobuf(mm *easyproto.MessageMarshaler) {
m.Sum.marshalProtobuf(mm.AppendMessage(7))
case m.Histogram != nil:
m.Histogram.marshalProtobuf(mm.AppendMessage(9))
case m.ExponentialHistogram != nil:
m.ExponentialHistogram.marshalProtobuf(mm.AppendMessage(10))
case m.Summary != nil:
m.Summary.marshalProtobuf(mm.AppendMessage(11))
}
@ -182,6 +185,7 @@ func (m *Metric) unmarshalProtobuf(src []byte) (err error) {
// Gauge gauge = 5;
// Sum sum = 7;
// Histogram histogram = 9;
// ExponentialHistogram exponential_histogram = 10;
// Summary summary = 11;
// }
// }
@ -231,6 +235,15 @@ func (m *Metric) unmarshalProtobuf(src []byte) (err error) {
if err := m.Histogram.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal Histogram: %w", err)
}
case 10:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read ExponentialHistogram data")
}
m.ExponentialHistogram = &ExponentialHistogram{}
if err := m.ExponentialHistogram.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal ExponentialHistogram: %w", err)
}
case 11:
data, ok := fc.MessageData()
if !ok {
@ -473,6 +486,52 @@ func (h *Histogram) unmarshalProtobuf(src []byte) (err error) {
return nil
}
// ExponentialHistogram represents the corresponding OTEL protobuf message.
//
// See https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram
type ExponentialHistogram struct {
// DataPoints holds the individual histogram data points (proto field 1).
DataPoints []*ExponentialHistogramDataPoint
// AggregationTemporality indicates cumulative vs delta semantics (proto field 2).
AggregationTemporality AggregationTemporality
}
// marshalProtobuf marshals h to mm.
func (h *ExponentialHistogram) marshalProtobuf(mm *easyproto.MessageMarshaler) {
for _, dp := range h.DataPoints {
dp.marshalProtobuf(mm.AppendMessage(1))
}
mm.AppendInt64(2, int64(h.AggregationTemporality))
}
// unmarshalProtobuf unmarshals h from protobuf-encoded src.
//
// Unknown fields are silently skipped by the switch default.
func (h *ExponentialHistogram) unmarshalProtobuf(src []byte) (err error) {
// message ExponentialHistogram {
// repeated ExponentialHistogramDataPoint data_points = 1;
// AggregationTemporality aggregation_temporality = 2;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in ExponentialHistogram: %w", err)
}
switch fc.FieldNum {
case 1:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read DataPoint")
}
// Append an empty data point first, then unmarshal into it in place.
h.DataPoints = append(h.DataPoints, &ExponentialHistogramDataPoint{})
dp := h.DataPoints[len(h.DataPoints)-1]
if err := dp.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal DataPoint: %w", err)
}
case 2:
at, ok := fc.Int64()
if !ok {
return fmt.Errorf("cannot read AggregationTemporality")
}
h.AggregationTemporality = AggregationTemporality(at)
}
}
return nil
}
// Summary represents the corresponding OTEL protobuf message
type Summary struct {
DataPoints []*SummaryDataPoint
@ -603,6 +662,200 @@ func (dp *HistogramDataPoint) unmarshalProtobuf(src []byte) (err error) {
return nil
}
// ExponentialHistogramDataPoint represents the corresponding OTEL protobuf message.
//
// Bucket boundaries are implied by Scale: buckets grow by a factor of
// base = 2^(2^-Scale); Positive and Negative hold the bucket counts.
type ExponentialHistogramDataPoint struct {
Attributes []*KeyValue // proto field 1
TimeUnixNano uint64 // proto field 3
Count uint64 // proto field 4; total number of measurements
Sum *float64 // proto field 5; optional — nil when absent
Scale int32 // proto field 6; resolution of the exponential buckets
ZeroCount uint64 // proto field 7; measurements falling within [-ZeroThreshold, ZeroThreshold]
Positive *Buckets // proto field 8; buckets for positive values
Negative *Buckets // proto field 9; buckets for negative values
Flags uint32 // proto field 10
Min *float64 // proto field 12; optional — nil when absent
Max *float64 // proto field 13; optional — nil when absent
ZeroThreshold float64 // proto field 14
}
// marshalProtobuf marshals dp to mm.
//
// Optional fields (Sum, Positive, Negative, Min, Max) are written only when set.
func (dp *ExponentialHistogramDataPoint) marshalProtobuf(mm *easyproto.MessageMarshaler) {
for _, a := range dp.Attributes {
a.marshalProtobuf(mm.AppendMessage(1))
}
mm.AppendFixed64(3, dp.TimeUnixNano)
mm.AppendFixed64(4, dp.Count)
if dp.Sum != nil {
mm.AppendDouble(5, *dp.Sum)
}
mm.AppendSint32(6, dp.Scale)
mm.AppendFixed64(7, dp.ZeroCount)
if dp.Positive != nil {
dp.Positive.marshalProtobuf(mm.AppendMessage(8))
}
if dp.Negative != nil {
dp.Negative.marshalProtobuf(mm.AppendMessage(9))
}
mm.AppendUint32(10, dp.Flags)
if dp.Min != nil {
mm.AppendDouble(12, *dp.Min)
}
if dp.Max != nil {
mm.AppendDouble(13, *dp.Max)
}
mm.AppendDouble(14, dp.ZeroThreshold)
}
// unmarshalProtobuf unmarshals dp from protobuf-encoded src.
//
// Unknown fields are silently skipped by the switch default.
func (dp *ExponentialHistogramDataPoint) unmarshalProtobuf(src []byte) (err error) {
// message ExponentialHistogramDataPoint {
// repeated KeyValue attributes = 1;
// fixed64 time_unix_nano = 3;
// fixed64 count = 4;
// optional double sum = 5;
// sint32 scale = 6;
// fixed64 zero_count = 7;
// Buckets positive = 8;
// Buckets negative = 9;
// uint32 flags = 10;
// optional double min = 12;
// optional double max = 13;
// double zero_threshold = 14;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in ExponentialHistogramDataPoint: %w", err)
}
switch fc.FieldNum {
case 1:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read Attribute")
}
// Append an empty KeyValue first, then unmarshal into it in place.
dp.Attributes = append(dp.Attributes, &KeyValue{})
a := dp.Attributes[len(dp.Attributes)-1]
if err := a.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal Attribute: %w", err)
}
case 3:
timeUnixNano, ok := fc.Fixed64()
if !ok {
return fmt.Errorf("cannot read TimeUnixNano")
}
dp.TimeUnixNano = timeUnixNano
case 4:
count, ok := fc.Fixed64()
if !ok {
return fmt.Errorf("cannot read Count")
}
dp.Count = count
case 5:
sum, ok := fc.Double()
if !ok {
return fmt.Errorf("cannot read Sum")
}
dp.Sum = &sum
case 6:
scale, ok := fc.Sint32()
if !ok {
return fmt.Errorf("cannot read Scale")
}
dp.Scale = scale
case 7:
zeroCount, ok := fc.Fixed64()
if !ok {
return fmt.Errorf("cannot read ZeroCount")
}
dp.ZeroCount = zeroCount
case 8:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read Positive")
}
dp.Positive = &Buckets{}
if err := dp.Positive.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal Positive: %w", err)
}
case 9:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read Negative")
}
dp.Negative = &Buckets{}
if err := dp.Negative.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal Negative: %w", err)
}
case 10:
flags, ok := fc.Uint32()
if !ok {
return fmt.Errorf("cannot read Flags")
}
dp.Flags = flags
case 12:
min, ok := fc.Double()
if !ok {
return fmt.Errorf("cannot read Min")
}
dp.Min = &min
case 13:
max, ok := fc.Double()
if !ok {
return fmt.Errorf("cannot read Max")
}
dp.Max = &max
case 14:
zeroThreshold, ok := fc.Double()
if !ok {
return fmt.Errorf("cannot read ZeroThreshold")
}
dp.ZeroThreshold = zeroThreshold
}
}
return nil
}
// Buckets represents the corresponding OTEL protobuf message.
//
// It holds a contiguous run of exponential-histogram bucket counts
// starting at bucket index Offset.
type Buckets struct {
	Offset       int32    // proto field 1; index of the first bucket in BucketCounts
	BucketCounts []uint64 // proto field 2; per-bucket measurement counts
}

// marshalProtobuf marshals b to mm.
func (b *Buckets) marshalProtobuf(mm *easyproto.MessageMarshaler) {
	mm.AppendSint32(1, b.Offset)
	for _, bc := range b.BucketCounts {
		mm.AppendUint64(2, bc)
	}
}

// unmarshalProtobuf unmarshals b from protobuf-encoded src.
//
// Unknown fields are silently skipped by the switch default.
func (b *Buckets) unmarshalProtobuf(src []byte) (err error) {
	// message Buckets {
	//   sint32 offset = 1;
	//   repeated uint64 bucket_counts = 2;
	// }
	var fc easyproto.FieldContext
	for len(src) > 0 {
		src, err = fc.NextField(src)
		if err != nil {
			// Fixed copy-paste error in the message: this is Buckets,
			// not HistogramDataPoint.
			return fmt.Errorf("cannot read next field in Buckets: %w", err)
		}
		switch fc.FieldNum {
		case 1:
			offset, ok := fc.Sint32()
			if !ok {
				return fmt.Errorf("cannot read Offset")
			}
			b.Offset = offset
		case 2:
			// UnpackUint64s handles both packed and unpacked encodings and
			// appends to the existing slice.
			bucketCounts, ok := fc.UnpackUint64s(b.BucketCounts)
			if !ok {
				return fmt.Errorf("cannot read BucketCounts")
			}
			b.BucketCounts = bucketCounts
		}
	}
	return nil
}
// SummaryDataPoint represents the corresponding OTEL protobuf message
type SummaryDataPoint struct {
Attributes []*KeyValue

View file

@ -3,6 +3,7 @@ package stream
import (
"fmt"
"io"
"math"
"strconv"
"sync"
@ -84,6 +85,14 @@ func (wr *writeContext) appendSamplesFromScopeMetrics(sc *pb.ScopeMetrics) {
for _, p := range m.Histogram.DataPoints {
wr.appendSamplesFromHistogram(metricName, p)
}
case m.ExponentialHistogram != nil:
if m.ExponentialHistogram.AggregationTemporality != pb.AggregationTemporalityCumulative {
rowsDroppedUnsupportedExponentialHistogram.Inc()
continue
}
for _, p := range m.ExponentialHistogram.DataPoints {
wr.appendSamplesFromExponentialHistogram(metricName, p)
}
default:
rowsDroppedUnsupportedMetricType.Inc()
logger.Warnf("unsupported type for metric %q", metricName)
@ -158,6 +167,51 @@ func (wr *writeContext) appendSamplesFromHistogram(metricName string, p *pb.Hist
wr.appendSampleWithExtraLabel(metricName+"_bucket", "le", "+Inf", t, float64(cumulative), isStale)
}
// appendSamplesFromExponentialHistogram appends histogram p to wr.tss.
//
// The exponential buckets are converted to VictoriaMetrics `vmrange` buckets;
// `<name>_count` and `<name>_sum` samples are emitted alongside.
func (wr *writeContext) appendSamplesFromExponentialHistogram(metricName string, p *pb.ExponentialHistogramDataPoint) {
// Convert nanoseconds to milliseconds.
t := int64(p.TimeUnixNano / 1e6)
// Bit 0 of Flags — presumably the OTEL NO_RECORDED_VALUE staleness flag; confirm against the OTEL proto.
isStale := (p.Flags)&uint32(1) != 0
wr.pointLabels = appendAttributesToPromLabels(wr.pointLabels[:0], p.Attributes)
wr.appendSample(metricName+"_count", t, float64(p.Count), isStale)
if p.Sum == nil {
// fast path, convert metric as simple counter.
// given buckets cannot be used for histogram functions.
// Negative threshold buckets MAY be used, but then the Histogram MetricPoint MUST NOT contain a sum value as it would no longer be a counter semantically.
// https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#histogram
return
}
wr.appendSample(metricName+"_sum", t, *p.Sum, isStale)
if p.ZeroCount > 0 {
// The zero bucket covers [0, ZeroThreshold].
vmRange := fmt.Sprintf("%.3e...%.3e", 0.0, p.ZeroThreshold)
wr.appendSampleWithExtraLabel(metricName+"_bucket", "vmrange", vmRange, t, float64(p.ZeroCount), isStale)
}
// ratio is the per-bucket exponent step: 2^(-scale).
ratio := math.Pow(2, -float64(p.Scale))
// base is the bucket growth factor: 2^(2^-scale).
base := math.Pow(2, ratio)
if p.Positive != nil {
// Positive bucket i spans (base^(offset+i), base^(offset+i+1)].
bound := math.Pow(2, float64(p.Positive.Offset)*ratio)
for i, s := range p.Positive.BucketCounts {
if s > 0 {
// Skip empty buckets — vmrange samples are emitted only for non-zero counts.
lowerBound := bound * math.Pow(base, float64(i))
upperBound := lowerBound * base
vmRange := fmt.Sprintf("%.3e...%.3e", lowerBound, upperBound)
wr.appendSampleWithExtraLabel(metricName+"_bucket", "vmrange", vmRange, t, float64(s), isStale)
}
}
}
if p.Negative != nil {
// NOTE(review): the negated Offset yields bounds base^(i-offset), and the
// vmrange values are emitted as positive numbers. OTEL defines negative
// bucket i over (-base^(offset+i+1), -base^(offset+i)] — confirm this
// mapping is intended before relying on negative-bucket output.
bound := math.Pow(2, -float64(p.Negative.Offset)*ratio)
for i, s := range p.Negative.BucketCounts {
if s > 0 {
upperBound := bound * math.Pow(base, float64(i))
lowerBound := upperBound / base
vmRange := fmt.Sprintf("%.3e...%.3e", lowerBound, upperBound)
wr.appendSampleWithExtraLabel(metricName+"_bucket", "vmrange", vmRange, t, float64(s), isStale)
}
}
}
}
// appendSample appends sample with the given metricName to wr.tss
func (wr *writeContext) appendSample(metricName string, t int64, v float64, isStale bool) {
wr.appendSampleWithExtraLabel(metricName, "", "", t, v, isStale)
@ -300,8 +354,9 @@ func putWriteContext(wr *writeContext) {
}
var (
rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="opentelemetry"}`)
rowsDroppedUnsupportedHistogram = metrics.NewCounter(`vm_protoparser_rows_dropped_total{type="opentelemetry",reason="unsupported_histogram_aggregation"}`)
rowsDroppedUnsupportedSum = metrics.NewCounter(`vm_protoparser_rows_dropped_total{type="opentelemetry",reason="unsupported_sum_aggregation"}`)
rowsDroppedUnsupportedMetricType = metrics.NewCounter(`vm_protoparser_rows_dropped_total{type="opentelemetry",reason="unsupported_metric_type"}`)
rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="opentelemetry"}`)
rowsDroppedUnsupportedHistogram = metrics.NewCounter(`vm_protoparser_rows_dropped_total{type="opentelemetry",reason="unsupported_histogram_aggregation"}`)
rowsDroppedUnsupportedExponentialHistogram = metrics.NewCounter(`vm_protoparser_rows_dropped_total{type="opentelemetry",reason="unsupported_exponential_histogram_aggregation"}`)
rowsDroppedUnsupportedSum = metrics.NewCounter(`vm_protoparser_rows_dropped_total{type="opentelemetry",reason="unsupported_sum_aggregation"}`)
rowsDroppedUnsupportedMetricType = metrics.NewCounter(`vm_protoparser_rows_dropped_total{type="opentelemetry",reason="unsupported_metric_type"}`)
)

View file

@ -192,6 +192,21 @@ func TestParseStream(t *testing.T) {
},
true,
)
// Test exponential histograms
f(
[]*pb.Metric{
generateExpHistogram("test-histogram", "m/s"),
},
[]prompbmarshal.TimeSeries{
newPromPBTs("test_histogram_meters_per_second_bucket", 15000, 5.0, jobLabelValue, kvLabel("label1", "value1"), kvLabel("vmrange", "1.061e+00...1.067e+00")),
newPromPBTs("test_histogram_meters_per_second_bucket", 15000, 10.0, jobLabelValue, kvLabel("label1", "value1"), kvLabel("vmrange", "1.067e+00...1.073e+00")),
newPromPBTs("test_histogram_meters_per_second_bucket", 15000, 1.0, jobLabelValue, kvLabel("label1", "value1"), kvLabel("vmrange", "1.085e+00...1.091e+00")),
newPromPBTs("test_histogram_meters_per_second_count", 15000, 20.0, jobLabelValue, kvLabel("label1", "value1")),
newPromPBTs("test_histogram_meters_per_second_sum", 15000, 4578.0, jobLabelValue, kvLabel("label1", "value1")),
},
true,
)
}
func checkParseStream(data []byte, checkSeries func(tss []prompbmarshal.TimeSeries) error) error {
@ -227,6 +242,30 @@ func attributesFromKV(k, v string) []*pb.KeyValue {
}
}
// generateExpHistogram returns a pb.Metric with the given name and unit
// containing a single cumulative exponential-histogram data point.
//
// Scale=7 and Positive.Offset=7 with three non-zero bucket counts produce
// the three vmrange buckets asserted in TestParseStream.
func generateExpHistogram(name, unit string) *pb.Metric {
sum := float64(4578)
return &pb.Metric{
Name: name,
Unit: unit,
ExponentialHistogram: &pb.ExponentialHistogram{
AggregationTemporality: pb.AggregationTemporalityCumulative,
DataPoints: []*pb.ExponentialHistogramDataPoint{
{
Attributes: attributesFromKV("label1", "value1"),
// 15s expressed in nanoseconds; parsed back as 15000ms.
TimeUnixNano: uint64(15 * time.Second),
Count: 20,
Sum: &sum,
Scale: 7,
Positive: &pb.Buckets{
Offset: 7,
// Only the non-zero counts (5, 10, 1) yield vmrange samples.
BucketCounts: []uint64{0, 0, 0, 0, 5, 10, 0, 0, 1},
},
},
},
},
}
}
func generateGauge(name, unit string) *pb.Metric {
n := int64(15)
points := []*pb.NumberDataPoint{