VictoriaMetrics/lib/prompb/prompb.go
2024-01-14 22:46:06 +02:00

214 lines
5 KiB
Go

package prompb
import (
"fmt"
"github.com/VictoriaMetrics/easyproto"
)
// WriteRequest represents Prometheus remote write API request.
type WriteRequest struct {
// Timeseries is a list of time series in the given WriteRequest
Timeseries []TimeSeries
labelsPool []Label
samplesPool []Sample
}
// Reset resets wr for subsequent re-use.
func (wr *WriteRequest) Reset() {
tss := wr.Timeseries
for i := range tss {
tss[i] = TimeSeries{}
}
wr.Timeseries = tss[:0]
labelsPool := wr.labelsPool
for i := range labelsPool {
labelsPool[i] = Label{}
}
wr.labelsPool = labelsPool[:0]
samplesPool := wr.samplesPool
for i := range samplesPool {
samplesPool[i] = Sample{}
}
wr.samplesPool = samplesPool[:0]
}
// TimeSeries is a timeseries.
type TimeSeries struct {
// Labels is a list of labels for the given TimeSeries
Labels []Label
// Samples is a list of samples for the given TimeSeries
Samples []Sample
}
// Sample is a timeseries sample.
type Sample struct {
// Value is sample value.
Value float64
// Timestamp is unix timestamp for the sample in milliseconds.
Timestamp int64
}
// Label is a timeseries label.
type Label struct {
// Name is label name.
Name string
// Value is label value.
Value string
}
// UnmarshalProtobuf unmarshals wr from src.
//
// src mustn't change while wr is in use, since wr points to src.
func (wr *WriteRequest) UnmarshalProtobuf(src []byte) (err error) {
wr.Reset()
// message WriteRequest {
// repeated TimeSeries timeseries = 1;
// }
tss := wr.Timeseries
labelsPool := wr.labelsPool
samplesPool := wr.samplesPool
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read the next field: %w", err)
}
switch fc.FieldNum {
case 1:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read timeseries data")
}
if len(tss) < cap(tss) {
tss = tss[:len(tss)+1]
} else {
tss = append(tss, TimeSeries{})
}
ts := &tss[len(tss)-1]
labelsPool, samplesPool, err = ts.unmarshalProtobuf(data, labelsPool, samplesPool)
if err != nil {
return fmt.Errorf("cannot unmarshal timeseries: %w", err)
}
}
}
wr.Timeseries = tss
wr.labelsPool = labelsPool
wr.samplesPool = samplesPool
return nil
}
func (ts *TimeSeries) unmarshalProtobuf(src []byte, labelsPool []Label, samplesPool []Sample) ([]Label, []Sample, error) {
// message TimeSeries {
// repeated Label labels = 1;
// repeated Sample samples = 2;
// }
labelsPoolLen := len(labelsPool)
samplesPoolLen := len(samplesPool)
var fc easyproto.FieldContext
for len(src) > 0 {
var err error
src, err = fc.NextField(src)
if err != nil {
return labelsPool, samplesPool, fmt.Errorf("cannot read the next field: %w", err)
}
switch fc.FieldNum {
case 1:
data, ok := fc.MessageData()
if !ok {
return labelsPool, samplesPool, fmt.Errorf("cannot read label data")
}
if len(labelsPool) < cap(labelsPool) {
labelsPool = labelsPool[:len(labelsPool)+1]
} else {
labelsPool = append(labelsPool, Label{})
}
label := &labelsPool[len(labelsPool)-1]
if err := label.unmarshalProtobuf(data); err != nil {
return labelsPool, samplesPool, fmt.Errorf("cannot unmarshal label: %w", err)
}
case 2:
data, ok := fc.MessageData()
if !ok {
return labelsPool, samplesPool, fmt.Errorf("cannot read the sample data")
}
if len(samplesPool) < cap(samplesPool) {
samplesPool = samplesPool[:len(samplesPool)+1]
} else {
samplesPool = append(samplesPool, Sample{})
}
sample := &samplesPool[len(samplesPool)-1]
if err := sample.unmarshalProtobuf(data); err != nil {
return labelsPool, samplesPool, fmt.Errorf("cannot unmarshal sample: %w", err)
}
}
}
ts.Labels = labelsPool[labelsPoolLen:]
ts.Samples = samplesPool[samplesPoolLen:]
return labelsPool, samplesPool, nil
}
func (lbl *Label) unmarshalProtobuf(src []byte) (err error) {
// message Label {
// string name = 1;
// string value = 2;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read the next field: %w", err)
}
switch fc.FieldNum {
case 1:
name, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read label name")
}
lbl.Name = name
case 2:
value, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read label value")
}
lbl.Value = value
}
}
return nil
}
func (s *Sample) unmarshalProtobuf(src []byte) (err error) {
// message Sample {
// double value = 1;
// int64 timestamp = 2;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read the next field: %w", err)
}
switch fc.FieldNum {
case 1:
value, ok := fc.Double()
if !ok {
return fmt.Errorf("cannot read sample value")
}
s.Value = value
case 2:
timestamp, ok := fc.Int64()
if !ok {
return fmt.Errorf("cannot read sample timestamp")
}
s.Timestamp = timestamp
}
}
return nil
}