app/{vmagent,vminsert}: fixed firehose response (#6016)

This commit is contained in:
Andrii Chubatiuk 2024-03-26 14:20:41 +02:00 committed by GitHub
parent cb23685681
commit 509df44d03
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 76 additions and 32 deletions

View file

@ -315,12 +315,11 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
return true return true
case "/opentelemetry/api/v1/push", "/opentelemetry/v1/metrics": case "/opentelemetry/api/v1/push", "/opentelemetry/v1/metrics":
opentelemetryPushRequests.Inc() opentelemetryPushRequests.Inc()
if err := opentelemetry.InsertHandler(nil, r); err != nil { writeResponse, err := opentelemetry.InsertHandler(nil, r)
if err != nil {
opentelemetryPushErrors.Inc() opentelemetryPushErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
} }
w.WriteHeader(http.StatusOK) writeResponse(w, time.Now(), err)
return true return true
case "/newrelic": case "/newrelic":
newrelicCheckRequest.Inc() newrelicCheckRequest.Inc()
@ -561,12 +560,11 @@ func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path stri
return true return true
case "opentelemetry/api/v1/push", "opentelemetry/v1/metrics": case "opentelemetry/api/v1/push", "opentelemetry/v1/metrics":
opentelemetryPushRequests.Inc() opentelemetryPushRequests.Inc()
if err := opentelemetry.InsertHandler(at, r); err != nil { writeResponse, err := opentelemetry.InsertHandler(nil, r)
if err != nil {
opentelemetryPushErrors.Inc() opentelemetryPushErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
} }
w.WriteHeader(http.StatusOK) writeResponse(w, time.Now(), err)
return true return true
case "newrelic": case "newrelic":
newrelicCheckRequest.Inc() newrelicCheckRequest.Inc()

View file

@ -3,10 +3,12 @@ package opentelemetry
import ( import (
"fmt" "fmt"
"net/http" "net/http"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose"
@ -21,22 +23,35 @@ var (
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="opentelemetry"}`) rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="opentelemetry"}`)
) )
// WriteResponseFn function to write HTTP response data
type WriteResponseFn func(http.ResponseWriter, time.Time, error)
// InsertHandler processes opentelemetry metrics. // InsertHandler processes opentelemetry metrics.
func InsertHandler(at *auth.Token, req *http.Request) error { func InsertHandler(at *auth.Token, req *http.Request) (WriteResponseFn, error) {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
isGzipped := req.Header.Get("Content-Encoding") == "gzip" isGzipped := req.Header.Get("Content-Encoding") == "gzip"
var processBody func([]byte) ([]byte, error) var processBody func([]byte) ([]byte, error)
if req.Header.Get("Content-Type") == "application/json" { writeResponse := func(w http.ResponseWriter, _ time.Time, err error) {
if req.Header.Get("X-Amz-Firehouse-Protocol-Version") != "" { if err == nil {
processBody = firehose.ProcessRequestBody w.WriteHeader(http.StatusOK)
} else { } else {
return fmt.Errorf("json encoding isn't supported for opentelemetry format. Use protobuf encoding") httpserver.Errorf(w, req, "%s", err)
} }
} }
return stream.ParseStream(req.Body, isGzipped, processBody, func(tss []prompbmarshal.TimeSeries) error { if req.Header.Get("Content-Type") == "application/json" {
if fhRequestID := req.Header.Get("X-Amz-Firehose-Request-Id"); fhRequestID != "" {
processBody = firehose.ProcessRequestBody
writeResponse = func(w http.ResponseWriter, t time.Time, err error) {
firehose.ResponseWriter(w, t, fhRequestID, err)
}
} else {
return writeResponse, fmt.Errorf("json encoding isn't supported for opentelemetry format. Use protobuf encoding")
}
}
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return writeResponse, err
}
return writeResponse, stream.ParseStream(req.Body, isGzipped, processBody, func(tss []prompbmarshal.TimeSeries) error {
return insertRows(at, tss, extraLabels) return insertRows(at, tss, extraLabels)
}) })
} }

View file

@ -218,12 +218,11 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
return true return true
case "/opentelemetry/api/v1/push", "/opentelemetry/v1/metrics": case "/opentelemetry/api/v1/push", "/opentelemetry/v1/metrics":
opentelemetryPushRequests.Inc() opentelemetryPushRequests.Inc()
if err := opentelemetry.InsertHandler(r); err != nil { writeResponse, err := opentelemetry.InsertHandler(r)
if err != nil {
opentelemetryPushErrors.Inc() opentelemetryPushErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
} }
w.WriteHeader(http.StatusOK) writeResponse(w, startTime, err)
return true return true
case "/newrelic": case "/newrelic":
newrelicCheckRequest.Inc() newrelicCheckRequest.Inc()

View file

@ -3,9 +3,11 @@ package opentelemetry
import ( import (
"fmt" "fmt"
"net/http" "net/http"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose"
@ -18,22 +20,35 @@ var (
rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="opentelemetry"}`) rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="opentelemetry"}`)
) )
// WriteResponseFn function to write HTTP response data
type WriteResponseFn func(http.ResponseWriter, time.Time, error)
// InsertHandler processes opentelemetry metrics. // InsertHandler processes opentelemetry metrics.
func InsertHandler(req *http.Request) error { func InsertHandler(req *http.Request) (WriteResponseFn, error) {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
isGzipped := req.Header.Get("Content-Encoding") == "gzip" isGzipped := req.Header.Get("Content-Encoding") == "gzip"
var processBody func([]byte) ([]byte, error) var processBody func([]byte) ([]byte, error)
if req.Header.Get("Content-Type") == "application/json" { writeResponse := func(w http.ResponseWriter, _ time.Time, err error) {
if req.Header.Get("X-Amz-Firehose-Protocol-Version") != "" { if err == nil {
processBody = firehose.ProcessRequestBody w.WriteHeader(http.StatusOK)
} else { } else {
return fmt.Errorf("json encoding isn't supported for opentelemetry format. Use protobuf encoding") httpserver.Errorf(w, req, "%s", err)
} }
} }
return stream.ParseStream(req.Body, isGzipped, processBody, func(tss []prompbmarshal.TimeSeries) error { if req.Header.Get("Content-Type") == "application/json" {
if fhRequestID := req.Header.Get("X-Amz-Firehose-Request-Id"); fhRequestID != "" {
processBody = firehose.ProcessRequestBody
writeResponse = func(w http.ResponseWriter, t time.Time, err error) {
firehose.ResponseWriter(w, t, fhRequestID, err)
}
} else {
return writeResponse, fmt.Errorf("json encoding isn't supported for opentelemetry format. Use protobuf encoding")
}
}
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return writeResponse, err
}
return writeResponse, stream.ParseStream(req.Body, isGzipped, processBody, func(tss []prompbmarshal.TimeSeries) error {
return insertRows(tss, extraLabels) return insertRows(tss, extraLabels)
}) })
} }

View file

@ -65,6 +65,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix VictoriaLogs UI query handling to correctly apply `_time` filter across all queries. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5920). * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix VictoriaLogs UI query handling to correctly apply `_time` filter across all queries. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5920).
* BUGFIX: [vmselect](https://docs.victoriametrics.com/): make vmselect resilient to absence of cache folder. If cache folder was mistakenly deleted by user or OS, vmselect will try re-creating it first. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5985). * BUGFIX: [vmselect](https://docs.victoriametrics.com/): make vmselect resilient to absence of cache folder. If cache folder was mistakenly deleted by user or OS, vmselect will try re-creating it first. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5985).
* BUGFIX: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): limit duration of requests to /api/v1/labels, /api/v1/label/.../values or /api/v1/series with `-search.maxLabelsAPIDuration` duration. Before, `-search.maxExportDuration` value was used by mistake. Thanks to @kbweave for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5992). * BUGFIX: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): limit duration of requests to /api/v1/labels, /api/v1/label/.../values or /api/v1/series with `-search.maxLabelsAPIDuration` duration. Before, `-search.maxExportDuration` value was used by mistake. Thanks to @kbweave for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5992).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fixed response body and headers for AWS Firehose HTTP Destination.
## [v1.99.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.99.0) ## [v1.99.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.99.0)

View file

@ -4,6 +4,8 @@ import (
"encoding/binary" "encoding/binary"
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http"
"time"
) )
// ProcessRequestBody converts Cloudwatch Stream protobuf metrics HTTP request body delivered via Firehose into OpenTelemetry protobuf message. // ProcessRequestBody converts Cloudwatch Stream protobuf metrics HTTP request body delivered via Firehose into OpenTelemetry protobuf message.
@ -48,3 +50,17 @@ func ProcessRequestBody(b []byte) ([]byte, error) {
} }
return dst, nil return dst, nil
} }
// ResponseWriter writes response for AWS Firehose HTTP Endpoint request
// https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html#responseformat
func ResponseWriter(w http.ResponseWriter, ct time.Time, reqID string, err error) {
var respBody string
ts := ct.UnixMilli()
if err == nil {
respBody = fmt.Sprintf(`{"requestId": %q,"timestamp": %d}`, reqID, ts)
} else {
respBody = fmt.Sprintf(`{"requestId": %q,"timestamp": %d,"errorMessage": %q}`, reqID, ts, err)
}
w.Header().Add("Content-Type", "application/json")
w.Write([]byte(respBody))
}