discovery/{consul,nomad}: fix cancelling serviceWatcher in-flight requests (#3658)

* lib/promscrape/discovery/{consul,nomad}: fix background service update watches not canceling requests on serviceWatcher stop

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* lib/promscrape/discovery/{consul,nomad}: fix closing serviseWatcher during scrape job restart

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* wip

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3468

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
Zakhar Bessarab 2023-01-18 09:47:11 +04:00 committed by GitHub
parent 46b3b76d6d
commit 322d96bfe5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 59 additions and 36 deletions

View file

@ -22,6 +22,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
* BUGFIX: do not slow down concurrently executed queries during assisted merges, since assisted merges already prioritize data ingestion over queries. The probability of assisted merges has been increased starting from [v1.85.0](https://docs.victoriametrics.com/CHANGELOG.html#v1850) because of internal refactoring. This could result in slowed down queries when there is a plenty of free CPU resources. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3647) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3641) issues.
* BUGFIX: reduce the increased CPU usage at `vmselect` to v1.85.3 level when processing heavy queries. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3641).
* BUGFIX: [retention filters](https://docs.victoriametrics.com/#retention-filters): fix `FATAL: cannot locate metric name for metricID=...: EOF` panic, which could occur when retention filters are enabled.
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly cancel in-flight service discovery requests for [consul_sd_configs](https://docs.victoriametrics.com/sd_configs.html#consul_sd_configs) and [nomad_sd_configs](https://docs.victoriametrics.com/sd_configs.html#nomad_sd_configs) when the service list changes. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3468).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): [dockerswarm_sd_configs](https://docs.victoriametrics.com/sd_configs.html#dockerswarm_sd_configs): apply `filters` only to objects of the specified `role`. Previously filters were applied to all the objects, which could cause errors when different types of objects were used with filters that were not compatible with them. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3579).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): suppress all the scrape errors when `-promscrape.suppressScrapeErrors` is enabled. Previously some scrape errors were logged even if `-promscrape.suppressScrapeErrors` flag was set.
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): consistently put the scrape url with scrape target labels to all error logs for failed scrapes. Previously some failed scrapes were logged without this information.

View file

@ -1,6 +1,7 @@
package consul
import (
"context"
"flag"
"fmt"
"net/http"
@ -157,7 +158,7 @@ func maxWaitTime() time.Duration {
// getBlockingAPIResponse perfoms blocking request to Consul via client and returns response.
//
// See https://www.consul.io/api-docs/features/blocking .
func getBlockingAPIResponse(client *discoveryutils.Client, path string, index int64) ([]byte, int64, error) {
func getBlockingAPIResponse(ctx context.Context, client *discoveryutils.Client, path string, index int64) ([]byte, int64, error) {
path += "&index=" + strconv.FormatInt(index, 10)
path += "&wait=" + fmt.Sprintf("%ds", int(maxWaitTime().Seconds()))
getMeta := func(resp *http.Response) {
@ -182,7 +183,7 @@ func getBlockingAPIResponse(client *discoveryutils.Client, path string, index in
}
index = newIndex
}
data, err := client.GetBlockingAPIResponse(path, getMeta)
data, err := client.GetBlockingAPIResponseCtx(ctx, path, getMeta)
if err != nil {
return nil, index, fmt.Errorf("cannot perform blocking Consul API request at %q: %w", path, err)
}

View file

@ -34,15 +34,17 @@ type consulWatcher struct {
servicesLock sync.Mutex
services map[string]*serviceWatcher
stopCh chan struct{}
stoppedCh chan struct{}
}
type serviceWatcher struct {
serviceName string
serviceNodes []ServiceNode
stopCh chan struct{}
stoppedCh chan struct{}
stoppedCh chan struct{}
requestCtx context.Context
requestCancel context.CancelFunc
}
// newConsulWatcher creates new watcher and starts background service discovery for Consul.
@ -71,7 +73,6 @@ func newConsulWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter,
watchServices: sdc.Services,
watchTags: sdc.Tags,
services: make(map[string]*serviceWatcher),
stopCh: make(chan struct{}),
stoppedCh: make(chan struct{}),
}
initCh := make(chan struct{})
@ -85,7 +86,6 @@ func newConsulWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter,
}
func (cw *consulWatcher) mustStop() {
close(cw.stopCh)
cw.client.Stop()
<-cw.stoppedCh
}
@ -100,10 +100,12 @@ func (cw *consulWatcher) updateServices(serviceNames []string) {
// The watcher for serviceName already exists.
continue
}
ctx, cancel := context.WithCancel(cw.client.Context())
sw := &serviceWatcher{
serviceName: serviceName,
stopCh: make(chan struct{}),
stoppedCh: make(chan struct{}),
serviceName: serviceName,
stoppedCh: make(chan struct{}),
requestCtx: ctx,
requestCancel: cancel,
}
cw.services[serviceName] = sw
serviceWatchersCreated.Inc()
@ -126,7 +128,7 @@ func (cw *consulWatcher) updateServices(serviceNames []string) {
if _, ok := newServiceNamesMap[serviceName]; ok {
continue
}
close(sw.stopCh)
sw.requestCancel()
delete(cw.services, serviceName)
swsStopped = append(swsStopped, sw)
}
@ -173,24 +175,26 @@ func (cw *consulWatcher) watchForServicesUpdates(initCh chan struct{}) {
checkInterval := getCheckInterval()
ticker := time.NewTicker(checkInterval / 2)
defer ticker.Stop()
stopCh := cw.client.Context().Done()
for {
select {
case <-ticker.C:
f()
case <-cw.stopCh:
case <-stopCh:
logger.Infof("stopping Consul service watchers for %q", apiServer)
startTime := time.Now()
var swsStopped []*serviceWatcher
cw.servicesLock.Lock()
for _, sw := range cw.services {
close(sw.stopCh)
sw.requestCancel()
swsStopped = append(swsStopped, sw)
}
cw.servicesLock.Unlock()
for _, sw := range swsStopped {
<-sw.stoppedCh
serviceWatchersStopped.Inc()
}
logger.Infof("stopped Consul service watcher for %q in %.3f seconds", apiServer, time.Since(startTime).Seconds())
return
@ -209,7 +213,7 @@ var (
// It returns an empty serviceNames list if response contains the same index.
func (cw *consulWatcher) getBlockingServiceNames(index int64) ([]string, int64, error) {
path := "/v1/catalog/services" + cw.serviceNamesQueryArgs
data, newIndex, err := getBlockingAPIResponse(cw.client, path, index)
data, newIndex, err := getBlockingAPIResponse(cw.client.Context(), cw.client, path, index)
if err != nil {
return nil, index, err
}
@ -242,7 +246,7 @@ func (sw *serviceWatcher) watchForServiceNodesUpdates(cw *consulWatcher, initWG
index := int64(0)
path := "/v1/health/service/" + sw.serviceName + cw.serviceNodesQueryArgs
f := func() {
data, newIndex, err := getBlockingAPIResponse(cw.client, path, index)
data, newIndex, err := getBlockingAPIResponse(sw.requestCtx, cw.client, path, index)
if err != nil {
if !errors.Is(err, context.Canceled) {
logger.Errorf("cannot obtain Consul serviceNodes for serviceName=%q from %q: %s", sw.serviceName, apiServer, err)
@ -273,11 +277,12 @@ func (sw *serviceWatcher) watchForServiceNodesUpdates(cw *consulWatcher, initWG
checkInterval := getCheckInterval()
ticker := time.NewTicker(checkInterval / 2)
defer ticker.Stop()
stopCh := sw.requestCtx.Done()
for {
select {
case <-ticker.C:
f()
case <-sw.stopCh:
case <-stopCh:
return
}
}

View file

@ -1,6 +1,7 @@
package nomad
import (
"context"
"flag"
"fmt"
"net/http"
@ -116,7 +117,7 @@ func maxWaitTime() time.Duration {
// getBlockingAPIResponse perfoms blocking request to Nomad via client and returns response.
// See https://developer.hashicorp.com/nomad/api-docs#blocking-queries .
func getBlockingAPIResponse(client *discoveryutils.Client, path string, index int64) ([]byte, int64, error) {
func getBlockingAPIResponse(ctx context.Context, client *discoveryutils.Client, path string, index int64) ([]byte, int64, error) {
path += "&index=" + strconv.FormatInt(index, 10)
path += "&wait=" + fmt.Sprintf("%ds", int(maxWaitTime().Seconds()))
getMeta := func(resp *http.Response) {
@ -142,7 +143,7 @@ func getBlockingAPIResponse(client *discoveryutils.Client, path string, index in
}
index = newIndex
}
data, err := client.GetBlockingAPIResponse(path, getMeta)
data, err := client.GetBlockingAPIResponseCtx(ctx, path, getMeta)
if err != nil {
return nil, index, fmt.Errorf("cannot perform blocking Nomad API request at %q: %w", path, err)
}

View file

@ -30,15 +30,17 @@ type nomadWatcher struct {
servicesLock sync.Mutex
services map[string]*serviceWatcher
stopCh chan struct{}
stoppedCh chan struct{}
}
type serviceWatcher struct {
serviceName string
services []Service
stopCh chan struct{}
stoppedCh chan struct{}
stoppedCh chan struct{}
requestCtx context.Context
requestCancel context.CancelFunc
}
// newNomadWatcher creates new watcher and starts background service discovery for Nomad.
@ -62,7 +64,6 @@ func newNomadWatcher(client *discoveryutils.Client, sdc *SDConfig, namespace, re
client: client,
serviceNamesQueryArgs: queryArgs,
services: make(map[string]*serviceWatcher),
stopCh: make(chan struct{}),
stoppedCh: make(chan struct{}),
}
initCh := make(chan struct{})
@ -76,7 +77,6 @@ func newNomadWatcher(client *discoveryutils.Client, sdc *SDConfig, namespace, re
}
func (cw *nomadWatcher) mustStop() {
close(cw.stopCh)
cw.client.Stop()
<-cw.stoppedCh
}
@ -91,10 +91,12 @@ func (cw *nomadWatcher) updateServices(serviceNames []string) {
// The watcher for serviceName already exists.
continue
}
ctx, cancel := context.WithCancel(cw.client.Context())
sw := &serviceWatcher{
serviceName: serviceName,
stopCh: make(chan struct{}),
stoppedCh: make(chan struct{}),
serviceName: serviceName,
stoppedCh: make(chan struct{}),
requestCtx: ctx,
requestCancel: cancel,
}
cw.services[serviceName] = sw
serviceWatchersCreated.Inc()
@ -117,7 +119,7 @@ func (cw *nomadWatcher) updateServices(serviceNames []string) {
if _, ok := newServiceNamesMap[serviceName]; ok {
continue
}
close(sw.stopCh)
sw.requestCancel()
delete(cw.services, serviceName)
swsStopped = append(swsStopped, sw)
}
@ -164,24 +166,26 @@ func (cw *nomadWatcher) watchForServicesUpdates(initCh chan struct{}) {
checkInterval := getCheckInterval()
ticker := time.NewTicker(checkInterval / 2)
defer ticker.Stop()
stopCh := cw.client.Context().Done()
for {
select {
case <-ticker.C:
f()
case <-cw.stopCh:
case <-stopCh:
logger.Infof("stopping Nomad service watchers for %q", apiServer)
startTime := time.Now()
var swsStopped []*serviceWatcher
cw.servicesLock.Lock()
for _, sw := range cw.services {
close(sw.stopCh)
sw.requestCancel()
swsStopped = append(swsStopped, sw)
}
cw.servicesLock.Unlock()
for _, sw := range swsStopped {
<-sw.stoppedCh
serviceWatchersStopped.Inc()
}
logger.Infof("stopped Nomad service watcher for %q in %.3f seconds", apiServer, time.Since(startTime).Seconds())
return
@ -200,7 +204,7 @@ var (
// It returns an empty serviceNames list if response contains the same index.
func (cw *nomadWatcher) getBlockingServiceNames(index int64) ([]string, int64, error) {
path := "/v1/services" + cw.serviceNamesQueryArgs
data, newIndex, err := getBlockingAPIResponse(cw.client, path, index)
data, newIndex, err := getBlockingAPIResponse(cw.client.Context(), cw.client, path, index)
if err != nil {
return nil, index, err
}
@ -244,7 +248,7 @@ func (sw *serviceWatcher) watchForServiceAddressUpdates(nw *nomadWatcher, initWG
// TODO: Maybe use a different query arg.
path := "/v1/service/" + sw.serviceName + nw.serviceNamesQueryArgs
f := func() {
data, newIndex, err := getBlockingAPIResponse(nw.client, path, index)
data, newIndex, err := getBlockingAPIResponse(sw.requestCtx, nw.client, path, index)
if err != nil {
if !errors.Is(err, context.Canceled) {
logger.Errorf("cannot obtain Nomad services for serviceName=%q from %q: %s", sw.serviceName, apiServer, err)
@ -275,11 +279,12 @@ func (sw *serviceWatcher) watchForServiceAddressUpdates(nw *nomadWatcher, initWG
checkInterval := getCheckInterval()
ticker := time.NewTicker(checkInterval / 2)
defer ticker.Stop()
stopCh := sw.requestCtx.Done()
for {
select {
case <-ticker.C:
f()
case <-sw.stopCh:
case <-stopCh:
return
}
}

View file

@ -159,6 +159,11 @@ func NewClient(apiServer string, ac *promauth.Config, proxyURL *proxy.URL, proxy
return c, nil
}
// Context returns context for the client requests.
func (c *Client) Context() context.Context {
return c.clientCtx
}
// GetAPIResponseWithReqParams returns response for given absolute path with optional callback for request.
func (c *Client) GetAPIResponseWithReqParams(path string, modifyRequest func(request *http.Request)) ([]byte, error) {
return c.getAPIResponse(path, modifyRequest)
@ -185,16 +190,21 @@ func (c *Client) getAPIResponse(path string, modifyRequest func(request *http.Re
defer func() {
<-concurrencyLimitCh
}()
return c.getAPIResponseWithParamsAndClient(c.client, path, modifyRequest, nil)
return c.getAPIResponseWithParamsAndClientCtx(c.clientCtx, c.client, path, modifyRequest, nil)
}
// GetBlockingAPIResponse returns response for given absolute path with blocking client and optional callback for api response,
func (c *Client) GetBlockingAPIResponse(path string, inspectResponse func(resp *http.Response)) ([]byte, error) {
return c.getAPIResponseWithParamsAndClient(c.blockingClient, path, nil, inspectResponse)
return c.getAPIResponseWithParamsAndClientCtx(c.clientCtx, c.blockingClient, path, nil, inspectResponse)
}
// GetBlockingAPIResponseCtx returns response for given absolute path with blocking client and optional callback for api response,
func (c *Client) GetBlockingAPIResponseCtx(ctx context.Context, path string, inspectResponse func(resp *http.Response)) ([]byte, error) {
return c.getAPIResponseWithParamsAndClientCtx(ctx, c.blockingClient, path, nil, inspectResponse)
}
// getAPIResponseWithParamsAndClient returns response for the given absolute path with optional callback for request and for response.
func (c *Client) getAPIResponseWithParamsAndClient(client *HTTPClient, path string, modifyRequest func(req *http.Request), inspectResponse func(resp *http.Response)) ([]byte, error) {
func (c *Client) getAPIResponseWithParamsAndClientCtx(ctx context.Context, client *HTTPClient, path string, modifyRequest func(req *http.Request), inspectResponse func(resp *http.Response)) ([]byte, error) {
requestURL := c.apiServer + path
u, err := url.Parse(requestURL)
if err != nil {
@ -202,7 +212,7 @@ func (c *Client) getAPIResponseWithParamsAndClient(client *HTTPClient, path stri
}
deadline := time.Now().Add(client.ReadTimeout)
ctx, cancel := context.WithDeadline(c.clientCtx, deadline)
ctx, cancel := context.WithDeadline(ctx, deadline)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil)
if err != nil {