From d826352688e201ddd1732780f055c8557f3650c2 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 5 Aug 2021 09:46:19 +0300 Subject: [PATCH] app/vmagent: follow-up after fe445f753b6edd8799f3c44d0c0ee9bdcd8c9466 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1491 --- app/vmagent/README.md | 19 +- app/vmagent/csvimport/request_handler.go | 18 +- app/vmagent/graphite/request_handler.go | 2 +- app/vmagent/influx/request_handler.go | 18 +- app/vmagent/main.go | 155 +++++++++-------- app/vmagent/native/request_handler.go | 18 +- app/vmagent/opentsdb/request_handler.go | 2 +- app/vmagent/opentsdbhttp/request_handler.go | 2 +- .../prometheusimport/request_handler.go | 18 +- .../promremotewrite/request_handler.go | 22 +-- app/vmagent/remotewrite/relabel.go | 3 +- app/vmagent/remotewrite/remotewrite.go | 164 ++++++++++-------- app/vmagent/vmimport/request_handler.go | 18 +- docs/CHANGELOG.md | 1 + docs/vmagent.md | 19 +- lib/auth/auth.go | 35 ++++ lib/httpserver/path.go | 64 +++++++ lib/promscrape/scraper.go | 17 +- lib/promscrape/scrapework.go | 9 +- lib/promscrape/scrapework_test.go | 5 +- lib/promscrape/scrapework_timing_test.go | 3 +- lib/tenantmetrics/counter_map.go | 72 ++++++++ lib/tenantmetrics/counter_map_test.go | 90 ++++++++++ 23 files changed, 529 insertions(+), 245 deletions(-) create mode 100644 lib/auth/auth.go create mode 100644 lib/httpserver/path.go create mode 100644 lib/tenantmetrics/counter_map.go create mode 100644 lib/tenantmetrics/counter_map_test.go diff --git a/app/vmagent/README.md b/app/vmagent/README.md index 7b1d02fd9..e9141284a 100644 --- a/app/vmagent/README.md +++ b/app/vmagent/README.md @@ -135,7 +135,11 @@ Also, Basic Auth can be enabled for the incoming `remote_write` requests with `- ### remote_write for clustered version -While `vmagent` can accept data in several supported protocols (OpenTSDB, Influx, Prometheus, Graphite) and scrape data from various targets, writes are always peformed in Promethes 
remote_write protocol. Therefore for the [clustered version](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html), `-remoteWrite.url` the command-line flag should be configured as `://:8480/insert//prometheus/api/v1/write` +While `vmagent` can accept data in several supported protocols (OpenTSDB, Influx, Prometheus, Graphite) and scrape data from various targets, writes are always performed in Prometheus remote_write protocol. Therefore for the [clustered version](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html), the `-remoteWrite.url` command-line flag should be configured as `://:8480/insert//prometheus/api/v1/write` according to [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format). There is also support for multitenant writes. See [these docs](#multitenancy). + +## Multitenancy + +By default `vmagent` collects the data without tenant identifiers and routes it to the configured `-remoteWrite.url`. But it can accept multitenant data if `-remoteWrite.multitenantURL` is set. In this case it accepts multitenant data at `http://vmagent:8429/insert//...` in the same way as cluster version of VictoriaMetrics does according to [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format) and routes it to `<-remoteWrite.multitenantURL>/insert//prometheus/api/v1/write`. If multiple `-remoteWrite.multitenantURL` command-line options are set, then `vmagent` replicates the collected data across all the configured urls. This allows using a single `vmagent` instance in front of VictoriaMetrics clusters for processing the data from all the tenants. ## How to collect metrics in Prometheus format @@ -634,7 +638,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html . -promscrape.consulSDCheckInterval duration Interval for checking for changes in Consul. This works only if consul_sd_configs is configured in '-promscrape.config' file.
See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config for details (default 30s) -promscrape.digitaloceanSDCheckInterval duration - Interval for checking for changes in digital ocean. This works only if digitalocean_sd_configs is configured in '-promscrape.config' file. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#digitalocean_sd_config for details (default 1m0s) + Interval for checking for changes in digital ocean. This works only if digitalocean_sd_configs is configured in '-promscrape.config' file. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#digitalocean_sd_config for details (default 1m0s) -promscrape.disableCompression Whether to disable sending 'Accept-Encoding: gzip' request headers to all the scrape targets. This may reduce CPU usage on scrape targets at the cost of higher network bandwidth utilization. It is possible to set 'disable_compression: true' individually per each 'scrape_config' section in '-promscrape.config' for fine grained control -promscrape.disableKeepAlive @@ -645,6 +649,8 @@ See the docs at https://docs.victoriametrics.com/vmagent.html . The maximum duration for waiting to perform API requests if more than -promscrape.discovery.concurrency requests are simultaneously performed (default 1m0s) -promscrape.dnsSDCheckInterval duration Interval for checking for changes in dns. This works only if dns_sd_configs is configured in '-promscrape.config' file. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dns_sd_config for details (default 30s) + -promscrape.dockerSDCheckInterval duration + Interval for checking for changes in docker. This works only if docker_sd_configs is configured in '-promscrape.config' file. 
See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#docker_sd_config for details (default 30s) -promscrape.dockerswarmSDCheckInterval duration Interval for checking for changes in dockerswarm. This works only if dockerswarm_sd_configs is configured in '-promscrape.config' file. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dockerswarm_sd_config for details (default 30s) -promscrape.dropOriginalLabels @@ -658,7 +664,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html . -promscrape.gceSDCheckInterval duration Interval for checking for changes in gce. This works only if gce_sd_configs is configured in '-promscrape.config' file. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#gce_sd_config for details (default 1m0s) -promscrape.httpSDCheckInterval duration - Interval for checking for changes in http service discovery. This works only if http_sd_configs is configured in '-promscrape.config' file. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#http_sd_config for details (default 1m0s) + Interval for checking for changes in http endpoint service discovery. This works only if http_sd_configs is configured in '-promscrape.config' file. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#http_sd_config for details (default 1m0s) -promscrape.kubernetes.apiServerTimeout duration How frequently to reload the full state from Kuberntes API server (default 30m0s) -promscrape.kubernetesSDCheckInterval duration @@ -706,6 +712,9 @@ See the docs at https://docs.victoriametrics.com/vmagent.html . Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0) -remoteWrite.maxHourlySeries int The maximum number of unique series vmagent can send to remote storage systems during the last hour. Excess series are logged and dropped. This can be useful for limiting series cardinality. 
See also -remoteWrite.maxDailySeries + -remoteWrite.multitenantURL array + Base path for multitenant remote storage URL to write data to. See https://docs.victoriametrics.com/vmagent.html#multitenancy for details. Example url: http://:8480 . Pass multiple -remoteWrite.multitenantURL flags in order to replicate data to multiple remote storage systems. See also -remoteWrite.url + Supports an array of values separated by comma or specified via multiple flags. -remoteWrite.oauth2.clientID array Optional OAuth2 clientID to use for -remoteWrite.url. If multiple args are set, then they are applied independently for the corresponding -remoteWrite.url Supports an array of values separated by comma or specified via multiple flags. @@ -725,7 +734,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html . Optional proxy URL for writing data to -remoteWrite.url. Supported proxies: http, https, socks5. Example: -remoteWrite.proxyURL=socks5://proxy:1234 Supports an array of values separated by comma or specified via multiple flags. -remoteWrite.queues int - The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues isn't enough for sending high volume of collected data to remote storage (default 2 * numberOfAvailableCPUs) + The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues isn't enough for sending high volume of collected data to remote storage. Default value if 2 * numberOfAvailableCPUs (default 8) -remoteWrite.rateLimit array Optional rate limit in bytes per second for data sent to -remoteWrite.url. By default the rate limit is disabled. It can be useful for limiting load on remote storage when big amounts of buffered data is sent after temporary unavailability of the remote storage Supports array of values separated by comma or specified via multiple flags. @@ -762,7 +771,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
-remoteWrite.tmpDataPath string Path to directory where temporary data for remote write component is stored. See also -remoteWrite.maxDiskUsagePerURL (default "vmagent-remotewrite-data") -remoteWrite.url array - Remote storage URL to write data to. It must support Prometheus remote_write API. It is recommended using VictoriaMetrics as remote storage. Example url: http://:8428/api/v1/write . Pass multiple -remoteWrite.url flags in order to write data concurrently to multiple remote storage systems + Remote storage URL to write data to. It must support Prometheus remote_write API. It is recommended using VictoriaMetrics as remote storage. Example url: http://:8428/api/v1/write . Pass multiple -remoteWrite.url flags in order to replicate data to multiple remote storage systems. See also -remoteWrite.multitenantURL Supports an array of values separated by comma or specified via multiple flags. -remoteWrite.urlRelabelConfig array Optional path to relabel config for the corresponding -remoteWrite.url diff --git a/app/vmagent/csvimport/request_handler.go b/app/vmagent/csvimport/request_handler.go index ac330972b..6aa1ad2d1 100644 --- a/app/vmagent/csvimport/request_handler.go +++ b/app/vmagent/csvimport/request_handler.go @@ -6,7 +6,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport" @@ -17,24 +16,24 @@ import ( var ( rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="csvimport"}`) - rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="csvimport"}`) + rowsTenantInserted = 
tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="csvimport"}`) rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="csvimport"}`) ) // InsertHandler processes csv data from req. -func InsertHandler(p *httpserver.Path, req *http.Request) error { +func InsertHandler(at *auth.Token, req *http.Request) error { extraLabels, err := parserCommon.GetExtraLabels(req) if err != nil { return err } return writeconcurrencylimiter.Do(func() error { return parser.ParseStream(req, func(rows []parser.Row) error { - return insertRows(p, rows, extraLabels) + return insertRows(at, rows, extraLabels) }) }) } -func insertRows(p *httpserver.Path, rows []parser.Row, extraLabels []prompbmarshal.Label) error { +func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error { ctx := common.GetPushCtx() defer common.PutPushCtx(ctx) @@ -68,13 +67,10 @@ func insertRows(p *httpserver.Path, rows []parser.Row, extraLabels []prompbmarsh ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - remotewrite.Push(p, &ctx.WriteRequest) + remotewrite.PushWithAuthToken(at, &ctx.WriteRequest) rowsInserted.Add(len(rows)) - if p != nil { - at, err := auth.NewToken(p.AuthToken) - if err == nil { - rowsTenantInserted.Get(at).Add(len(rows)) - } + if at != nil { + rowsTenantInserted.Get(at).Add(len(rows)) } rowsPerInsert.Update(float64(len(rows))) return nil diff --git a/app/vmagent/graphite/request_handler.go b/app/vmagent/graphite/request_handler.go index c3ef22d8d..fd0d96cd6 100644 --- a/app/vmagent/graphite/request_handler.go +++ b/app/vmagent/graphite/request_handler.go @@ -58,7 +58,7 @@ func insertRows(rows []parser.Row) error { ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - remotewrite.Push(nil, &ctx.WriteRequest) + remotewrite.Push(&ctx.WriteRequest) rowsInserted.Add(len(rows)) rowsPerInsert.Update(float64(len(rows))) return nil diff --git a/app/vmagent/influx/request_handler.go 
b/app/vmagent/influx/request_handler.go index a443af701..74027d305 100644 --- a/app/vmagent/influx/request_handler.go +++ b/app/vmagent/influx/request_handler.go @@ -11,7 +11,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" @@ -29,7 +28,7 @@ var ( var ( rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="influx"}`) - rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="influx"}`) + rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="influx"}`) rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="influx"}`) ) @@ -47,7 +46,7 @@ func InsertHandlerForReader(r io.Reader) error { // InsertHandlerForHTTP processes remote write for influx line protocol. 
// // See https://github.com/influxdata/influxdb/blob/4cbdc197b8117fee648d62e2e5be75c6575352f0/tsdb/README.md -func InsertHandlerForHTTP(p *httpserver.Path, req *http.Request) error { +func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error { extraLabels, err := parserCommon.GetExtraLabels(req) if err != nil { return err @@ -59,12 +58,12 @@ func InsertHandlerForHTTP(p *httpserver.Path, req *http.Request) error { // Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint db := q.Get("db") return parser.ParseStream(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error { - return insertRows(p, db, rows, extraLabels) + return insertRows(at, db, rows, extraLabels) }) }) } -func insertRows(p *httpserver.Path, db string, rows []parser.Row, extraLabels []prompbmarshal.Label) error { +func insertRows(at *auth.Token, db string, rows []parser.Row, extraLabels []prompbmarshal.Label) error { ctx := getPushCtx() defer putPushCtx(ctx) @@ -134,13 +133,10 @@ func insertRows(p *httpserver.Path, db string, rows []parser.Row, extraLabels [] ctx.ctx.Labels = labels ctx.ctx.Samples = samples ctx.commonLabels = commonLabels - remotewrite.Push(p, &ctx.ctx.WriteRequest) + remotewrite.PushWithAuthToken(at, &ctx.ctx.WriteRequest) rowsInserted.Add(rowsTotal) - if p != nil { - at, err := auth.NewToken(p.AuthToken) - if err == nil { - rowsTenantInserted.Get(at).Add(rowsTotal) - } + if at != nil { + rowsTenantInserted.Get(at).Add(rowsTotal) } rowsPerInsert.Update(float64(rowsTotal)) diff --git a/app/vmagent/main.go b/app/vmagent/main.go index 2a3c7002c..a9f909c3f 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -89,7 +89,7 @@ func main() { logger.Infof("starting vmagent at %q...", *httpListenAddr) startTime := time.Now() - remotewrite.Init(nil) + remotewrite.Init() common.StartUnmarshalWorkers() writeconcurrencylimiter.Init() if len(*influxListenAddr) > 0 { @@ -161,76 +161,6 @@ func requestHandler(w 
http.ResponseWriter, r *http.Request) bool { return true } - p, err := httpserver.ParsePath(r.URL.Path) - if err == nil && p.Prefix == "insert" { - - _, err := auth.NewToken(p.AuthToken) - if err != nil { - httpserver.Errorf(w, r, "auth error: %s", err) - return true - } - - switch p.Suffix { - case "prometheus/", "prometheus", "prometheus/api/v1/write": - prometheusWriteRequests.Inc() - if err := promremotewrite.InsertHandler(p, r); err != nil { - prometheusWriteErrors.Inc() - httpserver.Errorf(w, r, "%s", err) - return true - } - w.WriteHeader(http.StatusNoContent) - return true - case "prometheus/api/v1/import": - vmimportRequests.Inc() - if err := vmimport.InsertHandler(p, r); err != nil { - vmimportErrors.Inc() - httpserver.Errorf(w, r, "%s", err) - return true - } - w.WriteHeader(http.StatusNoContent) - return true - case "prometheus/api/v1/import/csv": - csvimportRequests.Inc() - if err := csvimport.InsertHandler(p, r); err != nil { - csvimportErrors.Inc() - httpserver.Errorf(w, r, "%s", err) - return true - } - w.WriteHeader(http.StatusNoContent) - return true - case "prometheus/api/v1/import/prometheus": - prometheusimportRequests.Inc() - if err := prometheusimport.InsertHandler(p, r); err != nil { - prometheusimportErrors.Inc() - httpserver.Errorf(w, r, "%s", err) - return true - } - w.WriteHeader(http.StatusNoContent) - return true - case "prometheus/api/v1/import/native": - nativeimportRequests.Inc() - if err := native.InsertHandler(p, r); err != nil { - nativeimportErrors.Inc() - httpserver.Errorf(w, r, "%s", err) - return true - } - w.WriteHeader(http.StatusNoContent) - return true - case "influx/write", "influx/api/v2/write": - influxWriteRequests.Inc() - if err := influx.InsertHandlerForHTTP(p, r); err != nil { - influxWriteErrors.Inc() - httpserver.Errorf(w, r, "%s", err) - return true - } - w.WriteHeader(http.StatusNoContent) - return true - default: - // This link is not multitenant - } - - } - path := strings.Replace(r.URL.Path, "//", "/", -1) 
switch path { case "/api/v1/write": @@ -317,9 +247,92 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { } return true } + if remotewrite.MultitenancyEnabled() { + return processMultitenantRequest(w, r, path) + } return false } +func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path string) bool { + p, err := httpserver.ParsePath(path) + if err != nil { + // Cannot parse multitenant path. Skip it - probably it will be parsed later. + return false + } + if p.Prefix != "insert" { + httpserver.Errorf(w, r, `unsupported multitenant prefix: %q; expected "insert"`, p.Prefix) + return true + } + at, err := auth.NewToken(p.AuthToken) + if err != nil { + httpserver.Errorf(w, r, "cannot obtain auth token: %s", err) + return true + } + switch p.Suffix { + case "prometheus/", "prometheus", "prometheus/api/v1/write": + prometheusWriteRequests.Inc() + if err := promremotewrite.InsertHandler(at, r); err != nil { + prometheusWriteErrors.Inc() + httpserver.Errorf(w, r, "%s", err) + return true + } + w.WriteHeader(http.StatusNoContent) + return true + case "prometheus/api/v1/import": + vmimportRequests.Inc() + if err := vmimport.InsertHandler(at, r); err != nil { + vmimportErrors.Inc() + httpserver.Errorf(w, r, "%s", err) + return true + } + w.WriteHeader(http.StatusNoContent) + return true + case "prometheus/api/v1/import/csv": + csvimportRequests.Inc() + if err := csvimport.InsertHandler(at, r); err != nil { + csvimportErrors.Inc() + httpserver.Errorf(w, r, "%s", err) + return true + } + w.WriteHeader(http.StatusNoContent) + return true + case "prometheus/api/v1/import/prometheus": + prometheusimportRequests.Inc() + if err := prometheusimport.InsertHandler(at, r); err != nil { + prometheusimportErrors.Inc() + httpserver.Errorf(w, r, "%s", err) + return true + } + w.WriteHeader(http.StatusNoContent) + return true + case "prometheus/api/v1/import/native": + nativeimportRequests.Inc() + if err := native.InsertHandler(at, r); err != nil { + 
nativeimportErrors.Inc() + httpserver.Errorf(w, r, "%s", err) + return true + } + w.WriteHeader(http.StatusNoContent) + return true + case "influx/write", "influx/api/v2/write": + influxWriteRequests.Inc() + if err := influx.InsertHandlerForHTTP(at, r); err != nil { + influxWriteErrors.Inc() + httpserver.Errorf(w, r, "%s", err) + return true + } + w.WriteHeader(http.StatusNoContent) + return true + case "influx/query": + influxQueryRequests.Inc() + influxutils.WriteDatabaseNames(w) + return true + default: + httpserver.Errorf(w, r, "unsupported multitenant path suffix: %q", p.Suffix) + return true + } +} + var ( prometheusWriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/write", protocol="promremotewrite"}`) prometheusWriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/write", protocol="promremotewrite"}`) diff --git a/app/vmagent/native/request_handler.go b/app/vmagent/native/request_handler.go index b65af88a9..5d931a1c8 100644 --- a/app/vmagent/native/request_handler.go +++ b/app/vmagent/native/request_handler.go @@ -7,7 +7,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" @@ -19,26 +18,26 @@ import ( var ( rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="native"}`) - rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="native"}`) + rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="native"}`) rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="native"}`) ) // InsertHandler processes 
`/api/v1/import` request. // // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6 -func InsertHandler(p *httpserver.Path, req *http.Request) error { +func InsertHandler(at *auth.Token, req *http.Request) error { extraLabels, err := parserCommon.GetExtraLabels(req) if err != nil { return err } return writeconcurrencylimiter.Do(func() error { return parser.ParseStream(req, func(block *parser.Block) error { - return insertRows(p, block, extraLabels) + return insertRows(at, block, extraLabels) }) }) } -func insertRows(p *httpserver.Path, block *parser.Block, extraLabels []prompbmarshal.Label) error { +func insertRows(at *auth.Token, block *parser.Block, extraLabels []prompbmarshal.Label) error { ctx := common.GetPushCtx() defer common.PutPushCtx(ctx) @@ -46,11 +45,8 @@ func insertRows(p *httpserver.Path, block *parser.Block, extraLabels []prompbmar // since relabeling can prevent from inserting the rows. rowsLen := len(block.Values) rowsInserted.Add(rowsLen) - if p != nil { - at, err := auth.NewToken(p.AuthToken) - if err == nil { - rowsTenantInserted.Get(at).Add(rowsLen) - } + if at != nil { + rowsTenantInserted.Get(at).Add(rowsLen) } rowsPerInsert.Update(float64(rowsLen)) @@ -90,6 +86,6 @@ func insertRows(p *httpserver.Path, block *parser.Block, extraLabels []prompbmar ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - remotewrite.Push(p, &ctx.WriteRequest) + remotewrite.PushWithAuthToken(at, &ctx.WriteRequest) return nil } diff --git a/app/vmagent/opentsdb/request_handler.go b/app/vmagent/opentsdb/request_handler.go index 2721912d6..628676de9 100644 --- a/app/vmagent/opentsdb/request_handler.go +++ b/app/vmagent/opentsdb/request_handler.go @@ -58,7 +58,7 @@ func insertRows(rows []parser.Row) error { ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - remotewrite.Push(nil, &ctx.WriteRequest) + remotewrite.Push(&ctx.WriteRequest) rowsInserted.Add(len(rows)) 
rowsPerInsert.Update(float64(len(rows))) return nil diff --git a/app/vmagent/opentsdbhttp/request_handler.go b/app/vmagent/opentsdbhttp/request_handler.go index 7d2d409eb..b3026ab86 100644 --- a/app/vmagent/opentsdbhttp/request_handler.go +++ b/app/vmagent/opentsdbhttp/request_handler.go @@ -65,7 +65,7 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - remotewrite.Push(nil, &ctx.WriteRequest) + remotewrite.Push(&ctx.WriteRequest) rowsInserted.Add(len(rows)) rowsPerInsert.Update(float64(len(rows))) return nil diff --git a/app/vmagent/prometheusimport/request_handler.go b/app/vmagent/prometheusimport/request_handler.go index c6cdf80a0..dc623d2ef 100644 --- a/app/vmagent/prometheusimport/request_handler.go +++ b/app/vmagent/prometheusimport/request_handler.go @@ -6,7 +6,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" @@ -17,12 +16,12 @@ import ( var ( rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="prometheus"}`) - rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="prometheus"}`) + rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="prometheus"}`) rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="prometheus"}`) ) // InsertHandler processes `/api/v1/import/prometheus` request. 
-func InsertHandler(p *httpserver.Path, req *http.Request) error { +func InsertHandler(at *auth.Token, req *http.Request) error { extraLabels, err := parserCommon.GetExtraLabels(req) if err != nil { return err @@ -34,12 +33,12 @@ func InsertHandler(p *httpserver.Path, req *http.Request) error { return writeconcurrencylimiter.Do(func() error { isGzipped := req.Header.Get("Content-Encoding") == "gzip" return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error { - return insertRows(p, rows, extraLabels) + return insertRows(at, rows, extraLabels) }, nil) }) } -func insertRows(p *httpserver.Path, rows []parser.Row, extraLabels []prompbmarshal.Label) error { +func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error { ctx := common.GetPushCtx() defer common.PutPushCtx(ctx) @@ -73,13 +72,10 @@ func insertRows(p *httpserver.Path, rows []parser.Row, extraLabels []prompbmarsh ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - remotewrite.Push(p, &ctx.WriteRequest) + remotewrite.PushWithAuthToken(at, &ctx.WriteRequest) rowsInserted.Add(len(rows)) - if p != nil { - at, err := auth.NewToken(p.AuthToken) - if err == nil { - rowsTenantInserted.Get(at).Add(len(rows)) - } + if at != nil { + rowsTenantInserted.Get(at).Add(len(rows)) } rowsPerInsert.Update(float64(len(rows))) return nil diff --git a/app/vmagent/promremotewrite/request_handler.go b/app/vmagent/promremotewrite/request_handler.go index 64b3a91a7..611d56b7e 100644 --- a/app/vmagent/promremotewrite/request_handler.go +++ b/app/vmagent/promremotewrite/request_handler.go @@ -7,7 +7,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" 
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" @@ -18,25 +17,25 @@ import ( ) var ( - rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="promremotewrite"}`) - rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="promremotewrite"}`) - rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="promremotewrite"}`) + rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="promremotewrite"}`) + rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="promremotewrite"}`) + rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="promremotewrite"}`) ) // InsertHandler processes remote write for prometheus. -func InsertHandler(p *httpserver.Path, req *http.Request) error { +func InsertHandler(at *auth.Token, req *http.Request) error { extraLabels, err := parserCommon.GetExtraLabels(req) if err != nil { return err } return writeconcurrencylimiter.Do(func() error { return parser.ParseStream(req, func(tss []prompb.TimeSeries) error { - return insertRows(p, tss, extraLabels) + return insertRows(at, tss, extraLabels) }) }) } -func insertRows(p *httpserver.Path, timeseries []prompb.TimeSeries, extraLabels []prompbmarshal.Label) error { +func insertRows(at *auth.Token, timeseries []prompb.TimeSeries, extraLabels []prompbmarshal.Label) error { ctx := common.GetPushCtx() defer common.PutPushCtx(ctx) @@ -72,13 +71,10 @@ func insertRows(p *httpserver.Path, timeseries []prompb.TimeSeries, extraLabels ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - remotewrite.Push(p, &ctx.WriteRequest) + remotewrite.PushWithAuthToken(at, &ctx.WriteRequest) rowsInserted.Add(rowsTotal) - if p != nil { - at, err := auth.NewToken(p.AuthToken) - if err == nil { - rowsTenantInserted.Get(at).Add(rowsTotal) - } + if at != nil { + 
rowsTenantInserted.Get(at).Add(rowsTotal) } rowsPerInsert.Update(float64(rowsTotal)) return nil diff --git a/app/vmagent/remotewrite/relabel.go b/app/vmagent/remotewrite/relabel.go index 5dcfc1805..ca34c11f4 100644 --- a/app/vmagent/remotewrite/relabel.go +++ b/app/vmagent/remotewrite/relabel.go @@ -43,7 +43,7 @@ func loadRelabelConfigs() (*relabelConfigs, error) { rcs.global = global } if len(*relabelConfigPaths) > (len(*remoteWriteURLs) + len(*remoteWriteMultitenantURLs)) { - return nil, fmt.Errorf("too many -remoteWrite.urlRelabelConfig args: %d; it mustn't exceed the number of -remoteWrite.url args: %d", + return nil, fmt.Errorf("too many -remoteWrite.urlRelabelConfig args: %d; it mustn't exceed the number of -remoteWrite.url or -remoteWrite.multitenantURL args: %d", len(*relabelConfigPaths), (len(*remoteWriteURLs) + len(*remoteWriteMultitenantURLs))) } rcs.perURL = make([]*promrelabel.ParsedConfigs, (len(*remoteWriteURLs) + len(*remoteWriteMultitenantURLs))) @@ -58,7 +58,6 @@ func loadRelabelConfigs() (*relabelConfigs, error) { } rcs.perURL[i] = prc } - return &rcs, nil } diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index 70a294cc8..f41e25105 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -8,17 +8,18 @@ import ( "sync/atomic" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue" "github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil" 
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/metrics" xxhash "github.com/cespare/xxhash/v2" ) @@ -26,10 +27,10 @@ import ( var ( remoteWriteURLs = flagutil.NewArray("remoteWrite.url", "Remote storage URL to write data to. It must support Prometheus remote_write API. "+ "It is recommended using VictoriaMetrics as remote storage. Example url: http://:8428/api/v1/write . "+ - "Pass multiple -remoteWrite.url flags in order to write data concurrently to multiple remote storage systems") - remoteWriteMultitenantURLs = flagutil.NewArray("remoteWrite.multitenantURL", "Base path for remote storage URL to write data to. It must support VictoriaMetrics remote_write tenants API (identified by accountID or accountID:projectID). "+ - "It is recommended using VictoriaMetrics as remote storage. Example url: http://:8428 . "+ - "Pass multiple -remoteWrite.multitenantURL flags in order to write data concurrently to multiple remote storage systems") + "Pass multiple -remoteWrite.url flags in order to replicate data to multiple remote storage systems. See also -remoteWrite.multitenantURL") + remoteWriteMultitenantURLs = flagutil.NewArray("remoteWrite.multitenantURL", "Base path for multitenant remote storage URL to write data to. "+ + "See https://docs.victoriametrics.com/vmagent.html#multitenancy for details. Example url: http://:8480 . "+ + "Pass multiple -remoteWrite.multitenantURL flags in order to replicate data to multiple remote storage systems. See also -remoteWrite.url") tmpDataPath = flag.String("remoteWrite.tmpDataPath", "vmagent-remotewrite-data", "Path to directory where temporary data for remote write component is stored. "+ "See also -remoteWrite.maxDiskUsagePerURL") queues = flag.Int("remoteWrite.queues", cgroup.AvailableCPUs()*2, "The number of concurrent queues to each -remoteWrite.url. 
Set more queues if default number of queues "+ @@ -57,10 +58,22 @@ var ( "Excess series are logged and dropped. This can be useful for limiting series churn rate. See also -remoteWrite.maxHourlySeries") ) -var defaultWriteToken = "default" -var rwctxsMap = map[string][]*remoteWriteCtx{} +var ( + // rwctxsDefault contains statically populated entries when -remoteWrite.url is specified. + rwctxsDefault []*remoteWriteCtx -var rwctxLock = sync.Mutex{} + // rwctxsMap contains dynamically populated entries when -remoteWrite.multitenantURL is specified. + rwctxsMap = make(map[tenantmetrics.TenantID][]*remoteWriteCtx) + rwctxsMapLock sync.Mutex + + // Data without tenant id is written to defaultAuthToken if -remoteWrite.multitenantURL is specified. + defaultAuthToken = &auth.Token{} +) + +// MultitenancyEnabled returns true if -remoteWrite.multitenantURL is specified. +func MultitenancyEnabled() bool { + return len(*remoteWriteMultitenantURLs) > 0 +} // Contains the current relabelConfigs. var allRelabelConfigs atomic.Value @@ -82,27 +95,13 @@ func InitSecretFlags() { // It must be called after flag.Parse(). // // Stop must be called for graceful shutdown. 
-func Init(p *httpserver.Path) { - rwctxLock.Lock() - defer rwctxLock.Unlock() +func Init() { if len(*remoteWriteURLs) == 0 && len(*remoteWriteMultitenantURLs) == 0 { logger.Fatalf("at least one `-remoteWrite.url` or `-remoteWrite.multitenantURL` command-line flag must be set") } - // Do not Init MultitenantURLs they are dynamically initialized - if len(*remoteWriteURLs) == 0 && len(*remoteWriteMultitenantURLs) > 0 && p == nil { - return + if len(*remoteWriteURLs) > 0 && len(*remoteWriteMultitenantURLs) > 0 { + logger.Fatalf("cannot set both `-remoteWrite.url` and `-remoteWrite.multitenantURL` command-line flags") } - - // Create one writecontext per tenant - writeContextIndex := defaultWriteToken - - if p != nil { - writeContextIndex = p.AuthToken - } - if _, ok := rwctxsMap[writeContextIndex]; ok { - return - } - if *maxHourlySeries > 0 { hourlySeriesLimiter = bloomfilter.NewLimiter(*maxHourlySeries, time.Hour) _ = metrics.NewGauge(`vmagent_hourly_series_limit_max_series`, func() float64 { @@ -140,43 +139,9 @@ func Init(p *httpserver.Path) { } allRelabelConfigs.Store(rcs) - maxInmemoryBlocks := memory.Allowed() / (len(*remoteWriteURLs) + len(*remoteWriteMultitenantURLs)) / maxRowsPerBlock / 100 - if maxInmemoryBlocks > 400 { - // There is no much sense in keeping higher number of blocks in memory, - // since this means that the producer outperforms consumer and the queue - // will continue growing. It is better storing the queue to file. 
- maxInmemoryBlocks = 400 + if len(*remoteWriteURLs) > 0 { + rwctxsDefault = newRemoteWriteCtxs(nil, *remoteWriteURLs) } - if maxInmemoryBlocks < 2 { - maxInmemoryBlocks = 2 - } - - rwctxs := []*remoteWriteCtx{} - - if len(*remoteWriteURLs) > 0 && p == nil { - for i, remoteWriteURL := range *remoteWriteURLs { - sanitizedURL := fmt.Sprintf("%d:secret-url", i+1) - if *showRemoteWriteURL { - sanitizedURL = fmt.Sprintf("%d:%s", i+1, remoteWriteURL) - } - rwctx := newRemoteWriteCtx(i, remoteWriteURL, maxInmemoryBlocks, sanitizedURL) - rwctxs = append(rwctxs, rwctx) - } - } - - if len(*remoteWriteMultitenantURLs) > 0 && p != nil { - for i, remoteWriteMultitenantURL := range *remoteWriteMultitenantURLs { - sanitizedURL := fmt.Sprintf("%d:secret-url", i+1) - if *showRemoteWriteURL { - sanitizedURL = fmt.Sprintf("%d:%s", i+1, remoteWriteMultitenantURL) - } - remoteWriteMultitenantURL := fmt.Sprintf("%s/%s/%s/%s", remoteWriteMultitenantURL, p.Prefix, p.AuthToken, p.Suffix) - rwctx := newRemoteWriteCtx(i, remoteWriteMultitenantURL, maxInmemoryBlocks, sanitizedURL) - rwctxs = append(rwctxs, rwctx) - } - } - - rwctxsMap[writeContextIndex] = rwctxs // Start config reloader. configReloaderWG.Add(1) @@ -200,6 +165,37 @@ func Init(p *httpserver.Path) { }() } +func newRemoteWriteCtxs(at *auth.Token, urls []string) []*remoteWriteCtx { + if len(urls) == 0 { + logger.Panicf("BUG: urls must be non-empty") + } + + maxInmemoryBlocks := memory.Allowed() / len(urls) / maxRowsPerBlock / 100 + if maxInmemoryBlocks > 400 { + // There is no much sense in keeping higher number of blocks in memory, + // since this means that the producer outperforms consumer and the queue + // will continue growing. It is better storing the queue to file. 
+ maxInmemoryBlocks = 400 + } + if maxInmemoryBlocks < 2 { + maxInmemoryBlocks = 2 + } + rwctxs := make([]*remoteWriteCtx, len(urls)) + for i, remoteWriteURL := range urls { + sanitizedURL := fmt.Sprintf("%d:secret-url", i+1) + if at != nil { + // Construct full remote_write url for the given tenant according to https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format + remoteWriteURL = fmt.Sprintf("%s/insert/%d:%d/prometheus/api/v1/write", remoteWriteURL, at.AccountID, at.ProjectID) + sanitizedURL = fmt.Sprintf("%s:%d:%d", sanitizedURL, at.AccountID, at.ProjectID) + } + if *showRemoteWriteURL { + sanitizedURL = fmt.Sprintf("%d:%s", i+1, remoteWriteURL) + } + rwctxs[i] = newRemoteWriteCtx(i, remoteWriteURL, maxInmemoryBlocks, sanitizedURL) + } + return rwctxs +} + var stopCh = make(chan struct{}) var configReloaderWG sync.WaitGroup @@ -209,11 +205,17 @@ var configReloaderWG sync.WaitGroup func Stop() { close(stopCh) configReloaderWG.Wait() + + for _, rwctx := range rwctxsDefault { + rwctx.MustStop() + } + rwctxsDefault = nil + + // There is no need in locking rwctxsMapLock here, since nobody should call Push during the Stop call. for _, rwctxs := range rwctxsMap { for _, rwctx := range rwctxs { rwctx.MustStop() } - rwctxs = nil } rwctxsMap = nil } @@ -221,19 +223,37 @@ func Stop() { // Push sends wr to remote storage systems set via `-remoteWrite.url`. // // Note that wr may be modified by Push due to relabeling and rounding. -func Push(p *httpserver.Path, wr *prompbmarshal.WriteRequest) { - // if a queue is not created for this tenant, create it dynamically using the auth.Token - var rwctxs []*remoteWriteCtx - writeContextIndex := defaultWriteToken +func Push(wr *prompbmarshal.WriteRequest) { + PushWithAuthToken(nil, wr) +} - // if no tenant speficied, p is nil - if p != nil { - writeContextIndex = p.AuthToken +// PushWithAuthToken sends wr to remote storage systems set via `-remoteWrite.multitenantURL`. 
+// +// Note that wr may be modified by Push due to relabeling and rounding. +func PushWithAuthToken(at *auth.Token, wr *prompbmarshal.WriteRequest) { + if at == nil && len(*remoteWriteMultitenantURLs) > 0 { + // Write data to default tenant if at isn't set while -remoteWrite.multitenantURL is set. + at = defaultAuthToken } - if _, ok := rwctxsMap[writeContextIndex]; !ok { - Init(p) + var rwctxs []*remoteWriteCtx + if at == nil { + rwctxs = rwctxsDefault + } else { + if len(*remoteWriteMultitenantURLs) == 0 { + logger.Panicf("BUG: remoteWriteMultitenantURLs must be non-empty for non-nil at") + } + rwctxsMapLock.Lock() + tenantID := tenantmetrics.TenantID{ + AccountID: at.AccountID, + ProjectID: at.ProjectID, + } + rwctxs = rwctxsMap[tenantID] + if rwctxs == nil { + rwctxs = newRemoteWriteCtxs(at, *remoteWriteMultitenantURLs) + rwctxsMap[tenantID] = rwctxs + } + rwctxsMapLock.Unlock() } - rwctxs = rwctxsMap[writeContextIndex] var rctx *relabelCtx rcs := allRelabelConfigs.Load().(*relabelConfigs) diff --git a/app/vmagent/vmimport/request_handler.go b/app/vmagent/vmimport/request_handler.go index b2fb10023..d30ebcd3a 100644 --- a/app/vmagent/vmimport/request_handler.go +++ b/app/vmagent/vmimport/request_handler.go @@ -7,7 +7,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" @@ -19,26 +18,26 @@ import ( var ( rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="vmimport"}`) - rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="vmimport"}`) + rowsTenantInserted = 
tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="vmimport"}`) rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="vmimport"}`) ) // InsertHandler processes `/api/v1/import` request. // // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6 -func InsertHandler(p *httpserver.Path, req *http.Request) error { +func InsertHandler(at *auth.Token, req *http.Request) error { extraLabels, err := parserCommon.GetExtraLabels(req) if err != nil { return err } return writeconcurrencylimiter.Do(func() error { return parser.ParseStream(req, func(rows []parser.Row) error { - return insertRows(p, rows, extraLabels) + return insertRows(at, rows, extraLabels) }) }) } -func insertRows(p *httpserver.Path, rows []parser.Row, extraLabels []prompbmarshal.Label) error { +func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error { ctx := common.GetPushCtx() defer common.PutPushCtx(ctx) @@ -78,13 +77,10 @@ func insertRows(p *httpserver.Path, rows []parser.Row, extraLabels []prompbmarsh ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - remotewrite.Push(p, &ctx.WriteRequest) + remotewrite.PushWithAuthToken(at, &ctx.WriteRequest) rowsInserted.Add(rowsTotal) - if p != nil { - at, err := auth.NewToken(p.AuthToken) - if err == nil { - rowsTenantInserted.Get(at).Add(rowsTotal) - } + if at != nil { + rowsTenantInserted.Get(at).Add(rowsTotal) } rowsPerInsert.Update(float64(rowsTotal)) return nil diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 200992b7a..b313b6b48 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -7,6 +7,7 @@ sort: 15 ## tip * FEATURE: add `present_over_time(m[d])` function, which returns 1 if `m` has a least a single sample over the previous duration `d`. This function has been added also to [Prometheus 2.29](https://github.com/prometheus/prometheus/releases/tag/v2.29.0-rc.0). 
+* FEATURE: vmagent: support multitenant writes according to [these docs](https://docs.victoriametrics.com/vmagent.html#multitenancy). This allows using a single `vmagent` instance in front of VictoriaMetrics cluster for all the tenants. Thanks to @omarghader for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1505). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1491). * FEATURE: vmagent: add `__meta_ec2_availability_zone_id` label to discovered Amazon EC2 targets. This label is available in Prometheus [starting from v2.29](https://github.com/prometheus/prometheus/releases/tag/v2.29.0-rc.0). * FAETURE: vmagent: add `__meta_gce_interface_ipv4_` labels to discovered GCE targets. These labels are available in Prometheus [starting from v2.29](https://github.com/prometheus/prometheus/releases/tag/v2.29.0-rc.0). * FEATURE: add `-search.maxSamplesPerSeries` command-line flag for limiting the number of raw samples a single query can process per each time series. This option can protect from out of memory errors when a query processes tens of millions of raw samples per series. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1067). diff --git a/docs/vmagent.md b/docs/vmagent.md index 0eeb38532..ba3cf0882 100644 --- a/docs/vmagent.md +++ b/docs/vmagent.md @@ -139,7 +139,11 @@ Also, Basic Auth can be enabled for the incoming `remote_write` requests with `- ### remote_write for clustered version -While `vmagent` can accept data in several supported protocols (OpenTSDB, Influx, Prometheus, Graphite) and scrape data from various targets, writes are always peformed in Promethes remote_write protocol. 
Therefore for the [clustered version](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html), `-remoteWrite.url` the command-line flag should be configured as `://:8480/insert//prometheus/api/v1/write` +While `vmagent` can accept data in several supported protocols (OpenTSDB, Influx, Prometheus, Graphite) and scrape data from various targets, writes are always performed in Prometheus remote_write protocol. Therefore for the [clustered version](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html), `-remoteWrite.url` the command-line flag should be configured as `://:8480/insert//prometheus/api/v1/write` according to [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format). There is also support for multitenant writes. See [these docs](#multitenancy). + +## Multitenancy + +By default `vmagent` collects the data without tenant identifiers and routes it to the configured `-remoteWrite.url`. But it can accept multitenant data if `-remoteWrite.multitenantURL` is set. In this case it accepts multitenant data at `http://vmagent:8429/insert//...` in the same way as cluster version of VictoriaMetrics does according to [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format) and routes it to `<-remoteWrite.multitenantURL>/insert//prometheus/api/v1/write`. If multiple `-remoteWrite.multitenantURL` command-line options are set, then `vmagent` replicates the collected data across all the configured urls. This allows using a single `vmagent` instance in front of VictoriaMetrics clusters for processing the data from all the tenants. ## How to collect metrics in Prometheus format @@ -638,7 +642,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html . -promscrape.consulSDCheckInterval duration Interval for checking for changes in Consul. This works only if consul_sd_configs is configured in '-promscrape.config' file.
See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config for details (default 30s) -promscrape.digitaloceanSDCheckInterval duration - Interval for checking for changes in digital ocean. This works only if digitalocean_sd_configs is configured in '-promscrape.config' file. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#digitalocean_sd_config for details (default 1m0s) + Interval for checking for changes in digital ocean. This works only if digitalocean_sd_configs is configured in '-promscrape.config' file. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#digitalocean_sd_config for details (default 1m0s) -promscrape.disableCompression Whether to disable sending 'Accept-Encoding: gzip' request headers to all the scrape targets. This may reduce CPU usage on scrape targets at the cost of higher network bandwidth utilization. It is possible to set 'disable_compression: true' individually per each 'scrape_config' section in '-promscrape.config' for fine grained control -promscrape.disableKeepAlive @@ -649,6 +653,8 @@ See the docs at https://docs.victoriametrics.com/vmagent.html . The maximum duration for waiting to perform API requests if more than -promscrape.discovery.concurrency requests are simultaneously performed (default 1m0s) -promscrape.dnsSDCheckInterval duration Interval for checking for changes in dns. This works only if dns_sd_configs is configured in '-promscrape.config' file. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dns_sd_config for details (default 30s) + -promscrape.dockerSDCheckInterval duration + Interval for checking for changes in docker. This works only if docker_sd_configs is configured in '-promscrape.config' file. 
See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#docker_sd_config for details (default 30s) -promscrape.dockerswarmSDCheckInterval duration Interval for checking for changes in dockerswarm. This works only if dockerswarm_sd_configs is configured in '-promscrape.config' file. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dockerswarm_sd_config for details (default 30s) -promscrape.dropOriginalLabels @@ -662,7 +668,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html . -promscrape.gceSDCheckInterval duration Interval for checking for changes in gce. This works only if gce_sd_configs is configured in '-promscrape.config' file. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#gce_sd_config for details (default 1m0s) -promscrape.httpSDCheckInterval duration - Interval for checking for changes in http service discovery. This works only if http_sd_configs is configured in '-promscrape.config' file. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#http_sd_config for details (default 1m0s) + Interval for checking for changes in http endpoint service discovery. This works only if http_sd_configs is configured in '-promscrape.config' file. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#http_sd_config for details (default 1m0s) -promscrape.kubernetes.apiServerTimeout duration How frequently to reload the full state from Kuberntes API server (default 30m0s) -promscrape.kubernetesSDCheckInterval duration @@ -710,6 +716,9 @@ See the docs at https://docs.victoriametrics.com/vmagent.html . Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0) -remoteWrite.maxHourlySeries int The maximum number of unique series vmagent can send to remote storage systems during the last hour. Excess series are logged and dropped. This can be useful for limiting series cardinality. 
See also -remoteWrite.maxDailySeries + -remoteWrite.multitenantURL array + Base path for multitenant remote storage URL to write data to. See https://docs.victoriametrics.com/vmagent.html#multitenancy for details. Example url: http://:8480 . Pass multiple -remoteWrite.multitenantURL flags in order to replicate data to multiple remote storage systems. See also -remoteWrite.url + Supports an array of values separated by comma or specified via multiple flags. -remoteWrite.oauth2.clientID array Optional OAuth2 clientID to use for -remoteWrite.url. If multiple args are set, then they are applied independently for the corresponding -remoteWrite.url Supports an array of values separated by comma or specified via multiple flags. @@ -729,7 +738,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html . Optional proxy URL for writing data to -remoteWrite.url. Supported proxies: http, https, socks5. Example: -remoteWrite.proxyURL=socks5://proxy:1234 Supports an array of values separated by comma or specified via multiple flags. -remoteWrite.queues int - The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues isn't enough for sending high volume of collected data to remote storage (default 2 * numberOfAvailableCPUs) + The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues isn't enough for sending high volume of collected data to remote storage. Default value is 2 * numberOfAvailableCPUs (default 8) -remoteWrite.rateLimit array Optional rate limit in bytes per second for data sent to -remoteWrite.url. By default the rate limit is disabled. It can be useful for limiting load on remote storage when big amounts of buffered data is sent after temporary unavailability of the remote storage Supports array of values separated by comma or specified via multiple flags. @@ -766,7 +775,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
-remoteWrite.tmpDataPath string Path to directory where temporary data for remote write component is stored. See also -remoteWrite.maxDiskUsagePerURL (default "vmagent-remotewrite-data") -remoteWrite.url array - Remote storage URL to write data to. It must support Prometheus remote_write API. It is recommended using VictoriaMetrics as remote storage. Example url: http://:8428/api/v1/write . Pass multiple -remoteWrite.url flags in order to write data concurrently to multiple remote storage systems + Remote storage URL to write data to. It must support Prometheus remote_write API. It is recommended using VictoriaMetrics as remote storage. Example url: http://:8428/api/v1/write . Pass multiple -remoteWrite.url flags in order to replicate data to multiple remote storage systems. See also -remoteWrite.multitenantURL Supports an array of values separated by comma or specified via multiple flags. -remoteWrite.urlRelabelConfig array Optional path to relabel config for the corresponding -remoteWrite.url diff --git a/lib/auth/auth.go b/lib/auth/auth.go new file mode 100644 index 000000000..7a4d2c11c --- /dev/null +++ b/lib/auth/auth.go @@ -0,0 +1,35 @@ +package auth + +import ( + "fmt" + "strconv" + "strings" +) + +// Token contains settings for request processing +type Token struct { + ProjectID uint32 + AccountID uint32 +} + +// NewToken returns new Token for the given authToken +func NewToken(authToken string) (*Token, error) { + tmp := strings.Split(authToken, ":") + if len(tmp) > 2 { + return nil, fmt.Errorf("unexpected number of items in authToken %q; got %d; want 1 or 2", authToken, len(tmp)) + } + var at Token + accountID, err := strconv.Atoi(tmp[0]) + if err != nil { + return nil, fmt.Errorf("cannot parse accountID from %q: %w", tmp[0], err) + } + at.AccountID = uint32(accountID) + if len(tmp) > 1 { + projectID, err := strconv.Atoi(tmp[1]) + if err != nil { + return nil, fmt.Errorf("cannot parse projectID from %q: %w", tmp[1], err) + } + at.ProjectID = 
uint32(projectID) + } + return &at, nil +} diff --git a/lib/httpserver/path.go b/lib/httpserver/path.go new file mode 100644 index 000000000..833c0a6b4 --- /dev/null +++ b/lib/httpserver/path.go @@ -0,0 +1,64 @@ +package httpserver + +import ( + "fmt" + "strings" +) + +// Path contains the following path structure: +// /{prefix}/{authToken}/{suffix} +// +// It is compatible with SaaS version. +type Path struct { + Prefix string + AuthToken string + Suffix string +} + +// ParsePath parses the given path. +func ParsePath(path string) (*Path, error) { + // The path must have the following form: + // /{prefix}/{authToken}/{suffix} + // + // - prefix must contain `select`, `insert` or `delete`. + // - authToken contains `accountID[:projectID]`, where projectID is optional. + // - suffix contains arbitrary suffix. + // + // prefix must be used for the routing to the appropriate service + // in the cluster - either vminsert or vmselect. + s := skipPrefixSlashes(path) + n := strings.IndexByte(s, '/') + if n < 0 { + return nil, fmt.Errorf("cannot find {prefix}") + } + prefix := s[:n] + + s = skipPrefixSlashes(s[n+1:]) + n = strings.IndexByte(s, '/') + if n < 0 { + return nil, fmt.Errorf("cannot find {authToken}") + } + authToken := s[:n] + + s = skipPrefixSlashes(s[n+1:]) + + // Substitute double slashes with single slashes in the path, since such slashes + // may appear due improper copy-pasting of the url. 
+ suffix := strings.Replace(s, "//", "/", -1) + + p := &Path{ + Prefix: prefix, + AuthToken: authToken, + Suffix: suffix, + } + return p, nil +} + +// skipPrefixSlashes remove double slashes which may appear due +// improper copy-pasting of the url +func skipPrefixSlashes(s string) string { + for len(s) > 0 && s[0] == '/' { + s = s[1:] + } + return s +} diff --git a/lib/promscrape/scraper.go b/lib/promscrape/scraper.go index 6b661566d..8509636d6 100644 --- a/lib/promscrape/scraper.go +++ b/lib/promscrape/scraper.go @@ -8,7 +8,6 @@ import ( "sync/atomic" "time" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" @@ -50,7 +49,7 @@ func CheckConfig() error { // Init initializes Prometheus scraper with config from the `-promscrape.config`. // // Scraped data is passed to pushData. -func Init(pushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest)) { +func Init(pushData func(wr *prompbmarshal.WriteRequest)) { globalStopCh = make(chan struct{}) scraperWG.Add(1) go func() { @@ -73,7 +72,7 @@ var ( PendingScrapeConfigs int32 ) -func runScraper(configFile string, pushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) { +func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) { if configFile == "" { // Nothing to scrape. 
return @@ -161,13 +160,13 @@ func runScraper(configFile string, pushData func(p *httpserver.Path, wr *prompbm var configReloads = metrics.NewCounter(`vm_promscrape_config_reloads_total`) type scrapeConfigs struct { - pushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest) + pushData func(wr *prompbmarshal.WriteRequest) wg sync.WaitGroup stopCh chan struct{} scfgs []*scrapeConfig } -func newScrapeConfigs(pushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest)) *scrapeConfigs { +func newScrapeConfigs(pushData func(wr *prompbmarshal.WriteRequest)) *scrapeConfigs { return &scrapeConfigs{ pushData: pushData, stopCh: make(chan struct{}), @@ -208,7 +207,7 @@ func (scs *scrapeConfigs) stop() { type scrapeConfig struct { name string - pushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest) + pushData func(wr *prompbmarshal.WriteRequest) getScrapeWork func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork checkInterval time.Duration cfgCh chan *Config @@ -257,7 +256,7 @@ type scraperGroup struct { wg sync.WaitGroup mLock sync.Mutex m map[string]*scraper - pushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest) + pushData func(wr *prompbmarshal.WriteRequest) changesCount *metrics.Counter activeScrapers *metrics.Counter @@ -265,7 +264,7 @@ type scraperGroup struct { scrapersStopped *metrics.Counter } -func newScraperGroup(name string, pushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest)) *scraperGroup { +func newScraperGroup(name string, pushData func(wr *prompbmarshal.WriteRequest)) *scraperGroup { sg := &scraperGroup{ name: name, m: make(map[string]*scraper), @@ -359,7 +358,7 @@ type scraper struct { stopCh chan struct{} } -func newScraper(sw *ScrapeWork, group string, pushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest)) *scraper { +func newScraper(sw *ScrapeWork, group string, pushData func(wr *prompbmarshal.WriteRequest)) *scraper { sc := &scraper{ stopCh: make(chan struct{}), } diff --git 
a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index 90cd3b9d3..281a0e57e 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -10,7 +10,6 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/leveledbytebufferpool" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" @@ -164,7 +163,7 @@ type scrapeWork struct { GetStreamReader func() (*streamReader, error) // PushData is called for pushing collected data. - PushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest) + PushData func(wr *prompbmarshal.WriteRequest) // ScrapeGroup is name of ScrapeGroup that // scrapeWork belongs to @@ -317,7 +316,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp) sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp) startTime := time.Now() - sw.PushData(nil, &wc.writeRequest) + sw.PushData(&wc.writeRequest) pushDataDuration.UpdateDuration(startTime) sw.prevLabelsLen = len(wc.labels) wc.reset() @@ -359,7 +358,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error { } sw.updateSeriesAdded(wc) startTime := time.Now() - sw.PushData(nil, &wc.writeRequest) + sw.PushData(&wc.writeRequest) pushDataDuration.UpdateDuration(startTime) wc.resetNoRows() return nil @@ -387,7 +386,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error { sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp) sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp) startTime := time.Now() - sw.PushData(nil, &wc.writeRequest) + 
sw.PushData(&wc.writeRequest) pushDataDuration.UpdateDuration(startTime) sw.prevLabelsLen = len(wc.labels) wc.reset() diff --git a/lib/promscrape/scrapework_test.go b/lib/promscrape/scrapework_test.go index 4083a4495..7d2eeced6 100644 --- a/lib/promscrape/scrapework_test.go +++ b/lib/promscrape/scrapework_test.go @@ -5,7 +5,6 @@ import ( "strings" "testing" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" @@ -59,7 +58,7 @@ func TestScrapeWorkScrapeInternalFailure(t *testing.T) { pushDataCalls := 0 var pushDataErr error - sw.PushData = func(p *httpserver.Path, wr *prompbmarshal.WriteRequest) { + sw.PushData = func(wr *prompbmarshal.WriteRequest) { if err := expectEqualTimeseries(wr.Timeseries, timeseriesExpected); err != nil { pushDataErr = fmt.Errorf("unexpected data pushed: %w\ngot\n%#v\nwant\n%#v", err, wr.Timeseries, timeseriesExpected) } @@ -99,7 +98,7 @@ func TestScrapeWorkScrapeInternalSuccess(t *testing.T) { pushDataCalls := 0 var pushDataErr error - sw.PushData = func(p *httpserver.Path, wr *prompbmarshal.WriteRequest) { + sw.PushData = func(wr *prompbmarshal.WriteRequest) { pushDataCalls++ if len(wr.Timeseries) > len(timeseriesExpected) { pushDataErr = fmt.Errorf("too many time series obtained; got %d; want %d\ngot\n%+v\nwant\n%+v", diff --git a/lib/promscrape/scrapework_timing_test.go b/lib/promscrape/scrapework_timing_test.go index 0aa9ec363..0720d698e 100644 --- a/lib/promscrape/scrapework_timing_test.go +++ b/lib/promscrape/scrapework_timing_test.go @@ -4,7 +4,6 @@ import ( "fmt" "testing" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" ) @@ -40,7 +39,7 @@ vm_tcplistener_write_calls_total{name="https", addr=":443"} 132356 var sw scrapeWork sw.Config = &ScrapeWork{} 
sw.ReadData = readDataFunc - sw.PushData = func(p *httpserver.Path, wr *prompbmarshal.WriteRequest) {} + sw.PushData = func(wr *prompbmarshal.WriteRequest) {} timestamp := int64(0) for pb.Next() { if err := sw.scrapeInternal(timestamp, timestamp); err != nil { diff --git a/lib/tenantmetrics/counter_map.go b/lib/tenantmetrics/counter_map.go new file mode 100644 index 000000000..710986b80 --- /dev/null +++ b/lib/tenantmetrics/counter_map.go @@ -0,0 +1,72 @@ +package tenantmetrics + +import ( + "fmt" + "sync/atomic" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/metrics" +) + +// TenantID defines metric tenant. +type TenantID struct { + AccountID uint32 + ProjectID uint32 +} + +// CounterMap is a map of counters keyed by tenant. +type CounterMap struct { + metric string + m atomic.Value +} + +// NewCounterMap creates new CounterMap for the given metric. +func NewCounterMap(metric string) *CounterMap { + cm := &CounterMap{ + metric: metric, + } + cm.m.Store(make(map[TenantID]*metrics.Counter)) + return cm +} + +// Get returns the counter for the given auth token at. +func (cm *CounterMap) Get(at *auth.Token) *metrics.Counter { + key := TenantID{ + AccountID: at.AccountID, + ProjectID: at.ProjectID, + } + return cm.GetByTenant(key) +} + +// GetByTenant returns counter for the given key. +func (cm *CounterMap) GetByTenant(key TenantID) *metrics.Counter { + m := cm.m.Load().(map[TenantID]*metrics.Counter) + if c := m[key]; c != nil { + // Fast path - the counter for key already exists. + return c + } + + // Slow path - create missing counter for key and re-create m. 
+ newM := make(map[TenantID]*metrics.Counter, len(m)+1) + for k, c := range m { + newM[k] = c + } + metricName := createMetricName(cm.metric, key) + c := metrics.GetOrCreateCounter(metricName) + newM[key] = c + cm.m.Store(newM) + return c +} + +func createMetricName(metric string, key TenantID) string { + if len(metric) == 0 { + logger.Panicf("BUG: metric cannot be empty") + } + if metric[len(metric)-1] != '}' { + // Metric without labels. + return fmt.Sprintf(`%s{accountID="%d",projectID="%d"}`, metric, key.AccountID, key.ProjectID) + } + // Metric with labels. + return fmt.Sprintf(`%s,accountID="%d",projectID="%d"}`, metric[:len(metric)-1], key.AccountID, key.ProjectID) +} diff --git a/lib/tenantmetrics/counter_map_test.go b/lib/tenantmetrics/counter_map_test.go new file mode 100644 index 000000000..b4334537e --- /dev/null +++ b/lib/tenantmetrics/counter_map_test.go @@ -0,0 +1,90 @@ +package tenantmetrics + +import ( + "fmt" + "testing" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" +) + +func TestCreateMetricNameError(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Fatal("expecting non-nil panic") + } + }() + _ = createMetricName("", TenantID{}) +} + +func TestCreateMetricNameSuccess(t *testing.T) { + f := func(s string, at *auth.Token, metricExpected string) { + t.Helper() + metric := createMetricName(s, TenantID{ + AccountID: at.AccountID, + ProjectID: at.ProjectID, + }) + if metric != metricExpected { + t.Fatalf("unexpected result for createMetricName(%q, %v); got %q; want %q", s, at, metric, metricExpected) + } + } + f(`a`, &auth.Token{AccountID: 1, ProjectID: 2}, `a{accountID="1",projectID="2"}`) + f(`foo{bar="baz"}`, &auth.Token{AccountID: 33, ProjectID: 41}, `foo{bar="baz",accountID="33",projectID="41"}`) + f(`foo{bar="baz",a="aa"}`, &auth.Token{AccountID: 33, ProjectID: 41}, `foo{bar="baz",a="aa",accountID="33",projectID="41"}`) +} + +func TestCounterMap(t *testing.T) { + cm := NewCounterMap("foobar") + 
cm.Get(&auth.Token{AccountID: 1, ProjectID: 2}).Inc() + cm.Get(&auth.Token{AccountID: 4, ProjectID: 0}).Add(12) + + if n := cm.Get(&auth.Token{AccountID: 1, ProjectID: 2}).Get(); n != 1 { + t.Fatalf("unexpected counter value; got %d; want %d", n, 1) + } + if n := cm.Get(&auth.Token{AccountID: 4, ProjectID: 0}).Get(); n != 12 { + t.Fatalf("unexpected counter value; got %d; want %d", n, 12) + } + if n := cm.Get(&auth.Token{}).Get(); n != 0 { + t.Fatalf("unexpected counter value; got %d; want %d", n, 0) + } +} + +func TestCounterMapConcurrent(t *testing.T) { + cm := NewCounterMap(`aaa{bb="cc"}`) + f := func() error { + for i := 0; i < 10; i++ { + cm.Get(&auth.Token{AccountID: 1, ProjectID: 2}).Inc() + if n := cm.Get(&auth.Token{AccountID: 3, ProjectID: 4}).Get(); n != 0 { + return fmt.Errorf("unexpected counter value; got %d; want %d", n, 0) + } + cm.Get(&auth.Token{AccountID: 1, ProjectID: 3}).Add(5) + } + return nil + } + + const concurrency = 5 + ch := make(chan error, concurrency) + for i := 0; i < concurrency; i++ { + go func() { + ch <- f() + }() + } + + for i := 0; i < concurrency; i++ { + select { + case err := <-ch: + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + case <-time.After(time.Second): + t.Fatalf("timeout") + } + } + + if n := cm.Get(&auth.Token{AccountID: 1, ProjectID: 2}).Get(); n != concurrency*10 { + t.Fatalf("unexpected counter value; got %d; want %d", n, concurrency*10) + } + if n := cm.Get(&auth.Token{AccountID: 1, ProjectID: 3}).Get(); n != concurrency*10*5 { + t.Fatalf("unexpected counter value; got %d; want %d", n, concurrency*10*5) + } +}