From de2b3ff9b093bb4abb110a66a896e4d447829697 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Mon, 18 Sep 2023 23:23:41 +0200
Subject: [PATCH] lib/promscrape/discovery/kubernetes: follow-up after 03fece44e0f3803e16e5d9f3d71423a79b6d8785

- Properly update vm_promscrape_discovery_kubernetes_url_watchers and vm_promscrape_discovery_kubernetes_group_watchers metrics after config changes
- Properly stop goroutine responsible for recreating scrapeWorks after the corresponding urlWatcher is stopped
- Log the event when urlWatcher is stopped in order to simplify debugging

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4850
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4861
---
 .../discovery/kubernetes/api_watcher.go | 82 ++++++++++++++-----
 1 file changed, 61 insertions(+), 21 deletions(-)

diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go
index d358113279..3d4f0c8069 100644
--- a/lib/promscrape/discovery/kubernetes/api_watcher.go
+++ b/lib/promscrape/discovery/kubernetes/api_watcher.go
@@ -23,6 +23,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
 )
 
 var apiServerTimeout = flag.Duration("promscrape.kubernetes.apiServerTimeout", 30*time.Minute, "How frequently to reload the full state from Kubernetes API server")
@@ -271,7 +272,11 @@ func selectorsKey(selectors []Selector) string {
 
 var (
 	groupWatchersLock sync.Mutex
-	groupWatchers     = make(map[string]*groupWatcher)
+	groupWatchers     = func() map[string]*groupWatcher {
+		gws := make(map[string]*groupWatcher)
+		go groupWatchersCleaner(gws)
+		return gws
+	}()
 
 	_ = metrics.NewGauge(`vm_promscrape_discovery_kubernetes_group_watchers`, func() float64 {
 		groupWatchersLock.Lock()
@@ -281,6 +286,21 @@ var (
 	})
 )
 
+func groupWatchersCleaner(gws map[string]*groupWatcher) {
+	for {
+		time.Sleep(7 * time.Second)
+		groupWatchersLock.Lock()
+		for key, gw := range gws {
+			gw.mu.Lock()
+			if len(gw.m) == 0 {
+				delete(gws, key)
+			}
+			gw.mu.Unlock()
+		}
+		groupWatchersLock.Unlock()
+	}
+}
+
 type swosByKeyWithLock struct {
 	mu        sync.Mutex
 	swosByKey map[string][]interface{}
@@ -380,24 +400,7 @@ func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) {
 				// This should guarantee that the ScrapeWork objects for these objects are properly updated
 				// as soon as the objects they depend on are updated.
 				// This should fix https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240 .
-				go func() {
-					const minSleepTime = 5 * time.Second
-					sleepTime := minSleepTime
-					for {
-						time.Sleep(sleepTime)
-						startTime := time.Now()
-						gw.mu.Lock()
-						if uw.needRecreateScrapeWorks {
-							uw.needRecreateScrapeWorks = false
-							uw.recreateScrapeWorksLocked(uw.objectsByKey, uw.aws)
-							sleepTime = time.Since(startTime)
-							if sleepTime < minSleepTime {
-								sleepTime = minSleepTime
-							}
-						}
-						gw.mu.Unlock()
-					}
-				}()
+				go uw.recreateScrapeWorks()
 			}
 		}
 	}
@@ -533,6 +536,34 @@ func (uw *urlWatcher) stopIfNoUsers() {
 	gw.mu.Unlock()
 }
 
+func (uw *urlWatcher) recreateScrapeWorks() {
+	const minSleepTime = 5 * time.Second
+	sleepTime := minSleepTime
+	gw := uw.gw
+	stopCh := uw.ctx.Done()
+	for {
+		t := timerpool.Get(sleepTime)
+		select {
+		case <-stopCh:
+			timerpool.Put(t)
+			return
+		case <-t.C:
+			timerpool.Put(t)
+		}
+		startTime := time.Now()
+		gw.mu.Lock()
+		if uw.needRecreateScrapeWorks {
+			uw.needRecreateScrapeWorks = false
+			uw.recreateScrapeWorksLocked(uw.objectsByKey, uw.aws)
+			sleepTime = time.Since(startTime)
+			if sleepTime < minSleepTime {
+				sleepTime = minSleepTime
+			}
+		}
+		gw.mu.Unlock()
+	}
+}
+
 func (uw *urlWatcher) subscribeAPIWatcherLocked(aw *apiWatcher) {
 	if _, ok := uw.aws[aw]; !ok {
 		if _, ok := uw.awsPending[aw]; !ok {
@@ -678,10 +709,18 @@ func (uw *urlWatcher) reloadObjects() string {
 //
 // See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
 func (uw *urlWatcher) watchForUpdates() {
+	stopCh := uw.ctx.Done()
 	backoffDelay := time.Second
 	maxBackoffDelay := 30 * time.Second
 	backoffSleep := func() {
-		time.Sleep(backoffDelay)
+		t := timerpool.Get(backoffDelay)
+		select {
+		case <-stopCh:
+			timerpool.Put(t)
+			return
+		case <-t.C:
+			timerpool.Put(t)
+		}
 		backoffDelay *= 2
 		if backoffDelay > maxBackoffDelay {
 			backoffDelay = maxBackoffDelay
@@ -691,10 +730,11 @@ func (uw *urlWatcher) watchForUpdates() {
 	delimiter := getQueryArgsDelimiter(apiURL)
 	timeoutSeconds := time.Duration(0.9 * float64(uw.gw.client.Timeout)).Seconds()
 	apiURL += delimiter + "watch=1&allowWatchBookmarks=true&timeoutSeconds=" + strconv.Itoa(int(timeoutSeconds))
-	stopCh := uw.ctx.Done()
 	for {
 		select {
 		case <-stopCh:
+			metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers{role=%q}`, uw.role)).Dec()
+			logger.Infof("stopped %s watcher for %q", uw.role, uw.apiURL)
 			return
 		default:
 		}