mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
app/{vmagent,vminsert}: fixed firehose response (#6016)
This commit is contained in:
parent
1c597e9dba
commit
914b23f1e8
6 changed files with 76 additions and 32 deletions
|
@ -315,12 +315,11 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||||
return true
|
return true
|
||||||
case "/opentelemetry/api/v1/push", "/opentelemetry/v1/metrics":
|
case "/opentelemetry/api/v1/push", "/opentelemetry/v1/metrics":
|
||||||
opentelemetryPushRequests.Inc()
|
opentelemetryPushRequests.Inc()
|
||||||
if err := opentelemetry.InsertHandler(nil, r); err != nil {
|
writeResponse, err := opentelemetry.InsertHandler(nil, r)
|
||||||
|
if err != nil {
|
||||||
opentelemetryPushErrors.Inc()
|
opentelemetryPushErrors.Inc()
|
||||||
httpserver.Errorf(w, r, "%s", err)
|
|
||||||
return true
|
|
||||||
}
|
}
|
||||||
w.WriteHeader(http.StatusOK)
|
writeResponse(w, time.Now(), err)
|
||||||
return true
|
return true
|
||||||
case "/newrelic":
|
case "/newrelic":
|
||||||
newrelicCheckRequest.Inc()
|
newrelicCheckRequest.Inc()
|
||||||
|
@ -561,12 +560,11 @@ func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path stri
|
||||||
return true
|
return true
|
||||||
case "opentelemetry/api/v1/push", "opentelemetry/v1/metrics":
|
case "opentelemetry/api/v1/push", "opentelemetry/v1/metrics":
|
||||||
opentelemetryPushRequests.Inc()
|
opentelemetryPushRequests.Inc()
|
||||||
if err := opentelemetry.InsertHandler(at, r); err != nil {
|
writeResponse, err := opentelemetry.InsertHandler(nil, r)
|
||||||
|
if err != nil {
|
||||||
opentelemetryPushErrors.Inc()
|
opentelemetryPushErrors.Inc()
|
||||||
httpserver.Errorf(w, r, "%s", err)
|
|
||||||
return true
|
|
||||||
}
|
}
|
||||||
w.WriteHeader(http.StatusOK)
|
writeResponse(w, time.Now(), err)
|
||||||
return true
|
return true
|
||||||
case "newrelic":
|
case "newrelic":
|
||||||
newrelicCheckRequest.Inc()
|
newrelicCheckRequest.Inc()
|
||||||
|
|
|
@ -3,10 +3,12 @@ package opentelemetry
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||||
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose"
|
||||||
|
@ -21,22 +23,35 @@ var (
|
||||||
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="opentelemetry"}`)
|
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="opentelemetry"}`)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// WriteResponseFn function to write HTTP response data
|
||||||
|
type WriteResponseFn func(http.ResponseWriter, time.Time, error)
|
||||||
|
|
||||||
// InsertHandler processes opentelemetry metrics.
|
// InsertHandler processes opentelemetry metrics.
|
||||||
func InsertHandler(at *auth.Token, req *http.Request) error {
|
func InsertHandler(at *auth.Token, req *http.Request) (WriteResponseFn, error) {
|
||||||
extraLabels, err := parserCommon.GetExtraLabels(req)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
|
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
|
||||||
var processBody func([]byte) ([]byte, error)
|
var processBody func([]byte) ([]byte, error)
|
||||||
if req.Header.Get("Content-Type") == "application/json" {
|
writeResponse := func(w http.ResponseWriter, _ time.Time, err error) {
|
||||||
if req.Header.Get("X-Amz-Firehouse-Protocol-Version") != "" {
|
if err == nil {
|
||||||
processBody = firehose.ProcessRequestBody
|
w.WriteHeader(http.StatusOK)
|
||||||
} else {
|
} else {
|
||||||
return fmt.Errorf("json encoding isn't supported for opentelemetry format. Use protobuf encoding")
|
httpserver.Errorf(w, req, "%s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return stream.ParseStream(req.Body, isGzipped, processBody, func(tss []prompbmarshal.TimeSeries) error {
|
if req.Header.Get("Content-Type") == "application/json" {
|
||||||
|
if fhRequestID := req.Header.Get("X-Amz-Firehose-Request-Id"); fhRequestID != "" {
|
||||||
|
processBody = firehose.ProcessRequestBody
|
||||||
|
writeResponse = func(w http.ResponseWriter, t time.Time, err error) {
|
||||||
|
firehose.ResponseWriter(w, t, fhRequestID, err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return writeResponse, fmt.Errorf("json encoding isn't supported for opentelemetry format. Use protobuf encoding")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
extraLabels, err := parserCommon.GetExtraLabels(req)
|
||||||
|
if err != nil {
|
||||||
|
return writeResponse, err
|
||||||
|
}
|
||||||
|
return writeResponse, stream.ParseStream(req.Body, isGzipped, processBody, func(tss []prompbmarshal.TimeSeries) error {
|
||||||
return insertRows(at, tss, extraLabels)
|
return insertRows(at, tss, extraLabels)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -298,12 +298,11 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||||
return true
|
return true
|
||||||
case "opentelemetry/api/v1/push", "opentelemetry/v1/metrics":
|
case "opentelemetry/api/v1/push", "opentelemetry/v1/metrics":
|
||||||
opentelemetryPushRequests.Inc()
|
opentelemetryPushRequests.Inc()
|
||||||
if err := opentelemetry.InsertHandler(at, r); err != nil {
|
writeResponse, err := opentelemetry.InsertHandler(at, r)
|
||||||
|
if err != nil {
|
||||||
opentelemetryPushErrors.Inc()
|
opentelemetryPushErrors.Inc()
|
||||||
httpserver.Errorf(w, r, "%s", err)
|
|
||||||
return true
|
|
||||||
}
|
}
|
||||||
w.WriteHeader(http.StatusOK)
|
writeResponse(w, startTime, err)
|
||||||
return true
|
return true
|
||||||
case "newrelic":
|
case "newrelic":
|
||||||
newrelicCheckRequest.Inc()
|
newrelicCheckRequest.Inc()
|
||||||
|
|
|
@ -3,10 +3,12 @@ package opentelemetry
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||||
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose"
|
||||||
|
@ -22,22 +24,35 @@ var (
|
||||||
rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="opentelemetry"}`)
|
rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="opentelemetry"}`)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// WriteResponseFn function to write HTTP response data
|
||||||
|
type WriteResponseFn func(http.ResponseWriter, time.Time, error)
|
||||||
|
|
||||||
// InsertHandler processes opentelemetry metrics.
|
// InsertHandler processes opentelemetry metrics.
|
||||||
func InsertHandler(at *auth.Token, req *http.Request) error {
|
func InsertHandler(at *auth.Token, req *http.Request) (WriteResponseFn, error) {
|
||||||
extraLabels, err := parserCommon.GetExtraLabels(req)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
|
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
|
||||||
var processBody func([]byte) ([]byte, error)
|
var processBody func([]byte) ([]byte, error)
|
||||||
if req.Header.Get("Content-Type") == "application/json" {
|
writeResponse := func(w http.ResponseWriter, _ time.Time, err error) {
|
||||||
if req.Header.Get("X-Amz-Firehose-Protocol-Version") != "" {
|
if err == nil {
|
||||||
processBody = firehose.ProcessRequestBody
|
w.WriteHeader(http.StatusOK)
|
||||||
} else {
|
} else {
|
||||||
return fmt.Errorf("json encoding isn't supported for opentelemetry format. Use protobuf encoding")
|
httpserver.Errorf(w, req, "%s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return stream.ParseStream(req.Body, isGzipped, processBody, func(tss []prompbmarshal.TimeSeries) error {
|
if req.Header.Get("Content-Type") == "application/json" {
|
||||||
|
if fhRequestID := req.Header.Get("X-Amz-Firehose-Request-Id"); fhRequestID != "" {
|
||||||
|
processBody = firehose.ProcessRequestBody
|
||||||
|
writeResponse = func(w http.ResponseWriter, t time.Time, err error) {
|
||||||
|
firehose.ResponseWriter(w, t, fhRequestID, err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return writeResponse, fmt.Errorf("json encoding isn't supported for opentelemetry format. Use protobuf encoding")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
extraLabels, err := parserCommon.GetExtraLabels(req)
|
||||||
|
if err != nil {
|
||||||
|
return writeResponse, err
|
||||||
|
}
|
||||||
|
return writeResponse, stream.ParseStream(req.Body, isGzipped, processBody, func(tss []prompbmarshal.TimeSeries) error {
|
||||||
return insertRows(at, tss, extraLabels)
|
return insertRows(at, tss, extraLabels)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -76,6 +76,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
|
||||||
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): set correct `endsAt` value in notifications sent to the Alertmanager. Previously, a rule with evaluation intervals lower than 10s could never be triggered. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5995) for details.
|
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): set correct `endsAt` value in notifications sent to the Alertmanager. Previously, a rule with evaluation intervals lower than 10s could never be triggered. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5995) for details.
|
||||||
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): properly account for `-rule.resendDelay` for alerting rules that are constantly switching state from inactive to firing. Before, notifications for such rules could have been skipped if state change happened more often than `-rule.resendDelay`. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6028) for details.
|
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): properly account for `-rule.resendDelay` for alerting rules that are constantly switching state from inactive to firing. Before, notifications for such rules could have been skipped if state change happened more often than `-rule.resendDelay`. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6028) for details.
|
||||||
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): respect `-remoteWrite.maxBatchSize` at shutdown period. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6025). Thanks to @jiekun for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6039).
|
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): respect `-remoteWrite.maxBatchSize` at shutdown period. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6025). Thanks to @jiekun for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6039).
|
||||||
|
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fixed response body and headers for AWS Firehose HTTP Destination.
|
||||||
|
|
||||||
## [v1.99.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.99.0)
|
## [v1.99.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.99.0)
|
||||||
|
|
||||||
|
|
|
@ -4,6 +4,8 @@ import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ProcessRequestBody converts Cloudwatch Stream protobuf metrics HTTP request body delivered via Firehose into OpenTelemetry protobuf message.
|
// ProcessRequestBody converts Cloudwatch Stream protobuf metrics HTTP request body delivered via Firehose into OpenTelemetry protobuf message.
|
||||||
|
@ -48,3 +50,17 @@ func ProcessRequestBody(b []byte) ([]byte, error) {
|
||||||
}
|
}
|
||||||
return dst, nil
|
return dst, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ResponseWriter writes response for AWS Firehose HTTP Endpoint request
|
||||||
|
// https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html#responseformat
|
||||||
|
func ResponseWriter(w http.ResponseWriter, ct time.Time, reqID string, err error) {
|
||||||
|
var respBody string
|
||||||
|
ts := ct.UnixMilli()
|
||||||
|
if err == nil {
|
||||||
|
respBody = fmt.Sprintf(`{"requestId": %q,"timestamp": %d}`, reqID, ts)
|
||||||
|
} else {
|
||||||
|
respBody = fmt.Sprintf(`{"requestId": %q,"timestamp": %d,"errorMessage": %q}`, reqID, ts, err)
|
||||||
|
}
|
||||||
|
w.Header().Add("Content-Type", "application/json")
|
||||||
|
w.Write([]byte(respBody))
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue