Exemplar support (#5982)

This code adds Exemplars to VMagent and the promscrape parser adhering
to OpenMetrics Specifications. This will allow forwarding of exemplars
to Prometheus and other third party apps that support OpenMetrics specs.

---------

Signed-off-by: Ted Possible <ted_possible@cable.comcast.com>
(cherry picked from commit 5a3abfa041)
This commit is contained in:
Ted Possible 2024-05-07 03:09:44 -07:00 committed by hagen1778
parent 33db552a39
commit 0206a01d03
No known key found for this signature in database
GPG key ID: 3BF75F3741CA9640
12 changed files with 716 additions and 75 deletions

View file

@ -109,9 +109,10 @@ type writeRequest struct {
wr prompbmarshal.WriteRequest
tss []prompbmarshal.TimeSeries
labels []prompbmarshal.Label
samples []prompbmarshal.Sample
tss []prompbmarshal.TimeSeries
labels []prompbmarshal.Label
samples []prompbmarshal.Sample
exemplars []prompbmarshal.Exemplar
// buf holds labels data
buf []byte
@ -129,6 +130,7 @@ func (wr *writeRequest) reset() {
wr.labels = wr.labels[:0]
wr.samples = wr.samples[:0]
wr.exemplars = wr.exemplars[:0]
wr.buf = wr.buf[:0]
}
@ -200,6 +202,7 @@ func (wr *writeRequest) copyTimeSeries(dst, src *prompbmarshal.TimeSeries) {
labelsDst := wr.labels
labelsLen := len(wr.labels)
samplesDst := wr.samples
exemplarsDst := wr.exemplars
buf := wr.buf
for i := range src.Labels {
labelsDst = append(labelsDst, prompbmarshal.Label{})
@ -216,8 +219,12 @@ func (wr *writeRequest) copyTimeSeries(dst, src *prompbmarshal.TimeSeries) {
samplesDst = append(samplesDst, src.Samples...)
dst.Samples = samplesDst[len(samplesDst)-len(src.Samples):]
exemplarsDst = append(exemplarsDst, src.Exemplars...)
dst.Exemplars = exemplarsDst[len(exemplarsDst)-len(src.Exemplars):]
wr.samples = samplesDst
wr.labels = labelsDst
wr.exemplars = exemplarsDst
wr.buf = buf
}
@ -229,7 +236,6 @@ func tryPushWriteRequest(wr *prompbmarshal.WriteRequest, tryPushBlock func(block
// Nothing to push
return true
}
marshalConcurrencyCh <- struct{}{}
bb := writeRequestBufPool.Get()
@ -266,6 +272,8 @@ func tryPushWriteRequest(wr *prompbmarshal.WriteRequest, tryPushBlock func(block
if len(wr.Timeseries) == 1 {
// A single time series left. Recursively split its samples into smaller parts if possible.
samples := wr.Timeseries[0].Samples
exemplars := wr.Timeseries[0].Exemplars
if len(samples) == 1 {
logger.Warnf("dropping a sample for metric with too long labels exceeding -remoteWrite.maxBlockSize=%d bytes", maxUnpackedBlockSize.N)
return true
@ -277,11 +285,16 @@ func tryPushWriteRequest(wr *prompbmarshal.WriteRequest, tryPushBlock func(block
return false
}
wr.Timeseries[0].Samples = samples[n:]
// We do not want to send exemplars twice
wr.Timeseries[0].Exemplars = nil
if !tryPushWriteRequest(wr, tryPushBlock, isVMRemoteWrite) {
wr.Timeseries[0].Samples = samples
wr.Timeseries[0].Exemplars = exemplars
return false
}
wr.Timeseries[0].Samples = samples
wr.Timeseries[0].Exemplars = exemplars
return true
}
timeseries := wr.Timeseries

View file

@ -10,8 +10,8 @@ import (
func TestPushWriteRequest(t *testing.T) {
rowsCounts := []int{1, 10, 100, 1e3, 1e4}
expectedBlockLensProm := []int{216, 1848, 16424, 169882, 1757876}
expectedBlockLensVM := []int{138, 492, 3927, 34995, 288476}
expectedBlockLensProm := []int{248, 1952, 17433, 180381, 1861994}
expectedBlockLensVM := []int{170, 575, 4748, 44936, 367096}
for i, rowsCount := range rowsCounts {
expectedBlockLenProm := expectedBlockLensProm[i]
expectedBlockLenVM := expectedBlockLensVM[i]
@ -59,6 +59,20 @@ func newTestWriteRequest(seriesCount, labelsCount int) *prompbmarshal.WriteReque
Value: fmt.Sprintf("value_%d_%d", i, j),
})
}
exemplar := prompbmarshal.Exemplar{
Labels: []prompbmarshal.Label{
{
Name: "trace_id",
Value: "123456",
},
{
Name: "log_id",
Value: "987654",
},
},
Value: float64(i),
Timestamp: 1000 * int64(i),
}
wr.Timeseries = append(wr.Timeseries, prompbmarshal.TimeSeries{
Labels: labels,
Samples: []prompbmarshal.Sample{
@ -67,6 +81,10 @@ func newTestWriteRequest(seriesCount, labelsCount int) *prompbmarshal.WriteReque
Timestamp: 1000 * int64(i),
},
},
Exemplars: []prompbmarshal.Exemplar{
exemplar,
},
})
}
return &wr

View file

@ -9,10 +9,11 @@ import (
// WriteRequest represents Prometheus remote write API request.
type WriteRequest struct {
// Timeseries is a list of time series in the given WriteRequest
Timeseries []TimeSeries
labelsPool []Label
samplesPool []Sample
Timeseries []TimeSeries
labelsPool []Label
exemplarLabelsPool []Label
samplesPool []Sample
exemplarsPool []Exemplar
}
// Reset resets wr for subsequent re-use.
@ -29,11 +30,33 @@ func (wr *WriteRequest) Reset() {
}
wr.labelsPool = labelsPool[:0]
exemplarLabelsPool := wr.exemplarLabelsPool
for i := range exemplarLabelsPool {
exemplarLabelsPool[i] = Label{}
}
wr.labelsPool = labelsPool[:0]
samplesPool := wr.samplesPool
for i := range samplesPool {
samplesPool[i] = Sample{}
}
wr.samplesPool = samplesPool[:0]
exemplarsPool := wr.exemplarsPool
for i := range exemplarsPool {
exemplarsPool[i] = Exemplar{}
}
wr.exemplarsPool = exemplarsPool[:0]
}
// Exemplar is an exemplar
type Exemplar struct {
// Labels a list of labels that uniquely identifies exemplar
// Optional, can be empty.
Labels []Label
// Value: the value of the exemplar
Value float64
// timestamp is in ms format, see model/timestamp/timestamp.go for
// conversion from time.Time to Prometheus timestamp.
Timestamp int64
}
// TimeSeries is a timeseries.
@ -42,7 +65,8 @@ type TimeSeries struct {
Labels []Label
// Samples is a list of samples for the given TimeSeries
Samples []Sample
Samples []Sample
Exemplars []Exemplar
}
// Sample is a timeseries sample.
@ -74,7 +98,10 @@ func (wr *WriteRequest) UnmarshalProtobuf(src []byte) (err error) {
// }
tss := wr.Timeseries
labelsPool := wr.labelsPool
exemplarLabelsPool := wr.exemplarLabelsPool
samplesPool := wr.samplesPool
exemplarsPool := wr.exemplarsPool
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
@ -93,7 +120,7 @@ func (wr *WriteRequest) UnmarshalProtobuf(src []byte) (err error) {
tss = append(tss, TimeSeries{})
}
ts := &tss[len(tss)-1]
labelsPool, samplesPool, err = ts.unmarshalProtobuf(data, labelsPool, samplesPool)
labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, err = ts.unmarshalProtobuf(data, labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool)
if err != nil {
return fmt.Errorf("cannot unmarshal timeseries: %w", err)
}
@ -102,28 +129,31 @@ func (wr *WriteRequest) UnmarshalProtobuf(src []byte) (err error) {
wr.Timeseries = tss
wr.labelsPool = labelsPool
wr.samplesPool = samplesPool
wr.exemplarsPool = exemplarsPool
return nil
}
func (ts *TimeSeries) unmarshalProtobuf(src []byte, labelsPool []Label, samplesPool []Sample) ([]Label, []Sample, error) {
func (ts *TimeSeries) unmarshalProtobuf(src []byte, labelsPool []Label, exemplarLabelsPool []Label, samplesPool []Sample, exemplarsPool []Exemplar) ([]Label, []Label, []Sample, []Exemplar, error) {
// message TimeSeries {
// repeated Label labels = 1;
// repeated Sample samples = 2;
// repeated Exemplar exemplars = 3
// }
labelsPoolLen := len(labelsPool)
samplesPoolLen := len(samplesPool)
exemplarsPoolLen := len(exemplarsPool)
var fc easyproto.FieldContext
for len(src) > 0 {
var err error
src, err = fc.NextField(src)
if err != nil {
return labelsPool, samplesPool, fmt.Errorf("cannot read the next field: %w", err)
return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, fmt.Errorf("cannot read the next field: %w", err)
}
switch fc.FieldNum {
case 1:
data, ok := fc.MessageData()
if !ok {
return labelsPool, samplesPool, fmt.Errorf("cannot read label data")
return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, fmt.Errorf("cannot read label data")
}
if len(labelsPool) < cap(labelsPool) {
labelsPool = labelsPool[:len(labelsPool)+1]
@ -132,12 +162,12 @@ func (ts *TimeSeries) unmarshalProtobuf(src []byte, labelsPool []Label, samplesP
}
label := &labelsPool[len(labelsPool)-1]
if err := label.unmarshalProtobuf(data); err != nil {
return labelsPool, samplesPool, fmt.Errorf("cannot unmarshal label: %w", err)
return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, fmt.Errorf("cannot unmarshal label: %w", err)
}
case 2:
data, ok := fc.MessageData()
if !ok {
return labelsPool, samplesPool, fmt.Errorf("cannot read the sample data")
return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, fmt.Errorf("cannot read the sample data")
}
if len(samplesPool) < cap(samplesPool) {
samplesPool = samplesPool[:len(samplesPool)+1]
@ -146,15 +176,78 @@ func (ts *TimeSeries) unmarshalProtobuf(src []byte, labelsPool []Label, samplesP
}
sample := &samplesPool[len(samplesPool)-1]
if err := sample.unmarshalProtobuf(data); err != nil {
return labelsPool, samplesPool, fmt.Errorf("cannot unmarshal sample: %w", err)
return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, fmt.Errorf("cannot unmarshal sample: %w", err)
}
case 3:
data, ok := fc.MessageData()
if !ok {
return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, fmt.Errorf("cannot read the exemplar data")
}
if len(exemplarsPool) < cap(exemplarsPool) {
exemplarsPool = exemplarsPool[:len(exemplarsPool)+1]
} else {
exemplarsPool = append(exemplarsPool, Exemplar{})
}
exemplar := &exemplarsPool[len(exemplarsPool)-1]
if exemplarLabelsPool, err = exemplar.unmarshalProtobuf(data, exemplarLabelsPool); err != nil {
return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, fmt.Errorf("cannot unmarshal exemplar: %w", err)
}
}
}
ts.Labels = labelsPool[labelsPoolLen:]
ts.Samples = samplesPool[samplesPoolLen:]
return labelsPool, samplesPool, nil
ts.Exemplars = exemplarsPool[exemplarsPoolLen:]
return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, nil
}
func (exemplar *Exemplar) unmarshalProtobuf(src []byte, labelsPool []Label) ([]Label, error) {
// message Exemplar {
// repeated Label Labels = 1;
// float64 Value = 2;
// int64 Timestamp = 3;
// }
var fc easyproto.FieldContext
labelsPoolLen := len(labelsPool)
for len(src) > 0 {
var err error
src, err = fc.NextField(src)
if err != nil {
return labelsPool, fmt.Errorf("cannot read the next field: %w", err)
}
switch fc.FieldNum {
case 1:
data, ok := fc.MessageData()
if !ok {
return labelsPool, fmt.Errorf("cannot read label data")
}
if len(labelsPool) < cap(labelsPool) {
labelsPool = labelsPool[:len(labelsPool)+1]
} else {
labelsPool = append(labelsPool, Label{})
}
label := &labelsPool[len(labelsPool)-1]
if err := label.unmarshalProtobuf(data); err != nil {
return labelsPool, fmt.Errorf("cannot unmarshal label: %w", err)
}
case 2:
value, ok := fc.Double()
if !ok {
return labelsPool, fmt.Errorf("cannot read exemplar value")
}
exemplar.Value = value
case 3:
timestamp, ok := fc.Int64()
if !ok {
return labelsPool, fmt.Errorf("cannot read exemplar timestamp")
}
exemplar.Timestamp = timestamp
}
}
exemplar.Labels = labelsPool[labelsPoolLen:]
return labelsPool, nil
}
func (lbl *Label) unmarshalProtobuf(src []byte) (err error) {
// message Label {
// string name = 1;

View file

@ -36,9 +36,25 @@ func TestWriteRequestUnmarshalProtobuf(t *testing.T) {
Timestamp: sample.Timestamp,
})
}
var exemplars []prompbmarshal.Exemplar
for _, exemplar := range ts.Exemplars {
exemplarLabels := make([]prompbmarshal.Label, len(exemplar.Labels))
for i, label := range exemplar.Labels {
exemplarLabels[i] = prompbmarshal.Label{
Name: label.Name,
Value: label.Value,
}
}
exemplars = append(exemplars, prompbmarshal.Exemplar{
Labels: exemplarLabels,
Value: exemplar.Value,
Timestamp: exemplar.Timestamp,
})
}
wrm.Timeseries = append(wrm.Timeseries, prompbmarshal.TimeSeries{
Labels: labels,
Samples: samples,
Labels: labels,
Samples: samples,
Exemplars: exemplars,
})
}
dataResult := wrm.MarshalProtobuf(nil)
@ -121,6 +137,19 @@ func TestWriteRequestUnmarshalProtobuf(t *testing.T) {
Timestamp: 18939432423,
},
},
Exemplars: []prompbmarshal.Exemplar{
{
Labels: []prompbmarshal.Label{
{Name: "trace-id",
Value: "123456",
},
{Name: "log_id",
Value: "987664"},
},
Value: 12345.6,
Timestamp: 456,
},
},
},
}
data = wrm.MarshalProtobuf(data[:0])
@ -153,6 +182,18 @@ func TestWriteRequestUnmarshalProtobuf(t *testing.T) {
Timestamp: 18939432423,
},
},
Exemplars: []prompbmarshal.Exemplar{
{
Labels: []prompbmarshal.Label{
{
Name: "trace-id",
Value: "123456",
},
},
Value: 12345.6,
Timestamp: 456,
},
},
},
{
Labels: []prompbmarshal.Label{
@ -166,6 +207,22 @@ func TestWriteRequestUnmarshalProtobuf(t *testing.T) {
Value: 9873,
},
},
Exemplars: []prompbmarshal.Exemplar{
{
Labels: []prompbmarshal.Label{
{
Name: "trace-id",
Value: "123456",
},
{
Name: "log_id",
Value: "987654",
},
},
Value: 12345.6,
Timestamp: 456,
},
},
},
}
data = wrm.MarshalProtobuf(data[:0])

View file

@ -36,6 +36,22 @@ func TestWriteRequestMarshalProtobuf(t *testing.T) {
Timestamp: 18939432423,
},
},
Exemplars: []prompbmarshal.Exemplar{
{
Labels: []prompbmarshal.Label{
{
Name: "trace-id",
Value: "123456",
},
{
Name: "log_id",
Value: "987654",
},
},
Value: 12345.6,
Timestamp: 456,
},
},
},
},
}
@ -64,9 +80,25 @@ func TestWriteRequestMarshalProtobuf(t *testing.T) {
Timestamp: sample.Timestamp,
})
}
var exemplars []prompbmarshal.Exemplar
for _, exemplar := range ts.Exemplars {
exemplarLabels := make([]prompbmarshal.Label, len(exemplar.Labels))
for i, label := range exemplar.Labels {
exemplarLabels[i] = prompbmarshal.Label{
Name: label.Name,
Value: label.Value,
}
}
exemplars = append(exemplars, prompbmarshal.Exemplar{
Labels: exemplarLabels,
Value: exemplar.Value,
Timestamp: exemplar.Timestamp,
})
}
wrm.Timeseries = append(wrm.Timeseries, prompbmarshal.TimeSeries{
Labels: labels,
Samples: samples,
Labels: labels,
Samples: samples,
Exemplars: exemplars,
})
}
dataResult := wrm.MarshalProtobuf(nil)

View file

@ -13,10 +13,70 @@ type Sample struct {
Timestamp int64
}
type Exemplar struct {
// Optional, can be empty.
Labels []Label
Value float64
// timestamp is in ms format, see model/timestamp/timestamp.go for
// conversion from time.Time to Prometheus timestamp.
Timestamp int64
}
func (m *Exemplar) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
if m.Timestamp != 0 {
i = encodeVarint(dAtA, i, uint64(m.Timestamp))
i--
dAtA[i] = 0x18
}
if m.Value != 0 {
i -= 8
binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.Value))))
i--
dAtA[i] = 0x11
}
if len(m.Labels) > 0 {
for iNdEx := len(m.Labels) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Labels[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarint(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
}
return len(dAtA) - i, nil
}
func (m *Exemplar) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if len(m.Labels) > 0 {
for _, e := range m.Labels {
l = e.Size()
n += 1 + l + sov(uint64(l))
}
}
if m.Value != 0 {
n += 9
}
if m.Timestamp != 0 {
n += 1 + sov(uint64(m.Timestamp))
}
return n
}
// TimeSeries represents samples and labels for a single time series.
type TimeSeries struct {
Labels []Label
Samples []Sample
Labels []Label
Samples []Sample
Exemplars []Exemplar
}
type Label struct {
@ -42,6 +102,16 @@ func (m *Sample) MarshalToSizedBuffer(dst []byte) (int, error) {
func (m *TimeSeries) MarshalToSizedBuffer(dst []byte) (int, error) {
i := len(dst)
for j := len(m.Exemplars) - 1; j >= 0; j-- {
size, err := m.Exemplars[j].MarshalToSizedBuffer(dst[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarint(dst, i, uint64(size))
i--
dst[i] = 0x1a
}
for j := len(m.Samples) - 1; j >= 0; j-- {
size, err := m.Samples[j].MarshalToSizedBuffer(dst[:i])
if err != nil {
@ -109,6 +179,10 @@ func (m *TimeSeries) Size() (n int) {
l := e.Size()
n += 1 + l + sov(uint64(l))
}
for _, e := range m.Exemplars {
l := e.Size()
n += 1 + l + sov(uint64(l))
}
return n
}

View file

@ -30,6 +30,7 @@ var (
streamParse = flag.Bool("promscrape.streamParse", false, "Whether to enable stream parsing for metrics obtained from scrape targets. This may be useful "+
"for reducing memory usage when millions of metrics are exposed per each scrape target. "+
"It is possible to set 'stream_parse: true' individually per each 'scrape_config' section in '-promscrape.config' for fine-grained control")
scrapeExemplars = flag.Bool("promscrape.scrapeExemplars", false, "Whether to enable scraping of exemplars from scrape targets.")
)
type client struct {
@ -107,6 +108,12 @@ func (c *client) ReadData(dst *bytesutil.ByteBuffer) error {
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/608 for details.
// Do not bloat the `Accept` header with OpenMetrics shit, since it looks like dead standard now.
req.Header.Set("Accept", "text/plain;version=0.0.4;q=1,*/*;q=0.1")
// We set to support exemplars to be compatible with Prometheus Exposition format which uses
// Open Metrics Specification
// See https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#openmetrics-text-format
if *scrapeExemplars {
req.Header.Set("Accept", "application/openmetrics-text")
}
// Set X-Prometheus-Scrape-Timeout-Seconds like Prometheus does, since it is used by some exporters such as PushProx.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179#issuecomment-813117162
req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", c.scrapeTimeoutSecondsStr)

View file

@ -670,6 +670,7 @@ type writeRequestCtx struct {
writeRequest prompbmarshal.WriteRequest
labels []prompbmarshal.Label
samples []prompbmarshal.Sample
exemplars []prompbmarshal.Exemplar
}
func (wc *writeRequestCtx) reset() {
@ -684,6 +685,7 @@ func (wc *writeRequestCtx) resetNoRows() {
wc.labels = wc.labels[:0]
wc.samples = wc.samples[:0]
wc.exemplars = wc.exemplars[:0]
}
var writeRequestCtxPool leveledWriteRequestCtxPool
@ -902,10 +904,27 @@ func (sw *scrapeWork) addRowToTimeseries(wc *writeRequestCtx, r *parser.Row, tim
Value: r.Value,
Timestamp: sampleTimestamp,
})
// Add Exemplars to Timeseries
exemplarsLen := len(wc.exemplars)
exemplarTagsLen := len(r.Exemplar.Tags)
if exemplarTagsLen > 0 {
exemplarLabels := make([]prompbmarshal.Label, exemplarTagsLen)
for i, label := range r.Exemplar.Tags {
exemplarLabels[i].Name = label.Key
exemplarLabels[i].Value = label.Value
}
wc.exemplars = append(wc.exemplars, prompbmarshal.Exemplar{
Labels: exemplarLabels,
Value: r.Exemplar.Value,
Timestamp: r.Exemplar.Timestamp,
})
}
wr := &wc.writeRequest
wr.Timeseries = append(wr.Timeseries, prompbmarshal.TimeSeries{
Labels: wc.labels[labelsLen:],
Samples: wc.samples[len(wc.samples)-1:],
Labels: wc.labels[labelsLen:],
Samples: wc.samples[len(wc.samples)-1:],
Exemplars: wc.exemplars[exemplarsLen:],
})
}

View file

@ -708,6 +708,11 @@ func TestAddRowToTimeseriesNoRelabeling(t *testing.T) {
HonorLabels: true,
},
`metric{a="e",foo="bar"} 0 123`)
f(`metric{foo="bar"} 0 123 # {trace_id="12345"} 52 456`,
&ScrapeWork{
HonorLabels: true,
},
`metric{foo="bar"} 0 123 # {trace_id="12345"} 52 456`)
}
func TestSendStaleSeries(t *testing.T) {
@ -765,6 +770,8 @@ func parseData(data string) []prompbmarshal.TimeSeries {
}
rows.UnmarshalWithErrLogger(data, errLogger)
var tss []prompbmarshal.TimeSeries
var exemplars []prompbmarshal.Exemplar
for _, r := range rows.Rows {
labels := []prompbmarshal.Label{
{
@ -778,6 +785,21 @@ func parseData(data string) []prompbmarshal.TimeSeries {
Value: tag.Value,
})
}
exemplarLabels := []prompbmarshal.Label{}
if len(r.Exemplar.Tags) > 0 {
for _, tag := range r.Exemplar.Tags {
exemplarLabels = append(exemplarLabels, prompbmarshal.Label{
Name: tag.Key,
Value: tag.Value,
})
}
exemplars = append(exemplars, prompbmarshal.Exemplar{
Labels: exemplarLabels,
Value: r.Exemplar.Value,
Timestamp: r.Exemplar.Timestamp,
})
}
var ts prompbmarshal.TimeSeries
ts.Labels = labels
ts.Samples = []prompbmarshal.Sample{
@ -786,6 +808,7 @@ func parseData(data string) []prompbmarshal.TimeSeries {
Timestamp: r.Timestamp,
},
}
ts.Exemplars = exemplars
tss = append(tss, ts)
}
return tss
@ -850,6 +873,19 @@ func timeseriesToString(ts *prompbmarshal.TimeSeries) string {
}
s := ts.Samples[0]
fmt.Fprintf(&sb, "%g %d", s.Value, s.Timestamp)
// Add Exemplars to the end of string
for j, exemplar := range ts.Exemplars {
for i, label := range exemplar.Labels {
fmt.Fprintf(&sb, "%s=%q", label.Name, label.Value)
if i+1 < len(ts.Labels) {
fmt.Fprintf(&sb, ",")
}
}
fmt.Fprintf(&sb, "%g %d", exemplar.Value, exemplar.Timestamp)
if j+1 < len(ts.Exemplars) {
fmt.Fprintf(&sb, ",")
}
}
return sb.String()
}

View file

@ -57,12 +57,33 @@ func (rs *Rows) UnmarshalWithErrLogger(s string, errLogger func(s string)) {
rs.Rows, rs.tagsPool = unmarshalRows(rs.Rows[:0], s, rs.tagsPool[:0], noEscapes, errLogger)
}
const tagsPrefix = '{'
const exemplarPreifx = '{'
// Exemplar Item
type Exemplar struct {
// Tags: a list of labels that uniquely identifies exemplar
Tags []Tag
// Value: the value of the exemplar
Value float64
// Timestamp: the time when exemplar was recorded
Timestamp int64
}
// Reset - resets the Exemplar object to defaults
func (e *Exemplar) Reset() {
e.Tags = nil
e.Value = 0
e.Timestamp = 0
}
// Row is a single Prometheus row.
type Row struct {
Metric string
Tags []Tag
Value float64
Timestamp int64
Exemplar Exemplar
}
func (r *Row) reset() {
@ -70,6 +91,7 @@ func (r *Row) reset() {
r.Tags = nil
r.Value = 0
r.Timestamp = 0
r.Exemplar = Exemplar{}
}
func skipTrailingComment(s string) string {
@ -110,69 +132,140 @@ func nextWhitespace(s string) int {
return n1
}
func (r *Row) unmarshal(s string, tagsPool []Tag, noEscapes bool) ([]Tag, error) {
r.reset()
func parseStringToTags(s string, tagsPool []Tag, noEscapes bool) (string, []Tag, error) {
n := strings.IndexByte(s, tagsPrefix)
c := strings.IndexByte(s, '#')
if c != -1 && c < n {
return s, tagsPool, nil
}
s = skipLeadingWhitespace(s)
n := strings.IndexByte(s, '{')
if n >= 0 {
// Tags found. Parse them.
r.Metric = skipTrailingWhitespace(s[:n])
s = s[n+1:]
tagsStart := len(tagsPool)
var err error
s, tagsPool, err = unmarshalTags(tagsPool, s, noEscapes)
if err != nil {
return tagsPool, fmt.Errorf("cannot unmarshal tags: %w", err)
return s, tagsPool, fmt.Errorf("cannot unmarshal tags: %w", err)
}
if len(s) > 0 && s[0] == ' ' {
// Fast path - skip whitespace.
s = s[1:]
}
tags := tagsPool[tagsStart:]
r.Tags = tags[:len(tags):len(tags)]
} else {
// Tags weren't found. Search for value after whitespace
n = nextWhitespace(s)
if n < 0 {
return tagsPool, fmt.Errorf("missing value")
}
r.Metric = s[:n]
s = s[n+1:]
}
if len(r.Metric) == 0 {
return tagsPool, fmt.Errorf("metric cannot be empty")
return s, tagsPool, nil
}
var tvtPool = sync.Pool{New: func() interface{} {
return &tagsValueTimestamp{}
}}
func getTVT() *tagsValueTimestamp {
return tvtPool.Get().(*tagsValueTimestamp)
}
func putTVT(tvt *tagsValueTimestamp) {
tvt.reset()
tvtPool.Put(tvt)
}
type tagsValueTimestamp struct {
Prefix string
Value float64
Timestamp int64
Comments string
}
func (tvt *tagsValueTimestamp) reset() {
tvt.Prefix = ""
tvt.Value = 0
tvt.Timestamp = 0
tvt.Comments = ""
}
func parseTagsValueTimestamp(s string, tagsPool []Tag, noEscapes bool) ([]Tag, *tagsValueTimestamp, error) {
tvt := getTVT()
n := 0
// Prefix is everything up to a tag start or a space
t := strings.IndexByte(s, tagsPrefix)
// If there is no tag start process rest of string
mustParseTags := false
if t != -1 {
// Check to see if there is a space before tag
n = nextWhitespace(s)
// If there is a space
if n > 0 {
if n < t {
tvt.Prefix = s[:n]
s = skipLeadingWhitespace(s[n:])
// Cover the use case where there is whitespace between the prefix and the tag
if len(s) > 0 && s[0] == '{' {
mustParseTags = true
}
// Most likely this has an exemplar
} else {
tvt.Prefix = s[:t]
s = s[t:]
mustParseTags = true
}
}
if mustParseTags {
var err error
s, tagsPool, err = parseStringToTags(s, tagsPool, noEscapes)
if err != nil {
return tagsPool, tvt, err
}
}
} else {
// Tag doesn't exist
n = nextWhitespace(s)
if n != -1 {
tvt.Prefix = s[:n]
s = s[n:]
} else {
tvt.Prefix = s
return tagsPool, tvt, fmt.Errorf("missing value")
}
}
s = skipLeadingWhitespace(s)
s = skipTrailingComment(s)
if len(s) == 0 {
return tagsPool, fmt.Errorf("value cannot be empty")
// save and remove the comments
n = strings.IndexByte(s, '#')
if n >= 0 {
tvt.Comments = s[n:]
if len(tvt.Comments) > 1 {
tvt.Comments = s[n+1:]
} else {
tvt.Comments = ""
}
s = skipTrailingComment(s)
s = skipLeadingWhitespace(s)
s = skipTrailingWhitespace(s)
}
n = nextWhitespace(s)
if n < 0 {
// There is no timestamp.
v, err := fastfloat.Parse(s)
if err != nil {
return tagsPool, fmt.Errorf("cannot parse value %q: %w", s, err)
return tagsPool, tvt, fmt.Errorf("cannot parse value %q: %w", s, err)
}
r.Value = v
return tagsPool, nil
tvt.Value = v
return tagsPool, tvt, nil
}
// There is a timestamp.
// There is a timestamp
s = skipLeadingWhitespace(s)
v, err := fastfloat.Parse(s[:n])
if err != nil {
return tagsPool, fmt.Errorf("cannot parse value %q: %w", s[:n], err)
return tagsPool, tvt, fmt.Errorf("cannot parse value %q: %w", s[:n], err)
}
r.Value = v
s = skipLeadingWhitespace(s[n+1:])
tvt.Value = v
s = s[n:]
// There are some whitespaces after timestamp
s = skipLeadingWhitespace(s)
if len(s) == 0 {
// There is no timestamp - just a whitespace after the value.
return tagsPool, nil
return tagsPool, tvt, nil
}
// There are some whitespaces after timestamp
s = skipTrailingWhitespace(s)
ts, err := fastfloat.Parse(s)
if err != nil {
return tagsPool, fmt.Errorf("cannot parse timestamp %q: %w", s, err)
return tagsPool, tvt, fmt.Errorf("cannot parse timestamp %q: %w", s, err)
}
if ts >= -1<<31 && ts < 1<<31 {
// This looks like OpenMetrics timestamp in Unix seconds.
@ -181,7 +274,68 @@ func (r *Row) unmarshal(s string, tagsPool []Tag, noEscapes bool) ([]Tag, error)
// See https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md#timestamps
ts *= 1000
}
r.Timestamp = int64(ts)
tvt.Timestamp = int64(ts)
return tagsPool, tvt, nil
}
// Returns possible comments that could be exemplars
func (r *Row) unmarshalMetric(s string, tagsPool []Tag, noEscapes bool) (string, []Tag, error) {
tagsStart := len(tagsPool)
var err error
var tvt *tagsValueTimestamp
tagsPool, tvt, err = parseTagsValueTimestamp(s, tagsPool, noEscapes)
defer putTVT(tvt)
if err != nil {
return "", tagsPool, err
}
r.Metric = tvt.Prefix
tags := tagsPool[tagsStart:]
if len(tags) > 0 {
r.Tags = tags
}
r.Value = tvt.Value
r.Timestamp = tvt.Timestamp
return tvt.Comments, tagsPool, nil
}
func (e *Exemplar) unmarshal(s string, tagsPool []Tag, noEscapes bool) ([]Tag, error) {
// We can use the Comment parsed out to further parse the Exemplar
s = skipLeadingWhitespace(s)
// If we are a comment immediately followed by whitespace or a labelset
// then we are an exemplar
if len(s) != 0 && s[0] == exemplarPreifx {
var err error
var tvt *tagsValueTimestamp
tagsStart := len(tagsPool)
tagsPool, tvt, err = parseTagsValueTimestamp(s, tagsPool, noEscapes)
defer putTVT(tvt)
if err != nil {
return tagsPool, err
}
tags := tagsPool[tagsStart:]
if len(tags) > 0 {
e.Tags = tags
}
e.Value = tvt.Value
e.Timestamp = tvt.Timestamp
return tagsPool, nil
}
return tagsPool, nil
}
func (r *Row) unmarshal(s string, tagsPool []Tag, noEscapes bool) ([]Tag, error) {
r.reset()
// Parse for labels, the value and the timestamp
// Anything before labels is saved in Prefix we can use this
// for the metric name
comments, tagsPool, err := r.unmarshalMetric(s, tagsPool, noEscapes)
if err != nil {
return nil, err
}
tagsPool, err = r.Exemplar.unmarshal(comments, tagsPool, noEscapes)
if err != nil {
return nil, err
}
return tagsPool, nil
}

View file

@ -362,12 +362,87 @@ cassandra_token_ownership_ratio 78.9`, &Rows{
Value: 56,
}},
})
// Exemplars - see https://github.com/OpenObservability/OpenMetrics/blob/master/OpenMetrics.md#exemplars-1
f(`foo_bucket{le="10",a="#b"} 17 # {trace_id="oHg5SJ#YRHA0"} 9.8 1520879607.789
abc 123 456 # foobar
foo 344#bar`, &Rows{
// Support for Exemplars Open Metric Specification
// see: https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#exemplars-1
f(`foo_bucket{le="25"} 17 # {trace_id="oHg5SJYRHA0", log_id="test_id"} 9.8 1520879607.789`, &Rows{
Rows: []Row{
{
Metric: "foo_bucket",
Tags: []Tag{
{
Key: "le",
Value: "25",
},
},
Value: 17,
Exemplar: Exemplar{
Value: 9.8,
Tags: []Tag{
{
Key: "trace_id",
Value: "oHg5SJYRHA0",
},
{
Key: "log_id",
Value: "test_id",
},
},
Timestamp: 1520879607789,
},
},
}})
f(`foo_bucket{le="0.01"} 0
foo_bucket{le="0.1"} 8 # {} 0.054
foo_bucket{le="1"} 11 # {trace_id="KOO5S4vxi0o"} 0.67
foo_bucket{le="10"} 17 # {trace_id="oHg5SJYRHA0"} 9.8 1520879607.789
foo_bucket{le="25"} 17 # {trace_id="oHg5SJYRHA0", log_id="test_id"} 9.8 1520879607.789
foo_bucket{nospace="exemplar"} 17 #{trace_id="oHg5SJYRHA0"} 9.8 1520879607.789
foo_bucket{le="+Inf"} 17
foo_count 17
foo_sum 324789.3
foo_created 1520430000.123`, &Rows{
Rows: []Row{
{
Metric: "foo_bucket",
Tags: []Tag{
{
Key: "le",
Value: "0.01",
},
},
},
{
Metric: "foo_bucket",
Tags: []Tag{
{
Key: "le",
Value: "0.1",
},
},
Value: 8,
Exemplar: Exemplar{
Value: 0.054,
},
},
{
Metric: "foo_bucket",
Tags: []Tag{
{
Key: "le",
Value: "1",
},
},
Value: 11,
Exemplar: Exemplar{
Value: 0.67,
Tags: []Tag{
{
Key: "trace_id",
Value: "KOO5S4vxi0o",
},
},
},
},
{
Metric: "foo_bucket",
Tags: []Tag{
@ -375,21 +450,84 @@ cassandra_token_ownership_ratio 78.9`, &Rows{
Key: "le",
Value: "10",
},
},
Value: 17,
Exemplar: Exemplar{
Value: 9.8,
Tags: []Tag{
{
Key: "trace_id",
Value: "oHg5SJYRHA0",
},
},
Timestamp: 1520879607789,
},
},
{
Metric: "foo_bucket",
Tags: []Tag{
{
Key: "a",
Value: "#b",
Key: "le",
Value: "25",
},
},
Value: 17,
Exemplar: Exemplar{
Value: 9.8,
Tags: []Tag{
{
Key: "trace_id",
Value: "oHg5SJYRHA0",
},
{
Key: "log_id",
Value: "test_id",
},
},
Timestamp: 1520879607789,
},
},
{
Metric: "foo_bucket",
Tags: []Tag{
{
Key: "nospace",
Value: "exemplar",
},
},
Value: 17,
Exemplar: Exemplar{
Value: 9.8,
Tags: []Tag{
{
Key: "trace_id",
Value: "oHg5SJYRHA0",
},
},
Timestamp: 1520879607789,
},
},
{
Metric: "foo_bucket",
Tags: []Tag{
{
Key: "le",
Value: "+Inf",
},
},
Value: 17,
},
{
Metric: "abc",
Value: 123,
Timestamp: 456000,
Metric: "foo_count",
Value: 17,
},
{
Metric: "foo",
Value: 344,
Metric: "foo_sum",
Value: 324789.3,
},
{
Metric: "foo_created",
Value: 1520430000.123,
},
},
})

View file

@ -146,10 +146,10 @@ container_ulimits_soft{container="kube-scheduler",id="/kubelet/kubepods/burstabl
}
func BenchmarkRowsUnmarshal(b *testing.B) {
s := `cpu_usage{mode="user"} 1.23
cpu_usage{mode="system"} 23.344
cpu_usage{mode="iowait"} 3.3443
cpu_usage{mode="irq"} 0.34432
s := `foo_bucket{le="0.01"} 0
foo_bucket{le="1"} 11 # {trace_id="KOO5S4vxi0o"} 0.67
foo_bucket{le="10"} 17 # {trace_id="oHg5SJYRHA0"} 9.8 1520879607.789
foo_bucket{nospace="exemplar"} 17 #{trace_id="oHg5SJYRHA0"} 9.8 1520879607.789
`
b.SetBytes(int64(len(s)))
b.ReportAllocs()