diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go index 420af9ff8..10066dea6 100644 --- a/lib/promscrape/discovery/kubernetes/api_watcher.go +++ b/lib/promscrape/discovery/kubernetes/api_watcher.go @@ -155,12 +155,12 @@ func (aw *apiWatcher) startWatcherForURL(role, apiURL string, parseObject parseO uw.watchersCount.Inc() uw.watchersCreated.Inc() - resourceVersion := uw.reloadObjects() + uw.reloadObjects() aw.wg.Add(1) go func() { defer aw.wg.Done() logger.Infof("started watcher for %q", apiURL) - uw.watchForUpdates(resourceVersion) + uw.watchForUpdates() logger.Infof("stopped watcher for %q", apiURL) uw.objectsByKey.decRef() @@ -205,9 +205,10 @@ type urlWatcher struct { // objectsByKey contains the latest state for objects obtained from apiURL objectsByKey *objectsMap - // mu protects swosByKey - mu sync.Mutex - swosByKey map[string][]interface{} + // mu protects swosByKey and resourceVersion + mu sync.Mutex + swosByKey map[string][]interface{} + resourceVersion string // the parent apiWatcher aw *apiWatcher @@ -236,8 +237,38 @@ func (aw *apiWatcher) newURLWatcher(role, apiURL string, parseObject parseObject } } +// Limit the concurrency for per-role objects reloading to 1. +// +// This should reduce memory usage when a large number of watchers simultaneously receive an update for objects of the same role. +var reloadObjectsLocksByRole = map[string]*sync.Mutex{ + "node": {}, + "pod": {}, + "service": {}, + "endpoints": {}, + "endpointslices": {}, + "ingress": {}, +} + +func (uw *urlWatcher) resetResourceVersion() { + uw.mu.Lock() + uw.resourceVersion = "" + uw.mu.Unlock() +} + // reloadObjects reloads objects to the latest state and returns resourceVersion for the latest state. 
func (uw *urlWatcher) reloadObjects() string { + lock := reloadObjectsLocksByRole[uw.role] + lock.Lock() + defer lock.Unlock() + + uw.mu.Lock() + resourceVersion := uw.resourceVersion + uw.mu.Unlock() + if resourceVersion != "" { + // Fast path - objects have already been reloaded by concurrent goroutines. + return resourceVersion + } + aw := uw.aw requestURL := uw.apiURL resp, err := aw.doRequest(requestURL) @@ -261,7 +292,7 @@ func (uw *urlWatcher) reloadObjects() string { return "" } uw.objectsByKey.reload(objectsByKey) - swosByKey := make(map[string][]interface{}, len(objectsByKey)) + swosByKey := make(map[string][]interface{}) for k, o := range objectsByKey { labels := o.getTargetLabels(aw) swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels) @@ -271,7 +302,9 @@ func (uw *urlWatcher) reloadObjects() string { } uw.mu.Lock() uw.swosByKey = swosByKey + uw.resourceVersion = metadata.ResourceVersion uw.mu.Unlock() + return metadata.ResourceVersion } @@ -287,10 +320,10 @@ func getScrapeWorkObjectsForLabels(swcFunc ScrapeWorkConstructorFunc, labelss [] return swos } -// watchForUpdates watches for object updates starting from resourceVersion and updates the corresponding objects to the latest state. +// watchForUpdates watches for object updates starting from uw.resourceVersion and updates the corresponding objects to the latest state. 
// // See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes -func (uw *urlWatcher) watchForUpdates(resourceVersion string) { +func (uw *urlWatcher) watchForUpdates() { aw := uw.aw backoffDelay := time.Second maxBackoffDelay := 30 * time.Second @@ -312,6 +345,7 @@ func (uw *urlWatcher) watchForUpdates(resourceVersion string) { if aw.needStop() { return } + resourceVersion := uw.reloadObjects() requestURL := apiURL if resourceVersion != "" { requestURL += "&resourceVersion=" + url.QueryEscape(resourceVersion) @@ -323,7 +357,7 @@ func (uw *urlWatcher) watchForUpdates(resourceVersion string) { } logger.Errorf("error when performing a request to %q: %s", requestURL, err) backoffSleep() - resourceVersion = uw.reloadObjects() + uw.resetResourceVersion() continue } if resp.StatusCode != http.StatusOK { @@ -336,7 +370,7 @@ func (uw *urlWatcher) watchForUpdates(resourceVersion string) { } else { backoffSleep() } - resourceVersion = uw.reloadObjects() + uw.resetResourceVersion() continue } backoffDelay = time.Second @@ -350,7 +384,7 @@ func (uw *urlWatcher) watchForUpdates(resourceVersion string) { logger.Errorf("error when reading WatchEvent stream from %q: %s", requestURL, err) } backoffSleep() - resourceVersion = uw.reloadObjects() + uw.resetResourceVersion() continue } }