lib/promscrape: follow-up after bced9fb978

- Document the bugfix at docs/CHANGELOG.md
- Wait until all the worker goroutines are done in consulWatcher.mustStop()
- Do not log `context canceled` errors when discovering consul serviceNames
- Removed explicit handling of gzipped responses at lib/promscrape/discoveryutils.Client,
  since this handling is automatically performed by net/http.Transport.
  See DisableCompression option at https://pkg.go.dev/net/http#Transport .
- Remove explicit handling of the proxyURL, since it is automatically handled
  by net/http.Transport. See Proxy option at https://pkg.go.dev/net/http#Transport .
- Expliticly set MaxIdleConnsPerHost, since its default value equals to 2.
  Such a small value may result in excess tcp connection churn
  when more than 2 concurrent requests are processed by lib/promscrape/discoveryutils.Client.
- Do not set explicitly the `Host` request header, since it is automatically set by net/http.Client.
- Backport the bugfix to the recently added nomad_sd_configs - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3367

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3468
This commit is contained in:
Aliaksandr Valialkin 2023-01-05 21:13:02 -08:00
parent bced9fb978
commit f392913d00
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
5 changed files with 120 additions and 135 deletions

View file

@ -31,6 +31,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly parse durations with uppercase suffixes such as `10S`, `5MS`, `1W`, etc. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3589).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix a panic during target discovery when `vmagent` runs with `-promscrape.dropOriginalLabels` command-line flag. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3580). The bug has been introduced in [v1.85.0](https://docs.victoriametrics.com/CHANGELOG.html#v1850).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): [dockerswarm_sd_configs](https://docs.victoriametrics.com/sd_configs.html#dockerswarm_sd_configs): properly encode `filters` field. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3579).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix possible resource leak after hot reload of the updated [consul_sd_configs](https://docs.victoriametrics.com/sd_configs.html#consul_sd_configs). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3468).
## [v1.85.3](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.85.3)

View file

@ -34,8 +34,9 @@ type consulWatcher struct {
servicesLock sync.Mutex
services map[string]*serviceWatcher
wg sync.WaitGroup
stopCh chan struct{}
servicesWG sync.WaitGroup
wg sync.WaitGroup
stopCh chan struct{}
}
type serviceWatcher struct {
@ -73,7 +74,11 @@ func newConsulWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter,
stopCh: make(chan struct{}),
}
initCh := make(chan struct{})
go cw.watchForServicesUpdates(initCh)
cw.wg.Add(1)
go func() {
cw.watchForServicesUpdates(initCh)
cw.wg.Done()
}()
// wait for initialization to complete
<-initCh
return cw
@ -82,6 +87,7 @@ func newConsulWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter,
func (cw *consulWatcher) mustStop() {
close(cw.stopCh)
cw.client.Stop()
cw.wg.Wait()
}
func (cw *consulWatcher) updateServices(serviceNames []string) {
@ -98,14 +104,14 @@ func (cw *consulWatcher) updateServices(serviceNames []string) {
stopCh: make(chan struct{}),
}
cw.services[serviceName] = sw
cw.wg.Add(1)
cw.servicesWG.Add(1)
serviceWatchersCreated.Inc()
initWG.Add(1)
go func() {
serviceWatchersCount.Inc()
sw.watchForServiceNodesUpdates(cw, &initWG)
serviceWatchersCount.Dec()
cw.wg.Done()
cw.servicesWG.Done()
}()
}
@ -136,11 +142,13 @@ func (cw *consulWatcher) updateServices(serviceNames []string) {
// watchForServicesUpdates closes the initCh once the initialization is complete and first discovery iteration is done.
func (cw *consulWatcher) watchForServicesUpdates(initCh chan struct{}) {
index := int64(0)
clientAddr := cw.client.Addr()
apiServer := cw.client.APIServer()
f := func() {
serviceNames, newIndex, err := cw.getBlockingServiceNames(index)
if err != nil {
logger.Errorf("cannot obtain Consul serviceNames from %q: %s", clientAddr, err)
if !errors.Is(err, context.Canceled) {
logger.Errorf("cannot obtain Consul serviceNames from %q: %s", apiServer, err)
}
return
}
if index == newIndex {
@ -151,7 +159,7 @@ func (cw *consulWatcher) watchForServicesUpdates(initCh chan struct{}) {
index = newIndex
}
logger.Infof("started Consul service watcher for %q", clientAddr)
logger.Infof("started Consul service watcher for %q", apiServer)
f()
// send signal that initialization is complete
@ -165,15 +173,15 @@ func (cw *consulWatcher) watchForServicesUpdates(initCh chan struct{}) {
case <-ticker.C:
f()
case <-cw.stopCh:
logger.Infof("stopping Consul service watchers for %q", clientAddr)
logger.Infof("stopping Consul service watchers for %q", apiServer)
startTime := time.Now()
cw.servicesLock.Lock()
for _, sw := range cw.services {
close(sw.stopCh)
}
cw.servicesLock.Unlock()
cw.wg.Wait()
logger.Infof("stopped Consul service watcher for %q in %.3f seconds", clientAddr, time.Since(startTime).Seconds())
cw.servicesWG.Wait()
logger.Infof("stopped Consul service watcher for %q in %.3f seconds", apiServer, time.Since(startTime).Seconds())
return
}
}
@ -219,16 +227,15 @@ func (cw *consulWatcher) getBlockingServiceNames(index int64) ([]string, int64,
//
// watchForServiceNodesUpdates calls initWG.Done() once the initialization is complete and the first discovery iteration is done.
func (sw *serviceWatcher) watchForServiceNodesUpdates(cw *consulWatcher, initWG *sync.WaitGroup) {
clientAddr := cw.client.Addr()
apiServer := cw.client.APIServer()
index := int64(0)
path := "/v1/health/service/" + sw.serviceName + cw.serviceNodesQueryArgs
f := func() {
data, newIndex, err := getBlockingAPIResponse(cw.client, path, index)
if err != nil {
if errors.Is(err, context.Canceled) {
return
if !errors.Is(err, context.Canceled) {
logger.Errorf("cannot obtain Consul serviceNodes for serviceName=%q from %q: %s", sw.serviceName, apiServer, err)
}
logger.Errorf("cannot obtain Consul serviceNodes for serviceName=%q from %q: %s", sw.serviceName, clientAddr, err)
return
}
if index == newIndex {
@ -237,7 +244,7 @@ func (sw *serviceWatcher) watchForServiceNodesUpdates(cw *consulWatcher, initWG
}
sns, err := parseServiceNodes(data)
if err != nil {
logger.Errorf("cannot parse Consul serviceNodes response for serviceName=%q from %q: %s", sw.serviceName, clientAddr, err)
logger.Errorf("cannot parse Consul serviceNodes response for serviceName=%q from %q: %s", sw.serviceName, apiServer, err)
return
}

View file

@ -3,6 +3,7 @@ package nomad
import (
"flag"
"fmt"
"net/http"
"os"
"strconv"
"strings"
@ -11,7 +12,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
"github.com/VictoriaMetrics/fasthttp"
)
var waitTime = flag.Duration("promscrape.nomad.waitTime", 0, "Wait time used by Nomad service discovery. Default value is used if not set")
@ -138,13 +138,13 @@ func maxWaitTime() time.Duration {
func getBlockingAPIResponse(client *discoveryutils.Client, path string, index int64) ([]byte, int64, error) {
path += "&index=" + strconv.FormatInt(index, 10)
path += "&wait=" + fmt.Sprintf("%ds", int(maxWaitTime().Seconds()))
getMeta := func(resp *fasthttp.Response) {
ind := resp.Header.Peek("X-Nomad-Index")
getMeta := func(resp *http.Response) {
ind := resp.Header.Get("X-Nomad-Index")
if len(ind) == 0 {
logger.Errorf("cannot find X-Nomad-Index header in response from %q", path)
return
}
newIndex, err := strconv.ParseInt(string(ind), 10, 64)
newIndex, err := strconv.ParseInt(ind, 10, 64)
if err != nil {
logger.Errorf("cannot parse X-Nomad-Index header value in response from %q: %s", path, err)
return

View file

@ -1,7 +1,9 @@
package nomad
import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"net/url"
@ -31,8 +33,9 @@ type nomadWatcher struct {
servicesLock sync.Mutex
services map[string]*serviceWatcher
wg sync.WaitGroup
stopCh chan struct{}
servicesWG sync.WaitGroup
wg sync.WaitGroup
stopCh chan struct{}
}
type serviceWatcher struct {
@ -61,7 +64,11 @@ func newNomadWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter, n
stopCh: make(chan struct{}),
}
initCh := make(chan struct{})
go cw.watchForServicesUpdates(initCh)
cw.wg.Add(1)
go func() {
cw.watchForServicesUpdates(initCh)
cw.wg.Done()
}()
// wait for initialization to complete
<-initCh
return cw
@ -69,9 +76,8 @@ func newNomadWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter, n
func (cw *nomadWatcher) mustStop() {
close(cw.stopCh)
// Do not wait for the watcher to stop, since it may take
// up to discoveryutils.BlockingClientReadTimeout to complete.
// TODO: add ability to cancel blocking requests.
cw.client.Stop()
cw.wg.Wait()
}
func (cw *nomadWatcher) updateServices(serviceNames []string) {
@ -88,14 +94,14 @@ func (cw *nomadWatcher) updateServices(serviceNames []string) {
stopCh: make(chan struct{}),
}
cw.services[serviceName] = sw
cw.wg.Add(1)
cw.servicesWG.Add(1)
serviceWatchersCreated.Inc()
initWG.Add(1)
go func() {
serviceWatchersCount.Inc()
sw.watchForServiceAddressUpdates(cw, &initWG)
serviceWatchersCount.Dec()
cw.wg.Done()
cw.servicesWG.Done()
}()
}
@ -126,11 +132,13 @@ func (cw *nomadWatcher) updateServices(serviceNames []string) {
// watchForServicesUpdates closes the initCh once the initialization is complete and first discovery iteration is done.
func (cw *nomadWatcher) watchForServicesUpdates(initCh chan struct{}) {
index := int64(0)
clientAddr := cw.client.Addr()
apiServer := cw.client.APIServer()
f := func() {
serviceNames, newIndex, err := cw.getBlockingServiceNames(index)
if err != nil {
logger.Errorf("cannot obtain Nomad serviceNames from %q: %s", clientAddr, err)
if !errors.Is(err, context.Canceled) {
logger.Errorf("cannot obtain Nomad serviceNames from %q: %s", apiServer, err)
}
return
}
if index == newIndex {
@ -141,7 +149,7 @@ func (cw *nomadWatcher) watchForServicesUpdates(initCh chan struct{}) {
index = newIndex
}
logger.Infof("started Nomad service watcher for %q", clientAddr)
logger.Infof("started Nomad service watcher for %q", apiServer)
f()
// send signal that initialization is complete
@ -155,15 +163,15 @@ func (cw *nomadWatcher) watchForServicesUpdates(initCh chan struct{}) {
case <-ticker.C:
f()
case <-cw.stopCh:
logger.Infof("stopping Nomad service watchers for %q", clientAddr)
logger.Infof("stopping Nomad service watchers for %q", apiServer)
startTime := time.Now()
cw.servicesLock.Lock()
for _, sw := range cw.services {
close(sw.stopCh)
}
cw.servicesLock.Unlock()
cw.wg.Wait()
logger.Infof("stopped Nomad service watcher for %q in %.3f seconds", clientAddr, time.Since(startTime).Seconds())
cw.servicesWG.Wait()
logger.Infof("stopped Nomad service watcher for %q in %.3f seconds", apiServer, time.Since(startTime).Seconds())
return
}
}
@ -225,14 +233,16 @@ func (cw *nomadWatcher) getServiceSnapshot() map[string][]Service {
//
// watchForServiceNodesUpdates calls initWG.Done() once the initialization is complete and the first discovery iteration is done.
func (sw *serviceWatcher) watchForServiceAddressUpdates(nw *nomadWatcher, initWG *sync.WaitGroup) {
clientAddr := nw.client.Addr()
apiServer := nw.client.APIServer()
index := int64(0)
// TODO: Maybe use a different query arg.
path := "/v1/service/" + sw.serviceName + nw.serviceNamesQueryArgs
f := func() {
data, newIndex, err := getBlockingAPIResponse(nw.client, path, index)
if err != nil {
logger.Errorf("cannot obtain Nomad services for serviceName=%q from %q: %s", sw.serviceName, clientAddr, err)
if !errors.Is(err, context.Canceled) {
logger.Errorf("cannot obtain Nomad services for serviceName=%q from %q: %s", sw.serviceName, apiServer, err)
}
return
}
if index == newIndex {
@ -241,7 +251,7 @@ func (sw *serviceWatcher) watchForServiceAddressUpdates(nw *nomadWatcher, initWG
}
sns, err := parseServices(data)
if err != nil {
logger.Errorf("cannot parse Nomad services response for serviceName=%q from %q: %s", sw.serviceName, clientAddr, err)
logger.Errorf("cannot parse Nomad services response for serviceName=%q from %q: %s", sw.serviceName, apiServer, err)
return
}

View file

@ -1,7 +1,6 @@
package discoveryutils
import (
"compress/gzip"
"context"
"crypto/tls"
"flag"
@ -55,7 +54,7 @@ func GetHTTPClient() *http.Client {
return defaultClient
}
// Client is http client, which talks to the given apiServer.
// Client is http client, which talks to the given apiServer passed to NewClient().
type Client struct {
// client is used for short requests.
client *HTTPClient
@ -65,8 +64,6 @@ type Client struct {
apiServer string
dialAddr string
setHTTPHeaders func(req *http.Request)
setHTTPProxyHeaders func(req *http.Request)
@ -81,110 +78,96 @@ type HTTPClient struct {
WriteTimeout time.Duration
}
func addMissingPort(addr string, isTLS bool) string {
if strings.Contains(addr, ":") {
return addr
}
if isTLS {
return addr + ":443"
}
return addr + ":80"
}
var defaultDialer = &net.Dialer{}
// NewClient returns new Client for the given args.
func NewClient(apiServer string, ac *promauth.Config, proxyURL *proxy.URL, proxyAC *promauth.Config) (*Client, error) {
u, err := url.Parse(apiServer)
if err != nil {
return nil, fmt.Errorf("cannot parse provided url %q: %w", apiServer, err)
return nil, fmt.Errorf("cannot parse apiServer=%q: %w", apiServer, err)
}
// special case for unix socket connection
var dialFunc func(addr string) (net.Conn, error)
if string(u.Scheme) == "unix" {
dialFunc := defaultDialer.DialContext
if u.Scheme == "unix" {
// special case for unix socket connection
dialAddr := u.Path
apiServer = "http://"
dialFunc = func(_ string) (net.Conn, error) {
return net.Dial("unix", dialAddr)
apiServer = "http://unix"
dialFunc = func(ctx context.Context, _, _ string) (net.Conn, error) {
return defaultDialer.DialContext(ctx, "unix", dialAddr)
}
}
dialAddr := u.Host
isTLS := string(u.Scheme) == "https"
isTLS := u.Scheme == "https"
var tlsCfg *tls.Config
if isTLS {
tlsCfg = ac.NewTLSConfig()
}
setHTTPProxyHeaders := func(req *http.Request) {}
dialAddr = addMissingPort(dialAddr, isTLS)
if dialFunc == nil {
var err error
dialFunc, err = proxyURL.NewDialFunc(proxyAC)
if err != nil {
return nil, err
}
if proxyAC != nil {
setHTTPProxyHeaders = func(req *http.Request) {
proxyURL.SetHeaders(proxyAC, req)
}
}
var proxyURLFunc func(*http.Request) (*url.URL, error)
if pu := proxyURL.GetURL(); pu != nil {
proxyURLFunc = http.ProxyURL(pu)
}
hcTransport := &http.Transport{
TLSClientConfig: tlsCfg,
MaxConnsPerHost: 2 * *maxConcurrency,
ResponseHeaderTimeout: *maxWaitTime,
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
return dialFunc(dialAddr)
},
}
hc := &http.Client{
Timeout: DefaultClientReadTimeout,
Transport: hcTransport,
}
blockingTransport := &http.Transport{
TLSClientConfig: tlsCfg,
MaxConnsPerHost: 64 * 1024,
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
return dialFunc(dialAddr)
client := &http.Client{
Timeout: DefaultClientReadTimeout,
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
Proxy: proxyURLFunc,
TLSHandshakeTimeout: 10 * time.Second,
MaxIdleConnsPerHost: *maxConcurrency,
ResponseHeaderTimeout: DefaultClientReadTimeout,
DialContext: dialFunc,
},
}
blockingClient := &http.Client{
Timeout: BlockingClientReadTimeout,
Transport: blockingTransport,
Timeout: BlockingClientReadTimeout,
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
Proxy: proxyURLFunc,
TLSHandshakeTimeout: 10 * time.Second,
MaxIdleConnsPerHost: 1000,
ResponseHeaderTimeout: BlockingClientReadTimeout,
DialContext: dialFunc,
},
}
setHTTPHeaders := func(req *http.Request) {}
if ac != nil {
setHTTPHeaders = func(req *http.Request) { ac.SetHeaders(req, true) }
setHTTPHeaders = func(req *http.Request) {
ac.SetHeaders(req, true)
}
}
setHTTPProxyHeaders := func(req *http.Request) {}
if proxyAC != nil {
setHTTPProxyHeaders = func(req *http.Request) {
proxyURL.SetHeaders(proxyAC, req)
}
}
ctx, cancel := context.WithCancel(context.Background())
return &Client{
client: &HTTPClient{client: hc, ReadTimeout: DefaultClientReadTimeout, WriteTimeout: DefaultClientWriteTimeout},
blockingClient: &HTTPClient{client: blockingClient, ReadTimeout: BlockingClientReadTimeout, WriteTimeout: DefaultClientWriteTimeout},
c := &Client{
client: &HTTPClient{
client: client,
ReadTimeout: DefaultClientReadTimeout,
WriteTimeout: DefaultClientWriteTimeout,
},
blockingClient: &HTTPClient{
client: blockingClient,
ReadTimeout: BlockingClientReadTimeout,
WriteTimeout: DefaultClientWriteTimeout,
},
apiServer: apiServer,
dialAddr: dialAddr,
setHTTPHeaders: setHTTPHeaders,
setHTTPProxyHeaders: setHTTPProxyHeaders,
clientCtx: ctx,
clientCancel: cancel,
}, nil
}
// Addr returns the address the client connects to.
func (c *Client) Addr() string {
return c.dialAddr
}
return c, nil
}
// GetAPIResponseWithReqParams returns response for given absolute path with optional callback for request.
// modifyRequestParams should never reference data from request.
func (c *Client) GetAPIResponseWithReqParams(path string, modifyRequestParams func(request *http.Request)) ([]byte, error) {
return c.getAPIResponse(path, modifyRequestParams)
func (c *Client) GetAPIResponseWithReqParams(path string, modifyRequest func(request *http.Request)) ([]byte, error) {
return c.getAPIResponse(path, modifyRequest)
}
// GetAPIResponse returns response for the given absolute path.
@ -205,12 +188,13 @@ func (c *Client) getAPIResponse(path string, modifyRequest func(request *http.Re
return nil, fmt.Errorf("too many outstanding requests to %q; try increasing -promscrape.discovery.concurrentWaitTime=%s or -promscrape.discovery.concurrency=%d",
c.apiServer, *maxWaitTime, *maxConcurrency)
}
defer func() { <-concurrencyLimitCh }()
defer func() {
<-concurrencyLimitCh
}()
return c.getAPIResponseWithParamsAndClient(c.client, path, modifyRequest, nil)
}
// GetBlockingAPIResponse returns response for given absolute path with blocking client and optional callback for api response,
// inspectResponse - should never reference data from response.
func (c *Client) GetBlockingAPIResponse(path string, inspectResponse func(resp *http.Response)) ([]byte, error) {
return c.getAPIResponseWithParamsAndClient(c.blockingClient, path, nil, inspectResponse)
}
@ -222,7 +206,6 @@ func (c *Client) getAPIResponseWithParamsAndClient(client *HTTPClient, path stri
if err != nil {
return nil, fmt.Errorf("cannot parse %q: %w", requestURL, err)
}
u.Host = c.dialAddr
deadline := time.Now().Add(client.WriteTimeout)
ctx, cancel := context.WithDeadline(c.clientCtx, deadline)
@ -232,8 +215,6 @@ func (c *Client) getAPIResponseWithParamsAndClient(client *HTTPClient, path stri
return nil, fmt.Errorf("cannot create request for %q: %w", requestURL, err)
}
req.Header.Set("Host", c.dialAddr)
req.Header.Set("Accept-Encoding", "gzip")
c.setHTTPHeaders(req)
c.setHTTPProxyHeaders(req)
if modifyRequest != nil {
@ -244,20 +225,11 @@ func (c *Client) getAPIResponseWithParamsAndClient(client *HTTPClient, path stri
if err != nil {
return nil, fmt.Errorf("cannot fetch %q: %w", requestURL, err)
}
reader := resp.Body
if resp.Header.Get("Content-Encoding") == "gzip" {
reader, err = gzip.NewReader(resp.Body)
if err != nil {
return nil, fmt.Errorf("cannot create gzip reader for %q: %w", requestURL, err)
}
}
data, err := io.ReadAll(reader)
if err != nil {
return nil, fmt.Errorf("cannot ungzip response from %q: %w", requestURL, err)
}
data, err := io.ReadAll(resp.Body)
_ = resp.Body.Close()
if err != nil {
return nil, fmt.Errorf("cannot read response from %q: %w", requestURL, err)
}
if inspectResponse != nil {
inspectResponse(resp)
@ -280,10 +252,9 @@ func (c *Client) Stop() {
c.clientCancel()
}
// DoRequestWithPossibleRetry performs the given req at client and stores the response at resp.
func DoRequestWithPossibleRetry(hc *HTTPClient, req *http.Request, requestCounter, retryCounter *metrics.Counter) (*http.Response, error) {
func doRequestWithPossibleRetry(hc *HTTPClient, req *http.Request) (*http.Response, error) {
sleepTime := time.Second
requestCounter.Inc()
discoveryRequests.Inc()
deadline, ok := req.Context().Deadline()
if !ok {
deadline = time.Now().Add(hc.WriteTimeout)
@ -309,14 +280,10 @@ func DoRequestWithPossibleRetry(hc *HTTPClient, req *http.Request, requestCounte
sleepTime = maxSleepTime
}
time.Sleep(sleepTime)
retryCounter.Inc()
discoveryRetries.Inc()
}
}
func doRequestWithPossibleRetry(hc *HTTPClient, req *http.Request) (*http.Response, error) {
return DoRequestWithPossibleRetry(hc, req, discoveryRequests, discoveryRetries)
}
var (
discoveryRequests = metrics.NewCounter(`vm_promscrape_discovery_requests_total`)
discoveryRetries = metrics.NewCounter(`vm_promscrape_discovery_retries_total`)