From c005245741fc3d7d744f258959be2a5ae388f8ec Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sun, 14 Jan 2024 22:46:06 +0200 Subject: [PATCH] lib/prompb: switch to github.com/VictoriaMetrics/easyproto --- app/vmalert/remotewrite/client_test.go | 2 +- lib/prompb/prompb.go | 214 ++++++++ lib/prompb/prompb_test.go | 188 +++++++ lib/prompb/prompb_timing_test.go | 83 ++++ lib/prompb/remote.pb.go | 210 -------- lib/prompb/remote.proto | 23 - lib/prompb/types.pb.go | 457 ------------------ lib/prompb/types.proto | 34 -- lib/prompb/util.go | 19 - .../promremotewrite/stream/streamparser.go | 2 +- 10 files changed, 487 insertions(+), 745 deletions(-) create mode 100644 lib/prompb/prompb.go create mode 100644 lib/prompb/prompb_test.go create mode 100644 lib/prompb/prompb_timing_test.go delete mode 100644 lib/prompb/remote.pb.go delete mode 100644 lib/prompb/remote.proto delete mode 100644 lib/prompb/types.pb.go delete mode 100644 lib/prompb/types.proto delete mode 100644 lib/prompb/util.go diff --git a/app/vmalert/remotewrite/client_test.go b/app/vmalert/remotewrite/client_test.go index ec2a9ea4c..5a9461c24 100644 --- a/app/vmalert/remotewrite/client_test.go +++ b/app/vmalert/remotewrite/client_test.go @@ -140,7 +140,7 @@ func (rw *rwServer) handler(w http.ResponseWriter, r *http.Request) { return } wr := &prompb.WriteRequest{} - if err := wr.Unmarshal(b); err != nil { + if err := wr.UnmarshalProtobuf(b); err != nil { rw.err(w, fmt.Errorf("unmarhsal err: %w", err)) return } diff --git a/lib/prompb/prompb.go b/lib/prompb/prompb.go new file mode 100644 index 000000000..34a1c5716 --- /dev/null +++ b/lib/prompb/prompb.go @@ -0,0 +1,214 @@ +package prompb + +import ( + "fmt" + + "github.com/VictoriaMetrics/easyproto" +) + +// WriteRequest represents Prometheus remote write API request. +type WriteRequest struct { + // Timeseries is a list of time series in the given WriteRequest + Timeseries []TimeSeries + + labelsPool []Label + samplesPool []Sample +} + +// Reset resets wr for subsequent re-use. +func (wr *WriteRequest) Reset() { + tss := wr.Timeseries + for i := range tss { + tss[i] = TimeSeries{} + } + wr.Timeseries = tss[:0] + + labelsPool := wr.labelsPool + for i := range labelsPool { + labelsPool[i] = Label{} + } + wr.labelsPool = labelsPool[:0] + + samplesPool := wr.samplesPool + for i := range samplesPool { + samplesPool[i] = Sample{} + } + wr.samplesPool = samplesPool[:0] +} + +// TimeSeries is a timeseries. +type TimeSeries struct { + // Labels is a list of labels for the given TimeSeries + Labels []Label + + // Samples is a list of samples for the given TimeSeries + Samples []Sample +} + +// Sample is a timeseries sample. +type Sample struct { + // Value is sample value. + Value float64 + + // Timestamp is unix timestamp for the sample in milliseconds. + Timestamp int64 +} + +// Label is a timeseries label. +type Label struct { + // Name is label name. + Name string + + // Value is label value. + Value string +} + +// UnmarshalProtobuf unmarshals wr from src. +// +// src mustn't change while wr is in use, since wr points to src. +func (wr *WriteRequest) UnmarshalProtobuf(src []byte) (err error) { + wr.Reset() + + // message WriteRequest { + // repeated TimeSeries timeseries = 1; + // } + tss := wr.Timeseries + labelsPool := wr.labelsPool + samplesPool := wr.samplesPool + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read the next field: %w", err) + } + switch fc.FieldNum { + case 1: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot read timeseries data") + } + if len(tss) < cap(tss) { + tss = tss[:len(tss)+1] + } else { + tss = append(tss, TimeSeries{}) + } + ts := &tss[len(tss)-1] + labelsPool, samplesPool, err = ts.unmarshalProtobuf(data, labelsPool, samplesPool) + if err != nil { + return fmt.Errorf("cannot unmarshal timeseries: %w", err) + } + } + } + wr.Timeseries = tss + wr.labelsPool = labelsPool + wr.samplesPool = samplesPool + return nil +} + +func (ts *TimeSeries) unmarshalProtobuf(src []byte, labelsPool []Label, samplesPool []Sample) ([]Label, []Sample, error) { + // message TimeSeries { + // repeated Label labels = 1; + // repeated Sample samples = 2; + // } + labelsPoolLen := len(labelsPool) + samplesPoolLen := len(samplesPool) + var fc easyproto.FieldContext + for len(src) > 0 { + var err error + src, err = fc.NextField(src) + if err != nil { + return labelsPool, samplesPool, fmt.Errorf("cannot read the next field: %w", err) + } + switch fc.FieldNum { + case 1: + data, ok := fc.MessageData() + if !ok { + return labelsPool, samplesPool, fmt.Errorf("cannot read label data") + } + if len(labelsPool) < cap(labelsPool) { + labelsPool = labelsPool[:len(labelsPool)+1] + } else { + labelsPool = append(labelsPool, Label{}) + } + label := &labelsPool[len(labelsPool)-1] + if err := label.unmarshalProtobuf(data); err != nil { + return labelsPool, samplesPool, fmt.Errorf("cannot unmarshal label: %w", err) + } + case 2: + data, ok := fc.MessageData() + if !ok { + return labelsPool, samplesPool, fmt.Errorf("cannot read the sample data") + } + if len(samplesPool) < cap(samplesPool) { + samplesPool = samplesPool[:len(samplesPool)+1] + } else { + samplesPool = append(samplesPool, Sample{}) + } + sample := &samplesPool[len(samplesPool)-1] + if err := sample.unmarshalProtobuf(data); err != nil { + return labelsPool, samplesPool, fmt.Errorf("cannot unmarshal sample: %w", err) + } + } + } + ts.Labels = labelsPool[labelsPoolLen:] + ts.Samples = samplesPool[samplesPoolLen:] + return labelsPool, samplesPool, nil +} + +func (lbl *Label) unmarshalProtobuf(src []byte) (err error) { + // message Label { + // string name = 1; + // string value = 2; + // } + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read the next field: %w", err) + } + switch fc.FieldNum { + case 1: + name, ok := fc.String() + if !ok { + return fmt.Errorf("cannot read label name") + } + lbl.Name = name + case 2: + value, ok := fc.String() + if !ok { + return fmt.Errorf("cannot read label value") + } + lbl.Value = value + } + } + return nil +} + +func (s *Sample) unmarshalProtobuf(src []byte) (err error) { + // message Sample { + // double value = 1; + // int64 timestamp = 2; + // } + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read the next field: %w", err) + } + switch fc.FieldNum { + case 1: + value, ok := fc.Double() + if !ok { + return fmt.Errorf("cannot read sample value") + } + s.Value = value + case 2: + timestamp, ok := fc.Int64() + if !ok { + return fmt.Errorf("cannot read sample timestamp") + } + s.Timestamp = timestamp + } + } + return nil +} diff --git a/lib/prompb/prompb_test.go b/lib/prompb/prompb_test.go new file mode 100644 index 000000000..6e4e04e45 --- /dev/null +++ b/lib/prompb/prompb_test.go @@ -0,0 +1,188 @@ +package prompb_test + +import ( + "bytes" + "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" +) + +func TestWriteRequestUnmarshalProtobuf(t *testing.T) { + var wr prompb.WriteRequest + + f := func(data []byte) { + t.Helper() + + // Verify that the marshaled protobuf is unmarshaled properly + if err := wr.UnmarshalProtobuf(data); err != nil { + t.Fatalf("cannot unmarshal protobuf: %s", err) + } + + // Compare the unmarshaled wr with the original wrm. + var wrm prompbmarshal.WriteRequest + for _, ts := range wr.Timeseries { + var labels []prompbmarshal.Label + for _, label := range ts.Labels { + labels = append(labels, prompbmarshal.Label{ + Name: label.Name, + Value: label.Value, + }) + } + var samples []prompbmarshal.Sample + for _, sample := range ts.Samples { + samples = append(samples, prompbmarshal.Sample{ + Value: sample.Value, + Timestamp: sample.Timestamp, + }) + } + wrm.Timeseries = append(wrm.Timeseries, prompbmarshal.TimeSeries{ + Labels: labels, + Samples: samples, + }) + } + dataResult, err := wrm.Marshal() + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if !bytes.Equal(dataResult, data) { + t.Fatalf("unexpected data obtained after marshaling\ngot\n%X\nwant\n%X", dataResult, data) + } + } + + wrm := &prompbmarshal.WriteRequest{} + data, err := wrm.Marshal() + if err != nil { + t.Fatalf("unexpected error") + } + f(data) + + wrm = &prompbmarshal.WriteRequest{} + wrm.Timeseries = []prompbmarshal.TimeSeries{ + { + Labels: []prompbmarshal.Label{ + { + Name: "__name__", + Value: "process_cpu_seconds_total", + }, + { + Name: "instance", + Value: "host-123:4567", + }, + { + Name: "job", + Value: "node-exporter", + }, + }, + }, + } + data, err = wrm.Marshal() + if err != nil { + t.Fatalf("unexpected error") + } + f(data) + + wrm = &prompbmarshal.WriteRequest{} + wrm.Timeseries = []prompbmarshal.TimeSeries{ + { + Samples: []prompbmarshal.Sample{ + { + Value: 123.3434, + Timestamp: 8939432423, + }, + { + Value: -123.3434, + Timestamp: 18939432423, + }, + }, + }, + } + data, err = wrm.Marshal() + if err != nil { + t.Fatalf("unexpected error") + } + f(data) + + wrm = &prompbmarshal.WriteRequest{} + wrm.Timeseries = []prompbmarshal.TimeSeries{ + { + Labels: []prompbmarshal.Label{ + { + Name: "__name__", + Value: "process_cpu_seconds_total", + }, + { + Name: "instance", + Value: "host-123:4567", + }, + { + Name: "job", + Value: "node-exporter", + }, + }, + Samples: []prompbmarshal.Sample{ + { + Value: 123.3434, + Timestamp: 8939432423, + }, + { + Value: -123.3434, + Timestamp: 18939432423, + }, + }, + }, + } + data, err = wrm.Marshal() + if err != nil { + t.Fatalf("unexpected error") + } + f(data) + + wrm = &prompbmarshal.WriteRequest{} + wrm.Timeseries = []prompbmarshal.TimeSeries{ + { + Labels: []prompbmarshal.Label{ + { + Name: "__name__", + Value: "process_cpu_seconds_total", + }, + { + Name: "instance", + Value: "host-123:4567", + }, + { + Name: "job", + Value: "node-exporter", + }, + }, + Samples: []prompbmarshal.Sample{ + { + Value: 123.3434, + Timestamp: 8939432423, + }, + { + Value: -123.3434, + Timestamp: 18939432423, + }, + }, + }, + { + Labels: []prompbmarshal.Label{ + { + Name: "foo", + Value: "bar", + }, + }, + Samples: []prompbmarshal.Sample{ + { + Value: 9873, + }, + }, + }, + } + data, err = wrm.Marshal() + if err != nil { + t.Fatalf("unexpected error") + } + f(data) +} diff --git a/lib/prompb/prompb_timing_test.go b/lib/prompb/prompb_timing_test.go new file mode 100644 index 000000000..86e1d4e81 --- /dev/null +++ b/lib/prompb/prompb_timing_test.go @@ -0,0 +1,83 @@ +package prompb + +import ( + "fmt" + "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" +) + +func BenchmarkWriteRequestUnmarshalProtobuf(b *testing.B) { + data, err := benchWriteRequest.Marshal() + if err != nil { + b.Fatalf("unexpected error: %s", err) + } + + b.ReportAllocs() + b.SetBytes(int64(len(benchWriteRequest.Timeseries))) + b.RunParallel(func(pb *testing.PB) { + var wr WriteRequest + for pb.Next() { + if err := wr.UnmarshalProtobuf(data); err != nil { + panic(fmt.Errorf("unexpected error: %s", err)) + } + } + }) +} + +var benchWriteRequest = func() *prompbmarshal.WriteRequest { + var tss []prompbmarshal.TimeSeries + for i := 0; i < 10_000; i++ { + ts := prompbmarshal.TimeSeries{ + Labels: []prompbmarshal.Label{ + { + Name: "__name__", + Value: "process_cpu_seconds_total", + }, + { + Name: "instance", + Value: fmt.Sprintf("host-%d:4567", i), + }, + { + Name: "job", + Value: "node-exporter", + }, + { + Name: "pod", + Value: "foo-bar-pod-8983423843", + }, + { + Name: "cpu", + Value: "1", + }, + { + Name: "mode", + Value: "system", + }, + { + Name: "node", + Value: "host-123", + }, + { + Name: "namespace", + Value: "foo-bar-baz", + }, + { + Name: "container", + Value: fmt.Sprintf("aaa-bb-cc-dd-ee-%d", i), + }, + }, + Samples: []prompbmarshal.Sample{ + { + Value: float64(i), + Timestamp: 1e9 + int64(i)*1000, + }, + }, + } + tss = append(tss, ts) + } + wrm := &prompbmarshal.WriteRequest{ + Timeseries: tss, + } + return wrm +}() diff --git a/lib/prompb/remote.pb.go b/lib/prompb/remote.pb.go deleted file mode 100644 index 2491c147b..000000000 --- a/lib/prompb/remote.pb.go +++ /dev/null @@ -1,210 +0,0 @@ -// Code generated from remote.proto - -package prompb - -import ( - "fmt" - "io" -) - -// WriteRequest represents Prometheus remote write API request -type WriteRequest struct { - Timeseries []TimeSeries - - labelsPool []Label - samplesPool []Sample -} - -// Unmarshal unmarshals m from dAtA. -func (m *WriteRequest) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return errIntOverflowRemote - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: WriteRequest: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: WriteRequest: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Timeseries", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return errIntOverflowRemote - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return errInvalidLengthRemote - } - postIndex := iNdEx + msglen - if postIndex > l { - return io.ErrUnexpectedEOF - } - if cap(m.Timeseries) > len(m.Timeseries) { - m.Timeseries = m.Timeseries[:len(m.Timeseries)+1] - } else { - m.Timeseries = append(m.Timeseries, TimeSeries{}) - } - ts := &m.Timeseries[len(m.Timeseries)-1] - var err error - m.labelsPool, m.samplesPool, err = ts.Unmarshal(dAtA[iNdEx:postIndex], m.labelsPool, m.samplesPool) - if err != nil { - return err - } - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skipRemote(dAtA[iNdEx:]) - if err != nil { - return err - } - if skippy < 0 { - return errInvalidLengthRemote - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func skipRemote(dAtA []byte) (n int, err error) { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, errIntOverflowRemote - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - wireType := int(wire & 0x7) - switch wireType { - case 0: - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, errIntOverflowRemote - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - iNdEx++ - if dAtA[iNdEx-1] < 0x80 { - break - } - } - return iNdEx, nil - case 1: - iNdEx += 8 - return iNdEx, nil - case 2: - var length int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, errIntOverflowRemote - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - length |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - iNdEx += length - if length < 0 { - return 0, errInvalidLengthRemote - } - return iNdEx, nil - case 3: - for { - var innerWire uint64 - start := iNdEx - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, errIntOverflowRemote - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - innerWire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - innerWireType := int(innerWire & 0x7) - if innerWireType == 4 { - break - } - next, err := skipRemote(dAtA[start:]) - if err != nil { - return 0, err - } - iNdEx = start + next - } - return iNdEx, nil - case 4: - return iNdEx, nil - case 5: - iNdEx += 4 - return iNdEx, nil - default: - return 0, fmt.Errorf("proto: illegal wireType %d", wireType) - } - } - panic("unreachable") -} - -var ( - errInvalidLengthRemote = fmt.Errorf("proto: negative length found during unmarshaling") - errIntOverflowRemote = fmt.Errorf("proto: integer overflow") -) diff --git a/lib/prompb/remote.proto b/lib/prompb/remote.proto deleted file mode 100644 index cc146ed0f..000000000 --- a/lib/prompb/remote.proto +++ /dev/null @@ -1,23 +0,0 @@ -// Copyright 2016 Prometheus Team -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -syntax = "proto3"; -package prometheus; - -option go_package = "prompb"; - -import "types.proto"; - -message WriteRequest { - repeated prometheus.TimeSeries timeseries = 1 [(gogoproto.nullable) = false]; -} diff --git a/lib/prompb/types.pb.go b/lib/prompb/types.pb.go deleted file mode 100644 index 8fd98b216..000000000 --- a/lib/prompb/types.pb.go +++ /dev/null @@ -1,457 +0,0 @@ -// Code generated manually from types.proto - -package prompb - -import ( - "encoding/binary" - "fmt" - "io" - "math" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" -) - -// Sample is a timeseries sample. -type Sample struct { - Value float64 - Timestamp int64 -} - -// TimeSeries is a timeseries. -type TimeSeries struct { - Labels []Label - Samples []Sample -} - -// Label is a timeseries label -type Label struct { - Name string - Value string -} - -// Unmarshal unmarshals sample from dAtA. -func (m *Sample) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return errIntOverflowTypes - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: Sample: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: Sample: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 1 { - return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType) - } - var v uint64 - if (iNdEx + 8) > l { - return io.ErrUnexpectedEOF - } - v = uint64(binary.LittleEndian.Uint64(dAtA[iNdEx:])) - iNdEx += 8 - m.Value = float64(math.Float64frombits(v)) - case 2: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType) - } - m.Timestamp = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return errIntOverflowTypes - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.Timestamp |= int64(b&0x7F) << shift - if b < 0x80 { - break - } - } - default: - iNdEx = preIndex - skippy, err := skipTypes(dAtA[iNdEx:]) - if err != nil { - return err - } - if skippy < 0 { - return errInvalidLengthTypes - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} - -// Unmarshal unmarshals timeseries from dAtA. -func (m *TimeSeries) Unmarshal(dAtA []byte, dstLabels []Label, dstSamples []Sample) ([]Label, []Sample, error) { - labelsStart := len(dstLabels) - samplesStart := len(dstSamples) - - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return dstLabels, dstSamples, errIntOverflowTypes - } - if iNdEx >= l { - return dstLabels, dstSamples, io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return dstLabels, dstSamples, fmt.Errorf("proto: TimeSeries: wiretype end group for non-group") - } - if fieldNum <= 0 { - return dstLabels, dstSamples, fmt.Errorf("proto: TimeSeries: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return dstLabels, dstSamples, fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return dstLabels, dstSamples, errIntOverflowTypes - } - if iNdEx >= l { - return dstLabels, dstSamples, io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return dstLabels, dstSamples, errInvalidLengthTypes - } - postIndex := iNdEx + msglen - if postIndex > l { - return dstLabels, dstSamples, io.ErrUnexpectedEOF - } - if cap(dstLabels) > len(dstLabels) { - dstLabels = dstLabels[:len(dstLabels)+1] - } else { - dstLabels = append(dstLabels, Label{}) - } - lb := &dstLabels[len(dstLabels)-1] - if err := lb.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return dstLabels, dstSamples, err - } - iNdEx = postIndex - case 2: - if wireType != 2 { - return dstLabels, dstSamples, fmt.Errorf("proto: wrong wireType = %d for field Samples", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return dstLabels, dstSamples, errIntOverflowTypes - } - if iNdEx >= l { - return dstLabels, dstSamples, io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return dstLabels, dstSamples, errInvalidLengthTypes - } - postIndex := iNdEx + msglen - if postIndex > l { - return dstLabels, dstSamples, io.ErrUnexpectedEOF - } - if cap(dstSamples) > len(dstSamples) { - dstSamples = dstSamples[:len(dstSamples)+1] - } else { - dstSamples = append(dstSamples, Sample{}) - } - s := &dstSamples[len(dstSamples)-1] - if err := s.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return dstLabels, dstSamples, err - } - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skipTypes(dAtA[iNdEx:]) - if err != nil { - return dstLabels, dstSamples, err - } - if skippy < 0 { - return dstLabels, dstSamples, errInvalidLengthTypes - } - if (iNdEx + skippy) > l { - return dstLabels, dstSamples, io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return dstLabels, dstSamples, io.ErrUnexpectedEOF - } - - m.Labels = dstLabels[labelsStart:] - m.Samples = dstSamples[samplesStart:] - return dstLabels, dstSamples, nil -} - -// Unmarshal unmarshals Label from dAtA. -func (m *Label) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return errIntOverflowTypes - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: Label: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: Label: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return errIntOverflowTypes - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return errInvalidLengthTypes - } - postIndex := iNdEx + intStringLen - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Name = bytesutil.ToUnsafeString(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return errIntOverflowTypes - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return errInvalidLengthTypes - } - postIndex := iNdEx + intStringLen - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Value = bytesutil.ToUnsafeString(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skipTypes(dAtA[iNdEx:]) - if err != nil { - return err - } - if skippy < 0 { - return errInvalidLengthTypes - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} - -func skipTypes(dAtA []byte) (n int, err error) { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, errIntOverflowTypes - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - wireType := int(wire & 0x7) - switch wireType { - case 0: - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, errIntOverflowTypes - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - iNdEx++ - if dAtA[iNdEx-1] < 0x80 { - break - } - } - return iNdEx, nil - case 1: - iNdEx += 8 - return iNdEx, nil - case 2: - var length int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, errIntOverflowTypes - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - length |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - iNdEx += length - if length < 0 { - return 0, errInvalidLengthTypes - } - return iNdEx, nil - case 3: - for { - var innerWire uint64 - start := iNdEx - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, errIntOverflowTypes - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - innerWire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - innerWireType := int(innerWire & 0x7) - if innerWireType == 4 { - break - } - next, err := skipTypes(dAtA[start:]) - if err != nil { - return 0, err - } - iNdEx = start + next - } - return iNdEx, nil - case 4: - return iNdEx, nil - case 5: - iNdEx += 4 - return iNdEx, nil - default: - return 0, fmt.Errorf("proto: illegal wireType %d", wireType) - } - } - panic("unreachable") -} - -var ( - errInvalidLengthTypes = fmt.Errorf("proto: negative length found during unmarshaling") - errIntOverflowTypes = fmt.Errorf("proto: integer overflow") -) diff --git a/lib/prompb/types.proto b/lib/prompb/types.proto deleted file mode 100644 index a3ce85b41..000000000 --- a/lib/prompb/types.proto +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright 2017 Prometheus Team -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -syntax = "proto3"; -package prometheus; - -option go_package = "prompb"; - -import "gogoproto/gogo.proto"; - -message Sample { - double value = 1; - int64 timestamp = 2; -} - -message TimeSeries { - repeated Label labels = 1 [(gogoproto.nullable) = false]; - repeated Sample samples = 2 [(gogoproto.nullable) = false]; -} - -message Label { - string name = 1; - string value = 2; -} diff --git a/lib/prompb/util.go b/lib/prompb/util.go deleted file mode 100644 index a606ff581..000000000 --- a/lib/prompb/util.go +++ /dev/null @@ -1,19 +0,0 @@ -package prompb - -// Reset resets wr. -func (wr *WriteRequest) Reset() { - for i := range wr.Timeseries { - wr.Timeseries[i] = TimeSeries{} - } - wr.Timeseries = wr.Timeseries[:0] - - for i := range wr.labelsPool { - wr.labelsPool[i] = Label{} - } - wr.labelsPool = wr.labelsPool[:0] - - for i := range wr.samplesPool { - wr.samplesPool[i] = Sample{} - } - wr.samplesPool = wr.samplesPool[:0] -} diff --git a/lib/protoparser/promremotewrite/stream/streamparser.go b/lib/protoparser/promremotewrite/stream/streamparser.go index 485ccb850..347a63217 100644 --- a/lib/protoparser/promremotewrite/stream/streamparser.go +++ b/lib/protoparser/promremotewrite/stream/streamparser.go @@ -69,7 +69,7 @@ func Parse(r io.Reader, isVMRemoteWrite bool, callback func(tss []prompb.TimeSer } wr := getWriteRequest() defer putWriteRequest(wr) - if err := wr.Unmarshal(bb.B); err != nil { + if err := wr.UnmarshalProtobuf(bb.B); err != nil { unmarshalErrors.Inc() return fmt.Errorf("cannot unmarshal prompb.WriteRequest with size %d bytes: %w", len(bb.B), err) }