From da115b51700bc1ebdbe9fe4236586ac24c523445 Mon Sep 17 00:00:00 2001
From: Nikolay <nik@victoriametrics.com>
Date: Wed, 17 May 2023 09:19:33 +0200
Subject: [PATCH] app/vmauth: retry common network dial errors (#4280)

with tracking request body read calls
it allows us to retry POST and PUT requests
---
 app/vmauth/main.go | 53 ++++++++++++++++++++++++++++++++++++++++++----
 docs/CHANGELOG.md  |  1 +
 2 files changed, 50 insertions(+), 4 deletions(-)

diff --git a/app/vmauth/main.go b/app/vmauth/main.go
index 5cf685519..5d16da5dc 100644
--- a/app/vmauth/main.go
+++ b/app/vmauth/main.go
@@ -158,6 +158,8 @@ func processRequest(w http.ResponseWriter, r *http.Request, ui *UserInfo) {
 		up, headers = ui.DefaultURL, ui.Headers
 		isDefault = true
 	}
+	rtb := &readTrackingBody{ioc: r.Body}
+	r.Body = rtb
 
 	maxAttempts := up.getBackendsCount()
 	for i := 0; i < maxAttempts; i++ {
@@ -178,6 +180,7 @@ func processRequest(w http.ResponseWriter, r *http.Request, ui *UserInfo) {
 		}
 		bu.setBroken()
 	}
+	rtb.mustClose()
 	err := &httpserver.ErrorWithStatusCode{
 		Err:        fmt.Errorf("all the backends for the user %q are unavailable", ui.name()),
 		StatusCode: http.StatusServiceUnavailable,
@@ -192,12 +195,14 @@ func tryProcessingRequest(w http.ResponseWriter, r *http.Request, targetURL *url
 	for _, h := range headers {
 		req.Header.Set(h.Name, h.Value)
 	}
+	rtb := req.Body.(*readTrackingBody)
 	transportOnce.Do(transportInit)
 	res, err := transport.RoundTrip(req)
 	if err != nil {
 		remoteAddr := httpserver.GetQuotedRemoteAddr(r)
-		requestURI := httpserver.GetRequestURI(r)
-		if r.Method == http.MethodPost || r.Method == http.MethodPut {
+		// NOTE: do not use httpserver.GetRequestURI
+		// it explicitly reads request body and fail retries
+		if (r.Method == http.MethodPost || r.Method == http.MethodPut) && !rtb.couldReuseBody() {
 			// It is impossible to retry POST and PUT requests,
 			// since we already proxied the request body to the backend.
 			err = &httpserver.ErrorWithStatusCode{
@@ -207,7 +212,7 @@ func tryProcessingRequest(w http.ResponseWriter, r *http.Request, targetURL *url
 			httpserver.Errorf(w, r, "%s", err)
 			return true
 		}
-		logger.Warnf("remoteAddr: %s; requestURI: %s; error when proxying the request to %q: %s", remoteAddr, requestURI, targetURL, err)
+		logger.Warnf("remoteAddr: %s; requestURI: %s; error when proxying the request to %q: %s", remoteAddr, req.URL, targetURL, err)
 		return false
 	}
 	removeHopHeaders(res.Header)
@@ -218,7 +223,7 @@ func tryProcessingRequest(w http.ResponseWriter, r *http.Request, targetURL *url
 	copyBuf.B = bytesutil.ResizeNoCopyNoOverallocate(copyBuf.B, 16*1024)
 	_, err = io.CopyBuffer(w, res.Body, copyBuf.B)
 	copyBufPool.Put(copyBuf)
-	_ = res.Body.Close()
+	rtb.mustClose()
 	if err != nil && !netutil.IsTrivialNetworkError(err) {
 		remoteAddr := httpserver.GetQuotedRemoteAddr(r)
 		requestURI := httpserver.GetRequestURI(r)
@@ -350,3 +355,43 @@ func handleConcurrencyLimitError(w http.ResponseWriter, r *http.Request, err err
 	}
 	httpserver.Errorf(w, r, "%s", err)
 }
+
+type readTrackingBody struct {
+	ioc       io.ReadCloser
+	wasClosed bool
+	wasRead   bool
+}
+
+// Read implements io.Reader interface
+// tracks body reading requests
+func (rtb *readTrackingBody) Read(p []byte) (int, error) {
+	if rtb.wasClosed {
+		return 0, io.EOF
+	}
+	rtb.wasRead = true
+	return rtb.ioc.Read(p)
+}
+
+// Close implements interface.
+// Closes body only if Read call was performed.
+// http.Roundtrip performs body.Close call even without any Read calls
+// so this hack allows us to reuse request body
+func (rtb *readTrackingBody) Close() error {
+	if rtb.wasRead {
+		err := rtb.ioc.Close()
+		rtb.wasClosed = true
+		return err
+	}
+	return nil
+}
+
+// body could be reused only if read operation wasn't called
+func (rtb *readTrackingBody) couldReuseBody() bool {
+	return !rtb.wasRead
+}
+
+// mustClose explicitly closes body
+func (rtb *readTrackingBody) mustClose() {
+	rtb.wasClosed = true
+	rtb.ioc.Close()
+}
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 28bf3c714..1843ed5ab 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -46,6 +46,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
 * FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): add ability to filter incoming requests by IP. See [these docs](https://docs.victoriametrics.com/vmauth.html#ip-filters) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3491).
 * FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): add ability to proxy requests to the specified backends for unauthorized users. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4083).
 * FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): add ability to specify default route for unmatched requests. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4084).
+* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): add ability to retry requests for common tcp dial error for POST and PUT requests. Previously load-balancing algorythm didn't retry such errors.
 * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add ability to compare the data for the previous day with the data for the current day at [Cardinality Explorer](https://docs.victoriametrics.com/#cardinality-explorer). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3967).
 * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): display histograms as heatmaps in [Metrics explorer](https://docs.victoriametrics.com/#metrics-explorer). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4111).
 * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add `WITH template` playground. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3811).