diff --git a/app/vmagent/remotewrite/pendingseries.go b/app/vmagent/remotewrite/pendingseries.go
index 3204eb279..efd760e6f 100644
--- a/app/vmagent/remotewrite/pendingseries.go
+++ b/app/vmagent/remotewrite/pendingseries.go
@@ -109,10 +109,9 @@ type writeRequest struct {
 	wr prompbmarshal.WriteRequest
 
-	tss       []prompbmarshal.TimeSeries
-	labels    []prompbmarshal.Label
-	samples   []prompbmarshal.Sample
-	exemplars []prompbmarshal.Exemplar
+	tss     []prompbmarshal.TimeSeries
+	labels  []prompbmarshal.Label
+	samples []prompbmarshal.Sample
 
 	// buf holds labels data
 	buf []byte
@@ -130,7 +129,6 @@ func (wr *writeRequest) reset() {
 
 	wr.labels = wr.labels[:0]
 	wr.samples = wr.samples[:0]
-	wr.exemplars = wr.exemplars[:0]
 	wr.buf = wr.buf[:0]
 }
 
@@ -202,7 +200,6 @@ func (wr *writeRequest) copyTimeSeries(dst, src *prompbmarshal.TimeSeries) {
 	labelsDst := wr.labels
 	labelsLen := len(wr.labels)
 	samplesDst := wr.samples
-	exemplarsDst := wr.exemplars
 	buf := wr.buf
 	for i := range src.Labels {
 		labelsDst = append(labelsDst, prompbmarshal.Label{})
@@ -219,12 +216,8 @@ func (wr *writeRequest) copyTimeSeries(dst, src *prompbmarshal.TimeSeries) {
 	samplesDst = append(samplesDst, src.Samples...)
 	dst.Samples = samplesDst[len(samplesDst)-len(src.Samples):]
 
-	exemplarsDst = append(exemplarsDst, src.Exemplars...)
-	dst.Exemplars = exemplarsDst[len(exemplarsDst)-len(src.Exemplars):]
-
 	wr.samples = samplesDst
 	wr.labels = labelsDst
-	wr.exemplars = exemplarsDst
 	wr.buf = buf
 }
 
@@ -236,6 +229,7 @@ func tryPushWriteRequest(wr *prompbmarshal.WriteRequest, tryPushBlock func(block
 		// Nothing to push
 		return true
 	}
+
 	marshalConcurrencyCh <- struct{}{}
 
 	bb := writeRequestBufPool.Get()
@@ -272,8 +266,6 @@ func tryPushWriteRequest(wr *prompbmarshal.WriteRequest, tryPushBlock func(block
 	if len(wr.Timeseries) == 1 {
 		// A single time series left. Recursively split its samples into smaller parts if possible.
 		samples := wr.Timeseries[0].Samples
-		exemplars := wr.Timeseries[0].Exemplars
-
 		if len(samples) == 1 {
 			logger.Warnf("dropping a sample for metric with too long labels exceeding -remoteWrite.maxBlockSize=%d bytes", maxUnpackedBlockSize.N)
 			return true
 		}
@@ -285,16 +277,11 @@ func tryPushWriteRequest(wr *prompbmarshal.WriteRequest, tryPushBlock func(block
 			return false
 		}
 		wr.Timeseries[0].Samples = samples[n:]
-		// We do not want to send exemplars twice
-		wr.Timeseries[0].Exemplars = nil
-
 		if !tryPushWriteRequest(wr, tryPushBlock, isVMRemoteWrite) {
 			wr.Timeseries[0].Samples = samples
-			wr.Timeseries[0].Exemplars = exemplars
 			return false
 		}
 		wr.Timeseries[0].Samples = samples
-		wr.Timeseries[0].Exemplars = exemplars
 		return true
 	}
 	timeseries := wr.Timeseries
diff --git a/app/vmagent/remotewrite/pendingseries_test.go b/app/vmagent/remotewrite/pendingseries_test.go
index 6e949a94f..67c1f7043 100644
--- a/app/vmagent/remotewrite/pendingseries_test.go
+++ b/app/vmagent/remotewrite/pendingseries_test.go
@@ -10,8 +10,8 @@ import (
 
 func TestPushWriteRequest(t *testing.T) {
 	rowsCounts := []int{1, 10, 100, 1e3, 1e4}
-	expectedBlockLensProm := []int{248, 1952, 17433, 180381, 1861994}
-	expectedBlockLensVM := []int{170, 575, 4748, 44936, 367096}
+	expectedBlockLensProm := []int{216, 1848, 16424, 169882, 1757876}
+	expectedBlockLensVM := []int{138, 492, 3927, 34995, 288476}
 	for i, rowsCount := range rowsCounts {
 		expectedBlockLenProm := expectedBlockLensProm[i]
 		expectedBlockLenVM := expectedBlockLensVM[i]
@@ -59,20 +59,6 @@ func newTestWriteRequest(seriesCount, labelsCount int) *prompbmarshal.WriteReque
 				Value: fmt.Sprintf("value_%d_%d", i, j),
 			})
 		}
-		exemplar := prompbmarshal.Exemplar{
-			Labels: []prompbmarshal.Label{
-				{
-					Name:  "trace_id",
-					Value: "123456",
-				},
-				{
-					Name:  "log_id",
-					Value: "987654",
-				},
-			},
-			Value:     float64(i),
-			Timestamp: 1000 * int64(i),
-		}
 		wr.Timeseries = append(wr.Timeseries, prompbmarshal.TimeSeries{
 			Labels: labels,
 			Samples: []prompbmarshal.Sample{
 				{
 					Value:     float64(i),
 					Timestamp: 1000 * int64(i),
 				},
 			},
-
-			Exemplars: []prompbmarshal.Exemplar{
-				exemplar,
-			},
 		})
 	}
 	return &wr
diff --git a/lib/prompb/prompb.go b/lib/prompb/prompb.go
index adc8935e2..34a1c5716 100644
--- a/lib/prompb/prompb.go
+++ b/lib/prompb/prompb.go
@@ -9,11 +9,10 @@ import (
 // WriteRequest represents Prometheus remote write API request.
 type WriteRequest struct {
 	// Timeseries is a list of time series in the given WriteRequest
-	Timeseries         []TimeSeries
-	labelsPool         []Label
-	exemplarLabelsPool []Label
-	samplesPool        []Sample
-	exemplarsPool      []Exemplar
+	Timeseries []TimeSeries
+
+	labelsPool  []Label
+	samplesPool []Sample
 }
 
 // Reset resets wr for subsequent re-use.
@@ -30,33 +29,11 @@ func (wr *WriteRequest) Reset() {
 	}
 	wr.labelsPool = labelsPool[:0]
 
-	exemplarLabelsPool := wr.exemplarLabelsPool
-	for i := range exemplarLabelsPool {
-		exemplarLabelsPool[i] = Label{}
-	}
-	wr.labelsPool = labelsPool[:0]
-
 	samplesPool := wr.samplesPool
 	for i := range samplesPool {
 		samplesPool[i] = Sample{}
 	}
 	wr.samplesPool = samplesPool[:0]
-
-	exemplarsPool := wr.exemplarsPool
-	for i := range exemplarsPool {
-		exemplarsPool[i] = Exemplar{}
-	}
-	wr.exemplarsPool = exemplarsPool[:0]
-}
-
-// Exemplar is an exemplar
-type Exemplar struct {
-	// Labels a list of labels that uniquely identifies exemplar
-	// Optional, can be empty.
-	Labels []Label
-	// Value: the value of the exemplar
-	Value float64
-	// timestamp is in ms format, see model/timestamp/timestamp.go for
-	// conversion from time.Time to Prometheus timestamp.
-	Timestamp int64
 }
 
 // TimeSeries is a timeseries.
@@ -65,8 +42,7 @@ type TimeSeries struct {
 	Labels []Label
 
 	// Samples is a list of samples for the given TimeSeries
-	Samples   []Sample
-	Exemplars []Exemplar
+	Samples []Sample
 }
 
 // Sample is a timeseries sample.
@@ -98,10 +74,7 @@ func (wr *WriteRequest) UnmarshalProtobuf(src []byte) (err error) {
 	// }
 	tss := wr.Timeseries
 	labelsPool := wr.labelsPool
-	exemplarLabelsPool := wr.exemplarLabelsPool
 	samplesPool := wr.samplesPool
-	exemplarsPool := wr.exemplarsPool
-
 	var fc easyproto.FieldContext
 	for len(src) > 0 {
 		src, err = fc.NextField(src)
@@ -120,7 +93,7 @@ func (wr *WriteRequest) UnmarshalProtobuf(src []byte) (err error) {
 				tss = append(tss, TimeSeries{})
 			}
 			ts := &tss[len(tss)-1]
-			labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, err = ts.unmarshalProtobuf(data, labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool)
+			labelsPool, samplesPool, err = ts.unmarshalProtobuf(data, labelsPool, samplesPool)
 			if err != nil {
 				return fmt.Errorf("cannot unmarshal timeseries: %w", err)
 			}
@@ -129,31 +102,28 @@ func (wr *WriteRequest) UnmarshalProtobuf(src []byte) (err error) {
 	wr.Timeseries = tss
 	wr.labelsPool = labelsPool
 	wr.samplesPool = samplesPool
-	wr.exemplarsPool = exemplarsPool
 	return nil
 }
 
-func (ts *TimeSeries) unmarshalProtobuf(src []byte, labelsPool []Label, exemplarLabelsPool []Label, samplesPool []Sample, exemplarsPool []Exemplar) ([]Label, []Label, []Sample, []Exemplar, error) {
+func (ts *TimeSeries) unmarshalProtobuf(src []byte, labelsPool []Label, samplesPool []Sample) ([]Label, []Sample, error) {
 	// message TimeSeries {
 	//   repeated Label labels = 1;
 	//   repeated Sample samples = 2;
-	//   repeated Exemplar exemplars = 3
 	// }
 	labelsPoolLen := len(labelsPool)
 	samplesPoolLen := len(samplesPool)
-	exemplarsPoolLen := len(exemplarsPool)
 	var fc easyproto.FieldContext
 	for len(src) > 0 {
 		var err error
 		src, err = fc.NextField(src)
 		if err != nil {
-			return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, fmt.Errorf("cannot read the next field: %w", err)
+			return labelsPool, samplesPool, fmt.Errorf("cannot read the next field: %w", err)
 		}
 		switch fc.FieldNum {
 		case 1:
 			data, ok := fc.MessageData()
 			if !ok {
-				return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, fmt.Errorf("cannot read label data")
+				return labelsPool, samplesPool, fmt.Errorf("cannot read label data")
 			}
 			if len(labelsPool) < cap(labelsPool) {
 				labelsPool = labelsPool[:len(labelsPool)+1]
@@ -162,12 +132,12 @@ func (ts *TimeSeries) unmarshalProtobuf(src []byte, labelsPool []Label, exemplar
 			}
 			label := &labelsPool[len(labelsPool)-1]
 			if err := label.unmarshalProtobuf(data); err != nil {
-				return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, fmt.Errorf("cannot unmarshal label: %w", err)
+				return labelsPool, samplesPool, fmt.Errorf("cannot unmarshal label: %w", err)
 			}
 		case 2:
 			data, ok := fc.MessageData()
 			if !ok {
-				return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, fmt.Errorf("cannot read the sample data")
+				return labelsPool, samplesPool, fmt.Errorf("cannot read the sample data")
 			}
 			if len(samplesPool) < cap(samplesPool) {
 				samplesPool = samplesPool[:len(samplesPool)+1]
@@ -176,78 +146,15 @@ func (ts *TimeSeries) unmarshalProtobuf(src []byte, labelsPool []Label, exemplar
 			}
 			sample := &samplesPool[len(samplesPool)-1]
 			if err := sample.unmarshalProtobuf(data); err != nil {
-				return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, fmt.Errorf("cannot unmarshal sample: %w", err)
-			}
-		case 3:
-			data, ok := fc.MessageData()
-			if !ok {
-				return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, fmt.Errorf("cannot read the exemplar data")
-			}
-			if len(exemplarsPool) < cap(exemplarsPool) {
-				exemplarsPool = exemplarsPool[:len(exemplarsPool)+1]
-			} else {
-				exemplarsPool = append(exemplarsPool, Exemplar{})
-			}
-			exemplar := &exemplarsPool[len(exemplarsPool)-1]
-			if exemplarLabelsPool, err = exemplar.unmarshalProtobuf(data, exemplarLabelsPool); err != nil {
-				return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, fmt.Errorf("cannot unmarshal exemplar: %w", err)
+				return labelsPool, samplesPool, fmt.Errorf("cannot unmarshal sample: %w", err)
 			}
 		}
 	}
 	ts.Labels = labelsPool[labelsPoolLen:]
 	ts.Samples = samplesPool[samplesPoolLen:]
-	ts.Exemplars = exemplarsPool[exemplarsPoolLen:]
-	return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, nil
+	return labelsPool, samplesPool, nil
 }
 
-func (exemplar *Exemplar) unmarshalProtobuf(src []byte, labelsPool []Label) ([]Label, error) {
-	// message Exemplar {
-	//   repeated Label Labels = 1;
-	//   float64 Value = 2;
-	//   int64 Timestamp = 3;
-	// }
-	var fc easyproto.FieldContext
-
-	labelsPoolLen := len(labelsPool)
-
-	for len(src) > 0 {
-		var err error
-		src, err = fc.NextField(src)
-		if err != nil {
-			return labelsPool, fmt.Errorf("cannot read the next field: %w", err)
-		}
-		switch fc.FieldNum {
-		case 1:
-			data, ok := fc.MessageData()
-			if !ok {
-				return labelsPool, fmt.Errorf("cannot read label data")
-			}
-			if len(labelsPool) < cap(labelsPool) {
-				labelsPool = labelsPool[:len(labelsPool)+1]
-			} else {
-				labelsPool = append(labelsPool, Label{})
-			}
-			label := &labelsPool[len(labelsPool)-1]
-			if err := label.unmarshalProtobuf(data); err != nil {
-				return labelsPool, fmt.Errorf("cannot unmarshal label: %w", err)
-			}
-		case 2:
-			value, ok := fc.Double()
-			if !ok {
-				return labelsPool, fmt.Errorf("cannot read exemplar value")
-			}
-			exemplar.Value = value
-		case 3:
-			timestamp, ok := fc.Int64()
-			if !ok {
-				return labelsPool, fmt.Errorf("cannot read exemplar timestamp")
-			}
-			exemplar.Timestamp = timestamp
-		}
-	}
-	exemplar.Labels = labelsPool[labelsPoolLen:]
-	return labelsPool, nil
-}
 func (lbl *Label) unmarshalProtobuf(src []byte) (err error) {
 	// message Label {
 	//   string name = 1;
diff --git a/lib/prompb/prompb_test.go b/lib/prompb/prompb_test.go
index b1d6e02bc..727101206 100644
--- a/lib/prompb/prompb_test.go
+++ b/lib/prompb/prompb_test.go
@@ -36,25 +36,9 @@ func TestWriteRequestUnmarshalProtobuf(t *testing.T) {
 				Timestamp: sample.Timestamp,
 			})
 		}
-		var exemplars []prompbmarshal.Exemplar
-		for _, exemplar := range ts.Exemplars {
-			exemplarLabels := make([]prompbmarshal.Label, len(exemplar.Labels))
-			for i, label := range exemplar.Labels {
-				exemplarLabels[i] = prompbmarshal.Label{
-					Name:  label.Name,
-					Value: label.Value,
-				}
-			}
-			exemplars = append(exemplars, prompbmarshal.Exemplar{
-				Labels:    exemplarLabels,
-				Value:     exemplar.Value,
-				Timestamp: exemplar.Timestamp,
-			})
-		}
 		wrm.Timeseries = append(wrm.Timeseries, prompbmarshal.TimeSeries{
-			Labels:    labels,
-			Samples:   samples,
-			Exemplars: exemplars,
+			Labels:  labels,
+			Samples: samples,
 		})
 	}
 	dataResult := wrm.MarshalProtobuf(nil)
@@ -137,19 +121,6 @@ func TestWriteRequestUnmarshalProtobuf(t *testing.T) {
 					Timestamp: 18939432423,
 				},
 			},
-			Exemplars: []prompbmarshal.Exemplar{
-				{
-					Labels: []prompbmarshal.Label{
-						{Name: "trace-id",
-							Value: "123456",
-						},
-						{Name: "log_id",
-							Value: "987664"},
-					},
-					Value:     12345.6,
-					Timestamp: 456,
-				},
-			},
 		},
 	}
 	data = wrm.MarshalProtobuf(data[:0])
@@ -182,18 +153,6 @@ func TestWriteRequestUnmarshalProtobuf(t *testing.T) {
 					Timestamp: 18939432423,
 				},
 			},
-			Exemplars: []prompbmarshal.Exemplar{
-				{
-					Labels: []prompbmarshal.Label{
-						{
-							Name:  "trace-id",
-							Value: "123456",
-						},
-					},
-					Value:     12345.6,
-					Timestamp: 456,
-				},
-			},
 		},
 		{
 			Labels: []prompbmarshal.Label{
@@ -207,22 +166,6 @@ func TestWriteRequestUnmarshalProtobuf(t *testing.T) {
 					Value: 9873,
 				},
 			},
-			Exemplars: []prompbmarshal.Exemplar{
-				{
-					Labels: []prompbmarshal.Label{
-						{
-							Name:  "trace-id",
-							Value: "123456",
-						},
-						{
-							Name:  "log_id",
-							Value: "987654",
-						},
-					},
-					Value:     12345.6,
-					Timestamp: 456,
-				},
-			},
 		},
 	}
 	data = wrm.MarshalProtobuf(data[:0])
diff --git a/lib/prompbmarshal/prompbmarshal_test.go b/lib/prompbmarshal/prompbmarshal_test.go
index 42716fdb2..99fb28c10 100644
--- a/lib/prompbmarshal/prompbmarshal_test.go
+++ b/lib/prompbmarshal/prompbmarshal_test.go
@@ -36,22 +36,6 @@ func TestWriteRequestMarshalProtobuf(t *testing.T) {
 						Timestamp: 18939432423,
 					},
 				},
-				Exemplars: []prompbmarshal.Exemplar{
-					{
-						Labels: []prompbmarshal.Label{
-							{
-								Name:  "trace-id",
-								Value: "123456",
-							},
-							{
-								Name:  "log_id",
-								Value: "987654",
-							},
-						},
-						Value:     12345.6,
-						Timestamp: 456,
-					},
-				},
 			},
 		},
 	}
@@ -80,25 +64,9 @@ func TestWriteRequestMarshalProtobuf(t *testing.T) {
 				Timestamp: sample.Timestamp,
 			})
 		}
-		var exemplars []prompbmarshal.Exemplar
-		for _, exemplar := range ts.Exemplars {
-			exemplarLabels := make([]prompbmarshal.Label, len(exemplar.Labels))
-			for i, label := range exemplar.Labels {
-				exemplarLabels[i] = prompbmarshal.Label{
-					Name:  label.Name,
-					Value: label.Value,
-				}
-			}
-			exemplars = append(exemplars, prompbmarshal.Exemplar{
-				Labels:    exemplarLabels,
-				Value:     exemplar.Value,
-				Timestamp: exemplar.Timestamp,
-			})
-		}
 		wrm.Timeseries = append(wrm.Timeseries, prompbmarshal.TimeSeries{
-			Labels:    labels,
-			Samples:   samples,
-			Exemplars: exemplars,
+			Labels:  labels,
+			Samples: samples,
 		})
 	}
 	dataResult := wrm.MarshalProtobuf(nil)
diff --git a/lib/prompbmarshal/types.pb.go b/lib/prompbmarshal/types.pb.go
index d5295ddd0..ca2b62f1b 100644
--- a/lib/prompbmarshal/types.pb.go
+++ b/lib/prompbmarshal/types.pb.go
@@ -13,70 +13,10 @@ type Sample struct {
 	Timestamp int64
 }
 
-type Exemplar struct {
-	// Optional, can be empty.
-	Labels []Label
-	Value  float64
-	// timestamp is in ms format, see model/timestamp/timestamp.go for
-	// conversion from time.Time to Prometheus timestamp.
-	Timestamp int64
-}
-
-func (m *Exemplar) MarshalToSizedBuffer(dAtA []byte) (int, error) {
-	i := len(dAtA)
-	if m.Timestamp != 0 {
-		i = encodeVarint(dAtA, i, uint64(m.Timestamp))
-		i--
-		dAtA[i] = 0x18
-	}
-	if m.Value != 0 {
-		i -= 8
-		binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.Value))))
-		i--
-		dAtA[i] = 0x11
-	}
-	if len(m.Labels) > 0 {
-		for iNdEx := len(m.Labels) - 1; iNdEx >= 0; iNdEx-- {
-			{
-				size, err := m.Labels[iNdEx].MarshalToSizedBuffer(dAtA[:i])
-				if err != nil {
-					return 0, err
-				}
-				i -= size
-				i = encodeVarint(dAtA, i, uint64(size))
-			}
-			i--
-			dAtA[i] = 0xa
-		}
-	}
-	return len(dAtA) - i, nil
-}
-
-func (m *Exemplar) Size() (n int) {
-	if m == nil {
-		return 0
-	}
-	var l int
-	_ = l
-	if len(m.Labels) > 0 {
-		for _, e := range m.Labels {
-			l = e.Size()
-			n += 1 + l + sov(uint64(l))
-		}
-	}
-	if m.Value != 0 {
-		n += 9
-	}
-	if m.Timestamp != 0 {
-		n += 1 + sov(uint64(m.Timestamp))
-	}
-	return n
-}
-
 // TimeSeries represents samples and labels for a single time series.
 type TimeSeries struct {
-	Labels    []Label
-	Samples   []Sample
-	Exemplars []Exemplar
+	Labels  []Label
+	Samples []Sample
 }
 
 type Label struct {
@@ -102,16 +42,6 @@ func (m *Sample) MarshalToSizedBuffer(dst []byte) (int, error) {
 
 func (m *TimeSeries) MarshalToSizedBuffer(dst []byte) (int, error) {
 	i := len(dst)
-	for j := len(m.Exemplars) - 1; j >= 0; j-- {
-		size, err := m.Exemplars[j].MarshalToSizedBuffer(dst[:i])
-		if err != nil {
-			return 0, err
-		}
-		i -= size
-		i = encodeVarint(dst, i, uint64(size))
-		i--
-		dst[i] = 0x1a
-	}
 	for j := len(m.Samples) - 1; j >= 0; j-- {
 		size, err := m.Samples[j].MarshalToSizedBuffer(dst[:i])
 		if err != nil {
@@ -179,10 +109,6 @@ func (m *TimeSeries) Size() (n int) {
 		l := e.Size()
 		n += 1 + l + sov(uint64(l))
 	}
-	for _, e := range m.Exemplars {
-		l := e.Size()
-		n += 1 + l + sov(uint64(l))
-	}
 	return n
 }
diff --git a/lib/promscrape/client.go b/lib/promscrape/client.go
index f26fbc6ba..239f2da7e 100644
--- a/lib/promscrape/client.go
+++ b/lib/promscrape/client.go
@@ -29,7 +29,6 @@ var (
 	streamParse = flag.Bool("promscrape.streamParse", false, "Whether to enable stream parsing for metrics obtained from scrape targets. This may be useful "+
 		"for reducing memory usage when millions of metrics are exposed per each scrape target. "+
 		"It is possible to set 'stream_parse: true' individually per each 'scrape_config' section in '-promscrape.config' for fine-grained control")
-	scrapeExemplars = flag.Bool("promscrape.scrapeExemplars", false, "Whether to enable scraping of exemplars from scrape targets.")
 )
 
 type client struct {
@@ -109,12 +108,6 @@ func (c *client) ReadData(dst *bytesutil.ByteBuffer) error {
 	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/608 for details.
 	// Do not bloat the `Accept` header with OpenMetrics shit, since it looks like dead standard now.
 	req.Header.Set("Accept", "text/plain;version=0.0.4;q=1,*/*;q=0.1")
-	// We set to support exemplars to be compatible with Prometheus Exposition format which uses
-	// Open Metrics Specification
-	// See https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#openmetrics-text-format
-	if *scrapeExemplars {
-		req.Header.Set("Accept", "application/openmetrics-text")
-	}
 	// Set X-Prometheus-Scrape-Timeout-Seconds like Prometheus does, since it is used by some exporters such as PushProx.
 	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179#issuecomment-813117162
 	req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", c.scrapeTimeoutSecondsStr)
diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go
index 3897ea376..24e882bc5 100644
--- a/lib/promscrape/scrapework.go
+++ b/lib/promscrape/scrapework.go
@@ -675,7 +675,6 @@ type writeRequestCtx struct {
 	writeRequest prompbmarshal.WriteRequest
 	labels       []prompbmarshal.Label
 	samples      []prompbmarshal.Sample
-	exemplars    []prompbmarshal.Exemplar
 }
 
 func (wc *writeRequestCtx) reset() {
@@ -690,7 +689,6 @@ func (wc *writeRequestCtx) resetNoRows() {
 
 	wc.labels = wc.labels[:0]
 	wc.samples = wc.samples[:0]
-	wc.exemplars = wc.exemplars[:0]
 }
 
 var writeRequestCtxPool leveledWriteRequestCtxPool
@@ -911,27 +909,10 @@ func (sw *scrapeWork) addRowToTimeseries(wc *writeRequestCtx, r *parser.Row, tim
 		Value:     r.Value,
 		Timestamp: sampleTimestamp,
 	})
-	// Add Exemplars to Timeseries
-	exemplarsLen := len(wc.exemplars)
-	exemplarTagsLen := len(r.Exemplar.Tags)
-	if exemplarTagsLen > 0 {
-		exemplarLabels := make([]prompbmarshal.Label, exemplarTagsLen)
-		for i, label := range r.Exemplar.Tags {
-			exemplarLabels[i].Name = label.Key
-			exemplarLabels[i].Value = label.Value
-		}
-		wc.exemplars = append(wc.exemplars, prompbmarshal.Exemplar{
-			Labels:    exemplarLabels,
-			Value:     r.Exemplar.Value,
-			Timestamp: r.Exemplar.Timestamp,
-		})
-
-	}
 	wr := &wc.writeRequest
 	wr.Timeseries = append(wr.Timeseries, prompbmarshal.TimeSeries{
-		Labels:    wc.labels[labelsLen:],
-		Samples:   wc.samples[len(wc.samples)-1:],
-		Exemplars: wc.exemplars[exemplarsLen:],
+		Labels:  wc.labels[labelsLen:],
+		Samples: wc.samples[len(wc.samples)-1:],
 	})
 }
diff --git a/lib/promscrape/scrapework_test.go b/lib/promscrape/scrapework_test.go
index 1435ba2aa..52084f8ae 100644
--- a/lib/promscrape/scrapework_test.go
+++ b/lib/promscrape/scrapework_test.go
@@ -724,11 +724,6 @@ func TestAddRowToTimeseriesNoRelabeling(t *testing.T) {
 			HonorLabels: true,
 		},
 		`metric{a="e",foo="bar"} 0 123`)
-	f(`metric{foo="bar"} 0 123 # {trace_id="12345"} 52 456`,
-		&ScrapeWork{
-			HonorLabels: true,
-		},
-		`metric{foo="bar"} 0 123 # {trace_id="12345"} 52 456`)
 }
 
 func TestSendStaleSeries(t *testing.T) {
@@ -786,8 +781,6 @@ func parseData(data string) []prompbmarshal.TimeSeries {
 	}
 	rows.UnmarshalWithErrLogger(data, errLogger)
 	var tss []prompbmarshal.TimeSeries
-	var exemplars []prompbmarshal.Exemplar
-
 	for _, r := range rows.Rows {
 		labels := []prompbmarshal.Label{
 			{
@@ -801,21 +794,6 @@ func parseData(data string) []prompbmarshal.TimeSeries {
 				Value: tag.Value,
 			})
 		}
-		exemplarLabels := []prompbmarshal.Label{}
-		if len(r.Exemplar.Tags) > 0 {
-			for _, tag := range r.Exemplar.Tags {
-				exemplarLabels = append(exemplarLabels, prompbmarshal.Label{
-					Name:  tag.Key,
-					Value: tag.Value,
-				})
-			}
-			exemplars = append(exemplars, prompbmarshal.Exemplar{
-				Labels:    exemplarLabels,
-				Value:     r.Exemplar.Value,
-				Timestamp: r.Exemplar.Timestamp,
-			})
-		}
-
 		var ts prompbmarshal.TimeSeries
 		ts.Labels = labels
 		ts.Samples = []prompbmarshal.Sample{
@@ -824,7 +802,6 @@ func parseData(data string) []prompbmarshal.TimeSeries {
 				Timestamp: r.Timestamp,
 			},
 		}
-		ts.Exemplars = exemplars
 		tss = append(tss, ts)
 	}
 	return tss
@@ -889,19 +866,6 @@ func timeseriesToString(ts *prompbmarshal.TimeSeries) string {
 	}
 	s := ts.Samples[0]
 	fmt.Fprintf(&sb, "%g %d", s.Value, s.Timestamp)
-	// Add Exemplars to the end of string
-	for j, exemplar := range ts.Exemplars {
-		for i, label := range exemplar.Labels {
-			fmt.Fprintf(&sb, "%s=%q", label.Name, label.Value)
-			if i+1 < len(ts.Labels) {
-				fmt.Fprintf(&sb, ",")
-			}
-		}
-		fmt.Fprintf(&sb, "%g %d", exemplar.Value, exemplar.Timestamp)
-		if j+1 < len(ts.Exemplars) {
-			fmt.Fprintf(&sb, ",")
-		}
-	}
 	return sb.String()
 }
diff --git a/lib/protoparser/prometheus/parser.go b/lib/protoparser/prometheus/parser.go
index 58ef46f1f..923737f12 100644
--- a/lib/protoparser/prometheus/parser.go
+++ b/lib/protoparser/prometheus/parser.go
@@ -57,33 +57,12 @@ func (rs *Rows) UnmarshalWithErrLogger(s string, errLogger func(s string)) {
 	rs.Rows, rs.tagsPool = unmarshalRows(rs.Rows[:0], s, rs.tagsPool[:0], noEscapes, errLogger)
 }
 
-const tagsPrefix = '{'
-const exemplarPreifx = '{'
-
-// Exemplar Item
-type Exemplar struct {
-	// Tags: a list of labels that uniquely identifies exemplar
-	Tags []Tag
-	// Value: the value of the exemplar
-	Value float64
-	// Timestamp: the time when exemplar was recorded
-	Timestamp int64
-}
-
-// Reset - resets the Exemplar object to defaults
-func (e *Exemplar) Reset() {
-	e.Tags = nil
-	e.Value = 0
-	e.Timestamp = 0
-}
-
 // Row is a single Prometheus row.
 type Row struct {
 	Metric    string
 	Tags      []Tag
 	Value     float64
 	Timestamp int64
-	Exemplar  Exemplar
 }
 
 func (r *Row) reset() {
@@ -91,7 +70,6 @@
 	r.Tags = nil
 	r.Value = 0
 	r.Timestamp = 0
-	r.Exemplar = Exemplar{}
 }
 
 func skipTrailingComment(s string) string {
@@ -132,140 +110,69 @@ func nextWhitespace(s string) int {
 	return n1
 }
 
-func parseStringToTags(s string, tagsPool []Tag, noEscapes bool) (string, []Tag, error) {
-	n := strings.IndexByte(s, tagsPrefix)
-	c := strings.IndexByte(s, '#')
-	if c != -1 && c < n {
-		return s, tagsPool, nil
-	}
+func (r *Row) unmarshal(s string, tagsPool []Tag, noEscapes bool) ([]Tag, error) {
+	r.reset()
 	s = skipLeadingWhitespace(s)
+	n := strings.IndexByte(s, '{')
 	if n >= 0 {
 		// Tags found. Parse them.
+		r.Metric = skipTrailingWhitespace(s[:n])
 		s = s[n+1:]
+		tagsStart := len(tagsPool)
 		var err error
 		s, tagsPool, err = unmarshalTags(tagsPool, s, noEscapes)
 		if err != nil {
-			return s, tagsPool, fmt.Errorf("cannot unmarshal tags: %w", err)
+			return tagsPool, fmt.Errorf("cannot unmarshal tags: %w", err)
 		}
 		if len(s) > 0 && s[0] == ' ' {
 			// Fast path - skip whitespace.
 			s = s[1:]
 		}
-	}
-	return s, tagsPool, nil
-}
-
-var tvtPool = sync.Pool{New: func() interface{} {
-	return &tagsValueTimestamp{}
-}}
-
-func getTVT() *tagsValueTimestamp {
-	return tvtPool.Get().(*tagsValueTimestamp)
-}
-func putTVT(tvt *tagsValueTimestamp) {
-	tvt.reset()
-	tvtPool.Put(tvt)
-}
-
-type tagsValueTimestamp struct {
-	Prefix    string
-	Value     float64
-	Timestamp int64
-	Comments  string
-}
-
-func (tvt *tagsValueTimestamp) reset() {
-	tvt.Prefix = ""
-	tvt.Value = 0
-	tvt.Timestamp = 0
-	tvt.Comments = ""
-}
-func parseTagsValueTimestamp(s string, tagsPool []Tag, noEscapes bool) ([]Tag, *tagsValueTimestamp, error) {
-	tvt := getTVT()
-	n := 0
-	// Prefix is everything up to a tag start or a space
-	t := strings.IndexByte(s, tagsPrefix)
-	// If there is no tag start process rest of string
-	mustParseTags := false
-	if t != -1 {
-		// Check to see if there is a space before tag
-		n = nextWhitespace(s)
-		// If there is a space
-		if n > 0 {
-			if n < t {
-				tvt.Prefix = s[:n]
-				s = skipLeadingWhitespace(s[n:])
-				// Cover the use case where there is whitespace between the prefix and the tag
-				if len(s) > 0 && s[0] == '{' {
-					mustParseTags = true
-				}
-				// Most likely this has an exemplar
-			} else {
-				tvt.Prefix = s[:t]
-				s = s[t:]
-				mustParseTags = true
-			}
-		}
-		if mustParseTags {
-			var err error
-			s, tagsPool, err = parseStringToTags(s, tagsPool, noEscapes)
-			if err != nil {
-				return tagsPool, tvt, err
-			}
-		}
+		tags := tagsPool[tagsStart:]
+		r.Tags = tags[:len(tags):len(tags)]
 	} else {
-		// Tag doesn't exist
+		// Tags weren't found. Search for value after whitespace
 		n = nextWhitespace(s)
-		if n != -1 {
-			tvt.Prefix = s[:n]
-			s = s[n:]
-		} else {
-			tvt.Prefix = s
-			return tagsPool, tvt, fmt.Errorf("missing value")
+		if n < 0 {
+			return tagsPool, fmt.Errorf("missing value")
 		}
+		r.Metric = s[:n]
+		s = s[n+1:]
+	}
+	if len(r.Metric) == 0 {
+		return tagsPool, fmt.Errorf("metric cannot be empty")
 	}
 	s = skipLeadingWhitespace(s)
-	// save and remove the comments
-	n = strings.IndexByte(s, '#')
-	if n >= 0 {
-		tvt.Comments = s[n:]
-		if len(tvt.Comments) > 1 {
-			tvt.Comments = s[n+1:]
-		} else {
-			tvt.Comments = ""
-		}
-		s = skipTrailingComment(s)
-		s = skipLeadingWhitespace(s)
-		s = skipTrailingWhitespace(s)
+	s = skipTrailingComment(s)
+	if len(s) == 0 {
+		return tagsPool, fmt.Errorf("value cannot be empty")
 	}
 	n = nextWhitespace(s)
 	if n < 0 {
 		// There is no timestamp.
 		v, err := fastfloat.Parse(s)
 		if err != nil {
-			return tagsPool, tvt, fmt.Errorf("cannot parse value %q: %w", s, err)
+			return tagsPool, fmt.Errorf("cannot parse value %q: %w", s, err)
 		}
-		tvt.Value = v
-		return tagsPool, tvt, nil
+		r.Value = v
+		return tagsPool, nil
 	}
-	// There is a timestamp
-	s = skipLeadingWhitespace(s)
+	// There is a timestamp.
 	v, err := fastfloat.Parse(s[:n])
 	if err != nil {
-		return tagsPool, tvt, fmt.Errorf("cannot parse value %q: %w", s[:n], err)
+		return tagsPool, fmt.Errorf("cannot parse value %q: %w", s[:n], err)
 	}
-	tvt.Value = v
-	s = s[n:]
-	// There are some whitespaces after timestamp
-	s = skipLeadingWhitespace(s)
+	r.Value = v
+	s = skipLeadingWhitespace(s[n+1:])
 	if len(s) == 0 {
 		// There is no timestamp - just a whitespace after the value.
-		return tagsPool, tvt, nil
+		return tagsPool, nil
 	}
+	// There are some whitespaces after timestamp
 	s = skipTrailingWhitespace(s)
 	ts, err := fastfloat.Parse(s)
 	if err != nil {
-		return tagsPool, tvt, fmt.Errorf("cannot parse timestamp %q: %w", s, err)
+		return tagsPool, fmt.Errorf("cannot parse timestamp %q: %w", s, err)
 	}
 	if ts >= -1<<31 && ts < 1<<31 {
 		// This looks like OpenMetrics timestamp in Unix seconds.
@@ -274,68 +181,7 @@ func parseTagsValueTimestamp(s string, tagsPool []Tag, noEscapes bool) ([]Tag, *
 		// See https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md#timestamps
 		ts *= 1000
 	}
-	tvt.Timestamp = int64(ts)
-	return tagsPool, tvt, nil
-}
-
-// Returns possible comments that could be exemplars
-func (r *Row) unmarshalMetric(s string, tagsPool []Tag, noEscapes bool) (string, []Tag, error) {
-	tagsStart := len(tagsPool)
-	var err error
-	var tvt *tagsValueTimestamp
-	tagsPool, tvt, err = parseTagsValueTimestamp(s, tagsPool, noEscapes)
-	defer putTVT(tvt)
-	if err != nil {
-		return "", tagsPool, err
-	}
-	r.Metric = tvt.Prefix
-	tags := tagsPool[tagsStart:]
-	if len(tags) > 0 {
-		r.Tags = tags
-	}
-	r.Value = tvt.Value
-	r.Timestamp = tvt.Timestamp
-	return tvt.Comments, tagsPool, nil
-
-}
-func (e *Exemplar) unmarshal(s string, tagsPool []Tag, noEscapes bool) ([]Tag, error) {
-	// We can use the Comment parsed out to further parse the Exemplar
-	s = skipLeadingWhitespace(s)
-	// If we are a comment immediately followed by whitespace or a labelset
-	// then we are an exemplar
-	if len(s) != 0 && s[0] == exemplarPreifx {
-		var err error
-		var tvt *tagsValueTimestamp
-		tagsStart := len(tagsPool)
-		tagsPool, tvt, err = parseTagsValueTimestamp(s, tagsPool, noEscapes)
-		defer putTVT(tvt)
-		if err != nil {
-			return tagsPool, err
-		}
-		tags := tagsPool[tagsStart:]
-		if len(tags) > 0 {
-			e.Tags = tags
-		}
-		e.Value = tvt.Value
-		e.Timestamp = tvt.Timestamp
-		return tagsPool, nil
-	}
-	return tagsPool, nil
-
-}
-func (r *Row) unmarshal(s string, tagsPool []Tag, noEscapes bool) ([]Tag, error) {
-	r.reset()
-	// Parse for labels, the value and the timestamp
-	// Anything before labels is saved in Prefix we can use this
-	// for the metric name
-	comments, tagsPool, err := r.unmarshalMetric(s, tagsPool, noEscapes)
-	if err != nil {
-		return nil, err
-	}
-	tagsPool, err = r.Exemplar.unmarshal(comments, tagsPool, noEscapes)
-	if err != nil {
-		return nil, err
-	}
+	r.Timestamp = int64(ts)
 	return tagsPool, nil
 }
diff --git a/lib/protoparser/prometheus/parser_test.go b/lib/protoparser/prometheus/parser_test.go
index ff95f25b1..39116c937 100644
--- a/lib/protoparser/prometheus/parser_test.go
+++ b/lib/protoparser/prometheus/parser_test.go
@@ -362,87 +362,12 @@ cassandra_token_ownership_ratio 78.9`, &Rows{
 		Value: 56,
 	}},
 	})
-	// Support for Exemplars Open Metric Specification
-	// see: https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#exemplars-1
-	f(`foo_bucket{le="25"} 17 # {trace_id="oHg5SJYRHA0", log_id="test_id"} 9.8 1520879607.789`, &Rows{
+
+	// Exemplars - see https://github.com/OpenObservability/OpenMetrics/blob/master/OpenMetrics.md#exemplars-1
+	f(`foo_bucket{le="10",a="#b"} 17 # {trace_id="oHg5SJ#YRHA0"} 9.8 1520879607.789
+ abc 123 456 # foobar
+ foo 344#bar`, &Rows{
 		Rows: []Row{
-			{
-				Metric: "foo_bucket",
-				Tags: []Tag{
-					{
-						Key:   "le",
-						Value: "25",
-					},
-				},
-				Value: 17,
-				Exemplar: Exemplar{
-					Value: 9.8,
-					Tags: []Tag{
-						{
-							Key:   "trace_id",
-							Value: "oHg5SJYRHA0",
-						},
-						{
-							Key:   "log_id",
-							Value: "test_id",
-						},
-					},
-					Timestamp: 1520879607789,
-				},
-			},
-		}})
-	f(`foo_bucket{le="0.01"} 0
-foo_bucket{le="0.1"} 8 # {} 0.054
-foo_bucket{le="1"} 11 # {trace_id="KOO5S4vxi0o"} 0.67
-foo_bucket{le="10"} 17 # {trace_id="oHg5SJYRHA0"} 9.8 1520879607.789
-foo_bucket{le="25"} 17 # {trace_id="oHg5SJYRHA0", log_id="test_id"} 9.8 1520879607.789
-foo_bucket{nospace="exemplar"} 17 #{trace_id="oHg5SJYRHA0"} 9.8 1520879607.789
-foo_bucket{le="+Inf"} 17
-foo_count 17
-foo_sum 324789.3
-foo_created 1520430000.123`, &Rows{
-		Rows: []Row{
-			{
-				Metric: "foo_bucket",
-				Tags: []Tag{
-					{
-						Key:   "le",
-						Value: "0.01",
-					},
-				},
-			},
-			{
-				Metric: "foo_bucket",
-				Tags: []Tag{
-					{
-						Key:   "le",
-						Value: "0.1",
-					},
-				},
-				Value: 8,
-				Exemplar: Exemplar{
-					Value: 0.054,
-				},
-			},
-			{
-				Metric: "foo_bucket",
-				Tags: []Tag{
-					{
-						Key:   "le",
-						Value: "1",
-					},
-				},
-				Value: 11,
-				Exemplar: Exemplar{
-					Value: 0.67,
-					Tags: []Tag{
-						{
-							Key:   "trace_id",
-							Value: "KOO5S4vxi0o",
-						},
-					},
-				},
-			},
 			{
 				Metric: "foo_bucket",
 				Tags: []Tag{
 					{
 						Key:   "le",
 						Value: "10",
 					},
-				},
-				Value: 17,
-				Exemplar: Exemplar{
-					Value: 9.8,
-					Tags: []Tag{
-						{
-							Key:   "trace_id",
-							Value: "oHg5SJYRHA0",
-						},
-					},
-					Timestamp: 1520879607789,
-				},
-			},
-			{
-				Metric: "foo_bucket",
-				Tags: []Tag{
 					{
-						Key:   "le",
-						Value: "25",
-					},
-				},
-				Value: 17,
-				Exemplar: Exemplar{
-					Value: 9.8,
-					Tags: []Tag{
-						{
-							Key:   "trace_id",
-							Value: "oHg5SJYRHA0",
-						},
-						{
-							Key:   "log_id",
-							Value: "test_id",
-						},
-					},
-					Timestamp: 1520879607789,
-				},
-			},
-			{
-				Metric: "foo_bucket",
-				Tags: []Tag{
-					{
-						Key:   "nospace",
-						Value: "exemplar",
-					},
-				},
-				Value: 17,
-				Exemplar: Exemplar{
-					Value: 9.8,
-					Tags: []Tag{
-						{
-							Key:   "trace_id",
-							Value: "oHg5SJYRHA0",
-						},
-					},
-					Timestamp: 1520879607789,
-				},
-			},
-			{
-				Metric: "foo_bucket",
-				Tags: []Tag{
-					{
-						Key:   "le",
-						Value: "+Inf",
+						Key:   "a",
+						Value: "#b",
 					},
 				},
 				Value: 17,
 			},
 			{
-				Metric: "foo_count",
-				Value:  17,
+				Metric:    "abc",
+				Value:     123,
+				Timestamp: 456000,
 			},
 			{
-				Metric: "foo_sum",
-				Value:  324789.3,
-			},
-			{
-				Metric: "foo_created",
-				Value:  1520430000.123,
+				Metric: "foo",
+				Value:  344,
 			},
 		},
 	})
diff --git a/lib/protoparser/prometheus/parser_timing_test.go b/lib/protoparser/prometheus/parser_timing_test.go
index acabb7460..8d6a2827b 100644
--- a/lib/protoparser/prometheus/parser_timing_test.go
+++ b/lib/protoparser/prometheus/parser_timing_test.go
@@ -146,10 +146,10 @@ container_ulimits_soft{container="kube-scheduler",id="/kubelet/kubepods/burstabl
 }
 
 func BenchmarkRowsUnmarshal(b *testing.B) {
-	s := `foo_bucket{le="0.01"} 0
- foo_bucket{le="1"} 11 # {trace_id="KOO5S4vxi0o"} 0.67
- foo_bucket{le="10"} 17 # {trace_id="oHg5SJYRHA0"} 9.8 1520879607.789
- foo_bucket{nospace="exemplar"} 17 #{trace_id="oHg5SJYRHA0"} 9.8 1520879607.789
+	s := `cpu_usage{mode="user"} 1.23
+cpu_usage{mode="system"} 23.344
+cpu_usage{mode="iowait"} 3.3443
+cpu_usage{mode="irq"} 0.34432
 `
 	b.SetBytes(int64(len(s)))
 	b.ReportAllocs()