diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 54cb1f4d0..8298e8c6b 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -103,6 +103,7 @@ The v1.93.x line will be supported for at least 12 months since [v1.93.0](https: * BUGFIX: properly replace `:` chars in label names with `_` when `-usePromCompatibleNaming` command-line flag is passed to `vmagent`, `vminsert` or single-node VictoriaMetrics. This addresses [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3113#issuecomment-1275077071). * BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): correctly check if specified `-dst` belongs to specified `-storageDataPath`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4837). * BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl.html): don't interrupt the migration process if no metrics were found for a specific tenant. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4796). +* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix the thread leak that occurs during the polling of the Kubernetes API server after a configuration reload. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4850). ## [v1.93.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.93.0) diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go index 78bb7ddd7..60f91cea4 100644 --- a/lib/promscrape/discovery/kubernetes/api_watcher.go +++ b/lib/promscrape/discovery/kubernetes/api_watcher.go @@ -1,6 +1,7 @@ package kubernetes import ( + "context" "encoding/json" "errors" "flag" @@ -16,11 +17,12 @@ import ( "sync/atomic" "time" + "github.com/VictoriaMetrics/metrics" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" - "github.com/VictoriaMetrics/metrics" ) var apiServerTimeout = flag.Duration("promscrape.kubernetes.apiServerTimeout", 30*time.Minute, "How frequently to reload the full state from Kubernetes API server") @@ -402,7 +404,7 @@ func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) { } // doRequest performs http request to the given requestURL. -func (gw *groupWatcher) doRequest(requestURL string) (*http.Response, error) { +func (gw *groupWatcher) doRequest(ctx context.Context, requestURL string) (*http.Response, error) { if strings.Contains(requestURL, "/apis/networking.k8s.io/v1/") && atomic.LoadUint32(&gw.useNetworkingV1Beta1) == 1 { // Update networking URL for old Kubernetes API, which supports only v1beta1 path. requestURL = strings.Replace(requestURL, "/apis/networking.k8s.io/v1/", "/apis/networking.k8s.io/v1beta1/", 1) @@ -411,7 +413,7 @@ func (gw *groupWatcher) doRequest(requestURL string) (*http.Response, error) { // Update discovery URL for old Kubernetes API, which supports only v1beta1 path. requestURL = strings.Replace(requestURL, "/apis/discovery.k8s.io/v1/", "/apis/discovery.k8s.io/v1beta1/", 1) } - req, err := http.NewRequest(http.MethodGet, requestURL, nil) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, requestURL, nil) if err != nil { logger.Fatalf("cannot create a request for %q: %s", requestURL, err) } @@ -423,11 +425,11 @@ func (gw *groupWatcher) doRequest(requestURL string) (*http.Response, error) { if resp.StatusCode == http.StatusNotFound { if strings.Contains(requestURL, "/apis/networking.k8s.io/v1/") && atomic.LoadUint32(&gw.useNetworkingV1Beta1) == 0 { atomic.StoreUint32(&gw.useNetworkingV1Beta1, 1) - return gw.doRequest(requestURL) + return gw.doRequest(ctx, requestURL) } if strings.Contains(requestURL, "/apis/discovery.k8s.io/v1/") && atomic.LoadUint32(&gw.useDiscoveryV1Beta1) == 0 { atomic.StoreUint32(&gw.useDiscoveryV1Beta1, 1) - return gw.doRequest(requestURL) + return gw.doRequest(ctx, requestURL) } } return resp, nil @@ -444,8 +446,12 @@ func (gw *groupWatcher) registerPendingAPIWatchers() { func (gw *groupWatcher) unsubscribeAPIWatcher(aw *apiWatcher) { gw.mu.Lock() defer gw.mu.Unlock() - for _, uw := range gw.m { + for key, uw := range gw.m { uw.unsubscribeAPIWatcherLocked(aw) + if (len(uw.aws) + len(uw.awsPending)) == 0 { + uw.cancel() + delete(gw.m, key) + } } } @@ -458,6 +464,9 @@ type urlWatcher struct { apiURL string gw *groupWatcher + ctx context.Context + cancel context.CancelFunc + parseObject parseObjectFunc parseObjectList parseObjectListFunc @@ -488,11 +497,17 @@ type urlWatcher struct { func newURLWatcher(role, apiURL string, gw *groupWatcher) *urlWatcher { parseObject, parseObjectList := getObjectParsersForRole(role) metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers{role=%q}`, role)).Inc() + + ctx, cancel := context.WithCancel(context.Background()) uw := &urlWatcher{ role: role, apiURL: apiURL, gw: gw, + refCount: 0, + ctx: ctx, + cancel: cancel, + parseObject: parseObject, parseObjectList: parseObjectList, @@ -587,7 +602,7 @@ func (uw *urlWatcher) reloadObjects() string { // and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4855 . delimiter := getQueryArgsDelimiter(apiURL) requestURL := apiURL + delimiter + "resourceVersion=0&resourceVersionMatch=NotOlderThan" - resp, err := uw.gw.doRequest(requestURL) + resp, err := uw.gw.doRequest(uw.ctx, requestURL) if err != nil { logger.Errorf("cannot perform request to %q: %s", requestURL, err) return "" @@ -667,13 +682,19 @@ func (uw *urlWatcher) watchForUpdates() { timeoutSeconds := time.Duration(0.9 * float64(uw.gw.client.Timeout)).Seconds() apiURL += delimiter + "watch=1&allowWatchBookmarks=true&timeoutSeconds=" + strconv.Itoa(int(timeoutSeconds)) for { + select { + case <-uw.ctx.Done(): + return + default: + } + resourceVersion := uw.reloadObjects() if resourceVersion == "" { backoffSleep() continue } requestURL := apiURL + "&resourceVersion=" + url.QueryEscape(resourceVersion) - resp, err := uw.gw.doRequest(requestURL) + resp, err := uw.gw.doRequest(uw.ctx, requestURL) if err != nil { logger.Errorf("cannot perform request to %q: %s", requestURL, err) backoffSleep() @@ -823,6 +844,11 @@ func (uw *urlWatcher) maybeUpdateDependedScrapeWorksLocked() { } } +// close cancels context used for API polling +func (uw *urlWatcher) close() { + uw.cancel() +} + // Bookmark is a bookmark message from Kubernetes Watch API. // See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks type Bookmark struct {