From 68b6834542f1962e9ec84a682e8cf7c944ba651d Mon Sep 17 00:00:00 2001
From: Andrii Chubatiuk <andrew.chubatiuk@gmail.com>
Date: Fri, 11 Oct 2024 14:44:52 +0300
Subject: [PATCH] lib/protoparser/opentelemetry: added exponential histograms
 support (#6354)

### Describe Your Changes

added opentelemetry exponential histograms support. Such histograms are automatically converted into
VictoriaMetrics histogram with `vmrange` buckets.

### Checklist

The following checks are **mandatory**:

- [ ] My change adheres [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/contributing/).

---------

Signed-off-by: hagen1778 <roman@victoriametrics.com>
Co-authored-by: hagen1778 <roman@victoriametrics.com>
(cherry picked from commit 9eb0c1fd86d6af4906d8581146eaf72412fbac6f)
---
 docs/README.md                                |   7 +-
 docs/changelog/CHANGELOG.md                   |   1 +
 lib/protoparser/opentelemetry/pb/metrics.go   | 265 +++++++++++++++++-
 .../opentelemetry/stream/streamparser.go      |  63 ++++-
 .../opentelemetry/stream/streamparser_test.go |  39 +++
 5 files changed, 364 insertions(+), 11 deletions(-)

diff --git a/docs/README.md b/docs/README.md
index 257270d870..8e54104fac 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -1535,10 +1535,15 @@ VictoriaMetrics supports data ingestion via [OpenTelemetry protocol for metrics]
 VictoriaMetrics expects `protobuf`-encoded requests at `/opentelemetry/v1/metrics`.
 Set HTTP request header `Content-Encoding: gzip` when sending gzip-compressed data to `/opentelemetry/v1/metrics`.
 
+VictoriaMetrics supports only [cumulative temporality](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#temporality)
+for received measurements. The number of dropped unsupported samples is exposed via `vm_protoparser_rows_dropped_total{type="opentelemetry"` metric.
+
 VictoriaMetrics stores the ingested OpenTelemetry [raw samples](https://docs.victoriametrics.com/keyconcepts/#raw-samples) as is without any transformations.
 Pass `-opentelemetry.usePrometheusNaming` command-line flag to VictoriaMetrics for automatic conversion of metric names and labels into Prometheus-compatible format.
+OpenTelemetry [exponential histogram](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram) is automatically converted 
+to [VictoriaMetrics histogram format](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350).
 
-Using the following exporter configuration in the opentelemetry collector will allow you to send metrics into VictoriaMetrics:
+Using the following exporter configuration in the OpenTelemetry collector will allow you to send metrics into VictoriaMetrics:
 
 ```yaml
 exporters:
diff --git a/docs/changelog/CHANGELOG.md b/docs/changelog/CHANGELOG.md
index 2d412503ee..ee34c8c28c 100644
--- a/docs/changelog/CHANGELOG.md
+++ b/docs/changelog/CHANGELOG.md
@@ -21,6 +21,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
 * FEATURE: add Darwin binaries for [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/) to the release flow. The binaries will be available in the new release.
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): allow using HTTP/2 client for Kubernetes service discovery if `-promscrape.kubernetes.useHTTP2Client` cmd-line flag is set. This could help to reduce the amount of opened connections to the Kubernetes API server. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5971) for the details.
 * FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): `-rule` cmd-line flag now supports multi-document YAML files. This could be usefule when rules are retrieved from via HTTP where multiple rule files were merged together in one response. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6753). Thanks to @Irene-123 for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6995).
+* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/) and [Single-node VictoriaMetrics](https://docs.victoriametrics.com/): add support of [exponential histograms](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram) ingested via [OpenTelemetry protocol for metrics](https://docs.victoriametrics.com/#sending-data-via-opentelemetry). Such histograms will be automatically converted to [VictoriaMetrics histogram format](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350). See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6354).
 
 ## [v1.104.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.104.0)
 
diff --git a/lib/protoparser/opentelemetry/pb/metrics.go b/lib/protoparser/opentelemetry/pb/metrics.go
index c6f525abba..9e33251f55 100644
--- a/lib/protoparser/opentelemetry/pb/metrics.go
+++ b/lib/protoparser/opentelemetry/pb/metrics.go
@@ -151,12 +151,13 @@ func (sm *ScopeMetrics) unmarshalProtobuf(src []byte) (err error) {
 
 // Metric represents the corresponding OTEL protobuf message
 type Metric struct {
-	Name      string
-	Unit      string
-	Gauge     *Gauge
-	Sum       *Sum
-	Histogram *Histogram
-	Summary   *Summary
+	Name                 string
+	Unit                 string
+	Gauge                *Gauge
+	Sum                  *Sum
+	Histogram            *Histogram
+	ExponentialHistogram *ExponentialHistogram
+	Summary              *Summary
 }
 
 func (m *Metric) marshalProtobuf(mm *easyproto.MessageMarshaler) {
@@ -169,6 +170,8 @@ func (m *Metric) marshalProtobuf(mm *easyproto.MessageMarshaler) {
 		m.Sum.marshalProtobuf(mm.AppendMessage(7))
 	case m.Histogram != nil:
 		m.Histogram.marshalProtobuf(mm.AppendMessage(9))
+	case m.ExponentialHistogram != nil:
+		m.ExponentialHistogram.marshalProtobuf(mm.AppendMessage(10))
 	case m.Summary != nil:
 		m.Summary.marshalProtobuf(mm.AppendMessage(11))
 	}
@@ -182,6 +185,7 @@ func (m *Metric) unmarshalProtobuf(src []byte) (err error) {
 	//     Gauge gauge = 5;
 	//     Sum sum = 7;
 	//     Histogram histogram = 9;
+	//     ExponentialHistogram exponential_histogram = 10;
 	//     Summary summary = 11;
 	//   }
 	// }
@@ -231,6 +235,15 @@ func (m *Metric) unmarshalProtobuf(src []byte) (err error) {
 			if err := m.Histogram.unmarshalProtobuf(data); err != nil {
 				return fmt.Errorf("cannot unmarshal Histogram: %w", err)
 			}
+		case 10:
+			data, ok := fc.MessageData()
+			if !ok {
+				return fmt.Errorf("cannot read ExponentialHistogram data")
+			}
+			m.ExponentialHistogram = &ExponentialHistogram{}
+			if err := m.ExponentialHistogram.unmarshalProtobuf(data); err != nil {
+				return fmt.Errorf("cannot unmarshal ExponentialHistogram: %w", err)
+			}
 		case 11:
 			data, ok := fc.MessageData()
 			if !ok {
@@ -473,6 +486,52 @@ func (h *Histogram) unmarshalProtobuf(src []byte) (err error) {
 	return nil
 }
 
+// ExponentialHistogram represents the corresponding OTEL protobuf message
+type ExponentialHistogram struct {
+	DataPoints             []*ExponentialHistogramDataPoint
+	AggregationTemporality AggregationTemporality
+}
+
+func (h *ExponentialHistogram) marshalProtobuf(mm *easyproto.MessageMarshaler) {
+	for _, dp := range h.DataPoints {
+		dp.marshalProtobuf(mm.AppendMessage(1))
+	}
+	mm.AppendInt64(2, int64(h.AggregationTemporality))
+}
+
+func (h *ExponentialHistogram) unmarshalProtobuf(src []byte) (err error) {
+	// message ExponentialHistogram {
+	//   repeated ExponentialHistogramDataPoint data_points = 1;
+	//   AggregationTemporality aggregation_temporality = 2;
+	// }
+	var fc easyproto.FieldContext
+	for len(src) > 0 {
+		src, err = fc.NextField(src)
+		if err != nil {
+			return fmt.Errorf("cannot read next field in ExponentialHistogram: %w", err)
+		}
+		switch fc.FieldNum {
+		case 1:
+			data, ok := fc.MessageData()
+			if !ok {
+				return fmt.Errorf("cannot read DataPoint")
+			}
+			h.DataPoints = append(h.DataPoints, &ExponentialHistogramDataPoint{})
+			dp := h.DataPoints[len(h.DataPoints)-1]
+			if err := dp.unmarshalProtobuf(data); err != nil {
+				return fmt.Errorf("cannot unmarshal DataPoint: %w", err)
+			}
+		case 2:
+			at, ok := fc.Int64()
+			if !ok {
+				return fmt.Errorf("cannot read AggregationTemporality")
+			}
+			h.AggregationTemporality = AggregationTemporality(at)
+		}
+	}
+	return nil
+}
+
 // Summary represents the corresponding OTEL protobuf message
 type Summary struct {
 	DataPoints []*SummaryDataPoint
@@ -603,6 +662,200 @@ func (dp *HistogramDataPoint) unmarshalProtobuf(src []byte) (err error) {
 	return nil
 }
 
+// ExponentialHistogramDataPoint represents the corresponding OTEL protobuf message
+type ExponentialHistogramDataPoint struct {
+	Attributes    []*KeyValue
+	TimeUnixNano  uint64
+	Count         uint64
+	Sum           *float64
+	Scale         int32
+	ZeroCount     uint64
+	Positive      *Buckets
+	Negative      *Buckets
+	Flags         uint32
+	Min           *float64
+	Max           *float64
+	ZeroThreshold float64
+}
+
+func (dp *ExponentialHistogramDataPoint) marshalProtobuf(mm *easyproto.MessageMarshaler) {
+	for _, a := range dp.Attributes {
+		a.marshalProtobuf(mm.AppendMessage(1))
+	}
+	mm.AppendFixed64(3, dp.TimeUnixNano)
+	mm.AppendFixed64(4, dp.Count)
+	if dp.Sum != nil {
+		mm.AppendDouble(5, *dp.Sum)
+	}
+	mm.AppendSint32(6, dp.Scale)
+	mm.AppendFixed64(7, dp.ZeroCount)
+	if dp.Positive != nil {
+		dp.Positive.marshalProtobuf(mm.AppendMessage(8))
+	}
+	if dp.Negative != nil {
+		dp.Negative.marshalProtobuf(mm.AppendMessage(9))
+	}
+	mm.AppendUint32(10, dp.Flags)
+	if dp.Min != nil {
+		mm.AppendDouble(12, *dp.Min)
+	}
+	if dp.Max != nil {
+		mm.AppendDouble(13, *dp.Max)
+	}
+	mm.AppendDouble(14, dp.ZeroThreshold)
+}
+
+func (dp *ExponentialHistogramDataPoint) unmarshalProtobuf(src []byte) (err error) {
+	// message ExponentialHistogramDataPoint {
+	//   repeated KeyValue attributes = 1;
+	//   fixed64 time_unix_nano = 3;
+	//   fixed64 count = 4;
+	//   optional double sum = 5;
+	//   sint32 scale = 6;
+	//   fixed64 zero_count = 7;
+	//   Buckets positive = 8;
+	//   Buckets negative = 9;
+	//   uint32 flags = 10;
+	//   optional double min = 12;
+	//   optional double max = 13;
+	//   double zero_threshold = 14;
+	// }
+	var fc easyproto.FieldContext
+	for len(src) > 0 {
+		src, err = fc.NextField(src)
+		if err != nil {
+			return fmt.Errorf("cannot read next field in ExponentialHistogramDataPoint: %w", err)
+		}
+		switch fc.FieldNum {
+		case 1:
+			data, ok := fc.MessageData()
+			if !ok {
+				return fmt.Errorf("cannot read Attribute")
+			}
+			dp.Attributes = append(dp.Attributes, &KeyValue{})
+			a := dp.Attributes[len(dp.Attributes)-1]
+			if err := a.unmarshalProtobuf(data); err != nil {
+				return fmt.Errorf("cannot unmarshal Attribute: %w", err)
+			}
+		case 3:
+			timeUnixNano, ok := fc.Fixed64()
+			if !ok {
+				return fmt.Errorf("cannot read TimeUnixNano")
+			}
+			dp.TimeUnixNano = timeUnixNano
+		case 4:
+			count, ok := fc.Fixed64()
+			if !ok {
+				return fmt.Errorf("cannot read Count")
+			}
+			dp.Count = count
+		case 5:
+			sum, ok := fc.Double()
+			if !ok {
+				return fmt.Errorf("cannot read Sum")
+			}
+			dp.Sum = &sum
+		case 6:
+			scale, ok := fc.Sint32()
+			if !ok {
+				return fmt.Errorf("cannot read Scale")
+			}
+			dp.Scale = scale
+		case 7:
+			zeroCount, ok := fc.Fixed64()
+			if !ok {
+				return fmt.Errorf("cannot read ZeroCount")
+			}
+			dp.ZeroCount = zeroCount
+		case 8:
+			data, ok := fc.MessageData()
+			if !ok {
+				return fmt.Errorf("cannot read Positive")
+			}
+			dp.Positive = &Buckets{}
+			if err := dp.Positive.unmarshalProtobuf(data); err != nil {
+				return fmt.Errorf("cannot unmarshal Positive: %w", err)
+			}
+		case 9:
+			data, ok := fc.MessageData()
+			if !ok {
+				return fmt.Errorf("cannot read Negative")
+			}
+			dp.Negative = &Buckets{}
+			if err := dp.Negative.unmarshalProtobuf(data); err != nil {
+				return fmt.Errorf("cannot unmarshal Negative: %w", err)
+			}
+		case 10:
+			flags, ok := fc.Uint32()
+			if !ok {
+				return fmt.Errorf("cannot read Flags")
+			}
+			dp.Flags = flags
+		case 12:
+			min, ok := fc.Double()
+			if !ok {
+				return fmt.Errorf("cannot read Min")
+			}
+			dp.Min = &min
+		case 13:
+			max, ok := fc.Double()
+			if !ok {
+				return fmt.Errorf("cannot read Max")
+			}
+			dp.Max = &max
+		case 14:
+			zeroThreshold, ok := fc.Double()
+			if !ok {
+				return fmt.Errorf("cannot read ZeroThreshold")
+			}
+			dp.ZeroThreshold = zeroThreshold
+		}
+	}
+	return nil
+}
+
+// Buckets represents the corresponding OTEL protobuf message
+type Buckets struct {
+	Offset       int32
+	BucketCounts []uint64
+}
+
+func (b *Buckets) marshalProtobuf(mm *easyproto.MessageMarshaler) {
+	mm.AppendSint32(1, b.Offset)
+	for _, bc := range b.BucketCounts {
+		mm.AppendUint64(2, bc)
+	}
+}
+
+func (b *Buckets) unmarshalProtobuf(src []byte) (err error) {
+	// message Buckets {
+	//   sint32 offset = 1;
+	//   repeated uint64 bucket_counts = 2;
+	// }
+	var fc easyproto.FieldContext
+	for len(src) > 0 {
+		src, err = fc.NextField(src)
+		if err != nil {
+			return fmt.Errorf("cannot read next field in HistogramDataPoint: %w", err)
+		}
+		switch fc.FieldNum {
+		case 1:
+			offset, ok := fc.Sint32()
+			if !ok {
+				return fmt.Errorf("cannot read Offset")
+			}
+			b.Offset = offset
+		case 2:
+			bucketCounts, ok := fc.UnpackUint64s(b.BucketCounts)
+			if !ok {
+				return fmt.Errorf("cannot read BucketCounts")
+			}
+			b.BucketCounts = bucketCounts
+		}
+	}
+	return nil
+}
+
 // SummaryDataPoint represents the corresponding OTEL protobuf message
 type SummaryDataPoint struct {
 	Attributes     []*KeyValue
diff --git a/lib/protoparser/opentelemetry/stream/streamparser.go b/lib/protoparser/opentelemetry/stream/streamparser.go
index f2e60a8203..f36291c86a 100644
--- a/lib/protoparser/opentelemetry/stream/streamparser.go
+++ b/lib/protoparser/opentelemetry/stream/streamparser.go
@@ -3,6 +3,7 @@ package stream
 import (
 	"fmt"
 	"io"
+	"math"
 	"strconv"
 	"sync"
 
@@ -84,6 +85,14 @@ func (wr *writeContext) appendSamplesFromScopeMetrics(sc *pb.ScopeMetrics) {
 			for _, p := range m.Histogram.DataPoints {
 				wr.appendSamplesFromHistogram(metricName, p)
 			}
+		case m.ExponentialHistogram != nil:
+			if m.ExponentialHistogram.AggregationTemporality != pb.AggregationTemporalityCumulative {
+				rowsDroppedUnsupportedExponentialHistogram.Inc()
+				continue
+			}
+			for _, p := range m.ExponentialHistogram.DataPoints {
+				wr.appendSamplesFromExponentialHistogram(metricName, p)
+			}
 		default:
 			rowsDroppedUnsupportedMetricType.Inc()
 			logger.Warnf("unsupported type for metric %q", metricName)
@@ -158,6 +167,51 @@ func (wr *writeContext) appendSamplesFromHistogram(metricName string, p *pb.Hist
 	wr.appendSampleWithExtraLabel(metricName+"_bucket", "le", "+Inf", t, float64(cumulative), isStale)
 }
 
+// appendSamplesFromExponentialHistogram appends histogram p to wr.tss
+func (wr *writeContext) appendSamplesFromExponentialHistogram(metricName string, p *pb.ExponentialHistogramDataPoint) {
+	t := int64(p.TimeUnixNano / 1e6)
+	isStale := (p.Flags)&uint32(1) != 0
+	wr.pointLabels = appendAttributesToPromLabels(wr.pointLabels[:0], p.Attributes)
+	wr.appendSample(metricName+"_count", t, float64(p.Count), isStale)
+	if p.Sum == nil {
+		// fast path, convert metric as simple counter.
+		// given buckets cannot be used for histogram functions.
+		// Negative threshold buckets MAY be used, but then the Histogram MetricPoint MUST NOT contain a sum value as it would no longer be a counter semantically.
+		// https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#histogram
+		return
+	}
+
+	wr.appendSample(metricName+"_sum", t, *p.Sum, isStale)
+	if p.ZeroCount > 0 {
+		vmRange := fmt.Sprintf("%.3e...%.3e", 0.0, p.ZeroThreshold)
+		wr.appendSampleWithExtraLabel(metricName+"_bucket", "vmrange", vmRange, t, float64(p.ZeroCount), isStale)
+	}
+	ratio := math.Pow(2, -float64(p.Scale))
+	base := math.Pow(2, ratio)
+	if p.Positive != nil {
+		bound := math.Pow(2, float64(p.Positive.Offset)*ratio)
+		for i, s := range p.Positive.BucketCounts {
+			if s > 0 {
+				lowerBound := bound * math.Pow(base, float64(i))
+				upperBound := lowerBound * base
+				vmRange := fmt.Sprintf("%.3e...%.3e", lowerBound, upperBound)
+				wr.appendSampleWithExtraLabel(metricName+"_bucket", "vmrange", vmRange, t, float64(s), isStale)
+			}
+		}
+	}
+	if p.Negative != nil {
+		bound := math.Pow(2, -float64(p.Negative.Offset)*ratio)
+		for i, s := range p.Negative.BucketCounts {
+			if s > 0 {
+				upperBound := bound * math.Pow(base, float64(i))
+				lowerBound := upperBound / base
+				vmRange := fmt.Sprintf("%.3e...%.3e", lowerBound, upperBound)
+				wr.appendSampleWithExtraLabel(metricName+"_bucket", "vmrange", vmRange, t, float64(s), isStale)
+			}
+		}
+	}
+}
+
 // appendSample appends sample with the given metricName to wr.tss
 func (wr *writeContext) appendSample(metricName string, t int64, v float64, isStale bool) {
 	wr.appendSampleWithExtraLabel(metricName, "", "", t, v, isStale)
@@ -300,8 +354,9 @@ func putWriteContext(wr *writeContext) {
 }
 
 var (
-	rowsRead                         = metrics.NewCounter(`vm_protoparser_rows_read_total{type="opentelemetry"}`)
-	rowsDroppedUnsupportedHistogram  = metrics.NewCounter(`vm_protoparser_rows_dropped_total{type="opentelemetry",reason="unsupported_histogram_aggregation"}`)
-	rowsDroppedUnsupportedSum        = metrics.NewCounter(`vm_protoparser_rows_dropped_total{type="opentelemetry",reason="unsupported_sum_aggregation"}`)
-	rowsDroppedUnsupportedMetricType = metrics.NewCounter(`vm_protoparser_rows_dropped_total{type="opentelemetry",reason="unsupported_metric_type"}`)
+	rowsRead                                   = metrics.NewCounter(`vm_protoparser_rows_read_total{type="opentelemetry"}`)
+	rowsDroppedUnsupportedHistogram            = metrics.NewCounter(`vm_protoparser_rows_dropped_total{type="opentelemetry",reason="unsupported_histogram_aggregation"}`)
+	rowsDroppedUnsupportedExponentialHistogram = metrics.NewCounter(`vm_protoparser_rows_dropped_total{type="opentelemetry",reason="unsupported_exponential_histogram_aggregation"}`)
+	rowsDroppedUnsupportedSum                  = metrics.NewCounter(`vm_protoparser_rows_dropped_total{type="opentelemetry",reason="unsupported_sum_aggregation"}`)
+	rowsDroppedUnsupportedMetricType           = metrics.NewCounter(`vm_protoparser_rows_dropped_total{type="opentelemetry",reason="unsupported_metric_type"}`)
 )
diff --git a/lib/protoparser/opentelemetry/stream/streamparser_test.go b/lib/protoparser/opentelemetry/stream/streamparser_test.go
index 4d5dd253e6..f7f96b24ed 100644
--- a/lib/protoparser/opentelemetry/stream/streamparser_test.go
+++ b/lib/protoparser/opentelemetry/stream/streamparser_test.go
@@ -192,6 +192,21 @@ func TestParseStream(t *testing.T) {
 		},
 		true,
 	)
+
+	// Test exponential histograms
+	f(
+		[]*pb.Metric{
+			generateExpHistogram("test-histogram", "m/s"),
+		},
+		[]prompbmarshal.TimeSeries{
+			newPromPBTs("test_histogram_meters_per_second_bucket", 15000, 5.0, jobLabelValue, kvLabel("label1", "value1"), kvLabel("vmrange", "1.061e+00...1.067e+00")),
+			newPromPBTs("test_histogram_meters_per_second_bucket", 15000, 10.0, jobLabelValue, kvLabel("label1", "value1"), kvLabel("vmrange", "1.067e+00...1.073e+00")),
+			newPromPBTs("test_histogram_meters_per_second_bucket", 15000, 1.0, jobLabelValue, kvLabel("label1", "value1"), kvLabel("vmrange", "1.085e+00...1.091e+00")),
+			newPromPBTs("test_histogram_meters_per_second_count", 15000, 20.0, jobLabelValue, kvLabel("label1", "value1")),
+			newPromPBTs("test_histogram_meters_per_second_sum", 15000, 4578.0, jobLabelValue, kvLabel("label1", "value1")),
+		},
+		true,
+	)
 }
 
 func checkParseStream(data []byte, checkSeries func(tss []prompbmarshal.TimeSeries) error) error {
@@ -227,6 +242,30 @@ func attributesFromKV(k, v string) []*pb.KeyValue {
 	}
 }
 
+func generateExpHistogram(name, unit string) *pb.Metric {
+	sum := float64(4578)
+	return &pb.Metric{
+		Name: name,
+		Unit: unit,
+		ExponentialHistogram: &pb.ExponentialHistogram{
+			AggregationTemporality: pb.AggregationTemporalityCumulative,
+			DataPoints: []*pb.ExponentialHistogramDataPoint{
+				{
+					Attributes:   attributesFromKV("label1", "value1"),
+					TimeUnixNano: uint64(15 * time.Second),
+					Count:        20,
+					Sum:          &sum,
+					Scale:        7,
+					Positive: &pb.Buckets{
+						Offset:       7,
+						BucketCounts: []uint64{0, 0, 0, 0, 5, 10, 0, 0, 1},
+					},
+				},
+			},
+		},
+	}
+}
+
 func generateGauge(name, unit string) *pb.Metric {
 	n := int64(15)
 	points := []*pb.NumberDataPoint{