From 70f8911ca7f03f869ee7f67a5a653ba78f8601ec Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 9 Feb 2023 21:05:13 -0800 Subject: [PATCH] app/vmauth: automatically retry failing GET requests on the remaining backends --- app/vmauth/main.go | 51 ++++++++++++++++++++++++++--------- app/vmauth/target_url.go | 34 ++++++++++++++--------- app/vmauth/target_url_test.go | 13 +++++---- docs/CHANGELOG.md | 1 + 4 files changed, 68 insertions(+), 31 deletions(-) diff --git a/app/vmauth/main.go b/app/vmauth/main.go index 634283627..e9997a859 100644 --- a/app/vmauth/main.go +++ b/app/vmauth/main.go @@ -107,11 +107,6 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { return true } ui.requests.Inc() - targetURL, headers, err := createTargetURL(ui, r.URL) - if err != nil { - httpserver.Errorf(w, r, "cannot determine targetURL: %s", err) - return true - } // Limit the concurrency of requests to backends concurrencyLimitOnce.Do(concurrencyLimitInit) @@ -128,13 +123,34 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { handleConcurrencyLimitError(w, r, err) return true } - processRequest(w, r, targetURL, headers) + processRequest(w, r, ui) ui.endConcurrencyLimit() <-concurrencyLimitCh return true } -func processRequest(w http.ResponseWriter, r *http.Request, targetURL *url.URL, headers []Header) { +func processRequest(w http.ResponseWriter, r *http.Request, ui *UserInfo) { + u := normalizeURL(r.URL) + up, headers, err := ui.getURLPrefix(u) + if err != nil { + httpserver.Errorf(w, r, "cannot determine targetURL: %s", err) + return + } + maxAttempts := up.getBackendsCount() + for i := 0; i < maxAttempts; i++ { + targetURL := up.mergeURLs(u) + if tryProcessingRequest(w, r, targetURL, headers) { + return + } + } + err = &httpserver.ErrorWithStatusCode{ + Err: fmt.Errorf("all the backends for the user %q are unavailable", ui.name()), + StatusCode: http.StatusServiceUnavailable, + } + httpserver.Errorf(w, r, "%s", err) +} + +func tryProcessingRequest(w http.ResponseWriter, r *http.Request, targetURL *url.URL, headers []Header) bool { // This code has been copied from net/http/httputil/reverseproxy.go req := sanitizeRequestHeaders(r) req.URL = targetURL @@ -144,12 +160,20 @@ func processRequest(w http.ResponseWriter, r *http.Request, targetURL *url.URL, transportOnce.Do(transportInit) res, err := transport.RoundTrip(req) if err != nil { - err = &httpserver.ErrorWithStatusCode{ - Err: fmt.Errorf("error when proxying the request to %q: %s", targetURL, err), - StatusCode: http.StatusBadGateway, + remoteAddr := httpserver.GetQuotedRemoteAddr(r) + requestURI := httpserver.GetRequestURI(r) + if r.Method == "POST" || r.Method == "PUT" { + // It is impossible to retry POST and PUT requests, + // since we already proxied the request body to the backend. + err = &httpserver.ErrorWithStatusCode{ + Err: fmt.Errorf("cannot proxy the request to %q: %w", targetURL, err), + StatusCode: http.StatusServiceUnavailable, + } + httpserver.Errorf(w, r, "%s", err) + return true } - httpserver.Errorf(w, r, "%s", err) - return + logger.Warnf("remoteAddr: %s; requestURI: %s; error when proxying the request to %q: %s", remoteAddr, requestURI, targetURL, err) + return false } removeHopHeaders(res.Header) copyHeader(w.Header(), res.Header) @@ -164,8 +188,9 @@ func processRequest(w http.ResponseWriter, r *http.Request, targetURL *url.URL, remoteAddr := httpserver.GetQuotedRemoteAddr(r) requestURI := httpserver.GetRequestURI(r) logger.Warnf("remoteAddr: %s; requestURI: %s; error when proxying response body from %s: %s", remoteAddr, requestURI, targetURL, err) - return + return true } + return true } var copyBufPool bytesutil.ByteBufferPool diff --git a/app/vmauth/target_url.go b/app/vmauth/target_url.go index 5f3d0df63..a81d1d7df 100644 --- a/app/vmauth/target_url.go +++ b/app/vmauth/target_url.go @@ -12,6 +12,10 @@ func (up *URLPrefix) mergeURLs(requestURI *url.URL) *url.URL { return mergeURLs(pu, requestURI) } +func (up *URLPrefix) getBackendsCount() int { + return len(up.urls) +} + func mergeURLs(uiURL, requestURI *url.URL) *url.URL { targetURL := *uiURL targetURL.Path += requestURI.Path @@ -35,7 +39,22 @@ func mergeURLs(uiURL, requestURI *url.URL) *url.URL { return &targetURL } -func createTargetURL(ui *UserInfo, uOrig *url.URL) (*url.URL, []Header, error) { +func (ui *UserInfo) getURLPrefix(u *url.URL) (*URLPrefix, []Header, error) { + for _, e := range ui.URLMaps { + for _, sp := range e.SrcPaths { + if sp.match(u.Path) { + return e.URLPrefix, e.Headers, nil + } + } + } + if ui.URLPrefix != nil { + return ui.URLPrefix, ui.Headers, nil + } + missingRouteRequests.Inc() + return nil, nil, fmt.Errorf("missing route for %q", u.String()) +} + +func normalizeURL(uOrig *url.URL) *url.URL { u := *uOrig // Prevent from attacks with using `..` in r.URL.Path u.Path = path.Clean(u.Path) @@ -52,16 +71,5 @@ func createTargetURL(ui *UserInfo, uOrig *url.URL) (*url.URL, []Header, error) { // See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1554 u.Path = "" } - for _, e := range ui.URLMaps { - for _, sp := range e.SrcPaths { - if sp.match(u.Path) { - return e.URLPrefix.mergeURLs(&u), e.Headers, nil - } - } - } - if ui.URLPrefix != nil { - return ui.URLPrefix.mergeURLs(&u), ui.Headers, nil - } - missingRouteRequests.Inc() - return nil, nil, fmt.Errorf("missing route for %q", u.String()) + return &u } diff --git a/app/vmauth/target_url_test.go b/app/vmauth/target_url_test.go index a942dd8ac..1590bb272 100644 --- a/app/vmauth/target_url_test.go +++ b/app/vmauth/target_url_test.go @@ -13,10 +13,12 @@ func TestCreateTargetURLSuccess(t *testing.T) { if err != nil { t.Fatalf("cannot parse %q: %s", requestURI, err) } - target, headers, err := createTargetURL(ui, u) + u = normalizeURL(u) + up, headers, err := ui.getURLPrefix(u) if err != nil { t.Fatalf("unexpected error: %s", err) } + target := up.mergeURLs(u) if target.String() != expectedTarget { t.Fatalf("unexpected target; got %q; want %q", target, expectedTarget) } @@ -119,15 +121,16 @@ func TestCreateTargetURLFailure(t *testing.T) { if err != nil { t.Fatalf("cannot parse %q: %s", requestURI, err) } - target, headers, err := createTargetURL(ui, u) + u = normalizeURL(u) + up, headers, err := ui.getURLPrefix(u) if err == nil { t.Fatalf("expecting non-nil error") } - if target != nil { - t.Fatalf("unexpected target=%q; want empty string", target) + if up != nil { + t.Fatalf("unexpected non-empty up=%q", up) } if headers != nil { - t.Fatalf("unexpected headers=%q; want empty string", headers) + t.Fatalf("unexpected non-empty headers=%q", headers) } } f(&UserInfo{}, "/foo/bar") diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 7f9bfe66a..773e5813f 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -16,6 +16,7 @@ The following tip changes can be tested by building VictoriaMetrics components f ## tip * FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): add the ability to limit the number of concurrent requests on a per-user basis via `max_concurrent_requests` option. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3346) and [these docs](https://docs.victoriametrics.com/vmauth.html#auth-config). +* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): automatically retry failing GET requests on all the configured backends. * FEATURE: [vmalert enterprise](https://docs.victoriametrics.com/vmalert.html): add ability to read alerting and recording rules from S3, GCS or S3-compatible object storage. See [these docs](https://docs.victoriametrics.com/vmalert.html#reading-rules-from-object-storage). ## [v1.87.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.1)