From 8aaa828ba3f49a43cb8de89a0bcae160f7988c14 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 1 Feb 2024 06:33:03 +0200 Subject: [PATCH] lib/prompbmarshal: restore the custom protobuf marshaler for lib/prompbmarshal.WriteRequest The easyproto-based marshaler is 2x slower than the previous custom marshaler, so let's go back to the custom one. This improves the performance for sending data to remote storage at vmagent and reduces CPU usage to pre-v1.97.0 levels. --- docs/CHANGELOG.md | 1 + lib/prompbmarshal/prompbmarshal.go | 106 -------------------- lib/prompbmarshal/remote.pb.go | 64 ++++++++++++ lib/prompbmarshal/types.pb.go | 155 +++++++++++++++++++++++++++++ lib/prompbmarshal/util.go | 33 ++++++ 5 files changed, 253 insertions(+), 106 deletions(-) delete mode 100644 lib/prompbmarshal/prompbmarshal.go create mode 100644 lib/prompbmarshal/remote.pb.go create mode 100644 lib/prompbmarshal/types.pb.go create mode 100644 lib/prompbmarshal/util.go diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index c851e0061..bbf95ca64 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -30,6 +30,7 @@ The sandbox cluster installation is running under the constant load generated by * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for `username_file` option at `basic_auth` section in [`scrape_configs`](https://docs.victoriametrics.com/sd_configs/#http-api-client-options). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5511). Thanks to @wasim-nihal for [the initial implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5720). +* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix the increased CPU usage when sending the data to remote storage. The issue has been introduced in [v1.97.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.97.0). * BUGFIX: fix `runtime error: slice bounds out of range` panic, which can occur during query execution. 
See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5733). The bug has been introduced in `v1.97.0`. * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly handle `avg_over_time({some_filter}[d]) keep_metric_names` queries, where [`some_filter`](https://docs.victoriametrics.com/keyconcepts/#filtering) matches multiple time series with multiple names, while `d` is bigger or equal to `3h`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5556). * BUGFIX: dashboards/single: fix typo in query for `version` annotation which falsely produced many version change events. diff --git a/lib/prompbmarshal/prompbmarshal.go b/lib/prompbmarshal/prompbmarshal.go deleted file mode 100644 index e2e87b114..000000000 --- a/lib/prompbmarshal/prompbmarshal.go +++ /dev/null @@ -1,106 +0,0 @@ -package prompbmarshal - -import ( - "github.com/VictoriaMetrics/easyproto" -) - -// WriteRequest represents Prometheus remote write API request. -type WriteRequest struct { - // Timeseries contains a list of time series for the given WriteRequest - Timeseries []TimeSeries -} - -// Reset resets wr for subsequent re-use. -func (wr *WriteRequest) Reset() { - wr.Timeseries = ResetTimeSeries(wr.Timeseries) -} - -// ResetTimeSeries clears all the GC references from tss and returns an empty tss ready for further use. -func ResetTimeSeries(tss []TimeSeries) []TimeSeries { - for i := range tss { - tss[i] = TimeSeries{} - } - return tss[:0] -} - -// TimeSeries represents a single time series. -type TimeSeries struct { - // Labels contains a list of labels for the given TimeSeries - Labels []Label - - // Samples contains a list of samples for the given TimeSeries - Samples []Sample -} - -// Label represents time series label. -type Label struct { - // Name is label name. - Name string - - // Value is label value. - Value string -} - -// Sample represents time series sample -type Sample struct { - // Value is sample value. 
- Value float64 - - // Timestamp is sample timestamp. - Timestamp int64 -} - -// MarshalProtobuf appends protobuf-marshaled wr to dst and returns the result. -func (wr *WriteRequest) MarshalProtobuf(dst []byte) []byte { - m := mp.Get() - wr.appendToProtobuf(m.MessageMarshaler()) - dst = m.Marshal(dst) - mp.Put(m) - return dst -} - -func (wr *WriteRequest) appendToProtobuf(mm *easyproto.MessageMarshaler) { - // message WriteRequest { - // repeated TimeSeries timeseries = 1; - // } - tss := wr.Timeseries - for i := range tss { - tss[i].appendToProtobuf(mm.AppendMessage(1)) - } -} - -func (ts *TimeSeries) appendToProtobuf(mm *easyproto.MessageMarshaler) { - // message TimeSeries { - // repeated Label labels = 1; - // repeated Sample samples = 2; - // } - labels := ts.Labels - for i := range labels { - labels[i].appendToProtobuf(mm.AppendMessage(1)) - } - - samples := ts.Samples - for i := range samples { - samples[i].appendToProtobuf(mm.AppendMessage(2)) - } -} - -func (lbl *Label) appendToProtobuf(mm *easyproto.MessageMarshaler) { - // message Label { - // string name = 1; - // string value = 2; - // } - mm.AppendString(1, lbl.Name) - mm.AppendString(2, lbl.Value) -} - -func (s *Sample) appendToProtobuf(mm *easyproto.MessageMarshaler) { - // message Sample { - // double value = 1; - // int64 timestamp = 2; - // } - mm.AppendDouble(1, s.Value) - mm.AppendInt64(2, s.Timestamp) -} - -var mp easyproto.MarshalerPool diff --git a/lib/prompbmarshal/remote.pb.go b/lib/prompbmarshal/remote.pb.go new file mode 100644 index 000000000..a668fea11 --- /dev/null +++ b/lib/prompbmarshal/remote.pb.go @@ -0,0 +1,64 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. 
+// source: remote.proto + +package prompbmarshal + +import ( + math_bits "math/bits" +) + +type WriteRequest struct { + Timeseries []TimeSeries +} + +func (m *WriteRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Timeseries) > 0 { + for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Timeseries[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarint(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + +func encodeVarint(dAtA []byte, offset int, v uint64) int { + offset -= sov(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *WriteRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Timeseries) > 0 { + for _, e := range m.Timeseries { + l = e.Size() + n += 1 + l + sov(uint64(l)) + } + } + return n +} + +func sov(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} diff --git a/lib/prompbmarshal/types.pb.go b/lib/prompbmarshal/types.pb.go new file mode 100644 index 000000000..249e5c80b --- /dev/null +++ b/lib/prompbmarshal/types.pb.go @@ -0,0 +1,155 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: types.proto + +package prompbmarshal + +import ( + encoding_binary "encoding/binary" + math "math" +) + +type Sample struct { + Value float64 + Timestamp int64 +} + +// TimeSeries represents samples and labels for a single time series. 
+type TimeSeries struct { + Labels []Label + Samples []Sample +} + +type Label struct { + Name string + Value string +} + +func (m *Sample) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Timestamp != 0 { + i = encodeVarint(dAtA, i, uint64(m.Timestamp)) + i-- + dAtA[i] = 0x10 + } + if m.Value != 0 { + i -= 8 + encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.Value)))) + i-- + dAtA[i] = 0x9 + } + return len(dAtA) - i, nil +} + +func (m *TimeSeries) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Samples) > 0 { + for iNdEx := len(m.Samples) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Samples[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarint(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + } + if len(m.Labels) > 0 { + for iNdEx := len(m.Labels) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Labels[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarint(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + +func (m *Label) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Value) > 0 { + i -= len(m.Value) + copy(dAtA[i:], m.Value) + i = encodeVarint(dAtA, i, uint64(len(m.Value))) + i-- + dAtA[i] = 0x12 + } + if len(m.Name) > 0 { + i -= len(m.Name) + copy(dAtA[i:], m.Name) + i = encodeVarint(dAtA, i, uint64(len(m.Name))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *Sample) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Value != 0 { + n += 9 + } + if m.Timestamp != 0 { + n += 1 + sov(uint64(m.Timestamp)) + } + return n +} + +func (m *TimeSeries) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Labels) > 0 { + for _, e := range m.Labels { + l = 
e.Size() + n += 1 + l + sov(uint64(l)) + } + } + if len(m.Samples) > 0 { + for _, e := range m.Samples { + l = e.Size() + n += 1 + l + sov(uint64(l)) + } + } + return n +} + +func (m *Label) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Name) + if l > 0 { + n += 1 + l + sov(uint64(l)) + } + l = len(m.Value) + if l > 0 { + n += 1 + l + sov(uint64(l)) + } + return n +} diff --git a/lib/prompbmarshal/util.go b/lib/prompbmarshal/util.go new file mode 100644 index 000000000..417ca730c --- /dev/null +++ b/lib/prompbmarshal/util.go @@ -0,0 +1,33 @@ +package prompbmarshal + +import ( + "fmt" +) + +// MarshalProtobuf marshals wr to dst and returns the result. +func (wr *WriteRequest) MarshalProtobuf(dst []byte) []byte { + size := wr.Size() + dstLen := len(dst) + if n := size - (cap(dst) - dstLen); n > 0 { + dst = append(dst[:cap(dst)], make([]byte, n)...) + } + dst = dst[:dstLen+size] + n, err := wr.MarshalToSizedBuffer(dst[dstLen:]) + if err != nil { + panic(fmt.Errorf("BUG: unexpected error when marshaling WriteRequest: %w", err)) + } + return dst[:dstLen+n] +} + +// Reset resets wr. +func (wr *WriteRequest) Reset() { + wr.Timeseries = ResetTimeSeries(wr.Timeseries) +} + +// ResetTimeSeries clears all the GC references from tss and returns an empty tss ready for further use. +func ResetTimeSeries(tss []TimeSeries) []TimeSeries { + for i := range tss { + tss[i] = TimeSeries{} + } + return tss[:0] +}