diff --git a/app/vmagent/main.go b/app/vmagent/main.go index be34a3096..58a0b8f1e 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -114,10 +114,12 @@ func main() { graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, graphite.InsertHandler) } if len(*opentsdbListenAddr) > 0 { - opentsdbServer = opentsdbserver.MustStart(*opentsdbListenAddr, opentsdb.InsertHandler, opentsdbhttp.InsertHandler) + httpInsertHandler := getOpenTSDBHTTPInsertHandler() + opentsdbServer = opentsdbserver.MustStart(*opentsdbListenAddr, opentsdb.InsertHandler, httpInsertHandler) } if len(*opentsdbHTTPListenAddr) > 0 { - opentsdbhttpServer = opentsdbhttpserver.MustStart(*opentsdbHTTPListenAddr, opentsdbhttp.InsertHandler) + httpInsertHandler := getOpenTSDBHTTPInsertHandler() + opentsdbhttpServer = opentsdbhttpserver.MustStart(*opentsdbHTTPListenAddr, httpInsertHandler) } promscrape.Init(remotewrite.Push) @@ -159,6 +161,40 @@ func main() { logger.Infof("successfully stopped vmagent in %.3f seconds", time.Since(startTime).Seconds()) } +func getOpenTSDBHTTPInsertHandler() func(req *http.Request) error { + if !remotewrite.MultitenancyEnabled() { + return func(req *http.Request) error { + path := strings.Replace(req.URL.Path, "//", "/", -1) + if path != "/api/put" { + return fmt.Errorf("unsupported path requested: %q; expecting '/api/put'", path) + } + return opentsdbhttp.InsertHandler(nil, req) + } + } + return func(req *http.Request) error { + path := strings.Replace(req.URL.Path, "//", "/", -1) + at, err := getAuthTokenFromPath(path) + if err != nil { + return fmt.Errorf("cannot obtain auth token from path %q: %w", path, err) + } + return opentsdbhttp.InsertHandler(at, req) + } +} + +func getAuthTokenFromPath(path string) (*auth.Token, error) { + p, err := httpserver.ParsePath(path) + if err != nil { + return nil, fmt.Errorf("cannot parse multitenant path: %w", err) + } + if p.Prefix != "insert" { + return nil, fmt.Errorf(`unsupported multitenant prefix: %q; expected "insert"`, p.Prefix) + } + if p.Suffix != "opentsdb/api/put" { + return nil, fmt.Errorf("unsupported path requested: %q; expecting 'opentsdb/api/put'", p.Suffix) + } + return auth.NewToken(p.AuthToken) +} + func requestHandler(w http.ResponseWriter, r *http.Request) bool { if r.URL.Path == "/" { if r.Method != "GET" { diff --git a/app/vmagent/opentsdbhttp/request_handler.go b/app/vmagent/opentsdbhttp/request_handler.go index e49f0274f..98b02e5e3 100644 --- a/app/vmagent/opentsdbhttp/request_handler.go +++ b/app/vmagent/opentsdbhttp/request_handler.go @@ -1,15 +1,11 @@ package opentsdbhttp import ( - "errors" - "fmt" "net/http" - "strings" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp" @@ -24,17 +20,7 @@ var ( // InsertHandler processes HTTP OpenTSDB put requests. // See http://opentsdb.net/docs/build/html/api_http/put.html -func InsertHandler(req *http.Request) error { - path := strings.Replace(req.URL.Path, "//", "/", -1) - p, err := httpserver.ParsePath(path) - if err != nil { - // Cannot parse multitenant path. Skip it - probably it will be parsed later. - return err - } - if p.Prefix != "insert" { - return errors.New(fmt.Sprintf(`unsupported multitenant prefix: %q; expected "insert"`, p.Prefix)) - } - at, err := auth.NewToken(p.AuthToken) +func InsertHandler(at *auth.Token, req *http.Request) error { extraLabels, err := parserCommon.GetExtraLabels(req) if err != nil { return err diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 72acbb9fe..2d19773dd 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -23,6 +23,7 @@ The following tip changes can be tested by building VictoriaMetrics components f * FEATURE: return shorter error messages to Grafana and to other clients requesting [/api/v1/query](https://docs.victoriametrics.com/keyConcepts.html#instant-query) and [/api/v1/query_range](https://docs.victoriametrics.com/keyConcepts.html#range-query) endpoints. This should simplify reading these errors by humans. The long error message with full context is still written to logs. * FEATURE: add the ability to fine-tune the number of points, which can be generated per each matching time series during [subquery](https://docs.victoriametrics.com/MetricsQL.html#subqueries) evaluation. This can be done with the `-search.maxPointsSubqueryPerTimeseries` command-line flag. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2922). +* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add ability to accept [multitenant](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy) data via OpenTSDB `/api/put` protocol at `/insert//opentsdb/api/put` http endpoint if [multitenant support](https://docs.victoriametrics.com/vmagent.html#multitenancy) is enabled at `vmagent`. Thanks to @chengjianyun for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3015). * FEATURE: [monitoring](https://docs.victoriametrics.com/#monitoring): expose `vm_hourly_series_limit_max_series`, `vm_hourly_series_limit_current_series`, `vm_daily_series_limit_max_series` and `vm_daily_series_limit_current_series` metrics when `-search.maxHourlySeries` or `-search.maxDailySeries` limits are set. This allows alerting when the number of unique series reaches the configured limits. See [these docs](https://docs.victoriametrics.com/#cardinality-limiter) for details. * FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): reduce the amounts of logging at `vmstorage` when `vmselect` connects/disconnects to `vmstorage`. * FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): improve performance for heavy queries on systems with many CPU cores.