diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 289dda00c..75f1096bd 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -12,12 +12,13 @@
 * FEATURE: vmagent: add support for `socks5 over tls` proxy in `proxy_url` config option. It can be set up with the following config: `proxy_url: tls+socks5://proxy-addr:port`.
 * FEATURE: vmagent: reduce memory usage when `-remoteWrite.queues` is set to a big value. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1167).
 * FEATURE: vmagent: add AWS IAM roles for tasks support for EC2 service discovery according to [these docs](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html).
-* FEATURE: vmagent: add support for `proxy_tls_config`, `proxy_authorization`, `proxy_basic_auth`, `proxy_bearer_token` and `proxy_bearer_token_file` options to `consul_sd_config`, `dockerswarm_sd_config` and `eureka_sd_config` sections.
+* FEATURE: vmagent: add support for `proxy_tls_config`, `proxy_authorization`, `proxy_basic_auth`, `proxy_bearer_token` and `proxy_bearer_token_file` options in `consul_sd_config`, `dockerswarm_sd_config` and `eureka_sd_config` sections.
 * FEATURE: vmagent: pass `X-Prometheus-Scrape-Timeout-Seconds` header to scrape targets as Prometheus does. In this case scrape targets can limit the time needed for performing the scrape. See [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179#issuecomment-813118733) for details.
 * FEATURE: vmauth: add support for authorization via [bearer token](https://swagger.io/docs/specification/authentication/bearer-authentication/). See [the docs](https://victoriametrics.github.io/vmauth.html#auth-config) for details.
 
 * BUGFIX: vmagent: properly work with simple HTTP proxies which don't support `CONNECT` method. For example, [PushProx](https://github.com/prometheus-community/PushProx). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179).
 * BUGFIX: vmagent: properly discover targets if multiple namespace selectors are put inside `kubernetes_sd_config`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1170).
+* BUGFIX: vmagent: properly discover `role: endpoints` and `role: endpointslices` targets in `kubernetes_sd_config`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1182).
 * BUGFIX: properly generate filename for `*.tar.gz` archive inside `_checksums.txt` file posted at [releases page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1171).
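The per-SD proxy options in the changelog entries above combine into a single scrape config. Below is a minimal sketch of the intended usage, not taken from the repo docs: the option names come from the changelog entries, while the server address, credentials, file paths, and the sub-field shape of `proxy_basic_auth`/`proxy_tls_config` (assumed to mirror the regular `basic_auth`/`tls_config` blocks) are illustrative assumptions.

```yaml
scrape_configs:
  - job_name: consul-targets
    consul_sd_configs:
      # Placeholder Consul server and proxy addresses.
      - server: consul.example.com:8500
        # tls+socks5:// wraps the connection to the proxy itself in TLS,
        # per the first FEATURE entry above.
        proxy_url: tls+socks5://proxy-addr:1080
        # Assumed to mirror the regular `basic_auth` block.
        proxy_basic_auth:
          username: vmagent
          password: secret
        # Assumed to mirror the regular `tls_config` block.
        proxy_tls_config:
          ca_file: /path/to/proxy-ca.crt
```

Per the changelog entry, the same `proxy_*` options apply to `dockerswarm_sd_config` and `eureka_sd_config` sections as well.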
diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go
index e9f096e72..eb27c1040 100644
--- a/lib/promscrape/discovery/kubernetes/api_watcher.go
+++ b/lib/promscrape/discovery/kubernetes/api_watcher.go
@@ -87,7 +87,7 @@ func (aw *apiWatcher) reloadScrapeWorks(namespace string, swosByKey map[string][
 }
 
 func (aw *apiWatcher) setScrapeWorks(namespace, key string, labels []map[string]string) {
-	swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels)
+	swos := aw.getScrapeWorkObjectsForLabels(labels)
 	aw.swosByNamespaceLock.Lock()
 	swosByKey := aw.swosByNamespace[namespace]
 	if swosByKey == nil {
@@ -114,10 +114,10 @@ func (aw *apiWatcher) removeScrapeWorks(namespace, key string) {
 	aw.swosByNamespaceLock.Unlock()
 }
 
-func getScrapeWorkObjectsForLabels(swcFunc ScrapeWorkConstructorFunc, labelss []map[string]string) []interface{} {
+func (aw *apiWatcher) getScrapeWorkObjectsForLabels(labelss []map[string]string) []interface{} {
 	swos := make([]interface{}, 0, len(labelss))
 	for _, labels := range labelss {
-		swo := swcFunc(labels)
+		swo := aw.swcFunc(labels)
 		// The reflect check is needed because of https://mangatmodi.medium.com/go-check-nil-interface-the-right-way-d142776edef1
 		if swo != nil && !reflect.ValueOf(swo).IsNil() {
 			swos = append(swos, swo)
@@ -253,34 +253,17 @@ func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) {
 		apiURL := gw.apiServer + path
 		gw.mu.Lock()
 		uw := gw.m[apiURL]
-		if uw == nil {
+		needStart := uw == nil
+		if needStart {
 			uw = newURLWatcher(role, namespaces[i], apiURL, gw)
 			gw.m[apiURL] = uw
 		}
 		gw.mu.Unlock()
-		uw.subscribeAPIWatcher(aw)
-	}
-}
-
-func (gw *groupWatcher) reloadScrapeWorksForAPIWatchers(namespace string, aws []*apiWatcher, objectsByKey map[string]object) {
-	if len(aws) == 0 {
-		return
-	}
-	swosByKey := make([]map[string][]interface{}, len(aws))
-	for i := range aws {
-		swosByKey[i] = make(map[string][]interface{})
-	}
-	for key, o := range objectsByKey {
-		labels := o.getTargetLabels(gw)
-		for i, aw := range aws {
-			swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels)
-			if len(swos) > 0 {
-				swosByKey[i][key] = swos
-			}
+		if needStart {
+			uw.reloadObjects()
+			go uw.watchForUpdates()
 		}
-	}
-	for i, aw := range aws {
-		aw.reloadScrapeWorks(namespace, swosByKey[i])
+		uw.subscribeAPIWatcher(aw)
 	}
 }
 
@@ -314,15 +297,12 @@ type urlWatcher struct {
 	parseObject     parseObjectFunc
 	parseObjectList parseObjectListFunc
 
-	// mu protects aws, awsPending, objectsByKey and resourceVersion
+	// mu protects aws, objectsByKey and resourceVersion
 	mu sync.Mutex
 
 	// aws contains registered apiWatcher objects
 	aws map[*apiWatcher]struct{}
 
-	// awsPending contains pending apiWatcher objects, which must be moved to aws in a batch
-	awsPending map[*apiWatcher]struct{}
-
 	// objectsByKey contains the latest state for objects obtained from apiURL
 	objectsByKey map[string]object
 
@@ -348,7 +328,6 @@ func newURLWatcher(role, namespace, apiURL string, gw *groupWatcher) *urlWatcher
 		parseObjectList: parseObjectList,
 
 		aws:          make(map[*apiWatcher]struct{}),
-		awsPending:   make(map[*apiWatcher]struct{}),
 		objectsByKey: make(map[string]object),
 
 		objectsCount:          metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects{role=%q}`, role)),
@@ -358,8 +337,6 @@ func newURLWatcher(role, namespace, apiURL string, gw *groupWatcher) *urlWatcher
 		staleResourceVersions: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_stale_resource_versions_total{role=%q}`, role)),
 	}
 	logger.Infof("started %s watcher for %q", uw.role, uw.apiURL)
-	go uw.watchForUpdates()
-	go uw.processPendingSubscribers()
 	return uw
 }
 
@@ -369,10 +346,16 @@ func (uw *urlWatcher) subscribeAPIWatcher(aw *apiWatcher) {
 	}
 	uw.mu.Lock()
 	if _, ok := uw.aws[aw]; !ok {
-		if _, ok := uw.awsPending[aw]; !ok {
-			uw.awsPending[aw] = struct{}{}
-			metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q,type="pending"}`, uw.role)).Inc()
+		swosByKey := make(map[string][]interface{})
+		for key, o := range uw.objectsByKey {
+			labels := o.getTargetLabels(uw.gw)
+			swos := aw.getScrapeWorkObjectsForLabels(labels)
+			if len(swos) > 0 {
+				swosByKey[key] = swos
+			}
 		}
+		aw.reloadScrapeWorks(uw.namespace, swosByKey)
+		metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q}`, uw.role)).Inc()
 	}
 	uw.mu.Unlock()
 }
@@ -381,43 +364,11 @@ func (uw *urlWatcher) unsubscribeAPIWatcher(aw *apiWatcher) {
 	uw.mu.Lock()
 	if _, ok := uw.aws[aw]; ok {
 		delete(uw.aws, aw)
-		metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q,type="permanent"}`, uw.role)).Dec()
-	} else if _, ok := uw.awsPending[aw]; ok {
-		delete(uw.awsPending, aw)
-		metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q,type="pending"}`, uw.role)).Dec()
+		metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q}`, uw.role)).Dec()
 	}
 	uw.mu.Unlock()
 }
 
-func (uw *urlWatcher) processPendingSubscribers() {
-	t := time.NewTicker(time.Second)
-	for range t.C {
-		var awsPending []*apiWatcher
-		var objectsByKey map[string]object
-
-		uw.mu.Lock()
-		if len(uw.awsPending) > 0 {
-			awsPending = getAPIWatchers(uw.awsPending)
-			for _, aw := range awsPending {
-				if _, ok := uw.aws[aw]; ok {
-					logger.Panicf("BUG: aw=%p already exists in uw.aws", aw)
-				}
-				uw.aws[aw] = struct{}{}
-				delete(uw.awsPending, aw)
-			}
-			objectsByKey = make(map[string]object, len(uw.objectsByKey))
-			for key, o := range uw.objectsByKey {
-				objectsByKey[key] = o
-			}
-		}
-		metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q,type="pending"}`, uw.role)).Add(-len(awsPending))
-		metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q,type="permanent"}`, uw.role)).Add(len(awsPending))
-		uw.mu.Unlock()
-
-		uw.gw.reloadScrapeWorksForAPIWatchers(uw.namespace, awsPending, objectsByKey)
-	}
-}
-
 func (uw *urlWatcher) setResourceVersion(resourceVersion string) {
 	uw.mu.Lock()
 	uw.resourceVersion = resourceVersion
@@ -478,9 +429,31 @@ func (uw *urlWatcher) reloadObjects() string {
 	aws := getAPIWatchers(uw.aws)
 	uw.mu.Unlock()
 
-	uw.gw.reloadScrapeWorksForAPIWatchers(uw.namespace, aws, objectsByKey)
+	uw.reloadScrapeWorksForAPIWatchers(aws, objectsByKey)
 	logger.Infof("reloaded %d objects from %q", len(objectsByKey), requestURL)
-	return metadata.ResourceVersion
+	return uw.resourceVersion
+}
+
+func (uw *urlWatcher) reloadScrapeWorksForAPIWatchers(aws []*apiWatcher, objectsByKey map[string]object) {
+	if len(aws) == 0 {
+		return
+	}
+	swosByKey := make([]map[string][]interface{}, len(aws))
+	for i := range aws {
+		swosByKey[i] = make(map[string][]interface{})
+	}
+	for key, o := range objectsByKey {
+		labels := o.getTargetLabels(uw.gw)
+		for i, aw := range aws {
+			swos := aw.getScrapeWorkObjectsForLabels(labels)
+			if len(swos) > 0 {
+				swosByKey[i][key] = swos
+			}
+		}
+	}
+	for i, aw := range aws {
+		aw.reloadScrapeWorks(uw.namespace, swosByKey[i])
+	}
 }
 
 func getAPIWatchers(awsMap map[*apiWatcher]struct{}) []*apiWatcher {
diff --git a/lib/promscrape/scraper.go b/lib/promscrape/scraper.go
index d02f1732f..861c35c4d 100644
--- a/lib/promscrape/scraper.go
+++ b/lib/promscrape/scraper.go
@@ -231,17 +231,11 @@ func (scfg *scrapeConfig) run() {
 	cfg := <-scfg.cfgCh
 	var swsPrev []*ScrapeWork
 	updateScrapeWork := func(cfg *Config) {
-		for {
-			startTime := time.Now()
-			sws := scfg.getScrapeWork(cfg, swsPrev)
-			retry := sg.update(sws)
-			swsPrev = sws
-			scfg.discoveryDuration.UpdateDuration(startTime)
-			if !retry {
-				return
-			}
-			time.Sleep(2 * time.Second)
-		}
+		startTime := time.Now()
+		sws := scfg.getScrapeWork(cfg, swsPrev)
+		sg.update(sws)
+		swsPrev = sws
+		scfg.discoveryDuration.UpdateDuration(startTime)
 	}
 	updateScrapeWork(cfg)
 	atomic.AddInt32(&PendingScrapeConfigs, -1)
@@ -301,7 +295,7 @@ func (sg *scraperGroup) stop() {
 	sg.wg.Wait()
 }
 
-func (sg *scraperGroup) update(sws []*ScrapeWork) (retry bool) {
+func (sg *scraperGroup) update(sws []*ScrapeWork) {
 	sg.mLock.Lock()
 	defer sg.mLock.Unlock()
 
@@ -358,7 +352,6 @@ func (sg *scraperGroup) update(sws []*ScrapeWork) (retry bool) {
 		sg.changesCount.Add(additionsCount + deletionsCount)
 		logger.Infof("%s: added targets: %d, removed targets: %d; total targets: %d", sg.name, additionsCount, deletionsCount, len(sg.m))
 	}
-	return deletionsCount > 0 && len(sg.m) == 0
 }
 
 type scraper struct {
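One implementation detail worth calling out from `getScrapeWorkObjectsForLabels` above: the `reflect.ValueOf(swo).IsNil()` guard exists because a Go interface holding a typed nil pointer does not compare equal to plain `nil` (see the medium.com link kept in the code comment). A self-contained sketch of the pitfall; `scrapeWork` and `newScrapeWork` are hypothetical stand-ins, not names from the VictoriaMetrics code:

```go
package main

import (
	"fmt"
	"reflect"
)

// scrapeWork is a hypothetical stand-in for the concrete type returned by
// a scrape-work constructor such as aw.swcFunc in the diff above.
type scrapeWork struct{}

// newScrapeWork returns a typed nil pointer when the target must be skipped.
func newScrapeWork(labels map[string]string) *scrapeWork {
	if labels["drop"] == "true" {
		return nil
	}
	return &scrapeWork{}
}

func main() {
	// The nil *scrapeWork gets wrapped into a non-nil interface value.
	var swo interface{} = newScrapeWork(map[string]string{"drop": "true"})

	// A plain nil comparison does NOT catch the typed nil pointer.
	fmt.Println(swo == nil) // false

	// The combined check from api_watcher.go sees through the interface
	// and correctly rejects the value.
	fmt.Println(swo != nil && !reflect.ValueOf(swo).IsNil()) // false
}
```

Dropping the reflect check would let such typed-nil values into the `swos` slice, which is exactly what the combined condition in the diff prevents.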