mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/promscrape/discovery/kubernetes: follow-up after 03fece44e0
- Properly update vm_promscrape_discovery_kubernetes_url_watchers and vm_promscrape_discovery_kubernetes_group_watchers metrics after config changes - Properly stop goroutine responsible for recreating scrapeWorks after the corresponding urlWatcher is stopped - Log the event when urlWatcher is stopped in order to simplify debugging Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4850 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4861
This commit is contained in:
parent
03fece44e0
commit
30a645cd82
1 changed files with 61 additions and 21 deletions
|
@ -23,6 +23,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
|
||||
)
|
||||
|
||||
var apiServerTimeout = flag.Duration("promscrape.kubernetes.apiServerTimeout", 30*time.Minute, "How frequently to reload the full state from Kubernetes API server")
|
||||
|
@ -271,7 +272,11 @@ func selectorsKey(selectors []Selector) string {
|
|||
|
||||
var (
|
||||
groupWatchersLock sync.Mutex
|
||||
groupWatchers = make(map[string]*groupWatcher)
|
||||
groupWatchers = func() map[string]*groupWatcher {
|
||||
gws := make(map[string]*groupWatcher)
|
||||
go groupWatchersCleaner(gws)
|
||||
return gws
|
||||
}()
|
||||
|
||||
_ = metrics.NewGauge(`vm_promscrape_discovery_kubernetes_group_watchers`, func() float64 {
|
||||
groupWatchersLock.Lock()
|
||||
|
@ -281,6 +286,21 @@ var (
|
|||
})
|
||||
)
|
||||
|
||||
func groupWatchersCleaner(gws map[string]*groupWatcher) {
|
||||
for {
|
||||
time.Sleep(7 * time.Second)
|
||||
groupWatchersLock.Lock()
|
||||
for key, gw := range gws {
|
||||
gw.mu.Lock()
|
||||
if len(gw.m) == 0 {
|
||||
delete(gws, key)
|
||||
}
|
||||
gw.mu.Unlock()
|
||||
}
|
||||
groupWatchersLock.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
type swosByKeyWithLock struct {
|
||||
mu sync.Mutex
|
||||
swosByKey map[string][]interface{}
|
||||
|
@ -380,24 +400,7 @@ func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) {
|
|||
// This should guarantee that the ScrapeWork objects for these objects are properly updated
|
||||
// as soon as the objects they depend on are updated.
|
||||
// This should fix https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240 .
|
||||
go func() {
|
||||
const minSleepTime = 5 * time.Second
|
||||
sleepTime := minSleepTime
|
||||
for {
|
||||
time.Sleep(sleepTime)
|
||||
startTime := time.Now()
|
||||
gw.mu.Lock()
|
||||
if uw.needRecreateScrapeWorks {
|
||||
uw.needRecreateScrapeWorks = false
|
||||
uw.recreateScrapeWorksLocked(uw.objectsByKey, uw.aws)
|
||||
sleepTime = time.Since(startTime)
|
||||
if sleepTime < minSleepTime {
|
||||
sleepTime = minSleepTime
|
||||
}
|
||||
}
|
||||
gw.mu.Unlock()
|
||||
}
|
||||
}()
|
||||
go uw.recreateScrapeWorks()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -533,6 +536,34 @@ func (uw *urlWatcher) stopIfNoUsers() {
|
|||
gw.mu.Unlock()
|
||||
}
|
||||
|
||||
func (uw *urlWatcher) recreateScrapeWorks() {
|
||||
const minSleepTime = 5 * time.Second
|
||||
sleepTime := minSleepTime
|
||||
gw := uw.gw
|
||||
stopCh := uw.ctx.Done()
|
||||
for {
|
||||
t := timerpool.Get(sleepTime)
|
||||
select {
|
||||
case <-stopCh:
|
||||
timerpool.Put(t)
|
||||
return
|
||||
case <-t.C:
|
||||
timerpool.Put(t)
|
||||
}
|
||||
startTime := time.Now()
|
||||
gw.mu.Lock()
|
||||
if uw.needRecreateScrapeWorks {
|
||||
uw.needRecreateScrapeWorks = false
|
||||
uw.recreateScrapeWorksLocked(uw.objectsByKey, uw.aws)
|
||||
sleepTime = time.Since(startTime)
|
||||
if sleepTime < minSleepTime {
|
||||
sleepTime = minSleepTime
|
||||
}
|
||||
}
|
||||
gw.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (uw *urlWatcher) subscribeAPIWatcherLocked(aw *apiWatcher) {
|
||||
if _, ok := uw.aws[aw]; !ok {
|
||||
if _, ok := uw.awsPending[aw]; !ok {
|
||||
|
@ -678,10 +709,18 @@ func (uw *urlWatcher) reloadObjects() string {
|
|||
//
|
||||
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
|
||||
func (uw *urlWatcher) watchForUpdates() {
|
||||
stopCh := uw.ctx.Done()
|
||||
backoffDelay := time.Second
|
||||
maxBackoffDelay := 30 * time.Second
|
||||
backoffSleep := func() {
|
||||
time.Sleep(backoffDelay)
|
||||
t := timerpool.Get(backoffDelay)
|
||||
select {
|
||||
case <-stopCh:
|
||||
timerpool.Put(t)
|
||||
return
|
||||
case <-t.C:
|
||||
timerpool.Put(t)
|
||||
}
|
||||
backoffDelay *= 2
|
||||
if backoffDelay > maxBackoffDelay {
|
||||
backoffDelay = maxBackoffDelay
|
||||
|
@ -691,10 +730,11 @@ func (uw *urlWatcher) watchForUpdates() {
|
|||
delimiter := getQueryArgsDelimiter(apiURL)
|
||||
timeoutSeconds := time.Duration(0.9 * float64(uw.gw.client.Timeout)).Seconds()
|
||||
apiURL += delimiter + "watch=1&allowWatchBookmarks=true&timeoutSeconds=" + strconv.Itoa(int(timeoutSeconds))
|
||||
stopCh := uw.ctx.Done()
|
||||
for {
|
||||
select {
|
||||
case <-stopCh:
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers{role=%q}`, uw.role)).Dec()
|
||||
logger.Infof("stopped %s watcher for %q", uw.role, uw.apiURL)
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue