diff --git a/app/vmagent/remotewrite/pendingseries.go b/app/vmagent/remotewrite/pendingseries.go index efd760e6f..3204eb279 100644 --- a/app/vmagent/remotewrite/pendingseries.go +++ b/app/vmagent/remotewrite/pendingseries.go @@ -109,9 +109,10 @@ type writeRequest struct { wr prompbmarshal.WriteRequest - tss []prompbmarshal.TimeSeries - labels []prompbmarshal.Label - samples []prompbmarshal.Sample + tss []prompbmarshal.TimeSeries + labels []prompbmarshal.Label + samples []prompbmarshal.Sample + exemplars []prompbmarshal.Exemplar // buf holds labels data buf []byte @@ -129,6 +130,7 @@ func (wr *writeRequest) reset() { wr.labels = wr.labels[:0] wr.samples = wr.samples[:0] + wr.exemplars = wr.exemplars[:0] wr.buf = wr.buf[:0] } @@ -200,6 +202,7 @@ func (wr *writeRequest) copyTimeSeries(dst, src *prompbmarshal.TimeSeries) { labelsDst := wr.labels labelsLen := len(wr.labels) samplesDst := wr.samples + exemplarsDst := wr.exemplars buf := wr.buf for i := range src.Labels { labelsDst = append(labelsDst, prompbmarshal.Label{}) @@ -216,8 +219,12 @@ func (wr *writeRequest) copyTimeSeries(dst, src *prompbmarshal.TimeSeries) { samplesDst = append(samplesDst, src.Samples...) dst.Samples = samplesDst[len(samplesDst)-len(src.Samples):] + exemplarsDst = append(exemplarsDst, src.Exemplars...) + dst.Exemplars = exemplarsDst[len(exemplarsDst)-len(src.Exemplars):] + wr.samples = samplesDst wr.labels = labelsDst + wr.exemplars = exemplarsDst wr.buf = buf } @@ -229,7 +236,6 @@ func tryPushWriteRequest(wr *prompbmarshal.WriteRequest, tryPushBlock func(block // Nothing to push return true } - marshalConcurrencyCh <- struct{}{} bb := writeRequestBufPool.Get() @@ -266,6 +272,8 @@ func tryPushWriteRequest(wr *prompbmarshal.WriteRequest, tryPushBlock func(block if len(wr.Timeseries) == 1 { // A single time series left. Recursively split its samples into smaller parts if possible. 
samples := wr.Timeseries[0].Samples + exemplars := wr.Timeseries[0].Exemplars + if len(samples) == 1 { logger.Warnf("dropping a sample for metric with too long labels exceeding -remoteWrite.maxBlockSize=%d bytes", maxUnpackedBlockSize.N) return true } @@ -277,11 +285,16 @@ func tryPushWriteRequest(wr *prompbmarshal.WriteRequest, tryPushBlock func(block return false } wr.Timeseries[0].Samples = samples[n:] + // We do not want to send exemplars twice + wr.Timeseries[0].Exemplars = nil + if !tryPushWriteRequest(wr, tryPushBlock, isVMRemoteWrite) { wr.Timeseries[0].Samples = samples + wr.Timeseries[0].Exemplars = exemplars return false } wr.Timeseries[0].Samples = samples + wr.Timeseries[0].Exemplars = exemplars return true } timeseries := wr.Timeseries diff --git a/app/vmagent/remotewrite/pendingseries_test.go b/app/vmagent/remotewrite/pendingseries_test.go index 14b1bd451..487fdab49 100644 --- a/app/vmagent/remotewrite/pendingseries_test.go +++ b/app/vmagent/remotewrite/pendingseries_test.go @@ -10,8 +10,8 @@ import ( func TestPushWriteRequest(t *testing.T) { rowsCounts := []int{1, 10, 100, 1e3, 1e4} - expectedBlockLensProm := []int{216, 1848, 16424, 169882, 1757876} - expectedBlockLensVM := []int{138, 492, 3927, 34995, 288476} + expectedBlockLensProm := []int{248, 1952, 17433, 180381, 1861994} + expectedBlockLensVM := []int{170, 575, 4748, 44936, 367096} for i, rowsCount := range rowsCounts { expectedBlockLenProm := expectedBlockLensProm[i] expectedBlockLenVM := expectedBlockLensVM[i] @@ -59,6 +59,20 @@ func newTestWriteRequest(seriesCount, labelsCount int) *prompbmarshal.WriteReque Value: fmt.Sprintf("value_%d_%d", i, j), }) } + exemplar := prompbmarshal.Exemplar{ + Labels: []prompbmarshal.Label{ + { + Name: "trace_id", + Value: "123456", + }, + { + Name: "log_id", + Value: "987654", + }, + }, + Value: float64(i), + Timestamp: 1000 * int64(i), + } wr.Timeseries = append(wr.Timeseries, prompbmarshal.TimeSeries{ Labels: labels, Samples: []prompbmarshal.Sample{ { Value: float64(i), Timestamp: 1000 * int64(i), }, }, + + Exemplars: []prompbmarshal.Exemplar{ + exemplar, + }, }) } return &wr diff --git a/lib/prompb/prompb.go b/lib/prompb/prompb.go index 34a1c5716..adc8935e2 100644 --- a/lib/prompb/prompb.go +++ b/lib/prompb/prompb.go @@ -9,10 +9,11 @@ import ( // WriteRequest represents Prometheus remote write API request. type WriteRequest struct { // Timeseries is a list of time series in the given WriteRequest - Timeseries []TimeSeries - - labelsPool []Label - samplesPool []Sample + Timeseries []TimeSeries + labelsPool []Label + exemplarLabelsPool []Label + samplesPool []Sample + exemplarsPool []Exemplar } // Reset resets wr for subsequent re-use. @@ -29,11 +30,33 @@ func (wr *WriteRequest) Reset() { } wr.labelsPool = labelsPool[:0] + exemplarLabelsPool := wr.exemplarLabelsPool + for i := range exemplarLabelsPool { + exemplarLabelsPool[i] = Label{} + } + wr.exemplarLabelsPool = exemplarLabelsPool[:0] samplesPool := wr.samplesPool for i := range samplesPool { samplesPool[i] = Sample{} } wr.samplesPool = samplesPool[:0] + exemplarsPool := wr.exemplarsPool + for i := range exemplarsPool { + exemplarsPool[i] = Exemplar{} + } + wr.exemplarsPool = exemplarsPool[:0] +} + +// Exemplar is a Prometheus exemplar +type Exemplar struct { + // Labels is a list of labels that uniquely identify the exemplar + // Optional, can be empty.
+ Labels []Label + // Value: the value of the exemplar + Value float64 + // timestamp is in ms format, see model/timestamp/timestamp.go for + // conversion from time.Time to Prometheus timestamp. + Timestamp int64 } // TimeSeries is a timeseries. @@ -42,7 +65,8 @@ type TimeSeries struct { Labels []Label // Samples is a list of samples for the given TimeSeries - Samples []Sample + Samples []Sample + Exemplars []Exemplar } // Sample is a timeseries sample. @@ -74,7 +98,10 @@ func (wr *WriteRequest) UnmarshalProtobuf(src []byte) (err error) { // } tss := wr.Timeseries labelsPool := wr.labelsPool + exemplarLabelsPool := wr.exemplarLabelsPool samplesPool := wr.samplesPool + exemplarsPool := wr.exemplarsPool + var fc easyproto.FieldContext for len(src) > 0 { src, err = fc.NextField(src) @@ -93,7 +120,7 @@ func (wr *WriteRequest) UnmarshalProtobuf(src []byte) (err error) { tss = append(tss, TimeSeries{}) } ts := &tss[len(tss)-1] - labelsPool, samplesPool, err = ts.unmarshalProtobuf(data, labelsPool, samplesPool) + labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, err = ts.unmarshalProtobuf(data, labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool) if err != nil { return fmt.Errorf("cannot unmarshal timeseries: %w", err) } @@ -102,28 +129,31 @@ wr.Timeseries = tss wr.labelsPool = labelsPool wr.samplesPool = samplesPool + wr.exemplarLabelsPool = exemplarLabelsPool + wr.exemplarsPool = exemplarsPool return nil } -func (ts *TimeSeries) unmarshalProtobuf(src []byte, labelsPool []Label, samplesPool []Sample) ([]Label, []Sample, error) { +func (ts *TimeSeries) unmarshalProtobuf(src []byte, labelsPool []Label, exemplarLabelsPool []Label, samplesPool []Sample, exemplarsPool []Exemplar) ([]Label, []Label, []Sample, []Exemplar, error) { // message TimeSeries { // repeated Label labels = 1; // repeated Sample samples = 2; + // repeated Exemplar exemplars = 3; // } labelsPoolLen := len(labelsPool) samplesPoolLen := len(samplesPool) + exemplarsPoolLen := len(exemplarsPool) var fc easyproto.FieldContext for len(src) > 0 { var err error src, err = fc.NextField(src) if err != nil { - return labelsPool, samplesPool, fmt.Errorf("cannot read the next field: %w", err) + return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, fmt.Errorf("cannot read the next field: %w", err) } switch fc.FieldNum { case 1: data, ok := fc.MessageData() if !ok { - return labelsPool, samplesPool, fmt.Errorf("cannot read label data") + return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, fmt.Errorf("cannot read label data") } if len(labelsPool) < cap(labelsPool) { labelsPool = labelsPool[:len(labelsPool)+1] @@ -132,12 +162,12 @@ func (ts *TimeSeries) unmarshalProtobuf(src []byte, labelsPool []Label, samplesP } label := &labelsPool[len(labelsPool)-1] if err := label.unmarshalProtobuf(data); err != nil { - return labelsPool, samplesPool, fmt.Errorf("cannot unmarshal label: %w", err) + return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, fmt.Errorf("cannot unmarshal label: %w", err) } case 2: data, ok := fc.MessageData() if !ok { - return labelsPool, samplesPool, fmt.Errorf("cannot read the sample data") + return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, fmt.Errorf("cannot read the sample data") } if len(samplesPool) < cap(samplesPool) { samplesPool = samplesPool[:len(samplesPool)+1] @@ -146,15 +176,78 @@ func (ts *TimeSeries) unmarshalProtobuf(src []byte, labelsPool []Label, samplesP } sample := &samplesPool[len(samplesPool)-1] if err := 
sample.unmarshalProtobuf(data); err != nil { - return labelsPool, samplesPool, fmt.Errorf("cannot unmarshal sample: %w", err) + return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, fmt.Errorf("cannot unmarshal sample: %w", err) + } + case 3: + data, ok := fc.MessageData() + if !ok { + return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, fmt.Errorf("cannot read the exemplar data") + } + if len(exemplarsPool) < cap(exemplarsPool) { + exemplarsPool = exemplarsPool[:len(exemplarsPool)+1] + } else { + exemplarsPool = append(exemplarsPool, Exemplar{}) + } + exemplar := &exemplarsPool[len(exemplarsPool)-1] + if exemplarLabelsPool, err = exemplar.unmarshalProtobuf(data, exemplarLabelsPool); err != nil { + return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, fmt.Errorf("cannot unmarshal exemplar: %w", err) } } } ts.Labels = labelsPool[labelsPoolLen:] ts.Samples = samplesPool[samplesPoolLen:] - return labelsPool, samplesPool, nil + ts.Exemplars = exemplarsPool[exemplarsPoolLen:] + return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, nil } +func (exemplar *Exemplar) unmarshalProtobuf(src []byte, labelsPool []Label) ([]Label, error) { + // message Exemplar { + // repeated Label Labels = 1; + // float64 Value = 2; + // int64 Timestamp = 3; + // } + var fc easyproto.FieldContext + + labelsPoolLen := len(labelsPool) + + for len(src) > 0 { + var err error + src, err = fc.NextField(src) + if err != nil { + return labelsPool, fmt.Errorf("cannot read the next field: %w", err) + } + switch fc.FieldNum { + case 1: + data, ok := fc.MessageData() + if !ok { + return labelsPool, fmt.Errorf("cannot read label data") + } + if len(labelsPool) < cap(labelsPool) { + labelsPool = labelsPool[:len(labelsPool)+1] + } else { + labelsPool = append(labelsPool, Label{}) + } + label := &labelsPool[len(labelsPool)-1] + if err := label.unmarshalProtobuf(data); err != nil { + return labelsPool, fmt.Errorf("cannot unmarshal label: %w", err) + } + case 2: + value, ok := fc.Double() + if !ok { + return labelsPool, fmt.Errorf("cannot read exemplar value") + } + exemplar.Value = value + case 3: + timestamp, ok := fc.Int64() + if !ok { + return labelsPool, fmt.Errorf("cannot read exemplar timestamp") + } + exemplar.Timestamp = timestamp + } + } + exemplar.Labels = labelsPool[labelsPoolLen:] + return labelsPool, nil +} func (lbl *Label) unmarshalProtobuf(src []byte) (err error) { // message Label { // string name = 1; diff --git a/lib/prompb/prompb_test.go b/lib/prompb/prompb_test.go index 727101206..b1d6e02bc 100644 --- a/lib/prompb/prompb_test.go +++ b/lib/prompb/prompb_test.go @@ -36,9 +36,25 @@ func TestWriteRequestUnmarshalProtobuf(t *testing.T) { Timestamp: sample.Timestamp, }) } + var exemplars []prompbmarshal.Exemplar + for _, exemplar := range ts.Exemplars { + exemplarLabels := make([]prompbmarshal.Label, len(exemplar.Labels)) + for i, label := range exemplar.Labels { + exemplarLabels[i] = prompbmarshal.Label{ + Name: label.Name, + Value: label.Value, + } + } + exemplars = append(exemplars, prompbmarshal.Exemplar{ + Labels: exemplarLabels, + Value: exemplar.Value, + Timestamp: exemplar.Timestamp, + }) + } wrm.Timeseries = append(wrm.Timeseries, prompbmarshal.TimeSeries{ - Labels: labels, - Samples: samples, + Labels: labels, + Samples: samples, + Exemplars: exemplars, }) } dataResult := wrm.MarshalProtobuf(nil) @@ -121,6 +137,19 @@ func TestWriteRequestUnmarshalProtobuf(t *testing.T) { Timestamp: 18939432423, }, }, + Exemplars: []prompbmarshal.Exemplar{ + { + Labels: 
[]prompbmarshal.Label{ + {Name: "trace-id", + Value: "123456", + }, + {Name: "log_id", + Value: "987664"}, + }, + Value: 12345.6, + Timestamp: 456, + }, + }, }, } data = wrm.MarshalProtobuf(data[:0]) @@ -153,6 +182,18 @@ func TestWriteRequestUnmarshalProtobuf(t *testing.T) { Timestamp: 18939432423, }, }, + Exemplars: []prompbmarshal.Exemplar{ + { + Labels: []prompbmarshal.Label{ + { + Name: "trace-id", + Value: "123456", + }, + }, + Value: 12345.6, + Timestamp: 456, + }, + }, }, { Labels: []prompbmarshal.Label{ @@ -166,6 +207,22 @@ func TestWriteRequestUnmarshalProtobuf(t *testing.T) { Value: 9873, }, }, + Exemplars: []prompbmarshal.Exemplar{ + { + Labels: []prompbmarshal.Label{ + { + Name: "trace-id", + Value: "123456", + }, + { + Name: "log_id", + Value: "987654", + }, + }, + Value: 12345.6, + Timestamp: 456, + }, + }, }, } data = wrm.MarshalProtobuf(data[:0]) diff --git a/lib/prompbmarshal/prompbmarshal_test.go b/lib/prompbmarshal/prompbmarshal_test.go index 99fb28c10..42716fdb2 100644 --- a/lib/prompbmarshal/prompbmarshal_test.go +++ b/lib/prompbmarshal/prompbmarshal_test.go @@ -36,6 +36,22 @@ func TestWriteRequestMarshalProtobuf(t *testing.T) { Timestamp: 18939432423, }, }, + Exemplars: []prompbmarshal.Exemplar{ + { + Labels: []prompbmarshal.Label{ + { + Name: "trace-id", + Value: "123456", + }, + { + Name: "log_id", + Value: "987654", + }, + }, + Value: 12345.6, + Timestamp: 456, + }, + }, }, }, } @@ -64,9 +80,25 @@ func TestWriteRequestMarshalProtobuf(t *testing.T) { Timestamp: sample.Timestamp, }) } + var exemplars []prompbmarshal.Exemplar + for _, exemplar := range ts.Exemplars { + exemplarLabels := make([]prompbmarshal.Label, len(exemplar.Labels)) + for i, label := range exemplar.Labels { + exemplarLabels[i] = prompbmarshal.Label{ + Name: label.Name, + Value: label.Value, + } + } + exemplars = append(exemplars, prompbmarshal.Exemplar{ + Labels: exemplarLabels, + Value: exemplar.Value, + Timestamp: exemplar.Timestamp, + }) + } wrm.Timeseries = append(wrm.Timeseries, prompbmarshal.TimeSeries{ - Labels: labels, - Samples: samples, + Labels: labels, + Samples: samples, + Exemplars: exemplars, }) } dataResult := wrm.MarshalProtobuf(nil) diff --git a/lib/prompbmarshal/types.pb.go b/lib/prompbmarshal/types.pb.go index ca2b62f1b..d5295ddd0 100644 --- a/lib/prompbmarshal/types.pb.go +++ b/lib/prompbmarshal/types.pb.go @@ -13,10 +13,70 @@ type Sample struct { Timestamp int64 } +type Exemplar struct { + // Optional, can be empty. + Labels []Label + Value float64 + // timestamp is in ms format, see model/timestamp/timestamp.go for + // conversion from time.Time to Prometheus timestamp. 
+ Timestamp int64 +} + +func (m *Exemplar) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + if m.Timestamp != 0 { + i = encodeVarint(dAtA, i, uint64(m.Timestamp)) + i-- + dAtA[i] = 0x18 + } + if m.Value != 0 { + i -= 8 + binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.Value)))) + i-- + dAtA[i] = 0x11 + } + if len(m.Labels) > 0 { + for iNdEx := len(m.Labels) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Labels[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarint(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} +func (m *Exemplar) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Labels) > 0 { + for _, e := range m.Labels { + l = e.Size() + n += 1 + l + sov(uint64(l)) + } + } + if m.Value != 0 { + n += 9 + } + if m.Timestamp != 0 { + n += 1 + sov(uint64(m.Timestamp)) + } + return n +} + // TimeSeries represents samples and labels for a single time series. type TimeSeries struct { - Labels []Label - Samples []Sample + Labels []Label + Samples []Sample + Exemplars []Exemplar } type Label struct { @@ -42,6 +102,16 @@ func (m *Sample) MarshalToSizedBuffer(dst []byte) (int, error) { func (m *TimeSeries) MarshalToSizedBuffer(dst []byte) (int, error) { i := len(dst) + for j := len(m.Exemplars) - 1; j >= 0; j-- { + size, err := m.Exemplars[j].MarshalToSizedBuffer(dst[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarint(dst, i, uint64(size)) + i-- + dst[i] = 0x1a + } for j := len(m.Samples) - 1; j >= 0; j-- { size, err := m.Samples[j].MarshalToSizedBuffer(dst[:i]) if err != nil { @@ -109,6 +179,10 @@ func (m *TimeSeries) Size() (n int) { l := e.Size() n += 1 + l + sov(uint64(l)) } + for _, e := range m.Exemplars { + l := e.Size() + n += 1 + l + sov(uint64(l)) + } return n } diff --git a/lib/promscrape/client.go b/lib/promscrape/client.go index e5f7b4a5e..31457911a 100644 --- a/lib/promscrape/client.go +++ b/lib/promscrape/client.go @@ -30,6 +30,7 @@ var ( streamParse = flag.Bool("promscrape.streamParse", false, "Whether to enable stream parsing for metrics obtained from scrape targets. This may be useful "+ "for reducing memory usage when millions of metrics are exposed per each scrape target. "+ "It is possible to set 'stream_parse: true' individually per each 'scrape_config' section in '-promscrape.config' for fine-grained control") + scrapeExemplars = flag.Bool("promscrape.scrapeExemplars", false, "Whether to enable scraping of exemplars from scrape targets.") ) type client struct { @@ -107,6 +108,12 @@ func (c *client) ReadData(dst *bytesutil.ByteBuffer) error { // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/608 for details. // Do not bloat the `Accept` header with OpenMetrics shit, since it looks like dead standard now. req.Header.Set("Accept", "text/plain;version=0.0.4;q=1,*/*;q=0.1") + // We set to support exemplars to be compatible with Prometheus Exposition format which uses + // Open Metrics Specification + // See https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#openmetrics-text-format + if *scrapeExemplars { + req.Header.Set("Accept", "application/openmetrics-text") + } // Set X-Prometheus-Scrape-Timeout-Seconds like Prometheus does, since it is used by some exporters such as PushProx. 
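A note on the hard-coded tag bytes in the Exemplar and TimeSeries marshaling code above (0xa, 0x11, 0x18 and 0x1a): each one is a protobuf field key, computed as (field_number << 3) | wire_type. The following standalone sketch is illustrative only and not part of the patch; it simply recomputes those constants for the field numbers used here.

package main

import "fmt"

// keyByte returns the protobuf key byte for a given field number and wire type.
func keyByte(fieldNum, wireType uint8) uint8 {
	return fieldNum<<3 | wireType
}

func main() {
	// Exemplar message: Labels = field 1 (length-delimited, wire type 2),
	// Value = field 2 (fixed 64-bit, wire type 1), Timestamp = field 3 (varint, wire type 0).
	fmt.Printf("0x%x 0x%x 0x%x\n", keyByte(1, 2), keyByte(2, 1), keyByte(3, 0)) // 0xa 0x11 0x18
	// TimeSeries message: Exemplars = field 3 (length-delimited, wire type 2),
	// matching the "repeated Exemplar exemplars = 3" field noted in lib/prompb above.
	fmt.Printf("0x%x\n", keyByte(3, 2)) // 0x1a
}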
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179#issuecomment-813117162 req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", c.scrapeTimeoutSecondsStr) diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index 837db6dc5..ad478dbf7 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -670,6 +670,7 @@ type writeRequestCtx struct { writeRequest prompbmarshal.WriteRequest labels []prompbmarshal.Label samples []prompbmarshal.Sample + exemplars []prompbmarshal.Exemplar } func (wc *writeRequestCtx) reset() { @@ -684,6 +685,7 @@ func (wc *writeRequestCtx) resetNoRows() { wc.labels = wc.labels[:0] wc.samples = wc.samples[:0] + wc.exemplars = wc.exemplars[:0] } var writeRequestCtxPool leveledWriteRequestCtxPool @@ -902,10 +904,27 @@ func (sw *scrapeWork) addRowToTimeseries(wc *writeRequestCtx, r *parser.Row, tim Value: r.Value, Timestamp: sampleTimestamp, }) + // Add Exemplars to Timeseries + exemplarsLen := len(wc.exemplars) + exemplarTagsLen := len(r.Exemplar.Tags) + if exemplarTagsLen > 0 { + exemplarLabels := make([]prompbmarshal.Label, exemplarTagsLen) + for i, label := range r.Exemplar.Tags { + exemplarLabels[i].Name = label.Key + exemplarLabels[i].Value = label.Value + } + wc.exemplars = append(wc.exemplars, prompbmarshal.Exemplar{ + Labels: exemplarLabels, + Value: r.Exemplar.Value, + Timestamp: r.Exemplar.Timestamp, + }) + + } wr := &wc.writeRequest wr.Timeseries = append(wr.Timeseries, prompbmarshal.TimeSeries{ - Labels: wc.labels[labelsLen:], - Samples: wc.samples[len(wc.samples)-1:], + Labels: wc.labels[labelsLen:], + Samples: wc.samples[len(wc.samples)-1:], + Exemplars: wc.exemplars[exemplarsLen:], }) } diff --git a/lib/promscrape/scrapework_test.go b/lib/promscrape/scrapework_test.go index 787203d29..a4a972c62 100644 --- a/lib/promscrape/scrapework_test.go +++ b/lib/promscrape/scrapework_test.go @@ -708,6 +708,11 @@ func TestAddRowToTimeseriesNoRelabeling(t *testing.T) { HonorLabels: true, }, `metric{a="e",foo="bar"} 0 123`) + f(`metric{foo="bar"} 0 123 # {trace_id="12345"} 52 456`, + &ScrapeWork{ + HonorLabels: true, + }, + `metric{foo="bar"} 0 123 # {trace_id="12345"} 52 456`) } func TestSendStaleSeries(t *testing.T) { @@ -765,6 +770,8 @@ func parseData(data string) []prompbmarshal.TimeSeries { } rows.UnmarshalWithErrLogger(data, errLogger) var tss []prompbmarshal.TimeSeries + var exemplars []prompbmarshal.Exemplar + for _, r := range rows.Rows { labels := []prompbmarshal.Label{ { @@ -778,6 +785,21 @@ func parseData(data string) []prompbmarshal.TimeSeries { Value: tag.Value, }) } + exemplarLabels := []prompbmarshal.Label{} + if len(r.Exemplar.Tags) > 0 { + for _, tag := range r.Exemplar.Tags { + exemplarLabels = append(exemplarLabels, prompbmarshal.Label{ + Name: tag.Key, + Value: tag.Value, + }) + } + exemplars = append(exemplars, prompbmarshal.Exemplar{ + Labels: exemplarLabels, + Value: r.Exemplar.Value, + Timestamp: r.Exemplar.Timestamp, + }) + } + var ts prompbmarshal.TimeSeries ts.Labels = labels ts.Samples = []prompbmarshal.Sample{ @@ -786,6 +808,7 @@ func parseData(data string) []prompbmarshal.TimeSeries { Timestamp: r.Timestamp, }, } + ts.Exemplars = exemplars tss = append(tss, ts) } return tss @@ -850,6 +873,19 @@ func timeseriesToString(ts *prompbmarshal.TimeSeries) string { } s := ts.Samples[0] fmt.Fprintf(&sb, "%g %d", s.Value, s.Timestamp) + // Add Exemplars to the end of string + for j, exemplar := range ts.Exemplars { + for i, label := range exemplar.Labels { + fmt.Fprintf(&sb, "%s=%q", label.Name, 
label.Value) + if i+1 < len(exemplar.Labels) { + fmt.Fprintf(&sb, ",") + } + } + fmt.Fprintf(&sb, "%g %d", exemplar.Value, exemplar.Timestamp) + if j+1 < len(ts.Exemplars) { + fmt.Fprintf(&sb, ",") + } + } return sb.String() } diff --git a/lib/protoparser/prometheus/parser.go b/lib/protoparser/prometheus/parser.go index 923737f12..58ef46f1f 100644 --- a/lib/protoparser/prometheus/parser.go +++ b/lib/protoparser/prometheus/parser.go @@ -57,12 +57,33 @@ func (rs *Rows) UnmarshalWithErrLogger(s string, errLogger func(s string)) { rs.Rows, rs.tagsPool = unmarshalRows(rs.Rows[:0], s, rs.tagsPool[:0], noEscapes, errLogger) } +const tagsPrefix = '{' +const exemplarPrefix = '{' + +// Exemplar is a single exemplar attached to a Row +type Exemplar struct { + // Tags: a list of labels that uniquely identify the exemplar + Tags []Tag + // Value: the value of the exemplar + Value float64 + // Timestamp: the time when the exemplar was recorded + Timestamp int64 +} + +// Reset resets the Exemplar to defaults +func (e *Exemplar) Reset() { + e.Tags = nil + e.Value = 0 + e.Timestamp = 0 +} + // Row is a single Prometheus row. type Row struct { Metric string Tags []Tag Value float64 Timestamp int64 + Exemplar Exemplar } func (r *Row) reset() { @@ -70,6 +91,7 @@ func (r *Row) reset() { r.Tags = nil r.Value = 0 r.Timestamp = 0 + r.Exemplar = Exemplar{} } func skipTrailingComment(s string) string { @@ -110,69 +132,140 @@ func nextWhitespace(s string) int { return n1 } -func (r *Row) unmarshal(s string, tagsPool []Tag, noEscapes bool) ([]Tag, error) { - r.reset() +func parseStringToTags(s string, tagsPool []Tag, noEscapes bool) (string, []Tag, error) { + n := strings.IndexByte(s, tagsPrefix) + c := strings.IndexByte(s, '#') + if c != -1 && c < n { + return s, tagsPool, nil + } s = skipLeadingWhitespace(s) - n := strings.IndexByte(s, '{') if n >= 0 { // Tags found. Parse them. - r.Metric = skipTrailingWhitespace(s[:n]) s = s[n+1:] - tagsStart := len(tagsPool) var err error s, tagsPool, err = unmarshalTags(tagsPool, s, noEscapes) if err != nil { - return tagsPool, fmt.Errorf("cannot unmarshal tags: %w", err) + return s, tagsPool, fmt.Errorf("cannot unmarshal tags: %w", err) } if len(s) > 0 && s[0] == ' ' { // Fast path - skip whitespace. s = s[1:] } - tags := tagsPool[tagsStart:] - r.Tags = tags[:len(tags):len(tags)] - } else { - // Tags weren't found. 
Search for value after whitespace - n = nextWhitespace(s) - if n < 0 { - return tagsPool, fmt.Errorf("missing value") - } - r.Metric = s[:n] - s = s[n+1:] } - if len(r.Metric) == 0 { - return tagsPool, fmt.Errorf("metric cannot be empty") + return s, tagsPool, nil +} + +var tvtPool = sync.Pool{New: func() interface{} { + return &tagsValueTimestamp{} +}} + +func getTVT() *tagsValueTimestamp { + return tvtPool.Get().(*tagsValueTimestamp) +} +func putTVT(tvt *tagsValueTimestamp) { + tvt.reset() + tvtPool.Put(tvt) +} + +type tagsValueTimestamp struct { + Prefix string + Value float64 + Timestamp int64 + Comments string +} + +func (tvt *tagsValueTimestamp) reset() { + tvt.Prefix = "" + tvt.Value = 0 + tvt.Timestamp = 0 + tvt.Comments = "" +} +func parseTagsValueTimestamp(s string, tagsPool []Tag, noEscapes bool) ([]Tag, *tagsValueTimestamp, error) { + tvt := getTVT() + n := 0 + // Prefix is everything up to a tag start or a space + t := strings.IndexByte(s, tagsPrefix) + // If there is no tag start process rest of string + mustParseTags := false + if t != -1 { + // Check to see if there is a space before tag + n = nextWhitespace(s) + // If there is a space + if n > 0 { + if n < t { + tvt.Prefix = s[:n] + s = skipLeadingWhitespace(s[n:]) + // Cover the use case where there is whitespace between the prefix and the tag + if len(s) > 0 && s[0] == '{' { + mustParseTags = true + } + // Most likely this has an exemplar + } else { + tvt.Prefix = s[:t] + s = s[t:] + mustParseTags = true + } + } + if mustParseTags { + var err error + s, tagsPool, err = parseStringToTags(s, tagsPool, noEscapes) + if err != nil { + return tagsPool, tvt, err + } + } + } else { + // Tag doesn't exist + n = nextWhitespace(s) + if n != -1 { + tvt.Prefix = s[:n] + s = s[n:] + } else { + tvt.Prefix = s + return tagsPool, tvt, fmt.Errorf("missing value") + } } s = skipLeadingWhitespace(s) - s = skipTrailingComment(s) - if len(s) == 0 { - return tagsPool, fmt.Errorf("value cannot be empty") + // save and remove the comments + n = strings.IndexByte(s, '#') + if n >= 0 { + tvt.Comments = s[n:] + if len(tvt.Comments) > 1 { + tvt.Comments = s[n+1:] + } else { + tvt.Comments = "" + } + s = skipTrailingComment(s) + s = skipLeadingWhitespace(s) + s = skipTrailingWhitespace(s) } n = nextWhitespace(s) if n < 0 { // There is no timestamp. v, err := fastfloat.Parse(s) if err != nil { - return tagsPool, fmt.Errorf("cannot parse value %q: %w", s, err) + return tagsPool, tvt, fmt.Errorf("cannot parse value %q: %w", s, err) } - r.Value = v - return tagsPool, nil + tvt.Value = v + return tagsPool, tvt, nil } - // There is a timestamp. + // There is a timestamp + s = skipLeadingWhitespace(s) v, err := fastfloat.Parse(s[:n]) if err != nil { - return tagsPool, fmt.Errorf("cannot parse value %q: %w", s[:n], err) + return tagsPool, tvt, fmt.Errorf("cannot parse value %q: %w", s[:n], err) } - r.Value = v - s = skipLeadingWhitespace(s[n+1:]) + tvt.Value = v + s = s[n:] + // There are some whitespaces after timestamp + s = skipLeadingWhitespace(s) if len(s) == 0 { // There is no timestamp - just a whitespace after the value. - return tagsPool, nil + return tagsPool, tvt, nil } - // There are some whitespaces after timestamp s = skipTrailingWhitespace(s) ts, err := fastfloat.Parse(s) if err != nil { - return tagsPool, fmt.Errorf("cannot parse timestamp %q: %w", s, err) + return tagsPool, tvt, fmt.Errorf("cannot parse timestamp %q: %w", s, err) } if ts >= -1<<31 && ts < 1<<31 { // This looks like OpenMetrics timestamp in Unix seconds. 
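To make the reworked parsing flow concrete: parseTagsValueTimestamp above splits a line into a prefix, an optional tag set, a value, an optional timestamp and the trailing comment, and the next hunk re-parses that comment as an exemplar when it starts with a label set. The snippet below is a rough usage sketch mirroring the test fixtures in this patch; it assumes the package's existing Rows.Unmarshal entry point and the VictoriaMetrics module import path.

package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
)

func main() {
	var rows prometheus.Rows
	// First pass: metric name "foo_bucket", tag le="10", value 17. The trailing comment
	// {trace_id="oHg5SJYRHA0"} 9.8 1520879607.789 is then parsed into Row.Exemplar;
	// the OpenMetrics timestamp in seconds is converted to 1520879607789 ms.
	rows.Unmarshal(`foo_bucket{le="10"} 17 # {trace_id="oHg5SJYRHA0"} 9.8 1520879607.789`)
	r := rows.Rows[0]
	fmt.Println(r.Metric, r.Value, r.Exemplar.Tags, r.Exemplar.Value, r.Exemplar.Timestamp)
}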
@@ -181,7 +274,68 @@ func (r *Row) unmarshal(s string, tagsPool []Tag, noEscapes bool) ([]Tag, error) // See https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md#timestamps ts *= 1000 } - r.Timestamp = int64(ts) + tvt.Timestamp = int64(ts) + return tagsPool, tvt, nil +} + +// unmarshalMetric unmarshals the metric name, tags, value and timestamp from s and returns the trailing comment, which may contain an exemplar +func (r *Row) unmarshalMetric(s string, tagsPool []Tag, noEscapes bool) (string, []Tag, error) { + tagsStart := len(tagsPool) + var err error + var tvt *tagsValueTimestamp + tagsPool, tvt, err = parseTagsValueTimestamp(s, tagsPool, noEscapes) + defer putTVT(tvt) + if err != nil { + return "", tagsPool, err + } + r.Metric = tvt.Prefix + tags := tagsPool[tagsStart:] + if len(tags) > 0 { + r.Tags = tags + } + r.Value = tvt.Value + r.Timestamp = tvt.Timestamp + return tvt.Comments, tagsPool, nil +} + +func (e *Exemplar) unmarshal(s string, tagsPool []Tag, noEscapes bool) ([]Tag, error) { + // The comment parsed out of the metric line may contain an exemplar + s = skipLeadingWhitespace(s) + // The comment is treated as an exemplar only if it starts + // with a label set + if len(s) != 0 && s[0] == exemplarPrefix { + var err error + var tvt *tagsValueTimestamp + tagsStart := len(tagsPool) + tagsPool, tvt, err = parseTagsValueTimestamp(s, tagsPool, noEscapes) + defer putTVT(tvt) + if err != nil { + return tagsPool, err + } + tags := tagsPool[tagsStart:] + if len(tags) > 0 { + e.Tags = tags + } + e.Value = tvt.Value + e.Timestamp = tvt.Timestamp + return tagsPool, nil + } + return tagsPool, nil +} + +func (r *Row) unmarshal(s string, tagsPool []Tag, noEscapes bool) ([]Tag, error) { + r.reset() + // Parse the labels, the value and the timestamp. + // Anything before the labels is saved in Prefix and is used + // as the metric name. + comments, tagsPool, err := r.unmarshalMetric(s, tagsPool, noEscapes) + if err != nil { + return tagsPool, err + } + tagsPool, err = r.Exemplar.unmarshal(comments, tagsPool, noEscapes) + if err != nil { + return tagsPool, err + } return tagsPool, nil } diff --git a/lib/protoparser/prometheus/parser_test.go b/lib/protoparser/prometheus/parser_test.go index 39116c937..ff95f25b1 100644 --- a/lib/protoparser/prometheus/parser_test.go +++ b/lib/protoparser/prometheus/parser_test.go @@ -362,12 +362,87 @@ cassandra_token_ownership_ratio 78.9`, &Rows{ Value: 56, }}, }) - - // Exemplars - see https://github.com/OpenObservability/OpenMetrics/blob/master/OpenMetrics.md#exemplars-1 - f(`foo_bucket{le="10",a="#b"} 17 # {trace_id="oHg5SJ#YRHA0"} 9.8 1520879607.789 - abc 123 456 # foobar - foo 344#bar`, &Rows{ + // Support for exemplars from the OpenMetrics specification + // see: https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#exemplars-1 + f(`foo_bucket{le="25"} 17 # {trace_id="oHg5SJYRHA0", log_id="test_id"} 9.8 1520879607.789`, &Rows{ Rows: []Row{ + { + Metric: "foo_bucket", + Tags: []Tag{ + { + Key: "le", + Value: "25", + }, + }, + Value: 17, + Exemplar: Exemplar{ + Value: 9.8, + Tags: []Tag{ + { + Key: "trace_id", + Value: "oHg5SJYRHA0", + }, + { + Key: "log_id", + Value: "test_id", + }, + }, + Timestamp: 1520879607789, + }, + }, + }}) + f(`foo_bucket{le="0.01"} 0 +foo_bucket{le="0.1"} 8 # {} 0.054 +foo_bucket{le="1"} 11 # {trace_id="KOO5S4vxi0o"} 0.67 +foo_bucket{le="10"} 17 # {trace_id="oHg5SJYRHA0"} 9.8 1520879607.789 +foo_bucket{le="25"} 17 # {trace_id="oHg5SJYRHA0", log_id="test_id"} 9.8 1520879607.789 +foo_bucket{nospace="exemplar"} 17 #{trace_id="oHg5SJYRHA0"} 9.8 
1520879607.789 +foo_bucket{le="+Inf"} 17 +foo_count 17 +foo_sum 324789.3 +foo_created 1520430000.123`, &Rows{ + Rows: []Row{ + { + Metric: "foo_bucket", + Tags: []Tag{ + { + Key: "le", + Value: "0.01", + }, + }, + }, + { + Metric: "foo_bucket", + Tags: []Tag{ + { + Key: "le", + Value: "0.1", + }, + }, + Value: 8, + Exemplar: Exemplar{ + Value: 0.054, + }, + }, + { + Metric: "foo_bucket", + Tags: []Tag{ + { + Key: "le", + Value: "1", + }, + }, + Value: 11, + Exemplar: Exemplar{ + Value: 0.67, + Tags: []Tag{ + { + Key: "trace_id", + Value: "KOO5S4vxi0o", + }, + }, + }, + }, { Metric: "foo_bucket", Tags: []Tag{ @@ -375,21 +450,84 @@ cassandra_token_ownership_ratio 78.9`, &Rows{ Key: "le", Value: "10", }, + }, + Value: 17, + Exemplar: Exemplar{ + Value: 9.8, + Tags: []Tag{ + { + Key: "trace_id", + Value: "oHg5SJYRHA0", + }, + }, + Timestamp: 1520879607789, + }, + }, + { + Metric: "foo_bucket", + Tags: []Tag{ { - Key: "a", - Value: "#b", + Key: "le", + Value: "25", + }, + }, + Value: 17, + Exemplar: Exemplar{ + Value: 9.8, + Tags: []Tag{ + { + Key: "trace_id", + Value: "oHg5SJYRHA0", + }, + { + Key: "log_id", + Value: "test_id", + }, + }, + Timestamp: 1520879607789, + }, + }, + { + Metric: "foo_bucket", + Tags: []Tag{ + { + Key: "nospace", + Value: "exemplar", + }, + }, + Value: 17, + Exemplar: Exemplar{ + Value: 9.8, + Tags: []Tag{ + { + Key: "trace_id", + Value: "oHg5SJYRHA0", + }, + }, + Timestamp: 1520879607789, + }, + }, + { + Metric: "foo_bucket", + Tags: []Tag{ + { + Key: "le", + Value: "+Inf", }, }, Value: 17, }, { - Metric: "abc", - Value: 123, - Timestamp: 456000, + Metric: "foo_count", + Value: 17, }, { - Metric: "foo", - Value: 344, + Metric: "foo_sum", + Value: 324789.3, + }, + { + Metric: "foo_created", + Value: 1520430000.123, }, }, }) diff --git a/lib/protoparser/prometheus/parser_timing_test.go b/lib/protoparser/prometheus/parser_timing_test.go index 8d6a2827b..acabb7460 100644 --- a/lib/protoparser/prometheus/parser_timing_test.go +++ b/lib/protoparser/prometheus/parser_timing_test.go @@ -146,10 +146,10 @@ container_ulimits_soft{container="kube-scheduler",id="/kubelet/kubepods/burstabl } func BenchmarkRowsUnmarshal(b *testing.B) { - s := `cpu_usage{mode="user"} 1.23 -cpu_usage{mode="system"} 23.344 -cpu_usage{mode="iowait"} 3.3443 -cpu_usage{mode="irq"} 0.34432 + s := `foo_bucket{le="0.01"} 0 + foo_bucket{le="1"} 11 # {trace_id="KOO5S4vxi0o"} 0.67 + foo_bucket{le="10"} 17 # {trace_id="oHg5SJYRHA0"} 9.8 1520879607.789 + foo_bucket{nospace="exemplar"} 17 #{trace_id="oHg5SJYRHA0"} 9.8 1520879607.789 ` b.SetBytes(int64(len(s))) b.ReportAllocs()
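End-to-end, the pieces above fit together as follows: with -promscrape.scrapeExemplars enabled the scraper asks for the OpenMetrics format, parsed exemplars are attached to each scraped Row, copied into prompbmarshal.TimeSeries.Exemplars by scrapework and pendingseries, and then travel through the remote write protobuf next to the samples. Below is a minimal round-trip sketch mirroring the marshal/unmarshal tests above; import paths assume the VictoriaMetrics module.

package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)

func main() {
	// Marshal one series carrying a sample and an exemplar.
	wrm := prompbmarshal.WriteRequest{
		Timeseries: []prompbmarshal.TimeSeries{{
			Labels:  []prompbmarshal.Label{{Name: "__name__", Value: "foo_bucket"}, {Name: "le", Value: "10"}},
			Samples: []prompbmarshal.Sample{{Value: 17, Timestamp: 1520879607789}},
			Exemplars: []prompbmarshal.Exemplar{{
				Labels:    []prompbmarshal.Label{{Name: "trace_id", Value: "oHg5SJYRHA0"}},
				Value:     9.8,
				Timestamp: 1520879607789,
			}},
		}},
	}
	data := wrm.MarshalProtobuf(nil)

	// Read it back with the symmetric lib/prompb unmarshaler extended by this patch.
	var wr prompb.WriteRequest
	if err := wr.UnmarshalProtobuf(data); err != nil {
		panic(err)
	}
	ex := wr.Timeseries[0].Exemplars[0]
	fmt.Println(ex.Labels[0].Name, ex.Labels[0].Value, ex.Value, ex.Timestamp)
}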