lib/promscrape: retry http requests if the server returns 429 status code

The 429 status code means that the server is overwhelmed with requests.
The client can retry the request after some wait time.
Implement this strategy for service discovery and scrape requests.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2940
This commit is contained in:
Aliaksandr Valialkin 2022-08-16 14:52:38 +03:00
parent 1a363192ff
commit aa37e6b438
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
3 changed files with 20 additions and 31 deletions

View file

@ -25,6 +25,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for MX record types in [dns_sd_configs](https://docs.victoriametrics.com/sd_configs.html#dns_sd_configs) in the same way as Prometheus 2.38 [does](https://github.com/prometheus/prometheus/pull/10099).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `__meta_kubernetes_service_port_number` meta-label for `role: service` in [kubernetes_sd_configs](https://docs.victoriametrics.com/sd_configs.html#kubernetes_sd_configs) in the same way as Prometheus 2.38 [does](https://github.com/prometheus/prometheus/pull/11002).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `__meta_kubernetes_pod_container_image` meta-label for `role: pod` in [kubernetes_sd_configs](https://docs.victoriametrics.com/sd_configs.html#kubernetes_sd_configs) in the same way as Prometheus 2.38 [does](https://github.com/prometheus/prometheus/pull/11034).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): retry HTTP requests after some wait time during service discovery and during target scrapes if the server returns 429 HTTP status code (aka `Too many requests`). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2940).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add a legend in the top right corner for shortcut keys. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2813).
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): add `toTime()` template function in the same way as Prometheus 2.38 [does](https://github.com/prometheus/prometheus/pull/10993). See [these docs](https://prometheus.io/docs/prometheus/latest/configuration/template_reference/#numbers).

View file

@ -15,6 +15,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
"github.com/VictoriaMetrics/fasthttp"
"github.com/VictoriaMetrics/metrics"
@ -189,6 +190,7 @@ func (c *client) GetStreamReader() (*streamReader, error) {
req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", c.scrapeTimeoutSecondsStr)
c.setHeaders(req)
c.setProxyHeaders(req)
scrapeRequests.Inc()
resp, err := c.sc.Do(req)
if err != nil {
cancel()
@ -327,33 +329,12 @@ var (
scrapesOK = metrics.NewCounter(`vm_promscrape_scrapes_total{status_code="200"}`)
scrapesGunzipped = metrics.NewCounter(`vm_promscrape_scrapes_gunziped_total`)
scrapesGunzipFailed = metrics.NewCounter(`vm_promscrape_scrapes_gunzip_failed_total`)
scrapeRequests = metrics.NewCounter(`vm_promscrape_scrape_requests_total`)
scrapeRetries = metrics.NewCounter(`vm_promscrape_scrape_retries_total`)
)
func doRequestWithPossibleRetry(hc *fasthttp.HostClient, req *fasthttp.Request, resp *fasthttp.Response, deadline time.Time) error {
sleepTime := time.Second
for {
// Use DoDeadline instead of Do even if hc.ReadTimeout is already set in order to guarantee the given deadline
// across multiple retries.
err := hc.DoDeadline(req, resp, deadline)
if err == nil {
return nil
}
if err != fasthttp.ErrConnectionClosed && !strings.Contains(err.Error(), "broken pipe") {
return err
}
// Retry request if the server closes the keep-alive connection unless deadline exceeds.
maxSleepTime := time.Until(deadline)
if sleepTime > maxSleepTime {
return fmt.Errorf("the server closes all the connection attempts: %w", err)
}
sleepTime += sleepTime
if sleepTime > maxSleepTime {
sleepTime = maxSleepTime
}
time.Sleep(sleepTime)
scrapeRetries.Inc()
}
return discoveryutils.DoRequestWithPossibleRetry(hc, req, resp, deadline, scrapeRequests, scrapeRetries)
}
type streamReader struct {

View file

@ -240,20 +240,23 @@ func (c *Client) getAPIResponseWithParamsAndClient(client *fasthttp.HostClient,
return data, nil
}
func doRequestWithPossibleRetry(hc *fasthttp.HostClient, req *fasthttp.Request, resp *fasthttp.Response, deadline time.Time) error {
// DoRequestWithPossibleRetry performs the given req at hc and stores the response at resp.
func DoRequestWithPossibleRetry(hc *fasthttp.HostClient, req *fasthttp.Request, resp *fasthttp.Response, deadline time.Time, requestCounter, retryCounter *metrics.Counter) error {
sleepTime := time.Second
discoveryRequests.Inc()
requestCounter.Inc()
for {
// Use DoDeadline instead of Do even if hc.ReadTimeout is already set in order to guarantee the given deadline
// across multiple retries.
err := hc.DoDeadline(req, resp, deadline)
if err == nil {
return nil
}
if err != fasthttp.ErrConnectionClosed && !strings.Contains(err.Error(), "broken pipe") {
statusCode := resp.StatusCode()
if statusCode != fasthttp.StatusTooManyRequests {
return nil
}
} else if err != fasthttp.ErrConnectionClosed && !strings.Contains(err.Error(), "broken pipe") {
return err
}
// Retry request if the server closes the keep-alive connection unless deadline exceeds.
// Retry request after exponentially increased sleep.
maxSleepTime := time.Until(deadline)
if sleepTime > maxSleepTime {
return fmt.Errorf("the server closes all the connection attempts: %w", err)
@ -263,11 +266,15 @@ func doRequestWithPossibleRetry(hc *fasthttp.HostClient, req *fasthttp.Request,
sleepTime = maxSleepTime
}
time.Sleep(sleepTime)
discoveryRetries.Inc()
retryCounter.Inc()
}
}
func doRequestWithPossibleRetry(hc *fasthttp.HostClient, req *fasthttp.Request, resp *fasthttp.Response, deadline time.Time) error {
return DoRequestWithPossibleRetry(hc, req, resp, deadline, discoveryRequests, discoveryRetries)
}
var (
discoveryRetries = metrics.NewCounter(`vm_promscrape_discovery_retries_total`)
discoveryRequests = metrics.NewCounter(`vm_promscrape_discovery_requests_total`)
discoveryRetries = metrics.NewCounter(`vm_promscrape_discovery_retries_total`)
)