From d2c94a066385b3acf1d24e308deb521fc5d9d007 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sun, 14 Jan 2024 23:04:45 +0200 Subject: [PATCH] lib/prompbmarshal: switch to github.com/VictoriaMetrics/easyproto --- app/vmagent/remotewrite/pendingseries.go | 2 +- app/vmagent/remotewrite/pendingseries_test.go | 2 +- .../remotewrite/pendingseries_timing_test.go | 3 +- app/vmalert/remotewrite/client.go | 9 +- app/vmalert/remotewrite/debug_client.go | 5 +- lib/prompb/prompb_test.go | 41 ++-- lib/prompb/prompb_timing_test.go | 5 +- lib/prompbmarshal/prompbmarshal.go | 106 +++++++++ lib/prompbmarshal/prompbmarshal_test.go | 77 +++++++ .../prompbmarshal_timing_test.go | 74 ++++++ lib/prompbmarshal/remote.pb.go | 79 ------- lib/prompbmarshal/remote.proto | 82 ------- lib/prompbmarshal/types.pb.go | 216 ------------------ lib/prompbmarshal/types.proto | 85 ------- lib/prompbmarshal/util.go | 35 --- lib/promscrape/scrapework.go | 2 +- 16 files changed, 278 insertions(+), 545 deletions(-) create mode 100644 lib/prompbmarshal/prompbmarshal.go create mode 100644 lib/prompbmarshal/prompbmarshal_test.go create mode 100644 lib/prompbmarshal/prompbmarshal_timing_test.go delete mode 100644 lib/prompbmarshal/remote.pb.go delete mode 100644 lib/prompbmarshal/remote.proto delete mode 100644 lib/prompbmarshal/types.pb.go delete mode 100644 lib/prompbmarshal/types.proto delete mode 100644 lib/prompbmarshal/util.go diff --git a/app/vmagent/remotewrite/pendingseries.go b/app/vmagent/remotewrite/pendingseries.go index c6591e946..3d582f91f 100644 --- a/app/vmagent/remotewrite/pendingseries.go +++ b/app/vmagent/remotewrite/pendingseries.go @@ -228,7 +228,7 @@ func tryPushWriteRequest(wr *prompbmarshal.WriteRequest, tryPushBlock func(block return true } bb := writeRequestBufPool.Get() - bb.B = prompbmarshal.MarshalWriteRequest(bb.B[:0], wr) + bb.B = wr.MarshalProtobuf(bb.B[:0]) if len(bb.B) <= maxUnpackedBlockSize.IntN() { zb := snappyBufPool.Get() if isVMRemoteWrite { diff --git a/app/vmagent/remotewrite/pendingseries_test.go b/app/vmagent/remotewrite/pendingseries_test.go index 726548cb9..14b1bd451 100644 --- a/app/vmagent/remotewrite/pendingseries_test.go +++ b/app/vmagent/remotewrite/pendingseries_test.go @@ -43,7 +43,7 @@ func testPushWriteRequest(t *testing.T, rowsCount, expectedBlockLenProm, expecte } // Check Prometheus remote write - f(false, expectedBlockLenProm, 0) + f(false, expectedBlockLenProm, 3) // Check VictoriaMetrics remote write f(true, expectedBlockLenVM, 15) diff --git a/app/vmagent/remotewrite/pendingseries_timing_test.go b/app/vmagent/remotewrite/pendingseries_timing_test.go index 8fbd4224f..4ac7b6813 100644 --- a/app/vmagent/remotewrite/pendingseries_timing_test.go +++ b/app/vmagent/remotewrite/pendingseries_timing_test.go @@ -4,7 +4,6 @@ import ( "fmt" "testing" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/golang/snappy" "github.com/klauspost/compress/s2" ) @@ -22,7 +21,7 @@ func benchmarkCompressWriteRequest(b *testing.B, compressFunc func(dst, src []by for _, rowsCount := range []int{1, 10, 100, 1e3, 1e4} { b.Run(fmt.Sprintf("rows_%d", rowsCount), func(b *testing.B) { wr := newTestWriteRequest(rowsCount, 10) - data := prompbmarshal.MarshalWriteRequest(nil, wr) + data := wr.MarshalProtobuf(nil) b.ReportAllocs() b.SetBytes(int64(rowsCount)) b.RunParallel(func(pb *testing.PB) { diff --git a/app/vmalert/remotewrite/client.go b/app/vmalert/remotewrite/client.go index ac3dffa65..2c9fc1ac1 100644 --- a/app/vmalert/remotewrite/client.go +++ b/app/vmalert/remotewrite/client.go @@ -208,15 +208,10 @@ func (c *Client) flush(ctx context.Context, wr *prompbmarshal.WriteRequest) { if len(wr.Timeseries) < 1 { return } - defer prompbmarshal.ResetWriteRequest(wr) + defer wr.Reset() defer bufferFlushDuration.UpdateDuration(time.Now()) - data, err := wr.Marshal() - if err != nil { - logger.Errorf("failed to marshal WriteRequest: %s", err) - return - } - + data := wr.MarshalProtobuf(nil) b := snappy.Encode(nil, data) retryInterval, maxRetryInterval := *retryMinInterval, *retryMaxTime diff --git a/app/vmalert/remotewrite/debug_client.go b/app/vmalert/remotewrite/debug_client.go index a3cd17282..482d34de0 100644 --- a/app/vmalert/remotewrite/debug_client.go +++ b/app/vmalert/remotewrite/debug_client.go @@ -49,10 +49,7 @@ func (c *DebugClient) Push(s prompbmarshal.TimeSeries) error { c.wg.Add(1) defer c.wg.Done() wr := &prompbmarshal.WriteRequest{Timeseries: []prompbmarshal.TimeSeries{s}} - data, err := wr.Marshal() - if err != nil { - return fmt.Errorf("failed to marshal the given time series: %w", err) - } + data := wr.MarshalProtobuf(nil) return c.send(data) } diff --git a/lib/prompb/prompb_test.go b/lib/prompb/prompb_test.go index 6e4e04e45..727101206 100644 --- a/lib/prompb/prompb_test.go +++ b/lib/prompb/prompb_test.go @@ -41,23 +41,20 @@ func TestWriteRequestUnmarshalProtobuf(t *testing.T) { Samples: samples, }) } - dataResult, err := wrm.Marshal() - if err != nil { - t.Fatalf("unexpected error: %s", err) - } + dataResult := wrm.MarshalProtobuf(nil) if !bytes.Equal(dataResult, data) { t.Fatalf("unexpected data obtained after marshaling\ngot\n%X\nwant\n%X", dataResult, data) } } + var data []byte wrm := &prompbmarshal.WriteRequest{} - data, err := wrm.Marshal() - if err != nil { - t.Fatalf("unexpected error") - } + + wrm.Reset() + data = wrm.MarshalProtobuf(data[:0]) f(data) - wrm = &prompbmarshal.WriteRequest{} + wrm.Reset() wrm.Timeseries = []prompbmarshal.TimeSeries{ { Labels: []prompbmarshal.Label{ @@ -76,13 +73,10 @@ func TestWriteRequestUnmarshalProtobuf(t *testing.T) { }, }, } - data, err = wrm.Marshal() - if err != nil { - t.Fatalf("unexpected error") - } + data = wrm.MarshalProtobuf(data[:0]) f(data) - wrm = &prompbmarshal.WriteRequest{} + wrm.Reset() wrm.Timeseries = []prompbmarshal.TimeSeries{ { Samples: []prompbmarshal.Sample{ @@ -97,13 +91,10 @@ func TestWriteRequestUnmarshalProtobuf(t *testing.T) { }, }, } - data, err = wrm.Marshal() - if err != nil { - t.Fatalf("unexpected error") - } + data = wrm.MarshalProtobuf(data[:0]) f(data) - wrm = &prompbmarshal.WriteRequest{} + wrm.Reset() wrm.Timeseries = []prompbmarshal.TimeSeries{ { Labels: []prompbmarshal.Label{ @@ -132,13 +123,10 @@ func TestWriteRequestUnmarshalProtobuf(t *testing.T) { }, }, } - data, err = wrm.Marshal() - if err != nil { - t.Fatalf("unexpected error") - } + data = wrm.MarshalProtobuf(data[:0]) f(data) - wrm = &prompbmarshal.WriteRequest{} + wrm.Reset() wrm.Timeseries = []prompbmarshal.TimeSeries{ { Labels: []prompbmarshal.Label{ @@ -180,9 +168,6 @@ func TestWriteRequestUnmarshalProtobuf(t *testing.T) { }, }, } - data, err = wrm.Marshal() - if err != nil { - t.Fatalf("unexpected error") - } + data = wrm.MarshalProtobuf(data[:0]) f(data) } diff --git a/lib/prompb/prompb_timing_test.go b/lib/prompb/prompb_timing_test.go index 86e1d4e81..0fb956e2b 100644 --- a/lib/prompb/prompb_timing_test.go +++ b/lib/prompb/prompb_timing_test.go @@ -8,10 +8,7 @@ import ( ) func BenchmarkWriteRequestUnmarshalProtobuf(b *testing.B) { - data, err := benchWriteRequest.Marshal() - if err != nil { - b.Fatalf("unexpected error: %s", err) - } + data := benchWriteRequest.MarshalProtobuf(nil) b.ReportAllocs() b.SetBytes(int64(len(benchWriteRequest.Timeseries))) diff --git a/lib/prompbmarshal/prompbmarshal.go b/lib/prompbmarshal/prompbmarshal.go new file mode 100644 index 000000000..68114352e --- /dev/null +++ b/lib/prompbmarshal/prompbmarshal.go @@ -0,0 +1,106 @@ +package prompbmarshal + +import ( + "github.com/VictoriaMetrics/easyproto" +) + +// WriteRequest represents Prometheus remote write API request. +type WriteRequest struct { + // Timeseries contains a list of time series for the given WriteRequest + Timeseries []TimeSeries +} + +// Reset resets wr for subsequent re-use. +func (wr *WriteRequest) Reset() { + wr.Timeseries = ResetTimeSeries(wr.Timeseries) +} + +// ResetTimeSeries clears all the GC references from tss and returns an empty tss ready for further use. +func ResetTimeSeries(tss []TimeSeries) []TimeSeries { + for i := range tss { + tss[i] = TimeSeries{} + } + return tss[:0] +} + +// TimeSeries represents a single time series. +type TimeSeries struct { + // Labels contains a list of labels for the given TimeSeries + Labels []Label + + // Samples contains a list of samples for the given TimeSeries + Samples []Sample +} + +// Label represents time series label. +type Label struct { + // Name is label name. + Name string + + // Value is label value. + Value string +} + +// Sample represents time series sample +type Sample struct { + // Value is sample value. + Value float64 + + // Timestamp is sample timestamp. + Timestamp int64 +} + +// MarshalProtobuf appends protobuf-marshaled wr to dst and returns the result. +func (wr *WriteRequest) MarshalProtobuf(dst []byte) []byte { + // message WriteRequest { + // repeated TimeSeries timeseries = 1; + // } + m := mp.Get() + wr.appendToProtobuf(m.MessageMarshaler()) + dst = m.Marshal(dst) + mp.Put(m) + return dst +} + +func (wr *WriteRequest) appendToProtobuf(mm *easyproto.MessageMarshaler) { + tss := wr.Timeseries + for i := range tss { + tss[i].appendToProtobuf(mm.AppendMessage(1)) + } +} + +func (ts *TimeSeries) appendToProtobuf(mm *easyproto.MessageMarshaler) { + // message TimeSeries { + // repeated Label labels = 1; + // repeated Sample samples = 2; + // } + labels := ts.Labels + for i := range labels { + labels[i].appendToProtobuf(mm.AppendMessage(1)) + } + + samples := ts.Samples + for i := range samples { + samples[i].appendToProtobuf(mm.AppendMessage(2)) + } +} + +func (lbl *Label) appendToProtobuf(mm *easyproto.MessageMarshaler) { + // message Label { + // string name = 1; + // string value = 2; + // } + mm.AppendString(1, lbl.Name) + mm.AppendString(2, lbl.Value) +} + +func (s *Sample) appendToProtobuf(mm *easyproto.MessageMarshaler) { + // message Sample { + // double value = 1; + // int64 timestamp = 2; + // } + mm.AppendDouble(1, s.Value) + mm.AppendInt64(2, s.Timestamp) +} + +var mp easyproto.MarshalerPool diff --git a/lib/prompbmarshal/prompbmarshal_test.go b/lib/prompbmarshal/prompbmarshal_test.go new file mode 100644 index 000000000..99fb28c10 --- /dev/null +++ b/lib/prompbmarshal/prompbmarshal_test.go @@ -0,0 +1,77 @@ +package prompbmarshal_test + +import ( + "bytes" + "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" +) + +func TestWriteRequestMarshalProtobuf(t *testing.T) { + wrm := &prompbmarshal.WriteRequest{ + Timeseries: []prompbmarshal.TimeSeries{ + { + Labels: []prompbmarshal.Label{ + { + Name: "__name__", + Value: "process_cpu_seconds_total", + }, + { + Name: "instance", + Value: "host-123:4567", + }, + { + Name: "job", + Value: "node-exporter", + }, + }, + Samples: []prompbmarshal.Sample{ + { + Value: 123.3434, + Timestamp: 8939432423, + }, + { + Value: -123.3434, + Timestamp: 18939432423, + }, + }, + }, + }, + } + data := wrm.MarshalProtobuf(nil) + + // Verify that the marshaled protobuf is unmarshaled properly + var wr prompb.WriteRequest + if err := wr.UnmarshalProtobuf(data); err != nil { + t.Fatalf("cannot unmarshal protobuf: %s", err) + } + + // Compare the unmarshaled wr with the original wrm. + wrm.Reset() + for _, ts := range wr.Timeseries { + var labels []prompbmarshal.Label + for _, label := range ts.Labels { + labels = append(labels, prompbmarshal.Label{ + Name: label.Name, + Value: label.Value, + }) + } + var samples []prompbmarshal.Sample + for _, sample := range ts.Samples { + samples = append(samples, prompbmarshal.Sample{ + Value: sample.Value, + Timestamp: sample.Timestamp, + }) + } + wrm.Timeseries = append(wrm.Timeseries, prompbmarshal.TimeSeries{ + Labels: labels, + Samples: samples, + }) + } + dataResult := wrm.MarshalProtobuf(nil) + + if !bytes.Equal(dataResult, data) { + t.Fatalf("unexpected data obtained after marshaling\ngot\n%X\nwant\n%X", dataResult, data) + } +} diff --git a/lib/prompbmarshal/prompbmarshal_timing_test.go b/lib/prompbmarshal/prompbmarshal_timing_test.go new file mode 100644 index 000000000..56ded00ff --- /dev/null +++ b/lib/prompbmarshal/prompbmarshal_timing_test.go @@ -0,0 +1,74 @@ +package prompbmarshal + +import ( + "fmt" + "testing" +) + +func BenchmarkWriteRequestMarshalProtobuf(b *testing.B) { + b.ReportAllocs() + b.SetBytes(int64(len(benchWriteRequest.Timeseries))) + b.RunParallel(func(pb *testing.PB) { + var data []byte + for pb.Next() { + data = benchWriteRequest.MarshalProtobuf(data[:0]) + } + }) +} + +var benchWriteRequest = func() *WriteRequest { + var tss []TimeSeries + for i := 0; i < 1_000; i++ { + ts := TimeSeries{ + Labels: []Label{ + { + Name: "__name__", + Value: "process_cpu_seconds_total", + }, + { + Name: "instance", + Value: fmt.Sprintf("host-%d:4567", i), + }, + { + Name: "job", + Value: "node-exporter", + }, + { + Name: "pod", + Value: "foo-bar-pod-8983423843", + }, + { + Name: "cpu", + Value: "1", + }, + { + Name: "mode", + Value: "system", + }, + { + Name: "node", + Value: "host-123", + }, + { + Name: "namespace", + Value: "foo-bar-baz", + }, + { + Name: "container", + Value: fmt.Sprintf("aaa-bb-cc-dd-ee-%d", i), + }, + }, + Samples: []Sample{ + { + Value: float64(i), + Timestamp: 1e9 + int64(i)*1000, + }, + }, + } + tss = append(tss, ts) + } + wr := &WriteRequest{ + Timeseries: tss, + } + return wr +}() diff --git a/lib/prompbmarshal/remote.pb.go b/lib/prompbmarshal/remote.pb.go deleted file mode 100644 index 6d8c29bd2..000000000 --- a/lib/prompbmarshal/remote.pb.go +++ /dev/null @@ -1,79 +0,0 @@ -// Code generated by protoc-gen-gogo. DO NOT EDIT. -// source: remote.proto - -package prompbmarshal - -import ( - math_bits "math/bits" -) - -type WriteRequest struct { - Timeseries []TimeSeries `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"` -} - -func (m *WriteRequest) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalToSizedBuffer(dAtA[:size]) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *WriteRequest) MarshalTo(dAtA []byte) (int, error) { - size := m.Size() - return m.MarshalToSizedBuffer(dAtA[:size]) -} - -func (m *WriteRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { - i := len(dAtA) - _ = i - var l int - _ = l - if len(m.Timeseries) > 0 { - for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- { - { - size, err := m.Timeseries[iNdEx].MarshalToSizedBuffer(dAtA[:i]) - if err != nil { - return 0, err - } - i -= size - i = encodeVarintRemote(dAtA, i, uint64(size)) - } - i-- - dAtA[i] = 0xa - } - } - return len(dAtA) - i, nil -} - -func encodeVarintRemote(dAtA []byte, offset int, v uint64) int { - offset -= sovRemote(v) - base := offset - for v >= 1<<7 { - dAtA[offset] = uint8(v&0x7f | 0x80) - v >>= 7 - offset++ - } - dAtA[offset] = uint8(v) - return base -} -func (m *WriteRequest) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - if len(m.Timeseries) > 0 { - for _, e := range m.Timeseries { - l = e.Size() - n += 1 + l + sovRemote(uint64(l)) - } - } - return n -} - -func sovRemote(x uint64) (n int) { - return (math_bits.Len64(x|1) + 6) / 7 -} diff --git a/lib/prompbmarshal/remote.proto b/lib/prompbmarshal/remote.proto deleted file mode 100644 index 5f82182ed..000000000 --- a/lib/prompbmarshal/remote.proto +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright 2016 Prometheus Team -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -syntax = "proto3"; -package prometheus; - -option go_package = "prompbmarshal"; - -import "types.proto"; -import "gogoproto/gogo.proto"; - -message WriteRequest { - repeated prometheus.TimeSeries timeseries = 1 [(gogoproto.nullable) = false]; -} - -// ReadRequest represents a remote read request. -message ReadRequest { - repeated Query queries = 1; - - enum ResponseType { - // Server will return a single ReadResponse message with matched series that includes list of raw samples. - // It's recommended to use streamed response types instead. - // - // Response headers: - // Content-Type: "application/x-protobuf" - // Content-Encoding: "snappy" - SAMPLES = 0; - // Server will stream a delimited ChunkedReadResponse message that contains XOR encoded chunks for a single series. - // Each message is following varint size and fixed size bigendian uint32 for CRC32 Castagnoli checksum. - // - // Response headers: - // Content-Type: "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse" - // Content-Encoding: "" - STREAMED_XOR_CHUNKS = 1; - } - - // accepted_response_types allows negotiating the content type of the response. - // - // Response types are taken from the list in the FIFO order. If no response type in `accepted_response_types` is - // implemented by server, error is returned. - // For request that do not contain `accepted_response_types` field the SAMPLES response type will be used. - repeated ResponseType accepted_response_types = 2; -} - -// ReadResponse is a response when response_type equals SAMPLES. -message ReadResponse { - // In same order as the request's queries. - repeated QueryResult results = 1; -} - -message Query { - int64 start_timestamp_ms = 1; - int64 end_timestamp_ms = 2; - repeated prometheus.LabelMatcher matchers = 3; - prometheus.ReadHints hints = 4; -} - -message QueryResult { - // Samples within a time series must be ordered by time. - repeated prometheus.TimeSeries timeseries = 1; -} - -// ChunkedReadResponse is a response when response_type equals STREAMED_XOR_CHUNKS. -// We strictly stream full series after series, optionally split by time. This means that a single frame can contain -// partition of the single series, but once a new series is started to be streamed it means that no more chunks will -// be sent for previous one. Series are returned sorted in the same way TSDB block are internally. -message ChunkedReadResponse { - repeated prometheus.ChunkedSeries chunked_series = 1; - - // query_index represents an index of the query from ReadRequest.queries these chunks relates to. - int64 query_index = 2; -} diff --git a/lib/prompbmarshal/types.pb.go b/lib/prompbmarshal/types.pb.go deleted file mode 100644 index 423535339..000000000 --- a/lib/prompbmarshal/types.pb.go +++ /dev/null @@ -1,216 +0,0 @@ -// Code generated by protoc-gen-gogo. DO NOT EDIT. -// source: types.proto - -package prompbmarshal - -import ( - encoding_binary "encoding/binary" - math "math" - math_bits "math/bits" -) - -type Sample struct { - Value float64 `protobuf:"fixed64,1,opt,name=value,proto3" json:"value,omitempty"` - Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` -} - -// TimeSeries represents samples and labels for a single time series. -type TimeSeries struct { - Labels []Label `protobuf:"bytes,1,rep,name=labels,proto3" json:"labels"` - Samples []Sample `protobuf:"bytes,2,rep,name=samples,proto3" json:"samples"` -} - -type Label struct { - Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` - Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` -} - -func (m *Sample) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalToSizedBuffer(dAtA[:size]) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *Sample) MarshalTo(dAtA []byte) (int, error) { - size := m.Size() - return m.MarshalToSizedBuffer(dAtA[:size]) -} - -func (m *Sample) MarshalToSizedBuffer(dAtA []byte) (int, error) { - i := len(dAtA) - _ = i - var l int - _ = l - if m.Timestamp != 0 { - i = encodeVarintTypes(dAtA, i, uint64(m.Timestamp)) - i-- - dAtA[i] = 0x10 - } - if m.Value != 0 { - i -= 8 - encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.Value)))) - i-- - dAtA[i] = 0x9 - } - return len(dAtA) - i, nil -} - -func (m *TimeSeries) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalToSizedBuffer(dAtA[:size]) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *TimeSeries) MarshalTo(dAtA []byte) (int, error) { - size := m.Size() - return m.MarshalToSizedBuffer(dAtA[:size]) -} - -func (m *TimeSeries) MarshalToSizedBuffer(dAtA []byte) (int, error) { - i := len(dAtA) - _ = i - var l int - _ = l - if len(m.Samples) > 0 { - for iNdEx := len(m.Samples) - 1; iNdEx >= 0; iNdEx-- { - { - size, err := m.Samples[iNdEx].MarshalToSizedBuffer(dAtA[:i]) - if err != nil { - return 0, err - } - i -= size - i = encodeVarintTypes(dAtA, i, uint64(size)) - } - i-- - dAtA[i] = 0x12 - } - } - if len(m.Labels) > 0 { - for iNdEx := len(m.Labels) - 1; iNdEx >= 0; iNdEx-- { - { - size, err := m.Labels[iNdEx].MarshalToSizedBuffer(dAtA[:i]) - if err != nil { - return 0, err - } - i -= size - i = encodeVarintTypes(dAtA, i, uint64(size)) - } - i-- - dAtA[i] = 0xa - } - } - return len(dAtA) - i, nil -} - -func (m *Label) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalToSizedBuffer(dAtA[:size]) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *Label) MarshalTo(dAtA []byte) (int, error) { - size := m.Size() - return m.MarshalToSizedBuffer(dAtA[:size]) -} - -func (m *Label) MarshalToSizedBuffer(dAtA []byte) (int, error) { - i := len(dAtA) - _ = i - var l int - _ = l - if len(m.Value) > 0 { - i -= len(m.Value) - copy(dAtA[i:], m.Value) - i = encodeVarintTypes(dAtA, i, uint64(len(m.Value))) - i-- - dAtA[i] = 0x12 - } - if len(m.Name) > 0 { - i -= len(m.Name) - copy(dAtA[i:], m.Name) - i = encodeVarintTypes(dAtA, i, uint64(len(m.Name))) - i-- - dAtA[i] = 0xa - } - return len(dAtA) - i, nil -} - -func encodeVarintTypes(dAtA []byte, offset int, v uint64) int { - offset -= sovTypes(v) - base := offset - for v >= 1<<7 { - dAtA[offset] = uint8(v&0x7f | 0x80) - v >>= 7 - offset++ - } - dAtA[offset] = uint8(v) - return base -} -func (m *Sample) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - if m.Value != 0 { - n += 9 - } - if m.Timestamp != 0 { - n += 1 + sovTypes(uint64(m.Timestamp)) - } - return n -} - -func (m *TimeSeries) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - if len(m.Labels) > 0 { - for _, e := range m.Labels { - l = e.Size() - n += 1 + l + sovTypes(uint64(l)) - } - } - if len(m.Samples) > 0 { - for _, e := range m.Samples { - l = e.Size() - n += 1 + l + sovTypes(uint64(l)) - } - } - return n -} - -func (m *Label) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - l = len(m.Name) - if l > 0 { - n += 1 + l + sovTypes(uint64(l)) - } - l = len(m.Value) - if l > 0 { - n += 1 + l + sovTypes(uint64(l)) - } - return n -} - -func sovTypes(x uint64) (n int) { - return (math_bits.Len64(x|1) + 6) / 7 -} diff --git a/lib/prompbmarshal/types.proto b/lib/prompbmarshal/types.proto deleted file mode 100644 index 0d047b8c6..000000000 --- a/lib/prompbmarshal/types.proto +++ /dev/null @@ -1,85 +0,0 @@ -// Copyright 2017 Prometheus Team -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -syntax = "proto3"; -package prometheus; - -option go_package = "prompbmarshal"; - -import "gogoproto/gogo.proto"; - -message Sample { - double value = 1; - int64 timestamp = 2; -} - -// TimeSeries represents samples and labels for a single time series. -message TimeSeries { - repeated Label labels = 1 [(gogoproto.nullable) = false]; - repeated Sample samples = 2 [(gogoproto.nullable) = false]; -} - -message Label { - string name = 1; - string value = 2; -} - -message Labels { - repeated Label labels = 1 [(gogoproto.nullable) = false]; -} - -// Matcher specifies a rule, which can match or set of labels or not. -message LabelMatcher { - enum Type { - EQ = 0; - NEQ = 1; - RE = 2; - NRE = 3; - } - Type type = 1; - string name = 2; - string value = 3; -} - -message ReadHints { - int64 step_ms = 1; // Query step size in milliseconds. - string func = 2; // String representation of surrounding function or aggregation. - int64 start_ms = 3; // Start time in milliseconds. - int64 end_ms = 4; // End time in milliseconds. - repeated string grouping = 5; // List of label names used in aggregation. - bool by = 6; // Indicate whether it is without or by. - int64 range_ms = 7; // Range vector selector range in milliseconds. -} - -// Chunk represents a TSDB chunk. -// Time range [min, max] is inclusive. -message Chunk { - int64 min_time_ms = 1; - int64 max_time_ms = 2; - - // We require this to match chunkenc.Encoding. - enum Encoding { - UNKNOWN = 0; - XOR = 1; - } - Encoding type = 3; - bytes data = 4; -} - -// ChunkedSeries represents single, encoded time series. -message ChunkedSeries { - // Labels should be sorted. - repeated Label labels = 1 [(gogoproto.nullable) = false]; - // Chunks will be in start time order and may overlap. - repeated Chunk chunks = 2 [(gogoproto.nullable) = false]; -} diff --git a/lib/prompbmarshal/util.go b/lib/prompbmarshal/util.go deleted file mode 100644 index ef766e02a..000000000 --- a/lib/prompbmarshal/util.go +++ /dev/null @@ -1,35 +0,0 @@ -package prompbmarshal - -import ( - "fmt" -) - -// MarshalWriteRequest marshals wr to dst and returns the result. -func MarshalWriteRequest(dst []byte, wr *WriteRequest) []byte { - size := wr.Size() - dstLen := len(dst) - if n := size - (cap(dst) - dstLen); n > 0 { - dst = append(dst[:cap(dst)], make([]byte, n)...) - } - dst = dst[:dstLen+size] - n, err := wr.MarshalToSizedBuffer(dst[dstLen:]) - if err != nil { - panic(fmt.Errorf("BUG: unexpected error when marshaling WriteRequest: %w", err)) - } - return dst[:dstLen+n] -} - -// ResetWriteRequest resets wr. -func ResetWriteRequest(wr *WriteRequest) { - wr.Timeseries = ResetTimeSeries(wr.Timeseries) -} - -// ResetTimeSeries clears all the GC references from tss and returns an empty tss ready for further use. -func ResetTimeSeries(tss []TimeSeries) []TimeSeries { - for i := range tss { - ts := tss[i] - ts.Labels = nil - ts.Samples = nil - } - return tss[:0] -} diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index f941a3d26..5858ae880 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -728,7 +728,7 @@ func (wc *writeRequestCtx) reset() { } func (wc *writeRequestCtx) resetNoRows() { - prompbmarshal.ResetWriteRequest(&wc.writeRequest) + wc.writeRequest.Reset() labels := wc.labels for i := range labels {