diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 46f463f20..f6919a69d 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -22,6 +22,7 @@ The following tip changes can be tested by building VictoriaMetrics components f * BUGFIX: do not slow down concurrently executed queries during assisted merges, since assisted merges already prioritize data ingestion over queries. The probability of assisted merges has been increased starting from [v1.85.0](https://docs.victoriametrics.com/CHANGELOG.html#v1850) because of internal refactoring. This could result in slowed down queries when there is a plenty of free CPU resources. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3647) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3641) issues. * BUGFIX: reduce the increased CPU usage at `vmselect` to v1.85.3 level when processing heavy queries. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3641). * BUGFIX: [retention filters](https://docs.victoriametrics.com/#retention-filters): fix `FATAL: cannot locate metric name for metricID=...: EOF` panic, which could occur when retention filters are enabled. +* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly cancel in-flight service discovery requests for [consul_sd_configs](https://docs.victoriametrics.com/sd_configs.html#consul_sd_configs) and [nomad_sd_configs](https://docs.victoriametrics.com/sd_configs.html#nomad_sd_configs) when the service list changes. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3468). * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): [dockerswarm_sd_configs](https://docs.victoriametrics.com/sd_configs.html#dockerswarm_sd_configs): apply `filters` only to objects of the specified `role`. Previously filters were applied to all the objects, which could cause errors when different types of objects were used with filters that were not compatible with them. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3579). * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): suppress all the scrape errors when `-promscrape.suppressScrapeErrors` is enabled. Previously some scrape errors were logged even if `-promscrape.suppressScrapeErrors` flag was set. * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): consistently put the scrape url with scrape target labels to all error logs for failed scrapes. Previously some failed scrapes were logged without this information. diff --git a/lib/promscrape/discovery/consul/api.go b/lib/promscrape/discovery/consul/api.go index 92c5deb4e..5f9d7e224 100644 --- a/lib/promscrape/discovery/consul/api.go +++ b/lib/promscrape/discovery/consul/api.go @@ -1,6 +1,7 @@ package consul import ( + "context" "flag" "fmt" "net/http" @@ -157,7 +158,7 @@ func maxWaitTime() time.Duration { // getBlockingAPIResponse perfoms blocking request to Consul via client and returns response. // // See https://www.consul.io/api-docs/features/blocking . -func getBlockingAPIResponse(client *discoveryutils.Client, path string, index int64) ([]byte, int64, error) { +func getBlockingAPIResponse(ctx context.Context, client *discoveryutils.Client, path string, index int64) ([]byte, int64, error) { path += "&index=" + strconv.FormatInt(index, 10) path += "&wait=" + fmt.Sprintf("%ds", int(maxWaitTime().Seconds())) getMeta := func(resp *http.Response) { @@ -182,7 +183,7 @@ func getBlockingAPIResponse(client *discoveryutils.Client, path string, index in } index = newIndex } - data, err := client.GetBlockingAPIResponse(path, getMeta) + data, err := client.GetBlockingAPIResponseCtx(ctx, path, getMeta) if err != nil { return nil, index, fmt.Errorf("cannot perform blocking Consul API request at %q: %w", path, err) } diff --git a/lib/promscrape/discovery/consul/watch.go b/lib/promscrape/discovery/consul/watch.go index 18d287ab3..6b0ae267c 100644 --- a/lib/promscrape/discovery/consul/watch.go +++ b/lib/promscrape/discovery/consul/watch.go @@ -34,15 +34,17 @@ type consulWatcher struct { servicesLock sync.Mutex services map[string]*serviceWatcher - stopCh chan struct{} stoppedCh chan struct{} } type serviceWatcher struct { serviceName string serviceNodes []ServiceNode - stopCh chan struct{} - stoppedCh chan struct{} + + stoppedCh chan struct{} + + requestCtx context.Context + requestCancel context.CancelFunc } // newConsulWatcher creates new watcher and starts background service discovery for Consul. @@ -71,7 +73,6 @@ func newConsulWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter, watchServices: sdc.Services, watchTags: sdc.Tags, services: make(map[string]*serviceWatcher), - stopCh: make(chan struct{}), stoppedCh: make(chan struct{}), } initCh := make(chan struct{}) @@ -85,7 +86,6 @@ func newConsulWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter, } func (cw *consulWatcher) mustStop() { - close(cw.stopCh) cw.client.Stop() <-cw.stoppedCh } @@ -100,10 +100,12 @@ func (cw *consulWatcher) updateServices(serviceNames []string) { // The watcher for serviceName already exists. continue } + ctx, cancel := context.WithCancel(cw.client.Context()) sw := &serviceWatcher{ - serviceName: serviceName, - stopCh: make(chan struct{}), - stoppedCh: make(chan struct{}), + serviceName: serviceName, + stoppedCh: make(chan struct{}), + requestCtx: ctx, + requestCancel: cancel, } cw.services[serviceName] = sw serviceWatchersCreated.Inc() @@ -126,7 +128,7 @@ func (cw *consulWatcher) updateServices(serviceNames []string) { if _, ok := newServiceNamesMap[serviceName]; ok { continue } - close(sw.stopCh) + sw.requestCancel() delete(cw.services, serviceName) swsStopped = append(swsStopped, sw) } @@ -173,24 +175,26 @@ func (cw *consulWatcher) watchForServicesUpdates(initCh chan struct{}) { checkInterval := getCheckInterval() ticker := time.NewTicker(checkInterval / 2) defer ticker.Stop() + stopCh := cw.client.Context().Done() for { select { case <-ticker.C: f() - case <-cw.stopCh: + case <-stopCh: logger.Infof("stopping Consul service watchers for %q", apiServer) startTime := time.Now() var swsStopped []*serviceWatcher cw.servicesLock.Lock() for _, sw := range cw.services { - close(sw.stopCh) + sw.requestCancel() swsStopped = append(swsStopped, sw) } cw.servicesLock.Unlock() for _, sw := range swsStopped { <-sw.stoppedCh + serviceWatchersStopped.Inc() } logger.Infof("stopped Consul service watcher for %q in %.3f seconds", apiServer, time.Since(startTime).Seconds()) return @@ -209,7 +213,7 @@ var ( // It returns an empty serviceNames list if response contains the same index. func (cw *consulWatcher) getBlockingServiceNames(index int64) ([]string, int64, error) { path := "/v1/catalog/services" + cw.serviceNamesQueryArgs - data, newIndex, err := getBlockingAPIResponse(cw.client, path, index) + data, newIndex, err := getBlockingAPIResponse(cw.client.Context(), cw.client, path, index) if err != nil { return nil, index, err } @@ -242,7 +246,7 @@ func (sw *serviceWatcher) watchForServiceNodesUpdates(cw *consulWatcher, initWG index := int64(0) path := "/v1/health/service/" + sw.serviceName + cw.serviceNodesQueryArgs f := func() { - data, newIndex, err := getBlockingAPIResponse(cw.client, path, index) + data, newIndex, err := getBlockingAPIResponse(sw.requestCtx, cw.client, path, index) if err != nil { if !errors.Is(err, context.Canceled) { logger.Errorf("cannot obtain Consul serviceNodes for serviceName=%q from %q: %s", sw.serviceName, apiServer, err) @@ -273,11 +277,12 @@ func (sw *serviceWatcher) watchForServiceNodesUpdates(cw *consulWatcher, initWG checkInterval := getCheckInterval() ticker := time.NewTicker(checkInterval / 2) defer ticker.Stop() + stopCh := sw.requestCtx.Done() for { select { case <-ticker.C: f() - case <-sw.stopCh: + case <-stopCh: return } } diff --git a/lib/promscrape/discovery/nomad/api.go b/lib/promscrape/discovery/nomad/api.go index d6cccb7ed..6c5efcc73 100644 --- a/lib/promscrape/discovery/nomad/api.go +++ b/lib/promscrape/discovery/nomad/api.go @@ -1,6 +1,7 @@ package nomad import ( + "context" "flag" "fmt" "net/http" @@ -116,7 +117,7 @@ func maxWaitTime() time.Duration { // getBlockingAPIResponse perfoms blocking request to Nomad via client and returns response. // See https://developer.hashicorp.com/nomad/api-docs#blocking-queries . -func getBlockingAPIResponse(client *discoveryutils.Client, path string, index int64) ([]byte, int64, error) { +func getBlockingAPIResponse(ctx context.Context, client *discoveryutils.Client, path string, index int64) ([]byte, int64, error) { path += "&index=" + strconv.FormatInt(index, 10) path += "&wait=" + fmt.Sprintf("%ds", int(maxWaitTime().Seconds())) getMeta := func(resp *http.Response) { @@ -142,7 +143,7 @@ func getBlockingAPIResponse(client *discoveryutils.Client, path string, index in } index = newIndex } - data, err := client.GetBlockingAPIResponse(path, getMeta) + data, err := client.GetBlockingAPIResponseCtx(ctx, path, getMeta) if err != nil { return nil, index, fmt.Errorf("cannot perform blocking Nomad API request at %q: %w", path, err) } diff --git a/lib/promscrape/discovery/nomad/watch.go b/lib/promscrape/discovery/nomad/watch.go index f3510e206..2c8d42a9a 100644 --- a/lib/promscrape/discovery/nomad/watch.go +++ b/lib/promscrape/discovery/nomad/watch.go @@ -30,15 +30,17 @@ type nomadWatcher struct { servicesLock sync.Mutex services map[string]*serviceWatcher - stopCh chan struct{} stoppedCh chan struct{} } type serviceWatcher struct { serviceName string services []Service - stopCh chan struct{} - stoppedCh chan struct{} + + stoppedCh chan struct{} + + requestCtx context.Context + requestCancel context.CancelFunc } // newNomadWatcher creates new watcher and starts background service discovery for Nomad. @@ -62,7 +64,6 @@ func newNomadWatcher(client *discoveryutils.Client, sdc *SDConfig, namespace, re client: client, serviceNamesQueryArgs: queryArgs, services: make(map[string]*serviceWatcher), - stopCh: make(chan struct{}), stoppedCh: make(chan struct{}), } initCh := make(chan struct{}) @@ -76,7 +77,6 @@ func newNomadWatcher(client *discoveryutils.Client, sdc *SDConfig, namespace, re } func (cw *nomadWatcher) mustStop() { - close(cw.stopCh) cw.client.Stop() <-cw.stoppedCh } @@ -91,10 +91,12 @@ func (cw *nomadWatcher) updateServices(serviceNames []string) { // The watcher for serviceName already exists. continue } + ctx, cancel := context.WithCancel(cw.client.Context()) sw := &serviceWatcher{ - serviceName: serviceName, - stopCh: make(chan struct{}), - stoppedCh: make(chan struct{}), + serviceName: serviceName, + stoppedCh: make(chan struct{}), + requestCtx: ctx, + requestCancel: cancel, } cw.services[serviceName] = sw serviceWatchersCreated.Inc() @@ -117,7 +119,7 @@ func (cw *nomadWatcher) updateServices(serviceNames []string) { if _, ok := newServiceNamesMap[serviceName]; ok { continue } - close(sw.stopCh) + sw.requestCancel() delete(cw.services, serviceName) swsStopped = append(swsStopped, sw) } @@ -164,24 +166,26 @@ func (cw *nomadWatcher) watchForServicesUpdates(initCh chan struct{}) { checkInterval := getCheckInterval() ticker := time.NewTicker(checkInterval / 2) defer ticker.Stop() + stopCh := cw.client.Context().Done() for { select { case <-ticker.C: f() - case <-cw.stopCh: + case <-stopCh: logger.Infof("stopping Nomad service watchers for %q", apiServer) startTime := time.Now() var swsStopped []*serviceWatcher cw.servicesLock.Lock() for _, sw := range cw.services { - close(sw.stopCh) + sw.requestCancel() swsStopped = append(swsStopped, sw) } cw.servicesLock.Unlock() for _, sw := range swsStopped { <-sw.stoppedCh + serviceWatchersStopped.Inc() } logger.Infof("stopped Nomad service watcher for %q in %.3f seconds", apiServer, time.Since(startTime).Seconds()) return @@ -200,7 +204,7 @@ var ( // It returns an empty serviceNames list if response contains the same index. func (cw *nomadWatcher) getBlockingServiceNames(index int64) ([]string, int64, error) { path := "/v1/services" + cw.serviceNamesQueryArgs - data, newIndex, err := getBlockingAPIResponse(cw.client, path, index) + data, newIndex, err := getBlockingAPIResponse(cw.client.Context(), cw.client, path, index) if err != nil { return nil, index, err } @@ -244,7 +248,7 @@ func (sw *serviceWatcher) watchForServiceAddressUpdates(nw *nomadWatcher, initWG // TODO: Maybe use a different query arg. path := "/v1/service/" + sw.serviceName + nw.serviceNamesQueryArgs f := func() { - data, newIndex, err := getBlockingAPIResponse(nw.client, path, index) + data, newIndex, err := getBlockingAPIResponse(sw.requestCtx, nw.client, path, index) if err != nil { if !errors.Is(err, context.Canceled) { logger.Errorf("cannot obtain Nomad services for serviceName=%q from %q: %s", sw.serviceName, apiServer, err) @@ -275,11 +279,12 @@ func (sw *serviceWatcher) watchForServiceAddressUpdates(nw *nomadWatcher, initWG checkInterval := getCheckInterval() ticker := time.NewTicker(checkInterval / 2) defer ticker.Stop() + stopCh := sw.requestCtx.Done() for { select { case <-ticker.C: f() - case <-sw.stopCh: + case <-stopCh: return } } diff --git a/lib/promscrape/discoveryutils/client.go b/lib/promscrape/discoveryutils/client.go index 301b73b0b..319c1f2f9 100644 --- a/lib/promscrape/discoveryutils/client.go +++ b/lib/promscrape/discoveryutils/client.go @@ -159,6 +159,11 @@ func NewClient(apiServer string, ac *promauth.Config, proxyURL *proxy.URL, proxy return c, nil } +// Context returns context for the client requests. +func (c *Client) Context() context.Context { + return c.clientCtx +} + // GetAPIResponseWithReqParams returns response for given absolute path with optional callback for request. func (c *Client) GetAPIResponseWithReqParams(path string, modifyRequest func(request *http.Request)) ([]byte, error) { return c.getAPIResponse(path, modifyRequest) @@ -185,16 +190,21 @@ func (c *Client) getAPIResponse(path string, modifyRequest func(request *http.Re defer func() { <-concurrencyLimitCh }() - return c.getAPIResponseWithParamsAndClient(c.client, path, modifyRequest, nil) + return c.getAPIResponseWithParamsAndClientCtx(c.clientCtx, c.client, path, modifyRequest, nil) } // GetBlockingAPIResponse returns response for given absolute path with blocking client and optional callback for api response, func (c *Client) GetBlockingAPIResponse(path string, inspectResponse func(resp *http.Response)) ([]byte, error) { - return c.getAPIResponseWithParamsAndClient(c.blockingClient, path, nil, inspectResponse) + return c.getAPIResponseWithParamsAndClientCtx(c.clientCtx, c.blockingClient, path, nil, inspectResponse) +} + +// GetBlockingAPIResponseCtx returns response for given absolute path with blocking client and optional callback for api response, +func (c *Client) GetBlockingAPIResponseCtx(ctx context.Context, path string, inspectResponse func(resp *http.Response)) ([]byte, error) { + return c.getAPIResponseWithParamsAndClientCtx(ctx, c.blockingClient, path, nil, inspectResponse) } // getAPIResponseWithParamsAndClient returns response for the given absolute path with optional callback for request and for response. -func (c *Client) getAPIResponseWithParamsAndClient(client *HTTPClient, path string, modifyRequest func(req *http.Request), inspectResponse func(resp *http.Response)) ([]byte, error) { +func (c *Client) getAPIResponseWithParamsAndClientCtx(ctx context.Context, client *HTTPClient, path string, modifyRequest func(req *http.Request), inspectResponse func(resp *http.Response)) ([]byte, error) { requestURL := c.apiServer + path u, err := url.Parse(requestURL) if err != nil { @@ -202,7 +212,7 @@ func (c *Client) getAPIResponseWithParamsAndClient(client *HTTPClient, path stri } deadline := time.Now().Add(client.ReadTimeout) - ctx, cancel := context.WithDeadline(c.clientCtx, deadline) + ctx, cancel := context.WithDeadline(ctx, deadline) defer cancel() req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil) if err != nil {