From 5a3abfa0414ab495cbc34a58146b540aa8289636 Mon Sep 17 00:00:00 2001 From: Ted Possible Date: Tue, 7 May 2024 03:09:44 -0700 Subject: [PATCH] Exemplar support (#5982) This code adds Exemplars to VMagent and the promscrape parser adhering to OpenMetrics Specifications. This will allow forwarding of exemplars to Prometheus and other third party apps that support OpenMetrics specs. --------- Signed-off-by: Ted Possible --- app/vmagent/remotewrite/pendingseries.go | 21 +- app/vmagent/remotewrite/pendingseries_test.go | 22 +- lib/prompb/prompb.go | 119 ++++++++-- lib/prompb/prompb_test.go | 61 ++++- lib/prompbmarshal/prompbmarshal_test.go | 36 ++- lib/prompbmarshal/types.pb.go | 78 ++++++- lib/promscrape/client.go | 7 + lib/promscrape/scrapework.go | 23 +- lib/promscrape/scrapework_test.go | 36 +++ lib/protoparser/prometheus/parser.go | 218 +++++++++++++++--- lib/protoparser/prometheus/parser_test.go | 162 ++++++++++++- .../prometheus/parser_timing_test.go | 8 +- 12 files changed, 716 insertions(+), 75 deletions(-) diff --git a/app/vmagent/remotewrite/pendingseries.go b/app/vmagent/remotewrite/pendingseries.go index efd760e6f..3204eb279 100644 --- a/app/vmagent/remotewrite/pendingseries.go +++ b/app/vmagent/remotewrite/pendingseries.go @@ -109,9 +109,10 @@ type writeRequest struct { wr prompbmarshal.WriteRequest - tss []prompbmarshal.TimeSeries - labels []prompbmarshal.Label - samples []prompbmarshal.Sample + tss []prompbmarshal.TimeSeries + labels []prompbmarshal.Label + samples []prompbmarshal.Sample + exemplars []prompbmarshal.Exemplar // buf holds labels data buf []byte @@ -129,6 +130,7 @@ func (wr *writeRequest) reset() { wr.labels = wr.labels[:0] wr.samples = wr.samples[:0] + wr.exemplars = wr.exemplars[:0] wr.buf = wr.buf[:0] } @@ -200,6 +202,7 @@ func (wr *writeRequest) copyTimeSeries(dst, src *prompbmarshal.TimeSeries) { labelsDst := wr.labels labelsLen := len(wr.labels) samplesDst := wr.samples + exemplarsDst := wr.exemplars buf := wr.buf for i := range src.Labels { labelsDst = append(labelsDst, prompbmarshal.Label{}) @@ -216,8 +219,12 @@ func (wr *writeRequest) copyTimeSeries(dst, src *prompbmarshal.TimeSeries) { samplesDst = append(samplesDst, src.Samples...) dst.Samples = samplesDst[len(samplesDst)-len(src.Samples):] + exemplarsDst = append(exemplarsDst, src.Exemplars...) + dst.Exemplars = exemplarsDst[len(exemplarsDst)-len(src.Exemplars):] + wr.samples = samplesDst wr.labels = labelsDst + wr.exemplars = exemplarsDst wr.buf = buf } @@ -229,7 +236,6 @@ func tryPushWriteRequest(wr *prompbmarshal.WriteRequest, tryPushBlock func(block // Nothing to push return true } - marshalConcurrencyCh <- struct{}{} bb := writeRequestBufPool.Get() @@ -266,6 +272,8 @@ func tryPushWriteRequest(wr *prompbmarshal.WriteRequest, tryPushBlock func(block if len(wr.Timeseries) == 1 { // A single time series left. Recursively split its samples into smaller parts if possible. samples := wr.Timeseries[0].Samples + exemplars := wr.Timeseries[0].Exemplars + if len(samples) == 1 { logger.Warnf("dropping a sample for metric with too long labels exceeding -remoteWrite.maxBlockSize=%d bytes", maxUnpackedBlockSize.N) return true @@ -277,11 +285,16 @@ func tryPushWriteRequest(wr *prompbmarshal.WriteRequest, tryPushBlock func(block return false } wr.Timeseries[0].Samples = samples[n:] + // We do not want to send exemplars twice + wr.Timeseries[0].Exemplars = nil + if !tryPushWriteRequest(wr, tryPushBlock, isVMRemoteWrite) { wr.Timeseries[0].Samples = samples + wr.Timeseries[0].Exemplars = exemplars return false } wr.Timeseries[0].Samples = samples + wr.Timeseries[0].Exemplars = exemplars return true } timeseries := wr.Timeseries diff --git a/app/vmagent/remotewrite/pendingseries_test.go b/app/vmagent/remotewrite/pendingseries_test.go index 14b1bd451..487fdab49 100644 --- a/app/vmagent/remotewrite/pendingseries_test.go +++ b/app/vmagent/remotewrite/pendingseries_test.go @@ -10,8 +10,8 @@ import ( func TestPushWriteRequest(t *testing.T) { rowsCounts := []int{1, 10, 100, 1e3, 1e4} - expectedBlockLensProm := []int{216, 1848, 16424, 169882, 1757876} - expectedBlockLensVM := []int{138, 492, 3927, 34995, 288476} + expectedBlockLensProm := []int{248, 1952, 17433, 180381, 1861994} + expectedBlockLensVM := []int{170, 575, 4748, 44936, 367096} for i, rowsCount := range rowsCounts { expectedBlockLenProm := expectedBlockLensProm[i] expectedBlockLenVM := expectedBlockLensVM[i] @@ -59,6 +59,20 @@ func newTestWriteRequest(seriesCount, labelsCount int) *prompbmarshal.WriteReque Value: fmt.Sprintf("value_%d_%d", i, j), }) } + exemplar := prompbmarshal.Exemplar{ + Labels: []prompbmarshal.Label{ + { + Name: "trace_id", + Value: "123456", + }, + { + Name: "log_id", + Value: "987654", + }, + }, + Value: float64(i), + Timestamp: 1000 * int64(i), + } wr.Timeseries = append(wr.Timeseries, prompbmarshal.TimeSeries{ Labels: labels, Samples: []prompbmarshal.Sample{ @@ -67,6 +81,10 @@ func newTestWriteRequest(seriesCount, labelsCount int) *prompbmarshal.WriteReque Timestamp: 1000 * int64(i), }, }, + + Exemplars: []prompbmarshal.Exemplar{ + exemplar, + }, }) } return &wr diff --git a/lib/prompb/prompb.go b/lib/prompb/prompb.go index 34a1c5716..adc8935e2 100644 --- a/lib/prompb/prompb.go +++ b/lib/prompb/prompb.go @@ -9,10 +9,11 @@ import ( // WriteRequest represents Prometheus remote write API request. type WriteRequest struct { // Timeseries is a list of time series in the given WriteRequest - Timeseries []TimeSeries - - labelsPool []Label - samplesPool []Sample + Timeseries []TimeSeries + labelsPool []Label + exemplarLabelsPool []Label + samplesPool []Sample + exemplarsPool []Exemplar } // Reset resets wr for subsequent re-use. @@ -29,11 +30,33 @@ func (wr *WriteRequest) Reset() { } wr.labelsPool = labelsPool[:0] + exemplarLabelsPool := wr.exemplarLabelsPool + for i := range exemplarLabelsPool { + exemplarLabelsPool[i] = Label{} + } + wr.labelsPool = labelsPool[:0] samplesPool := wr.samplesPool for i := range samplesPool { samplesPool[i] = Sample{} } wr.samplesPool = samplesPool[:0] + exemplarsPool := wr.exemplarsPool + for i := range exemplarsPool { + exemplarsPool[i] = Exemplar{} + } + wr.exemplarsPool = exemplarsPool[:0] +} + +// Exemplar is an exemplar +type Exemplar struct { + // Labels a list of labels that uniquely identifies exemplar + // Optional, can be empty. + Labels []Label + // Value: the value of the exemplar + Value float64 + // timestamp is in ms format, see model/timestamp/timestamp.go for + // conversion from time.Time to Prometheus timestamp. + Timestamp int64 } // TimeSeries is a timeseries. @@ -42,7 +65,8 @@ type TimeSeries struct { Labels []Label // Samples is a list of samples for the given TimeSeries - Samples []Sample + Samples []Sample + Exemplars []Exemplar } // Sample is a timeseries sample. @@ -74,7 +98,10 @@ func (wr *WriteRequest) UnmarshalProtobuf(src []byte) (err error) { // } tss := wr.Timeseries labelsPool := wr.labelsPool + exemplarLabelsPool := wr.exemplarLabelsPool samplesPool := wr.samplesPool + exemplarsPool := wr.exemplarsPool + var fc easyproto.FieldContext for len(src) > 0 { src, err = fc.NextField(src) @@ -93,7 +120,7 @@ func (wr *WriteRequest) UnmarshalProtobuf(src []byte) (err error) { tss = append(tss, TimeSeries{}) } ts := &tss[len(tss)-1] - labelsPool, samplesPool, err = ts.unmarshalProtobuf(data, labelsPool, samplesPool) + labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, err = ts.unmarshalProtobuf(data, labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool) if err != nil { return fmt.Errorf("cannot unmarshal timeseries: %w", err) } @@ -102,28 +129,31 @@ func (wr *WriteRequest) UnmarshalProtobuf(src []byte) (err error) { wr.Timeseries = tss wr.labelsPool = labelsPool wr.samplesPool = samplesPool + wr.exemplarsPool = exemplarsPool return nil } -func (ts *TimeSeries) unmarshalProtobuf(src []byte, labelsPool []Label, samplesPool []Sample) ([]Label, []Sample, error) { +func (ts *TimeSeries) unmarshalProtobuf(src []byte, labelsPool []Label, exemplarLabelsPool []Label, samplesPool []Sample, exemplarsPool []Exemplar) ([]Label, []Label, []Sample, []Exemplar, error) { // message TimeSeries { // repeated Label labels = 1; // repeated Sample samples = 2; + // repeated Exemplar exemplars = 3 // } labelsPoolLen := len(labelsPool) samplesPoolLen := len(samplesPool) + exemplarsPoolLen := len(exemplarsPool) var fc easyproto.FieldContext for len(src) > 0 { var err error src, err = fc.NextField(src) if err != nil { - return labelsPool, samplesPool, fmt.Errorf("cannot read the next field: %w", err) + return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, fmt.Errorf("cannot read the next field: %w", err) } switch fc.FieldNum { case 1: data, ok := fc.MessageData() if !ok { - return labelsPool, samplesPool, fmt.Errorf("cannot read label data") + return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, fmt.Errorf("cannot read label data") } if len(labelsPool) < cap(labelsPool) { labelsPool = labelsPool[:len(labelsPool)+1] @@ -132,12 +162,12 @@ func (ts *TimeSeries) unmarshalProtobuf(src []byte, labelsPool []Label, samplesP } label := &labelsPool[len(labelsPool)-1] if err := label.unmarshalProtobuf(data); err != nil { - return labelsPool, samplesPool, fmt.Errorf("cannot unmarshal label: %w", err) + return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, fmt.Errorf("cannot unmarshal label: %w", err) } case 2: data, ok := fc.MessageData() if !ok { - return labelsPool, samplesPool, fmt.Errorf("cannot read the sample data") + return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, fmt.Errorf("cannot read the sample data") } if len(samplesPool) < cap(samplesPool) { samplesPool = samplesPool[:len(samplesPool)+1] @@ -146,15 +176,78 @@ func (ts *TimeSeries) unmarshalProtobuf(src []byte, labelsPool []Label, samplesP } sample := &samplesPool[len(samplesPool)-1] if err := sample.unmarshalProtobuf(data); err != nil { - return labelsPool, samplesPool, fmt.Errorf("cannot unmarshal sample: %w", err) + return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, fmt.Errorf("cannot unmarshal sample: %w", err) + } + case 3: + data, ok := fc.MessageData() + if !ok { + return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, fmt.Errorf("cannot read the exemplar data") + } + if len(exemplarsPool) < cap(exemplarsPool) { + exemplarsPool = exemplarsPool[:len(exemplarsPool)+1] + } else { + exemplarsPool = append(exemplarsPool, Exemplar{}) + } + exemplar := &exemplarsPool[len(exemplarsPool)-1] + if exemplarLabelsPool, err = exemplar.unmarshalProtobuf(data, exemplarLabelsPool); err != nil { + return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, fmt.Errorf("cannot unmarshal exemplar: %w", err) } } } ts.Labels = labelsPool[labelsPoolLen:] ts.Samples = samplesPool[samplesPoolLen:] - return labelsPool, samplesPool, nil + ts.Exemplars = exemplarsPool[exemplarsPoolLen:] + return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, nil } +func (exemplar *Exemplar) unmarshalProtobuf(src []byte, labelsPool []Label) ([]Label, error) { + // message Exemplar { + // repeated Label Labels = 1; + // float64 Value = 2; + // int64 Timestamp = 3; + // } + var fc easyproto.FieldContext + + labelsPoolLen := len(labelsPool) + + for len(src) > 0 { + var err error + src, err = fc.NextField(src) + if err != nil { + return labelsPool, fmt.Errorf("cannot read the next field: %w", err) + } + switch fc.FieldNum { + case 1: + data, ok := fc.MessageData() + if !ok { + return labelsPool, fmt.Errorf("cannot read label data") + } + if len(labelsPool) < cap(labelsPool) { + labelsPool = labelsPool[:len(labelsPool)+1] + } else { + labelsPool = append(labelsPool, Label{}) + } + label := &labelsPool[len(labelsPool)-1] + if err := label.unmarshalProtobuf(data); err != nil { + return labelsPool, fmt.Errorf("cannot unmarshal label: %w", err) + } + case 2: + value, ok := fc.Double() + if !ok { + return labelsPool, fmt.Errorf("cannot read exemplar value") + } + exemplar.Value = value + case 3: + timestamp, ok := fc.Int64() + if !ok { + return labelsPool, fmt.Errorf("cannot read exemplar timestamp") + } + exemplar.Timestamp = timestamp + } + } + exemplar.Labels = labelsPool[labelsPoolLen:] + return labelsPool, nil +} func (lbl *Label) unmarshalProtobuf(src []byte) (err error) { // message Label { // string name = 1; diff --git a/lib/prompb/prompb_test.go b/lib/prompb/prompb_test.go index 727101206..b1d6e02bc 100644 --- a/lib/prompb/prompb_test.go +++ b/lib/prompb/prompb_test.go @@ -36,9 +36,25 @@ func TestWriteRequestUnmarshalProtobuf(t *testing.T) { Timestamp: sample.Timestamp, }) } + var exemplars []prompbmarshal.Exemplar + for _, exemplar := range ts.Exemplars { + exemplarLabels := make([]prompbmarshal.Label, len(exemplar.Labels)) + for i, label := range exemplar.Labels { + exemplarLabels[i] = prompbmarshal.Label{ + Name: label.Name, + Value: label.Value, + } + } + exemplars = append(exemplars, prompbmarshal.Exemplar{ + Labels: exemplarLabels, + Value: exemplar.Value, + Timestamp: exemplar.Timestamp, + }) + } wrm.Timeseries = append(wrm.Timeseries, prompbmarshal.TimeSeries{ - Labels: labels, - Samples: samples, + Labels: labels, + Samples: samples, + Exemplars: exemplars, }) } dataResult := wrm.MarshalProtobuf(nil) @@ -121,6 +137,19 @@ func TestWriteRequestUnmarshalProtobuf(t *testing.T) { Timestamp: 18939432423, }, }, + Exemplars: []prompbmarshal.Exemplar{ + { + Labels: []prompbmarshal.Label{ + {Name: "trace-id", + Value: "123456", + }, + {Name: "log_id", + Value: "987664"}, + }, + Value: 12345.6, + Timestamp: 456, + }, + }, }, } data = wrm.MarshalProtobuf(data[:0]) @@ -153,6 +182,18 @@ func TestWriteRequestUnmarshalProtobuf(t *testing.T) { Timestamp: 18939432423, }, }, + Exemplars: []prompbmarshal.Exemplar{ + { + Labels: []prompbmarshal.Label{ + { + Name: "trace-id", + Value: "123456", + }, + }, + Value: 12345.6, + Timestamp: 456, + }, + }, }, { Labels: []prompbmarshal.Label{ @@ -166,6 +207,22 @@ func TestWriteRequestUnmarshalProtobuf(t *testing.T) { Value: 9873, }, }, + Exemplars: []prompbmarshal.Exemplar{ + { + Labels: []prompbmarshal.Label{ + { + Name: "trace-id", + Value: "123456", + }, + { + Name: "log_id", + Value: "987654", + }, + }, + Value: 12345.6, + Timestamp: 456, + }, + }, }, } data = wrm.MarshalProtobuf(data[:0]) diff --git a/lib/prompbmarshal/prompbmarshal_test.go b/lib/prompbmarshal/prompbmarshal_test.go index 99fb28c10..42716fdb2 100644 --- a/lib/prompbmarshal/prompbmarshal_test.go +++ b/lib/prompbmarshal/prompbmarshal_test.go @@ -36,6 +36,22 @@ func TestWriteRequestMarshalProtobuf(t *testing.T) { Timestamp: 18939432423, }, }, + Exemplars: []prompbmarshal.Exemplar{ + { + Labels: []prompbmarshal.Label{ + { + Name: "trace-id", + Value: "123456", + }, + { + Name: "log_id", + Value: "987654", + }, + }, + Value: 12345.6, + Timestamp: 456, + }, + }, }, }, } @@ -64,9 +80,25 @@ func TestWriteRequestMarshalProtobuf(t *testing.T) { Timestamp: sample.Timestamp, }) } + var exemplars []prompbmarshal.Exemplar + for _, exemplar := range ts.Exemplars { + exemplarLabels := make([]prompbmarshal.Label, len(exemplar.Labels)) + for i, label := range exemplar.Labels { + exemplarLabels[i] = prompbmarshal.Label{ + Name: label.Name, + Value: label.Value, + } + } + exemplars = append(exemplars, prompbmarshal.Exemplar{ + Labels: exemplarLabels, + Value: exemplar.Value, + Timestamp: exemplar.Timestamp, + }) + } wrm.Timeseries = append(wrm.Timeseries, prompbmarshal.TimeSeries{ - Labels: labels, - Samples: samples, + Labels: labels, + Samples: samples, + Exemplars: exemplars, }) } dataResult := wrm.MarshalProtobuf(nil) diff --git a/lib/prompbmarshal/types.pb.go b/lib/prompbmarshal/types.pb.go index ca2b62f1b..d5295ddd0 100644 --- a/lib/prompbmarshal/types.pb.go +++ b/lib/prompbmarshal/types.pb.go @@ -13,10 +13,70 @@ type Sample struct { Timestamp int64 } +type Exemplar struct { + // Optional, can be empty. + Labels []Label + Value float64 + // timestamp is in ms format, see model/timestamp/timestamp.go for + // conversion from time.Time to Prometheus timestamp. + Timestamp int64 +} + +func (m *Exemplar) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + if m.Timestamp != 0 { + i = encodeVarint(dAtA, i, uint64(m.Timestamp)) + i-- + dAtA[i] = 0x18 + } + if m.Value != 0 { + i -= 8 + binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.Value)))) + i-- + dAtA[i] = 0x11 + } + if len(m.Labels) > 0 { + for iNdEx := len(m.Labels) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Labels[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarint(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} +func (m *Exemplar) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Labels) > 0 { + for _, e := range m.Labels { + l = e.Size() + n += 1 + l + sov(uint64(l)) + } + } + if m.Value != 0 { + n += 9 + } + if m.Timestamp != 0 { + n += 1 + sov(uint64(m.Timestamp)) + } + return n +} + // TimeSeries represents samples and labels for a single time series. type TimeSeries struct { - Labels []Label - Samples []Sample + Labels []Label + Samples []Sample + Exemplars []Exemplar } type Label struct { @@ -42,6 +102,16 @@ func (m *Sample) MarshalToSizedBuffer(dst []byte) (int, error) { func (m *TimeSeries) MarshalToSizedBuffer(dst []byte) (int, error) { i := len(dst) + for j := len(m.Exemplars) - 1; j >= 0; j-- { + size, err := m.Exemplars[j].MarshalToSizedBuffer(dst[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarint(dst, i, uint64(size)) + i-- + dst[i] = 0x1a + } for j := len(m.Samples) - 1; j >= 0; j-- { size, err := m.Samples[j].MarshalToSizedBuffer(dst[:i]) if err != nil { @@ -109,6 +179,10 @@ func (m *TimeSeries) Size() (n int) { l := e.Size() n += 1 + l + sov(uint64(l)) } + for _, e := range m.Exemplars { + l := e.Size() + n += 1 + l + sov(uint64(l)) + } return n } diff --git a/lib/promscrape/client.go b/lib/promscrape/client.go index e5f7b4a5e..31457911a 100644 --- a/lib/promscrape/client.go +++ b/lib/promscrape/client.go @@ -30,6 +30,7 @@ var ( streamParse = flag.Bool("promscrape.streamParse", false, "Whether to enable stream parsing for metrics obtained from scrape targets. This may be useful "+ "for reducing memory usage when millions of metrics are exposed per each scrape target. "+ "It is possible to set 'stream_parse: true' individually per each 'scrape_config' section in '-promscrape.config' for fine-grained control") + scrapeExemplars = flag.Bool("promscrape.scrapeExemplars", false, "Whether to enable scraping of exemplars from scrape targets.") ) type client struct { @@ -107,6 +108,12 @@ func (c *client) ReadData(dst *bytesutil.ByteBuffer) error { // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/608 for details. // Do not bloat the `Accept` header with OpenMetrics shit, since it looks like dead standard now. req.Header.Set("Accept", "text/plain;version=0.0.4;q=1,*/*;q=0.1") + // We set to support exemplars to be compatible with Prometheus Exposition format which uses + // Open Metrics Specification + // See https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#openmetrics-text-format + if *scrapeExemplars { + req.Header.Set("Accept", "application/openmetrics-text") + } // Set X-Prometheus-Scrape-Timeout-Seconds like Prometheus does, since it is used by some exporters such as PushProx. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179#issuecomment-813117162 req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", c.scrapeTimeoutSecondsStr) diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index 837db6dc5..ad478dbf7 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -670,6 +670,7 @@ type writeRequestCtx struct { writeRequest prompbmarshal.WriteRequest labels []prompbmarshal.Label samples []prompbmarshal.Sample + exemplars []prompbmarshal.Exemplar } func (wc *writeRequestCtx) reset() { @@ -684,6 +685,7 @@ func (wc *writeRequestCtx) resetNoRows() { wc.labels = wc.labels[:0] wc.samples = wc.samples[:0] + wc.exemplars = wc.exemplars[:0] } var writeRequestCtxPool leveledWriteRequestCtxPool @@ -902,10 +904,27 @@ func (sw *scrapeWork) addRowToTimeseries(wc *writeRequestCtx, r *parser.Row, tim Value: r.Value, Timestamp: sampleTimestamp, }) + // Add Exemplars to Timeseries + exemplarsLen := len(wc.exemplars) + exemplarTagsLen := len(r.Exemplar.Tags) + if exemplarTagsLen > 0 { + exemplarLabels := make([]prompbmarshal.Label, exemplarTagsLen) + for i, label := range r.Exemplar.Tags { + exemplarLabels[i].Name = label.Key + exemplarLabels[i].Value = label.Value + } + wc.exemplars = append(wc.exemplars, prompbmarshal.Exemplar{ + Labels: exemplarLabels, + Value: r.Exemplar.Value, + Timestamp: r.Exemplar.Timestamp, + }) + + } wr := &wc.writeRequest wr.Timeseries = append(wr.Timeseries, prompbmarshal.TimeSeries{ - Labels: wc.labels[labelsLen:], - Samples: wc.samples[len(wc.samples)-1:], + Labels: wc.labels[labelsLen:], + Samples: wc.samples[len(wc.samples)-1:], + Exemplars: wc.exemplars[exemplarsLen:], }) } diff --git a/lib/promscrape/scrapework_test.go b/lib/promscrape/scrapework_test.go index 787203d29..a4a972c62 100644 --- a/lib/promscrape/scrapework_test.go +++ b/lib/promscrape/scrapework_test.go @@ -708,6 +708,11 @@ func TestAddRowToTimeseriesNoRelabeling(t *testing.T) { HonorLabels: true, }, `metric{a="e",foo="bar"} 0 123`) + f(`metric{foo="bar"} 0 123 # {trace_id="12345"} 52 456`, + &ScrapeWork{ + HonorLabels: true, + }, + `metric{foo="bar"} 0 123 # {trace_id="12345"} 52 456`) } func TestSendStaleSeries(t *testing.T) { @@ -765,6 +770,8 @@ func parseData(data string) []prompbmarshal.TimeSeries { } rows.UnmarshalWithErrLogger(data, errLogger) var tss []prompbmarshal.TimeSeries + var exemplars []prompbmarshal.Exemplar + for _, r := range rows.Rows { labels := []prompbmarshal.Label{ { @@ -778,6 +785,21 @@ func parseData(data string) []prompbmarshal.TimeSeries { Value: tag.Value, }) } + exemplarLabels := []prompbmarshal.Label{} + if len(r.Exemplar.Tags) > 0 { + for _, tag := range r.Exemplar.Tags { + exemplarLabels = append(exemplarLabels, prompbmarshal.Label{ + Name: tag.Key, + Value: tag.Value, + }) + } + exemplars = append(exemplars, prompbmarshal.Exemplar{ + Labels: exemplarLabels, + Value: r.Exemplar.Value, + Timestamp: r.Exemplar.Timestamp, + }) + } + var ts prompbmarshal.TimeSeries ts.Labels = labels ts.Samples = []prompbmarshal.Sample{ @@ -786,6 +808,7 @@ func parseData(data string) []prompbmarshal.TimeSeries { Timestamp: r.Timestamp, }, } + ts.Exemplars = exemplars tss = append(tss, ts) } return tss @@ -850,6 +873,19 @@ func timeseriesToString(ts *prompbmarshal.TimeSeries) string { } s := ts.Samples[0] fmt.Fprintf(&sb, "%g %d", s.Value, s.Timestamp) + // Add Exemplars to the end of string + for j, exemplar := range ts.Exemplars { + for i, label := range exemplar.Labels { + fmt.Fprintf(&sb, "%s=%q", label.Name, label.Value) + if i+1 < len(ts.Labels) { + fmt.Fprintf(&sb, ",") + } + } + fmt.Fprintf(&sb, "%g %d", exemplar.Value, exemplar.Timestamp) + if j+1 < len(ts.Exemplars) { + fmt.Fprintf(&sb, ",") + } + } return sb.String() } diff --git a/lib/protoparser/prometheus/parser.go b/lib/protoparser/prometheus/parser.go index 923737f12..58ef46f1f 100644 --- a/lib/protoparser/prometheus/parser.go +++ b/lib/protoparser/prometheus/parser.go @@ -57,12 +57,33 @@ func (rs *Rows) UnmarshalWithErrLogger(s string, errLogger func(s string)) { rs.Rows, rs.tagsPool = unmarshalRows(rs.Rows[:0], s, rs.tagsPool[:0], noEscapes, errLogger) } +const tagsPrefix = '{' +const exemplarPreifx = '{' + +// Exemplar Item +type Exemplar struct { + // Tags: a list of labels that uniquely identifies exemplar + Tags []Tag + // Value: the value of the exemplar + Value float64 + // Timestamp: the time when exemplar was recorded + Timestamp int64 +} + +// Reset - resets the Exemplar object to defaults +func (e *Exemplar) Reset() { + e.Tags = nil + e.Value = 0 + e.Timestamp = 0 +} + // Row is a single Prometheus row. type Row struct { Metric string Tags []Tag Value float64 Timestamp int64 + Exemplar Exemplar } func (r *Row) reset() { @@ -70,6 +91,7 @@ func (r *Row) reset() { r.Tags = nil r.Value = 0 r.Timestamp = 0 + r.Exemplar = Exemplar{} } func skipTrailingComment(s string) string { @@ -110,69 +132,140 @@ func nextWhitespace(s string) int { return n1 } -func (r *Row) unmarshal(s string, tagsPool []Tag, noEscapes bool) ([]Tag, error) { - r.reset() +func parseStringToTags(s string, tagsPool []Tag, noEscapes bool) (string, []Tag, error) { + n := strings.IndexByte(s, tagsPrefix) + c := strings.IndexByte(s, '#') + if c != -1 && c < n { + return s, tagsPool, nil + } s = skipLeadingWhitespace(s) - n := strings.IndexByte(s, '{') if n >= 0 { // Tags found. Parse them. - r.Metric = skipTrailingWhitespace(s[:n]) s = s[n+1:] - tagsStart := len(tagsPool) var err error s, tagsPool, err = unmarshalTags(tagsPool, s, noEscapes) if err != nil { - return tagsPool, fmt.Errorf("cannot unmarshal tags: %w", err) + return s, tagsPool, fmt.Errorf("cannot unmarshal tags: %w", err) } if len(s) > 0 && s[0] == ' ' { // Fast path - skip whitespace. s = s[1:] } - tags := tagsPool[tagsStart:] - r.Tags = tags[:len(tags):len(tags)] - } else { - // Tags weren't found. Search for value after whitespace - n = nextWhitespace(s) - if n < 0 { - return tagsPool, fmt.Errorf("missing value") - } - r.Metric = s[:n] - s = s[n+1:] } - if len(r.Metric) == 0 { - return tagsPool, fmt.Errorf("metric cannot be empty") + return s, tagsPool, nil +} + +var tvtPool = sync.Pool{New: func() interface{} { + return &tagsValueTimestamp{} +}} + +func getTVT() *tagsValueTimestamp { + return tvtPool.Get().(*tagsValueTimestamp) +} +func putTVT(tvt *tagsValueTimestamp) { + tvt.reset() + tvtPool.Put(tvt) +} + +type tagsValueTimestamp struct { + Prefix string + Value float64 + Timestamp int64 + Comments string +} + +func (tvt *tagsValueTimestamp) reset() { + tvt.Prefix = "" + tvt.Value = 0 + tvt.Timestamp = 0 + tvt.Comments = "" +} +func parseTagsValueTimestamp(s string, tagsPool []Tag, noEscapes bool) ([]Tag, *tagsValueTimestamp, error) { + tvt := getTVT() + n := 0 + // Prefix is everything up to a tag start or a space + t := strings.IndexByte(s, tagsPrefix) + // If there is no tag start process rest of string + mustParseTags := false + if t != -1 { + // Check to see if there is a space before tag + n = nextWhitespace(s) + // If there is a space + if n > 0 { + if n < t { + tvt.Prefix = s[:n] + s = skipLeadingWhitespace(s[n:]) + // Cover the use case where there is whitespace between the prefix and the tag + if len(s) > 0 && s[0] == '{' { + mustParseTags = true + } + // Most likely this has an exemplar + } else { + tvt.Prefix = s[:t] + s = s[t:] + mustParseTags = true + } + } + if mustParseTags { + var err error + s, tagsPool, err = parseStringToTags(s, tagsPool, noEscapes) + if err != nil { + return tagsPool, tvt, err + } + } + } else { + // Tag doesn't exist + n = nextWhitespace(s) + if n != -1 { + tvt.Prefix = s[:n] + s = s[n:] + } else { + tvt.Prefix = s + return tagsPool, tvt, fmt.Errorf("missing value") + } } s = skipLeadingWhitespace(s) - s = skipTrailingComment(s) - if len(s) == 0 { - return tagsPool, fmt.Errorf("value cannot be empty") + // save and remove the comments + n = strings.IndexByte(s, '#') + if n >= 0 { + tvt.Comments = s[n:] + if len(tvt.Comments) > 1 { + tvt.Comments = s[n+1:] + } else { + tvt.Comments = "" + } + s = skipTrailingComment(s) + s = skipLeadingWhitespace(s) + s = skipTrailingWhitespace(s) } n = nextWhitespace(s) if n < 0 { // There is no timestamp. v, err := fastfloat.Parse(s) if err != nil { - return tagsPool, fmt.Errorf("cannot parse value %q: %w", s, err) + return tagsPool, tvt, fmt.Errorf("cannot parse value %q: %w", s, err) } - r.Value = v - return tagsPool, nil + tvt.Value = v + return tagsPool, tvt, nil } - // There is a timestamp. + // There is a timestamp + s = skipLeadingWhitespace(s) v, err := fastfloat.Parse(s[:n]) if err != nil { - return tagsPool, fmt.Errorf("cannot parse value %q: %w", s[:n], err) + return tagsPool, tvt, fmt.Errorf("cannot parse value %q: %w", s[:n], err) } - r.Value = v - s = skipLeadingWhitespace(s[n+1:]) + tvt.Value = v + s = s[n:] + // There are some whitespaces after timestamp + s = skipLeadingWhitespace(s) if len(s) == 0 { // There is no timestamp - just a whitespace after the value. - return tagsPool, nil + return tagsPool, tvt, nil } - // There are some whitespaces after timestamp s = skipTrailingWhitespace(s) ts, err := fastfloat.Parse(s) if err != nil { - return tagsPool, fmt.Errorf("cannot parse timestamp %q: %w", s, err) + return tagsPool, tvt, fmt.Errorf("cannot parse timestamp %q: %w", s, err) } if ts >= -1<<31 && ts < 1<<31 { // This looks like OpenMetrics timestamp in Unix seconds. @@ -181,7 +274,68 @@ func (r *Row) unmarshal(s string, tagsPool []Tag, noEscapes bool) ([]Tag, error) // See https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md#timestamps ts *= 1000 } - r.Timestamp = int64(ts) + tvt.Timestamp = int64(ts) + return tagsPool, tvt, nil +} + +// Returns possible comments that could be exemplars +func (r *Row) unmarshalMetric(s string, tagsPool []Tag, noEscapes bool) (string, []Tag, error) { + tagsStart := len(tagsPool) + var err error + var tvt *tagsValueTimestamp + tagsPool, tvt, err = parseTagsValueTimestamp(s, tagsPool, noEscapes) + defer putTVT(tvt) + if err != nil { + return "", tagsPool, err + } + r.Metric = tvt.Prefix + tags := tagsPool[tagsStart:] + if len(tags) > 0 { + r.Tags = tags + } + r.Value = tvt.Value + r.Timestamp = tvt.Timestamp + return tvt.Comments, tagsPool, nil + +} +func (e *Exemplar) unmarshal(s string, tagsPool []Tag, noEscapes bool) ([]Tag, error) { + // We can use the Comment parsed out to further parse the Exemplar + s = skipLeadingWhitespace(s) + // If we are a comment immediately followed by whitespace or a labelset + // then we are an exemplar + if len(s) != 0 && s[0] == exemplarPreifx { + var err error + var tvt *tagsValueTimestamp + tagsStart := len(tagsPool) + tagsPool, tvt, err = parseTagsValueTimestamp(s, tagsPool, noEscapes) + defer putTVT(tvt) + if err != nil { + return tagsPool, err + } + tags := tagsPool[tagsStart:] + if len(tags) > 0 { + e.Tags = tags + } + e.Value = tvt.Value + e.Timestamp = tvt.Timestamp + return tagsPool, nil + } + return tagsPool, nil + +} +func (r *Row) unmarshal(s string, tagsPool []Tag, noEscapes bool) ([]Tag, error) { + r.reset() + // Parse for labels, the value and the timestamp + // Anything before labels is saved in Prefix we can use this + // for the metric name + comments, tagsPool, err := r.unmarshalMetric(s, tagsPool, noEscapes) + if err != nil { + return nil, err + } + tagsPool, err = r.Exemplar.unmarshal(comments, tagsPool, noEscapes) + if err != nil { + return nil, err + } return tagsPool, nil } diff --git a/lib/protoparser/prometheus/parser_test.go b/lib/protoparser/prometheus/parser_test.go index 39116c937..ff95f25b1 100644 --- a/lib/protoparser/prometheus/parser_test.go +++ b/lib/protoparser/prometheus/parser_test.go @@ -362,12 +362,87 @@ cassandra_token_ownership_ratio 78.9`, &Rows{ Value: 56, }}, }) - - // Exemplars - see https://github.com/OpenObservability/OpenMetrics/blob/master/OpenMetrics.md#exemplars-1 - f(`foo_bucket{le="10",a="#b"} 17 # {trace_id="oHg5SJ#YRHA0"} 9.8 1520879607.789 - abc 123 456 # foobar - foo 344#bar`, &Rows{ + // Support for Exemplars Open Metric Specification + // see: https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#exemplars-1 + f(`foo_bucket{le="25"} 17 # {trace_id="oHg5SJYRHA0", log_id="test_id"} 9.8 1520879607.789`, &Rows{ Rows: []Row{ + { + Metric: "foo_bucket", + Tags: []Tag{ + { + Key: "le", + Value: "25", + }, + }, + Value: 17, + Exemplar: Exemplar{ + Value: 9.8, + Tags: []Tag{ + { + Key: "trace_id", + Value: "oHg5SJYRHA0", + }, + { + Key: "log_id", + Value: "test_id", + }, + }, + Timestamp: 1520879607789, + }, + }, + }}) + f(`foo_bucket{le="0.01"} 0 +foo_bucket{le="0.1"} 8 # {} 0.054 +foo_bucket{le="1"} 11 # {trace_id="KOO5S4vxi0o"} 0.67 +foo_bucket{le="10"} 17 # {trace_id="oHg5SJYRHA0"} 9.8 1520879607.789 +foo_bucket{le="25"} 17 # {trace_id="oHg5SJYRHA0", log_id="test_id"} 9.8 1520879607.789 +foo_bucket{nospace="exemplar"} 17 #{trace_id="oHg5SJYRHA0"} 9.8 1520879607.789 +foo_bucket{le="+Inf"} 17 +foo_count 17 +foo_sum 324789.3 +foo_created 1520430000.123`, &Rows{ + Rows: []Row{ + { + Metric: "foo_bucket", + Tags: []Tag{ + { + Key: "le", + Value: "0.01", + }, + }, + }, + { + Metric: "foo_bucket", + Tags: []Tag{ + { + Key: "le", + Value: "0.1", + }, + }, + Value: 8, + Exemplar: Exemplar{ + Value: 0.054, + }, + }, + { + Metric: "foo_bucket", + Tags: []Tag{ + { + Key: "le", + Value: "1", + }, + }, + Value: 11, + Exemplar: Exemplar{ + Value: 0.67, + Tags: []Tag{ + { + Key: "trace_id", + Value: "KOO5S4vxi0o", + }, + }, + }, + }, { Metric: "foo_bucket", Tags: []Tag{ @@ -375,21 +450,84 @@ cassandra_token_ownership_ratio 78.9`, &Rows{ Key: "le", Value: "10", }, + }, + Value: 17, + Exemplar: Exemplar{ + Value: 9.8, + Tags: []Tag{ + { + Key: "trace_id", + Value: "oHg5SJYRHA0", + }, + }, + Timestamp: 1520879607789, + }, + }, + { + Metric: "foo_bucket", + Tags: []Tag{ { - Key: "a", - Value: "#b", + Key: "le", + Value: "25", + }, + }, + Value: 17, + Exemplar: Exemplar{ + Value: 9.8, + Tags: []Tag{ + { + Key: "trace_id", + Value: "oHg5SJYRHA0", + }, + { + Key: "log_id", + Value: "test_id", + }, + }, + Timestamp: 1520879607789, + }, + }, + { + Metric: "foo_bucket", + Tags: []Tag{ + { + Key: "nospace", + Value: "exemplar", + }, + }, + Value: 17, + Exemplar: Exemplar{ + Value: 9.8, + Tags: []Tag{ + { + Key: "trace_id", + Value: "oHg5SJYRHA0", + }, + }, + Timestamp: 1520879607789, + }, + }, + { + Metric: "foo_bucket", + Tags: []Tag{ + { + Key: "le", + Value: "+Inf", }, }, Value: 17, }, { - Metric: "abc", - Value: 123, - Timestamp: 456000, + Metric: "foo_count", + Value: 17, }, { - Metric: "foo", - Value: 344, + Metric: "foo_sum", + Value: 324789.3, + }, + { + Metric: "foo_created", + Value: 1520430000.123, }, }, }) diff --git a/lib/protoparser/prometheus/parser_timing_test.go b/lib/protoparser/prometheus/parser_timing_test.go index 8d6a2827b..acabb7460 100644 --- a/lib/protoparser/prometheus/parser_timing_test.go +++ b/lib/protoparser/prometheus/parser_timing_test.go @@ -146,10 +146,10 @@ container_ulimits_soft{container="kube-scheduler",id="/kubelet/kubepods/burstabl } func BenchmarkRowsUnmarshal(b *testing.B) { - s := `cpu_usage{mode="user"} 1.23 -cpu_usage{mode="system"} 23.344 -cpu_usage{mode="iowait"} 3.3443 -cpu_usage{mode="irq"} 0.34432 + s := `foo_bucket{le="0.01"} 0 + foo_bucket{le="1"} 11 # {trace_id="KOO5S4vxi0o"} 0.67 + foo_bucket{le="10"} 17 # {trace_id="oHg5SJYRHA0"} 9.8 1520879607.789 + foo_bucket{nospace="exemplar"} 17 #{trace_id="oHg5SJYRHA0"} 9.8 1520879607.789 ` b.SetBytes(int64(len(s))) b.ReportAllocs()