From 669ad1ca5565a1583954d9736527925139fd820e Mon Sep 17 00:00:00 2001 From: Zakhar Bessarab Date: Fri, 15 Sep 2023 21:40:13 +0400 Subject: [PATCH] lib/promscrape/discovery/kubernetes: fix leaking api watcher (#4861) * lib/promscrape/discovery/kubernetes: fix leaking api watcher goroutine which was polling k8s API had no execution control. This led to leaking goroutines during config reload. See: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4850 Signed-off-by: Zakhar Bessarab * lib/promscrape/discovery/kubernetes: use reference counting for urlWatcher cleanup Signed-off-by: Zakhar Bessarab * lib/promscrape/discovery/kubernetes: remove waitgroup sync for goroutines polling API server This is unnecessary since the context is cancelled and new requests will not be sent. Also, using waitgroup will increase time required to perform reload which might result in missed scrapes. Signed-off-by: Zakhar Bessarab * lib/promscrape/discovery/kubernetes: clarify comment Signed-off-by: Zakhar Bessarab * Apply suggestions from code review * lib/promscrape/discovery/kubernetes: address review feedback Signed-off-by: Zakhar Bessarab --------- Signed-off-by: Zakhar Bessarab Co-authored-by: Nikolay --- docs/CHANGELOG.md | 1 + .../discovery/kubernetes/api_watcher.go | 42 +++++++++++++++---- 2 files changed, 35 insertions(+), 8 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 6c5678917e..b2b193641b 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -67,6 +67,7 @@ The v1.93.x line will be supported for at least 12 months since [v1.93.0](https: * BUGFIX: properly replace `:` chars in label names with `_` when `-usePromCompatibleNaming` command-line flag is passed to `vmagent`, `vminsert` or single-node VictoriaMetrics. This addresses [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3113#issuecomment-1275077071). 
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): correctly check if specified `-dst` belongs to specified `-storageDataPath`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4837). * BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl.html): don't interrupt the migration process if no metrics were found for a specific tenant. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4796). +* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix the thread leak that occurs during the polling of the Kubernetes API server after a configuration reload. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4850). ## [v1.93.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.93.0) diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go index 78bb7ddd75..60f91cea4b 100644 --- a/lib/promscrape/discovery/kubernetes/api_watcher.go +++ b/lib/promscrape/discovery/kubernetes/api_watcher.go @@ -1,6 +1,7 @@ package kubernetes import ( + "context" "encoding/json" "errors" "flag" @@ -16,11 +17,12 @@ import ( "sync/atomic" "time" + "github.com/VictoriaMetrics/metrics" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" - "github.com/VictoriaMetrics/metrics" ) var apiServerTimeout = flag.Duration("promscrape.kubernetes.apiServerTimeout", 30*time.Minute, "How frequently to reload the full state from Kubernetes API server") @@ -402,7 +404,7 @@ func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) { } // doRequest performs http request to the given requestURL. 
-func (gw *groupWatcher) doRequest(requestURL string) (*http.Response, error) { +func (gw *groupWatcher) doRequest(ctx context.Context, requestURL string) (*http.Response, error) { if strings.Contains(requestURL, "/apis/networking.k8s.io/v1/") && atomic.LoadUint32(&gw.useNetworkingV1Beta1) == 1 { // Update networking URL for old Kubernetes API, which supports only v1beta1 path. requestURL = strings.Replace(requestURL, "/apis/networking.k8s.io/v1/", "/apis/networking.k8s.io/v1beta1/", 1) @@ -411,7 +413,7 @@ func (gw *groupWatcher) doRequest(requestURL string) (*http.Response, error) { // Update discovery URL for old Kubernetes API, which supports only v1beta1 path. requestURL = strings.Replace(requestURL, "/apis/discovery.k8s.io/v1/", "/apis/discovery.k8s.io/v1beta1/", 1) } - req, err := http.NewRequest(http.MethodGet, requestURL, nil) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, requestURL, nil) if err != nil { logger.Fatalf("cannot create a request for %q: %s", requestURL, err) } @@ -423,11 +425,11 @@ func (gw *groupWatcher) doRequest(requestURL string) (*http.Response, error) { if resp.StatusCode == http.StatusNotFound { if strings.Contains(requestURL, "/apis/networking.k8s.io/v1/") && atomic.LoadUint32(&gw.useNetworkingV1Beta1) == 0 { atomic.StoreUint32(&gw.useNetworkingV1Beta1, 1) - return gw.doRequest(requestURL) + return gw.doRequest(ctx, requestURL) } if strings.Contains(requestURL, "/apis/discovery.k8s.io/v1/") && atomic.LoadUint32(&gw.useDiscoveryV1Beta1) == 0 { atomic.StoreUint32(&gw.useDiscoveryV1Beta1, 1) - return gw.doRequest(requestURL) + return gw.doRequest(ctx, requestURL) } } return resp, nil @@ -444,8 +446,12 @@ func (gw *groupWatcher) registerPendingAPIWatchers() { func (gw *groupWatcher) unsubscribeAPIWatcher(aw *apiWatcher) { gw.mu.Lock() defer gw.mu.Unlock() - for _, uw := range gw.m { + for key, uw := range gw.m { uw.unsubscribeAPIWatcherLocked(aw) + if (len(uw.aws) + len(uw.awsPending)) == 0 { + uw.cancel() + delete(gw.m, 
key) + } } } @@ -458,6 +464,9 @@ type urlWatcher struct { apiURL string gw *groupWatcher + ctx context.Context + cancel context.CancelFunc + parseObject parseObjectFunc parseObjectList parseObjectListFunc @@ -488,11 +497,17 @@ type urlWatcher struct { func newURLWatcher(role, apiURL string, gw *groupWatcher) *urlWatcher { parseObject, parseObjectList := getObjectParsersForRole(role) metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers{role=%q}`, role)).Inc() + + ctx, cancel := context.WithCancel(context.Background()) uw := &urlWatcher{ role: role, apiURL: apiURL, gw: gw, + refCount: 0, + ctx: ctx, + cancel: cancel, + parseObject: parseObject, parseObjectList: parseObjectList, @@ -587,7 +602,7 @@ func (uw *urlWatcher) reloadObjects() string { // and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4855 . delimiter := getQueryArgsDelimiter(apiURL) requestURL := apiURL + delimiter + "resourceVersion=0&resourceVersionMatch=NotOlderThan" - resp, err := uw.gw.doRequest(requestURL) + resp, err := uw.gw.doRequest(uw.ctx, requestURL) if err != nil { logger.Errorf("cannot perform request to %q: %s", requestURL, err) return "" @@ -667,13 +682,19 @@ func (uw *urlWatcher) watchForUpdates() { timeoutSeconds := time.Duration(0.9 * float64(uw.gw.client.Timeout)).Seconds() apiURL += delimiter + "watch=1&allowWatchBookmarks=true&timeoutSeconds=" + strconv.Itoa(int(timeoutSeconds)) for { + select { + case <-uw.ctx.Done(): + return + default: + } + resourceVersion := uw.reloadObjects() if resourceVersion == "" { backoffSleep() continue } requestURL := apiURL + "&resourceVersion=" + url.QueryEscape(resourceVersion) - resp, err := uw.gw.doRequest(requestURL) + resp, err := uw.gw.doRequest(uw.ctx, requestURL) if err != nil { logger.Errorf("cannot perform request to %q: %s", requestURL, err) backoffSleep() @@ -823,6 +844,11 @@ func (uw *urlWatcher) maybeUpdateDependedScrapeWorksLocked() { } } +// close cancels context used for API polling +func 
(uw *urlWatcher) close() { + uw.cancel() +} + // Bookmark is a bookmark message from Kubernetes Watch API. // See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks type Bookmark struct {