From c04505e58553ce2541cf16e1be387da9077f3abb Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sun, 7 Mar 2021 19:50:01 +0200 Subject: [PATCH] lib/promscrape/discovery/kubernetes: reduce memory usage further when a big number of scrape jobs are configured for the same `kubernetes_sd_config` role Serialize reloading of per-role objects, so they don't occupy too much memory when objects for many scrape jobs are simultaneously refreshed. Do not reload per-role objects if they were already refreshed by concurrent goroutines. This should reduce load on the Kubernetes API server when a big number of scrape jobs are configured for the same Kubernetes role. This is a follow-up for 17b87725ed1da0813ecb9ac0405eb4520ae52931 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1113 --- .../discovery/kubernetes/api_watcher.go | 56 +++++++++++++++---- 1 file changed, 45 insertions(+), 11 deletions(-) diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go index 420af9ff8..10066dea6 100644 --- a/lib/promscrape/discovery/kubernetes/api_watcher.go +++ b/lib/promscrape/discovery/kubernetes/api_watcher.go @@ -155,12 +155,12 @@ func (aw *apiWatcher) startWatcherForURL(role, apiURL string, parseObject parseO uw.watchersCount.Inc() uw.watchersCreated.Inc() - resourceVersion := uw.reloadObjects() + uw.reloadObjects() aw.wg.Add(1) go func() { defer aw.wg.Done() logger.Infof("started watcher for %q", apiURL) - uw.watchForUpdates(resourceVersion) + uw.watchForUpdates() logger.Infof("stopped watcher for %q", apiURL) uw.objectsByKey.decRef() @@ -205,9 +205,10 @@ type urlWatcher struct { // objectsByKey contains the latest state for objects obtained from apiURL objectsByKey *objectsMap - // mu protects swosByKey - mu sync.Mutex - swosByKey map[string][]interface{} + // mu protects swosByKey and resourceVersion + mu sync.Mutex + swosByKey map[string][]interface{} + resourceVersion string // the parent apiWatcher aw 
*apiWatcher @@ -236,8 +237,38 @@ func (aw *apiWatcher) newURLWatcher(role, apiURL string, parseObject parseObject } } +// Limit the concurrency for per-role objects reloading to 1. +// +// This should reduce memory usage when big number of watchers simultaneously receive an update for objects of the same role. +var reloadObjectsLocksByRole = map[string]*sync.Mutex{ + "node": {}, + "pod": {}, + "service": {}, + "endpoints": {}, + "endpointslices": {}, + "ingress": {}, +} + +func (uw *urlWatcher) resetResourceVersion() { + uw.mu.Lock() + uw.resourceVersion = "" + uw.mu.Unlock() +} + // reloadObjects reloads objects to the latest state and returns resourceVersion for the latest state. func (uw *urlWatcher) reloadObjects() string { + lock := reloadObjectsLocksByRole[uw.role] + lock.Lock() + defer lock.Unlock() + + uw.mu.Lock() + resourceVersion := uw.resourceVersion + uw.mu.Unlock() + if resourceVersion != "" { + // Fast path - objects have been already reloaded by concurrent goroutines. + return resourceVersion + } + aw := uw.aw requestURL := uw.apiURL resp, err := aw.doRequest(requestURL) @@ -261,7 +292,7 @@ func (uw *urlWatcher) reloadObjects() string { return "" } uw.objectsByKey.reload(objectsByKey) - swosByKey := make(map[string][]interface{}, len(objectsByKey)) + swosByKey := make(map[string][]interface{}) for k, o := range objectsByKey { labels := o.getTargetLabels(aw) swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels) @@ -271,7 +302,9 @@ func (uw *urlWatcher) reloadObjects() string { } uw.mu.Lock() uw.swosByKey = swosByKey + uw.resourceVersion = metadata.ResourceVersion uw.mu.Unlock() + return metadata.ResourceVersion } @@ -287,10 +320,10 @@ func getScrapeWorkObjectsForLabels(swcFunc ScrapeWorkConstructorFunc, labelss [] return swos } -// watchForUpdates watches for object updates starting from resourceVersion and updates the corresponding objects to the latest state. 
+// watchForUpdates watches for object updates starting from uw.resourceVersion and updates the corresponding objects to the latest state. // // See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes -func (uw *urlWatcher) watchForUpdates(resourceVersion string) { +func (uw *urlWatcher) watchForUpdates() { aw := uw.aw backoffDelay := time.Second maxBackoffDelay := 30 * time.Second @@ -312,6 +345,7 @@ func (uw *urlWatcher) watchForUpdates(resourceVersion string) { if aw.needStop() { return } + resourceVersion := uw.reloadObjects() requestURL := apiURL if resourceVersion != "" { requestURL += "&resourceVersion=" + url.QueryEscape(resourceVersion) @@ -323,7 +357,7 @@ func (uw *urlWatcher) watchForUpdates(resourceVersion string) { } logger.Errorf("error when performing a request to %q: %s", requestURL, err) backoffSleep() - resourceVersion = uw.reloadObjects() + uw.resetResourceVersion() continue } if resp.StatusCode != http.StatusOK { @@ -336,7 +370,7 @@ func (uw *urlWatcher) watchForUpdates(resourceVersion string) { } else { backoffSleep() } - resourceVersion = uw.reloadObjects() + uw.resetResourceVersion() continue } backoffDelay = time.Second @@ -350,7 +384,7 @@ func (uw *urlWatcher) watchForUpdates(resourceVersion string) { logger.Errorf("error when reading WatchEvent stream from %q: %s", requestURL, err) } backoffSleep() - resourceVersion = uw.reloadObjects() + uw.resetResourceVersion() continue } }