diff --git a/app/vmauth/main.go b/app/vmauth/main.go index 1f7d685a0..fa28dcb6e 100644 --- a/app/vmauth/main.go +++ b/app/vmauth/main.go @@ -1,6 +1,8 @@ package main import ( + "context" + "errors" "flag" "fmt" "io" @@ -41,7 +43,9 @@ var ( reloadAuthKey = flag.String("reloadAuthKey", "", "Auth key for /-/reload http endpoint. It must be passed as authKey=...") logInvalidAuthTokens = flag.Bool("logInvalidAuthTokens", false, "Whether to log requests with invalid auth tokens. "+ `Such requests are always counted at vmauth_http_request_errors_total{reason="invalid_auth_token"} metric, which is exposed at /metrics page`) - failTimeout = flag.Duration("failTimeout", 3*time.Second, "Sets a delay period for load balancing to skip a malfunctioning backend.") + failTimeout = flag.Duration("failTimeout", 3*time.Second, "Sets a delay period for load balancing to skip a malfunctioning backend") + maxRequestBodySizeToRetry = flagutil.NewBytes("maxRequestBodySizeToRetry", 16*1024, "The maximum request body size, which can be cached and re-tried at other backends. "+ + "Bigger values may require more memory") ) func main() { @@ -162,11 +166,12 @@ func processRequest(w http.ResponseWriter, r *http.Request, ui *UserInfo) { up, headersConf = ui.DefaultURL, ui.HeadersConf isDefault = true } - r.Body = &readTrackingBody{ - r: r.Body, - } - maxAttempts := up.getBackendsCount() + if maxAttempts > 1 { + r.Body = &readTrackingBody{ + r: r.Body, + } + } for i := 0; i < maxAttempts; i++ { bu := up.getLeastLoadedBackendURL() targetURL := bu.url @@ -199,11 +204,17 @@ func tryProcessingRequest(w http.ResponseWriter, r *http.Request, targetURL *url updateHeadersByConfig(req.Header, headersConf.RequestHeaders) transportOnce.Do(transportInit) res, err := transport.RoundTrip(req) + rtb, rtbOK := req.Body.(*readTrackingBody) if err != nil { - rtb := req.Body.(*readTrackingBody) - if rtb.readStarted { - // Request body has been already read, so it is impossible to retry the request. 
- // Return the error to the client then. + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + // Do not retry canceled or timed out requests + remoteAddr := httpserver.GetQuotedRemoteAddr(r) + requestURI := httpserver.GetRequestURI(r) + logger.Warnf("remoteAddr: %s; requestURI: %s; error when proxying response body from %s: %s", remoteAddr, requestURI, targetURL, err) + return true + } + if !rtbOK || !rtb.canRetry() { + // Request body cannot be re-sent to another backend. Return the error to the client then. err = &httpserver.ErrorWithStatusCode{ Err: fmt.Errorf("cannot proxy the request to %q: %w", targetURL, err), StatusCode: http.StatusServiceUnavailable, @@ -214,8 +225,18 @@ func tryProcessingRequest(w http.ResponseWriter, r *http.Request, targetURL *url // Retry the request if its body wasn't read yet. This usually means that the backend isn't reachable. remoteAddr := httpserver.GetQuotedRemoteAddr(r) // NOTE: do not use httpserver.GetRequestURI - // it explicitly reads request body and fails retries. - logger.Warnf("remoteAddr: %s; requestURI: %s; error when proxying the request to %q: %s", remoteAddr, req.URL, targetURL, err) + // it explicitly reads request body, which may fail retries. + logger.Warnf("remoteAddr: %s; requestURI: %s; retrying the request to %s because of error: %s", remoteAddr, req.URL, targetURL, err) + return false + } + if res.StatusCode/100 >= 5 && (rtbOK && rtb.canRetry()) { + // Retry requests at other backends on 5xx status codes. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4893 + remoteAddr := httpserver.GetQuotedRemoteAddr(r) + // NOTE: do not use httpserver.GetRequestURI + // it explicitly reads request body, which may fail retries. 
+ logger.Warnf("remoteAddr: %s; requestURI: %s; retrying the request to %s because of unexpected status code: %d; must be smaller than 500", + remoteAddr, req.URL, targetURL, res.StatusCode) return false } removeHopHeaders(res.Header) @@ -370,26 +391,86 @@ func handleConcurrencyLimitError(w http.ResponseWriter, r *http.Request, err err } type readTrackingBody struct { - r io.ReadCloser - readStarted bool + // r contains reader for initial data reading + r io.ReadCloser + + // buf is a buffer for data read from r. Buf size is limited by maxRequestBodySizeToRetry. + // If more than maxRequestBodySizeToRetry is read from r, then cannotRetry is set to true. + buf []byte + + // cannotRetry is set to true when more than maxRequestBodySizeToRetry are read from r. + // In this case the read data cannot fit buf, so it cannot be re-read from buf. + cannotRetry bool + + // bufComplete is set to true when buf contains complete request body read from r. + bufComplete bool + + // needReadBuf is set to true when Read() must be performed from buf instead of r. + needReadBuf bool + + // offset is an offset at buf for the next data read if needReadBuf is set to true. + offset int } // Read implements io.Reader interface // tracks body reading requests func (rtb *readTrackingBody) Read(p []byte) (int, error) { - if len(p) > 0 { - rtb.readStarted = true + if rtb.needReadBuf { + if rtb.offset >= len(rtb.buf) { + return 0, io.EOF + } + n := copy(p, rtb.buf[rtb.offset:]) + rtb.offset += n + return n, nil } - return rtb.r.Read(p) + + if rtb.r == nil { + return 0, fmt.Errorf("cannot read data after closing the reader") + } + + n, err := rtb.r.Read(p) + if rtb.cannotRetry { + return n, err + } + if len(rtb.buf)+n > maxRequestBodySizeToRetry.IntN() { + rtb.cannotRetry = true + return n, err + } + rtb.buf = append(rtb.buf, p[:n]...) 
+ if err == io.EOF { + rtb.bufComplete = true + } + return n, err +} + +func (rtb *readTrackingBody) canRetry() bool { + if rtb.cannotRetry { + return false + } + if len(rtb.buf) > 0 && !rtb.needReadBuf { + return false + } + return true } // Close implements io.Closer interface. func (rtb *readTrackingBody) Close() error { - // Close rtb.r only if at least a single Read call was performed. - // http.Roundtrip performs body.Close call even without any Read calls - // so this hack allows us to reuse request body - if rtb.readStarted { - return rtb.r.Close() + rtb.offset = 0 + if rtb.bufComplete { + rtb.needReadBuf = true } + + // Close rtb.r only if the request body is completely read or if it is too big. + // http.Roundtrip performs body.Close call even without any Read calls, + // so this hack allows us to reuse request body. + if rtb.bufComplete || rtb.cannotRetry { + if rtb.r == nil { + return nil + } + err := rtb.r.Close() + rtb.r = nil + return err + } + return nil } diff --git a/app/vmauth/main_test.go b/app/vmauth/main_test.go new file mode 100644 index 000000000..3e7b6d6d3 --- /dev/null +++ b/app/vmauth/main_test.go @@ -0,0 +1,90 @@ +package main + +import ( + "bytes" + "io" + "testing" +) + +func TestReadTrackingBodyRetrySuccess(t *testing.T) { + f := func(s string) { + t.Helper() + rtb := &readTrackingBody{ + r: io.NopCloser(bytes.NewBufferString(s)), + } + if !rtb.canRetry() { + t.Fatalf("canRetry() must return true") + } + for i := 0; i < 5; i++ { + data, err := io.ReadAll(rtb) + if err != nil { + t.Fatalf("unexpected error when reading all the data at iteration %d: %s", i, err) + } + if string(data) != s { + t.Fatalf("unexpected data read at iteration %d\ngot\n%s\nwant\n%s", i, data, s) + } + if err := rtb.Close(); err != nil { + t.Fatalf("unexpected error when closing readTrackingBody at iteration %d: %s", i, err) + } + if !rtb.canRetry() { + t.Fatalf("canRetry() must return true at iteration %d", i) + } + } + } + + f("") + f("foo") + f("foobar") + 
f(newTestString(maxRequestBodySizeToRetry.IntN())) +} + +func TestReadTrackingBodyRetryFailure(t *testing.T) { + f := func(s string) { + t.Helper() + rtb := &readTrackingBody{ + r: io.NopCloser(bytes.NewBufferString(s)), + } + if !rtb.canRetry() { + t.Fatalf("canRetry() must return true") + } + buf := make([]byte, 1) + n, err := rtb.Read(buf) + if err != nil { + t.Fatalf("unexpected error when reading a single byte: %s", err) + } + if n != 1 { + t.Fatalf("unexpected number of bytes read; got %d; want 1", n) + } + if rtb.canRetry() { + t.Fatalf("canRetry() must return false") + } + data, err := io.ReadAll(rtb) + if err != nil { + t.Fatalf("unexpected error when reading all the data: %s", err) + } + if string(buf)+string(data) != s { + t.Fatalf("unexpected data read\ngot\n%s\nwant\n%s", string(buf)+string(data), s) + } + if err := rtb.Close(); err != nil { + t.Fatalf("unexpected error when closing readTrackingBody: %s", err) + } + if rtb.canRetry() { + t.Fatalf("canRetry() must return false") + } + + data, err = io.ReadAll(rtb) + if err == nil { + t.Fatalf("expecting non-nil error") + } + if len(data) != 0 { + t.Fatalf("unexpected non-empty data read: %q", data) + } + } + + f(newTestString(maxRequestBodySizeToRetry.IntN() + 1)) + f(newTestString(2 * maxRequestBodySizeToRetry.IntN())) +} + +func newTestString(sLen int) string { + return string(make([]byte, sLen)) +} diff --git a/app/vmauth/target_url.go b/app/vmauth/target_url.go index 365484d95..9dfe7b070 100644 --- a/app/vmauth/target_url.go +++ b/app/vmauth/target_url.go @@ -8,6 +8,9 @@ import ( func mergeURLs(uiURL, requestURI *url.URL) *url.URL { targetURL := *uiURL + if strings.HasPrefix(requestURI.Path, "/") { + targetURL.Path = strings.TrimSuffix(targetURL.Path, "/") + } targetURL.Path += requestURI.Path requestParams := requestURI.Query() // fast path diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 6eeaf0533..289deb142 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -42,6 +42,7 @@ The 
following `tip` changes can be tested by building VictoriaMetrics components * FEATURE: dashboards: provide copies of Grafana dashboards alternated with VictoriaMetrics datasource at [dashboards/vm](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/dashboards/vm). * FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): added ability to set, override and clear request and response headers on a per-user and per-path basis. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4825) and [these docs](https://docs.victoriametrics.com/vmauth.html#auth-config) for details. +* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): automatically retry requests to the remaining backends if they return 5xx response codes and if the size of the request body doesn't exceed the value specified by `-maxRequestBodySizeToRetry` command-line flag. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4893). * FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): add `eval_offset` attribute for [Groups](https://docs.victoriametrics.com/vmalert.html#groups). If specified, Group will be evaluated at the exact time offset on the range of [0...evaluationInterval]. The setting might be useful for cron-like rules which must be evaluated at specific moments of time. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3409) for details. * FEATURE: limit the length of string params in log messages to 500 chars. Longer string params are replaced with the `first_250_chars..last_250_chars`. This prevents from too long log lines, which can be emitted by VictoriaMetrics components.