From c80fc8c77f9bcdde1f801790b767224a58e866ed Mon Sep 17 00:00:00 2001
From: Dmytro Kozlov <kozlovdmitriyy@gmail.com>
Date: Fri, 10 Feb 2023 06:03:01 +0200
Subject: [PATCH] app/vmauth: add concurrent requests limit per auth record
 (#3749)

* app/vmauth: add concurent requests limit per auth record

* app/vmauth: added clarification comment

* app/vmauth: remove unused code

* app/vmauth: move read from limiter

* app/vmauth: fix text

* app/vmauth: fix comments

* - Clarify the docs for the max_concurrent_requests option at docs/vmauth.md
- Clarify the description of the change at docs/CHANGELOG.md
- Make sure that the -maxConcurrentRequests takes precedence over per-user max_concurrent_requests
- Update tests for verifying that the max_concurrent_requests option is parsed properly

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3346

---------

Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
---
 app/vmauth/README.md           | 23 ++++++------
 app/vmauth/auth_config.go      | 69 ++++++++++++++++++++++++++--------
 app/vmauth/auth_config_test.go |  8 ++--
 app/vmauth/example_config.yml  | 21 +++++------
 app/vmauth/main.go             | 29 +++++++++-----
 docs/CHANGELOG.md              |  1 +
 docs/vmauth.md                 | 23 ++++++------
 7 files changed, 112 insertions(+), 62 deletions(-)

diff --git a/app/vmauth/README.md b/app/vmauth/README.md
index 693d211dd..9df57113c 100644
--- a/app/vmauth/README.md
+++ b/app/vmauth/README.md
@@ -55,26 +55,27 @@ users:
   headers:
   - "X-Scope-OrgID: foobar"
 
-  # The user for querying local single-node VictoriaMetrics.
   # All the requests to http://vmauth:8427 with the given Basic Auth (username:password)
-  # will be proxied to http://localhost:8428 .
+  # are proxied to http://localhost:8428 .
   # For example, http://vmauth:8427/api/v1/query is proxied to http://localhost:8428/api/v1/query
+  #
+  # The given user can send maximum 10 concurrent requests according to the provided max_concurrent_requests.
+  # Excess concurrent requests are rejected with 429 HTTP status code.
+  # See also -maxConcurrentRequests command-line flag for limiting the global number of concurrent requests.
 - username: "local-single-node"
   password: "***"
   url_prefix: "http://localhost:8428"
+  max_concurrent_requests: 10
 
-  # The user for querying local single-node VictoriaMetrics with extra_label team=dev.
   # All the requests to http://vmauth:8427 with the given Basic Auth (username:password)
-  # will be routed to http://localhost:8428 with extra_label=team=dev query arg.
+  # are proxied to http://localhost:8428 with extra_label=team=dev query arg.
   # For example, http://vmauth:8427/api/v1/query is routed to http://localhost:8428/api/v1/query?extra_label=team=dev
-- username: "local-single-node"
+- username: "local-single-node2"
   password: "***"
   url_prefix: "http://localhost:8428?extra_label=team=dev"
 
-  # The user for querying account 123 in VictoriaMetrics cluster
-  # See https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format
   # All the requests to http://vmauth:8427 with the given Basic Auth (username:password)
-  # will be load-balanced among http://vmselect1:8481/select/123/prometheus and http://vmselect2:8481/select/123/prometheus
+  # are load-balanced among http://vmselect1:8481/select/123/prometheus and http://vmselect2:8481/select/123/prometheus
   # For example, http://vmauth:8427/api/v1/query is proxied to the following urls in a round-robin manner:
   #   - http://vmselect1:8481/select/123/prometheus/api/v1/select
   #   - http://vmselect2:8481/select/123/prometheus/api/v1/select
@@ -84,10 +85,8 @@ users:
   - "http://vmselect1:8481/select/123/prometheus"
   - "http://vmselect2:8481/select/123/prometheus"
 
-  # The user for inserting Prometheus data into VictoriaMetrics cluster under account 42
-  # See https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format
   # All the requests to http://vmauth:8427 with the given Basic Auth (username:password)
-  # will be load-balanced between http://vminsert1:8480/insert/42/prometheus and http://vminsert2:8480/insert/42/prometheus
+  # are load-balanced between http://vminsert1:8480/insert/42/prometheus and http://vminsert2:8480/insert/42/prometheus
   # For example, http://vmauth:8427/api/v1/write is proxied to the following urls in a round-robin manner:
   #   - http://vminsert1:8480/insert/42/prometheus/api/v1/write
   #   - http://vminsert2:8480/insert/42/prometheus/api/v1/write
@@ -285,7 +284,7 @@ See the docs at https://docs.victoriametrics.com/vmauth.html .
   -loggerWarnsPerSecondLimit int
      Per-second limit on the number of WARN messages. If more than the given number of warns are emitted per second, then the remaining warns are suppressed. Zero values disable the rate limit
   -maxConcurrentRequests int
-     The maximum number of concurrent requests vmauth can process. Other requests are rejected with '429 Too Many Requests' http status code. See also -maxIdleConnsPerBackend (default 1000)
+     The maximum number of concurrent requests vmauth can process. Other requests are rejected with '429 Too Many Requests' http status code. See also -maxIdleConnsPerBackend and max_concurrent_requests option per each user config (default 1000)
   -maxIdleConnsPerBackend int
      The maximum number of idle connections vmauth can open per each backend host. See also -maxConcurrentRequests (default 100)
   -memory.allowedBytes size
diff --git a/app/vmauth/auth_config.go b/app/vmauth/auth_config.go
index 19dc197e3..99e87b568 100644
--- a/app/vmauth/auth_config.go
+++ b/app/vmauth/auth_config.go
@@ -32,17 +32,40 @@ type AuthConfig struct {
 
 // UserInfo is user information read from authConfigPath
 type UserInfo struct {
-	Name        string     `yaml:"name,omitempty"`
-	BearerToken string     `yaml:"bearer_token,omitempty"`
-	Username    string     `yaml:"username,omitempty"`
-	Password    string     `yaml:"password,omitempty"`
-	URLPrefix   *URLPrefix `yaml:"url_prefix,omitempty"`
-	URLMaps     []URLMap   `yaml:"url_map,omitempty"`
-	Headers     []Header   `yaml:"headers,omitempty"`
+	Name                  string     `yaml:"name,omitempty"`
+	BearerToken           string     `yaml:"bearer_token,omitempty"`
+	Username              string     `yaml:"username,omitempty"`
+	Password              string     `yaml:"password,omitempty"`
+	URLPrefix             *URLPrefix `yaml:"url_prefix,omitempty"`
+	URLMaps               []URLMap   `yaml:"url_map,omitempty"`
+	Headers               []Header   `yaml:"headers,omitempty"`
+	MaxConcurrentRequests int        `yaml:"max_concurrent_requests,omitempty"`
+
+	concurrencyLimitCh      chan struct{}
+	concurrencyLimitReached *metrics.Counter
 
 	requests *metrics.Counter
 }
 
+func (ui *UserInfo) beginConcurrencyLimit() error {
+	if ui.concurrencyLimitCh == nil {
+		return nil
+	}
+	select {
+	case ui.concurrencyLimitCh <- struct{}{}:
+		return nil
+	default:
+		ui.concurrencyLimitReached.Inc()
+		return fmt.Errorf("cannot handle more than max_concurrent_requests=%d concurrent requests from user %s", ui.MaxConcurrentRequests, ui.name())
+	}
+}
+
+func (ui *UserInfo) endConcurrencyLimit() {
+	if ui.concurrencyLimitCh != nil {
+		<-ui.concurrencyLimitCh
+	}
+}
+
 // Header is `Name: Value` http header, which must be added to the proxied request.
 type Header struct {
 	Name  string
@@ -298,29 +321,45 @@ func parseAuthConfig(data []byte) (map[string]*UserInfo, error) {
 		if len(ui.URLMaps) == 0 && ui.URLPrefix == nil {
 			return nil, fmt.Errorf("missing `url_prefix`")
 		}
+		name := ui.name()
 		if ui.BearerToken != "" {
-			name := "bearer_token"
-			if ui.Name != "" {
-				name = ui.Name
-			}
 			if ui.Password != "" {
 				return nil, fmt.Errorf("password shouldn't be set for bearer_token %q", ui.BearerToken)
 			}
 			ui.requests = metrics.GetOrCreateCounter(fmt.Sprintf(`vmauth_user_requests_total{username=%q}`, name))
 		}
 		if ui.Username != "" {
-			name := ui.Username
-			if ui.Name != "" {
-				name = ui.Name
-			}
 			ui.requests = metrics.GetOrCreateCounter(fmt.Sprintf(`vmauth_user_requests_total{username=%q}`, name))
 		}
+		if ui.MaxConcurrentRequests > 0 {
+			ui.concurrencyLimitCh = make(chan struct{}, ui.MaxConcurrentRequests)
+			ui.concurrencyLimitReached = metrics.GetOrCreateCounter(fmt.Sprintf(`vmauth_user_concurrent_requests_limit_reached_total{username=%q}`, name))
+			_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmauth_user_concurrent_requests_capacity{username=%q}`, name), func() float64 {
+				return float64(cap(ui.concurrencyLimitCh))
+			})
+			_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmauth_user_concurrent_requests_current{username=%q}`, name), func() float64 {
+				return float64(len(ui.concurrencyLimitCh))
+			})
+		}
 		byAuthToken[at1] = ui
 		byAuthToken[at2] = ui
 	}
 	return byAuthToken, nil
 }
 
+func (ui *UserInfo) name() string {
+	if ui.Name != "" {
+		return ui.Name
+	}
+	if ui.Username != "" {
+		return ui.Username
+	}
+	if ui.BearerToken != "" {
+		return "bearer_token"
+	}
+	return ""
+}
+
 func getAuthTokens(bearerToken, username, password string) (string, string) {
 	if bearerToken != "" {
 		// Accept the bearerToken as Basic Auth username with empty password
diff --git a/app/vmauth/auth_config_test.go b/app/vmauth/auth_config_test.go
index d0a2f12ba..3b2a2bcfd 100644
--- a/app/vmauth/auth_config_test.go
+++ b/app/vmauth/auth_config_test.go
@@ -218,11 +218,13 @@ users:
 - username: foo
   password: bar
   url_prefix: http://aaa:343/bbb
+  max_concurrent_requests: 5
 `, map[string]*UserInfo{
 		getAuthToken("", "foo", "bar"): {
-			Username:  "foo",
-			Password:  "bar",
-			URLPrefix: mustParseURL("http://aaa:343/bbb"),
+			Username:              "foo",
+			Password:              "bar",
+			URLPrefix:             mustParseURL("http://aaa:343/bbb"),
+			MaxConcurrentRequests: 5,
 		},
 	})
 
diff --git a/app/vmauth/example_config.yml b/app/vmauth/example_config.yml
index a505c4854..1b6cd53fd 100644
--- a/app/vmauth/example_config.yml
+++ b/app/vmauth/example_config.yml
@@ -18,26 +18,27 @@ users:
   headers:
   - "X-Scope-OrgID: foobar"
 
-  # The user for querying local single-node VictoriaMetrics.
   # All the requests to http://vmauth:8427 with the given Basic Auth (username:password)
-  # will be proxied to http://localhost:8428 .
+  # are proxied to http://localhost:8428 .
   # For example, http://vmauth:8427/api/v1/query is proxied to http://localhost:8428/api/v1/query
+  #
+  # The given user can send maximum 10 concurrent requests according to the provided max_concurrent_requests.
+  # Excess concurrent requests are rejected with 429 HTTP status code.
+  # See also -maxConcurrentRequests command-line flag for limiting the global number of concurrent requests.
 - username: "local-single-node"
   password: "***"
   url_prefix: "http://localhost:8428"
+  max_concurrent_requests: 10
 
-  # The user for querying local single-node VictoriaMetrics with extra_label team=dev.
   # All the requests to http://vmauth:8427 with the given Basic Auth (username:password)
-  # will be routed to http://localhost:8428 with extra_label=team=dev query arg.
+  # are proxied to http://localhost:8428 with extra_label=team=dev query arg.
   # For example, http://vmauth:8427/api/v1/query is routed to http://localhost:8428/api/v1/query?extra_label=team=dev
-- username: "local-single-node"
+- username: "local-single-node2"
   password: "***"
   url_prefix: "http://localhost:8428?extra_label=team=dev"
 
-  # The user for querying account 123 in VictoriaMetrics cluster
-  # See https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format
   # All the requests to http://vmauth:8427 with the given Basic Auth (username:password)
-  # will be load-balanced among http://vmselect1:8481/select/123/prometheus and http://vmselect2:8481/select/123/prometheus
+  # are load-balanced among http://vmselect1:8481/select/123/prometheus and http://vmselect2:8481/select/123/prometheus
   # For example, http://vmauth:8427/api/v1/query is proxied to the following urls in a round-robin manner:
   #   - http://vmselect1:8481/select/123/prometheus/api/v1/select
   #   - http://vmselect2:8481/select/123/prometheus/api/v1/select
@@ -47,10 +48,8 @@ users:
   - "http://vmselect1:8481/select/123/prometheus"
   - "http://vmselect2:8481/select/123/prometheus"
 
-  # The user for inserting Prometheus data into VictoriaMetrics cluster under account 42
-  # See https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format
   # All the requests to http://vmauth:8427 with the given Basic Auth (username:password)
-  # will be load-balanced between http://vminsert1:8480/insert/42/prometheus and http://vminsert2:8480/insert/42/prometheus
+  # are load-balanced between http://vminsert1:8480/insert/42/prometheus and http://vminsert2:8480/insert/42/prometheus
   # For example, http://vmauth:8427/api/v1/write is proxied to the following urls in a round-robin manner:
   #   - http://vminsert1:8480/insert/42/prometheus/api/v1/write
   #   - http://vminsert2:8480/insert/42/prometheus/api/v1/write
diff --git a/app/vmauth/main.go b/app/vmauth/main.go
index c68d6e8ef..634283627 100644
--- a/app/vmauth/main.go
+++ b/app/vmauth/main.go
@@ -33,7 +33,7 @@ var (
 		"See also -maxConcurrentRequests")
 	responseTimeout       = flag.Duration("responseTimeout", 5*time.Minute, "The timeout for receiving a response from backend")
 	maxConcurrentRequests = flag.Int("maxConcurrentRequests", 1000, "The maximum number of concurrent requests vmauth can process. Other requests are rejected with "+
-		"'429 Too Many Requests' http status code. See also -maxIdleConnsPerBackend")
+		"'429 Too Many Requests' http status code. See also -maxIdleConnsPerBackend and max_concurrent_requests option per each user config")
 	reloadAuthKey        = flag.String("reloadAuthKey", "", "Auth key for /-/reload http endpoint. It must be passed as authKey=...")
 	logInvalidAuthTokens = flag.Bool("logInvalidAuthTokens", false, "Whether to log requests with invalid auth tokens. "+
 		`Such requests are always counted at vmauth_http_request_errors_total{reason="invalid_auth_token"} metric, which is exposed at /metrics page`)
@@ -117,17 +117,19 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
 	concurrencyLimitOnce.Do(concurrencyLimitInit)
 	select {
 	case concurrencyLimitCh <- struct{}{}:
-	default:
-		concurrentRequestsLimitReachedTotal.Inc()
-		w.Header().Add("Retry-After", "10")
-		err := &httpserver.ErrorWithStatusCode{
-			Err:        fmt.Errorf("cannot serve more than -maxConcurrentRequests=%d concurrent requests", cap(concurrencyLimitCh)),
-			StatusCode: http.StatusTooManyRequests,
+		if err := ui.beginConcurrencyLimit(); err != nil {
+			handleConcurrencyLimitError(w, r, err)
+			<-concurrencyLimitCh
+			return true
 		}
-		httpserver.Errorf(w, r, "%s", err)
+	default:
+		concurrentRequestsLimitReached.Inc()
+		err := fmt.Errorf("cannot serve more than -maxConcurrentRequests=%d concurrent requests", cap(concurrencyLimitCh))
+		handleConcurrencyLimitError(w, r, err)
 		return true
 	}
 	processRequest(w, r, targetURL, headers)
+	ui.endConcurrencyLimit()
 	<-concurrencyLimitCh
 	return true
 }
@@ -269,7 +271,7 @@ func concurrencyLimitInit() {
 	})
 }
 
-var concurrentRequestsLimitReachedTotal = metrics.NewCounter("vmauth_concurrent_requests_limit_reached_total")
+var concurrentRequestsLimitReached = metrics.NewCounter("vmauth_concurrent_requests_limit_reached_total")
 
 func usage() {
 	const s = `
@@ -279,3 +281,12 @@ See the docs at https://docs.victoriametrics.com/vmauth.html .
 `
 	flagutil.Usage(s)
 }
+
+func handleConcurrencyLimitError(w http.ResponseWriter, r *http.Request, err error) {
+	w.Header().Add("Retry-After", "10")
+	err = &httpserver.ErrorWithStatusCode{
+		Err:        err,
+		StatusCode: http.StatusTooManyRequests,
+	}
+	httpserver.Errorf(w, r, "%s", err)
+}
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 2b61499ce..7f9bfe66a 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -15,6 +15,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
 
 ## tip
 
+* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): add the ability to limit the number of concurrent requests on a per-user basis via `max_concurrent_requests` option. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3346) and [these docs](https://docs.victoriametrics.com/vmauth.html#auth-config).
 * FEATURE: [vmalert enterprise](https://docs.victoriametrics.com/vmalert.html): add ability to read alerting and recording rules from S3, GCS or S3-compatible object storage. See [these docs](https://docs.victoriametrics.com/vmalert.html#reading-rules-from-object-storage).
 
 ## [v1.87.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.1)
diff --git a/docs/vmauth.md b/docs/vmauth.md
index 0d5dfbefa..ae9f532a4 100644
--- a/docs/vmauth.md
+++ b/docs/vmauth.md
@@ -59,26 +59,27 @@ users:
   headers:
   - "X-Scope-OrgID: foobar"
 
-  # The user for querying local single-node VictoriaMetrics.
   # All the requests to http://vmauth:8427 with the given Basic Auth (username:password)
-  # will be proxied to http://localhost:8428 .
+  # are proxied to http://localhost:8428 .
   # For example, http://vmauth:8427/api/v1/query is proxied to http://localhost:8428/api/v1/query
+  #
+  # The given user can send maximum 10 concurrent requests according to the provided max_concurrent_requests.
+  # Excess concurrent requests are rejected with 429 HTTP status code.
+  # See also -maxConcurrentRequests command-line flag for limiting the global number of concurrent requests.
 - username: "local-single-node"
   password: "***"
   url_prefix: "http://localhost:8428"
+  max_concurrent_requests: 10
 
-  # The user for querying local single-node VictoriaMetrics with extra_label team=dev.
   # All the requests to http://vmauth:8427 with the given Basic Auth (username:password)
-  # will be routed to http://localhost:8428 with extra_label=team=dev query arg.
+  # are proxied to http://localhost:8428 with extra_label=team=dev query arg.
   # For example, http://vmauth:8427/api/v1/query is routed to http://localhost:8428/api/v1/query?extra_label=team=dev
-- username: "local-single-node"
+- username: "local-single-node2"
   password: "***"
   url_prefix: "http://localhost:8428?extra_label=team=dev"
 
-  # The user for querying account 123 in VictoriaMetrics cluster
-  # See https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format
   # All the requests to http://vmauth:8427 with the given Basic Auth (username:password)
-  # will be load-balanced among http://vmselect1:8481/select/123/prometheus and http://vmselect2:8481/select/123/prometheus
+  # are load-balanced among http://vmselect1:8481/select/123/prometheus and http://vmselect2:8481/select/123/prometheus
   # For example, http://vmauth:8427/api/v1/query is proxied to the following urls in a round-robin manner:
   #   - http://vmselect1:8481/select/123/prometheus/api/v1/select
   #   - http://vmselect2:8481/select/123/prometheus/api/v1/select
@@ -88,10 +89,8 @@ users:
   - "http://vmselect1:8481/select/123/prometheus"
   - "http://vmselect2:8481/select/123/prometheus"
 
-  # The user for inserting Prometheus data into VictoriaMetrics cluster under account 42
-  # See https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format
   # All the requests to http://vmauth:8427 with the given Basic Auth (username:password)
-  # will be load-balanced between http://vminsert1:8480/insert/42/prometheus and http://vminsert2:8480/insert/42/prometheus
+  # are load-balanced between http://vminsert1:8480/insert/42/prometheus and http://vminsert2:8480/insert/42/prometheus
   # For example, http://vmauth:8427/api/v1/write is proxied to the following urls in a round-robin manner:
   #   - http://vminsert1:8480/insert/42/prometheus/api/v1/write
   #   - http://vminsert2:8480/insert/42/prometheus/api/v1/write
@@ -289,7 +288,7 @@ See the docs at https://docs.victoriametrics.com/vmauth.html .
   -loggerWarnsPerSecondLimit int
      Per-second limit on the number of WARN messages. If more than the given number of warns are emitted per second, then the remaining warns are suppressed. Zero values disable the rate limit
   -maxConcurrentRequests int
-     The maximum number of concurrent requests vmauth can process. Other requests are rejected with '429 Too Many Requests' http status code. See also -maxIdleConnsPerBackend (default 1000)
+     The maximum number of concurrent requests vmauth can process. Other requests are rejected with '429 Too Many Requests' http status code. See also -maxIdleConnsPerBackend and max_concurrent_requests option per each user config (default 1000)
   -maxIdleConnsPerBackend int
      The maximum number of idle connections vmauth can open per each backend host. See also -maxConcurrentRequests (default 100)
   -memory.allowedBytes size