{vmagent,vminsert}: added firehose http destination opentelemetry data ingestion support (#5893)

Co-authored-by: Andrii Chubatiuk <wachy@Andriis-MBP-2.lan>
Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
Andrii Chubatiuk 2024-02-29 14:03:24 +02:00 committed by GitHub
parent 1217c1f2da
commit 67a55b89a4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 134 additions and 10 deletions

View file

@ -9,6 +9,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/firehose"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics"
@ -27,10 +28,15 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
return err
}
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
var processBody func(*[]byte) error
if req.Header.Get("Content-Type") == "application/json" {
return fmt.Errorf("json encoding isn't supported for opentelemetry format. Use protobuf encoding")
if req.Header.Get("X-Amz-Firehose-Protocol-Version") != "" {
processBody = firehose.ProcessRequestBody
} else {
return fmt.Errorf("json encoding isn't supported for opentelemetry format. Use protobuf encoding")
}
}
return stream.ParseStream(req.Body, isGzipped, func(tss []prompbmarshal.TimeSeries) error {
return stream.ParseStream(req.Body, isGzipped, processBody, func(tss []prompbmarshal.TimeSeries) error {
return insertRows(at, tss, extraLabels)
})
}

View file

@ -8,6 +8,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/firehose"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/stream"
"github.com/VictoriaMetrics/metrics"
)
@ -24,10 +25,15 @@ func InsertHandler(req *http.Request) error {
return err
}
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
var processBody func(*[]byte) error
if req.Header.Get("Content-Type") == "application/json" {
return fmt.Errorf("json encoding isn't supported for opentelemetry format. Use protobuf encoding")
if req.Header.Get("X-Amz-Firehose-Protocol-Version") != "" {
processBody = firehose.ProcessRequestBody
} else {
return fmt.Errorf("json encoding isn't supported for opentelemetry format. Use protobuf encoding")
}
}
return stream.ParseStream(req.Body, isGzipped, func(tss []prompbmarshal.TimeSeries) error {
return stream.ParseStream(req.Body, isGzipped, processBody, func(tss []prompbmarshal.TimeSeries) error {
return insertRows(tss, extraLabels)
})
}

View file

@ -0,0 +1,91 @@
package firehose
import (
"encoding/base64"
"fmt"
"github.com/valyala/fastjson"
)
// ProcessRequestBody rewrites b in place: on entry *b holds the JSON body of a
// Firehose HTTP destination request, on success *b holds the concatenation of
// the base64-decoded record payloads found in it.
//
// All the work is delegated to unmarshalRequest; its error, if any, is returned
// unchanged.
func ProcessRequestBody(b *[]byte) error {
	if err := unmarshalRequest(b); err != nil {
		return err
	}
	return nil
}
// jsonParserPool supplies reusable fastjson parsers to unmarshalRequest,
// which takes one with Get and hands it back with Put once it returns.
var jsonParserPool fastjson.ParserPool
// unmarshalRequest decodes the base64 payloads carried by a Firehose HTTP
// destination request and stores their concatenation back into *b.
//
// The expected request layout is:
//
//	{
//	  "requestId": "<uuid-string>",
//	  "timestamp": <int64-value>,
//	  "records": [
//	    {
//	      "data": "<base64-encoded-payload>"
//	    }
//	  ]
//	}
//
// Only the "records" key is inspected; every other top-level key is skipped.
// Decoded bytes are written over the beginning of *b and *b is then truncated
// to the number of bytes written. The truncation also happens when a record
// fails to decode, so on such an error *b holds only the bytes decoded so far.
// If the body is not valid JSON or not a JSON object, *b is left untouched.
//
// NOTE(review): overwriting *b while parsed values are still being read assumes
// the parser does not alias *b - confirm against fastjson.
func unmarshalRequest(b *[]byte) error {
	parser := jsonParserPool.Get()
	defer jsonParserPool.Put(parser)

	root, err := parser.ParseBytes(*b)
	if err != nil {
		return err
	}
	request, err := root.Object()
	if err != nil {
		return fmt.Errorf("cannot find Firehose Request objects: %w", err)
	}

	var visitErr error
	written := 0
	request.Visit(func(key []byte, value *fastjson.Value) {
		// Visit cannot be interrupted, so once an error is recorded the
		// remaining keys are ignored. Keys other than "records" are ignored too.
		if visitErr != nil || string(key) != "records" {
			return
		}
		records, e := value.Array()
		if e != nil {
			visitErr = fmt.Errorf("cannot find Records array in Firehose Request object: %w", e)
			return
		}
		for i := range records {
			record, e := records[i].Object()
			if e != nil {
				visitErr = fmt.Errorf("cannot find Record object: %w", e)
				return
			}
			e = unmarshalRecord(b, &written, record)
			if e != nil {
				visitErr = fmt.Errorf("cannot unmarshal Record object: %w", e)
				return
			}
		}
	})

	// Keep only the decoded prefix, regardless of whether decoding succeeded.
	*b = (*b)[:written]
	if visitErr == nil {
		return nil
	}
	return fmt.Errorf("cannot parse Firehose Request object: %w", visitErr)
}
// unmarshalRecord base64-decodes every string value of the record object o
// into (*b)[*index:] and advances *index by the number of bytes written.
//
// Empty string values are skipped. A value that is not a JSON string, or that
// is not valid standard base64, yields an error. The first error encountered is
// the one returned: fastjson's Visit cannot be aborted, so the callback
// explicitly ignores all remaining keys once an error has been recorded.
// Without that guard a later, successfully decoded key would reset the error to
// nil and a corrupt record would be reported as success.
func unmarshalRecord(b *[]byte, index *int, o *fastjson.Object) error {
	var err error
	o.Visit(func(_ []byte, v *fastjson.Value) {
		if err != nil {
			// Keep the first error; do not decode anything after a failure.
			return
		}
		if v.Type() != fastjson.TypeString {
			err = fmt.Errorf("invalid record data type, %q", v.Type())
			return
		}
		valueBytes := v.GetStringBytes()
		if len(valueBytes) == 0 {
			return
		}
		// Decode straight into the caller's buffer at the current write offset.
		// A local error variable is used so the shared err is only ever set,
		// never cleared.
		n, errLocal := base64.StdEncoding.Decode((*b)[*index:], valueBytes)
		if errLocal != nil {
			err = fmt.Errorf("failed to decode and append Firehose Record data: %w", errLocal)
			return
		}
		*index += n
	})
	return err
}

View file

@ -0,0 +1,16 @@
package firehose
import (
"testing"
)
// TestProcessRequestBody checks that a Firehose request body with two records
// is replaced in place by the concatenation of their decoded payloads.
func TestProcessRequestBody(t *testing.T) {
	const want = "Hello, world!"

	body := []byte(`{"records": [{"data": "SGVsbG8sIA=="}, {"data": "d29ybGQh"}]}`)
	if err := ProcessRequestBody(&body); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if got := string(body); got != want {
		t.Fatalf("unexpected string: %q", got)
	}
}

View file

@ -21,7 +21,7 @@ import (
// ParseStream parses OpenTelemetry protobuf or json data from r and calls callback for the parsed rows.
//
// callback shouldn't hold tss items after returning.
func ParseStream(r io.Reader, isGzipped bool, callback func(tss []prompbmarshal.TimeSeries) error) error {
func ParseStream(r io.Reader, isGzipped bool, processBody func(*[]byte) error, callback func(tss []prompbmarshal.TimeSeries) error) error {
wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr)
r = wcr
@ -37,7 +37,7 @@ func ParseStream(r io.Reader, isGzipped bool, callback func(tss []prompbmarshal.
wr := getWriteContext()
defer putWriteContext(wr)
req, err := wr.readAndUnpackRequest(r)
req, err := wr.readAndUnpackRequest(r, processBody)
if err != nil {
return fmt.Errorf("cannot unpack OpenTelemetry metrics: %w", err)
}
@ -257,11 +257,16 @@ func resetLabels(labels []prompbmarshal.Label) []prompbmarshal.Label {
return labels[:0]
}
func (wr *writeContext) readAndUnpackRequest(r io.Reader) (*pb.ExportMetricsServiceRequest, error) {
func (wr *writeContext) readAndUnpackRequest(r io.Reader, processBody func(*[]byte) error) (*pb.ExportMetricsServiceRequest, error) {
if _, err := wr.bb.ReadFrom(r); err != nil {
return nil, fmt.Errorf("cannot read request: %w", err)
}
var req pb.ExportMetricsServiceRequest
if processBody != nil {
if err := processBody(&wr.bb.B); err != nil {
return nil, fmt.Errorf("cannot process request body: %w", err)
}
}
if err := req.UnmarshalProtobuf(wr.bb.B); err != nil {
return nil, fmt.Errorf("cannot unmarshal request from %d bytes: %w", len(wr.bb.B), err)
}

View file

@ -121,7 +121,7 @@ func TestParseStream(t *testing.T) {
func checkParseStream(data []byte, checkSeries func(tss []prompbmarshal.TimeSeries) error) error {
// Verify parsing without compression
if err := ParseStream(bytes.NewBuffer(data), false, checkSeries); err != nil {
if err := ParseStream(bytes.NewBuffer(data), false, nil, checkSeries); err != nil {
return fmt.Errorf("error when parsing data: %w", err)
}
@ -134,7 +134,7 @@ func checkParseStream(data []byte, checkSeries func(tss []prompbmarshal.TimeSeri
if err := zw.Close(); err != nil {
return fmt.Errorf("cannot close gzip writer: %w", err)
}
if err := ParseStream(&bb, true, checkSeries); err != nil {
if err := ParseStream(&bb, true, nil, checkSeries); err != nil {
return fmt.Errorf("error when parsing compressed data: %w", err)
}

View file

@ -24,7 +24,7 @@ func BenchmarkParseStream(b *testing.B) {
data := pbRequest.MarshalProtobuf(nil)
for p.Next() {
err := ParseStream(bytes.NewBuffer(data), false, func(tss []prompbmarshal.TimeSeries) error {
err := ParseStream(bytes.NewBuffer(data), false, nil, func(tss []prompbmarshal.TimeSeries) error {
return nil
})
if err != nil {