diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index c851e0061..bbf95ca64 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -30,6 +30,7 @@ The sandbox cluster installation is running under the constant load generated by * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for `username_file` option at `basic_auth` section in [`scrape_configs`](https://docs.victoriametrics.com/sd_configs/#http-api-client-options). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5511). Thanks to @wasim-nihal for [the initial implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5720). +* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix the increased CPU usage when sending the data to remote storage. The issue has been introduced in [v1.97.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.97.0). * BUGFIX: fix `runtime error: slice bounds out of range` panic, which can occur during query execution. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5733). The bug has been introduced in `v1.97.0`. * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly handle `avg_over_time({some_filter}[d]) keep_metric_names` queries, where [`some_filter`](https://docs.victoriametrics.com/keyconcepts/#filtering) matches multiple time series with multiple names, while `d` is bigger or equal to `3h`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5556). * BUGFIX: dashboards/single: fix typo in query for `version` annotation which falsely produced many version change events. diff --git a/lib/prompbmarshal/prompbmarshal.go b/lib/prompbmarshal/prompbmarshal.go deleted file mode 100644 index e2e87b114..000000000 --- a/lib/prompbmarshal/prompbmarshal.go +++ /dev/null @@ -1,106 +0,0 @@ -package prompbmarshal - -import ( - "github.com/VictoriaMetrics/easyproto" -) - -// WriteRequest represents Prometheus remote write API request. -type WriteRequest struct { - // Timeseries contains a list of time series for the given WriteRequest - Timeseries []TimeSeries -} - -// Reset resets wr for subsequent re-use. -func (wr *WriteRequest) Reset() { - wr.Timeseries = ResetTimeSeries(wr.Timeseries) -} - -// ResetTimeSeries clears all the GC references from tss and returns an empty tss ready for further use. -func ResetTimeSeries(tss []TimeSeries) []TimeSeries { - for i := range tss { - tss[i] = TimeSeries{} - } - return tss[:0] -} - -// TimeSeries represents a single time series. -type TimeSeries struct { - // Labels contains a list of labels for the given TimeSeries - Labels []Label - - // Samples contains a list of samples for the given TimeSeries - Samples []Sample -} - -// Label represents time series label. -type Label struct { - // Name is label name. - Name string - - // Value is label value. - Value string -} - -// Sample represents time series sample -type Sample struct { - // Value is sample value. - Value float64 - - // Timestamp is sample timestamp. - Timestamp int64 -} - -// MarshalProtobuf appends protobuf-marshaled wr to dst and returns the result. -func (wr *WriteRequest) MarshalProtobuf(dst []byte) []byte { - m := mp.Get() - wr.appendToProtobuf(m.MessageMarshaler()) - dst = m.Marshal(dst) - mp.Put(m) - return dst -} - -func (wr *WriteRequest) appendToProtobuf(mm *easyproto.MessageMarshaler) { - // message WriteRequest { - // repeated TimeSeries timeseries = 1; - // } - tss := wr.Timeseries - for i := range tss { - tss[i].appendToProtobuf(mm.AppendMessage(1)) - } -} - -func (ts *TimeSeries) appendToProtobuf(mm *easyproto.MessageMarshaler) { - // message TimeSeries { - // repeated Label labels = 1; - // repeated Sample samples = 2; - // } - labels := ts.Labels - for i := range labels { - labels[i].appendToProtobuf(mm.AppendMessage(1)) - } - - samples := ts.Samples - for i := range samples { - samples[i].appendToProtobuf(mm.AppendMessage(2)) - } -} - -func (lbl *Label) appendToProtobuf(mm *easyproto.MessageMarshaler) { - // message Label { - // string name = 1; - // string value = 2; - // } - mm.AppendString(1, lbl.Name) - mm.AppendString(2, lbl.Value) -} - -func (s *Sample) appendToProtobuf(mm *easyproto.MessageMarshaler) { - // message Sample { - // double value = 1; - // int64 timestamp = 2; - // } - mm.AppendDouble(1, s.Value) - mm.AppendInt64(2, s.Timestamp) -} - -var mp easyproto.MarshalerPool diff --git a/lib/prompbmarshal/remote.pb.go b/lib/prompbmarshal/remote.pb.go new file mode 100644 index 000000000..a668fea11 --- /dev/null +++ b/lib/prompbmarshal/remote.pb.go @@ -0,0 +1,64 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: remote.proto + +package prompbmarshal + +import ( + math_bits "math/bits" +) + +type WriteRequest struct { + Timeseries []TimeSeries +} + +func (m *WriteRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Timeseries) > 0 { + for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Timeseries[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarint(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + +func encodeVarint(dAtA []byte, offset int, v uint64) int { + offset -= sov(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *WriteRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Timeseries) > 0 { + for _, e := range m.Timeseries { + l = e.Size() + n += 1 + l + sov(uint64(l)) + } + } + return n +} + +func sov(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} diff --git a/lib/prompbmarshal/types.pb.go b/lib/prompbmarshal/types.pb.go new file mode 100644 index 000000000..249e5c80b --- /dev/null +++ b/lib/prompbmarshal/types.pb.go @@ -0,0 +1,155 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: types.proto + +package prompbmarshal + +import ( + encoding_binary "encoding/binary" + math "math" +) + +type Sample struct { + Value float64 + Timestamp int64 +} + +// TimeSeries represents samples and labels for a single time series. +type TimeSeries struct { + Labels []Label + Samples []Sample +} + +type Label struct { + Name string + Value string +} + +func (m *Sample) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Timestamp != 0 { + i = encodeVarint(dAtA, i, uint64(m.Timestamp)) + i-- + dAtA[i] = 0x10 + } + if m.Value != 0 { + i -= 8 + encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.Value)))) + i-- + dAtA[i] = 0x9 + } + return len(dAtA) - i, nil +} + +func (m *TimeSeries) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Samples) > 0 { + for iNdEx := len(m.Samples) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Samples[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarint(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + } + if len(m.Labels) > 0 { + for iNdEx := len(m.Labels) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Labels[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarint(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + +func (m *Label) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Value) > 0 { + i -= len(m.Value) + copy(dAtA[i:], m.Value) + i = encodeVarint(dAtA, i, uint64(len(m.Value))) + i-- + dAtA[i] = 0x12 + } + if len(m.Name) > 0 { + i -= len(m.Name) + copy(dAtA[i:], m.Name) + i = encodeVarint(dAtA, i, uint64(len(m.Name))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *Sample) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Value != 0 { + n += 9 + } + if m.Timestamp != 0 { + n += 1 + sov(uint64(m.Timestamp)) + } + return n +} + +func (m *TimeSeries) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Labels) > 0 { + for _, e := range m.Labels { + l = e.Size() + n += 1 + l + sov(uint64(l)) + } + } + if len(m.Samples) > 0 { + for _, e := range m.Samples { + l = e.Size() + n += 1 + l + sov(uint64(l)) + } + } + return n +} + +func (m *Label) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Name) + if l > 0 { + n += 1 + l + sov(uint64(l)) + } + l = len(m.Value) + if l > 0 { + n += 1 + l + sov(uint64(l)) + } + return n +} diff --git a/lib/prompbmarshal/util.go b/lib/prompbmarshal/util.go new file mode 100644 index 000000000..417ca730c --- /dev/null +++ b/lib/prompbmarshal/util.go @@ -0,0 +1,33 @@ +package prompbmarshal + +import ( + "fmt" +) + +// MarshalProtobuf marshals wr to dst and returns the result. +func (wr *WriteRequest) MarshalProtobuf(dst []byte) []byte { + size := wr.Size() + dstLen := len(dst) + if n := size - (cap(dst) - dstLen); n > 0 { + dst = append(dst[:cap(dst)], make([]byte, n)...) + } + dst = dst[:dstLen+size] + n, err := wr.MarshalToSizedBuffer(dst[dstLen:]) + if err != nil { + panic(fmt.Errorf("BUG: unexpected error when marshaling WriteRequest: %w", err)) + } + return dst[:dstLen+n] +} + +// Reset resets wr. +func (wr *WriteRequest) Reset() { + wr.Timeseries = ResetTimeSeries(wr.Timeseries) +} + +// ResetTimeSeries clears all the GC references from tss and returns an empty tss ready for further use. +func ResetTimeSeries(tss []TimeSeries) []TimeSeries { + for i := range tss { + tss[i] = TimeSeries{} + } + return tss[:0] +}