mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
Revert "Exemplar support (#5982)"
This reverts commit 5a3abfa041
. Reason for revert: exemplars aren't in wide use because they have numerous issues which prevent their adoption (see below). Adding support for exemplars into VictoriaMetrics introduces non-trivial code changes. These code changes need to be supported forever once the release of VictoriaMetrics with exemplar support is published. That's why I don't think this is a good feature, even though the source code of the reverted commit has excellent quality. See https://docs.victoriametrics.com/goals/ . Issues with Prometheus exemplars: - Prometheus still has only experimental support for exemplars after more than three years since they were introduced. It stores exemplars in memory, so they are lost after a Prometheus restart. This doesn't look like a production-ready feature. See 0a2f3b3794/content/docs/instrumenting/exposition_formats.md (L153-L159)
and https://prometheus.io/docs/prometheus/latest/feature_flags/#exemplars-storage - It is very non-trivial to expose exemplars alongside metrics in your application, since the official Prometheus SDKs for metrics' exposition ( https://prometheus.io/docs/instrumenting/clientlibs/ ) either have very hard-to-use API for exposing histograms or do not have this API at all. For example, try figuring out how to expose exemplars via https://pkg.go.dev/github.com/prometheus/client_golang@v1.19.1/prometheus . - It looks like exemplars are supported for Histogram metric types only - see https://pkg.go.dev/github.com/prometheus/client_golang@v1.19.1/prometheus#Timer.ObserveDurationWithExemplar . Exemplars aren't supported for Counter, Gauge and Summary metric types. - Grafana has very poor support for Prometheus exemplars. It looks like it supports exemplars only when the query contains histogram_quantile() function. It queries exemplars via special Prometheus API - https://prometheus.io/docs/prometheus/latest/querying/api/#querying-exemplars - (which is still marked as experimental, btw.) and then displays all the returned exemplars on the graph as special dots. The issue is that this doesn't work in production in most cases when the histogram_quantile() is calculated over thousands of histogram buckets exposed by big number of application instances. Every histogram bucket may expose an exemplar on every timestamp shown on the graph. This makes the graph unusable, since it is literally filled with thousands of exemplar dots. Neither the Prometheus API nor Grafana provides the ability to filter out unneeded exemplars. - Exemplars are usually connected to traces. While traces are good for some use cases, I doubt exemplars will become production-ready in the near future because of the issues outlined above. 
Alternative to exemplars: Exemplars are marketed as a silver bullet for the correlation between metrics, traces and logs - just click the exemplar dot on some graph in Grafana and instantly see the corresponding trace or log entry! This doesn't work as expected in production as shown above. Are there better solutions, which work in production? Yes - just use time-based and label-based correlation between metrics, traces and logs. Assign the same `job` and `instance` labels to metrics, logs and traces, so you can quickly find the needed trace or log entry by these labels on the time range with the anomaly on metrics' graph.
This commit is contained in:
parent
cc4d57d650
commit
bb00bae353
12 changed files with 73 additions and 714 deletions
|
@ -112,7 +112,6 @@ type writeRequest struct {
|
||||||
tss []prompbmarshal.TimeSeries
|
tss []prompbmarshal.TimeSeries
|
||||||
labels []prompbmarshal.Label
|
labels []prompbmarshal.Label
|
||||||
samples []prompbmarshal.Sample
|
samples []prompbmarshal.Sample
|
||||||
exemplars []prompbmarshal.Exemplar
|
|
||||||
|
|
||||||
// buf holds labels data
|
// buf holds labels data
|
||||||
buf []byte
|
buf []byte
|
||||||
|
@ -130,7 +129,6 @@ func (wr *writeRequest) reset() {
|
||||||
wr.labels = wr.labels[:0]
|
wr.labels = wr.labels[:0]
|
||||||
|
|
||||||
wr.samples = wr.samples[:0]
|
wr.samples = wr.samples[:0]
|
||||||
wr.exemplars = wr.exemplars[:0]
|
|
||||||
wr.buf = wr.buf[:0]
|
wr.buf = wr.buf[:0]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -202,7 +200,6 @@ func (wr *writeRequest) copyTimeSeries(dst, src *prompbmarshal.TimeSeries) {
|
||||||
labelsDst := wr.labels
|
labelsDst := wr.labels
|
||||||
labelsLen := len(wr.labels)
|
labelsLen := len(wr.labels)
|
||||||
samplesDst := wr.samples
|
samplesDst := wr.samples
|
||||||
exemplarsDst := wr.exemplars
|
|
||||||
buf := wr.buf
|
buf := wr.buf
|
||||||
for i := range src.Labels {
|
for i := range src.Labels {
|
||||||
labelsDst = append(labelsDst, prompbmarshal.Label{})
|
labelsDst = append(labelsDst, prompbmarshal.Label{})
|
||||||
|
@ -219,12 +216,8 @@ func (wr *writeRequest) copyTimeSeries(dst, src *prompbmarshal.TimeSeries) {
|
||||||
samplesDst = append(samplesDst, src.Samples...)
|
samplesDst = append(samplesDst, src.Samples...)
|
||||||
dst.Samples = samplesDst[len(samplesDst)-len(src.Samples):]
|
dst.Samples = samplesDst[len(samplesDst)-len(src.Samples):]
|
||||||
|
|
||||||
exemplarsDst = append(exemplarsDst, src.Exemplars...)
|
|
||||||
dst.Exemplars = exemplarsDst[len(exemplarsDst)-len(src.Exemplars):]
|
|
||||||
|
|
||||||
wr.samples = samplesDst
|
wr.samples = samplesDst
|
||||||
wr.labels = labelsDst
|
wr.labels = labelsDst
|
||||||
wr.exemplars = exemplarsDst
|
|
||||||
wr.buf = buf
|
wr.buf = buf
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -236,6 +229,7 @@ func tryPushWriteRequest(wr *prompbmarshal.WriteRequest, tryPushBlock func(block
|
||||||
// Nothing to push
|
// Nothing to push
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
marshalConcurrencyCh <- struct{}{}
|
marshalConcurrencyCh <- struct{}{}
|
||||||
|
|
||||||
bb := writeRequestBufPool.Get()
|
bb := writeRequestBufPool.Get()
|
||||||
|
@ -272,8 +266,6 @@ func tryPushWriteRequest(wr *prompbmarshal.WriteRequest, tryPushBlock func(block
|
||||||
if len(wr.Timeseries) == 1 {
|
if len(wr.Timeseries) == 1 {
|
||||||
// A single time series left. Recursively split its samples into smaller parts if possible.
|
// A single time series left. Recursively split its samples into smaller parts if possible.
|
||||||
samples := wr.Timeseries[0].Samples
|
samples := wr.Timeseries[0].Samples
|
||||||
exemplars := wr.Timeseries[0].Exemplars
|
|
||||||
|
|
||||||
if len(samples) == 1 {
|
if len(samples) == 1 {
|
||||||
logger.Warnf("dropping a sample for metric with too long labels exceeding -remoteWrite.maxBlockSize=%d bytes", maxUnpackedBlockSize.N)
|
logger.Warnf("dropping a sample for metric with too long labels exceeding -remoteWrite.maxBlockSize=%d bytes", maxUnpackedBlockSize.N)
|
||||||
return true
|
return true
|
||||||
|
@ -285,16 +277,11 @@ func tryPushWriteRequest(wr *prompbmarshal.WriteRequest, tryPushBlock func(block
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
wr.Timeseries[0].Samples = samples[n:]
|
wr.Timeseries[0].Samples = samples[n:]
|
||||||
// We do not want to send exemplars twice
|
|
||||||
wr.Timeseries[0].Exemplars = nil
|
|
||||||
|
|
||||||
if !tryPushWriteRequest(wr, tryPushBlock, isVMRemoteWrite) {
|
if !tryPushWriteRequest(wr, tryPushBlock, isVMRemoteWrite) {
|
||||||
wr.Timeseries[0].Samples = samples
|
wr.Timeseries[0].Samples = samples
|
||||||
wr.Timeseries[0].Exemplars = exemplars
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
wr.Timeseries[0].Samples = samples
|
wr.Timeseries[0].Samples = samples
|
||||||
wr.Timeseries[0].Exemplars = exemplars
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
timeseries := wr.Timeseries
|
timeseries := wr.Timeseries
|
||||||
|
|
|
@ -10,8 +10,8 @@ import (
|
||||||
|
|
||||||
func TestPushWriteRequest(t *testing.T) {
|
func TestPushWriteRequest(t *testing.T) {
|
||||||
rowsCounts := []int{1, 10, 100, 1e3, 1e4}
|
rowsCounts := []int{1, 10, 100, 1e3, 1e4}
|
||||||
expectedBlockLensProm := []int{248, 1952, 17433, 180381, 1861994}
|
expectedBlockLensProm := []int{216, 1848, 16424, 169882, 1757876}
|
||||||
expectedBlockLensVM := []int{170, 575, 4748, 44936, 367096}
|
expectedBlockLensVM := []int{138, 492, 3927, 34995, 288476}
|
||||||
for i, rowsCount := range rowsCounts {
|
for i, rowsCount := range rowsCounts {
|
||||||
expectedBlockLenProm := expectedBlockLensProm[i]
|
expectedBlockLenProm := expectedBlockLensProm[i]
|
||||||
expectedBlockLenVM := expectedBlockLensVM[i]
|
expectedBlockLenVM := expectedBlockLensVM[i]
|
||||||
|
@ -59,20 +59,6 @@ func newTestWriteRequest(seriesCount, labelsCount int) *prompbmarshal.WriteReque
|
||||||
Value: fmt.Sprintf("value_%d_%d", i, j),
|
Value: fmt.Sprintf("value_%d_%d", i, j),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
exemplar := prompbmarshal.Exemplar{
|
|
||||||
Labels: []prompbmarshal.Label{
|
|
||||||
{
|
|
||||||
Name: "trace_id",
|
|
||||||
Value: "123456",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Name: "log_id",
|
|
||||||
Value: "987654",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Value: float64(i),
|
|
||||||
Timestamp: 1000 * int64(i),
|
|
||||||
}
|
|
||||||
wr.Timeseries = append(wr.Timeseries, prompbmarshal.TimeSeries{
|
wr.Timeseries = append(wr.Timeseries, prompbmarshal.TimeSeries{
|
||||||
Labels: labels,
|
Labels: labels,
|
||||||
Samples: []prompbmarshal.Sample{
|
Samples: []prompbmarshal.Sample{
|
||||||
|
@ -81,10 +67,6 @@ func newTestWriteRequest(seriesCount, labelsCount int) *prompbmarshal.WriteReque
|
||||||
Timestamp: 1000 * int64(i),
|
Timestamp: 1000 * int64(i),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
||||||
Exemplars: []prompbmarshal.Exemplar{
|
|
||||||
exemplar,
|
|
||||||
},
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
return &wr
|
return &wr
|
||||||
|
|
|
@ -10,10 +10,9 @@ import (
|
||||||
type WriteRequest struct {
|
type WriteRequest struct {
|
||||||
// Timeseries is a list of time series in the given WriteRequest
|
// Timeseries is a list of time series in the given WriteRequest
|
||||||
Timeseries []TimeSeries
|
Timeseries []TimeSeries
|
||||||
|
|
||||||
labelsPool []Label
|
labelsPool []Label
|
||||||
exemplarLabelsPool []Label
|
|
||||||
samplesPool []Sample
|
samplesPool []Sample
|
||||||
exemplarsPool []Exemplar
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reset resets wr for subsequent re-use.
|
// Reset resets wr for subsequent re-use.
|
||||||
|
@ -30,33 +29,11 @@ func (wr *WriteRequest) Reset() {
|
||||||
}
|
}
|
||||||
wr.labelsPool = labelsPool[:0]
|
wr.labelsPool = labelsPool[:0]
|
||||||
|
|
||||||
exemplarLabelsPool := wr.exemplarLabelsPool
|
|
||||||
for i := range exemplarLabelsPool {
|
|
||||||
exemplarLabelsPool[i] = Label{}
|
|
||||||
}
|
|
||||||
wr.labelsPool = labelsPool[:0]
|
|
||||||
samplesPool := wr.samplesPool
|
samplesPool := wr.samplesPool
|
||||||
for i := range samplesPool {
|
for i := range samplesPool {
|
||||||
samplesPool[i] = Sample{}
|
samplesPool[i] = Sample{}
|
||||||
}
|
}
|
||||||
wr.samplesPool = samplesPool[:0]
|
wr.samplesPool = samplesPool[:0]
|
||||||
exemplarsPool := wr.exemplarsPool
|
|
||||||
for i := range exemplarsPool {
|
|
||||||
exemplarsPool[i] = Exemplar{}
|
|
||||||
}
|
|
||||||
wr.exemplarsPool = exemplarsPool[:0]
|
|
||||||
}
|
|
||||||
|
|
||||||
// Exemplar is an exemplar
|
|
||||||
type Exemplar struct {
|
|
||||||
// Labels a list of labels that uniquely identifies exemplar
|
|
||||||
// Optional, can be empty.
|
|
||||||
Labels []Label
|
|
||||||
// Value: the value of the exemplar
|
|
||||||
Value float64
|
|
||||||
// timestamp is in ms format, see model/timestamp/timestamp.go for
|
|
||||||
// conversion from time.Time to Prometheus timestamp.
|
|
||||||
Timestamp int64
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TimeSeries is a timeseries.
|
// TimeSeries is a timeseries.
|
||||||
|
@ -66,7 +43,6 @@ type TimeSeries struct {
|
||||||
|
|
||||||
// Samples is a list of samples for the given TimeSeries
|
// Samples is a list of samples for the given TimeSeries
|
||||||
Samples []Sample
|
Samples []Sample
|
||||||
Exemplars []Exemplar
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sample is a timeseries sample.
|
// Sample is a timeseries sample.
|
||||||
|
@ -98,10 +74,7 @@ func (wr *WriteRequest) UnmarshalProtobuf(src []byte) (err error) {
|
||||||
// }
|
// }
|
||||||
tss := wr.Timeseries
|
tss := wr.Timeseries
|
||||||
labelsPool := wr.labelsPool
|
labelsPool := wr.labelsPool
|
||||||
exemplarLabelsPool := wr.exemplarLabelsPool
|
|
||||||
samplesPool := wr.samplesPool
|
samplesPool := wr.samplesPool
|
||||||
exemplarsPool := wr.exemplarsPool
|
|
||||||
|
|
||||||
var fc easyproto.FieldContext
|
var fc easyproto.FieldContext
|
||||||
for len(src) > 0 {
|
for len(src) > 0 {
|
||||||
src, err = fc.NextField(src)
|
src, err = fc.NextField(src)
|
||||||
|
@ -120,7 +93,7 @@ func (wr *WriteRequest) UnmarshalProtobuf(src []byte) (err error) {
|
||||||
tss = append(tss, TimeSeries{})
|
tss = append(tss, TimeSeries{})
|
||||||
}
|
}
|
||||||
ts := &tss[len(tss)-1]
|
ts := &tss[len(tss)-1]
|
||||||
labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, err = ts.unmarshalProtobuf(data, labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool)
|
labelsPool, samplesPool, err = ts.unmarshalProtobuf(data, labelsPool, samplesPool)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot unmarshal timeseries: %w", err)
|
return fmt.Errorf("cannot unmarshal timeseries: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -129,31 +102,28 @@ func (wr *WriteRequest) UnmarshalProtobuf(src []byte) (err error) {
|
||||||
wr.Timeseries = tss
|
wr.Timeseries = tss
|
||||||
wr.labelsPool = labelsPool
|
wr.labelsPool = labelsPool
|
||||||
wr.samplesPool = samplesPool
|
wr.samplesPool = samplesPool
|
||||||
wr.exemplarsPool = exemplarsPool
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ts *TimeSeries) unmarshalProtobuf(src []byte, labelsPool []Label, exemplarLabelsPool []Label, samplesPool []Sample, exemplarsPool []Exemplar) ([]Label, []Label, []Sample, []Exemplar, error) {
|
func (ts *TimeSeries) unmarshalProtobuf(src []byte, labelsPool []Label, samplesPool []Sample) ([]Label, []Sample, error) {
|
||||||
// message TimeSeries {
|
// message TimeSeries {
|
||||||
// repeated Label labels = 1;
|
// repeated Label labels = 1;
|
||||||
// repeated Sample samples = 2;
|
// repeated Sample samples = 2;
|
||||||
// repeated Exemplar exemplars = 3
|
|
||||||
// }
|
// }
|
||||||
labelsPoolLen := len(labelsPool)
|
labelsPoolLen := len(labelsPool)
|
||||||
samplesPoolLen := len(samplesPool)
|
samplesPoolLen := len(samplesPool)
|
||||||
exemplarsPoolLen := len(exemplarsPool)
|
|
||||||
var fc easyproto.FieldContext
|
var fc easyproto.FieldContext
|
||||||
for len(src) > 0 {
|
for len(src) > 0 {
|
||||||
var err error
|
var err error
|
||||||
src, err = fc.NextField(src)
|
src, err = fc.NextField(src)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, fmt.Errorf("cannot read the next field: %w", err)
|
return labelsPool, samplesPool, fmt.Errorf("cannot read the next field: %w", err)
|
||||||
}
|
}
|
||||||
switch fc.FieldNum {
|
switch fc.FieldNum {
|
||||||
case 1:
|
case 1:
|
||||||
data, ok := fc.MessageData()
|
data, ok := fc.MessageData()
|
||||||
if !ok {
|
if !ok {
|
||||||
return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, fmt.Errorf("cannot read label data")
|
return labelsPool, samplesPool, fmt.Errorf("cannot read label data")
|
||||||
}
|
}
|
||||||
if len(labelsPool) < cap(labelsPool) {
|
if len(labelsPool) < cap(labelsPool) {
|
||||||
labelsPool = labelsPool[:len(labelsPool)+1]
|
labelsPool = labelsPool[:len(labelsPool)+1]
|
||||||
|
@ -162,12 +132,12 @@ func (ts *TimeSeries) unmarshalProtobuf(src []byte, labelsPool []Label, exemplar
|
||||||
}
|
}
|
||||||
label := &labelsPool[len(labelsPool)-1]
|
label := &labelsPool[len(labelsPool)-1]
|
||||||
if err := label.unmarshalProtobuf(data); err != nil {
|
if err := label.unmarshalProtobuf(data); err != nil {
|
||||||
return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, fmt.Errorf("cannot unmarshal label: %w", err)
|
return labelsPool, samplesPool, fmt.Errorf("cannot unmarshal label: %w", err)
|
||||||
}
|
}
|
||||||
case 2:
|
case 2:
|
||||||
data, ok := fc.MessageData()
|
data, ok := fc.MessageData()
|
||||||
if !ok {
|
if !ok {
|
||||||
return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, fmt.Errorf("cannot read the sample data")
|
return labelsPool, samplesPool, fmt.Errorf("cannot read the sample data")
|
||||||
}
|
}
|
||||||
if len(samplesPool) < cap(samplesPool) {
|
if len(samplesPool) < cap(samplesPool) {
|
||||||
samplesPool = samplesPool[:len(samplesPool)+1]
|
samplesPool = samplesPool[:len(samplesPool)+1]
|
||||||
|
@ -176,78 +146,15 @@ func (ts *TimeSeries) unmarshalProtobuf(src []byte, labelsPool []Label, exemplar
|
||||||
}
|
}
|
||||||
sample := &samplesPool[len(samplesPool)-1]
|
sample := &samplesPool[len(samplesPool)-1]
|
||||||
if err := sample.unmarshalProtobuf(data); err != nil {
|
if err := sample.unmarshalProtobuf(data); err != nil {
|
||||||
return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, fmt.Errorf("cannot unmarshal sample: %w", err)
|
return labelsPool, samplesPool, fmt.Errorf("cannot unmarshal sample: %w", err)
|
||||||
}
|
|
||||||
case 3:
|
|
||||||
data, ok := fc.MessageData()
|
|
||||||
if !ok {
|
|
||||||
return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, fmt.Errorf("cannot read the exemplar data")
|
|
||||||
}
|
|
||||||
if len(exemplarsPool) < cap(exemplarsPool) {
|
|
||||||
exemplarsPool = exemplarsPool[:len(exemplarsPool)+1]
|
|
||||||
} else {
|
|
||||||
exemplarsPool = append(exemplarsPool, Exemplar{})
|
|
||||||
}
|
|
||||||
exemplar := &exemplarsPool[len(exemplarsPool)-1]
|
|
||||||
if exemplarLabelsPool, err = exemplar.unmarshalProtobuf(data, exemplarLabelsPool); err != nil {
|
|
||||||
return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, fmt.Errorf("cannot unmarshal exemplar: %w", err)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ts.Labels = labelsPool[labelsPoolLen:]
|
ts.Labels = labelsPool[labelsPoolLen:]
|
||||||
ts.Samples = samplesPool[samplesPoolLen:]
|
ts.Samples = samplesPool[samplesPoolLen:]
|
||||||
ts.Exemplars = exemplarsPool[exemplarsPoolLen:]
|
return labelsPool, samplesPool, nil
|
||||||
return labelsPool, exemplarLabelsPool, samplesPool, exemplarsPool, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (exemplar *Exemplar) unmarshalProtobuf(src []byte, labelsPool []Label) ([]Label, error) {
|
|
||||||
// message Exemplar {
|
|
||||||
// repeated Label Labels = 1;
|
|
||||||
// float64 Value = 2;
|
|
||||||
// int64 Timestamp = 3;
|
|
||||||
// }
|
|
||||||
var fc easyproto.FieldContext
|
|
||||||
|
|
||||||
labelsPoolLen := len(labelsPool)
|
|
||||||
|
|
||||||
for len(src) > 0 {
|
|
||||||
var err error
|
|
||||||
src, err = fc.NextField(src)
|
|
||||||
if err != nil {
|
|
||||||
return labelsPool, fmt.Errorf("cannot read the next field: %w", err)
|
|
||||||
}
|
|
||||||
switch fc.FieldNum {
|
|
||||||
case 1:
|
|
||||||
data, ok := fc.MessageData()
|
|
||||||
if !ok {
|
|
||||||
return labelsPool, fmt.Errorf("cannot read label data")
|
|
||||||
}
|
|
||||||
if len(labelsPool) < cap(labelsPool) {
|
|
||||||
labelsPool = labelsPool[:len(labelsPool)+1]
|
|
||||||
} else {
|
|
||||||
labelsPool = append(labelsPool, Label{})
|
|
||||||
}
|
|
||||||
label := &labelsPool[len(labelsPool)-1]
|
|
||||||
if err := label.unmarshalProtobuf(data); err != nil {
|
|
||||||
return labelsPool, fmt.Errorf("cannot unmarshal label: %w", err)
|
|
||||||
}
|
|
||||||
case 2:
|
|
||||||
value, ok := fc.Double()
|
|
||||||
if !ok {
|
|
||||||
return labelsPool, fmt.Errorf("cannot read exemplar value")
|
|
||||||
}
|
|
||||||
exemplar.Value = value
|
|
||||||
case 3:
|
|
||||||
timestamp, ok := fc.Int64()
|
|
||||||
if !ok {
|
|
||||||
return labelsPool, fmt.Errorf("cannot read exemplar timestamp")
|
|
||||||
}
|
|
||||||
exemplar.Timestamp = timestamp
|
|
||||||
}
|
|
||||||
}
|
|
||||||
exemplar.Labels = labelsPool[labelsPoolLen:]
|
|
||||||
return labelsPool, nil
|
|
||||||
}
|
|
||||||
func (lbl *Label) unmarshalProtobuf(src []byte) (err error) {
|
func (lbl *Label) unmarshalProtobuf(src []byte) (err error) {
|
||||||
// message Label {
|
// message Label {
|
||||||
// string name = 1;
|
// string name = 1;
|
||||||
|
|
|
@ -36,25 +36,9 @@ func TestWriteRequestUnmarshalProtobuf(t *testing.T) {
|
||||||
Timestamp: sample.Timestamp,
|
Timestamp: sample.Timestamp,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
var exemplars []prompbmarshal.Exemplar
|
|
||||||
for _, exemplar := range ts.Exemplars {
|
|
||||||
exemplarLabels := make([]prompbmarshal.Label, len(exemplar.Labels))
|
|
||||||
for i, label := range exemplar.Labels {
|
|
||||||
exemplarLabels[i] = prompbmarshal.Label{
|
|
||||||
Name: label.Name,
|
|
||||||
Value: label.Value,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
exemplars = append(exemplars, prompbmarshal.Exemplar{
|
|
||||||
Labels: exemplarLabels,
|
|
||||||
Value: exemplar.Value,
|
|
||||||
Timestamp: exemplar.Timestamp,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
wrm.Timeseries = append(wrm.Timeseries, prompbmarshal.TimeSeries{
|
wrm.Timeseries = append(wrm.Timeseries, prompbmarshal.TimeSeries{
|
||||||
Labels: labels,
|
Labels: labels,
|
||||||
Samples: samples,
|
Samples: samples,
|
||||||
Exemplars: exemplars,
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
dataResult := wrm.MarshalProtobuf(nil)
|
dataResult := wrm.MarshalProtobuf(nil)
|
||||||
|
@ -137,19 +121,6 @@ func TestWriteRequestUnmarshalProtobuf(t *testing.T) {
|
||||||
Timestamp: 18939432423,
|
Timestamp: 18939432423,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
Exemplars: []prompbmarshal.Exemplar{
|
|
||||||
{
|
|
||||||
Labels: []prompbmarshal.Label{
|
|
||||||
{Name: "trace-id",
|
|
||||||
Value: "123456",
|
|
||||||
},
|
|
||||||
{Name: "log_id",
|
|
||||||
Value: "987664"},
|
|
||||||
},
|
|
||||||
Value: 12345.6,
|
|
||||||
Timestamp: 456,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
data = wrm.MarshalProtobuf(data[:0])
|
data = wrm.MarshalProtobuf(data[:0])
|
||||||
|
@ -182,18 +153,6 @@ func TestWriteRequestUnmarshalProtobuf(t *testing.T) {
|
||||||
Timestamp: 18939432423,
|
Timestamp: 18939432423,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
Exemplars: []prompbmarshal.Exemplar{
|
|
||||||
{
|
|
||||||
Labels: []prompbmarshal.Label{
|
|
||||||
{
|
|
||||||
Name: "trace-id",
|
|
||||||
Value: "123456",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Value: 12345.6,
|
|
||||||
Timestamp: 456,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Labels: []prompbmarshal.Label{
|
Labels: []prompbmarshal.Label{
|
||||||
|
@ -207,22 +166,6 @@ func TestWriteRequestUnmarshalProtobuf(t *testing.T) {
|
||||||
Value: 9873,
|
Value: 9873,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
Exemplars: []prompbmarshal.Exemplar{
|
|
||||||
{
|
|
||||||
Labels: []prompbmarshal.Label{
|
|
||||||
{
|
|
||||||
Name: "trace-id",
|
|
||||||
Value: "123456",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Name: "log_id",
|
|
||||||
Value: "987654",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Value: 12345.6,
|
|
||||||
Timestamp: 456,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
data = wrm.MarshalProtobuf(data[:0])
|
data = wrm.MarshalProtobuf(data[:0])
|
||||||
|
|
|
@ -36,22 +36,6 @@ func TestWriteRequestMarshalProtobuf(t *testing.T) {
|
||||||
Timestamp: 18939432423,
|
Timestamp: 18939432423,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
Exemplars: []prompbmarshal.Exemplar{
|
|
||||||
{
|
|
||||||
Labels: []prompbmarshal.Label{
|
|
||||||
{
|
|
||||||
Name: "trace-id",
|
|
||||||
Value: "123456",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Name: "log_id",
|
|
||||||
Value: "987654",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Value: 12345.6,
|
|
||||||
Timestamp: 456,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -80,25 +64,9 @@ func TestWriteRequestMarshalProtobuf(t *testing.T) {
|
||||||
Timestamp: sample.Timestamp,
|
Timestamp: sample.Timestamp,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
var exemplars []prompbmarshal.Exemplar
|
|
||||||
for _, exemplar := range ts.Exemplars {
|
|
||||||
exemplarLabels := make([]prompbmarshal.Label, len(exemplar.Labels))
|
|
||||||
for i, label := range exemplar.Labels {
|
|
||||||
exemplarLabels[i] = prompbmarshal.Label{
|
|
||||||
Name: label.Name,
|
|
||||||
Value: label.Value,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
exemplars = append(exemplars, prompbmarshal.Exemplar{
|
|
||||||
Labels: exemplarLabels,
|
|
||||||
Value: exemplar.Value,
|
|
||||||
Timestamp: exemplar.Timestamp,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
wrm.Timeseries = append(wrm.Timeseries, prompbmarshal.TimeSeries{
|
wrm.Timeseries = append(wrm.Timeseries, prompbmarshal.TimeSeries{
|
||||||
Labels: labels,
|
Labels: labels,
|
||||||
Samples: samples,
|
Samples: samples,
|
||||||
Exemplars: exemplars,
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
dataResult := wrm.MarshalProtobuf(nil)
|
dataResult := wrm.MarshalProtobuf(nil)
|
||||||
|
|
|
@ -13,70 +13,10 @@ type Sample struct {
|
||||||
Timestamp int64
|
Timestamp int64
|
||||||
}
|
}
|
||||||
|
|
||||||
type Exemplar struct {
|
|
||||||
// Optional, can be empty.
|
|
||||||
Labels []Label
|
|
||||||
Value float64
|
|
||||||
// timestamp is in ms format, see model/timestamp/timestamp.go for
|
|
||||||
// conversion from time.Time to Prometheus timestamp.
|
|
||||||
Timestamp int64
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Exemplar) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
|
||||||
i := len(dAtA)
|
|
||||||
if m.Timestamp != 0 {
|
|
||||||
i = encodeVarint(dAtA, i, uint64(m.Timestamp))
|
|
||||||
i--
|
|
||||||
dAtA[i] = 0x18
|
|
||||||
}
|
|
||||||
if m.Value != 0 {
|
|
||||||
i -= 8
|
|
||||||
binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.Value))))
|
|
||||||
i--
|
|
||||||
dAtA[i] = 0x11
|
|
||||||
}
|
|
||||||
if len(m.Labels) > 0 {
|
|
||||||
for iNdEx := len(m.Labels) - 1; iNdEx >= 0; iNdEx-- {
|
|
||||||
{
|
|
||||||
size, err := m.Labels[iNdEx].MarshalToSizedBuffer(dAtA[:i])
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
i -= size
|
|
||||||
i = encodeVarint(dAtA, i, uint64(size))
|
|
||||||
}
|
|
||||||
i--
|
|
||||||
dAtA[i] = 0xa
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return len(dAtA) - i, nil
|
|
||||||
}
|
|
||||||
func (m *Exemplar) Size() (n int) {
|
|
||||||
if m == nil {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
var l int
|
|
||||||
_ = l
|
|
||||||
if len(m.Labels) > 0 {
|
|
||||||
for _, e := range m.Labels {
|
|
||||||
l = e.Size()
|
|
||||||
n += 1 + l + sov(uint64(l))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if m.Value != 0 {
|
|
||||||
n += 9
|
|
||||||
}
|
|
||||||
if m.Timestamp != 0 {
|
|
||||||
n += 1 + sov(uint64(m.Timestamp))
|
|
||||||
}
|
|
||||||
return n
|
|
||||||
}
|
|
||||||
|
|
||||||
// TimeSeries represents samples and labels for a single time series.
|
// TimeSeries represents samples and labels for a single time series.
|
||||||
type TimeSeries struct {
|
type TimeSeries struct {
|
||||||
Labels []Label
|
Labels []Label
|
||||||
Samples []Sample
|
Samples []Sample
|
||||||
Exemplars []Exemplar
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type Label struct {
|
type Label struct {
|
||||||
|
@ -102,16 +42,6 @@ func (m *Sample) MarshalToSizedBuffer(dst []byte) (int, error) {
|
||||||
|
|
||||||
func (m *TimeSeries) MarshalToSizedBuffer(dst []byte) (int, error) {
|
func (m *TimeSeries) MarshalToSizedBuffer(dst []byte) (int, error) {
|
||||||
i := len(dst)
|
i := len(dst)
|
||||||
for j := len(m.Exemplars) - 1; j >= 0; j-- {
|
|
||||||
size, err := m.Exemplars[j].MarshalToSizedBuffer(dst[:i])
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
i -= size
|
|
||||||
i = encodeVarint(dst, i, uint64(size))
|
|
||||||
i--
|
|
||||||
dst[i] = 0x1a
|
|
||||||
}
|
|
||||||
for j := len(m.Samples) - 1; j >= 0; j-- {
|
for j := len(m.Samples) - 1; j >= 0; j-- {
|
||||||
size, err := m.Samples[j].MarshalToSizedBuffer(dst[:i])
|
size, err := m.Samples[j].MarshalToSizedBuffer(dst[:i])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -179,10 +109,6 @@ func (m *TimeSeries) Size() (n int) {
|
||||||
l := e.Size()
|
l := e.Size()
|
||||||
n += 1 + l + sov(uint64(l))
|
n += 1 + l + sov(uint64(l))
|
||||||
}
|
}
|
||||||
for _, e := range m.Exemplars {
|
|
||||||
l := e.Size()
|
|
||||||
n += 1 + l + sov(uint64(l))
|
|
||||||
}
|
|
||||||
return n
|
return n
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -29,7 +29,6 @@ var (
|
||||||
streamParse = flag.Bool("promscrape.streamParse", false, "Whether to enable stream parsing for metrics obtained from scrape targets. This may be useful "+
|
streamParse = flag.Bool("promscrape.streamParse", false, "Whether to enable stream parsing for metrics obtained from scrape targets. This may be useful "+
|
||||||
"for reducing memory usage when millions of metrics are exposed per each scrape target. "+
|
"for reducing memory usage when millions of metrics are exposed per each scrape target. "+
|
||||||
"It is possible to set 'stream_parse: true' individually per each 'scrape_config' section in '-promscrape.config' for fine-grained control")
|
"It is possible to set 'stream_parse: true' individually per each 'scrape_config' section in '-promscrape.config' for fine-grained control")
|
||||||
scrapeExemplars = flag.Bool("promscrape.scrapeExemplars", false, "Whether to enable scraping of exemplars from scrape targets.")
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type client struct {
|
type client struct {
|
||||||
|
@ -109,12 +108,6 @@ func (c *client) ReadData(dst *bytesutil.ByteBuffer) error {
|
||||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/608 for details.
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/608 for details.
|
||||||
// Do not bloat the `Accept` header with OpenMetrics shit, since it looks like dead standard now.
|
// Do not bloat the `Accept` header with OpenMetrics shit, since it looks like dead standard now.
|
||||||
req.Header.Set("Accept", "text/plain;version=0.0.4;q=1,*/*;q=0.1")
|
req.Header.Set("Accept", "text/plain;version=0.0.4;q=1,*/*;q=0.1")
|
||||||
// We set to support exemplars to be compatible with Prometheus Exposition format which uses
|
|
||||||
// Open Metrics Specification
|
|
||||||
// See https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#openmetrics-text-format
|
|
||||||
if *scrapeExemplars {
|
|
||||||
req.Header.Set("Accept", "application/openmetrics-text")
|
|
||||||
}
|
|
||||||
// Set X-Prometheus-Scrape-Timeout-Seconds like Prometheus does, since it is used by some exporters such as PushProx.
|
// Set X-Prometheus-Scrape-Timeout-Seconds like Prometheus does, since it is used by some exporters such as PushProx.
|
||||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179#issuecomment-813117162
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179#issuecomment-813117162
|
||||||
req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", c.scrapeTimeoutSecondsStr)
|
req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", c.scrapeTimeoutSecondsStr)
|
||||||
|
|
|
@ -675,7 +675,6 @@ type writeRequestCtx struct {
|
||||||
writeRequest prompbmarshal.WriteRequest
|
writeRequest prompbmarshal.WriteRequest
|
||||||
labels []prompbmarshal.Label
|
labels []prompbmarshal.Label
|
||||||
samples []prompbmarshal.Sample
|
samples []prompbmarshal.Sample
|
||||||
exemplars []prompbmarshal.Exemplar
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wc *writeRequestCtx) reset() {
|
func (wc *writeRequestCtx) reset() {
|
||||||
|
@ -690,7 +689,6 @@ func (wc *writeRequestCtx) resetNoRows() {
|
||||||
wc.labels = wc.labels[:0]
|
wc.labels = wc.labels[:0]
|
||||||
|
|
||||||
wc.samples = wc.samples[:0]
|
wc.samples = wc.samples[:0]
|
||||||
wc.exemplars = wc.exemplars[:0]
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var writeRequestCtxPool leveledWriteRequestCtxPool
|
var writeRequestCtxPool leveledWriteRequestCtxPool
|
||||||
|
@ -911,27 +909,10 @@ func (sw *scrapeWork) addRowToTimeseries(wc *writeRequestCtx, r *parser.Row, tim
|
||||||
Value: r.Value,
|
Value: r.Value,
|
||||||
Timestamp: sampleTimestamp,
|
Timestamp: sampleTimestamp,
|
||||||
})
|
})
|
||||||
// Add Exemplars to Timeseries
|
|
||||||
exemplarsLen := len(wc.exemplars)
|
|
||||||
exemplarTagsLen := len(r.Exemplar.Tags)
|
|
||||||
if exemplarTagsLen > 0 {
|
|
||||||
exemplarLabels := make([]prompbmarshal.Label, exemplarTagsLen)
|
|
||||||
for i, label := range r.Exemplar.Tags {
|
|
||||||
exemplarLabels[i].Name = label.Key
|
|
||||||
exemplarLabels[i].Value = label.Value
|
|
||||||
}
|
|
||||||
wc.exemplars = append(wc.exemplars, prompbmarshal.Exemplar{
|
|
||||||
Labels: exemplarLabels,
|
|
||||||
Value: r.Exemplar.Value,
|
|
||||||
Timestamp: r.Exemplar.Timestamp,
|
|
||||||
})
|
|
||||||
|
|
||||||
}
|
|
||||||
wr := &wc.writeRequest
|
wr := &wc.writeRequest
|
||||||
wr.Timeseries = append(wr.Timeseries, prompbmarshal.TimeSeries{
|
wr.Timeseries = append(wr.Timeseries, prompbmarshal.TimeSeries{
|
||||||
Labels: wc.labels[labelsLen:],
|
Labels: wc.labels[labelsLen:],
|
||||||
Samples: wc.samples[len(wc.samples)-1:],
|
Samples: wc.samples[len(wc.samples)-1:],
|
||||||
Exemplars: wc.exemplars[exemplarsLen:],
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -724,11 +724,6 @@ func TestAddRowToTimeseriesNoRelabeling(t *testing.T) {
|
||||||
HonorLabels: true,
|
HonorLabels: true,
|
||||||
},
|
},
|
||||||
`metric{a="e",foo="bar"} 0 123`)
|
`metric{a="e",foo="bar"} 0 123`)
|
||||||
f(`metric{foo="bar"} 0 123 # {trace_id="12345"} 52 456`,
|
|
||||||
&ScrapeWork{
|
|
||||||
HonorLabels: true,
|
|
||||||
},
|
|
||||||
`metric{foo="bar"} 0 123 # {trace_id="12345"} 52 456`)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSendStaleSeries(t *testing.T) {
|
func TestSendStaleSeries(t *testing.T) {
|
||||||
|
@ -786,8 +781,6 @@ func parseData(data string) []prompbmarshal.TimeSeries {
|
||||||
}
|
}
|
||||||
rows.UnmarshalWithErrLogger(data, errLogger)
|
rows.UnmarshalWithErrLogger(data, errLogger)
|
||||||
var tss []prompbmarshal.TimeSeries
|
var tss []prompbmarshal.TimeSeries
|
||||||
var exemplars []prompbmarshal.Exemplar
|
|
||||||
|
|
||||||
for _, r := range rows.Rows {
|
for _, r := range rows.Rows {
|
||||||
labels := []prompbmarshal.Label{
|
labels := []prompbmarshal.Label{
|
||||||
{
|
{
|
||||||
|
@ -801,21 +794,6 @@ func parseData(data string) []prompbmarshal.TimeSeries {
|
||||||
Value: tag.Value,
|
Value: tag.Value,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
exemplarLabels := []prompbmarshal.Label{}
|
|
||||||
if len(r.Exemplar.Tags) > 0 {
|
|
||||||
for _, tag := range r.Exemplar.Tags {
|
|
||||||
exemplarLabels = append(exemplarLabels, prompbmarshal.Label{
|
|
||||||
Name: tag.Key,
|
|
||||||
Value: tag.Value,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
exemplars = append(exemplars, prompbmarshal.Exemplar{
|
|
||||||
Labels: exemplarLabels,
|
|
||||||
Value: r.Exemplar.Value,
|
|
||||||
Timestamp: r.Exemplar.Timestamp,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
var ts prompbmarshal.TimeSeries
|
var ts prompbmarshal.TimeSeries
|
||||||
ts.Labels = labels
|
ts.Labels = labels
|
||||||
ts.Samples = []prompbmarshal.Sample{
|
ts.Samples = []prompbmarshal.Sample{
|
||||||
|
@ -824,7 +802,6 @@ func parseData(data string) []prompbmarshal.TimeSeries {
|
||||||
Timestamp: r.Timestamp,
|
Timestamp: r.Timestamp,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
ts.Exemplars = exemplars
|
|
||||||
tss = append(tss, ts)
|
tss = append(tss, ts)
|
||||||
}
|
}
|
||||||
return tss
|
return tss
|
||||||
|
@ -889,19 +866,6 @@ func timeseriesToString(ts *prompbmarshal.TimeSeries) string {
|
||||||
}
|
}
|
||||||
s := ts.Samples[0]
|
s := ts.Samples[0]
|
||||||
fmt.Fprintf(&sb, "%g %d", s.Value, s.Timestamp)
|
fmt.Fprintf(&sb, "%g %d", s.Value, s.Timestamp)
|
||||||
// Add Exemplars to the end of string
|
|
||||||
for j, exemplar := range ts.Exemplars {
|
|
||||||
for i, label := range exemplar.Labels {
|
|
||||||
fmt.Fprintf(&sb, "%s=%q", label.Name, label.Value)
|
|
||||||
if i+1 < len(ts.Labels) {
|
|
||||||
fmt.Fprintf(&sb, ",")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
fmt.Fprintf(&sb, "%g %d", exemplar.Value, exemplar.Timestamp)
|
|
||||||
if j+1 < len(ts.Exemplars) {
|
|
||||||
fmt.Fprintf(&sb, ",")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return sb.String()
|
return sb.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -57,33 +57,12 @@ func (rs *Rows) UnmarshalWithErrLogger(s string, errLogger func(s string)) {
|
||||||
rs.Rows, rs.tagsPool = unmarshalRows(rs.Rows[:0], s, rs.tagsPool[:0], noEscapes, errLogger)
|
rs.Rows, rs.tagsPool = unmarshalRows(rs.Rows[:0], s, rs.tagsPool[:0], noEscapes, errLogger)
|
||||||
}
|
}
|
||||||
|
|
||||||
const tagsPrefix = '{'
|
|
||||||
const exemplarPreifx = '{'
|
|
||||||
|
|
||||||
// Exemplar Item
|
|
||||||
type Exemplar struct {
|
|
||||||
// Tags: a list of labels that uniquely identifies exemplar
|
|
||||||
Tags []Tag
|
|
||||||
// Value: the value of the exemplar
|
|
||||||
Value float64
|
|
||||||
// Timestamp: the time when exemplar was recorded
|
|
||||||
Timestamp int64
|
|
||||||
}
|
|
||||||
|
|
||||||
// Reset - resets the Exemplar object to defaults
|
|
||||||
func (e *Exemplar) Reset() {
|
|
||||||
e.Tags = nil
|
|
||||||
e.Value = 0
|
|
||||||
e.Timestamp = 0
|
|
||||||
}
|
|
||||||
|
|
||||||
// Row is a single Prometheus row.
|
// Row is a single Prometheus row.
|
||||||
type Row struct {
|
type Row struct {
|
||||||
Metric string
|
Metric string
|
||||||
Tags []Tag
|
Tags []Tag
|
||||||
Value float64
|
Value float64
|
||||||
Timestamp int64
|
Timestamp int64
|
||||||
Exemplar Exemplar
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Row) reset() {
|
func (r *Row) reset() {
|
||||||
|
@ -91,7 +70,6 @@ func (r *Row) reset() {
|
||||||
r.Tags = nil
|
r.Tags = nil
|
||||||
r.Value = 0
|
r.Value = 0
|
||||||
r.Timestamp = 0
|
r.Timestamp = 0
|
||||||
r.Exemplar = Exemplar{}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func skipTrailingComment(s string) string {
|
func skipTrailingComment(s string) string {
|
||||||
|
@ -132,140 +110,69 @@ func nextWhitespace(s string) int {
|
||||||
return n1
|
return n1
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseStringToTags(s string, tagsPool []Tag, noEscapes bool) (string, []Tag, error) {
|
func (r *Row) unmarshal(s string, tagsPool []Tag, noEscapes bool) ([]Tag, error) {
|
||||||
n := strings.IndexByte(s, tagsPrefix)
|
r.reset()
|
||||||
c := strings.IndexByte(s, '#')
|
|
||||||
if c != -1 && c < n {
|
|
||||||
return s, tagsPool, nil
|
|
||||||
}
|
|
||||||
s = skipLeadingWhitespace(s)
|
s = skipLeadingWhitespace(s)
|
||||||
|
n := strings.IndexByte(s, '{')
|
||||||
if n >= 0 {
|
if n >= 0 {
|
||||||
// Tags found. Parse them.
|
// Tags found. Parse them.
|
||||||
|
r.Metric = skipTrailingWhitespace(s[:n])
|
||||||
s = s[n+1:]
|
s = s[n+1:]
|
||||||
|
tagsStart := len(tagsPool)
|
||||||
var err error
|
var err error
|
||||||
s, tagsPool, err = unmarshalTags(tagsPool, s, noEscapes)
|
s, tagsPool, err = unmarshalTags(tagsPool, s, noEscapes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return s, tagsPool, fmt.Errorf("cannot unmarshal tags: %w", err)
|
return tagsPool, fmt.Errorf("cannot unmarshal tags: %w", err)
|
||||||
}
|
}
|
||||||
if len(s) > 0 && s[0] == ' ' {
|
if len(s) > 0 && s[0] == ' ' {
|
||||||
// Fast path - skip whitespace.
|
// Fast path - skip whitespace.
|
||||||
s = s[1:]
|
s = s[1:]
|
||||||
}
|
}
|
||||||
}
|
tags := tagsPool[tagsStart:]
|
||||||
return s, tagsPool, nil
|
r.Tags = tags[:len(tags):len(tags)]
|
||||||
}
|
} else {
|
||||||
|
// Tags weren't found. Search for value after whitespace
|
||||||
var tvtPool = sync.Pool{New: func() interface{} {
|
|
||||||
return &tagsValueTimestamp{}
|
|
||||||
}}
|
|
||||||
|
|
||||||
func getTVT() *tagsValueTimestamp {
|
|
||||||
return tvtPool.Get().(*tagsValueTimestamp)
|
|
||||||
}
|
|
||||||
func putTVT(tvt *tagsValueTimestamp) {
|
|
||||||
tvt.reset()
|
|
||||||
tvtPool.Put(tvt)
|
|
||||||
}
|
|
||||||
|
|
||||||
type tagsValueTimestamp struct {
|
|
||||||
Prefix string
|
|
||||||
Value float64
|
|
||||||
Timestamp int64
|
|
||||||
Comments string
|
|
||||||
}
|
|
||||||
|
|
||||||
func (tvt *tagsValueTimestamp) reset() {
|
|
||||||
tvt.Prefix = ""
|
|
||||||
tvt.Value = 0
|
|
||||||
tvt.Timestamp = 0
|
|
||||||
tvt.Comments = ""
|
|
||||||
}
|
|
||||||
func parseTagsValueTimestamp(s string, tagsPool []Tag, noEscapes bool) ([]Tag, *tagsValueTimestamp, error) {
|
|
||||||
tvt := getTVT()
|
|
||||||
n := 0
|
|
||||||
// Prefix is everything up to a tag start or a space
|
|
||||||
t := strings.IndexByte(s, tagsPrefix)
|
|
||||||
// If there is no tag start process rest of string
|
|
||||||
mustParseTags := false
|
|
||||||
if t != -1 {
|
|
||||||
// Check to see if there is a space before tag
|
|
||||||
n = nextWhitespace(s)
|
n = nextWhitespace(s)
|
||||||
// If there is a space
|
if n < 0 {
|
||||||
if n > 0 {
|
return tagsPool, fmt.Errorf("missing value")
|
||||||
if n < t {
|
|
||||||
tvt.Prefix = s[:n]
|
|
||||||
s = skipLeadingWhitespace(s[n:])
|
|
||||||
// Cover the use case where there is whitespace between the prefix and the tag
|
|
||||||
if len(s) > 0 && s[0] == '{' {
|
|
||||||
mustParseTags = true
|
|
||||||
}
|
}
|
||||||
// Most likely this has an exemplar
|
r.Metric = s[:n]
|
||||||
} else {
|
s = s[n+1:]
|
||||||
tvt.Prefix = s[:t]
|
|
||||||
s = s[t:]
|
|
||||||
mustParseTags = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if mustParseTags {
|
|
||||||
var err error
|
|
||||||
s, tagsPool, err = parseStringToTags(s, tagsPool, noEscapes)
|
|
||||||
if err != nil {
|
|
||||||
return tagsPool, tvt, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Tag doesn't exist
|
|
||||||
n = nextWhitespace(s)
|
|
||||||
if n != -1 {
|
|
||||||
tvt.Prefix = s[:n]
|
|
||||||
s = s[n:]
|
|
||||||
} else {
|
|
||||||
tvt.Prefix = s
|
|
||||||
return tagsPool, tvt, fmt.Errorf("missing value")
|
|
||||||
}
|
}
|
||||||
|
if len(r.Metric) == 0 {
|
||||||
|
return tagsPool, fmt.Errorf("metric cannot be empty")
|
||||||
}
|
}
|
||||||
s = skipLeadingWhitespace(s)
|
s = skipLeadingWhitespace(s)
|
||||||
// save and remove the comments
|
|
||||||
n = strings.IndexByte(s, '#')
|
|
||||||
if n >= 0 {
|
|
||||||
tvt.Comments = s[n:]
|
|
||||||
if len(tvt.Comments) > 1 {
|
|
||||||
tvt.Comments = s[n+1:]
|
|
||||||
} else {
|
|
||||||
tvt.Comments = ""
|
|
||||||
}
|
|
||||||
s = skipTrailingComment(s)
|
s = skipTrailingComment(s)
|
||||||
s = skipLeadingWhitespace(s)
|
if len(s) == 0 {
|
||||||
s = skipTrailingWhitespace(s)
|
return tagsPool, fmt.Errorf("value cannot be empty")
|
||||||
}
|
}
|
||||||
n = nextWhitespace(s)
|
n = nextWhitespace(s)
|
||||||
if n < 0 {
|
if n < 0 {
|
||||||
// There is no timestamp.
|
// There is no timestamp.
|
||||||
v, err := fastfloat.Parse(s)
|
v, err := fastfloat.Parse(s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return tagsPool, tvt, fmt.Errorf("cannot parse value %q: %w", s, err)
|
return tagsPool, fmt.Errorf("cannot parse value %q: %w", s, err)
|
||||||
}
|
}
|
||||||
tvt.Value = v
|
r.Value = v
|
||||||
return tagsPool, tvt, nil
|
return tagsPool, nil
|
||||||
}
|
}
|
||||||
// There is a timestamp
|
// There is a timestamp.
|
||||||
s = skipLeadingWhitespace(s)
|
|
||||||
v, err := fastfloat.Parse(s[:n])
|
v, err := fastfloat.Parse(s[:n])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return tagsPool, tvt, fmt.Errorf("cannot parse value %q: %w", s[:n], err)
|
return tagsPool, fmt.Errorf("cannot parse value %q: %w", s[:n], err)
|
||||||
}
|
}
|
||||||
tvt.Value = v
|
r.Value = v
|
||||||
s = s[n:]
|
s = skipLeadingWhitespace(s[n+1:])
|
||||||
// There are some whitespaces after timestamp
|
|
||||||
s = skipLeadingWhitespace(s)
|
|
||||||
if len(s) == 0 {
|
if len(s) == 0 {
|
||||||
// There is no timestamp - just a whitespace after the value.
|
// There is no timestamp - just a whitespace after the value.
|
||||||
return tagsPool, tvt, nil
|
return tagsPool, nil
|
||||||
}
|
}
|
||||||
|
// There are some whitespaces after timestamp
|
||||||
s = skipTrailingWhitespace(s)
|
s = skipTrailingWhitespace(s)
|
||||||
ts, err := fastfloat.Parse(s)
|
ts, err := fastfloat.Parse(s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return tagsPool, tvt, fmt.Errorf("cannot parse timestamp %q: %w", s, err)
|
return tagsPool, fmt.Errorf("cannot parse timestamp %q: %w", s, err)
|
||||||
}
|
}
|
||||||
if ts >= -1<<31 && ts < 1<<31 {
|
if ts >= -1<<31 && ts < 1<<31 {
|
||||||
// This looks like OpenMetrics timestamp in Unix seconds.
|
// This looks like OpenMetrics timestamp in Unix seconds.
|
||||||
|
@ -274,68 +181,7 @@ func parseTagsValueTimestamp(s string, tagsPool []Tag, noEscapes bool) ([]Tag, *
|
||||||
// See https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md#timestamps
|
// See https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md#timestamps
|
||||||
ts *= 1000
|
ts *= 1000
|
||||||
}
|
}
|
||||||
tvt.Timestamp = int64(ts)
|
r.Timestamp = int64(ts)
|
||||||
return tagsPool, tvt, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Returns possible comments that could be exemplars
|
|
||||||
func (r *Row) unmarshalMetric(s string, tagsPool []Tag, noEscapes bool) (string, []Tag, error) {
|
|
||||||
tagsStart := len(tagsPool)
|
|
||||||
var err error
|
|
||||||
var tvt *tagsValueTimestamp
|
|
||||||
tagsPool, tvt, err = parseTagsValueTimestamp(s, tagsPool, noEscapes)
|
|
||||||
defer putTVT(tvt)
|
|
||||||
if err != nil {
|
|
||||||
return "", tagsPool, err
|
|
||||||
}
|
|
||||||
r.Metric = tvt.Prefix
|
|
||||||
tags := tagsPool[tagsStart:]
|
|
||||||
if len(tags) > 0 {
|
|
||||||
r.Tags = tags
|
|
||||||
}
|
|
||||||
r.Value = tvt.Value
|
|
||||||
r.Timestamp = tvt.Timestamp
|
|
||||||
return tvt.Comments, tagsPool, nil
|
|
||||||
|
|
||||||
}
|
|
||||||
func (e *Exemplar) unmarshal(s string, tagsPool []Tag, noEscapes bool) ([]Tag, error) {
|
|
||||||
// We can use the Comment parsed out to further parse the Exemplar
|
|
||||||
s = skipLeadingWhitespace(s)
|
|
||||||
// If we are a comment immediately followed by whitespace or a labelset
|
|
||||||
// then we are an exemplar
|
|
||||||
if len(s) != 0 && s[0] == exemplarPreifx {
|
|
||||||
var err error
|
|
||||||
var tvt *tagsValueTimestamp
|
|
||||||
tagsStart := len(tagsPool)
|
|
||||||
tagsPool, tvt, err = parseTagsValueTimestamp(s, tagsPool, noEscapes)
|
|
||||||
defer putTVT(tvt)
|
|
||||||
if err != nil {
|
|
||||||
return tagsPool, err
|
|
||||||
}
|
|
||||||
tags := tagsPool[tagsStart:]
|
|
||||||
if len(tags) > 0 {
|
|
||||||
e.Tags = tags
|
|
||||||
}
|
|
||||||
e.Value = tvt.Value
|
|
||||||
e.Timestamp = tvt.Timestamp
|
|
||||||
return tagsPool, nil
|
|
||||||
}
|
|
||||||
return tagsPool, nil
|
|
||||||
|
|
||||||
}
|
|
||||||
func (r *Row) unmarshal(s string, tagsPool []Tag, noEscapes bool) ([]Tag, error) {
|
|
||||||
r.reset()
|
|
||||||
// Parse for labels, the value and the timestamp
|
|
||||||
// Anything before labels is saved in Prefix we can use this
|
|
||||||
// for the metric name
|
|
||||||
comments, tagsPool, err := r.unmarshalMetric(s, tagsPool, noEscapes)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
tagsPool, err = r.Exemplar.unmarshal(comments, tagsPool, noEscapes)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return tagsPool, nil
|
return tagsPool, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -362,87 +362,12 @@ cassandra_token_ownership_ratio 78.9`, &Rows{
|
||||||
Value: 56,
|
Value: 56,
|
||||||
}},
|
}},
|
||||||
})
|
})
|
||||||
// Support for Exemplars Open Metric Specification
|
|
||||||
// see: https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#exemplars-1
|
// Exemplars - see https://github.com/OpenObservability/OpenMetrics/blob/master/OpenMetrics.md#exemplars-1
|
||||||
f(`foo_bucket{le="25"} 17 # {trace_id="oHg5SJYRHA0", log_id="test_id"} 9.8 1520879607.789`, &Rows{
|
f(`foo_bucket{le="10",a="#b"} 17 # {trace_id="oHg5SJ#YRHA0"} 9.8 1520879607.789
|
||||||
|
abc 123 456 # foobar
|
||||||
|
foo 344#bar`, &Rows{
|
||||||
Rows: []Row{
|
Rows: []Row{
|
||||||
{
|
|
||||||
Metric: "foo_bucket",
|
|
||||||
Tags: []Tag{
|
|
||||||
{
|
|
||||||
Key: "le",
|
|
||||||
Value: "25",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Value: 17,
|
|
||||||
Exemplar: Exemplar{
|
|
||||||
Value: 9.8,
|
|
||||||
Tags: []Tag{
|
|
||||||
{
|
|
||||||
Key: "trace_id",
|
|
||||||
Value: "oHg5SJYRHA0",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Key: "log_id",
|
|
||||||
Value: "test_id",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Timestamp: 1520879607789,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}})
|
|
||||||
f(`foo_bucket{le="0.01"} 0
|
|
||||||
foo_bucket{le="0.1"} 8 # {} 0.054
|
|
||||||
foo_bucket{le="1"} 11 # {trace_id="KOO5S4vxi0o"} 0.67
|
|
||||||
foo_bucket{le="10"} 17 # {trace_id="oHg5SJYRHA0"} 9.8 1520879607.789
|
|
||||||
foo_bucket{le="25"} 17 # {trace_id="oHg5SJYRHA0", log_id="test_id"} 9.8 1520879607.789
|
|
||||||
foo_bucket{nospace="exemplar"} 17 #{trace_id="oHg5SJYRHA0"} 9.8 1520879607.789
|
|
||||||
foo_bucket{le="+Inf"} 17
|
|
||||||
foo_count 17
|
|
||||||
foo_sum 324789.3
|
|
||||||
foo_created 1520430000.123`, &Rows{
|
|
||||||
Rows: []Row{
|
|
||||||
{
|
|
||||||
Metric: "foo_bucket",
|
|
||||||
Tags: []Tag{
|
|
||||||
{
|
|
||||||
Key: "le",
|
|
||||||
Value: "0.01",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Metric: "foo_bucket",
|
|
||||||
Tags: []Tag{
|
|
||||||
{
|
|
||||||
Key: "le",
|
|
||||||
Value: "0.1",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Value: 8,
|
|
||||||
Exemplar: Exemplar{
|
|
||||||
Value: 0.054,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Metric: "foo_bucket",
|
|
||||||
Tags: []Tag{
|
|
||||||
{
|
|
||||||
Key: "le",
|
|
||||||
Value: "1",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Value: 11,
|
|
||||||
Exemplar: Exemplar{
|
|
||||||
Value: 0.67,
|
|
||||||
Tags: []Tag{
|
|
||||||
{
|
|
||||||
Key: "trace_id",
|
|
||||||
Value: "KOO5S4vxi0o",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
Metric: "foo_bucket",
|
Metric: "foo_bucket",
|
||||||
Tags: []Tag{
|
Tags: []Tag{
|
||||||
|
@ -450,84 +375,21 @@ foo_created 1520430000.123`, &Rows{
|
||||||
Key: "le",
|
Key: "le",
|
||||||
Value: "10",
|
Value: "10",
|
||||||
},
|
},
|
||||||
},
|
|
||||||
Value: 17,
|
|
||||||
Exemplar: Exemplar{
|
|
||||||
Value: 9.8,
|
|
||||||
Tags: []Tag{
|
|
||||||
{
|
{
|
||||||
Key: "trace_id",
|
Key: "a",
|
||||||
Value: "oHg5SJYRHA0",
|
Value: "#b",
|
||||||
},
|
|
||||||
},
|
|
||||||
Timestamp: 1520879607789,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Metric: "foo_bucket",
|
|
||||||
Tags: []Tag{
|
|
||||||
{
|
|
||||||
Key: "le",
|
|
||||||
Value: "25",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Value: 17,
|
|
||||||
Exemplar: Exemplar{
|
|
||||||
Value: 9.8,
|
|
||||||
Tags: []Tag{
|
|
||||||
{
|
|
||||||
Key: "trace_id",
|
|
||||||
Value: "oHg5SJYRHA0",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Key: "log_id",
|
|
||||||
Value: "test_id",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Timestamp: 1520879607789,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Metric: "foo_bucket",
|
|
||||||
Tags: []Tag{
|
|
||||||
{
|
|
||||||
Key: "nospace",
|
|
||||||
Value: "exemplar",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Value: 17,
|
|
||||||
Exemplar: Exemplar{
|
|
||||||
Value: 9.8,
|
|
||||||
Tags: []Tag{
|
|
||||||
{
|
|
||||||
Key: "trace_id",
|
|
||||||
Value: "oHg5SJYRHA0",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Timestamp: 1520879607789,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Metric: "foo_bucket",
|
|
||||||
Tags: []Tag{
|
|
||||||
{
|
|
||||||
Key: "le",
|
|
||||||
Value: "+Inf",
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
Value: 17,
|
Value: 17,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Metric: "foo_count",
|
Metric: "abc",
|
||||||
Value: 17,
|
Value: 123,
|
||||||
|
Timestamp: 456000,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Metric: "foo_sum",
|
Metric: "foo",
|
||||||
Value: 324789.3,
|
Value: 344,
|
||||||
},
|
|
||||||
{
|
|
||||||
Metric: "foo_created",
|
|
||||||
Value: 1520430000.123,
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
|
@ -146,10 +146,10 @@ container_ulimits_soft{container="kube-scheduler",id="/kubelet/kubepods/burstabl
|
||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkRowsUnmarshal(b *testing.B) {
|
func BenchmarkRowsUnmarshal(b *testing.B) {
|
||||||
s := `foo_bucket{le="0.01"} 0
|
s := `cpu_usage{mode="user"} 1.23
|
||||||
foo_bucket{le="1"} 11 # {trace_id="KOO5S4vxi0o"} 0.67
|
cpu_usage{mode="system"} 23.344
|
||||||
foo_bucket{le="10"} 17 # {trace_id="oHg5SJYRHA0"} 9.8 1520879607.789
|
cpu_usage{mode="iowait"} 3.3443
|
||||||
foo_bucket{nospace="exemplar"} 17 #{trace_id="oHg5SJYRHA0"} 9.8 1520879607.789
|
cpu_usage{mode="irq"} 0.34432
|
||||||
`
|
`
|
||||||
b.SetBytes(int64(len(s)))
|
b.SetBytes(int64(len(s)))
|
||||||
b.ReportAllocs()
|
b.ReportAllocs()
|
||||||
|
|
Loading…
Reference in a new issue