lib/prompbmarshal: return back custom protobuf marshaler for lib/prompbmarshal.WriteRequest

The easyproto-based marshaler is 2x slower than the previous custom marshaler,
so let's stick with it. This improves the performance for sending data to remote storage at vmagent
and reduces CPU usage to pre-v1.97.0 levels.
This commit is contained in:
Aliaksandr Valialkin 2024-02-01 06:33:03 +02:00
parent faeabfc730
commit 6c56f49f9c
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
5 changed files with 253 additions and 106 deletions

View file

@ -30,6 +30,7 @@ The sandbox cluster installation is running under the constant load generated by
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for `username_file` option at `basic_auth` section in [`scrape_configs`](https://docs.victoriametrics.com/sd_configs/#http-api-client-options). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5511). Thanks to @wasim-nihal for [the initial implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5720).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix the increased CPU usage when sending the data to remote storage. The issue has been introduced in [v1.97.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.97.0).
* BUGFIX: fix `runtime error: slice bounds out of range` panic, which can occur during query execution. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5733). The bug has been introduced in `v1.97.0`.
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly handle `avg_over_time({some_filter}[d]) keep_metric_names` queries, where [`some_filter`](https://docs.victoriametrics.com/keyconcepts/#filtering) matches multiple time series with multiple names, while `d` is bigger or equal to `3h`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5556).
* BUGFIX: dashboards/single: fix typo in query for `version` annotation which falsely produced many version change events.

View file

@ -1,106 +0,0 @@
package prompbmarshal
import (
"github.com/VictoriaMetrics/easyproto"
)
// WriteRequest represents Prometheus remote write API request.
type WriteRequest struct {
// Timeseries contains a list of time series for the given WriteRequest
Timeseries []TimeSeries
}
// Reset resets wr for subsequent re-use.
func (wr *WriteRequest) Reset() {
wr.Timeseries = ResetTimeSeries(wr.Timeseries)
}
// ResetTimeSeries clears all the GC references from tss and returns an empty tss ready for further use.
func ResetTimeSeries(tss []TimeSeries) []TimeSeries {
for i := range tss {
tss[i] = TimeSeries{}
}
return tss[:0]
}
// TimeSeries represents a single time series.
type TimeSeries struct {
// Labels contains a list of labels for the given TimeSeries
Labels []Label
// Samples contains a list of samples for the given TimeSeries
Samples []Sample
}
// Label represents time series label.
type Label struct {
// Name is label name.
Name string
// Value is label value.
Value string
}
// Sample represents time series sample
type Sample struct {
// Value is sample value.
Value float64
// Timestamp is sample timestamp.
Timestamp int64
}
// MarshalProtobuf appends protobuf-marshaled wr to dst and returns the result.
func (wr *WriteRequest) MarshalProtobuf(dst []byte) []byte {
m := mp.Get()
wr.appendToProtobuf(m.MessageMarshaler())
dst = m.Marshal(dst)
mp.Put(m)
return dst
}
func (wr *WriteRequest) appendToProtobuf(mm *easyproto.MessageMarshaler) {
// message WriteRequest {
// repeated TimeSeries timeseries = 1;
// }
tss := wr.Timeseries
for i := range tss {
tss[i].appendToProtobuf(mm.AppendMessage(1))
}
}
func (ts *TimeSeries) appendToProtobuf(mm *easyproto.MessageMarshaler) {
// message TimeSeries {
// repeated Label labels = 1;
// repeated Sample samples = 2;
// }
labels := ts.Labels
for i := range labels {
labels[i].appendToProtobuf(mm.AppendMessage(1))
}
samples := ts.Samples
for i := range samples {
samples[i].appendToProtobuf(mm.AppendMessage(2))
}
}
func (lbl *Label) appendToProtobuf(mm *easyproto.MessageMarshaler) {
// message Label {
// string name = 1;
// string value = 2;
// }
mm.AppendString(1, lbl.Name)
mm.AppendString(2, lbl.Value)
}
func (s *Sample) appendToProtobuf(mm *easyproto.MessageMarshaler) {
// message Sample {
// double value = 1;
// int64 timestamp = 2;
// }
mm.AppendDouble(1, s.Value)
mm.AppendInt64(2, s.Timestamp)
}
var mp easyproto.MarshalerPool

View file

@ -0,0 +1,64 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: remote.proto
package prompbmarshal
import (
math_bits "math/bits"
)
type WriteRequest struct {
Timeseries []TimeSeries
}
func (m *WriteRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.Timeseries) > 0 {
for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Timeseries[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarint(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
}
return len(dAtA) - i, nil
}
func encodeVarint(dAtA []byte, offset int, v uint64) int {
offset -= sov(v)
base := offset
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return base
}
func (m *WriteRequest) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if len(m.Timeseries) > 0 {
for _, e := range m.Timeseries {
l = e.Size()
n += 1 + l + sov(uint64(l))
}
}
return n
}
func sov(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}

View file

@ -0,0 +1,155 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: types.proto
package prompbmarshal
import (
encoding_binary "encoding/binary"
math "math"
)
type Sample struct {
Value float64
Timestamp int64
}
// TimeSeries represents samples and labels for a single time series.
type TimeSeries struct {
Labels []Label
Samples []Sample
}
type Label struct {
Name string
Value string
}
func (m *Sample) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.Timestamp != 0 {
i = encodeVarint(dAtA, i, uint64(m.Timestamp))
i--
dAtA[i] = 0x10
}
if m.Value != 0 {
i -= 8
encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.Value))))
i--
dAtA[i] = 0x9
}
return len(dAtA) - i, nil
}
func (m *TimeSeries) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.Samples) > 0 {
for iNdEx := len(m.Samples) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Samples[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarint(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x12
}
}
if len(m.Labels) > 0 {
for iNdEx := len(m.Labels) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Labels[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarint(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
}
return len(dAtA) - i, nil
}
func (m *Label) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.Value) > 0 {
i -= len(m.Value)
copy(dAtA[i:], m.Value)
i = encodeVarint(dAtA, i, uint64(len(m.Value)))
i--
dAtA[i] = 0x12
}
if len(m.Name) > 0 {
i -= len(m.Name)
copy(dAtA[i:], m.Name)
i = encodeVarint(dAtA, i, uint64(len(m.Name)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func (m *Sample) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.Value != 0 {
n += 9
}
if m.Timestamp != 0 {
n += 1 + sov(uint64(m.Timestamp))
}
return n
}
func (m *TimeSeries) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if len(m.Labels) > 0 {
for _, e := range m.Labels {
l = e.Size()
n += 1 + l + sov(uint64(l))
}
}
if len(m.Samples) > 0 {
for _, e := range m.Samples {
l = e.Size()
n += 1 + l + sov(uint64(l))
}
}
return n
}
func (m *Label) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.Name)
if l > 0 {
n += 1 + l + sov(uint64(l))
}
l = len(m.Value)
if l > 0 {
n += 1 + l + sov(uint64(l))
}
return n
}

33
lib/prompbmarshal/util.go Normal file
View file

@ -0,0 +1,33 @@
package prompbmarshal
import (
"fmt"
)
// MarshalProtobuf marshals wr to dst and returns the result.
func (wr *WriteRequest) MarshalProtobuf(dst []byte) []byte {
size := wr.Size()
dstLen := len(dst)
if n := size - (cap(dst) - dstLen); n > 0 {
dst = append(dst[:cap(dst)], make([]byte, n)...)
}
dst = dst[:dstLen+size]
n, err := wr.MarshalToSizedBuffer(dst[dstLen:])
if err != nil {
panic(fmt.Errorf("BUG: unexpected error when marshaling WriteRequest: %w", err))
}
return dst[:dstLen+n]
}
// Reset resets wr.
func (wr *WriteRequest) Reset() {
wr.Timeseries = ResetTimeSeries(wr.Timeseries)
}
// ResetTimeSeries clears all the GC references from tss and returns an empty tss ready for further use.
func ResetTimeSeries(tss []TimeSeries) []TimeSeries {
for i := range tss {
tss[i] = TimeSeries{}
}
return tss[:0]
}