diff --git a/app/vmagent/opentelemetry/request_handler.go b/app/vmagent/opentelemetry/request_handler.go index e923f750c..5809b8c13 100644 --- a/app/vmagent/opentelemetry/request_handler.go +++ b/app/vmagent/opentelemetry/request_handler.go @@ -9,6 +9,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/firehose" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/stream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/metrics" @@ -27,10 +28,15 @@ func InsertHandler(at *auth.Token, req *http.Request) error { return err } isGzipped := req.Header.Get("Content-Encoding") == "gzip" + var processBody func(*[]byte) error if req.Header.Get("Content-Type") == "application/json" { - return fmt.Errorf("json encoding isn't supported for opentelemetry format. Use protobuf encoding") + if req.Header.Get("X-Amz-Firehouse-Protocol-Version") != "" { + processBody = firehose.ProcessRequestBody + } else { + return fmt.Errorf("json encoding isn't supported for opentelemetry format. Use protobuf encoding") + } } - return stream.ParseStream(req.Body, isGzipped, func(tss []prompbmarshal.TimeSeries) error { + return stream.ParseStream(req.Body, isGzipped, processBody, func(tss []prompbmarshal.TimeSeries) error { return insertRows(at, tss, extraLabels) }) } diff --git a/app/vminsert/opentelemetry/request_handler.go b/app/vminsert/opentelemetry/request_handler.go index 165fa275c..d9c1786b2 100644 --- a/app/vminsert/opentelemetry/request_handler.go +++ b/app/vminsert/opentelemetry/request_handler.go @@ -8,6 +8,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/firehose" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/stream" "github.com/VictoriaMetrics/metrics" ) @@ -24,10 +25,15 @@ func InsertHandler(req *http.Request) error { return err } isGzipped := req.Header.Get("Content-Encoding") == "gzip" + var processBody func(*[]byte) error if req.Header.Get("Content-Type") == "application/json" { - return fmt.Errorf("json encoding isn't supported for opentelemetry format. Use protobuf encoding") + if req.Header.Get("X-Amz-Firehose-Protocol-Version") != "" { + processBody = firehose.ProcessRequestBody + } else { + return fmt.Errorf("json encoding isn't supported for opentelemetry format. Use protobuf encoding") + } } - return stream.ParseStream(req.Body, isGzipped, func(tss []prompbmarshal.TimeSeries) error { + return stream.ParseStream(req.Body, isGzipped, processBody, func(tss []prompbmarshal.TimeSeries) error { return insertRows(tss, extraLabels) }) } diff --git a/lib/protoparser/firehose/parser.go b/lib/protoparser/firehose/parser.go new file mode 100644 index 000000000..48c7d6f4c --- /dev/null +++ b/lib/protoparser/firehose/parser.go @@ -0,0 +1,91 @@ +package firehose + +import ( + "encoding/base64" + "fmt" + "github.com/valyala/fastjson" +) + +// ProcessRequestBody converts Cloudwatch Stream protobuf metrics HTTP body delivered via Firehose into protobuf message. +func ProcessRequestBody(b *[]byte) error { + return unmarshalRequest(b) +} + +var jsonParserPool fastjson.ParserPool + +// unmarshalRequest extracts and decodes b64 data from Firehose HTTP destination request +// +// { +// "requestId": "", +// "timestamp": , +// "records": [ +// { +// "data": "" +// } +// ] +// } +func unmarshalRequest(b *[]byte) error { + p := jsonParserPool.Get() + defer jsonParserPool.Put(p) + + v, err := p.ParseBytes(*b) + if err != nil { + return err + } + o, err := v.Object() + if err != nil { + return fmt.Errorf("cannot find Firehose Request objects: %w", err) + } + index := 0 + o.Visit(func(k []byte, v *fastjson.Value) { + if err != nil { + return + } + switch string(k) { + case "records": + recordObjects, errLocal := v.Array() + if errLocal != nil { + err = fmt.Errorf("cannot find Records array in Firehose Request object: %w", errLocal) + return + } + for _, fr := range recordObjects { + recordObject, errLocal := fr.Object() + if errLocal != nil { + err = fmt.Errorf("cannot find Record object: %w", errLocal) + return + } + if errLocal := unmarshalRecord(b, &index, recordObject); errLocal != nil { + err = fmt.Errorf("cannot unmarshal Record object: %w", errLocal) + return + } + } + } + }) + *b = (*b)[:index] + if err != nil { + return fmt.Errorf("cannot parse Firehose Request object: %w", err) + } + return nil +} + +func unmarshalRecord(b *[]byte, index *int, o *fastjson.Object) error { + var err error + var inc int + o.Visit(func(k []byte, v *fastjson.Value) { + if v.Type() != fastjson.TypeString { + err = fmt.Errorf("invalid record data type, %q", v.Type()) + return + } + valueBytes := v.GetStringBytes() + if len(valueBytes) == 0 { + return + } + inc, err = base64.StdEncoding.Decode((*b)[*index:], valueBytes) + if err != nil { + err = fmt.Errorf("failed to decode and append Firehose Record data: %w", err) + return + } + *index = *index + inc + }) + return err +} diff --git a/lib/protoparser/firehose/parser_test.go b/lib/protoparser/firehose/parser_test.go new file mode 100644 index 000000000..647f927df --- /dev/null +++ b/lib/protoparser/firehose/parser_test.go @@ -0,0 +1,16 @@ +package firehose + +import ( + "testing" +) + +func TestProcessRequestBody(t *testing.T) { + data := []byte(`{"records": [{"data": "SGVsbG8sIA=="}, {"data": "d29ybGQh"}]}`) + err := ProcessRequestBody(&data) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if string(data) != "Hello, world!" { + t.Fatalf("unexpected string: %q", string(data)) + } +} diff --git a/lib/protoparser/opentelemetry/stream/streamparser.go b/lib/protoparser/opentelemetry/stream/streamparser.go index 9d4dab2cc..30bd0e1dd 100644 --- a/lib/protoparser/opentelemetry/stream/streamparser.go +++ b/lib/protoparser/opentelemetry/stream/streamparser.go @@ -21,7 +21,7 @@ import ( // ParseStream parses OpenTelemetry protobuf or json data from r and calls callback for the parsed rows. // // callback shouldn't hold tss items after returning. -func ParseStream(r io.Reader, isGzipped bool, callback func(tss []prompbmarshal.TimeSeries) error) error { +func ParseStream(r io.Reader, isGzipped bool, processBody func(*[]byte) error, callback func(tss []prompbmarshal.TimeSeries) error) error { wcr := writeconcurrencylimiter.GetReader(r) defer writeconcurrencylimiter.PutReader(wcr) r = wcr @@ -37,7 +37,7 @@ func ParseStream(r io.Reader, isGzipped bool, callback func(tss []prompbmarshal. wr := getWriteContext() defer putWriteContext(wr) - req, err := wr.readAndUnpackRequest(r) + req, err := wr.readAndUnpackRequest(r, processBody) if err != nil { return fmt.Errorf("cannot unpack OpenTelemetry metrics: %w", err) } @@ -257,11 +257,16 @@ func resetLabels(labels []prompbmarshal.Label) []prompbmarshal.Label { return labels[:0] } -func (wr *writeContext) readAndUnpackRequest(r io.Reader) (*pb.ExportMetricsServiceRequest, error) { +func (wr *writeContext) readAndUnpackRequest(r io.Reader, processBody func(*[]byte) error) (*pb.ExportMetricsServiceRequest, error) { if _, err := wr.bb.ReadFrom(r); err != nil { return nil, fmt.Errorf("cannot read request: %w", err) } var req pb.ExportMetricsServiceRequest + if processBody != nil { + if err := processBody(&wr.bb.B); err != nil { + return nil, fmt.Errorf("cannot process request body: %w", err) + } + } if err := req.UnmarshalProtobuf(wr.bb.B); err != nil { return nil, fmt.Errorf("cannot unmarshal request from %d bytes: %w", len(wr.bb.B), err) } diff --git a/lib/protoparser/opentelemetry/stream/streamparser_test.go b/lib/protoparser/opentelemetry/stream/streamparser_test.go index c5de4c38a..1d5281d4a 100644 --- a/lib/protoparser/opentelemetry/stream/streamparser_test.go +++ b/lib/protoparser/opentelemetry/stream/streamparser_test.go @@ -121,7 +121,7 @@ func TestParseStream(t *testing.T) { func checkParseStream(data []byte, checkSeries func(tss []prompbmarshal.TimeSeries) error) error { // Verify parsing without compression - if err := ParseStream(bytes.NewBuffer(data), false, checkSeries); err != nil { + if err := ParseStream(bytes.NewBuffer(data), false, nil, checkSeries); err != nil { return fmt.Errorf("error when parsing data: %w", err) } @@ -134,7 +134,7 @@ func checkParseStream(data []byte, checkSeries func(tss []prompbmarshal.TimeSeri if err := zw.Close(); err != nil { return fmt.Errorf("cannot close gzip writer: %w", err) } - if err := ParseStream(&bb, true, checkSeries); err != nil { + if err := ParseStream(&bb, true, nil, checkSeries); err != nil { return fmt.Errorf("error when parsing compressed data: %w", err) } diff --git a/lib/protoparser/opentelemetry/stream/streamparser_timing_test.go b/lib/protoparser/opentelemetry/stream/streamparser_timing_test.go index c2186cb95..e1f73aaf1 100644 --- a/lib/protoparser/opentelemetry/stream/streamparser_timing_test.go +++ b/lib/protoparser/opentelemetry/stream/streamparser_timing_test.go @@ -24,7 +24,7 @@ func BenchmarkParseStream(b *testing.B) { data := pbRequest.MarshalProtobuf(nil) for p.Next() { - err := ParseStream(bytes.NewBuffer(data), false, func(tss []prompbmarshal.TimeSeries) error { + err := ParseStream(bytes.NewBuffer(data), false, nil, func(tss []prompbmarshal.TimeSeries) error { return nil }) if err != nil {