From f392913d0053a3d58a75614904a3d776d89ef1e7 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 5 Jan 2023 21:13:02 -0800 Subject: [PATCH] lib/promscrape: follow-up after bced9fb9784dbe807dcf4c72c519a5a6f9c83a05 - Document the bugfix at docs/CHANGELOG.md - Wait until all the worker goroutines are done in consulWatcher.mustStop() - Do not log `context canceled` errors when discovering consul serviceNames - Removed explicit handling of gzipped responses at lib/promscrape/discoveryutils.Client, since this handling is automatically performed by net/http.Transport. See DisableCompression option at https://pkg.go.dev/net/http#Transport . - Remove explicit handling of the proxyURL, since it is automatically handled by net/http.Transport. See Proxy option at https://pkg.go.dev/net/http#Transport . - Expliticly set MaxIdleConnsPerHost, since its default value equals to 2. Such a small value may result in excess tcp connection churn when more than 2 concurrent requests are processed by lib/promscrape/discoveryutils.Client. - Do not set explicitly the `Host` request header, since it is automatically set by net/http.Client. - Backport the bugfix to the recently added nomad_sd_configs - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3367 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3468 --- docs/CHANGELOG.md | 1 + lib/promscrape/discovery/consul/watch.go | 39 +++--- lib/promscrape/discovery/nomad/api.go | 8 +- lib/promscrape/discovery/nomad/watch.go | 44 +++--- lib/promscrape/discoveryutils/client.go | 163 +++++++++-------------- 5 files changed, 120 insertions(+), 135 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index fbb7e35149..f123dd2bbb 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -31,6 +31,7 @@ The following tip changes can be tested by building VictoriaMetrics components f * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly parse durations with uppercase suffixes such as `10S`, `5MS`, `1W`, etc. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3589). * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix a panic during target discovery when `vmagent` runs with `-promscrape.dropOriginalLabels` command-line flag. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3580). The bug has been introduced in [v1.85.0](https://docs.victoriametrics.com/CHANGELOG.html#v1850). * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): [dockerswarm_sd_configs](https://docs.victoriametrics.com/sd_configs.html#dockerswarm_sd_configs): properly encode `filters` field. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3579). +* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix possible resource leak after hot reload of the updated [consul_sd_configs](https://docs.victoriametrics.com/sd_configs.html#consul_sd_configs). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3468). ## [v1.85.3](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.85.3) diff --git a/lib/promscrape/discovery/consul/watch.go b/lib/promscrape/discovery/consul/watch.go index ff47d275e4..76579b9b03 100644 --- a/lib/promscrape/discovery/consul/watch.go +++ b/lib/promscrape/discovery/consul/watch.go @@ -34,8 +34,9 @@ type consulWatcher struct { servicesLock sync.Mutex services map[string]*serviceWatcher - wg sync.WaitGroup - stopCh chan struct{} + servicesWG sync.WaitGroup + wg sync.WaitGroup + stopCh chan struct{} } type serviceWatcher struct { @@ -73,7 +74,11 @@ func newConsulWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter, stopCh: make(chan struct{}), } initCh := make(chan struct{}) - go cw.watchForServicesUpdates(initCh) + cw.wg.Add(1) + go func() { + cw.watchForServicesUpdates(initCh) + cw.wg.Done() + }() // wait for initialization to complete <-initCh return cw @@ -82,6 +87,7 @@ func newConsulWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter, func (cw *consulWatcher) mustStop() { close(cw.stopCh) cw.client.Stop() + cw.wg.Wait() } func (cw *consulWatcher) updateServices(serviceNames []string) { @@ -98,14 +104,14 @@ func (cw *consulWatcher) updateServices(serviceNames []string) { stopCh: make(chan struct{}), } cw.services[serviceName] = sw - cw.wg.Add(1) + cw.servicesWG.Add(1) serviceWatchersCreated.Inc() initWG.Add(1) go func() { serviceWatchersCount.Inc() sw.watchForServiceNodesUpdates(cw, &initWG) serviceWatchersCount.Dec() - cw.wg.Done() + cw.servicesWG.Done() }() } @@ -136,11 +142,13 @@ func (cw *consulWatcher) updateServices(serviceNames []string) { // watchForServicesUpdates closes the initCh once the initialization is complete and first discovery iteration is done. func (cw *consulWatcher) watchForServicesUpdates(initCh chan struct{}) { index := int64(0) - clientAddr := cw.client.Addr() + apiServer := cw.client.APIServer() f := func() { serviceNames, newIndex, err := cw.getBlockingServiceNames(index) if err != nil { - logger.Errorf("cannot obtain Consul serviceNames from %q: %s", clientAddr, err) + if !errors.Is(err, context.Canceled) { + logger.Errorf("cannot obtain Consul serviceNames from %q: %s", apiServer, err) + } return } if index == newIndex { @@ -151,7 +159,7 @@ func (cw *consulWatcher) watchForServicesUpdates(initCh chan struct{}) { index = newIndex } - logger.Infof("started Consul service watcher for %q", clientAddr) + logger.Infof("started Consul service watcher for %q", apiServer) f() // send signal that initialization is complete @@ -165,15 +173,15 @@ func (cw *consulWatcher) watchForServicesUpdates(initCh chan struct{}) { case <-ticker.C: f() case <-cw.stopCh: - logger.Infof("stopping Consul service watchers for %q", clientAddr) + logger.Infof("stopping Consul service watchers for %q", apiServer) startTime := time.Now() cw.servicesLock.Lock() for _, sw := range cw.services { close(sw.stopCh) } cw.servicesLock.Unlock() - cw.wg.Wait() - logger.Infof("stopped Consul service watcher for %q in %.3f seconds", clientAddr, time.Since(startTime).Seconds()) + cw.servicesWG.Wait() + logger.Infof("stopped Consul service watcher for %q in %.3f seconds", apiServer, time.Since(startTime).Seconds()) return } } @@ -219,16 +227,15 @@ func (cw *consulWatcher) getBlockingServiceNames(index int64) ([]string, int64, // // watchForServiceNodesUpdates calls initWG.Done() once the initialization is complete and the first discovery iteration is done. func (sw *serviceWatcher) watchForServiceNodesUpdates(cw *consulWatcher, initWG *sync.WaitGroup) { - clientAddr := cw.client.Addr() + apiServer := cw.client.APIServer() index := int64(0) path := "/v1/health/service/" + sw.serviceName + cw.serviceNodesQueryArgs f := func() { data, newIndex, err := getBlockingAPIResponse(cw.client, path, index) if err != nil { - if errors.Is(err, context.Canceled) { - return + if !errors.Is(err, context.Canceled) { + logger.Errorf("cannot obtain Consul serviceNodes for serviceName=%q from %q: %s", sw.serviceName, apiServer, err) } - logger.Errorf("cannot obtain Consul serviceNodes for serviceName=%q from %q: %s", sw.serviceName, clientAddr, err) return } if index == newIndex { @@ -237,7 +244,7 @@ func (sw *serviceWatcher) watchForServiceNodesUpdates(cw *consulWatcher, initWG } sns, err := parseServiceNodes(data) if err != nil { - logger.Errorf("cannot parse Consul serviceNodes response for serviceName=%q from %q: %s", sw.serviceName, clientAddr, err) + logger.Errorf("cannot parse Consul serviceNodes response for serviceName=%q from %q: %s", sw.serviceName, apiServer, err) return } diff --git a/lib/promscrape/discovery/nomad/api.go b/lib/promscrape/discovery/nomad/api.go index b7a016512c..b93029deb3 100644 --- a/lib/promscrape/discovery/nomad/api.go +++ b/lib/promscrape/discovery/nomad/api.go @@ -3,6 +3,7 @@ package nomad import ( "flag" "fmt" + "net/http" "os" "strconv" "strings" @@ -11,7 +12,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" - "github.com/VictoriaMetrics/fasthttp" ) var waitTime = flag.Duration("promscrape.nomad.waitTime", 0, "Wait time used by Nomad service discovery. Default value is used if not set") @@ -138,13 +138,13 @@ func maxWaitTime() time.Duration { func getBlockingAPIResponse(client *discoveryutils.Client, path string, index int64) ([]byte, int64, error) { path += "&index=" + strconv.FormatInt(index, 10) path += "&wait=" + fmt.Sprintf("%ds", int(maxWaitTime().Seconds())) - getMeta := func(resp *fasthttp.Response) { - ind := resp.Header.Peek("X-Nomad-Index") + getMeta := func(resp *http.Response) { + ind := resp.Header.Get("X-Nomad-Index") if len(ind) == 0 { logger.Errorf("cannot find X-Nomad-Index header in response from %q", path) return } - newIndex, err := strconv.ParseInt(string(ind), 10, 64) + newIndex, err := strconv.ParseInt(ind, 10, 64) if err != nil { logger.Errorf("cannot parse X-Nomad-Index header value in response from %q: %s", path, err) return diff --git a/lib/promscrape/discovery/nomad/watch.go b/lib/promscrape/discovery/nomad/watch.go index 2598c907fa..daa7739488 100644 --- a/lib/promscrape/discovery/nomad/watch.go +++ b/lib/promscrape/discovery/nomad/watch.go @@ -1,7 +1,9 @@ package nomad import ( + "context" "encoding/json" + "errors" "flag" "fmt" "net/url" @@ -31,8 +33,9 @@ type nomadWatcher struct { servicesLock sync.Mutex services map[string]*serviceWatcher - wg sync.WaitGroup - stopCh chan struct{} + servicesWG sync.WaitGroup + wg sync.WaitGroup + stopCh chan struct{} } type serviceWatcher struct { @@ -61,7 +64,11 @@ func newNomadWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter, n stopCh: make(chan struct{}), } initCh := make(chan struct{}) - go cw.watchForServicesUpdates(initCh) + cw.wg.Add(1) + go func() { + cw.watchForServicesUpdates(initCh) + cw.wg.Done() + }() // wait for initialization to complete <-initCh return cw @@ -69,9 +76,8 @@ func newNomadWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter, n func (cw *nomadWatcher) mustStop() { close(cw.stopCh) - // Do not wait for the watcher to stop, since it may take - // up to discoveryutils.BlockingClientReadTimeout to complete. - // TODO: add ability to cancel blocking requests. + cw.client.Stop() + cw.wg.Wait() } func (cw *nomadWatcher) updateServices(serviceNames []string) { @@ -88,14 +94,14 @@ func (cw *nomadWatcher) updateServices(serviceNames []string) { stopCh: make(chan struct{}), } cw.services[serviceName] = sw - cw.wg.Add(1) + cw.servicesWG.Add(1) serviceWatchersCreated.Inc() initWG.Add(1) go func() { serviceWatchersCount.Inc() sw.watchForServiceAddressUpdates(cw, &initWG) serviceWatchersCount.Dec() - cw.wg.Done() + cw.servicesWG.Done() }() } @@ -126,11 +132,13 @@ func (cw *nomadWatcher) updateServices(serviceNames []string) { // watchForServicesUpdates closes the initCh once the initialization is complete and first discovery iteration is done. func (cw *nomadWatcher) watchForServicesUpdates(initCh chan struct{}) { index := int64(0) - clientAddr := cw.client.Addr() + apiServer := cw.client.APIServer() f := func() { serviceNames, newIndex, err := cw.getBlockingServiceNames(index) if err != nil { - logger.Errorf("cannot obtain Nomad serviceNames from %q: %s", clientAddr, err) + if !errors.Is(err, context.Canceled) { + logger.Errorf("cannot obtain Nomad serviceNames from %q: %s", apiServer, err) + } return } if index == newIndex { @@ -141,7 +149,7 @@ func (cw *nomadWatcher) watchForServicesUpdates(initCh chan struct{}) { index = newIndex } - logger.Infof("started Nomad service watcher for %q", clientAddr) + logger.Infof("started Nomad service watcher for %q", apiServer) f() // send signal that initialization is complete @@ -155,15 +163,15 @@ func (cw *nomadWatcher) watchForServicesUpdates(initCh chan struct{}) { case <-ticker.C: f() case <-cw.stopCh: - logger.Infof("stopping Nomad service watchers for %q", clientAddr) + logger.Infof("stopping Nomad service watchers for %q", apiServer) startTime := time.Now() cw.servicesLock.Lock() for _, sw := range cw.services { close(sw.stopCh) } cw.servicesLock.Unlock() - cw.wg.Wait() - logger.Infof("stopped Nomad service watcher for %q in %.3f seconds", clientAddr, time.Since(startTime).Seconds()) + cw.servicesWG.Wait() + logger.Infof("stopped Nomad service watcher for %q in %.3f seconds", apiServer, time.Since(startTime).Seconds()) return } } @@ -225,14 +233,16 @@ func (cw *nomadWatcher) getServiceSnapshot() map[string][]Service { // // watchForServiceNodesUpdates calls initWG.Done() once the initialization is complete and the first discovery iteration is done. func (sw *serviceWatcher) watchForServiceAddressUpdates(nw *nomadWatcher, initWG *sync.WaitGroup) { - clientAddr := nw.client.Addr() + apiServer := nw.client.APIServer() index := int64(0) // TODO: Maybe use a different query arg. path := "/v1/service/" + sw.serviceName + nw.serviceNamesQueryArgs f := func() { data, newIndex, err := getBlockingAPIResponse(nw.client, path, index) if err != nil { - logger.Errorf("cannot obtain Nomad services for serviceName=%q from %q: %s", sw.serviceName, clientAddr, err) + if !errors.Is(err, context.Canceled) { + logger.Errorf("cannot obtain Nomad services for serviceName=%q from %q: %s", sw.serviceName, apiServer, err) + } return } if index == newIndex { @@ -241,7 +251,7 @@ func (sw *serviceWatcher) watchForServiceAddressUpdates(nw *nomadWatcher, initWG } sns, err := parseServices(data) if err != nil { - logger.Errorf("cannot parse Nomad services response for serviceName=%q from %q: %s", sw.serviceName, clientAddr, err) + logger.Errorf("cannot parse Nomad services response for serviceName=%q from %q: %s", sw.serviceName, apiServer, err) return } diff --git a/lib/promscrape/discoveryutils/client.go b/lib/promscrape/discoveryutils/client.go index 2de07369e7..74bab4f3ca 100644 --- a/lib/promscrape/discoveryutils/client.go +++ b/lib/promscrape/discoveryutils/client.go @@ -1,7 +1,6 @@ package discoveryutils import ( - "compress/gzip" "context" "crypto/tls" "flag" @@ -55,7 +54,7 @@ func GetHTTPClient() *http.Client { return defaultClient } -// Client is http client, which talks to the given apiServer. +// Client is http client, which talks to the given apiServer passed to NewClient(). type Client struct { // client is used for short requests. client *HTTPClient @@ -65,8 +64,6 @@ type Client struct { apiServer string - dialAddr string - setHTTPHeaders func(req *http.Request) setHTTPProxyHeaders func(req *http.Request) @@ -81,110 +78,96 @@ type HTTPClient struct { WriteTimeout time.Duration } -func addMissingPort(addr string, isTLS bool) string { - if strings.Contains(addr, ":") { - return addr - } - if isTLS { - return addr + ":443" - } - return addr + ":80" -} +var defaultDialer = &net.Dialer{} // NewClient returns new Client for the given args. func NewClient(apiServer string, ac *promauth.Config, proxyURL *proxy.URL, proxyAC *promauth.Config) (*Client, error) { u, err := url.Parse(apiServer) if err != nil { - return nil, fmt.Errorf("cannot parse provided url %q: %w", apiServer, err) + return nil, fmt.Errorf("cannot parse apiServer=%q: %w", apiServer, err) } - // special case for unix socket connection - var dialFunc func(addr string) (net.Conn, error) - if string(u.Scheme) == "unix" { + dialFunc := defaultDialer.DialContext + if u.Scheme == "unix" { + // special case for unix socket connection dialAddr := u.Path - apiServer = "http://" - dialFunc = func(_ string) (net.Conn, error) { - return net.Dial("unix", dialAddr) + apiServer = "http://unix" + dialFunc = func(ctx context.Context, _, _ string) (net.Conn, error) { + return defaultDialer.DialContext(ctx, "unix", dialAddr) } } - dialAddr := u.Host - isTLS := string(u.Scheme) == "https" + isTLS := u.Scheme == "https" var tlsCfg *tls.Config if isTLS { tlsCfg = ac.NewTLSConfig() } - setHTTPProxyHeaders := func(req *http.Request) {} - - dialAddr = addMissingPort(dialAddr, isTLS) - if dialFunc == nil { - var err error - dialFunc, err = proxyURL.NewDialFunc(proxyAC) - if err != nil { - return nil, err - } - if proxyAC != nil { - setHTTPProxyHeaders = func(req *http.Request) { - proxyURL.SetHeaders(proxyAC, req) - } - } + var proxyURLFunc func(*http.Request) (*url.URL, error) + if pu := proxyURL.GetURL(); pu != nil { + proxyURLFunc = http.ProxyURL(pu) } - hcTransport := &http.Transport{ - TLSClientConfig: tlsCfg, - MaxConnsPerHost: 2 * *maxConcurrency, - ResponseHeaderTimeout: *maxWaitTime, - DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { - return dialFunc(dialAddr) - }, - } - - hc := &http.Client{ - Timeout: DefaultClientReadTimeout, - Transport: hcTransport, - } - - blockingTransport := &http.Transport{ - TLSClientConfig: tlsCfg, - MaxConnsPerHost: 64 * 1024, - DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { - return dialFunc(dialAddr) + client := &http.Client{ + Timeout: DefaultClientReadTimeout, + Transport: &http.Transport{ + TLSClientConfig: tlsCfg, + Proxy: proxyURLFunc, + TLSHandshakeTimeout: 10 * time.Second, + MaxIdleConnsPerHost: *maxConcurrency, + ResponseHeaderTimeout: DefaultClientReadTimeout, + DialContext: dialFunc, }, } blockingClient := &http.Client{ - Timeout: BlockingClientReadTimeout, - Transport: blockingTransport, + Timeout: BlockingClientReadTimeout, + Transport: &http.Transport{ + TLSClientConfig: tlsCfg, + Proxy: proxyURLFunc, + TLSHandshakeTimeout: 10 * time.Second, + MaxIdleConnsPerHost: 1000, + ResponseHeaderTimeout: BlockingClientReadTimeout, + DialContext: dialFunc, + }, } setHTTPHeaders := func(req *http.Request) {} if ac != nil { - setHTTPHeaders = func(req *http.Request) { ac.SetHeaders(req, true) } + setHTTPHeaders = func(req *http.Request) { + ac.SetHeaders(req, true) + } + } + setHTTPProxyHeaders := func(req *http.Request) {} + if proxyAC != nil { + setHTTPProxyHeaders = func(req *http.Request) { + proxyURL.SetHeaders(proxyAC, req) + } } - ctx, cancel := context.WithCancel(context.Background()) - return &Client{ - client: &HTTPClient{client: hc, ReadTimeout: DefaultClientReadTimeout, WriteTimeout: DefaultClientWriteTimeout}, - blockingClient: &HTTPClient{client: blockingClient, ReadTimeout: BlockingClientReadTimeout, WriteTimeout: DefaultClientWriteTimeout}, + c := &Client{ + client: &HTTPClient{ + client: client, + ReadTimeout: DefaultClientReadTimeout, + WriteTimeout: DefaultClientWriteTimeout, + }, + blockingClient: &HTTPClient{ + client: blockingClient, + ReadTimeout: BlockingClientReadTimeout, + WriteTimeout: DefaultClientWriteTimeout, + }, apiServer: apiServer, - dialAddr: dialAddr, setHTTPHeaders: setHTTPHeaders, setHTTPProxyHeaders: setHTTPProxyHeaders, clientCtx: ctx, clientCancel: cancel, - }, nil -} - -// Addr returns the address the client connects to. -func (c *Client) Addr() string { - return c.dialAddr + } + return c, nil } // GetAPIResponseWithReqParams returns response for given absolute path with optional callback for request. -// modifyRequestParams should never reference data from request. -func (c *Client) GetAPIResponseWithReqParams(path string, modifyRequestParams func(request *http.Request)) ([]byte, error) { - return c.getAPIResponse(path, modifyRequestParams) +func (c *Client) GetAPIResponseWithReqParams(path string, modifyRequest func(request *http.Request)) ([]byte, error) { + return c.getAPIResponse(path, modifyRequest) } // GetAPIResponse returns response for the given absolute path. @@ -205,12 +188,13 @@ func (c *Client) getAPIResponse(path string, modifyRequest func(request *http.Re return nil, fmt.Errorf("too many outstanding requests to %q; try increasing -promscrape.discovery.concurrentWaitTime=%s or -promscrape.discovery.concurrency=%d", c.apiServer, *maxWaitTime, *maxConcurrency) } - defer func() { <-concurrencyLimitCh }() + defer func() { + <-concurrencyLimitCh + }() return c.getAPIResponseWithParamsAndClient(c.client, path, modifyRequest, nil) } // GetBlockingAPIResponse returns response for given absolute path with blocking client and optional callback for api response, -// inspectResponse - should never reference data from response. func (c *Client) GetBlockingAPIResponse(path string, inspectResponse func(resp *http.Response)) ([]byte, error) { return c.getAPIResponseWithParamsAndClient(c.blockingClient, path, nil, inspectResponse) } @@ -222,7 +206,6 @@ func (c *Client) getAPIResponseWithParamsAndClient(client *HTTPClient, path stri if err != nil { return nil, fmt.Errorf("cannot parse %q: %w", requestURL, err) } - u.Host = c.dialAddr deadline := time.Now().Add(client.WriteTimeout) ctx, cancel := context.WithDeadline(c.clientCtx, deadline) @@ -232,8 +215,6 @@ func (c *Client) getAPIResponseWithParamsAndClient(client *HTTPClient, path stri return nil, fmt.Errorf("cannot create request for %q: %w", requestURL, err) } - req.Header.Set("Host", c.dialAddr) - req.Header.Set("Accept-Encoding", "gzip") c.setHTTPHeaders(req) c.setHTTPProxyHeaders(req) if modifyRequest != nil { @@ -244,20 +225,11 @@ func (c *Client) getAPIResponseWithParamsAndClient(client *HTTPClient, path stri if err != nil { return nil, fmt.Errorf("cannot fetch %q: %w", requestURL, err) } - - reader := resp.Body - if resp.Header.Get("Content-Encoding") == "gzip" { - reader, err = gzip.NewReader(resp.Body) - if err != nil { - return nil, fmt.Errorf("cannot create gzip reader for %q: %w", requestURL, err) - } - } - - data, err := io.ReadAll(reader) - if err != nil { - return nil, fmt.Errorf("cannot ungzip response from %q: %w", requestURL, err) - } + data, err := io.ReadAll(resp.Body) _ = resp.Body.Close() + if err != nil { + return nil, fmt.Errorf("cannot read response from %q: %w", requestURL, err) + } if inspectResponse != nil { inspectResponse(resp) @@ -280,10 +252,9 @@ func (c *Client) Stop() { c.clientCancel() } -// DoRequestWithPossibleRetry performs the given req at client and stores the response at resp. -func DoRequestWithPossibleRetry(hc *HTTPClient, req *http.Request, requestCounter, retryCounter *metrics.Counter) (*http.Response, error) { +func doRequestWithPossibleRetry(hc *HTTPClient, req *http.Request) (*http.Response, error) { sleepTime := time.Second - requestCounter.Inc() + discoveryRequests.Inc() deadline, ok := req.Context().Deadline() if !ok { deadline = time.Now().Add(hc.WriteTimeout) @@ -309,14 +280,10 @@ func DoRequestWithPossibleRetry(hc *HTTPClient, req *http.Request, requestCounte sleepTime = maxSleepTime } time.Sleep(sleepTime) - retryCounter.Inc() + discoveryRetries.Inc() } } -func doRequestWithPossibleRetry(hc *HTTPClient, req *http.Request) (*http.Response, error) { - return DoRequestWithPossibleRetry(hc, req, discoveryRequests, discoveryRetries) -} - var ( discoveryRequests = metrics.NewCounter(`vm_promscrape_discovery_requests_total`) discoveryRetries = metrics.NewCounter(`vm_promscrape_discovery_retries_total`)