From 0597718435155499db2f74ae0d2001b11a2c5de5 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sun, 14 Jan 2024 21:06:01 +0200 Subject: [PATCH] lib/protoparser/datadogv2: add support for reading protobuf-encoded requests at /api/v2/series endpoint Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4451 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5094 --- docs/CHANGELOG.md | 2 +- go.mod | 1 + go.sum | 2 + lib/protoparser/datadogv2/parser.go | 178 ++++- .../VictoriaMetrics/easyproto/LICENSE | 190 +++++ .../VictoriaMetrics/easyproto/README.md | 219 ++++++ .../VictoriaMetrics/easyproto/doc.go | 3 + .../VictoriaMetrics/easyproto/reader.go | 739 ++++++++++++++++++ .../VictoriaMetrics/easyproto/writer.go | 718 +++++++++++++++++ vendor/modules.txt | 3 + 10 files changed, 2052 insertions(+), 3 deletions(-) create mode 100644 vendor/github.com/VictoriaMetrics/easyproto/LICENSE create mode 100644 vendor/github.com/VictoriaMetrics/easyproto/README.md create mode 100644 vendor/github.com/VictoriaMetrics/easyproto/doc.go create mode 100644 vendor/github.com/VictoriaMetrics/easyproto/reader.go create mode 100644 vendor/github.com/VictoriaMetrics/easyproto/writer.go diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index a545f897f..15bb378d3 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -28,7 +28,7 @@ The sandbox cluster installation is running under the constant load generated by ## tip -* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [DataDog v2 data ingestion protocol](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). JSON protocol is supported right now. Protobuf protocol will be supported later. See [these docs](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4451). +* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [DataDog v2 data ingestion protocol](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4451). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): expose ability to set OAuth2 endpoint parameters per each `-remoteWrite.url` via the command-line flag `-remoteWrite.oauth2.endpointParams`. See [these docs](https://docs.victoriametrics.com/vmagent.html#advanced-usage). Thanks to @mhill-holoplot for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5427). * FEATURE: [vmalert](https://docs.victoriametrics.com/vmagent.html): expose ability to set OAuth2 endpoint parameters via the following command-line flags: - `-datasource.oauth2.endpointParams` for `-datasource.url` diff --git a/go.mod b/go.mod index 17b334b6f..63b3c6633 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( cloud.google.com/go/storage v1.35.1 github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.1 github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.2.0 + github.com/VictoriaMetrics/easyproto v0.1.3 github.com/VictoriaMetrics/fastcache v1.12.2 // Do not use the original github.com/valyala/fasthttp because of issues diff --git a/go.sum b/go.sum index 6fdf75f23..3d74a4508 100644 --- a/go.sum +++ b/go.sum @@ -58,6 +58,8 @@ github.com/AzureAD/microsoft-authentication-library-for-go v1.2.0/go.mod h1:wP83 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow= +github.com/VictoriaMetrics/easyproto v0.1.3 h1:8in4J7DdI+umTJK+0LA/NPC68NmmAv+Tn2WY5DSAniM= +github.com/VictoriaMetrics/easyproto v0.1.3/go.mod h1:QlGlzaJnDfFd8Lk6Ci/fuLxfTo3/GThPs2KH23mv710= github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI= github.com/VictoriaMetrics/fastcache v1.12.2/go.mod h1:AmC+Nzz1+3G2eCPapF6UcsnkThDcMsQicp4xDukwJYI= github.com/VictoriaMetrics/fasthttp v1.2.0 h1:nd9Wng4DlNtaI27WlYh5mGXCJOmee/2c2blTJwfyU9I= diff --git a/lib/protoparser/datadogv2/parser.go b/lib/protoparser/datadogv2/parser.go index 55b57b007..8655bb427 100644 --- a/lib/protoparser/datadogv2/parser.go +++ b/lib/protoparser/datadogv2/parser.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" + "github.com/VictoriaMetrics/easyproto" ) // Request represents DataDog POST request to /api/v2/series @@ -58,8 +59,42 @@ func UnmarshalJSON(req *Request, b []byte) error { // b shouldn't be modified when req is in use. func UnmarshalProtobuf(req *Request, b []byte) error { req.reset() - _ = b - return fmt.Errorf("unimplemented") + return req.unmarshalProtobuf(b) +} + +func (req *Request) unmarshalProtobuf(src []byte) error { + // message Request { + // repeated Series series = 1; + // } + // + // See https://github.com/DataDog/agent-payload/blob/d7c5dcc63970d0e19678a342e7718448dd777062/proto/metrics/agent_payload.proto + series := req.Series + var fc easyproto.FieldContext + for len(src) > 0 { + tail, err := fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot unmarshal next field: %w", err) + } + switch fc.FieldNum { + case 1: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot read series data") + } + if len(series) < cap(series) { + series = series[:len(series)+1] + } else { + series = append(series, Series{}) + } + s := &series[len(series)-1] + if err := s.unmarshalProtobuf(data); err != nil { + return fmt.Errorf("cannot unmarshal series: %w", err) + } + } + src = tail + } + req.Series = series + return nil } // Series represents a series item from DataDog POST request to /api/v2/series @@ -114,6 +149,81 @@ func (s *Series) reset() { s.Tags = tags[:0] } +func (s *Series) unmarshalProtobuf(src []byte) error { + // message MetricSeries { + // string metric = 2; + // repeated Point points = 4; + // repeated Resource resources = 1; + // string source_type_name = 7; + // repeated string tags = 3; + // } + // + // See https://github.com/DataDog/agent-payload/blob/d7c5dcc63970d0e19678a342e7718448dd777062/proto/metrics/agent_payload.proto + points := s.Points + resources := s.Resources + tags := s.Tags + var fc easyproto.FieldContext + for len(src) > 0 { + tail, err := fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot unmarshal next field: %w", err) + } + switch fc.FieldNum { + case 2: + metric, ok := fc.String() + if !ok { + return fmt.Errorf("cannot unmarshal metric") + } + s.Metric = metric + case 4: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot read point data") + } + if len(points) < cap(points) { + points = points[:len(points)+1] + } else { + points = append(points, Point{}) + } + pt := &points[len(points)-1] + if err := pt.unmarshalProtobuf(data); err != nil { + return fmt.Errorf("cannot unmarshal point: %s", err) + } + case 1: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot read resource data") + } + if len(resources) < cap(resources) { + resources = resources[:len(resources)+1] + } else { + resources = append(resources, Resource{}) + } + r := &resources[len(resources)-1] + if err := r.unmarshalProtobuf(data); err != nil { + return fmt.Errorf("cannot unmarshal resource: %w", err) + } + case 7: + sourceTypeName, ok := fc.String() + if !ok { + return fmt.Errorf("cannot unmarshal source_type_name") + } + s.SourceTypeName = sourceTypeName + case 3: + tag, ok := fc.String() + if !ok { + return fmt.Errorf("cannot unmarshal tag") + } + tags = append(tags, tag) + } + src = tail + } + s.Points = points + s.Resources = resources + s.Tags = tags + return nil +} + // Point represents a point from DataDog POST request to /api/v2/series // // See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics @@ -130,6 +240,38 @@ func (pt *Point) reset() { pt.Value = 0 } +func (pt *Point) unmarshalProtobuf(src []byte) error { + // message Point { + // double value = 1; + // int64 timestamp = 2; + // } + // + // See https://github.com/DataDog/agent-payload/blob/d7c5dcc63970d0e19678a342e7718448dd777062/proto/metrics/agent_payload.proto + var fc easyproto.FieldContext + for len(src) > 0 { + tail, err := fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot unmarshal next field: %w", err) + } + switch fc.FieldNum { + case 1: + value, ok := fc.Double() + if !ok { + return fmt.Errorf("cannot unmarshal value") + } + pt.Value = value + case 2: + timestamp, ok := fc.Int64() + if !ok { + return fmt.Errorf("cannot unmarshal timestamp") + } + pt.Timestamp = timestamp + } + src = tail + } + return nil +} + // Resource is series resource from DataDog POST request to /api/v2/series // // See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics @@ -142,3 +284,35 @@ func (r *Resource) reset() { r.Name = "" r.Type = "" } + +func (r *Resource) unmarshalProtobuf(src []byte) error { + // message Resource { + // string type = 1; + // string name = 2; + // } + // + // See https://github.com/DataDog/agent-payload/blob/d7c5dcc63970d0e19678a342e7718448dd777062/proto/metrics/agent_payload.proto + var fc easyproto.FieldContext + for len(src) > 0 { + tail, err := fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot unmarshal next field: %w", err) + } + switch fc.FieldNum { + case 1: + typ, ok := fc.String() + if !ok { + return fmt.Errorf("cannot unmarshal type") + } + r.Type = typ + case 2: + name, ok := fc.String() + if !ok { + return fmt.Errorf("cannot unmarshal name") + } + r.Name = name + } + src = tail + } + return nil +} diff --git a/vendor/github.com/VictoriaMetrics/easyproto/LICENSE b/vendor/github.com/VictoriaMetrics/easyproto/LICENSE new file mode 100644 index 000000000..c6b28e5af --- /dev/null +++ b/vendor/github.com/VictoriaMetrics/easyproto/LICENSE @@ -0,0 +1,190 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + Copyright 2023-2024 VictoriaMetrics, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/VictoriaMetrics/easyproto/README.md b/vendor/github.com/VictoriaMetrics/easyproto/README.md new file mode 100644 index 000000000..caaa40b3b --- /dev/null +++ b/vendor/github.com/VictoriaMetrics/easyproto/README.md @@ -0,0 +1,219 @@ +[![GoDoc](https://godoc.org/github.com/VictoriaMetrics/easyproto?status.svg)](http://godoc.org/github.com/VictoriaMetrics/easyproto) + +# easyproto + +Package [github.com/VictoriaMetrics/easyproto](http://godoc.org/github.com/VictoriaMetrics/easyproto) provides simple building blocks +for marshaling and unmarshaling of [protobuf](https://protobuf.dev/) messages with [proto3 encoding](https://protobuf.dev/programming-guides/encoding/). + +## Features + +- There is no need in [protoc](https://grpc.io/docs/protoc-installation/) or [go generate](https://go.dev/blog/generate) - + just write simple maintainable code for marshaling and unmarshaling protobuf messages. +- `easyproto` doesn't increase your binary size by tens of megabytes unlike traditional `protoc`-combiled code may do. +- `easyproto` allows writing zero-alloc code for marshaling and unmarshaling of arbitrary complex protobuf messages. See [examples](#examples). + +## Restrictions + +- It supports only [proto3 encoding](https://protobuf.dev/programming-guides/encoding/), e.g. it doesn't support `proto2` encoding + features such as [proto2 groups](https://protobuf.dev/programming-guides/proto2/#groups). +- It doesn't provide helpers for marshaling and unmarshaling of [well-known types](https://protobuf.dev/reference/protobuf/google.protobuf/), + since they aren't used too much in practice. + +## Examples + +Suppose you need marshaling and unmarshaling of the following `timeseries` message: + +```proto +message timeseries { + string name = 1; + repeated sample samples = 2; +} + +message sample { + double value = 1; + int64 timestamp = 2; +} +``` + +At first let's create the corresponding data structures in Go: + +```go +type Timeseries struct { + Name string + Samples []Sample +} + +type Sample struct { + Value float64 + Timestamp int64 +} +``` + +Since you write the code on yourself without any `go generate` and `protoc` invocations, +you are free to use arbitrary fields and methods in these structs. You can also specify the most suitable types for these fields. +For example, the `Sample` struct may be written as the following if you need an ability to detect empty values and timestamps: + +```go +type Sample struct { + Value *float64 + Timestamp *int64 +} +``` + +* [How to marshal `Timeseries` struct to protobuf message](#marshaling) +* [How to unmarshal protobuf message to `Timeseries` struct](#unmarshaling) + +### Marshaling + +The following code can be used for marshaling `Timeseries` struct to protobuf message: + +```go +import ( + "github.com/VictoriaMetrics/easyproto" +) + +// MarshalProtobuf marshals ts into protobuf message, appends this message to dst and returns the result. +// +// This function doesn't allocate memory on repeated calls. +func (ts *Timeseries) MarshalProtobuf(dst []byte) []byte { + m := mp.Get() + ts.marshalProtobuf(m.MessageMarshaler()) + dst = m.Marshal(dst) + mp.Put(m) + return dst +} + +func (ts *Timeseries) marshalProtobuf(mm *easyproto.MessageMarshaler) { + mm.AppendString(1, ts.Name) + for _, s := range ts.Samples { + s.marshalProtobuf(mm.AppendMessage(2)) + } +} + +func (s *Sample) marshalProtobuf(mm *easyproto.MessageMarshaler) { + mm.AppendDouble(1, s.Value) + mm.AppendInt64(2, s.Timestamp) +} + +var mp easyproto.MarshalerPool +``` + +Note that you are free to modify this code according to your needs, since you write and maintain it. +For example, you can construct arbitrary protobuf messages on the fly without the need to prepare the source struct for marshaling: + +```go +func CreateProtobufMessageOnTheFly() []byte { + // Dynamically construct timeseries message with 10 samples + var m easyproto.Marshaler + mm := m.MessageMarshaler() + mm.AppendString(1, "foo") + for i := 0; i < 10; i++ { + mmSample := mm.AppendMessage(2) + mmSample.AppendDouble(1, float64(i)/10) + mmSample.AppendInt64(2, int64(i)*1000) + } + return m.Marshal(nil) +} +``` + +This may be useful in tests. + +### Unmarshaling + +The following code can be used for unmarshaling [`timeseries` message](#examples) into `Timeseries` struct: + +```go +// UnmarshalProtobuf unmarshals ts from protobuf message at src. +func (ts *Timeseries) UnmarshalProtobuf(src []byte) (err error) { + // Set default Timeseries values + ts.Name = "" + ts.Samples = ts.Samples[:0] + + // Parse Timeseries message at src + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in Timeseries message") + } + switch fc.FieldNum { + case 1: + name, ok := fc.String() + if !ok { + return fmt.Errorf("cannot read Timeseries name") + } + // name refers to src. This means that the name changes when src changes. + // Make a copy with strings.Clone(name) if needed. + ts.Name = name + case 2: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot read Timeseries sample data") + } + ts.Samples = append(ts.Samples, Sample{}) + s := &ts.Samples[len(ts.Samples)-1] + if err := s.UnmarshalProtobuf(data); err != nil { + return fmt.Errorf("cannot unmarshal sample: %w", err) + } + } + } + return nil +} + +// UnmarshalProtobuf unmarshals s from protobuf message at src. +func (s *Sample) UnmarshalProtobuf(src []byte) (err error) { + // Set default Sample values + s.Value = 0 + s.Timestamp = 0 + + // Parse Sample message at src + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in sample") + } + switch fc.FieldNum { + case 1: + value, ok := fc.Double() + if !ok { + return fmt.Errorf("cannot read sample value") + } + s.Value = value + case 2: + timestamp, ok := fc.Int64() + if !ok { + return fmt.Errorf("cannot read sample timestamp") + } + s.Timestamp = timestamp + } + } + return nil +} +``` + +You are free to modify this code according to your needs, since you wrote it and you maintain it. + +It is possible to extract the needed data from arbitrary protobuf messages without the need to create a destination struct. +For example, the following code extracts `timeseries` name from protobuf message, while ignoring all the other fields: + +```go +func GetTimeseriesName(src []byte) (name string, err error) { + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if src != nil { + return "", fmt.Errorf("cannot read the next field") + } + if fc.FieldNum == 1 { + name, ok := fc.String() + if !ok { + return "", fmt.Errorf("cannot read timeseries name") + } + // Return a copy of name, since name refers to src. + return strings.Clone(name), nil + } + } + return "", fmt.Errorf("timeseries name isn't found in the message") +} +``` diff --git a/vendor/github.com/VictoriaMetrics/easyproto/doc.go b/vendor/github.com/VictoriaMetrics/easyproto/doc.go new file mode 100644 index 000000000..036f82587 --- /dev/null +++ b/vendor/github.com/VictoriaMetrics/easyproto/doc.go @@ -0,0 +1,3 @@ +// Package easyproto provides building blocks for marshaling and unmarshaling protobuf v3 messages +// according to https://protobuf.dev/programming-guides/encoding/ . +package easyproto diff --git a/vendor/github.com/VictoriaMetrics/easyproto/reader.go b/vendor/github.com/VictoriaMetrics/easyproto/reader.go new file mode 100644 index 000000000..4ffa4db53 --- /dev/null +++ b/vendor/github.com/VictoriaMetrics/easyproto/reader.go @@ -0,0 +1,739 @@ +package easyproto + +import ( + "encoding/binary" + "fmt" + "math" + "unsafe" +) + +// FieldContext represents a single protobuf-encoded field after NextField() call. +type FieldContext struct { + // FieldNum is the number of protobuf field read after NextField() call. + FieldNum uint32 + + // wireType is the wire type for the given field + wireType wireType + + // data is probobuf-encoded field data for wireType=wireTypeLen + data []byte + + // intValue contains int value for wireType!=wireTypeLen + intValue uint64 +} + +// NextField reads the next field from protobuf-encoded src. +// +// It returns the tail left after reading the next field from src. +// +// It is unsafe modifying src while FieldContext is in use. +func (fc *FieldContext) NextField(src []byte) ([]byte, error) { + if len(src) >= 2 { + n := uint16(src[0])<<8 | uint16(src[1]) + if (n&0x8080 == 0) && (n&0x0700 == (uint16(wireTypeLen) << 8)) { + // Fast path - read message with the length smaller than 0x80 bytes. + msgLen := int(n & 0xff) + src = src[2:] + if len(src) < msgLen { + return src, fmt.Errorf("cannot read field for from %d bytes; need at least %d bytes", len(src), msgLen) + } + fc.FieldNum = uint32(n >> (8 + 3)) + fc.wireType = wireTypeLen + fc.data = src[:msgLen] + src = src[msgLen:] + return src, nil + } + } + + // Read field tag. See https://protobuf.dev/programming-guides/encoding/#structure + if len(src) == 0 { + return src, fmt.Errorf("cannot unmarshal field from empty message") + } + + var fieldNum uint64 + tag := uint64(src[0]) + if tag < 0x80 { + src = src[1:] + fieldNum = tag >> 3 + } else { + var offset int + tag, offset = binary.Uvarint(src) + if offset <= 0 { + return src, fmt.Errorf("cannot unmarshal field tag from uvarint") + } + src = src[offset:] + fieldNum = tag >> 3 + if fieldNum > math.MaxUint32 { + return src, fmt.Errorf("fieldNum=%d is bigger than uint32max=%d", fieldNum, math.MaxUint32) + } + } + + wt := wireType(tag & 0x07) + + fc.FieldNum = uint32(fieldNum) + fc.wireType = wt + + // Read the remaining data + if wt == wireTypeLen { + u64, offset := binary.Uvarint(src) + if offset <= 0 { + return src, fmt.Errorf("cannot read message length for field #%d", fieldNum) + } + src = src[offset:] + if uint64(len(src)) < u64 { + return src, fmt.Errorf("cannot read data for field #%d from %d bytes; need at least %d bytes", fieldNum, len(src), u64) + } + fc.data = src[:u64] + src = src[u64:] + return src, nil + } + if wt == wireTypeVarint { + u64, offset := binary.Uvarint(src) + if offset <= 0 { + return src, fmt.Errorf("cannot read varint after field tag for field #%d", fieldNum) + } + src = src[offset:] + fc.intValue = u64 + return src, nil + } + if wt == wireTypeI64 { + if len(src) < 8 { + return src, fmt.Errorf("cannot read i64 for field #%d", fieldNum) + } + u64 := binary.LittleEndian.Uint64(src) + src = src[8:] + fc.intValue = u64 + return src, nil + } + if wt == wireTypeI32 { + if len(src) < 4 { + return src, fmt.Errorf("cannot read i32 for field #%d", fieldNum) + } + u32 := binary.LittleEndian.Uint32(src) + src = src[4:] + fc.intValue = uint64(u32) + return src, nil + } + return src, fmt.Errorf("unknown wireType=%d", wt) +} + +// UnmarshalMessageLen unmarshals protobuf message length from src. +// +// It returns the tail left after unmarshaling message length from src. +// +// It is expected that src is marshaled with Marshaler.MarshalWithLen(). +// +// False is returned if message length cannot be unmarshaled from src. +func UnmarshalMessageLen(src []byte) (int, []byte, bool) { + u64, offset := binary.Uvarint(src) + if offset <= 0 { + return 0, src, false + } + src = src[offset:] + if u64 > math.MaxInt32 { + return 0, src, false + } + return int(u64), src, true +} + +// wireType is the type of of protobuf-encoded field +// +// See https://protobuf.dev/programming-guides/encoding/#structure +type wireType byte + +const ( + // VARINT type - one of int32, int64, uint32, uint64, sint32, sint64, bool, enum + wireTypeVarint = wireType(0) + + // I64 type + wireTypeI64 = wireType(1) + + // Len type + wireTypeLen = wireType(2) + + // I32 type + wireTypeI32 = wireType(5) +) + +// Int32 returns int32 value for fc. +// +// False is returned if fc doesn't contain int32 value. +func (fc *FieldContext) Int32() (int32, bool) { + if fc.wireType != wireTypeVarint { + return 0, false + } + return getInt32(fc.intValue) +} + +// Int64 returns int64 value for fc. +// +// False is returned if fc doesn't contain int64 value. +func (fc *FieldContext) Int64() (int64, bool) { + if fc.wireType != wireTypeVarint { + return 0, false + } + return int64(fc.intValue), true +} + +// Uint32 returns uint32 value for fc. +// +// False is returned if fc doesn't contain uint32 value. +func (fc *FieldContext) Uint32() (uint32, bool) { + if fc.wireType != wireTypeVarint { + return 0, false + } + return getUint32(fc.intValue) +} + +// Uint64 returns uint64 value for fc. +// +// False is returned if fc doesn't contain uint64 value. +func (fc *FieldContext) Uint64() (uint64, bool) { + if fc.wireType != wireTypeVarint { + return 0, false + } + return fc.intValue, true +} + +// Sint32 returns sint32 value for fc. +// +// False is returned if fc doesn't contain sint32 value. +func (fc *FieldContext) Sint32() (int32, bool) { + if fc.wireType != wireTypeVarint { + return 0, false + } + u32, ok := getUint32(fc.intValue) + if !ok { + return 0, false + } + i32 := decodeZigZagInt32(u32) + return i32, true +} + +// Sint64 returns sint64 value for fc. +// +// False is returned if fc doesn't contain sint64 value. +func (fc *FieldContext) Sint64() (int64, bool) { + if fc.wireType != wireTypeVarint { + return 0, false + } + i64 := decodeZigZagInt64(fc.intValue) + return i64, true +} + +// Bool returns bool value for fc. +// +// False is returned in the second result if fc doesn't contain bool value. +func (fc *FieldContext) Bool() (bool, bool) { + if fc.wireType != wireTypeVarint { + return false, false + } + return getBool(fc.intValue) +} + +// Fixed64 returns fixed64 value for fc. +// +// False is returned if fc doesn't contain fixed64 value. +func (fc *FieldContext) Fixed64() (uint64, bool) { + if fc.wireType != wireTypeI64 { + return 0, false + } + return fc.intValue, true +} + +// Sfixed64 returns sfixed64 value for fc. +// +// False is returned if fc doesn't contain sfixed64 value. +func (fc *FieldContext) Sfixed64() (int64, bool) { + if fc.wireType != wireTypeI64 { + return 0, false + } + return int64(fc.intValue), true +} + +// Double returns dobule value for fc. +// +// False is returned if fc doesn't contain double value. +func (fc *FieldContext) Double() (float64, bool) { + if fc.wireType != wireTypeI64 { + return 0, false + } + v := math.Float64frombits(fc.intValue) + return v, true +} + +// String returns string value for fc. +// +// The returned string is valid while the underlying buffer isn't changed. +// +// False is returned if fc doesn't contain string value. +func (fc *FieldContext) String() (string, bool) { + if fc.wireType != wireTypeLen { + return "", false + } + s := unsafeBytesToString(fc.data) + return s, true +} + +// Bytes returns bytes value for fc. +// +// The returned byte slice is valid while the underlying buffer isn't changed. +// +// False is returned if fc doesn't contain bytes value. +func (fc *FieldContext) Bytes() ([]byte, bool) { + if fc.wireType != wireTypeLen { + return nil, false + } + return fc.data, true +} + +// MessageData returns protobuf message data for fc. +// +// False is returned if fc doesn't contain message data. +func (fc *FieldContext) MessageData() ([]byte, bool) { + if fc.wireType != wireTypeLen { + return nil, false + } + return fc.data, true +} + +// Fixed32 returns fixed32 value for fc. +// +// False is returned if fc doesn't contain fixed32 value. +func (fc *FieldContext) Fixed32() (uint32, bool) { + if fc.wireType != wireTypeI32 { + return 0, false + } + u32 := mustGetUint32(fc.intValue) + return u32, true +} + +// Sfixed32 returns sfixed32 value for fc. +// +// False is returned if fc doesn't contain sfixed value. +func (fc *FieldContext) Sfixed32() (int32, bool) { + if fc.wireType != wireTypeI32 { + return 0, false + } + i32 := mustGetInt32(fc.intValue) + return i32, true +} + +// Float returns float value for fc. +// +// False is returned if fc doesn't contain float value. +func (fc *FieldContext) Float() (float32, bool) { + if fc.wireType != wireTypeI32 { + return 0, false + } + u32 := mustGetUint32(fc.intValue) + v := math.Float32frombits(u32) + return v, true +} + +// UnpackInt32s unpacks int32 values from fc, appends them to dst and returns the result. +// +// False is returned if fc doesn't contain int32 values. +func (fc *FieldContext) UnpackInt32s(dst []int32) ([]int32, bool) { + if fc.wireType == wireTypeVarint { + i32, ok := getInt32(fc.intValue) + if !ok { + return dst, false + } + dst = append(dst, i32) + return dst, true + } + if fc.wireType != wireTypeLen { + return dst, false + } + src := fc.data + dstOrig := dst + for len(src) > 0 { + u64, offset := binary.Uvarint(src) + if offset <= 0 { + return dstOrig, false + } + src = src[offset:] + i32, ok := getInt32(u64) + if !ok { + return dstOrig, false + } + dst = append(dst, i32) + } + return dst, true +} + +// UnpackInt64s unpacks int64 values from fc, appends them to dst and returns the result. +// +// False is returned if fc doesn't contain int64 values. +func (fc *FieldContext) UnpackInt64s(dst []int64) ([]int64, bool) { + if fc.wireType == wireTypeVarint { + dst = append(dst, int64(fc.intValue)) + return dst, true + } + if fc.wireType != wireTypeLen { + return dst, false + } + src := fc.data + dstOrig := dst + for len(src) > 0 { + u64, offset := binary.Uvarint(src) + if offset <= 0 { + return dstOrig, false + } + src = src[offset:] + dst = append(dst, int64(u64)) + } + return dst, true +} + +// UnpackUint32s unpacks uint32 values from fc, appends them to dst and returns the result. +// +// False is returned if fc doesn't contain uint32 values. +func (fc *FieldContext) UnpackUint32s(dst []uint32) ([]uint32, bool) { + if fc.wireType == wireTypeVarint { + u32, ok := getUint32(fc.intValue) + if !ok { + return dst, false + } + dst = append(dst, u32) + return dst, true + } + if fc.wireType != wireTypeLen { + return dst, false + } + src := fc.data + dstOrig := dst + for len(src) > 0 { + u64, offset := binary.Uvarint(src) + if offset <= 0 { + return dstOrig, false + } + src = src[offset:] + u32, ok := getUint32(u64) + if !ok { + return dstOrig, false + } + dst = append(dst, u32) + } + return dst, true +} + +// UnpackUint64s unpacks uint64 values from fc, appends them to dst and returns the result. +// +// False is returned if fc doesn't contain uint64 values. +func (fc *FieldContext) UnpackUint64s(dst []uint64) ([]uint64, bool) { + if fc.wireType == wireTypeVarint { + dst = append(dst, fc.intValue) + return dst, true + } + if fc.wireType != wireTypeLen { + return dst, false + } + src := fc.data + dstOrig := dst + for len(src) > 0 { + u64, offset := binary.Uvarint(src) + if offset <= 0 { + return dstOrig, false + } + src = src[offset:] + dst = append(dst, u64) + } + return dst, true +} + +// UnpackSint32s unpacks sint32 values from fc, appends them to dst and returns the result. +// +// False is returned if fc doesn't contain sint32 values. +func (fc *FieldContext) UnpackSint32s(dst []int32) ([]int32, bool) { + if fc.wireType == wireTypeVarint { + u32, ok := getUint32(fc.intValue) + if !ok { + return dst, false + } + i32 := decodeZigZagInt32(u32) + dst = append(dst, i32) + return dst, true + } + if fc.wireType != wireTypeLen { + return dst, false + } + src := fc.data + dstOrig := dst + for len(src) > 0 { + u64, offset := binary.Uvarint(src) + if offset <= 0 { + return dstOrig, false + } + src = src[offset:] + u32, ok := getUint32(u64) + if !ok { + return dstOrig, false + } + i32 := decodeZigZagInt32(u32) + dst = append(dst, i32) + } + return dst, true +} + +// UnpackSint64s unpacks sint64 values from fc, appends them to dst and returns the result. +// +// False is returned if fc doesn't contain sint64 values. +func (fc *FieldContext) UnpackSint64s(dst []int64) ([]int64, bool) { + if fc.wireType == wireTypeVarint { + i64 := decodeZigZagInt64(fc.intValue) + dst = append(dst, i64) + return dst, true + } + if fc.wireType != wireTypeLen { + return dst, false + } + src := fc.data + dstOrig := dst + for len(src) > 0 { + u64, offset := binary.Uvarint(src) + if offset <= 0 { + return dstOrig, false + } + src = src[offset:] + i64 := decodeZigZagInt64(u64) + dst = append(dst, i64) + } + return dst, true +} + +// UnpackBools unpacks bool values from fc, appends them to dst and returns the result. +// +// False is returned in the second result if fc doesn't contain bool values. +func (fc *FieldContext) UnpackBools(dst []bool) ([]bool, bool) { + if fc.wireType == wireTypeVarint { + v, ok := getBool(fc.intValue) + if !ok { + return dst, false + } + dst = append(dst, v) + return dst, true + } + if fc.wireType != wireTypeLen { + return dst, false + } + src := fc.data + dstOrig := dst + for len(src) > 0 { + u64, offset := binary.Uvarint(src) + if offset <= 0 { + return dstOrig, false + } + src = src[offset:] + v, ok := getBool(u64) + if !ok { + return dst, false + } + dst = append(dst, v) + } + return dst, true +} + +// UnpackFixed64s unpacks fixed64 values from fc, appends them to dst and returns the result. +// +// False is returned if fc doesn't contain fixed64 values. +func (fc *FieldContext) UnpackFixed64s(dst []uint64) ([]uint64, bool) { + if fc.wireType == wireTypeI64 { + u64 := fc.intValue + dst = append(dst, u64) + return dst, true + } + if fc.wireType != wireTypeLen { + return dst, false + } + src := fc.data + dstOrig := dst + for len(src) > 0 { + if len(src) < 8 { + return dstOrig, false + } + u64 := binary.LittleEndian.Uint64(src) + src = src[8:] + dst = append(dst, u64) + } + return dst, true +} + +// UnpackSfixed64s unpacks sfixed64 values from fc, appends them to dst and returns the result. +// +// False is returned if fc doesn't contain sfixed64 values. +func (fc *FieldContext) UnpackSfixed64s(dst []int64) ([]int64, bool) { + if fc.wireType == wireTypeI64 { + u64 := fc.intValue + dst = append(dst, int64(u64)) + return dst, true + } + if fc.wireType != wireTypeLen { + return dst, false + } + src := fc.data + dstOrig := dst + for len(src) > 0 { + if len(src) < 8 { + return dstOrig, false + } + u64 := binary.LittleEndian.Uint64(src) + src = src[8:] + dst = append(dst, int64(u64)) + } + return dst, true +} + +// UnpackDoubles unpacks double values from fc, appends them to dst and returns the result. +// +// False is returned if fc doesn't contain double values. +func (fc *FieldContext) UnpackDoubles(dst []float64) ([]float64, bool) { + if fc.wireType == wireTypeI64 { + v := math.Float64frombits(fc.intValue) + dst = append(dst, v) + return dst, true + } + if fc.wireType != wireTypeLen { + return dst, false + } + src := fc.data + dstOrig := dst + for len(src) > 0 { + if len(src) < 8 { + return dstOrig, false + } + u64 := binary.LittleEndian.Uint64(src) + src = src[8:] + v := math.Float64frombits(u64) + dst = append(dst, v) + } + return dst, true +} + +// UnpackFixed32s unpacks fixed32 values from fc, appends them to dst and returns the result. +// +// False is returned if fc doesn't contain fixed32 values. +func (fc *FieldContext) UnpackFixed32s(dst []uint32) ([]uint32, bool) { + if fc.wireType == wireTypeI32 { + u32 := mustGetUint32(fc.intValue) + dst = append(dst, u32) + return dst, true + } + if fc.wireType != wireTypeLen { + return dst, false + } + src := fc.data + dstOrig := dst + for len(src) > 0 { + if len(src) < 4 { + return dstOrig, false + } + u32 := binary.LittleEndian.Uint32(src) + src = src[4:] + dst = append(dst, u32) + } + return dst, true +} + +// UnpackSfixed32s unpacks sfixed32 values from fc, appends them to dst and returns the result. +// +// False is returned if fc doesn't contain sfixed32 values. +func (fc *FieldContext) UnpackSfixed32s(dst []int32) ([]int32, bool) { + if fc.wireType == wireTypeI32 { + i32 := mustGetInt32(fc.intValue) + dst = append(dst, i32) + return dst, true + } + if fc.wireType != wireTypeLen { + return dst, false + } + src := fc.data + dstOrig := dst + for len(src) > 0 { + if len(src) < 4 { + return dstOrig, false + } + u32 := binary.LittleEndian.Uint32(src) + src = src[4:] + dst = append(dst, int32(u32)) + } + return dst, true +} + +// UnpackFloats unpacks float values from fc, appends them to dst and returns the result. +// +// False is returned if fc doesn't contain float values. +func (fc *FieldContext) UnpackFloats(dst []float32) ([]float32, bool) { + if fc.wireType == wireTypeI32 { + u32 := mustGetUint32(fc.intValue) + v := math.Float32frombits(u32) + dst = append(dst, v) + return dst, true + } + if fc.wireType != wireTypeLen { + return dst, false + } + src := fc.data + dstOrig := dst + for len(src) > 0 { + if len(src) < 4 { + return dstOrig, false + } + u32 := binary.LittleEndian.Uint32(src) + src = src[4:] + v := math.Float32frombits(u32) + dst = append(dst, v) + } + return dst, true +} + +func decodeZigZagInt64(u64 uint64) int64 { + return int64(u64>>1) ^ (int64(u64<<63) >> 63) +} + +func decodeZigZagInt32(u32 uint32) int32 { + return int32(u32>>1) ^ (int32(u32<<31) >> 31) +} + +func unsafeBytesToString(b []byte) string { + return *(*string)(unsafe.Pointer(&b)) +} + +func getInt32(u64 uint64) (int32, bool) { + u32, ok := getUint32(u64) + if !ok { + return 0, false + } + return int32(u32), true +} + +func getUint32(u64 uint64) (uint32, bool) { + if u64 > math.MaxUint32 { + return 0, false + } + return uint32(u64), true +} + +func mustGetInt32(u64 uint64) int32 { + u32 := mustGetUint32(u64) + return int32(u32) +} + +func mustGetUint32(u64 uint64) uint32 { + u32, ok := getUint32(u64) + if !ok { + panic(fmt.Errorf("BUG: cannot get uint32 from %d", u64)) + } + return u32 +} + +func getBool(u64 uint64) (bool, bool) { + if u64 == 0 { + return false, true + } + if u64 == 1 { + return true, true + } + return false, false +} diff --git a/vendor/github.com/VictoriaMetrics/easyproto/writer.go b/vendor/github.com/VictoriaMetrics/easyproto/writer.go new file mode 100644 index 000000000..6cbc9343e --- /dev/null +++ b/vendor/github.com/VictoriaMetrics/easyproto/writer.go @@ -0,0 +1,718 @@ +package easyproto + +import ( + "encoding/binary" + "math" + "math/bits" + "sync" +) + +// MarshalerPool is a pool of Marshaler structs. +type MarshalerPool struct { + p sync.Pool +} + +// Get obtains a Marshaler from the pool. +// +// The returned Marshaler can be returned to the pool via Put after it is no longer needed. +func (mp *MarshalerPool) Get() *Marshaler { + v := mp.p.Get() + if v == nil { + return &Marshaler{} + } + return v.(*Marshaler) +} + +// Put returns the given m to the pool. +// +// m cannot be used after returning to the pool. +func (mp *MarshalerPool) Put(m *Marshaler) { + m.Reset() + mp.p.Put(m) +} + +// Marshaler helps marshaling arbitrary protobuf messages. +// +// Construct message with Append* functions at MessageMarshaler() and then call Marshal* for marshaling the constructed message. +// +// It is unsafe to use a single Marshaler instance from multiple concurrently running goroutines. +// +// It is recommended re-cycling Marshaler via MarshalerPool in order to reduce memory allocations. +type Marshaler struct { + // mm contains the root MessageMarshaler. + mm *MessageMarshaler + + // buf contains temporary data needed for marshaling the protobuf message. + buf []byte + + // fs contains fields for the currently marshaled message. + fs []field + + // mms contains MessageMarshaler structs for the currently marshaled message. + mms []MessageMarshaler +} + +// MessageMarshaler helps constructing protobuf message for marshaling. +// +// MessageMarshaler must be obtained via Marshaler.MessageMarshaler(). +type MessageMarshaler struct { + // m is the parent Marshaler for the given MessageMarshaler. + m *Marshaler + + // tag contains protobuf message tag for the given MessageMarshaler. + tag uint64 + + // firstFieldIdx contains the index of the first field in the Marshaler.fs, which belongs to MessageMarshaler. + firstFieldIdx int + + // lastFieldIdx is the index of the last field in the Marshaler.fs, which belongs to MessageMarshaler. + lastFieldIdx int +} + +func (mm *MessageMarshaler) reset() { + mm.m = nil + mm.tag = 0 + mm.firstFieldIdx = -1 + mm.lastFieldIdx = -1 +} + +type field struct { + // messageSize is the size of marshaled protobuf message for the given field. + messageSize uint64 + + // dataStart is the start offset of field data at Marshaler.buf. + dataStart int + + // dataEnd is the end offset of field data at Marshaler.buf. + dataEnd int + + // nextFieldIdx contains an index of the next field in Marshaler.fs. + nextFieldIdx int + + // childMessageMarshalerIdx contains an index of child MessageMarshaler in Marshaler.mms. + childMessageMarshalerIdx int +} + +func (f *field) reset() { + f.messageSize = 0 + f.dataStart = 0 + f.dataEnd = 0 + f.nextFieldIdx = -1 + f.childMessageMarshalerIdx = -1 +} + +// Reset resets m, so it can be re-used. +func (m *Marshaler) Reset() { + m.mm = nil + m.buf = m.buf[:0] + + // There is no need in resetting individual fields, since they are reset in newFieldIndex() + m.fs = m.fs[:0] + + // There is no need in resetting individual MessageMarshaler items, since they are reset in newMessageMarshalerIndex() + m.mms = m.mms[:0] +} + +// MarshalWithLen marshals m, appends its length together with the marshaled m to dst and returns the result. +// +// E.g. appends length-delimited protobuf message to dst. +// The length of the resulting message can be read via UnmarshalMessageLen() function. +// +// See also Marshal. +func (m *Marshaler) MarshalWithLen(dst []byte) []byte { + if m.mm == nil { + dst = marshalVarUint64(dst, 0) + return dst + } + if firstFieldIdx := m.mm.firstFieldIdx; firstFieldIdx >= 0 { + f := &m.fs[firstFieldIdx] + messageSize := f.initMessageSize(m) + if cap(dst) == 0 { + dst = make([]byte, messageSize+10) + dst = dst[:0] + } + dst = marshalVarUint64(dst, messageSize) + dst = f.marshal(dst, m) + } + return dst +} + +// Marshal appends marshaled protobuf m to dst and returns the result. +// +// The marshaled message can be read via FieldContext.NextField(). +// +// See also MarshalWithLen. +func (m *Marshaler) Marshal(dst []byte) []byte { + if m.mm == nil { + // Nothing to marshal + return dst + } + if firstFieldIdx := m.mm.firstFieldIdx; firstFieldIdx >= 0 { + f := &m.fs[firstFieldIdx] + messageSize := f.initMessageSize(m) + if cap(dst) == 0 { + dst = make([]byte, messageSize) + dst = dst[:0] + } + dst = f.marshal(dst, m) + } + return dst +} + +// MessageMarshaler returns message marshaler for the given m. +func (m *Marshaler) MessageMarshaler() *MessageMarshaler { + if mm := m.mm; mm != nil { + return mm + } + idx := m.newMessageMarshalerIndex() + mm := &m.mms[idx] + m.mm = mm + return mm +} + +func (m *Marshaler) newMessageMarshalerIndex() int { + mms := m.mms + mmsLen := len(mms) + if cap(mms) > mmsLen { + mms = mms[:mmsLen+1] + } else { + mms = append(mms, MessageMarshaler{}) + } + m.mms = mms + mm := &mms[mmsLen] + mm.reset() + mm.m = m + return mmsLen +} + +func (m *Marshaler) newFieldIndex() int { + fs := m.fs + fsLen := len(fs) + if cap(fs) > fsLen { + fs = fs[:fsLen+1] + } else { + fs = append(fs, field{}) + } + m.fs = fs + fs[fsLen].reset() + return fsLen +} + +// AppendInt32 appends the given int32 value under the given fieldNum to mm. +func (mm *MessageMarshaler) AppendInt32(fieldNum uint32, i32 int32) { + mm.AppendUint64(fieldNum, uint64(uint32(i32))) +} + +// AppendInt64 appends the given int64 value under the given fieldNum to mm. +func (mm *MessageMarshaler) AppendInt64(fieldNum uint32, i64 int64) { + mm.AppendUint64(fieldNum, uint64(i64)) +} + +// AppendUint32 appends the given uint32 value under the given fieldNum to mm. +func (mm *MessageMarshaler) AppendUint32(fieldNum, u32 uint32) { + mm.AppendUint64(fieldNum, uint64(u32)) +} + +// AppendUint64 appends the given uint64 value under the given fieldNum to mm. +func (mm *MessageMarshaler) AppendUint64(fieldNum uint32, u64 uint64) { + tag := makeTag(fieldNum, wireTypeVarint) + + m := mm.m + dst := m.buf + dstLen := len(dst) + if tag < 0x80 { + dst = append(dst, byte(tag)) + } else { + dst = marshalVarUint64(dst, tag) + } + dst = marshalVarUint64(dst, u64) + m.buf = dst + + mm.appendField(m, dstLen, len(dst)) +} + +// AppendSint32 appends the given sint32 value under the given fieldNum to mm. +func (mm *MessageMarshaler) AppendSint32(fieldNum uint32, i32 int32) { + u64 := uint64(encodeZigZagInt32(i32)) + mm.AppendUint64(fieldNum, u64) +} + +// AppendSint64 appends the given sint64 value under the given fieldNum to mm. +func (mm *MessageMarshaler) AppendSint64(fieldNum uint32, i64 int64) { + u64 := encodeZigZagInt64(i64) + mm.AppendUint64(fieldNum, u64) +} + +// AppendBool appends the given bool value under the given fieldNum to mm. +func (mm *MessageMarshaler) AppendBool(fieldNum uint32, v bool) { + u64 := uint64(0) + if v { + u64 = 1 + } + mm.AppendUint64(fieldNum, u64) +} + +// AppendFixed64 appends fixed64 value under the given fieldNum to mm. +func (mm *MessageMarshaler) AppendFixed64(fieldNum uint32, u64 uint64) { + tag := makeTag(fieldNum, wireTypeI64) + + m := mm.m + dst := m.buf + dstLen := len(dst) + if tag < 0x80 { + dst = append(dst, byte(tag)) + } else { + dst = marshalVarUint64(dst, tag) + } + dst = marshalUint64(dst, u64) + m.buf = dst + + mm.appendField(m, dstLen, len(dst)) +} + +// AppendSfixed64 appends sfixed64 value under the given fieldNum to mm. +func (mm *MessageMarshaler) AppendSfixed64(fieldNum uint32, i64 int64) { + mm.AppendFixed64(fieldNum, uint64(i64)) +} + +// AppendDouble appends double value under the given fieldNum to mm. +func (mm *MessageMarshaler) AppendDouble(fieldNum uint32, f float64) { + u64 := math.Float64bits(f) + mm.AppendFixed64(fieldNum, u64) +} + +// AppendString appends string value under the given fieldNum to mm. +func (mm *MessageMarshaler) AppendString(fieldNum uint32, s string) { + tag := makeTag(fieldNum, wireTypeLen) + + m := mm.m + dst := m.buf + dstLen := len(dst) + sLen := len(s) + if tag < 0x80 && sLen < 0x80 { + dst = append(dst, byte(tag), byte(sLen)) + } else { + dst = marshalVarUint64(dst, tag) + dst = marshalVarUint64(dst, uint64(sLen)) + } + dst = append(dst, s...) + m.buf = dst + + mm.appendField(m, dstLen, len(dst)) +} + +// AppendBytes appends bytes value under the given fieldNum to mm. +func (mm *MessageMarshaler) AppendBytes(fieldNum uint32, b []byte) { + s := unsafeBytesToString(b) + mm.AppendString(fieldNum, s) +} + +// AppendMessage appends protobuf message with the given fieldNum to m. +// +// The function returns the MessageMarshaler for constructing the appended message. +func (mm *MessageMarshaler) AppendMessage(fieldNum uint32) *MessageMarshaler { + tag := makeTag(fieldNum, wireTypeLen) + + f := mm.newField() + m := mm.m + f.childMessageMarshalerIdx = m.newMessageMarshalerIndex() + mmChild := &m.mms[f.childMessageMarshalerIdx] + mmChild.tag = tag + return mmChild +} + +// AppendFixed32 appends fixed32 value under the given fieldNum to mm. +func (mm *MessageMarshaler) AppendFixed32(fieldNum, u32 uint32) { + tag := makeTag(fieldNum, wireTypeI32) + + m := mm.m + dst := m.buf + dstLen := len(dst) + if tag < 0x80 { + dst = append(dst, byte(tag)) + } else { + dst = marshalVarUint64(dst, tag) + } + dst = marshalUint32(dst, u32) + m.buf = dst + + mm.appendField(m, dstLen, len(dst)) +} + +// AppendSfixed32 appends sfixed32 value under the given fieldNum to mm. +func (mm *MessageMarshaler) AppendSfixed32(fieldNum uint32, i32 int32) { + mm.AppendFixed32(fieldNum, uint32(i32)) +} + +// AppendFloat appends float value under the given fieldNum to mm. +func (mm *MessageMarshaler) AppendFloat(fieldNum uint32, f float32) { + u32 := math.Float32bits(f) + mm.AppendFixed32(fieldNum, u32) +} + +// AppendInt32s appends the given int32 values under the given fieldNum to mm. +func (mm *MessageMarshaler) AppendInt32s(fieldNum uint32, i32s []int32) { + child := mm.AppendMessage(fieldNum) + child.appendInt32s(i32s) +} + +// AppendInt64s appends the given int64 values under the given fieldNum to mm. +func (mm *MessageMarshaler) AppendInt64s(fieldNum uint32, i64s []int64) { + child := mm.AppendMessage(fieldNum) + child.appendInt64s(i64s) +} + +// AppendUint32s appends the given uint32 values under the given fieldNum to mm. +func (mm *MessageMarshaler) AppendUint32s(fieldNum uint32, u32s []uint32) { + child := mm.AppendMessage(fieldNum) + child.appendUint32s(u32s) +} + +// AppendUint64s appends the given uint64 values under the given fieldNum to mm. +func (mm *MessageMarshaler) AppendUint64s(fieldNum uint32, u64s []uint64) { + child := mm.AppendMessage(fieldNum) + child.appendUint64s(u64s) +} + +// AppendSint32s appends the given sint32 values under the given fieldNum to mm. +func (mm *MessageMarshaler) AppendSint32s(fieldNum uint32, i32s []int32) { + child := mm.AppendMessage(fieldNum) + child.appendSint32s(i32s) +} + +// AppendSint64s appends the given sint64 values under the given fieldNum to mm. +func (mm *MessageMarshaler) AppendSint64s(fieldNum uint32, i64s []int64) { + child := mm.AppendMessage(fieldNum) + child.appendSint64s(i64s) +} + +// AppendBools appends the given bool values under the given fieldNum to mm. +func (mm *MessageMarshaler) AppendBools(fieldNum uint32, bs []bool) { + child := mm.AppendMessage(fieldNum) + child.appendBools(bs) +} + +// AppendFixed64s appends the given fixed64 values under the given fieldNum to mm. +func (mm *MessageMarshaler) AppendFixed64s(fieldNum uint32, u64s []uint64) { + child := mm.AppendMessage(fieldNum) + child.appendFixed64s(u64s) +} + +// AppendSfixed64s appends the given sfixed64 values under the given fieldNum to mm. +func (mm *MessageMarshaler) AppendSfixed64s(fieldNum uint32, i64s []int64) { + child := mm.AppendMessage(fieldNum) + child.appendSfixed64s(i64s) +} + +// AppendDoubles appends the given double values under the given fieldNum to mm. +func (mm *MessageMarshaler) AppendDoubles(fieldNum uint32, fs []float64) { + child := mm.AppendMessage(fieldNum) + child.appendDoubles(fs) +} + +// AppendFixed32s appends the given fixed32 values under the given fieldNum to mm. +func (mm *MessageMarshaler) AppendFixed32s(fieldNum uint32, u32s []uint32) { + child := mm.AppendMessage(fieldNum) + child.appendFixed32s(u32s) +} + +// AppendSfixed32s appends the given sfixed32 values under the given fieldNum to mm. +func (mm *MessageMarshaler) AppendSfixed32s(fieldNum uint32, i32s []int32) { + child := mm.AppendMessage(fieldNum) + child.appendSfixed32s(i32s) +} + +// AppendFloats appends the given float values under the given fieldNum to mm. +func (mm *MessageMarshaler) AppendFloats(fieldNum uint32, fs []float32) { + child := mm.AppendMessage(fieldNum) + child.appendFloats(fs) +} + +func (mm *MessageMarshaler) appendInt32s(i32s []int32) { + m := mm.m + dst := m.buf + dstLen := len(dst) + for _, i32 := range i32s { + dst = marshalVarUint64(dst, uint64(uint32(i32))) + } + m.buf = dst + + mm.appendField(m, dstLen, len(dst)) +} + +func (mm *MessageMarshaler) appendUint32s(u32s []uint32) { + m := mm.m + dst := m.buf + dstLen := len(dst) + for _, u32 := range u32s { + dst = marshalVarUint64(dst, uint64(u32)) + } + m.buf = dst + + mm.appendField(m, dstLen, len(dst)) +} + +func (mm *MessageMarshaler) appendSint32s(i32s []int32) { + m := mm.m + dst := m.buf + dstLen := len(dst) + for _, i32 := range i32s { + u64 := uint64(encodeZigZagInt32(i32)) + dst = marshalVarUint64(dst, u64) + } + m.buf = dst + + mm.appendField(m, dstLen, len(dst)) +} + +func (mm *MessageMarshaler) appendInt64s(i64s []int64) { + m := mm.m + dst := m.buf + dstLen := len(dst) + for _, i64 := range i64s { + dst = marshalVarUint64(dst, uint64(i64)) + } + m.buf = dst + + mm.appendField(m, dstLen, len(dst)) +} + +func (mm *MessageMarshaler) appendUint64s(u64s []uint64) { + m := mm.m + dst := m.buf + dstLen := len(dst) + for _, u64 := range u64s { + dst = marshalVarUint64(dst, u64) + } + m.buf = dst + + mm.appendField(m, dstLen, len(dst)) +} + +func (mm *MessageMarshaler) appendSint64s(i64s []int64) { + m := mm.m + dst := m.buf + dstLen := len(dst) + for _, i64 := range i64s { + u64 := encodeZigZagInt64(i64) + dst = marshalVarUint64(dst, u64) + } + m.buf = dst + + mm.appendField(m, dstLen, len(dst)) +} + +func (mm *MessageMarshaler) appendBools(bs []bool) { + m := mm.m + dst := m.buf + dstLen := len(dst) + for _, b := range bs { + u64 := uint64(0) + if b { + u64 = 1 + } + dst = marshalVarUint64(dst, u64) + } + m.buf = dst + + mm.appendField(m, dstLen, len(dst)) +} + +func (mm *MessageMarshaler) appendFixed64s(u64s []uint64) { + m := mm.m + dst := m.buf + dstLen := len(dst) + for _, u64 := range u64s { + dst = marshalUint64(dst, u64) + } + m.buf = dst + + mm.appendField(m, dstLen, len(dst)) +} + +func (mm *MessageMarshaler) appendSfixed64s(i64s []int64) { + m := mm.m + dst := m.buf + dstLen := len(dst) + for _, i64 := range i64s { + dst = marshalUint64(dst, uint64(i64)) + } + m.buf = dst + + mm.appendField(m, dstLen, len(dst)) +} + +func (mm *MessageMarshaler) appendFixed32s(u32s []uint32) { + m := mm.m + dst := m.buf + dstLen := len(dst) + for _, u32 := range u32s { + dst = marshalUint32(dst, u32) + } + m.buf = dst + + mm.appendField(m, dstLen, len(dst)) +} + +func (mm *MessageMarshaler) appendSfixed32s(i32s []int32) { + m := mm.m + dst := m.buf + dstLen := len(dst) + for _, i32 := range i32s { + dst = marshalUint32(dst, uint32(i32)) + } + m.buf = dst + + mm.appendField(m, dstLen, len(dst)) +} + +func (mm *MessageMarshaler) appendDoubles(fs []float64) { + m := mm.m + dst := m.buf + dstLen := len(dst) + for _, f := range fs { + u64 := math.Float64bits(f) + dst = marshalUint64(dst, u64) + } + m.buf = dst + + mm.appendField(m, dstLen, len(dst)) +} + +func (mm *MessageMarshaler) appendFloats(fs []float32) { + m := mm.m + dst := m.buf + dstLen := len(dst) + for _, f := range fs { + u32 := math.Float32bits(f) + dst = marshalUint32(dst, u32) + } + m.buf = dst + + mm.appendField(m, dstLen, len(dst)) +} + +func (mm *MessageMarshaler) appendField(m *Marshaler, dataStart, dataEnd int) { + if lastFieldIdx := mm.lastFieldIdx; lastFieldIdx >= 0 { + if f := &m.fs[lastFieldIdx]; f.childMessageMarshalerIdx == -1 && f.dataEnd == dataStart { + f.dataEnd = dataEnd + return + } + } + f := mm.newField() + f.dataStart = dataStart + f.dataEnd = dataEnd +} + +func (mm *MessageMarshaler) newField() *field { + m := mm.m + idx := m.newFieldIndex() + f := &m.fs[idx] + if lastFieldIdx := mm.lastFieldIdx; lastFieldIdx >= 0 { + m.fs[lastFieldIdx].nextFieldIdx = idx + } else { + mm.firstFieldIdx = idx + } + mm.lastFieldIdx = idx + return f +} + +func (f *field) initMessageSize(m *Marshaler) uint64 { + n := uint64(0) + for { + if childMessageMarshalerIdx := f.childMessageMarshalerIdx; childMessageMarshalerIdx < 0 { + n += uint64(f.dataEnd - f.dataStart) + } else { + mmChild := m.mms[childMessageMarshalerIdx] + if tag := mmChild.tag; tag < 0x80 { + n++ + } else { + n += varuintLen(tag) + } + messageSize := uint64(0) + if firstFieldIdx := mmChild.firstFieldIdx; firstFieldIdx >= 0 { + messageSize = m.fs[firstFieldIdx].initMessageSize(m) + } + n += messageSize + if messageSize < 0x80 { + n++ + } else { + n += varuintLen(messageSize) + } + f.messageSize = messageSize + } + nextFieldIdx := f.nextFieldIdx + if nextFieldIdx < 0 { + return n + } + f = &m.fs[nextFieldIdx] + } +} + +func (f *field) marshal(dst []byte, m *Marshaler) []byte { + for { + if childMessageMarshalerIdx := f.childMessageMarshalerIdx; childMessageMarshalerIdx < 0 { + data := m.buf[f.dataStart:f.dataEnd] + dst = append(dst, data...) + } else { + mmChild := m.mms[childMessageMarshalerIdx] + tag := mmChild.tag + messageSize := f.messageSize + if tag < 0x80 && messageSize < 0x80 { + dst = append(dst, byte(tag), byte(messageSize)) + } else { + dst = marshalVarUint64(dst, mmChild.tag) + dst = marshalVarUint64(dst, f.messageSize) + } + if firstFieldIdx := mmChild.firstFieldIdx; firstFieldIdx >= 0 { + dst = m.fs[firstFieldIdx].marshal(dst, m) + } + } + nextFieldIdx := f.nextFieldIdx + if nextFieldIdx < 0 { + return dst + } + f = &m.fs[nextFieldIdx] + } +} + +func marshalUint64(dst []byte, u64 uint64) []byte { + return binary.LittleEndian.AppendUint64(dst, u64) +} + +func marshalUint32(dst []byte, u32 uint32) []byte { + return binary.LittleEndian.AppendUint32(dst, u32) +} + +func marshalVarUint64(dst []byte, u64 uint64) []byte { + if u64 < 0x80 { + // Fast path + dst = append(dst, byte(u64)) + return dst + } + for u64 > 0x7f { + dst = append(dst, 0x80|byte(u64)) + u64 >>= 7 + } + dst = append(dst, byte(u64)) + return dst +} + +func encodeZigZagInt64(i64 int64) uint64 { + return uint64((i64 << 1) ^ (i64 >> 63)) +} + +func encodeZigZagInt32(i32 int32) uint32 { + return uint32((i32 << 1) ^ (i32 >> 31)) +} + +func makeTag(fieldNum uint32, wt wireType) uint64 { + return (uint64(fieldNum) << 3) | uint64(wt) +} + +// varuintLen returns the number of bytes needed for varuint-encoding of u64. +// +// Note that it returns 0 for u64=0, so this case must be handled separately. +func varuintLen(u64 uint64) uint64 { + return uint64(((byte(bits.Len64(u64))) + 6) / 7) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 39ac4c546..895a55cfa 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -89,6 +89,9 @@ github.com/AzureAD/microsoft-authentication-library-for-go/apps/internal/options github.com/AzureAD/microsoft-authentication-library-for-go/apps/internal/shared github.com/AzureAD/microsoft-authentication-library-for-go/apps/internal/version github.com/AzureAD/microsoft-authentication-library-for-go/apps/public +# github.com/VictoriaMetrics/easyproto v0.1.3 +## explicit; go 1.18 +github.com/VictoriaMetrics/easyproto # github.com/VictoriaMetrics/fastcache v1.12.2 ## explicit; go 1.13 github.com/VictoriaMetrics/fastcache