lib/promscrape/discovery/kubernetes: synchronously load Kubernetes objects on first access

Remove async registration of apiWatchers, since it breaks discovering `role: endpoints` and `role: endpointslices` targets,
which depend on pod and service objects.

There is no need to reload `endpoints` and `endpointslices` targets if the referenced `pod` or `service` objects change,
since in this case the corresponding `endpoints` and `endpointslices` objects should also change because they contain
ResourceVersion of the referenced `pod` or `service` objects, which is modified on object update.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1182
This commit is contained in:
Aliaksandr Valialkin 2021-04-05 13:53:26 +03:00
parent 2c25b0322b
commit f010d773d6
3 changed files with 52 additions and 85 deletions

View file

@ -12,12 +12,13 @@
* FEATURE: vmagent: add support for `socks5 over tls` proxy in `proxy_url` config option. It can be set up with the following config: `proxy_url: tls+socks5://proxy-addr:port`. * FEATURE: vmagent: add support for `socks5 over tls` proxy in `proxy_url` config option. It can be set up with the following config: `proxy_url: tls+socks5://proxy-addr:port`.
* FEATURE: vmagent: reduce memory usage when `-remoteWrite.queues` is set to a big value. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1167). * FEATURE: vmagent: reduce memory usage when `-remoteWrite.queues` is set to a big value. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1167).
* FEATURE: vmagent: add AWS IAM roles for tasks support for EC2 service discovery according to [these docs](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html). * FEATURE: vmagent: add AWS IAM roles for tasks support for EC2 service discovery according to [these docs](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html).
* FEATURE: vmagent: add support for `proxy_tls_config`, `proxy_authorization`, `proxy_basic_auth`, `proxy_bearer_token` and `proxy_bearer_token_file` options to `consul_sd_config`, `dockerswarm_sd_config` and `eureka_sd_config` sections. * FEATURE: vmagent: add support for `proxy_tls_config`, `proxy_authorization`, `proxy_basic_auth`, `proxy_bearer_token` and `proxy_bearer_token_file` options in `consul_sd_config`, `dockerswarm_sd_config` and `eureka_sd_config` sections.
* FEATURE: vmagent: pass `X-Prometheus-Scrape-Timeout-Seconds` header to scrape targets as Prometheus does. In this case scrape targets can limit the time needed for performing the scrape. See [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179#issuecomment-813118733) for details. * FEATURE: vmagent: pass `X-Prometheus-Scrape-Timeout-Seconds` header to scrape targets as Prometheus does. In this case scrape targets can limit the time needed for performing the scrape. See [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179#issuecomment-813118733) for details.
* FEATURE: vmauth: add support for authorization via [bearer token](https://swagger.io/docs/specification/authentication/bearer-authentication/). See [the docs](https://victoriametrics.github.io/vmauth.html#auth-config) for details. * FEATURE: vmauth: add support for authorization via [bearer token](https://swagger.io/docs/specification/authentication/bearer-authentication/). See [the docs](https://victoriametrics.github.io/vmauth.html#auth-config) for details.
* BUGFIX: vmagent: properly work with simple HTTP proxies which don't support `CONNECT` method. For example, [PushProx](https://github.com/prometheus-community/PushProx). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179). * BUGFIX: vmagent: properly work with simple HTTP proxies which don't support `CONNECT` method. For example, [PushProx](https://github.com/prometheus-community/PushProx). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179).
* BUGFIX: vmagent: properly discover targets if multiple namespace selectors are put inside `kubernetes_sd_config`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1170). * BUGFIX: vmagent: properly discover targets if multiple namespace selectors are put inside `kubernetes_sd_config`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1170).
* BUGFIX: vmagent: properly discover `role: endpoints` and `role: endpointslices` targets in `kubernetes_sd_config`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1182).
* BUGFIX: properly generate filename for `*.tar.gz` archive inside `_checksums.txt` file posted at [releases page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1171). * BUGFIX: properly generate filename for `*.tar.gz` archive inside `_checksums.txt` file posted at [releases page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1171).

View file

@ -87,7 +87,7 @@ func (aw *apiWatcher) reloadScrapeWorks(namespace string, swosByKey map[string][
} }
func (aw *apiWatcher) setScrapeWorks(namespace, key string, labels []map[string]string) { func (aw *apiWatcher) setScrapeWorks(namespace, key string, labels []map[string]string) {
swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels) swos := aw.getScrapeWorkObjectsForLabels(labels)
aw.swosByNamespaceLock.Lock() aw.swosByNamespaceLock.Lock()
swosByKey := aw.swosByNamespace[namespace] swosByKey := aw.swosByNamespace[namespace]
if swosByKey == nil { if swosByKey == nil {
@ -114,10 +114,10 @@ func (aw *apiWatcher) removeScrapeWorks(namespace, key string) {
aw.swosByNamespaceLock.Unlock() aw.swosByNamespaceLock.Unlock()
} }
func getScrapeWorkObjectsForLabels(swcFunc ScrapeWorkConstructorFunc, labelss []map[string]string) []interface{} { func (aw *apiWatcher) getScrapeWorkObjectsForLabels(labelss []map[string]string) []interface{} {
swos := make([]interface{}, 0, len(labelss)) swos := make([]interface{}, 0, len(labelss))
for _, labels := range labelss { for _, labels := range labelss {
swo := swcFunc(labels) swo := aw.swcFunc(labels)
// The reflect check is needed because of https://mangatmodi.medium.com/go-check-nil-interface-the-right-way-d142776edef1 // The reflect check is needed because of https://mangatmodi.medium.com/go-check-nil-interface-the-right-way-d142776edef1
if swo != nil && !reflect.ValueOf(swo).IsNil() { if swo != nil && !reflect.ValueOf(swo).IsNil() {
swos = append(swos, swo) swos = append(swos, swo)
@ -253,34 +253,17 @@ func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) {
apiURL := gw.apiServer + path apiURL := gw.apiServer + path
gw.mu.Lock() gw.mu.Lock()
uw := gw.m[apiURL] uw := gw.m[apiURL]
if uw == nil { needStart := uw == nil
if needStart {
uw = newURLWatcher(role, namespaces[i], apiURL, gw) uw = newURLWatcher(role, namespaces[i], apiURL, gw)
gw.m[apiURL] = uw gw.m[apiURL] = uw
} }
gw.mu.Unlock() gw.mu.Unlock()
uw.subscribeAPIWatcher(aw) if needStart {
} uw.reloadObjects()
} go uw.watchForUpdates()
func (gw *groupWatcher) reloadScrapeWorksForAPIWatchers(namespace string, aws []*apiWatcher, objectsByKey map[string]object) {
if len(aws) == 0 {
return
}
swosByKey := make([]map[string][]interface{}, len(aws))
for i := range aws {
swosByKey[i] = make(map[string][]interface{})
}
for key, o := range objectsByKey {
labels := o.getTargetLabels(gw)
for i, aw := range aws {
swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels)
if len(swos) > 0 {
swosByKey[i][key] = swos
}
} }
} uw.subscribeAPIWatcher(aw)
for i, aw := range aws {
aw.reloadScrapeWorks(namespace, swosByKey[i])
} }
} }
@ -314,15 +297,12 @@ type urlWatcher struct {
parseObject parseObjectFunc parseObject parseObjectFunc
parseObjectList parseObjectListFunc parseObjectList parseObjectListFunc
// mu protects aws, awsPending, objectsByKey and resourceVersion // mu protects aws, objectsByKey and resourceVersion
mu sync.Mutex mu sync.Mutex
// aws contains registered apiWatcher objects // aws contains registered apiWatcher objects
aws map[*apiWatcher]struct{} aws map[*apiWatcher]struct{}
// awsPending contains pending apiWatcher objects, which must be moved to aws in a batch
awsPending map[*apiWatcher]struct{}
// objectsByKey contains the latest state for objects obtained from apiURL // objectsByKey contains the latest state for objects obtained from apiURL
objectsByKey map[string]object objectsByKey map[string]object
@ -348,7 +328,6 @@ func newURLWatcher(role, namespace, apiURL string, gw *groupWatcher) *urlWatcher
parseObjectList: parseObjectList, parseObjectList: parseObjectList,
aws: make(map[*apiWatcher]struct{}), aws: make(map[*apiWatcher]struct{}),
awsPending: make(map[*apiWatcher]struct{}),
objectsByKey: make(map[string]object), objectsByKey: make(map[string]object),
objectsCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects{role=%q}`, role)), objectsCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects{role=%q}`, role)),
@ -358,8 +337,6 @@ func newURLWatcher(role, namespace, apiURL string, gw *groupWatcher) *urlWatcher
staleResourceVersions: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_stale_resource_versions_total{role=%q}`, role)), staleResourceVersions: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_stale_resource_versions_total{role=%q}`, role)),
} }
logger.Infof("started %s watcher for %q", uw.role, uw.apiURL) logger.Infof("started %s watcher for %q", uw.role, uw.apiURL)
go uw.watchForUpdates()
go uw.processPendingSubscribers()
return uw return uw
} }
@ -369,10 +346,16 @@ func (uw *urlWatcher) subscribeAPIWatcher(aw *apiWatcher) {
} }
uw.mu.Lock() uw.mu.Lock()
if _, ok := uw.aws[aw]; !ok { if _, ok := uw.aws[aw]; !ok {
if _, ok := uw.awsPending[aw]; !ok { swosByKey := make(map[string][]interface{})
uw.awsPending[aw] = struct{}{} for key, o := range uw.objectsByKey {
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q,type="pending"}`, uw.role)).Inc() labels := o.getTargetLabels(uw.gw)
swos := aw.getScrapeWorkObjectsForLabels(labels)
if len(swos) > 0 {
swosByKey[key] = swos
}
} }
aw.reloadScrapeWorks(uw.namespace, swosByKey)
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q}`, uw.role)).Inc()
} }
uw.mu.Unlock() uw.mu.Unlock()
} }
@ -381,43 +364,11 @@ func (uw *urlWatcher) unsubscribeAPIWatcher(aw *apiWatcher) {
uw.mu.Lock() uw.mu.Lock()
if _, ok := uw.aws[aw]; ok { if _, ok := uw.aws[aw]; ok {
delete(uw.aws, aw) delete(uw.aws, aw)
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q,type="permanent"}`, uw.role)).Dec() metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q}`, uw.role)).Dec()
} else if _, ok := uw.awsPending[aw]; ok {
delete(uw.awsPending, aw)
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q,type="pending"}`, uw.role)).Dec()
} }
uw.mu.Unlock() uw.mu.Unlock()
} }
func (uw *urlWatcher) processPendingSubscribers() {
t := time.NewTicker(time.Second)
for range t.C {
var awsPending []*apiWatcher
var objectsByKey map[string]object
uw.mu.Lock()
if len(uw.awsPending) > 0 {
awsPending = getAPIWatchers(uw.awsPending)
for _, aw := range awsPending {
if _, ok := uw.aws[aw]; ok {
logger.Panicf("BUG: aw=%p already exists in uw.aws", aw)
}
uw.aws[aw] = struct{}{}
delete(uw.awsPending, aw)
}
objectsByKey = make(map[string]object, len(uw.objectsByKey))
for key, o := range uw.objectsByKey {
objectsByKey[key] = o
}
}
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q,type="pending"}`, uw.role)).Add(-len(awsPending))
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q,type="permanent"}`, uw.role)).Add(len(awsPending))
uw.mu.Unlock()
uw.gw.reloadScrapeWorksForAPIWatchers(uw.namespace, awsPending, objectsByKey)
}
}
func (uw *urlWatcher) setResourceVersion(resourceVersion string) { func (uw *urlWatcher) setResourceVersion(resourceVersion string) {
uw.mu.Lock() uw.mu.Lock()
uw.resourceVersion = resourceVersion uw.resourceVersion = resourceVersion
@ -478,9 +429,31 @@ func (uw *urlWatcher) reloadObjects() string {
aws := getAPIWatchers(uw.aws) aws := getAPIWatchers(uw.aws)
uw.mu.Unlock() uw.mu.Unlock()
uw.gw.reloadScrapeWorksForAPIWatchers(uw.namespace, aws, objectsByKey) uw.reloadScrapeWorksForAPIWatchers(aws, objectsByKey)
logger.Infof("reloaded %d objects from %q", len(objectsByKey), requestURL) logger.Infof("reloaded %d objects from %q", len(objectsByKey), requestURL)
return metadata.ResourceVersion return uw.resourceVersion
}
func (uw *urlWatcher) reloadScrapeWorksForAPIWatchers(aws []*apiWatcher, objectsByKey map[string]object) {
if len(aws) == 0 {
return
}
swosByKey := make([]map[string][]interface{}, len(aws))
for i := range aws {
swosByKey[i] = make(map[string][]interface{})
}
for key, o := range objectsByKey {
labels := o.getTargetLabels(uw.gw)
for i, aw := range aws {
swos := aw.getScrapeWorkObjectsForLabels(labels)
if len(swos) > 0 {
swosByKey[i][key] = swos
}
}
}
for i, aw := range aws {
aw.reloadScrapeWorks(uw.namespace, swosByKey[i])
}
} }
func getAPIWatchers(awsMap map[*apiWatcher]struct{}) []*apiWatcher { func getAPIWatchers(awsMap map[*apiWatcher]struct{}) []*apiWatcher {

View file

@ -231,17 +231,11 @@ func (scfg *scrapeConfig) run() {
cfg := <-scfg.cfgCh cfg := <-scfg.cfgCh
var swsPrev []*ScrapeWork var swsPrev []*ScrapeWork
updateScrapeWork := func(cfg *Config) { updateScrapeWork := func(cfg *Config) {
for { startTime := time.Now()
startTime := time.Now() sws := scfg.getScrapeWork(cfg, swsPrev)
sws := scfg.getScrapeWork(cfg, swsPrev) sg.update(sws)
retry := sg.update(sws) swsPrev = sws
swsPrev = sws scfg.discoveryDuration.UpdateDuration(startTime)
scfg.discoveryDuration.UpdateDuration(startTime)
if !retry {
return
}
time.Sleep(2 * time.Second)
}
} }
updateScrapeWork(cfg) updateScrapeWork(cfg)
atomic.AddInt32(&PendingScrapeConfigs, -1) atomic.AddInt32(&PendingScrapeConfigs, -1)
@ -301,7 +295,7 @@ func (sg *scraperGroup) stop() {
sg.wg.Wait() sg.wg.Wait()
} }
func (sg *scraperGroup) update(sws []*ScrapeWork) (retry bool) { func (sg *scraperGroup) update(sws []*ScrapeWork) {
sg.mLock.Lock() sg.mLock.Lock()
defer sg.mLock.Unlock() defer sg.mLock.Unlock()
@ -358,7 +352,6 @@ func (sg *scraperGroup) update(sws []*ScrapeWork) (retry bool) {
sg.changesCount.Add(additionsCount + deletionsCount) sg.changesCount.Add(additionsCount + deletionsCount)
logger.Infof("%s: added targets: %d, removed targets: %d; total targets: %d", sg.name, additionsCount, deletionsCount, len(sg.m)) logger.Infof("%s: added targets: %d, removed targets: %d; total targets: %d", sg.name, additionsCount, deletionsCount, len(sg.m))
} }
return deletionsCount > 0 && len(sg.m) == 0
} }
type scraper struct { type scraper struct {