From 904e95fc69295b90ae0b5b9e1b172dc87deae3da Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 2 Apr 2024 17:51:18 +0300 Subject: [PATCH] app/vmagent: simplify code after 509df44d03631a863fab00e202a6449c19d784e1 - Simplify the code in order to improve its maintenance - Properly pass tenant ID when processing multi-tenant opentelemetry request at vmagent Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6016 --- app/vmagent/main.go | 15 ++++--- app/vmagent/opentelemetry/request_handler.go | 39 ++++++------------- app/vminsert/main.go | 8 ++-- app/vminsert/opentelemetry/request_handler.go | 39 ++++++------------- docs/CHANGELOG.md | 2 +- .../opentelemetry/firehose/http.go | 26 +++++++++++++ 6 files changed, 65 insertions(+), 64 deletions(-) create mode 100644 lib/protoparser/opentelemetry/firehose/http.go diff --git a/app/vmagent/main.go b/app/vmagent/main.go index 107d701b4..e34934bfa 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -40,6 +40,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose" "github.com/VictoriaMetrics/VictoriaMetrics/lib/pushmetrics" "github.com/VictoriaMetrics/metrics" ) @@ -317,11 +318,12 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { return true case "/opentelemetry/api/v1/push", "/opentelemetry/v1/metrics": opentelemetryPushRequests.Inc() - writeResponse, err := opentelemetry.InsertHandler(nil, r) - if err != nil { + if err := opentelemetry.InsertHandler(nil, r); err != nil { opentelemetryPushErrors.Inc() + httpserver.Errorf(w, r, "%s", err) + return true } - writeResponse(w, time.Now(), err) + firehose.WriteSuccessResponse(w, r) return true case "/newrelic": newrelicCheckRequest.Inc() @@ -562,11 +564,12 @@ func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path stri return true case "opentelemetry/api/v1/push", "opentelemetry/v1/metrics": opentelemetryPushRequests.Inc() - writeResponse, err := opentelemetry.InsertHandler(nil, r) - if err != nil { + if err := opentelemetry.InsertHandler(at, r); err != nil { opentelemetryPushErrors.Inc() + httpserver.Errorf(w, r, "%s", err) + return true } - writeResponse(w, time.Now(), err) + firehose.WriteSuccessResponse(w, r) return true case "newrelic": newrelicCheckRequest.Inc() diff --git a/app/vmagent/opentelemetry/request_handler.go b/app/vmagent/opentelemetry/request_handler.go index 5dd252db7..09f6bf080 100644 --- a/app/vmagent/opentelemetry/request_handler.go +++ b/app/vmagent/opentelemetry/request_handler.go @@ -3,12 +3,10 @@ package opentelemetry import ( "fmt" "net/http" - "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose" @@ -23,35 +21,22 @@ var ( rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="opentelemetry"}`) ) -// WriteResponseFn function to write HTTP response data -type WriteResponseFn func(http.ResponseWriter, time.Time, error) - // InsertHandler processes opentelemetry metrics. -func InsertHandler(at *auth.Token, req *http.Request) (WriteResponseFn, error) { - isGzipped := req.Header.Get("Content-Encoding") == "gzip" - var processBody func([]byte) ([]byte, error) - writeResponse := func(w http.ResponseWriter, _ time.Time, err error) { - if err == nil { - w.WriteHeader(http.StatusOK) - } else { - httpserver.Errorf(w, req, "%s", err) - } - } - if req.Header.Get("Content-Type") == "application/json" { - if fhRequestID := req.Header.Get("X-Amz-Firehose-Request-Id"); fhRequestID != "" { - processBody = firehose.ProcessRequestBody - writeResponse = func(w http.ResponseWriter, t time.Time, err error) { - firehose.ResponseWriter(w, t, fhRequestID, err) - } - } else { - return writeResponse, fmt.Errorf("json encoding isn't supported for opentelemetry format. Use protobuf encoding") - } - } +func InsertHandler(at *auth.Token, req *http.Request) error { extraLabels, err := parserCommon.GetExtraLabels(req) if err != nil { - return writeResponse, err + return err } - return writeResponse, stream.ParseStream(req.Body, isGzipped, processBody, func(tss []prompbmarshal.TimeSeries) error { + isGzipped := req.Header.Get("Content-Encoding") == "gzip" + var processBody func([]byte) ([]byte, error) + if req.Header.Get("Content-Type") == "application/json" { + if req.Header.Get("X-Amz-Firehouse-Protocol-Version") != "" { + processBody = firehose.ProcessRequestBody + } else { + return fmt.Errorf("json encoding isn't supported for opentelemetry format. Use protobuf encoding") + } + } + return stream.ParseStream(req.Body, isGzipped, processBody, func(tss []prompbmarshal.TimeSeries) error { return insertRows(at, tss, extraLabels) }) } diff --git a/app/vminsert/main.go b/app/vminsert/main.go index 6f86bc028..7f2c403de 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -40,6 +40,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" ) @@ -218,11 +219,12 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { return true case "/opentelemetry/api/v1/push", "/opentelemetry/v1/metrics": opentelemetryPushRequests.Inc() - writeResponse, err := opentelemetry.InsertHandler(r) - if err != nil { + if err := opentelemetry.InsertHandler(r); err != nil { opentelemetryPushErrors.Inc() + httpserver.Errorf(w, r, "%s", err) + return true } - writeResponse(w, startTime, err) + firehose.WriteSuccessResponse(w, r) return true case "/newrelic": newrelicCheckRequest.Inc() diff --git a/app/vminsert/opentelemetry/request_handler.go b/app/vminsert/opentelemetry/request_handler.go index 839ad82f1..72560108e 100644 --- a/app/vminsert/opentelemetry/request_handler.go +++ b/app/vminsert/opentelemetry/request_handler.go @@ -3,11 +3,9 @@ package opentelemetry import ( "fmt" "net/http" - "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose" @@ -20,35 +18,22 @@ var ( rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="opentelemetry"}`) ) -// WriteResponseFn function to write HTTP response data -type WriteResponseFn func(http.ResponseWriter, time.Time, error) - // InsertHandler processes opentelemetry metrics. -func InsertHandler(req *http.Request) (WriteResponseFn, error) { - isGzipped := req.Header.Get("Content-Encoding") == "gzip" - var processBody func([]byte) ([]byte, error) - writeResponse := func(w http.ResponseWriter, _ time.Time, err error) { - if err == nil { - w.WriteHeader(http.StatusOK) - } else { - httpserver.Errorf(w, req, "%s", err) - } - } - if req.Header.Get("Content-Type") == "application/json" { - if fhRequestID := req.Header.Get("X-Amz-Firehose-Request-Id"); fhRequestID != "" { - processBody = firehose.ProcessRequestBody - writeResponse = func(w http.ResponseWriter, t time.Time, err error) { - firehose.ResponseWriter(w, t, fhRequestID, err) - } - } else { - return writeResponse, fmt.Errorf("json encoding isn't supported for opentelemetry format. Use protobuf encoding") - } - } +func InsertHandler(req *http.Request) error { extraLabels, err := parserCommon.GetExtraLabels(req) if err != nil { - return writeResponse, err + return err } - return writeResponse, stream.ParseStream(req.Body, isGzipped, processBody, func(tss []prompbmarshal.TimeSeries) error { + isGzipped := req.Header.Get("Content-Encoding") == "gzip" + var processBody func([]byte) ([]byte, error) + if req.Header.Get("Content-Type") == "application/json" { + if req.Header.Get("X-Amz-Firehose-Protocol-Version") != "" { + processBody = firehose.ProcessRequestBody + } else { + return fmt.Errorf("json encoding isn't supported for opentelemetry format. Use protobuf encoding") + } + } + return stream.ParseStream(req.Body, isGzipped, processBody, func(tss []prompbmarshal.TimeSeries) error { return insertRows(tss, extraLabels) }) } diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 70af834c9..f2812242a 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -66,10 +66,10 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly set `Host` header in requests to scrape targets if it is specified via [`headers` option](https://docs.victoriametrics.com/sd_configs/#http-api-client-options). Thanks to @fholzer for [the bugreport](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5969) and [the fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5970). * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly set `Host` header in requests to scrape targets when [`server_name` option at `tls_config`](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#tls_config) is set. Previously the `Host` header was set incorrectly to the target hostname in this case. * BUGFIX: do not drop `match[]` filter at [`/api/v1/series`](https://docs.victoriametrics.com/url-examples/#apiv1series) if `-search.ignoreExtraFiltersAtLabelsAPI` command-line flag is set, since missing `match[]` filter breaks `/api/v1/series` requests. +* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): return proper resonses for [AWS Firehose](https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html#requestformat) requests according to [these docs](https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html#responseformat). See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6016). * BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl.html): properly parse TLS key and CA files for [InfluxDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-influxdb-1x) and [OpenTSDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-opentsdb) migration modes. * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix VictoriaLogs UI query handling to correctly apply `_time` filter across all queries. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5920). * BUGFIX: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): limit duration of requests to /api/v1/labels, /api/v1/label/.../values or /api/v1/series with `-search.maxLabelsAPIDuration` duration. Before, `-search.maxExportDuration` value was used by mistake. The bug has been introduced in [v1.99.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.99.0). Thanks to @kbweave for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5992). -* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix response body and headers for AWS Firehose HTTP Destination. * BUGFIX: properly wait for force merge to be completed during the shutdown. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5944) for the details. * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): set correct `endsAt` value in notifications sent to the Alertmanager. Previously, a rule with evaluation intervals lower than 10s could never be triggered. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5995) for details. * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): properly account for `-rule.resendDelay` for alerting rules that are constantly switching state from inactive to firing. Before, notifications for such rules could have been skipped if state change happened more often than `-rule.resendDelay`. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6028) for details. diff --git a/lib/protoparser/opentelemetry/firehose/http.go b/lib/protoparser/opentelemetry/firehose/http.go new file mode 100644 index 000000000..a222e7279 --- /dev/null +++ b/lib/protoparser/opentelemetry/firehose/http.go @@ -0,0 +1,26 @@ +package firehose + +import ( + "fmt" + "net/http" + "time" +) + +// WriteSuccessResponse writes success response for AWS Firehose request. +// +// See https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html#responseformat +func WriteSuccessResponse(w http.ResponseWriter, r *http.Request) { + requestID := r.Header.Get("X-Amz-Firehose-Request-Id") + if requestID == "" { + // This isn't a AWS firehose request - just return an empty response in this case. + w.WriteHeader(http.StatusOK) + return + } + + body := fmt.Sprintf(`{"requestId":%q,"timestamp":%d}`, requestID, time.Now().UnixMilli()) + + h := w.Header() + h.Set("Content-Type", "application/json") + h.Set("Content-Length", fmt.Sprintf("%d", len(body))) + w.Write([]byte(body)) +}