app/vmagent: simplify code after 509df44d03

- Simplify the code in order to improve its maintenance
- Properly pass tenant ID when processing multi-tenant opentelemetry request at vmagent

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6016
This commit is contained in:
Aliaksandr Valialkin 2024-04-02 17:51:18 +03:00
parent 76b1fc6ac1
commit 904e95fc69
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
6 changed files with 65 additions and 64 deletions

View file

@ -40,6 +40,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/pushmetrics"
"github.com/VictoriaMetrics/metrics"
)
@ -317,11 +318,12 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
return true
case "/opentelemetry/api/v1/push", "/opentelemetry/v1/metrics":
opentelemetryPushRequests.Inc()
writeResponse, err := opentelemetry.InsertHandler(nil, r)
if err != nil {
if err := opentelemetry.InsertHandler(nil, r); err != nil {
opentelemetryPushErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
writeResponse(w, time.Now(), err)
firehose.WriteSuccessResponse(w, r)
return true
case "/newrelic":
newrelicCheckRequest.Inc()
@ -562,11 +564,12 @@ func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path stri
return true
case "opentelemetry/api/v1/push", "opentelemetry/v1/metrics":
opentelemetryPushRequests.Inc()
writeResponse, err := opentelemetry.InsertHandler(nil, r)
if err != nil {
if err := opentelemetry.InsertHandler(at, r); err != nil {
opentelemetryPushErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
writeResponse(w, time.Now(), err)
firehose.WriteSuccessResponse(w, r)
return true
case "newrelic":
newrelicCheckRequest.Inc()

View file

@ -3,12 +3,10 @@ package opentelemetry
import (
"fmt"
"net/http"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose"
@ -23,35 +21,22 @@ var (
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="opentelemetry"}`)
)
// WriteResponseFn function to write HTTP response data
type WriteResponseFn func(http.ResponseWriter, time.Time, error)
// InsertHandler processes opentelemetry metrics.
func InsertHandler(at *auth.Token, req *http.Request) (WriteResponseFn, error) {
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
var processBody func([]byte) ([]byte, error)
writeResponse := func(w http.ResponseWriter, _ time.Time, err error) {
if err == nil {
w.WriteHeader(http.StatusOK)
} else {
httpserver.Errorf(w, req, "%s", err)
}
}
if req.Header.Get("Content-Type") == "application/json" {
if fhRequestID := req.Header.Get("X-Amz-Firehose-Request-Id"); fhRequestID != "" {
processBody = firehose.ProcessRequestBody
writeResponse = func(w http.ResponseWriter, t time.Time, err error) {
firehose.ResponseWriter(w, t, fhRequestID, err)
}
} else {
return writeResponse, fmt.Errorf("json encoding isn't supported for opentelemetry format. Use protobuf encoding")
}
}
func InsertHandler(at *auth.Token, req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return writeResponse, err
return err
}
return writeResponse, stream.ParseStream(req.Body, isGzipped, processBody, func(tss []prompbmarshal.TimeSeries) error {
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
var processBody func([]byte) ([]byte, error)
if req.Header.Get("Content-Type") == "application/json" {
if req.Header.Get("X-Amz-Firehouse-Protocol-Version") != "" {
processBody = firehose.ProcessRequestBody
} else {
return fmt.Errorf("json encoding isn't supported for opentelemetry format. Use protobuf encoding")
}
}
return stream.ParseStream(req.Body, isGzipped, processBody, func(tss []prompbmarshal.TimeSeries) error {
return insertRows(at, tss, extraLabels)
})
}

View file

@ -40,6 +40,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
)
@ -218,11 +219,12 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
return true
case "/opentelemetry/api/v1/push", "/opentelemetry/v1/metrics":
opentelemetryPushRequests.Inc()
writeResponse, err := opentelemetry.InsertHandler(r)
if err != nil {
if err := opentelemetry.InsertHandler(r); err != nil {
opentelemetryPushErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
writeResponse(w, startTime, err)
firehose.WriteSuccessResponse(w, r)
return true
case "/newrelic":
newrelicCheckRequest.Inc()

View file

@ -3,11 +3,9 @@ package opentelemetry
import (
"fmt"
"net/http"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose"
@ -20,35 +18,22 @@ var (
rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="opentelemetry"}`)
)
// WriteResponseFn function to write HTTP response data
type WriteResponseFn func(http.ResponseWriter, time.Time, error)
// InsertHandler processes opentelemetry metrics.
func InsertHandler(req *http.Request) (WriteResponseFn, error) {
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
var processBody func([]byte) ([]byte, error)
writeResponse := func(w http.ResponseWriter, _ time.Time, err error) {
if err == nil {
w.WriteHeader(http.StatusOK)
} else {
httpserver.Errorf(w, req, "%s", err)
}
}
if req.Header.Get("Content-Type") == "application/json" {
if fhRequestID := req.Header.Get("X-Amz-Firehose-Request-Id"); fhRequestID != "" {
processBody = firehose.ProcessRequestBody
writeResponse = func(w http.ResponseWriter, t time.Time, err error) {
firehose.ResponseWriter(w, t, fhRequestID, err)
}
} else {
return writeResponse, fmt.Errorf("json encoding isn't supported for opentelemetry format. Use protobuf encoding")
}
}
func InsertHandler(req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return writeResponse, err
return err
}
return writeResponse, stream.ParseStream(req.Body, isGzipped, processBody, func(tss []prompbmarshal.TimeSeries) error {
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
var processBody func([]byte) ([]byte, error)
if req.Header.Get("Content-Type") == "application/json" {
if req.Header.Get("X-Amz-Firehose-Protocol-Version") != "" {
processBody = firehose.ProcessRequestBody
} else {
return fmt.Errorf("json encoding isn't supported for opentelemetry format. Use protobuf encoding")
}
}
return stream.ParseStream(req.Body, isGzipped, processBody, func(tss []prompbmarshal.TimeSeries) error {
return insertRows(tss, extraLabels)
})
}

View file

@ -66,10 +66,10 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly set `Host` header in requests to scrape targets if it is specified via [`headers` option](https://docs.victoriametrics.com/sd_configs/#http-api-client-options). Thanks to @fholzer for [the bugreport](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5969) and [the fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5970).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly set `Host` header in requests to scrape targets when [`server_name` option at `tls_config`](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#tls_config) is set. Previously the `Host` header was set incorrectly to the target hostname in this case.
* BUGFIX: do not drop `match[]` filter at [`/api/v1/series`](https://docs.victoriametrics.com/url-examples/#apiv1series) if `-search.ignoreExtraFiltersAtLabelsAPI` command-line flag is set, since missing `match[]` filter breaks `/api/v1/series` requests.
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): return proper resonses for [AWS Firehose](https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html#requestformat) requests according to [these docs](https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html#responseformat). See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6016).
* BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl.html): properly parse TLS key and CA files for [InfluxDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-influxdb-1x) and [OpenTSDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-opentsdb) migration modes.
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix VictoriaLogs UI query handling to correctly apply `_time` filter across all queries. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5920).
* BUGFIX: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): limit duration of requests to /api/v1/labels, /api/v1/label/.../values or /api/v1/series with `-search.maxLabelsAPIDuration` duration. Before, `-search.maxExportDuration` value was used by mistake. The bug has been introduced in [v1.99.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.99.0). Thanks to @kbweave for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5992).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix response body and headers for AWS Firehose HTTP Destination.
* BUGFIX: properly wait for force merge to be completed during the shutdown. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5944) for the details.
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): set correct `endsAt` value in notifications sent to the Alertmanager. Previously, a rule with evaluation intervals lower than 10s could never be triggered. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5995) for details.
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): properly account for `-rule.resendDelay` for alerting rules that are constantly switching state from inactive to firing. Before, notifications for such rules could have been skipped if state change happened more often than `-rule.resendDelay`. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6028) for details.

View file

@ -0,0 +1,26 @@
package firehose
import (
"fmt"
"net/http"
"time"
)
// WriteSuccessResponse writes success response for AWS Firehose request.
//
// See https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html#responseformat
func WriteSuccessResponse(w http.ResponseWriter, r *http.Request) {
requestID := r.Header.Get("X-Amz-Firehose-Request-Id")
if requestID == "" {
// This isn't a AWS firehose request - just return an empty response in this case.
w.WriteHeader(http.StatusOK)
return
}
body := fmt.Sprintf(`{"requestId":%q,"timestamp":%d}`, requestID, time.Now().UnixMilli())
h := w.Header()
h.Set("Content-Type", "application/json")
h.Set("Content-Length", fmt.Sprintf("%d", len(body)))
w.Write([]byte(body))
}