lib/promscrape/discovery/kubernetes: synchronously load Kubernetes objects on first access

Remove async registration of apiWatchers, since it breaks discovering `role: endpoints` and `role: endpointslices` targets,
which depend on pod and service objects.

There is no need in reloading `endpoints` and `endpointslices` targets if the referenced `pod` or `service` objects change,
since in this case the corresponding `endpoints` and `endpointslices` objects should also change because they contain
ResourceVersion of the referenced `pod` or `service` objects, which is modified on object update.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1182
This commit is contained in:
Aliaksandr Valialkin 2021-04-05 13:53:26 +03:00
parent c8b7c32e23
commit fe084fdd33
3 changed files with 52 additions and 85 deletions

View file

@ -12,12 +12,13 @@
* FEATURE: vmagent: add support for `socks5 over tls` proxy in `proxy_url` config option. It can be set up with the following config: `proxy_url: tls+socks5://proxy-addr:port`.
* FEATURE: vmagent: reduce memory usage when `-remoteWrite.queues` is set to a big value. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1167).
* FEATURE: vmagent: add AWS IAM roles for tasks support for EC2 service discovery according to [these docs](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html).
* FEATURE: vmagent: add support for `proxy_tls_config`, `proxy_authorization`, `proxy_basic_auth`, `proxy_bearer_token` and `proxy_bearer_token_file` options to `consul_sd_config`, `dockerswarm_sd_config` and `eureka_sd_config` sections.
* FEATURE: vmagent: add support for `proxy_tls_config`, `proxy_authorization`, `proxy_basic_auth`, `proxy_bearer_token` and `proxy_bearer_token_file` options in `consul_sd_config`, `dockerswarm_sd_config` and `eureka_sd_config` sections.
* FEATURE: vmagent: pass `X-Prometheus-Scrape-Timeout-Seconds` header to scrape targets as Prometheus does. In this case scrape targets can limit the time needed for performing the scrape. See [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179#issuecomment-813118733) for details.
* FEATURE: vmauth: add support for authorization via [bearer token](https://swagger.io/docs/specification/authentication/bearer-authentication/). See [the docs](https://victoriametrics.github.io/vmauth.html#auth-config) for details.
* BUGFIX: vmagent: properly work with simple HTTP proxies which don't support `CONNECT` method. For example, [PushProx](https://github.com/prometheus-community/PushProx). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179).
* BUGFIX: vmagent: properly discover targets if multiple namespace selectors are put inside `kubernetes_sd_config`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1170).
* BUGFIX: vmagent: properly discover `role: endpoints` and `role: endpointslices` targets in `kubernetes_sd_config`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1182).
* BUGFIX: properly generate filename for `*.tar.gz` archive inside `_checksums.txt` file posted at [releases page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1171).

View file

@ -87,7 +87,7 @@ func (aw *apiWatcher) reloadScrapeWorks(namespace string, swosByKey map[string][
}
func (aw *apiWatcher) setScrapeWorks(namespace, key string, labels []map[string]string) {
swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels)
swos := aw.getScrapeWorkObjectsForLabels(labels)
aw.swosByNamespaceLock.Lock()
swosByKey := aw.swosByNamespace[namespace]
if swosByKey == nil {
@ -114,10 +114,10 @@ func (aw *apiWatcher) removeScrapeWorks(namespace, key string) {
aw.swosByNamespaceLock.Unlock()
}
func getScrapeWorkObjectsForLabels(swcFunc ScrapeWorkConstructorFunc, labelss []map[string]string) []interface{} {
func (aw *apiWatcher) getScrapeWorkObjectsForLabels(labelss []map[string]string) []interface{} {
swos := make([]interface{}, 0, len(labelss))
for _, labels := range labelss {
swo := swcFunc(labels)
swo := aw.swcFunc(labels)
// The reflect check is needed because of https://mangatmodi.medium.com/go-check-nil-interface-the-right-way-d142776edef1
if swo != nil && !reflect.ValueOf(swo).IsNil() {
swos = append(swos, swo)
@ -253,34 +253,17 @@ func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) {
apiURL := gw.apiServer + path
gw.mu.Lock()
uw := gw.m[apiURL]
if uw == nil {
needStart := uw == nil
if needStart {
uw = newURLWatcher(role, namespaces[i], apiURL, gw)
gw.m[apiURL] = uw
}
gw.mu.Unlock()
uw.subscribeAPIWatcher(aw)
}
}
func (gw *groupWatcher) reloadScrapeWorksForAPIWatchers(namespace string, aws []*apiWatcher, objectsByKey map[string]object) {
if len(aws) == 0 {
return
}
swosByKey := make([]map[string][]interface{}, len(aws))
for i := range aws {
swosByKey[i] = make(map[string][]interface{})
}
for key, o := range objectsByKey {
labels := o.getTargetLabels(gw)
for i, aw := range aws {
swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels)
if len(swos) > 0 {
swosByKey[i][key] = swos
}
if needStart {
uw.reloadObjects()
go uw.watchForUpdates()
}
}
for i, aw := range aws {
aw.reloadScrapeWorks(namespace, swosByKey[i])
uw.subscribeAPIWatcher(aw)
}
}
@ -314,15 +297,12 @@ type urlWatcher struct {
parseObject parseObjectFunc
parseObjectList parseObjectListFunc
// mu protects aws, awsPending, objectsByKey and resourceVersion
// mu protects aws, objectsByKey and resourceVersion
mu sync.Mutex
// aws contains registered apiWatcher objects
aws map[*apiWatcher]struct{}
// awsPending contains pending apiWatcher objects, which must be moved to aws in a batch
awsPending map[*apiWatcher]struct{}
// objectsByKey contains the latest state for objects obtained from apiURL
objectsByKey map[string]object
@ -348,7 +328,6 @@ func newURLWatcher(role, namespace, apiURL string, gw *groupWatcher) *urlWatcher
parseObjectList: parseObjectList,
aws: make(map[*apiWatcher]struct{}),
awsPending: make(map[*apiWatcher]struct{}),
objectsByKey: make(map[string]object),
objectsCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects{role=%q}`, role)),
@ -358,8 +337,6 @@ func newURLWatcher(role, namespace, apiURL string, gw *groupWatcher) *urlWatcher
staleResourceVersions: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_stale_resource_versions_total{role=%q}`, role)),
}
logger.Infof("started %s watcher for %q", uw.role, uw.apiURL)
go uw.watchForUpdates()
go uw.processPendingSubscribers()
return uw
}
@ -369,10 +346,16 @@ func (uw *urlWatcher) subscribeAPIWatcher(aw *apiWatcher) {
}
uw.mu.Lock()
if _, ok := uw.aws[aw]; !ok {
if _, ok := uw.awsPending[aw]; !ok {
uw.awsPending[aw] = struct{}{}
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q,type="pending"}`, uw.role)).Inc()
swosByKey := make(map[string][]interface{})
for key, o := range uw.objectsByKey {
labels := o.getTargetLabels(uw.gw)
swos := aw.getScrapeWorkObjectsForLabels(labels)
if len(swos) > 0 {
swosByKey[key] = swos
}
}
aw.reloadScrapeWorks(uw.namespace, swosByKey)
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q}`, uw.role)).Inc()
}
uw.mu.Unlock()
}
@ -381,43 +364,11 @@ func (uw *urlWatcher) unsubscribeAPIWatcher(aw *apiWatcher) {
uw.mu.Lock()
if _, ok := uw.aws[aw]; ok {
delete(uw.aws, aw)
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q,type="permanent"}`, uw.role)).Dec()
} else if _, ok := uw.awsPending[aw]; ok {
delete(uw.awsPending, aw)
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q,type="pending"}`, uw.role)).Dec()
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q}`, uw.role)).Dec()
}
uw.mu.Unlock()
}
func (uw *urlWatcher) processPendingSubscribers() {
t := time.NewTicker(time.Second)
for range t.C {
var awsPending []*apiWatcher
var objectsByKey map[string]object
uw.mu.Lock()
if len(uw.awsPending) > 0 {
awsPending = getAPIWatchers(uw.awsPending)
for _, aw := range awsPending {
if _, ok := uw.aws[aw]; ok {
logger.Panicf("BUG: aw=%p already exists in uw.aws", aw)
}
uw.aws[aw] = struct{}{}
delete(uw.awsPending, aw)
}
objectsByKey = make(map[string]object, len(uw.objectsByKey))
for key, o := range uw.objectsByKey {
objectsByKey[key] = o
}
}
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q,type="pending"}`, uw.role)).Add(-len(awsPending))
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q,type="permanent"}`, uw.role)).Add(len(awsPending))
uw.mu.Unlock()
uw.gw.reloadScrapeWorksForAPIWatchers(uw.namespace, awsPending, objectsByKey)
}
}
func (uw *urlWatcher) setResourceVersion(resourceVersion string) {
uw.mu.Lock()
uw.resourceVersion = resourceVersion
@ -478,9 +429,31 @@ func (uw *urlWatcher) reloadObjects() string {
aws := getAPIWatchers(uw.aws)
uw.mu.Unlock()
uw.gw.reloadScrapeWorksForAPIWatchers(uw.namespace, aws, objectsByKey)
uw.reloadScrapeWorksForAPIWatchers(aws, objectsByKey)
logger.Infof("reloaded %d objects from %q", len(objectsByKey), requestURL)
return metadata.ResourceVersion
return uw.resourceVersion
}
func (uw *urlWatcher) reloadScrapeWorksForAPIWatchers(aws []*apiWatcher, objectsByKey map[string]object) {
if len(aws) == 0 {
return
}
swosByKey := make([]map[string][]interface{}, len(aws))
for i := range aws {
swosByKey[i] = make(map[string][]interface{})
}
for key, o := range objectsByKey {
labels := o.getTargetLabels(uw.gw)
for i, aw := range aws {
swos := aw.getScrapeWorkObjectsForLabels(labels)
if len(swos) > 0 {
swosByKey[i][key] = swos
}
}
}
for i, aw := range aws {
aw.reloadScrapeWorks(uw.namespace, swosByKey[i])
}
}
func getAPIWatchers(awsMap map[*apiWatcher]struct{}) []*apiWatcher {

View file

@ -231,17 +231,11 @@ func (scfg *scrapeConfig) run() {
cfg := <-scfg.cfgCh
var swsPrev []*ScrapeWork
updateScrapeWork := func(cfg *Config) {
for {
startTime := time.Now()
sws := scfg.getScrapeWork(cfg, swsPrev)
retry := sg.update(sws)
swsPrev = sws
scfg.discoveryDuration.UpdateDuration(startTime)
if !retry {
return
}
time.Sleep(2 * time.Second)
}
startTime := time.Now()
sws := scfg.getScrapeWork(cfg, swsPrev)
sg.update(sws)
swsPrev = sws
scfg.discoveryDuration.UpdateDuration(startTime)
}
updateScrapeWork(cfg)
atomic.AddInt32(&PendingScrapeConfigs, -1)
@ -301,7 +295,7 @@ func (sg *scraperGroup) stop() {
sg.wg.Wait()
}
func (sg *scraperGroup) update(sws []*ScrapeWork) (retry bool) {
func (sg *scraperGroup) update(sws []*ScrapeWork) {
sg.mLock.Lock()
defer sg.mLock.Unlock()
@ -358,7 +352,6 @@ func (sg *scraperGroup) update(sws []*ScrapeWork) (retry bool) {
sg.changesCount.Add(additionsCount + deletionsCount)
logger.Infof("%s: added targets: %d, removed targets: %d; total targets: %d", sg.name, additionsCount, deletionsCount, len(sg.m))
}
return deletionsCount > 0 && len(sg.m) == 0
}
type scraper struct {