From 04d13f6149ce6ed54a7228eee2bd0e0c2d8763a5 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Thu, 29 Feb 2024 14:38:21 +0200
Subject: [PATCH] app/{vminsert,vmagent}: follow-up after 67a55b89a488e47451438ddc73331cafb38c01ea

- Document the ability to read OpenTelemetry data from Amazon Firehose at
  docs/CHANGELOG.md

- Simplify parsing of Firehose data. There is no need to optimize the parsing
  with fastjson and byte slice tricks, since the OpenTelemetry protocol is
  really slow because of over-engineering. It is better to write clear code
  for better maintainability in the future.

- Move the Firehose parser from lib/protoparser/firehose to
  lib/protoparser/opentelemetry/firehose, since it is used only by the
  opentelemetry parser.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5893
---
 app/vmagent/opentelemetry/request_handler.go  |  4 +-
 app/vminsert/opentelemetry/request_handler.go |  4 +-
 docs/CHANGELOG.md                             |  1 +
 lib/protoparser/firehose/parser.go            | 91 -------------------
 .../opentelemetry/firehose/parser.go          | 39 ++++++++
 .../firehose/parser_test.go                   |  7 +-
 .../opentelemetry/stream/streamparser.go      | 10 +-
 7 files changed, 55 insertions(+), 101 deletions(-)
 delete mode 100644 lib/protoparser/firehose/parser.go
 create mode 100644 lib/protoparser/opentelemetry/firehose/parser.go
 rename lib/protoparser/{ => opentelemetry}/firehose/parser_test.go (55%)

diff --git a/app/vmagent/opentelemetry/request_handler.go b/app/vmagent/opentelemetry/request_handler.go
index 5809b8c13..09f6bf080 100644
--- a/app/vmagent/opentelemetry/request_handler.go
+++ b/app/vmagent/opentelemetry/request_handler.go
@@ -9,7 +9,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 	parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/firehose"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/stream"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
 	"github.com/VictoriaMetrics/metrics"
@@ -28,7 +28,7 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
 		return err
 	}
 	isGzipped := req.Header.Get("Content-Encoding") == "gzip"
-	var processBody func(*[]byte) error
+	var processBody func([]byte) ([]byte, error)
 	if req.Header.Get("Content-Type") == "application/json" {
 		if req.Header.Get("X-Amz-Firehose-Protocol-Version") != "" {
 			processBody = firehose.ProcessRequestBody
diff --git a/app/vminsert/opentelemetry/request_handler.go b/app/vminsert/opentelemetry/request_handler.go
index d9c1786b2..72560108e 100644
--- a/app/vminsert/opentelemetry/request_handler.go
+++ b/app/vminsert/opentelemetry/request_handler.go
@@ -8,7 +8,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 	parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/firehose"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/stream"
 	"github.com/VictoriaMetrics/metrics"
 )
@@ -25,7 +25,7 @@ func InsertHandler(req *http.Request) error {
 		return err
 	}
 	isGzipped := req.Header.Get("Content-Encoding") == "gzip"
-	var processBody func(*[]byte) error
+	var processBody func([]byte) ([]byte, error)
 	if req.Header.Get("Content-Type") == "application/json" {
 		if req.Header.Get("X-Amz-Firehose-Protocol-Version") != "" {
 			processBody = firehose.ProcessRequestBody
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index bb395d113..8075ae52c 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -36,6 +36,7 @@ See also [LTS releases](https://docs.victoriametrics.com/LTS-releases.html).
 * FEATURE: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): expose `vm_snapshots` [metric](https://docs.victoriametrics.com/#monitoring), which shows the current number of snapshots created via [snapshot API](https://docs.victoriametrics.com/#how-to-work-with-snapshots).
 * FEATURE: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): add `-search.ignoreExtraFiltersAtLabelsAPI` command-line flag, which can be used for reducing load on VictoriaMetrics when [/api/v1/labels](https://docs.victoriametrics.com/url-examples/#apiv1labels), [/api/v1/label/.../values](https://docs.victoriametrics.com/url-examples/#apiv1labelvalues) or [/api/v1/series](https://docs.victoriametrics.com/url-examples/#apiv1series) are queried with too broad [`extra_filters` or `extra_label`](https://docs.victoriametrics.com/#prometheus-querying-api-enhancements), which match many time series. See [these docs](https://docs.victoriametrics.com/#resource-usage-limits) for details.
 * FEATURE: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): allow limiting CPU and RAM usage at [/api/v1/labels](https://docs.victoriametrics.com/url-examples/#apiv1labels), [/api/v1/label/.../values](https://docs.victoriametrics.com/url-examples/#apiv1labelvalues) and [/api/v1/series](https://docs.victoriametrics.com/url-examples/#apiv1series) on systems with [high churn rate](https://docs.victoriametrics.com/faq/#what-is-high-churn-rate) via `-search.maxLabelsAPIDuration` and `-search.maxLabelsAPISeries` command-line flags. See [these docs](https://docs.victoriametrics.com/#resource-usage-limits) for details.
+* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): support reading [Amazon CloudWatch](https://aws.amazon.com/cloudwatch/) metrics in [OpenTelemetry](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-metric-streams-formats-opentelemetry-100.html) format from [Amazon Data Firehose](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Metric-Streams.html).
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for `client_id` option into [kuma_sd_configs](https://docs.victoriametrics.com/sd_configs/#kuma_sd_configs) in the same way as Prometheus does. See [this pull request](https://github.com/prometheus/prometheus/pull/13278).
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for `enable_compression` option in [scrape_configs](https://docs.victoriametrics.com/sd_configs/#scrape_configs) in order to be compatible with Prometheus scrape configs. See [this pull request](https://github.com/prometheus/prometheus/pull/13166) and [this feature request](https://github.com/prometheus/prometheus/issues/12319). Note that `vmagent` was always supporting [`disable_compression` option](https://docs.victoriametrics.com/vmagent/#scrape_config-enhancements) before Prometheus added `enable_compression` option.
 * FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): support client-side TLS configuration for [InfluxDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-influxdb-1x), [Remote Read protocol](https://docs.victoriametrics.com/vmctl/#migrating-data-by-remote-read-protocol) and [OpenTSDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-opentsdb). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5748). Thanks to @khushijain21 for pull requests [1](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5783), [2](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5798), [3](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5797).
diff --git a/lib/protoparser/firehose/parser.go b/lib/protoparser/firehose/parser.go
deleted file mode 100644
index 48c7d6f4c..000000000
--- a/lib/protoparser/firehose/parser.go
+++ /dev/null
@@ -1,91 +0,0 @@
-package firehose
-
-import (
-	"encoding/base64"
-	"fmt"
-	"github.com/valyala/fastjson"
-)
-
-// ProcessRequestBody converts Cloudwatch Stream protobuf metrics HTTP body delivered via Firehose into protobuf message.
-func ProcessRequestBody(b *[]byte) error {
-	return unmarshalRequest(b)
-}
-
-var jsonParserPool fastjson.ParserPool
-
-// unmarshalRequest extracts and decodes b64 data from Firehose HTTP destination request
-//
-// {
-//   "requestId": "<uuid-string>",
-//   "timestamp": <timestamp>,
-//   "records": [
-//     {
-//       "data": "<base64-encoded data>"
-//     }
-//   ]
-// }
-func unmarshalRequest(b *[]byte) error {
-	p := jsonParserPool.Get()
-	defer jsonParserPool.Put(p)
-
-	v, err := p.ParseBytes(*b)
-	if err != nil {
-		return err
-	}
-	o, err := v.Object()
-	if err != nil {
-		return fmt.Errorf("cannot find Firehose Request objects: %w", err)
-	}
-	index := 0
-	o.Visit(func(k []byte, v *fastjson.Value) {
-		if err != nil {
-			return
-		}
-		switch string(k) {
-		case "records":
-			recordObjects, errLocal := v.Array()
-			if errLocal != nil {
-				err = fmt.Errorf("cannot find Records array in Firehose Request object: %w", errLocal)
-				return
-			}
-			for _, fr := range recordObjects {
-				recordObject, errLocal := fr.Object()
-				if errLocal != nil {
-					err = fmt.Errorf("cannot find Record object: %w", errLocal)
-					return
-				}
-				if errLocal := unmarshalRecord(b, &index, recordObject); errLocal != nil {
-					err = fmt.Errorf("cannot unmarshal Record object: %w", errLocal)
-					return
-				}
-			}
-		}
-	})
-	*b = (*b)[:index]
-	if err != nil {
-		return fmt.Errorf("cannot parse Firehose Request object: %w", err)
-	}
-	return nil
-}
-
-func unmarshalRecord(b *[]byte, index *int, o *fastjson.Object) error {
-	var err error
-	var inc int
-	o.Visit(func(k []byte, v *fastjson.Value) {
-		if v.Type() != fastjson.TypeString {
-			err = fmt.Errorf("invalid record data type, %q", v.Type())
-			return
-		}
-		valueBytes := v.GetStringBytes()
-		if len(valueBytes) == 0 {
-			return
-		}
-		inc, err = base64.StdEncoding.Decode((*b)[*index:], valueBytes)
-		if err != nil {
-			err = fmt.Errorf("failed to decode and append Firehose Record data: %w", err)
-			return
-		}
-		*index = *index + inc
-	})
-	return err
-}
diff --git a/lib/protoparser/opentelemetry/firehose/parser.go b/lib/protoparser/opentelemetry/firehose/parser.go
new file mode 100644
index 000000000..7e7fb6200
--- /dev/null
+++ b/lib/protoparser/opentelemetry/firehose/parser.go
@@ -0,0 +1,39 @@
+package firehose
+
+import (
"encoding/json" + "fmt" +) + +// ProcessRequestBody converts Cloudwatch Stream protobuf metrics HTTP request body delivered via Firehose into OpenTelemetry protobuf message. +// +// See https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Metric-Streams.html +// +// It joins decoded "data" fields from "record" list: +// +// { +// "requestId": "", +// "timestamp": , +// "records": [ +// { +// "data": "" +// } +// ] +// } +func ProcessRequestBody(b []byte) ([]byte, error) { + var req struct { + Records []struct { + Data []byte + } + } + if err := json.Unmarshal(b, &req); err != nil { + return nil, fmt.Errorf("cannot unmarshal Firehose JSON in request body: %s", err) + } + + var dst []byte + for _, r := range req.Records { + dst = append(dst, r.Data...) + } + + return dst, nil +} diff --git a/lib/protoparser/firehose/parser_test.go b/lib/protoparser/opentelemetry/firehose/parser_test.go similarity index 55% rename from lib/protoparser/firehose/parser_test.go rename to lib/protoparser/opentelemetry/firehose/parser_test.go index 647f927df..fdede8568 100644 --- a/lib/protoparser/firehose/parser_test.go +++ b/lib/protoparser/opentelemetry/firehose/parser_test.go @@ -6,11 +6,12 @@ import ( func TestProcessRequestBody(t *testing.T) { data := []byte(`{"records": [{"data": "SGVsbG8sIA=="}, {"data": "d29ybGQh"}]}`) - err := ProcessRequestBody(&data) + result, err := ProcessRequestBody(data) if err != nil { t.Fatalf("unexpected error: %v", err) } - if string(data) != "Hello, world!" { - t.Fatalf("unexpected string: %q", string(data)) + resultExpected := "Hello, world!" + if string(result) != resultExpected { + t.Fatalf("unexpected result; got %q; want %q", result, resultExpected) } } diff --git a/lib/protoparser/opentelemetry/stream/streamparser.go b/lib/protoparser/opentelemetry/stream/streamparser.go index 30bd0e1dd..d0b18c7ce 100644 --- a/lib/protoparser/opentelemetry/stream/streamparser.go +++ b/lib/protoparser/opentelemetry/stream/streamparser.go @@ -21,7 +21,9 @@ import ( // ParseStream parses OpenTelemetry protobuf or json data from r and calls callback for the parsed rows. // // callback shouldn't hold tss items after returning. -func ParseStream(r io.Reader, isGzipped bool, processBody func(*[]byte) error, callback func(tss []prompbmarshal.TimeSeries) error) error { +// +// optional processBody can be used for pre-processing the read request body from r before parsing it in OpenTelemetry format. +func ParseStream(r io.Reader, isGzipped bool, processBody func([]byte) ([]byte, error), callback func(tss []prompbmarshal.TimeSeries) error) error { wcr := writeconcurrencylimiter.GetReader(r) defer writeconcurrencylimiter.PutReader(wcr) r = wcr @@ -257,15 +259,17 @@ func resetLabels(labels []prompbmarshal.Label) []prompbmarshal.Label { return labels[:0] } -func (wr *writeContext) readAndUnpackRequest(r io.Reader, processBody func(*[]byte) error) (*pb.ExportMetricsServiceRequest, error) { +func (wr *writeContext) readAndUnpackRequest(r io.Reader, processBody func([]byte) ([]byte, error)) (*pb.ExportMetricsServiceRequest, error) { if _, err := wr.bb.ReadFrom(r); err != nil { return nil, fmt.Errorf("cannot read request: %w", err) } var req pb.ExportMetricsServiceRequest if processBody != nil { - if err := processBody(&wr.bb.B); err != nil { + data, err := processBody(wr.bb.B) + if err != nil { return nil, fmt.Errorf("cannot process request body: %w", err) } + wr.bb.B = append(wr.bb.B[:0], data...) 
 	}
 	if err := req.UnmarshalProtobuf(wr.bb.B); err != nil {
 		return nil, fmt.Errorf("cannot unmarshal request from %d bytes: %w", len(wr.bb.B), err)
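
Below is a minimal standalone sketch (not part of the commit above) of how the new firehose.ProcessRequestBody API is called. The Firehose JSON envelope and the base64 records are the same fixture used in parser_test.go; the main package wrapper around them is illustrative only.

package main

import (
	"fmt"
	"log"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose"
)

func main() {
	// Firehose HTTP destination request body; "SGVsbG8sIA==" and "d29ybGQh"
	// base64-decode to "Hello, " and "world!".
	body := []byte(`{"records": [{"data": "SGVsbG8sIA=="}, {"data": "d29ybGQh"}]}`)

	// ProcessRequestBody strips the Firehose JSON envelope and returns the
	// concatenated, base64-decoded "data" payloads. In production the result
	// is an OpenTelemetry protobuf message rather than plain text.
	payload, err := firehose.ProcessRequestBody(body)
	if err != nil {
		log.Fatalf("cannot process Firehose request body: %v", err)
	}
	fmt.Printf("%s\n", payload) // prints "Hello, world!"
}

The simplification works because encoding/json base64-decodes JSON strings directly into []byte struct fields (the Data []byte field in parser.go above), which is what allows dropping the manual base64 and fastjson handling of the deleted parser.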
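A second sketch shows how the optional processBody hook plugs into stream.ParseStream, mirroring app/{vminsert,vmagent}/opentelemetry/request_handler.go. Only the ParseStream and ProcessRequestBody signatures come from the patch; parseFirehoseRequest, the reader construction and the callback body are illustrative assumptions.

package main

import (
	"bytes"
	"log"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/stream"
)

// parseFirehoseRequest is a hypothetical helper: it feeds a Firehose-wrapped
// OpenTelemetry request body to the stream parser, passing
// firehose.ProcessRequestBody as the pre-processing hook that unwraps the
// JSON envelope before the protobuf payload is parsed.
func parseFirehoseRequest(body []byte) error {
	callback := func(tss []prompbmarshal.TimeSeries) error {
		// In vminsert/vmagent the callback pushes the parsed series further
		// down the ingestion path; here it just logs the count.
		log.Printf("parsed %d time series", len(tss))
		return nil
	}
	return stream.ParseStream(bytes.NewReader(body), false, firehose.ProcessRequestBody, callback)
}

func main() {
	if err := parseFirehoseRequest([]byte(`{"records": []}`)); err != nil {
		log.Fatalf("cannot parse Firehose request: %v", err)
	}
}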