diff --git a/app/vmagent/main.go b/app/vmagent/main.go index 1f1f4b884..bb50d56f8 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -315,12 +315,11 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { return true case "/opentelemetry/api/v1/push", "/opentelemetry/v1/metrics": opentelemetryPushRequests.Inc() - if err := opentelemetry.InsertHandler(nil, r); err != nil { + writeResponse, err := opentelemetry.InsertHandler(nil, r) + if err != nil { opentelemetryPushErrors.Inc() - httpserver.Errorf(w, r, "%s", err) - return true } - w.WriteHeader(http.StatusOK) + writeResponse(w, time.Now(), err) return true case "/newrelic": newrelicCheckRequest.Inc() @@ -561,12 +560,11 @@ func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path stri return true case "opentelemetry/api/v1/push", "opentelemetry/v1/metrics": opentelemetryPushRequests.Inc() - if err := opentelemetry.InsertHandler(at, r); err != nil { + writeResponse, err := opentelemetry.InsertHandler(nil, r) + if err != nil { opentelemetryPushErrors.Inc() - httpserver.Errorf(w, r, "%s", err) - return true } - w.WriteHeader(http.StatusOK) + writeResponse(w, time.Now(), err) return true case "newrelic": newrelicCheckRequest.Inc() diff --git a/app/vmagent/opentelemetry/request_handler.go b/app/vmagent/opentelemetry/request_handler.go index 09f6bf080..5dd252db7 100644 --- a/app/vmagent/opentelemetry/request_handler.go +++ b/app/vmagent/opentelemetry/request_handler.go @@ -3,10 +3,12 @@ package opentelemetry import ( "fmt" "net/http" + "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose" @@ -21,22 +23,35 @@ var ( rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="opentelemetry"}`) ) +// WriteResponseFn function to write HTTP response data +type WriteResponseFn func(http.ResponseWriter, time.Time, error) + // InsertHandler processes opentelemetry metrics. -func InsertHandler(at *auth.Token, req *http.Request) error { - extraLabels, err := parserCommon.GetExtraLabels(req) - if err != nil { - return err - } +func InsertHandler(at *auth.Token, req *http.Request) (WriteResponseFn, error) { isGzipped := req.Header.Get("Content-Encoding") == "gzip" var processBody func([]byte) ([]byte, error) - if req.Header.Get("Content-Type") == "application/json" { - if req.Header.Get("X-Amz-Firehouse-Protocol-Version") != "" { - processBody = firehose.ProcessRequestBody + writeResponse := func(w http.ResponseWriter, _ time.Time, err error) { + if err == nil { + w.WriteHeader(http.StatusOK) } else { - return fmt.Errorf("json encoding isn't supported for opentelemetry format. Use protobuf encoding") + httpserver.Errorf(w, req, "%s", err) } } - return stream.ParseStream(req.Body, isGzipped, processBody, func(tss []prompbmarshal.TimeSeries) error { + if req.Header.Get("Content-Type") == "application/json" { + if fhRequestID := req.Header.Get("X-Amz-Firehose-Request-Id"); fhRequestID != "" { + processBody = firehose.ProcessRequestBody + writeResponse = func(w http.ResponseWriter, t time.Time, err error) { + firehose.ResponseWriter(w, t, fhRequestID, err) + } + } else { + return writeResponse, fmt.Errorf("json encoding isn't supported for opentelemetry format. Use protobuf encoding") + } + } + extraLabels, err := parserCommon.GetExtraLabels(req) + if err != nil { + return writeResponse, err + } + return writeResponse, stream.ParseStream(req.Body, isGzipped, processBody, func(tss []prompbmarshal.TimeSeries) error { return insertRows(at, tss, extraLabels) }) } diff --git a/app/vminsert/main.go b/app/vminsert/main.go index 866e6e4a2..43dfcca0f 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -298,12 +298,11 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { return true case "opentelemetry/api/v1/push", "opentelemetry/v1/metrics": opentelemetryPushRequests.Inc() - if err := opentelemetry.InsertHandler(at, r); err != nil { + writeResponse, err := opentelemetry.InsertHandler(at, r) + if err != nil { opentelemetryPushErrors.Inc() - httpserver.Errorf(w, r, "%s", err) - return true } - w.WriteHeader(http.StatusOK) + writeResponse(w, startTime, err) return true case "newrelic": newrelicCheckRequest.Inc() diff --git a/app/vminsert/opentelemetry/request_handler.go b/app/vminsert/opentelemetry/request_handler.go index f683d80cc..156196ec7 100644 --- a/app/vminsert/opentelemetry/request_handler.go +++ b/app/vminsert/opentelemetry/request_handler.go @@ -3,10 +3,12 @@ package opentelemetry import ( "fmt" "net/http" + "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose" @@ -22,22 +24,35 @@ var ( rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="opentelemetry"}`) ) +// WriteResponseFn function to write HTTP response data +type WriteResponseFn func(http.ResponseWriter, time.Time, error) + // InsertHandler processes opentelemetry metrics. -func InsertHandler(at *auth.Token, req *http.Request) error { - extraLabels, err := parserCommon.GetExtraLabels(req) - if err != nil { - return err - } +func InsertHandler(at *auth.Token, req *http.Request) (WriteResponseFn, error) { isGzipped := req.Header.Get("Content-Encoding") == "gzip" var processBody func([]byte) ([]byte, error) - if req.Header.Get("Content-Type") == "application/json" { - if req.Header.Get("X-Amz-Firehose-Protocol-Version") != "" { - processBody = firehose.ProcessRequestBody + writeResponse := func(w http.ResponseWriter, _ time.Time, err error) { + if err == nil { + w.WriteHeader(http.StatusOK) } else { - return fmt.Errorf("json encoding isn't supported for opentelemetry format. Use protobuf encoding") + httpserver.Errorf(w, req, "%s", err) } } - return stream.ParseStream(req.Body, isGzipped, processBody, func(tss []prompbmarshal.TimeSeries) error { + if req.Header.Get("Content-Type") == "application/json" { + if fhRequestID := req.Header.Get("X-Amz-Firehose-Request-Id"); fhRequestID != "" { + processBody = firehose.ProcessRequestBody + writeResponse = func(w http.ResponseWriter, t time.Time, err error) { + firehose.ResponseWriter(w, t, fhRequestID, err) + } + } else { + return writeResponse, fmt.Errorf("json encoding isn't supported for opentelemetry format. Use protobuf encoding") + } + } + extraLabels, err := parserCommon.GetExtraLabels(req) + if err != nil { + return writeResponse, err + } + return writeResponse, stream.ParseStream(req.Body, isGzipped, processBody, func(tss []prompbmarshal.TimeSeries) error { return insertRows(at, tss, extraLabels) }) } diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index dccc896de..d0aa6e3f4 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -76,6 +76,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): set correct `endsAt` value in notifications sent to the Alertmanager. Previously, a rule with evaluation intervals lower than 10s could never be triggered. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5995) for details. * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): properly account for `-rule.resendDelay` for alerting rules that are constantly switching state from inactive to firing. Before, notifications for such rules could have been skipped if state change happened more often than `-rule.resendDelay`. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6028) for details. * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): respect `-remoteWrite.maxBatchSize` at shutdown period. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6025). Thanks to @jiekun for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6039). +* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fixed response body and headers for AWS Firehose HTTP Destination. ## [v1.99.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.99.0) diff --git a/lib/protoparser/opentelemetry/firehose/parser.go b/lib/protoparser/opentelemetry/firehose/parser.go index e6801e3e7..fb3f4f649 100644 --- a/lib/protoparser/opentelemetry/firehose/parser.go +++ b/lib/protoparser/opentelemetry/firehose/parser.go @@ -4,6 +4,8 @@ import ( "encoding/binary" "encoding/json" "fmt" + "net/http" + "time" ) // ProcessRequestBody converts Cloudwatch Stream protobuf metrics HTTP request body delivered via Firehose into OpenTelemetry protobuf message. @@ -48,3 +50,17 @@ func ProcessRequestBody(b []byte) ([]byte, error) { } return dst, nil } + +// ResponseWriter writes response for AWS Firehose HTTP Endpoint request +// https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html#responseformat +func ResponseWriter(w http.ResponseWriter, ct time.Time, reqID string, err error) { + var respBody string + ts := ct.UnixMilli() + if err == nil { + respBody = fmt.Sprintf(`{"requestId": %q,"timestamp": %d}`, reqID, ts) + } else { + respBody = fmt.Sprintf(`{"requestId": %q,"timestamp": %d,"errorMessage": %q}`, reqID, ts, err) + } + w.Header().Add("Content-Type", "application/json") + w.Write([]byte(respBody)) +}