app/{vminsert,vmagent}: follow-up after 67a55b89a4

- Document the ability to read OpenTelemetry data from Amazon Firehose at docs/CHANGELOG.md

- Simplify parsing of Firehose data (see the sketch below). There is no need to try optimizing the parsing
  with fastjson and byte slice tricks, since the OpenTelemetry protocol is really slooow because of over-engineering.
  It is better to write clear code for better maintainability in the future.

- Move the Firehose parser from lib/protoparser/firehose to lib/protoparser/opentelemetry/firehose,
  since it is used only by the OpenTelemetry parser.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5893
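
For illustration, a minimal self-contained sketch of the simplified approach (a toy program under the same assumptions, not the committed code; the real parser lives in lib/protoparser/opentelemetry/firehose and is shown in the diff below):

package main

import (
	"encoding/json"
	"fmt"
	"log"
)

func main() {
	// Example Firehose HTTP destination request body (same shape as in the tests below).
	body := []byte(`{"records": [{"data": "SGVsbG8sIA=="}, {"data": "d29ybGQh"}]}`)

	// encoding/json matches the exported Data field to the "data" key
	// case-insensitively and base64-decodes the JSON string into []byte,
	// so neither fastjson nor manual base64 handling is needed.
	var req struct {
		Records []struct {
			Data []byte
		}
	}
	if err := json.Unmarshal(body, &req); err != nil {
		log.Fatalf("cannot unmarshal Firehose JSON: %s", err)
	}

	// Join the decoded per-record payloads into a single payload.
	var dst []byte
	for _, r := range req.Records {
		dst = append(dst, r.Data...)
	}
	fmt.Printf("%s\n", dst) // prints: Hello, world!
}
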
Aliaksandr Valialkin 2024-02-29 14:38:21 +02:00
parent 3723c809a1
commit 04d13f6149
7 changed files with 55 additions and 101 deletions

@@ -9,7 +9,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 	parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/firehose"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/stream"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
 	"github.com/VictoriaMetrics/metrics"
@@ -28,7 +28,7 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
 		return err
 	}
 	isGzipped := req.Header.Get("Content-Encoding") == "gzip"
-	var processBody func(*[]byte) error
+	var processBody func([]byte) ([]byte, error)
 	if req.Header.Get("Content-Type") == "application/json" {
 		if req.Header.Get("X-Amz-Firehose-Protocol-Version") != "" {
 			processBody = firehose.ProcessRequestBody

@@ -8,7 +8,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 	parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/firehose"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/stream"
 	"github.com/VictoriaMetrics/metrics"
 )
@@ -25,7 +25,7 @@ func InsertHandler(req *http.Request) error {
 		return err
 	}
 	isGzipped := req.Header.Get("Content-Encoding") == "gzip"
-	var processBody func(*[]byte) error
+	var processBody func([]byte) ([]byte, error)
 	if req.Header.Get("Content-Type") == "application/json" {
 		if req.Header.Get("X-Amz-Firehose-Protocol-Version") != "" {
 			processBody = firehose.ProcessRequestBody

@@ -36,6 +36,7 @@ See also [LTS releases](https://docs.victoriametrics.com/LTS-releases.html).
 * FEATURE: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): expose `vm_snapshots` [metric](https://docs.victoriametrics.com/#monitoring), which shows the current number of snapshots created via [snapshot API](https://docs.victoriametrics.com/#how-to-work-with-snapshots).
 * FEATURE: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): add `-search.ignoreExtraFiltersAtLabelsAPI` command-line flag, which can be used for reducing load on VictoriaMetrics when [/api/v1/labels](https://docs.victoriametrics.com/url-examples/#apiv1labels), [/api/v1/label/.../values](https://docs.victoriametrics.com/url-examples/#apiv1labelvalues) or [/api/v1/series](https://docs.victoriametrics.com/url-examples/#apiv1series) are queried with too broad [`extra_filters` or `extra_label`](https://docs.victoriametrics.com/#prometheus-querying-api-enhancements), which match many time series. See [these docs](https://docs.victoriametrics.com/#resource-usage-limits) for details.
 * FEATURE: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): allow limiting CPU and RAM usage at [/api/v1/labels](https://docs.victoriametrics.com/url-examples/#apiv1labels), [/api/v1/label/.../values](https://docs.victoriametrics.com/url-examples/#apiv1labelvalues) and [/api/v1/series](https://docs.victoriametrics.com/url-examples/#apiv1series) on systems with [high churn rate](https://docs.victoriametrics.com/faq/#what-is-high-churn-rate) via `-search.maxLabelsAPIDuration` and `-search.maxLabelsAPISeries` command-line flags. See [these docs](https://docs.victoriametrics.com/#resource-usage-limits) for details.
+* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): support reading [Amazon CloudWatch](https://aws.amazon.com/cloudwatch/) metrics in [OpenTelemetry](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-metric-streams-formats-opentelemetry-100.html) format from [Amazon Data Firehose](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Metric-Streams.html).
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for `client_id` option into [kuma_sd_configs](https://docs.victoriametrics.com/sd_configs/#kuma_sd_configs) in the same way as Prometheus does. See [this pull request](https://github.com/prometheus/prometheus/pull/13278).
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for `enable_compression` option in [scrape_configs](https://docs.victoriametrics.com/sd_configs/#scrape_configs) in order to be compatible with Prometheus scrape configs. See [this pull request](https://github.com/prometheus/prometheus/pull/13166) and [this feature request](https://github.com/prometheus/prometheus/issues/12319). Note that `vmagent` was always supporting [`disable_compression` option](https://docs.victoriametrics.com/vmagent/#scrape_config-enhancements) before Prometheus added `enable_compression` option.
 * FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): support client-side TLS configuration for [InfluxDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-influxdb-1x), [Remote Read protocol](https://docs.victoriametrics.com/vmctl/#migrating-data-by-remote-read-protocol) and [OpenTSDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-opentsdb). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5748). Thanks to @khushijain21 for pull requests [1](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5783), [2](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5798), [3](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5797).

@@ -1,91 +0,0 @@
-package firehose
-
-import (
-	"encoding/base64"
-	"fmt"
-
-	"github.com/valyala/fastjson"
-)
-
-// ProcessRequestBody converts Cloudwatch Stream protobuf metrics HTTP body delivered via Firehose into protobuf message.
-func ProcessRequestBody(b *[]byte) error {
-	return unmarshalRequest(b)
-}
-
-var jsonParserPool fastjson.ParserPool
-
-// unmarshalRequest extracts and decodes b64 data from Firehose HTTP destination request
-//
-//	{
-//	  "requestId": "<uuid-string>",
-//	  "timestamp": <int64-value>,
-//	  "records": [
-//	    {
-//	      "data": "<base64-encoded-payload>"
-//	    }
-//	  ]
-//	}
-func unmarshalRequest(b *[]byte) error {
-	p := jsonParserPool.Get()
-	defer jsonParserPool.Put(p)
-	v, err := p.ParseBytes(*b)
-	if err != nil {
-		return err
-	}
-	o, err := v.Object()
-	if err != nil {
-		return fmt.Errorf("cannot find Firehose Request objects: %w", err)
-	}
-	index := 0
-	o.Visit(func(k []byte, v *fastjson.Value) {
-		if err != nil {
-			return
-		}
-		switch string(k) {
-		case "records":
-			recordObjects, errLocal := v.Array()
-			if errLocal != nil {
-				err = fmt.Errorf("cannot find Records array in Firehose Request object: %w", errLocal)
-				return
-			}
-			for _, fr := range recordObjects {
-				recordObject, errLocal := fr.Object()
-				if errLocal != nil {
-					err = fmt.Errorf("cannot find Record object: %w", errLocal)
-					return
-				}
-				if errLocal := unmarshalRecord(b, &index, recordObject); errLocal != nil {
-					err = fmt.Errorf("cannot unmarshal Record object: %w", errLocal)
-					return
-				}
-			}
-		}
-	})
-	*b = (*b)[:index]
-	if err != nil {
-		return fmt.Errorf("cannot parse Firehose Request object: %w", err)
-	}
-	return nil
-}
-
-func unmarshalRecord(b *[]byte, index *int, o *fastjson.Object) error {
-	var err error
-	var inc int
-	o.Visit(func(k []byte, v *fastjson.Value) {
-		if v.Type() != fastjson.TypeString {
-			err = fmt.Errorf("invalid record data type, %q", v.Type())
-			return
-		}
-		valueBytes := v.GetStringBytes()
-		if len(valueBytes) == 0 {
-			return
-		}
-		inc, err = base64.StdEncoding.Decode((*b)[*index:], valueBytes)
-		if err != nil {
-			err = fmt.Errorf("failed to decode and append Firehose Record data: %w", err)
-			return
-		}
-		*index = *index + inc
-	})
-	return err
-}

@@ -0,0 +1,39 @@
+package firehose
+
+import (
+	"encoding/json"
+	"fmt"
+)
+
+// ProcessRequestBody converts Cloudwatch Stream protobuf metrics HTTP request body delivered via Firehose into OpenTelemetry protobuf message.
+//
+// See https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Metric-Streams.html
+//
+// It joins decoded "data" fields from "records" list:
+//
+//	{
+//	  "requestId": "<uuid-string>",
+//	  "timestamp": <int64-value>,
+//	  "records": [
+//	    {
+//	      "data": "<base64-encoded-payload>"
+//	    }
+//	  ]
+//	}
+func ProcessRequestBody(b []byte) ([]byte, error) {
+	var req struct {
+		Records []struct {
+			Data []byte
+		}
+	}
+	if err := json.Unmarshal(b, &req); err != nil {
+		return nil, fmt.Errorf("cannot unmarshal Firehose JSON in request body: %s", err)
+	}
+
+	var dst []byte
+	for _, r := range req.Records {
+		dst = append(dst, r.Data...)
+	}
+	return dst, nil
+}
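
Design note on the rewrite above: the removed fastjson version decoded base64 in place and truncated the request buffer, while the new version lets encoding/json allocate a fresh dst slice. The extra allocation is deliberate; per the commit message, clarity wins here because the subsequent parsing of the over-engineered OpenTelemetry protobuf payload is the slow part anyway.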

@@ -6,11 +6,12 @@ import (
 func TestProcessRequestBody(t *testing.T) {
 	data := []byte(`{"records": [{"data": "SGVsbG8sIA=="}, {"data": "d29ybGQh"}]}`)
-	err := ProcessRequestBody(&data)
+	result, err := ProcessRequestBody(data)
 	if err != nil {
 		t.Fatalf("unexpected error: %v", err)
 	}
-	if string(data) != "Hello, world!" {
-		t.Fatalf("unexpected string: %q", string(data))
+	resultExpected := "Hello, world!"
+	if string(result) != resultExpected {
+		t.Fatalf("unexpected result; got %q; want %q", result, resultExpected)
 	}
 }

@@ -21,7 +21,9 @@ import (
 // ParseStream parses OpenTelemetry protobuf or json data from r and calls callback for the parsed rows.
 //
 // callback shouldn't hold tss items after returning.
-func ParseStream(r io.Reader, isGzipped bool, processBody func(*[]byte) error, callback func(tss []prompbmarshal.TimeSeries) error) error {
+//
+// optional processBody can be used for pre-processing the request body read from r before parsing it in OpenTelemetry format.
+func ParseStream(r io.Reader, isGzipped bool, processBody func([]byte) ([]byte, error), callback func(tss []prompbmarshal.TimeSeries) error) error {
 	wcr := writeconcurrencylimiter.GetReader(r)
 	defer writeconcurrencylimiter.PutReader(wcr)
 	r = wcr
@@ -257,15 +259,17 @@ func resetLabels(labels []prompbmarshal.Label) []prompbmarshal.Label {
 	return labels[:0]
 }
 
-func (wr *writeContext) readAndUnpackRequest(r io.Reader, processBody func(*[]byte) error) (*pb.ExportMetricsServiceRequest, error) {
+func (wr *writeContext) readAndUnpackRequest(r io.Reader, processBody func([]byte) ([]byte, error)) (*pb.ExportMetricsServiceRequest, error) {
 	if _, err := wr.bb.ReadFrom(r); err != nil {
 		return nil, fmt.Errorf("cannot read request: %w", err)
 	}
 	var req pb.ExportMetricsServiceRequest
 	if processBody != nil {
-		if err := processBody(&wr.bb.B); err != nil {
+		data, err := processBody(wr.bb.B)
+		if err != nil {
 			return nil, fmt.Errorf("cannot process request body: %w", err)
 		}
+		wr.bb.B = append(wr.bb.B[:0], data...)
 	}
 	if err := req.UnmarshalProtobuf(wr.bb.B); err != nil {
 		return nil, fmt.Errorf("cannot unmarshal request from %d bytes: %w", len(wr.bb.B), err)