mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-01 14:47:38 +00:00
214 lines
5 KiB
Go
214 lines
5 KiB
Go
package prompb
|
|
|
|
import (
|
|
"fmt"
|
|
|
|
"github.com/VictoriaMetrics/easyproto"
|
|
)
|
|
|
|
// WriteRequest represents Prometheus remote write API request.
|
|
type WriteRequest struct {
|
|
// Timeseries is a list of time series in the given WriteRequest
|
|
Timeseries []TimeSeries
|
|
|
|
labelsPool []Label
|
|
samplesPool []Sample
|
|
}
|
|
|
|
// Reset resets wr for subsequent re-use.
|
|
func (wr *WriteRequest) Reset() {
|
|
tss := wr.Timeseries
|
|
for i := range tss {
|
|
tss[i] = TimeSeries{}
|
|
}
|
|
wr.Timeseries = tss[:0]
|
|
|
|
labelsPool := wr.labelsPool
|
|
for i := range labelsPool {
|
|
labelsPool[i] = Label{}
|
|
}
|
|
wr.labelsPool = labelsPool[:0]
|
|
|
|
samplesPool := wr.samplesPool
|
|
for i := range samplesPool {
|
|
samplesPool[i] = Sample{}
|
|
}
|
|
wr.samplesPool = samplesPool[:0]
|
|
}
|
|
|
|
// TimeSeries is a timeseries.
|
|
type TimeSeries struct {
|
|
// Labels is a list of labels for the given TimeSeries
|
|
Labels []Label
|
|
|
|
// Samples is a list of samples for the given TimeSeries
|
|
Samples []Sample
|
|
}
|
|
|
|
// Sample is a timeseries sample.
|
|
type Sample struct {
|
|
// Value is sample value.
|
|
Value float64
|
|
|
|
// Timestamp is unix timestamp for the sample in milliseconds.
|
|
Timestamp int64
|
|
}
|
|
|
|
// Label is a timeseries label.
|
|
type Label struct {
|
|
// Name is label name.
|
|
Name string
|
|
|
|
// Value is label value.
|
|
Value string
|
|
}
|
|
|
|
// UnmarshalProtobuf unmarshals wr from src.
|
|
//
|
|
// src mustn't change while wr is in use, since wr points to src.
|
|
func (wr *WriteRequest) UnmarshalProtobuf(src []byte) (err error) {
|
|
wr.Reset()
|
|
|
|
// message WriteRequest {
|
|
// repeated TimeSeries timeseries = 1;
|
|
// }
|
|
tss := wr.Timeseries
|
|
labelsPool := wr.labelsPool
|
|
samplesPool := wr.samplesPool
|
|
var fc easyproto.FieldContext
|
|
for len(src) > 0 {
|
|
src, err = fc.NextField(src)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot read the next field: %w", err)
|
|
}
|
|
switch fc.FieldNum {
|
|
case 1:
|
|
data, ok := fc.MessageData()
|
|
if !ok {
|
|
return fmt.Errorf("cannot read timeseries data")
|
|
}
|
|
if len(tss) < cap(tss) {
|
|
tss = tss[:len(tss)+1]
|
|
} else {
|
|
tss = append(tss, TimeSeries{})
|
|
}
|
|
ts := &tss[len(tss)-1]
|
|
labelsPool, samplesPool, err = ts.unmarshalProtobuf(data, labelsPool, samplesPool)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot unmarshal timeseries: %w", err)
|
|
}
|
|
}
|
|
}
|
|
wr.Timeseries = tss
|
|
wr.labelsPool = labelsPool
|
|
wr.samplesPool = samplesPool
|
|
return nil
|
|
}
|
|
|
|
func (ts *TimeSeries) unmarshalProtobuf(src []byte, labelsPool []Label, samplesPool []Sample) ([]Label, []Sample, error) {
|
|
// message TimeSeries {
|
|
// repeated Label labels = 1;
|
|
// repeated Sample samples = 2;
|
|
// }
|
|
labelsPoolLen := len(labelsPool)
|
|
samplesPoolLen := len(samplesPool)
|
|
var fc easyproto.FieldContext
|
|
for len(src) > 0 {
|
|
var err error
|
|
src, err = fc.NextField(src)
|
|
if err != nil {
|
|
return labelsPool, samplesPool, fmt.Errorf("cannot read the next field: %w", err)
|
|
}
|
|
switch fc.FieldNum {
|
|
case 1:
|
|
data, ok := fc.MessageData()
|
|
if !ok {
|
|
return labelsPool, samplesPool, fmt.Errorf("cannot read label data")
|
|
}
|
|
if len(labelsPool) < cap(labelsPool) {
|
|
labelsPool = labelsPool[:len(labelsPool)+1]
|
|
} else {
|
|
labelsPool = append(labelsPool, Label{})
|
|
}
|
|
label := &labelsPool[len(labelsPool)-1]
|
|
if err := label.unmarshalProtobuf(data); err != nil {
|
|
return labelsPool, samplesPool, fmt.Errorf("cannot unmarshal label: %w", err)
|
|
}
|
|
case 2:
|
|
data, ok := fc.MessageData()
|
|
if !ok {
|
|
return labelsPool, samplesPool, fmt.Errorf("cannot read the sample data")
|
|
}
|
|
if len(samplesPool) < cap(samplesPool) {
|
|
samplesPool = samplesPool[:len(samplesPool)+1]
|
|
} else {
|
|
samplesPool = append(samplesPool, Sample{})
|
|
}
|
|
sample := &samplesPool[len(samplesPool)-1]
|
|
if err := sample.unmarshalProtobuf(data); err != nil {
|
|
return labelsPool, samplesPool, fmt.Errorf("cannot unmarshal sample: %w", err)
|
|
}
|
|
}
|
|
}
|
|
ts.Labels = labelsPool[labelsPoolLen:]
|
|
ts.Samples = samplesPool[samplesPoolLen:]
|
|
return labelsPool, samplesPool, nil
|
|
}
|
|
|
|
func (lbl *Label) unmarshalProtobuf(src []byte) (err error) {
|
|
// message Label {
|
|
// string name = 1;
|
|
// string value = 2;
|
|
// }
|
|
var fc easyproto.FieldContext
|
|
for len(src) > 0 {
|
|
src, err = fc.NextField(src)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot read the next field: %w", err)
|
|
}
|
|
switch fc.FieldNum {
|
|
case 1:
|
|
name, ok := fc.String()
|
|
if !ok {
|
|
return fmt.Errorf("cannot read label name")
|
|
}
|
|
lbl.Name = name
|
|
case 2:
|
|
value, ok := fc.String()
|
|
if !ok {
|
|
return fmt.Errorf("cannot read label value")
|
|
}
|
|
lbl.Value = value
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Sample) unmarshalProtobuf(src []byte) (err error) {
|
|
// message Sample {
|
|
// double value = 1;
|
|
// int64 timestamp = 2;
|
|
// }
|
|
var fc easyproto.FieldContext
|
|
for len(src) > 0 {
|
|
src, err = fc.NextField(src)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot read the next field: %w", err)
|
|
}
|
|
switch fc.FieldNum {
|
|
case 1:
|
|
value, ok := fc.Double()
|
|
if !ok {
|
|
return fmt.Errorf("cannot read sample value")
|
|
}
|
|
s.Value = value
|
|
case 2:
|
|
timestamp, ok := fc.Int64()
|
|
if !ok {
|
|
return fmt.Errorf("cannot read sample timestamp")
|
|
}
|
|
s.Timestamp = timestamp
|
|
}
|
|
}
|
|
return nil
|
|
}
|