lib/prompb: switch to github.com/VictoriaMetrics/easyproto

This commit is contained in:
Aliaksandr Valialkin 2024-01-14 22:46:06 +02:00
parent f2229c2e42
commit c005245741
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
10 changed files with 487 additions and 745 deletions

View file

@ -140,7 +140,7 @@ func (rw *rwServer) handler(w http.ResponseWriter, r *http.Request) {
return return
} }
wr := &prompb.WriteRequest{} wr := &prompb.WriteRequest{}
if err := wr.Unmarshal(b); err != nil { if err := wr.UnmarshalProtobuf(b); err != nil {
rw.err(w, fmt.Errorf("unmarhsal err: %w", err)) rw.err(w, fmt.Errorf("unmarhsal err: %w", err))
return return
} }

214
lib/prompb/prompb.go Normal file
View file

@ -0,0 +1,214 @@
package prompb
import (
"fmt"
"github.com/VictoriaMetrics/easyproto"
)
// WriteRequest represents Prometheus remote write API request.
type WriteRequest struct {
// Timeseries is a list of time series in the given WriteRequest
Timeseries []TimeSeries
labelsPool []Label
samplesPool []Sample
}
// Reset resets wr for subsequent re-use.
func (wr *WriteRequest) Reset() {
tss := wr.Timeseries
for i := range tss {
tss[i] = TimeSeries{}
}
wr.Timeseries = tss[:0]
labelsPool := wr.labelsPool
for i := range labelsPool {
labelsPool[i] = Label{}
}
wr.labelsPool = labelsPool[:0]
samplesPool := wr.samplesPool
for i := range samplesPool {
samplesPool[i] = Sample{}
}
wr.samplesPool = samplesPool[:0]
}
// TimeSeries is a timeseries.
type TimeSeries struct {
// Labels is a list of labels for the given TimeSeries
Labels []Label
// Samples is a list of samples for the given TimeSeries
Samples []Sample
}
// Sample is a timeseries sample.
type Sample struct {
// Value is sample value.
Value float64
// Timestamp is unix timestamp for the sample in milliseconds.
Timestamp int64
}
// Label is a timeseries label.
type Label struct {
// Name is label name.
Name string
// Value is label value.
Value string
}
// UnmarshalProtobuf unmarshals wr from src.
//
// src mustn't change while wr is in use, since wr points to src.
func (wr *WriteRequest) UnmarshalProtobuf(src []byte) (err error) {
wr.Reset()
// message WriteRequest {
// repeated TimeSeries timeseries = 1;
// }
tss := wr.Timeseries
labelsPool := wr.labelsPool
samplesPool := wr.samplesPool
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read the next field: %w", err)
}
switch fc.FieldNum {
case 1:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read timeseries data")
}
if len(tss) < cap(tss) {
tss = tss[:len(tss)+1]
} else {
tss = append(tss, TimeSeries{})
}
ts := &tss[len(tss)-1]
labelsPool, samplesPool, err = ts.unmarshalProtobuf(data, labelsPool, samplesPool)
if err != nil {
return fmt.Errorf("cannot unmarshal timeseries: %w", err)
}
}
}
wr.Timeseries = tss
wr.labelsPool = labelsPool
wr.samplesPool = samplesPool
return nil
}
func (ts *TimeSeries) unmarshalProtobuf(src []byte, labelsPool []Label, samplesPool []Sample) ([]Label, []Sample, error) {
// message TimeSeries {
// repeated Label labels = 1;
// repeated Sample samples = 2;
// }
labelsPoolLen := len(labelsPool)
samplesPoolLen := len(samplesPool)
var fc easyproto.FieldContext
for len(src) > 0 {
var err error
src, err = fc.NextField(src)
if err != nil {
return labelsPool, samplesPool, fmt.Errorf("cannot read the next field: %w", err)
}
switch fc.FieldNum {
case 1:
data, ok := fc.MessageData()
if !ok {
return labelsPool, samplesPool, fmt.Errorf("cannot read label data")
}
if len(labelsPool) < cap(labelsPool) {
labelsPool = labelsPool[:len(labelsPool)+1]
} else {
labelsPool = append(labelsPool, Label{})
}
label := &labelsPool[len(labelsPool)-1]
if err := label.unmarshalProtobuf(data); err != nil {
return labelsPool, samplesPool, fmt.Errorf("cannot unmarshal label: %w", err)
}
case 2:
data, ok := fc.MessageData()
if !ok {
return labelsPool, samplesPool, fmt.Errorf("cannot read the sample data")
}
if len(samplesPool) < cap(samplesPool) {
samplesPool = samplesPool[:len(samplesPool)+1]
} else {
samplesPool = append(samplesPool, Sample{})
}
sample := &samplesPool[len(samplesPool)-1]
if err := sample.unmarshalProtobuf(data); err != nil {
return labelsPool, samplesPool, fmt.Errorf("cannot unmarshal sample: %w", err)
}
}
}
ts.Labels = labelsPool[labelsPoolLen:]
ts.Samples = samplesPool[samplesPoolLen:]
return labelsPool, samplesPool, nil
}
func (lbl *Label) unmarshalProtobuf(src []byte) (err error) {
// message Label {
// string name = 1;
// string value = 2;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read the next field: %w", err)
}
switch fc.FieldNum {
case 1:
name, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read label name")
}
lbl.Name = name
case 2:
value, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read label value")
}
lbl.Value = value
}
}
return nil
}
func (s *Sample) unmarshalProtobuf(src []byte) (err error) {
// message Sample {
// double value = 1;
// int64 timestamp = 2;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read the next field: %w", err)
}
switch fc.FieldNum {
case 1:
value, ok := fc.Double()
if !ok {
return fmt.Errorf("cannot read sample value")
}
s.Value = value
case 2:
timestamp, ok := fc.Int64()
if !ok {
return fmt.Errorf("cannot read sample timestamp")
}
s.Timestamp = timestamp
}
}
return nil
}

188
lib/prompb/prompb_test.go Normal file
View file

@ -0,0 +1,188 @@
package prompb_test
import (
"bytes"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
func TestWriteRequestUnmarshalProtobuf(t *testing.T) {
var wr prompb.WriteRequest
f := func(data []byte) {
t.Helper()
// Verify that the marshaled protobuf is unmarshaled properly
if err := wr.UnmarshalProtobuf(data); err != nil {
t.Fatalf("cannot unmarshal protobuf: %s", err)
}
// Compare the unmarshaled wr with the original wrm.
var wrm prompbmarshal.WriteRequest
for _, ts := range wr.Timeseries {
var labels []prompbmarshal.Label
for _, label := range ts.Labels {
labels = append(labels, prompbmarshal.Label{
Name: label.Name,
Value: label.Value,
})
}
var samples []prompbmarshal.Sample
for _, sample := range ts.Samples {
samples = append(samples, prompbmarshal.Sample{
Value: sample.Value,
Timestamp: sample.Timestamp,
})
}
wrm.Timeseries = append(wrm.Timeseries, prompbmarshal.TimeSeries{
Labels: labels,
Samples: samples,
})
}
dataResult, err := wrm.Marshal()
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if !bytes.Equal(dataResult, data) {
t.Fatalf("unexpected data obtained after marshaling\ngot\n%X\nwant\n%X", dataResult, data)
}
}
wrm := &prompbmarshal.WriteRequest{}
data, err := wrm.Marshal()
if err != nil {
t.Fatalf("unexpected error")
}
f(data)
wrm = &prompbmarshal.WriteRequest{}
wrm.Timeseries = []prompbmarshal.TimeSeries{
{
Labels: []prompbmarshal.Label{
{
Name: "__name__",
Value: "process_cpu_seconds_total",
},
{
Name: "instance",
Value: "host-123:4567",
},
{
Name: "job",
Value: "node-exporter",
},
},
},
}
data, err = wrm.Marshal()
if err != nil {
t.Fatalf("unexpected error")
}
f(data)
wrm = &prompbmarshal.WriteRequest{}
wrm.Timeseries = []prompbmarshal.TimeSeries{
{
Samples: []prompbmarshal.Sample{
{
Value: 123.3434,
Timestamp: 8939432423,
},
{
Value: -123.3434,
Timestamp: 18939432423,
},
},
},
}
data, err = wrm.Marshal()
if err != nil {
t.Fatalf("unexpected error")
}
f(data)
wrm = &prompbmarshal.WriteRequest{}
wrm.Timeseries = []prompbmarshal.TimeSeries{
{
Labels: []prompbmarshal.Label{
{
Name: "__name__",
Value: "process_cpu_seconds_total",
},
{
Name: "instance",
Value: "host-123:4567",
},
{
Name: "job",
Value: "node-exporter",
},
},
Samples: []prompbmarshal.Sample{
{
Value: 123.3434,
Timestamp: 8939432423,
},
{
Value: -123.3434,
Timestamp: 18939432423,
},
},
},
}
data, err = wrm.Marshal()
if err != nil {
t.Fatalf("unexpected error")
}
f(data)
wrm = &prompbmarshal.WriteRequest{}
wrm.Timeseries = []prompbmarshal.TimeSeries{
{
Labels: []prompbmarshal.Label{
{
Name: "__name__",
Value: "process_cpu_seconds_total",
},
{
Name: "instance",
Value: "host-123:4567",
},
{
Name: "job",
Value: "node-exporter",
},
},
Samples: []prompbmarshal.Sample{
{
Value: 123.3434,
Timestamp: 8939432423,
},
{
Value: -123.3434,
Timestamp: 18939432423,
},
},
},
{
Labels: []prompbmarshal.Label{
{
Name: "foo",
Value: "bar",
},
},
Samples: []prompbmarshal.Sample{
{
Value: 9873,
},
},
},
}
data, err = wrm.Marshal()
if err != nil {
t.Fatalf("unexpected error")
}
f(data)
}

View file

@ -0,0 +1,83 @@
package prompb
import (
"fmt"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
func BenchmarkWriteRequestUnmarshalProtobuf(b *testing.B) {
data, err := benchWriteRequest.Marshal()
if err != nil {
b.Fatalf("unexpected error: %s", err)
}
b.ReportAllocs()
b.SetBytes(int64(len(benchWriteRequest.Timeseries)))
b.RunParallel(func(pb *testing.PB) {
var wr WriteRequest
for pb.Next() {
if err := wr.UnmarshalProtobuf(data); err != nil {
panic(fmt.Errorf("unexpected error: %s", err))
}
}
})
}
var benchWriteRequest = func() *prompbmarshal.WriteRequest {
var tss []prompbmarshal.TimeSeries
for i := 0; i < 10_000; i++ {
ts := prompbmarshal.TimeSeries{
Labels: []prompbmarshal.Label{
{
Name: "__name__",
Value: "process_cpu_seconds_total",
},
{
Name: "instance",
Value: fmt.Sprintf("host-%d:4567", i),
},
{
Name: "job",
Value: "node-exporter",
},
{
Name: "pod",
Value: "foo-bar-pod-8983423843",
},
{
Name: "cpu",
Value: "1",
},
{
Name: "mode",
Value: "system",
},
{
Name: "node",
Value: "host-123",
},
{
Name: "namespace",
Value: "foo-bar-baz",
},
{
Name: "container",
Value: fmt.Sprintf("aaa-bb-cc-dd-ee-%d", i),
},
},
Samples: []prompbmarshal.Sample{
{
Value: float64(i),
Timestamp: 1e9 + int64(i)*1000,
},
},
}
tss = append(tss, ts)
}
wrm := &prompbmarshal.WriteRequest{
Timeseries: tss,
}
return wrm
}()

View file

@ -1,210 +0,0 @@
// Code generated from remote.proto
package prompb
import (
"fmt"
"io"
)
// WriteRequest represents Prometheus remote write API request
type WriteRequest struct {
Timeseries []TimeSeries
labelsPool []Label
samplesPool []Sample
}
// Unmarshal unmarshals m from dAtA.
func (m *WriteRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return errIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: WriteRequest: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: WriteRequest: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Timeseries", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return errIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return errInvalidLengthRemote
}
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
if cap(m.Timeseries) > len(m.Timeseries) {
m.Timeseries = m.Timeseries[:len(m.Timeseries)+1]
} else {
m.Timeseries = append(m.Timeseries, TimeSeries{})
}
ts := &m.Timeseries[len(m.Timeseries)-1]
var err error
m.labelsPool, m.samplesPool, err = ts.Unmarshal(dAtA[iNdEx:postIndex], m.labelsPool, m.samplesPool)
if err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipRemote(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return errInvalidLengthRemote
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipRemote(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, errIntOverflowRemote
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, errIntOverflowRemote
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
return iNdEx, nil
case 1:
iNdEx += 8
return iNdEx, nil
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, errIntOverflowRemote
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
iNdEx += length
if length < 0 {
return 0, errInvalidLengthRemote
}
return iNdEx, nil
case 3:
for {
var innerWire uint64
start := iNdEx
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, errIntOverflowRemote
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
innerWire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
innerWireType := int(innerWire & 0x7)
if innerWireType == 4 {
break
}
next, err := skipRemote(dAtA[start:])
if err != nil {
return 0, err
}
iNdEx = start + next
}
return iNdEx, nil
case 4:
return iNdEx, nil
case 5:
iNdEx += 4
return iNdEx, nil
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
}
panic("unreachable")
}
var (
errInvalidLengthRemote = fmt.Errorf("proto: negative length found during unmarshaling")
errIntOverflowRemote = fmt.Errorf("proto: integer overflow")
)

View file

@ -1,23 +0,0 @@
// Copyright 2016 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package prometheus;
option go_package = "prompb";
import "types.proto";
message WriteRequest {
repeated prometheus.TimeSeries timeseries = 1 [(gogoproto.nullable) = false];
}

View file

@ -1,457 +0,0 @@
// Code generated manually from types.proto
package prompb
import (
"encoding/binary"
"fmt"
"io"
"math"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
// Sample is a timeseries sample.
type Sample struct {
Value float64
Timestamp int64
}
// TimeSeries is a timeseries.
type TimeSeries struct {
Labels []Label
Samples []Sample
}
// Label is a timeseries label
type Label struct {
Name string
Value string
}
// Unmarshal unmarshals sample from dAtA.
func (m *Sample) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return errIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: Sample: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: Sample: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 1 {
return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType)
}
var v uint64
if (iNdEx + 8) > l {
return io.ErrUnexpectedEOF
}
v = uint64(binary.LittleEndian.Uint64(dAtA[iNdEx:]))
iNdEx += 8
m.Value = float64(math.Float64frombits(v))
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType)
}
m.Timestamp = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return errIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Timestamp |= int64(b&0x7F) << shift
if b < 0x80 {
break
}
}
default:
iNdEx = preIndex
skippy, err := skipTypes(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return errInvalidLengthTypes
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
// Unmarshal unmarshals timeseries from dAtA.
func (m *TimeSeries) Unmarshal(dAtA []byte, dstLabels []Label, dstSamples []Sample) ([]Label, []Sample, error) {
labelsStart := len(dstLabels)
samplesStart := len(dstSamples)
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return dstLabels, dstSamples, errIntOverflowTypes
}
if iNdEx >= l {
return dstLabels, dstSamples, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return dstLabels, dstSamples, fmt.Errorf("proto: TimeSeries: wiretype end group for non-group")
}
if fieldNum <= 0 {
return dstLabels, dstSamples, fmt.Errorf("proto: TimeSeries: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return dstLabels, dstSamples, fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return dstLabels, dstSamples, errIntOverflowTypes
}
if iNdEx >= l {
return dstLabels, dstSamples, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return dstLabels, dstSamples, errInvalidLengthTypes
}
postIndex := iNdEx + msglen
if postIndex > l {
return dstLabels, dstSamples, io.ErrUnexpectedEOF
}
if cap(dstLabels) > len(dstLabels) {
dstLabels = dstLabels[:len(dstLabels)+1]
} else {
dstLabels = append(dstLabels, Label{})
}
lb := &dstLabels[len(dstLabels)-1]
if err := lb.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return dstLabels, dstSamples, err
}
iNdEx = postIndex
case 2:
if wireType != 2 {
return dstLabels, dstSamples, fmt.Errorf("proto: wrong wireType = %d for field Samples", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return dstLabels, dstSamples, errIntOverflowTypes
}
if iNdEx >= l {
return dstLabels, dstSamples, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return dstLabels, dstSamples, errInvalidLengthTypes
}
postIndex := iNdEx + msglen
if postIndex > l {
return dstLabels, dstSamples, io.ErrUnexpectedEOF
}
if cap(dstSamples) > len(dstSamples) {
dstSamples = dstSamples[:len(dstSamples)+1]
} else {
dstSamples = append(dstSamples, Sample{})
}
s := &dstSamples[len(dstSamples)-1]
if err := s.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return dstLabels, dstSamples, err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipTypes(dAtA[iNdEx:])
if err != nil {
return dstLabels, dstSamples, err
}
if skippy < 0 {
return dstLabels, dstSamples, errInvalidLengthTypes
}
if (iNdEx + skippy) > l {
return dstLabels, dstSamples, io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return dstLabels, dstSamples, io.ErrUnexpectedEOF
}
m.Labels = dstLabels[labelsStart:]
m.Samples = dstSamples[samplesStart:]
return dstLabels, dstSamples, nil
}
// Unmarshal unmarshals Label from dAtA.
func (m *Label) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return errIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: Label: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: Label: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return errIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return errInvalidLengthTypes
}
postIndex := iNdEx + intStringLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Name = bytesutil.ToUnsafeString(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return errIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return errInvalidLengthTypes
}
postIndex := iNdEx + intStringLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Value = bytesutil.ToUnsafeString(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipTypes(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return errInvalidLengthTypes
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipTypes(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, errIntOverflowTypes
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, errIntOverflowTypes
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
return iNdEx, nil
case 1:
iNdEx += 8
return iNdEx, nil
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, errIntOverflowTypes
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
iNdEx += length
if length < 0 {
return 0, errInvalidLengthTypes
}
return iNdEx, nil
case 3:
for {
var innerWire uint64
start := iNdEx
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, errIntOverflowTypes
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
innerWire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
innerWireType := int(innerWire & 0x7)
if innerWireType == 4 {
break
}
next, err := skipTypes(dAtA[start:])
if err != nil {
return 0, err
}
iNdEx = start + next
}
return iNdEx, nil
case 4:
return iNdEx, nil
case 5:
iNdEx += 4
return iNdEx, nil
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
}
panic("unreachable")
}
var (
errInvalidLengthTypes = fmt.Errorf("proto: negative length found during unmarshaling")
errIntOverflowTypes = fmt.Errorf("proto: integer overflow")
)

View file

@ -1,34 +0,0 @@
// Copyright 2017 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package prometheus;
option go_package = "prompb";
import "gogoproto/gogo.proto";
message Sample {
double value = 1;
int64 timestamp = 2;
}
message TimeSeries {
repeated Label labels = 1 [(gogoproto.nullable) = false];
repeated Sample samples = 2 [(gogoproto.nullable) = false];
}
message Label {
string name = 1;
string value = 2;
}

View file

@ -1,19 +0,0 @@
package prompb
// Reset resets wr.
func (wr *WriteRequest) Reset() {
for i := range wr.Timeseries {
wr.Timeseries[i] = TimeSeries{}
}
wr.Timeseries = wr.Timeseries[:0]
for i := range wr.labelsPool {
wr.labelsPool[i] = Label{}
}
wr.labelsPool = wr.labelsPool[:0]
for i := range wr.samplesPool {
wr.samplesPool[i] = Sample{}
}
wr.samplesPool = wr.samplesPool[:0]
}

View file

@ -69,7 +69,7 @@ func Parse(r io.Reader, isVMRemoteWrite bool, callback func(tss []prompb.TimeSer
} }
wr := getWriteRequest() wr := getWriteRequest()
defer putWriteRequest(wr) defer putWriteRequest(wr)
if err := wr.Unmarshal(bb.B); err != nil { if err := wr.UnmarshalProtobuf(bb.B); err != nil {
unmarshalErrors.Inc() unmarshalErrors.Inc()
return fmt.Errorf("cannot unmarshal prompb.WriteRequest with size %d bytes: %w", len(bb.B), err) return fmt.Errorf("cannot unmarshal prompb.WriteRequest with size %d bytes: %w", len(bb.B), err)
} }