diff --git a/lib/promscrape/config.go b/lib/promscrape/config.go index b4a8d5d2ce..1f93c4be57 100644 --- a/lib/promscrape/config.go +++ b/lib/promscrape/config.go @@ -60,6 +60,15 @@ type Config struct { baseDir string } +func (cfg *Config) mustStart() { + startTime := time.Now() + logger.Infof("starting service discovery routines...") + for i := range cfg.ScrapeConfigs { + cfg.ScrapeConfigs[i].mustStart(cfg.baseDir) + } + logger.Infof("started service discovery routines in %.3f seconds", time.Since(startTime).Seconds()) +} + func (cfg *Config) mustStop() { startTime := time.Now() logger.Infof("stopping service discovery routines...") @@ -120,6 +129,21 @@ type ScrapeConfig struct { swc *scrapeWorkConfig } +func (sc *ScrapeConfig) mustStart(baseDir string) { + for i := range sc.KubernetesSDConfigs { + swosFunc := func(metaLabels map[string]string) interface{} { + target := metaLabels["__address__"] + sw, err := sc.swc.getScrapeWork(target, nil, metaLabels) + if err != nil { + logger.Errorf("cannot create kubernetes_sd_config target %q for job_name %q: %s", target, sc.swc.jobName, err) + return nil + } + return sw + } + sc.KubernetesSDConfigs[i].MustStart(baseDir, swosFunc) + } +} + func (sc *ScrapeConfig) mustStop() { for i := range sc.KubernetesSDConfigs { sc.KubernetesSDConfigs[i].MustStop() @@ -243,15 +267,7 @@ func (cfg *Config) getKubernetesSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork { ok := true for j := range sc.KubernetesSDConfigs { sdc := &sc.KubernetesSDConfigs[j] - swos, err := sdc.GetScrapeWorkObjects(cfg.baseDir, func(metaLabels map[string]string) interface{} { - target := metaLabels["__address__"] - sw, err := sc.swc.getScrapeWork(target, nil, metaLabels) - if err != nil { - logger.Errorf("cannot create kubernetes_sd_config target %q for job_name %q: %s", target, sc.swc.jobName, err) - return nil - } - return sw - }) + swos, err := sdc.GetScrapeWorkObjects() if err != nil { logger.Errorf("skipping kubernetes_sd_config targets for job_name %q because 
of error: %s", sc.swc.jobName, err) ok = false diff --git a/lib/promscrape/discovery/kubernetes/api.go b/lib/promscrape/discovery/kubernetes/api.go index a092210877..8afa6b6136 100644 --- a/lib/promscrape/discovery/kubernetes/api.go +++ b/lib/promscrape/discovery/kubernetes/api.go @@ -7,7 +7,6 @@ import ( "strings" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" ) // apiConfig contains config for API server @@ -15,18 +14,12 @@ type apiConfig struct { aw *apiWatcher } -func (ac *apiConfig) mustStop() { - ac.aw.mustStop() +func (ac *apiConfig) mustStart() { + ac.aw.mustStart() } -var configMap = discoveryutils.NewConfigMap() - -func getAPIConfig(sdc *SDConfig, baseDir string, swcFunc ScrapeWorkConstructorFunc) (*apiConfig, error) { - v, err := configMap.Get(sdc, func() (interface{}, error) { return newAPIConfig(sdc, baseDir, swcFunc) }) - if err != nil { - return nil, err - } - return v.(*apiConfig), nil +func (ac *apiConfig) mustStop() { + ac.aw.mustStop() } func newAPIConfig(sdc *SDConfig, baseDir string, swcFunc ScrapeWorkConstructorFunc) (*apiConfig, error) { diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go index aa12daad59..c78b334f8b 100644 --- a/lib/promscrape/discovery/kubernetes/api_watcher.go +++ b/lib/promscrape/discovery/kubernetes/api_watcher.go @@ -72,6 +72,10 @@ func newAPIWatcher(apiServer string, ac *promauth.Config, sdc *SDConfig, swcFunc } } +func (aw *apiWatcher) mustStart() { + aw.gw.startWatchersForRole(aw.role, aw) +} + func (aw *apiWatcher) mustStop() { aw.gw.unsubscribeAPIWatcher(aw) aw.swosByNamespaceLock.Lock() @@ -128,7 +132,8 @@ func (aw *apiWatcher) getScrapeWorkObjectsForLabels(labelss []map[string]string) // getScrapeWorkObjects returns all the ScrapeWork objects for the given aw. 
func (aw *apiWatcher) getScrapeWorkObjects() []interface{} { - aw.gw.startWatchersForRole(aw.role, aw) + aw.gw.registerPendingAPIWatchers() + aw.swosByNamespaceLock.Lock() defer aw.swosByNamespaceLock.Unlock() @@ -272,10 +277,8 @@ func (gw *groupWatcher) getObjectByRole(role, namespace, name string) object { func (gw *groupWatcher) getCachedObjectByRole(role, namespace, name string) object { key := namespace + "/" + name gw.startWatchersForRole(role, nil) - gw.mu.Lock() - defer gw.mu.Unlock() - - for _, uw := range gw.m { + uws := gw.getURLWatchers() + for _, uw := range uws { if uw.role != role { // Role mismatch continue @@ -310,7 +313,9 @@ func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) { uw.reloadObjects() go uw.watchForUpdates() } - uw.subscribeAPIWatcher(aw) + if aw != nil { + uw.subscribeAPIWatcher(aw) + } } } @@ -326,12 +331,28 @@ func (gw *groupWatcher) doRequest(requestURL string) (*http.Response, error) { return gw.client.Do(req) } -func (gw *groupWatcher) unsubscribeAPIWatcher(aw *apiWatcher) { +func (gw *groupWatcher) registerPendingAPIWatchers() { + uws := gw.getURLWatchers() + for _, uw := range uws { + uw.registerPendingAPIWatchers() + } +} + +func (gw *groupWatcher) getURLWatchers() []*urlWatcher { gw.mu.Lock() + uws := make([]*urlWatcher, 0, len(gw.m)) for _, uw := range gw.m { - uw.unsubscribeAPIWatcher(aw) + uws = append(uws, uw) } gw.mu.Unlock() + return uws +} + +func (gw *groupWatcher) unsubscribeAPIWatcher(aw *apiWatcher) { + uws := gw.getURLWatchers() + for _, uw := range uws { + uw.unsubscribeAPIWatcher(aw) + } } // urlWatcher watches for an apiURL and updates object states in objectsByKey. 
@@ -344,9 +365,16 @@ type urlWatcher struct { parseObject parseObjectFunc parseObjectList parseObjectListFunc - // mu protects aws, objectsByKey and resourceVersion + // mu protects aws, awsPending, objectsByKey and resourceVersion mu sync.Mutex + // awsPending contains pending apiWatcher objects, which are registered in a batch. + // Batch registering saves CPU time needed for registering a big number of Kubernetes objects + // shared among a big number of scrape jobs, since per-object labels are generated only once + // for all the scrape jobs (each scrape job is associated with a single apiWatcher). + // See reloadScrapeWorksForAPIWatchers for details. + awsPending map[*apiWatcher]struct{} + // aws contains registered apiWatcher objects aws map[*apiWatcher]struct{} @@ -374,6 +402,7 @@ func newURLWatcher(role, namespace, apiURL string, gw *groupWatcher) *urlWatcher parseObject: parseObject, parseObjectList: parseObjectList, + awsPending: make(map[*apiWatcher]struct{}), aws: make(map[*apiWatcher]struct{}), objectsByKey: make(map[string]object), @@ -388,30 +417,38 @@ func newURLWatcher(role, namespace, apiURL string, gw *groupWatcher) *urlWatcher } func (uw *urlWatcher) subscribeAPIWatcher(aw *apiWatcher) { - if aw == nil { - return - } uw.mu.Lock() if _, ok := uw.aws[aw]; !ok { - swosByKey := make(map[string][]interface{}) - for key, o := range uw.objectsByKey { - labels := o.getTargetLabels(uw.gw) - swos := aw.getScrapeWorkObjectsForLabels(labels) - if len(swos) > 0 { - swosByKey[key] = swos - } + if _, ok := uw.awsPending[aw]; !ok { + uw.awsPending[aw] = struct{}{} + metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Inc() } - aw.reloadScrapeWorks(uw.namespace, swosByKey) - metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q}`, uw.role)).Inc() } uw.mu.Unlock() } +func (uw *urlWatcher) registerPendingAPIWatchers() { + uw.mu.Lock() + awsPending := 
make([]*apiWatcher, 0, len(uw.awsPending)) + for aw := range uw.awsPending { + awsPending = append(awsPending, aw) + delete(uw.awsPending, aw) + } + uw.reloadScrapeWorksForAPIWatchers(awsPending, uw.objectsByKey) + uw.mu.Unlock() + metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="working"}`, uw.role)).Add(len(awsPending)) + metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Add(-len(awsPending)) +} + func (uw *urlWatcher) unsubscribeAPIWatcher(aw *apiWatcher) { uw.mu.Lock() + if _, ok := uw.awsPending[aw]; ok { + delete(uw.awsPending, aw) + metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Dec() + } if _, ok := uw.aws[aw]; ok { delete(uw.aws, aw) - metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q}`, uw.role)).Dec() + metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="working"}`, uw.role)).Dec() } uw.mu.Unlock() } diff --git a/lib/promscrape/discovery/kubernetes/kubernetes.go b/lib/promscrape/discovery/kubernetes/kubernetes.go index 9315730f26..28160743bc 100644 --- a/lib/promscrape/discovery/kubernetes/kubernetes.go +++ b/lib/promscrape/discovery/kubernetes/kubernetes.go @@ -17,6 +17,9 @@ type SDConfig struct { ProxyURL proxy.URL `yaml:"proxy_url,omitempty"` Namespaces Namespaces `yaml:"namespaces,omitempty"` Selectors []Selector `yaml:"selectors,omitempty"` + + cfg *apiConfig + startErr error } // Namespaces represents namespaces for SDConfig @@ -37,23 +40,33 @@ type Selector struct { // ScrapeWorkConstructorFunc must construct ScrapeWork object for the given metaLabels. type ScrapeWorkConstructorFunc func(metaLabels map[string]string) interface{} -// GetScrapeWorkObjects returns ScrapeWork objects for the given sdc and baseDir. 
+// GetScrapeWorkObjects returns ScrapeWork objects for the given sdc. +// +// This function must be called after MustStart call. +func (sdc *SDConfig) GetScrapeWorkObjects() ([]interface{}, error) { + if sdc.cfg == nil { + return nil, sdc.startErr + } + return sdc.cfg.aw.getScrapeWorkObjects(), nil +} + +// MustStart initializes sdc before its usage. // // swcFunc is used for constructing such objects. -func (sdc *SDConfig) GetScrapeWorkObjects(baseDir string, swcFunc ScrapeWorkConstructorFunc) ([]interface{}, error) { - cfg, err := getAPIConfig(sdc, baseDir, swcFunc) +func (sdc *SDConfig) MustStart(baseDir string, swcFunc ScrapeWorkConstructorFunc) { + cfg, err := newAPIConfig(sdc, baseDir, swcFunc) if err != nil { - return nil, fmt.Errorf("cannot create API config: %w", err) + sdc.startErr = fmt.Errorf("cannot create API config for kubernetes: %w", err) + return } - return cfg.aw.getScrapeWorkObjects(), nil + cfg.aw.mustStart() + sdc.cfg = cfg } // MustStop stops further usage for sdc. func (sdc *SDConfig) MustStop() { - v := configMap.Delete(sdc) - if v != nil { - // v can be nil if GetLabels wasn't called yet. - cfg := v.(*apiConfig) - cfg.mustStop() + if sdc.cfg != nil { + // sdc.cfg can be nil on MustStart error. 
+ sdc.cfg.mustStop() } } diff --git a/lib/promscrape/scraper.go b/lib/promscrape/scraper.go index 861c35c4d1..7dd4548916 100644 --- a/lib/promscrape/scraper.go +++ b/lib/promscrape/scraper.go @@ -93,6 +93,7 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest) if err != nil { logger.Fatalf("cannot read %q: %s", configFile, err) } + cfg.mustStart() scs := newScrapeConfigs(pushData) scs.add("static_configs", 0, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getStaticScrapeWork() }) @@ -130,6 +131,7 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest) goto waitForChans } cfg.mustStop() + cfgNew.mustStart() cfg = cfgNew data = dataNew case <-tickerCh: @@ -143,6 +145,7 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest) goto waitForChans } cfg.mustStop() + cfgNew.mustStart() cfg = cfgNew data = dataNew case <-globalStopCh: