From 111f7da946a0d5c2306c844926d5fa8ec6ebc291 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin <valyala@victoriametrics.com>
Date: Tue, 16 Jul 2024 18:59:16 +0200
Subject: [PATCH] Revert "app/vmauth: reader pool to reduce gc & mem alloc
 (#6533)"

This reverts commit 4d66e042e3d22c6babdfd45b3e32e9073fa59d35.

Reasons for revert:

- The commit makes unrelated invalid changes to docs/CHANGELOG.md
- The changes at app/vmauth/main.go are too complex. It is better to split them into two parts:
  - pooling readTrackingBody struct for reducing pressure on GC
  - avoiding the use of readTrackingBody when the -maxRequestBodySizeToRetry command-line flag is set to 0

Let's do this in follow-up commits.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6445
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6533
---
 app/vmauth/main.go      | 108 ++++++++++------------------------
 app/vmauth/main_test.go | 125 +---------------------------------------
 docs/CHANGELOG.md       |   3 -
 docs/vmauth.md          |   2 +-
 4 files changed, 34 insertions(+), 204 deletions(-)

diff --git a/app/vmauth/main.go b/app/vmauth/main.go
index 91af81cd11..e5b9f5ce11 100644
--- a/app/vmauth/main.go
+++ b/app/vmauth/main.go
@@ -50,7 +50,7 @@ var (
 		`Such requests are always counted at vmauth_http_request_errors_total{reason="invalid_auth_token"} metric, which is exposed at /metrics page`)
 	failTimeout               = flag.Duration("failTimeout", 3*time.Second, "Sets a delay period for load balancing to skip a malfunctioning backend")
 	maxRequestBodySizeToRetry = flagutil.NewBytes("maxRequestBodySizeToRetry", 16*1024, "The maximum request body size, which can be cached and re-tried at other backends. "+
-		"Bigger values may require more memory. Negative or zero values disable request body caching and retries.")
+		"Bigger values may require more memory")
 	backendTLSInsecureSkipVerify = flag.Bool("backend.tlsInsecureSkipVerify", false, "Whether to skip TLS verification when connecting to backends over HTTPS. "+
 		"See https://docs.victoriametrics.com/vmauth/#backend-tls-setup")
 	backendTLSCAFile = flag.String("backend.TLSCAFile", "", "Optional path to TLS root CA file, which is used for TLS verification when connecting to backends over HTTPS. "+
@@ -200,13 +200,10 @@ func processRequest(w http.ResponseWriter, r *http.Request, ui *UserInfo) {
 		up, hc = ui.DefaultURL, ui.HeadersConf
 		isDefault = true
 	}
-	// caching makes sense only for positive non zero size
-	if maxRequestBodySizeToRetry.IntN() > 0 {
-		rtb := getReadTrackingBody(r.Body, int(r.ContentLength))
-		defer putReadTrackingBody(rtb)
-		r.Body = rtb
-	}
 	maxAttempts := up.getBackendsCount()
+	r.Body = &readTrackingBody{
+		r: r.Body,
+	}
 	for i := 0; i < maxAttempts; i++ {
 		bu := up.getBackendURL()
 		targetURL := bu.url
@@ -505,6 +502,9 @@ type readTrackingBody struct {
 	// bufComplete is set to true when buf contains complete request body read from r.
 	bufComplete bool
 
+	// needReadBuf is set to true when Read() must be performed from buf instead of r.
+	needReadBuf bool
+
 	// offset is an offset at buf for the next data read if needReadBuf is set to true.
 	offset int
 }
@@ -512,63 +512,50 @@ type readTrackingBody struct {
 // Read implements io.Reader interface
 // tracks body reading requests
 func (rtb *readTrackingBody) Read(p []byte) (int, error) {
-	if rtb.offset < len(rtb.buf) {
-		if rtb.cannotRetry {
-			return 0, fmt.Errorf("cannot retry reading data from buf")
+	if rtb.needReadBuf {
+		if rtb.offset >= len(rtb.buf) {
+			return 0, io.EOF
 		}
-		nb := copy(p, rtb.buf[rtb.offset:])
-		rtb.offset += nb
-		if rtb.bufComplete {
-			if rtb.offset == len(rtb.buf) {
-				return nb, io.EOF
-			}
-			return nb, nil
-		}
-		if nb < len(p) {
-			nr, err := rtb.readFromStream(p[nb:])
-			return nb + nr, err
-		}
-		return nb, nil
+		n := copy(p, rtb.buf[rtb.offset:])
+		rtb.offset += n
+		return n, nil
 	}
-	if rtb.bufComplete {
-		return 0, io.EOF
-	}
-	return rtb.readFromStream(p)
-}
 
-func (rtb *readTrackingBody) readFromStream(p []byte) (int, error) {
 	if rtb.r == nil {
 		return 0, fmt.Errorf("cannot read data after closing the reader")
 	}
+
 	n, err := rtb.r.Read(p)
 	if rtb.cannotRetry {
 		return n, err
 	}
-	if rtb.offset+n > maxRequestBodySizeToRetry.IntN() {
-		rtb.cannotRetry = true
-	}
-	if n > 0 {
-		rtb.offset += n
-		rtb.buf = append(rtb.buf, p[:n]...)
-	}
-	if err != nil {
-		if err == io.EOF {
-			rtb.bufComplete = true
-			return n, err
-		}
+	if len(rtb.buf)+n > maxRequestBodySizeToRetry.IntN() {
 		rtb.cannotRetry = true
 		return n, err
 	}
-	return n, nil
+	rtb.buf = append(rtb.buf, p[:n]...)
+	if err == io.EOF {
+		rtb.bufComplete = true
+	}
+	return n, err
 }
 
 func (rtb *readTrackingBody) canRetry() bool {
-	return !rtb.cannotRetry
+	if rtb.cannotRetry {
+		return false
+	}
+	if len(rtb.buf) > 0 && !rtb.needReadBuf {
+		return false
+	}
+	return true
 }
 
 // Close implements io.Closer interface.
 func (rtb *readTrackingBody) Close() error {
 	rtb.offset = 0
+	if rtb.bufComplete {
+		rtb.needReadBuf = true
+	}
 
 	// Close rtb.r only if the request body is completely read or if it is too big.
 	// http.Roundtrip performs body.Close call even without any Read calls,
@@ -584,38 +571,3 @@ func (rtb *readTrackingBody) Close() error {
 
 	return nil
 }
-
-var readTrackingBodyPool sync.Pool
-
-func getReadTrackingBody(origin io.ReadCloser, b int) *readTrackingBody {
-	bufSize := 1024
-	if b > 0 && b < maxRequestBodySizeToRetry.IntN() {
-		bufSize = b
-	}
-	v := readTrackingBodyPool.Get()
-	if v == nil {
-		v = &readTrackingBody{
-			buf: make([]byte, 0, bufSize),
-		}
-	}
-	rtb := v.(*readTrackingBody)
-	rtb.r = origin
-	if bufSize > cap(rtb.buf) {
-		rtb.buf = make([]byte, 0, bufSize)
-	}
-
-	return rtb
-}
-
-func putReadTrackingBody(rtb *readTrackingBody) {
-	if rtb.r != nil {
-		_ = rtb.r.Close()
-	}
-	rtb.r = nil
-	rtb.buf = rtb.buf[:0]
-	rtb.offset = 0
-	rtb.cannotRetry = false
-	rtb.bufComplete = false
-
-	readTrackingBodyPool.Put(rtb)
-}
diff --git a/app/vmauth/main_test.go b/app/vmauth/main_test.go
index a7b34d67f0..3e7b6d6d38 100644
--- a/app/vmauth/main_test.go
+++ b/app/vmauth/main_test.go
@@ -55,6 +55,9 @@ func TestReadTrackingBodyRetryFailure(t *testing.T) {
 		if n != 1 {
 			t.Fatalf("unexpected number of bytes read; got %d; want 1", n)
 		}
+		if rtb.canRetry() {
+			t.Fatalf("canRetry() must return false")
+		}
 		data, err := io.ReadAll(rtb)
 		if err != nil {
 			t.Fatalf("unexpected error when reading all the data: %s", err)
@@ -82,128 +85,6 @@ func TestReadTrackingBodyRetryFailure(t *testing.T) {
 	f(newTestString(2 * maxRequestBodySizeToRetry.IntN()))
 }
 
-// request body not over maxRequestBodySizeToRetry
-// 1. When writing data downstream, buf only caches part of the data because the downstream connection is disconnected.
-// 2. retry request: because buf caches some data, first read buf and then read stream when retrying
-// 3. retry request: the data has been read to buf in the second step. if the request fails, retry to read all buf later.
-func TestRetryReadSuccessAfterPartialRead(t *testing.T) {
-	f := func(s string) {
-		rtb := &readTrackingBody{
-			r:   io.NopCloser(bytes.NewBufferString(s)),
-			buf: make([]byte, 0, len(s)),
-		}
-
-		var data []byte
-		var err error
-		halfSize := len(s) / 2
-		if halfSize == 0 {
-			halfSize = 100
-		}
-		buf := make([]byte, halfSize)
-		var n int
-
-		// read part of the data
-		n, err = rtb.Read(buf[:])
-		data = append(data, buf[:n]...)
-		if err != nil && err != io.EOF {
-			t.Fatalf("unexpected error: %s", err)
-		}
-
-		// request failed when output stream is closed (eg: server connection reset)
-		// would close the reader
-		if err := rtb.Close(); err != nil {
-			t.Fatalf("unexpected error when closing readTrackingBody: %s", err)
-		}
-		if !rtb.canRetry() {
-			t.Fatalf("canRetry() must return true")
-		}
-
-		// retry read (read buf + remaining data)
-		data = data[:0]
-		err = nil
-		for err == nil {
-			n, err = rtb.Read(buf[:])
-			data = append(data, buf[:n]...)
-		}
-		if err != io.EOF {
-			t.Fatalf("unexpected error: %s", err)
-		}
-		if string(data) != s {
-			t.Fatalf("unexpected data read; got\n%s\nwant\n%s", data, s)
-		}
-		// cannotRetry return false
-		// because the request data is not over maxRequestBodySizeToRetry limit
-		if !rtb.canRetry() {
-			t.Fatalf("canRetry() must return true")
-		}
-	}
-
-	f("")
-	f("foo")
-	f("foobar")
-	f(newTestString(maxRequestBodySizeToRetry.IntN()))
-}
-
-// request body over maxRequestBodySizeToRetry
-// 1. When writing data downstream, buf only caches part of the data because the downstream connection is disconnected.
-// 2. retry request: because buf caches some data, first read buf and then read stream when retrying
-// 3. retry request: the data has been read to buf in the second step. if the request fails, retry to read all buf later.
-func TestRetryReadSuccessAfterPartialReadAndCannotRetryAgain(t *testing.T) {
-	f := func(s string) {
-		rtb := &readTrackingBody{
-			r:   io.NopCloser(bytes.NewBufferString(s)),
-			buf: make([]byte, 0, len(s)),
-		}
-
-		var data []byte
-		var err error
-		halfSize := len(s) / 2
-		if halfSize == 0 {
-			halfSize = 100
-		}
-		buf := make([]byte, halfSize)
-		var n int
-
-		// read part of the data
-		n, err = rtb.Read(buf[:])
-		data = append(data, buf[:n]...)
-		if err != nil && err != io.EOF {
-			t.Fatalf("unexpected error: %s", err)
-		}
-
-		// request failed when output stream is closed (eg: server connection reset)
-		if err := rtb.Close(); err != nil {
-			t.Fatalf("unexpected error when closing readTrackingBody: %s", err)
-		}
-		if !rtb.canRetry() {
-			t.Fatalf("canRetry() must return true")
-		}
-
-		// retry read (read buf + remaining data)
-		data = data[:0]
-		err = nil
-		for err == nil {
-			n, err = rtb.Read(buf[:])
-			data = append(data, buf[:n]...)
-		}
-		if err != io.EOF {
-			t.Fatalf("unexpected error: %s", err)
-		}
-		if string(data) != s {
-			t.Fatalf("unexpected data read; got\n%s\nwant\n%s", data, s)
-		}
-
-		// cannotRetry returns true
-		// because the request data is over maxRequestBodySizeToRetry limit
-		if rtb.canRetry() {
-			t.Fatalf("canRetry() must return false")
-		}
-	}
-
-	f(newTestString(maxRequestBodySizeToRetry.IntN() + 1))
-	f(newTestString(2 * maxRequestBodySizeToRetry.IntN()))
-}
-
 func newTestString(sLen int) string {
 	return string(make([]byte, sLen))
 }
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 3a25c3b625..e518f9939a 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -38,9 +38,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
 * SECURITY: upgrade base docker image (Alpine) from 3.20.0 to 3.20.1. See [alpine 3.20.1 release notes](https://www.alpinelinux.org/posts/Alpine-3.20.1-released.html).
 
 * FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): allow overriding `Host` header with backend host before sending the request to the configured backend. See [these docs](https://docs.victoriametrics.com/vmauth/#modifying-http-headers) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6453)
-* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): reduces CPU usage by reusing request body buffer. Allows to disable requests caching with `-maxRequestBodySizeToRetry=0`. See this [PR](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6533) for details.
 * FEATURE: [dashboards](https://grafana.com/orgs/victoriametrics): add [Grafana dashboard](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/dashboards/vmauth.json) and [alerting rules](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-vmauth.yml) for [vmauth](https://docs.victoriametrics.com/vmauth/) dashboard. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4313) for details.
-* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): reduces CPU usage by reusing request body buffer. Allows to disable requests caching with `-maxRequestBodySizeToRetry=0`. See this [PR](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6533) for details.
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): [`yandexcloud_sd_configs`](https://docs.victoriametrics.com/sd_configs/#yandexcloud_sd_configs): add support for obtaining IAM token in [GCE format](https://yandex.cloud/en-ru/docs/compute/operations/vm-connect/auth-inside-vm#auth-inside-vm) additionally to the [deprecated Amazon EC2 IMDSv1 format](https://yandex.cloud/en/docs/security/standard/authentication#aws-token). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5513).
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/) and [Single-node VictoriaMetrics](https://docs.victoriametrics.com/): add `-graphite.sanitizeMetricName` cmd-line flag for sanitizing metrics ingested via [Graphite protocol](https://docs.victoriametrics.com/#how-to-send-data-from-graphite-compatible-agents-such-as-statsd). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6077).
 * FEATURE: [streaming aggregation](https://docs.victoriametrics.com/stream-aggregation/): expose the following metrics at `/metrics` page of [vmagent](https://docs.victoriametrics.com/vmagent/) and [single-node VictoriaMetrics](https://docs.victoriametrics.com/):
@@ -62,7 +60,6 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
 
 * BUGFIX: [vmgateway](https://docs.victoriametrics.com/vmgateway/): properly apply read and write based rate limits. See this [issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6148) for details.
 * BUGFIX: [docker-compose](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#docker-compose-environment-for-victoriametrics): fix incorrect link to vmui from [VictoriaMetrics plugin in Grafana](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#grafana).
-* BUGFIX: [docker-compose](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#docker-compose-environment-for-victoriametrics): fix incorrect link to vmui from [VictoriaMetrics plugin in Grafana](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#grafana).
 * BUGFIX: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): Fix the dateMetricIDCache consistency issue that leads to duplicate per-day index entries when new time series are inserted concurrently. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6534) for details.
 * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert/): fix incorrect redirection in WebUI of vmalert. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6603) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6620).
 * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix input cursor position reset in modal settings. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6530).
diff --git a/docs/vmauth.md b/docs/vmauth.md
index ee36f62096..ffb0027542 100644
--- a/docs/vmauth.md
+++ b/docs/vmauth.md
@@ -1260,7 +1260,7 @@ See the docs at https://docs.victoriametrics.com/vmauth/ .
   -maxIdleConnsPerBackend int
      The maximum number of idle connections vmauth can open per each backend host. See also -maxConcurrentRequests (default 100)
   -maxRequestBodySizeToRetry size
-     The maximum request body size, which can be cached and re-tried at other backends. Bigger values may require more memory. Negative or zero values disable request body caching and retries.
+     The maximum request body size, which can be cached and re-tried at other backends. Bigger values may require more memory
      Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 16384)
   -memory.allowedBytes size
      Allowed size of system memory VictoriaMetrics caches may occupy. This option overrides -memory.allowedPercent if set to a non-zero value. Too low a value may increase the cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from the OS page cache resulting in higher disk IO usage