lib/promscrape/discovery/kubernetes: follow-up after 03fece44e0

- Properly update vm_promscrape_discovery_kubernetes_url_watchers
  and vm_promscrape_discovery_kubernetes_group_watchers metrics after config changes

- Properly stop goroutine responsible for recreating scrapeWorks after the corresponding urlWatcher is stopped

- Log the event when urlWatcher is stopped in order to simplify debugging

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4850
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4861
This commit is contained in:
Aliaksandr Valialkin 2023-09-18 23:23:41 +02:00
parent 705b31c351
commit de2b3ff9b0
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1

View file

@ -23,6 +23,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
) )
var apiServerTimeout = flag.Duration("promscrape.kubernetes.apiServerTimeout", 30*time.Minute, "How frequently to reload the full state from Kubernetes API server") var apiServerTimeout = flag.Duration("promscrape.kubernetes.apiServerTimeout", 30*time.Minute, "How frequently to reload the full state from Kubernetes API server")
@ -271,7 +272,11 @@ func selectorsKey(selectors []Selector) string {
var ( var (
groupWatchersLock sync.Mutex groupWatchersLock sync.Mutex
groupWatchers = make(map[string]*groupWatcher) groupWatchers = func() map[string]*groupWatcher {
gws := make(map[string]*groupWatcher)
go groupWatchersCleaner(gws)
return gws
}()
_ = metrics.NewGauge(`vm_promscrape_discovery_kubernetes_group_watchers`, func() float64 { _ = metrics.NewGauge(`vm_promscrape_discovery_kubernetes_group_watchers`, func() float64 {
groupWatchersLock.Lock() groupWatchersLock.Lock()
@ -281,6 +286,21 @@ var (
}) })
) )
func groupWatchersCleaner(gws map[string]*groupWatcher) {
for {
time.Sleep(7 * time.Second)
groupWatchersLock.Lock()
for key, gw := range gws {
gw.mu.Lock()
if len(gw.m) == 0 {
delete(gws, key)
}
gw.mu.Unlock()
}
groupWatchersLock.Unlock()
}
}
type swosByKeyWithLock struct { type swosByKeyWithLock struct {
mu sync.Mutex mu sync.Mutex
swosByKey map[string][]interface{} swosByKey map[string][]interface{}
@ -380,24 +400,7 @@ func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) {
// This should guarantee that the ScrapeWork objects for these objects are properly updated // This should guarantee that the ScrapeWork objects for these objects are properly updated
// as soon as the objects they depend on are updated. // as soon as the objects they depend on are updated.
// This should fix https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240 . // This should fix https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240 .
go func() { go uw.recreateScrapeWorks()
const minSleepTime = 5 * time.Second
sleepTime := minSleepTime
for {
time.Sleep(sleepTime)
startTime := time.Now()
gw.mu.Lock()
if uw.needRecreateScrapeWorks {
uw.needRecreateScrapeWorks = false
uw.recreateScrapeWorksLocked(uw.objectsByKey, uw.aws)
sleepTime = time.Since(startTime)
if sleepTime < minSleepTime {
sleepTime = minSleepTime
}
}
gw.mu.Unlock()
}
}()
} }
} }
} }
@ -533,6 +536,34 @@ func (uw *urlWatcher) stopIfNoUsers() {
gw.mu.Unlock() gw.mu.Unlock()
} }
func (uw *urlWatcher) recreateScrapeWorks() {
const minSleepTime = 5 * time.Second
sleepTime := minSleepTime
gw := uw.gw
stopCh := uw.ctx.Done()
for {
t := timerpool.Get(sleepTime)
select {
case <-stopCh:
timerpool.Put(t)
return
case <-t.C:
timerpool.Put(t)
}
startTime := time.Now()
gw.mu.Lock()
if uw.needRecreateScrapeWorks {
uw.needRecreateScrapeWorks = false
uw.recreateScrapeWorksLocked(uw.objectsByKey, uw.aws)
sleepTime = time.Since(startTime)
if sleepTime < minSleepTime {
sleepTime = minSleepTime
}
}
gw.mu.Unlock()
}
}
func (uw *urlWatcher) subscribeAPIWatcherLocked(aw *apiWatcher) { func (uw *urlWatcher) subscribeAPIWatcherLocked(aw *apiWatcher) {
if _, ok := uw.aws[aw]; !ok { if _, ok := uw.aws[aw]; !ok {
if _, ok := uw.awsPending[aw]; !ok { if _, ok := uw.awsPending[aw]; !ok {
@ -678,10 +709,18 @@ func (uw *urlWatcher) reloadObjects() string {
// //
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes // See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
func (uw *urlWatcher) watchForUpdates() { func (uw *urlWatcher) watchForUpdates() {
stopCh := uw.ctx.Done()
backoffDelay := time.Second backoffDelay := time.Second
maxBackoffDelay := 30 * time.Second maxBackoffDelay := 30 * time.Second
backoffSleep := func() { backoffSleep := func() {
time.Sleep(backoffDelay) t := timerpool.Get(backoffDelay)
select {
case <-stopCh:
timerpool.Put(t)
return
case <-t.C:
timerpool.Put(t)
}
backoffDelay *= 2 backoffDelay *= 2
if backoffDelay > maxBackoffDelay { if backoffDelay > maxBackoffDelay {
backoffDelay = maxBackoffDelay backoffDelay = maxBackoffDelay
@ -691,10 +730,11 @@ func (uw *urlWatcher) watchForUpdates() {
delimiter := getQueryArgsDelimiter(apiURL) delimiter := getQueryArgsDelimiter(apiURL)
timeoutSeconds := time.Duration(0.9 * float64(uw.gw.client.Timeout)).Seconds() timeoutSeconds := time.Duration(0.9 * float64(uw.gw.client.Timeout)).Seconds()
apiURL += delimiter + "watch=1&allowWatchBookmarks=true&timeoutSeconds=" + strconv.Itoa(int(timeoutSeconds)) apiURL += delimiter + "watch=1&allowWatchBookmarks=true&timeoutSeconds=" + strconv.Itoa(int(timeoutSeconds))
stopCh := uw.ctx.Done()
for { for {
select { select {
case <-stopCh: case <-stopCh:
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers{role=%q}`, uw.role)).Dec()
logger.Infof("stopped %s watcher for %q", uw.role, uw.apiURL)
return return
default: default:
} }