lib/promscrape/discovery/kubernetes: reduce CPU time spent on registering big number of Kubernetes objects shared among big number of scrape jobs

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1182
This commit is contained in:
Aliaksandr Valialkin 2021-04-05 22:02:09 +03:00
parent a51d0ec6ec
commit b46194472f
5 changed files with 114 additions and 52 deletions

View file

@ -60,6 +60,15 @@ type Config struct {
baseDir string
}
func (cfg *Config) mustStart() {
startTime := time.Now()
logger.Infof("starting service discovery routines...")
for i := range cfg.ScrapeConfigs {
cfg.ScrapeConfigs[i].mustStart(cfg.baseDir)
}
logger.Infof("started service discovery routines in %.3f seconds", time.Since(startTime).Seconds())
}
func (cfg *Config) mustStop() {
startTime := time.Now()
logger.Infof("stopping service discovery routines...")
@ -120,6 +129,21 @@ type ScrapeConfig struct {
swc *scrapeWorkConfig
}
func (sc *ScrapeConfig) mustStart(baseDir string) {
for i := range sc.KubernetesSDConfigs {
swosFunc := func(metaLabels map[string]string) interface{} {
target := metaLabels["__address__"]
sw, err := sc.swc.getScrapeWork(target, nil, metaLabels)
if err != nil {
logger.Errorf("cannot create kubernetes_sd_config target %q for job_name %q: %s", target, sc.swc.jobName, err)
return nil
}
return sw
}
sc.KubernetesSDConfigs[i].MustStart(baseDir, swosFunc)
}
}
func (sc *ScrapeConfig) mustStop() {
for i := range sc.KubernetesSDConfigs {
sc.KubernetesSDConfigs[i].MustStop()
@ -243,15 +267,7 @@ func (cfg *Config) getKubernetesSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
ok := true
for j := range sc.KubernetesSDConfigs {
sdc := &sc.KubernetesSDConfigs[j]
swos, err := sdc.GetScrapeWorkObjects(cfg.baseDir, func(metaLabels map[string]string) interface{} {
target := metaLabels["__address__"]
sw, err := sc.swc.getScrapeWork(target, nil, metaLabels)
if err != nil {
logger.Errorf("cannot create kubernetes_sd_config target %q for job_name %q: %s", target, sc.swc.jobName, err)
return nil
}
return sw
})
swos, err := sdc.GetScrapeWorkObjects()
if err != nil {
logger.Errorf("skipping kubernetes_sd_config targets for job_name %q because of error: %s", sc.swc.jobName, err)
ok = false

View file

@ -7,7 +7,6 @@ import (
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
// apiConfig contains config for API server
@ -15,18 +14,12 @@ type apiConfig struct {
aw *apiWatcher
}
func (ac *apiConfig) mustStop() {
ac.aw.mustStop()
func (ac *apiConfig) mustStart() {
ac.aw.mustStart()
}
var configMap = discoveryutils.NewConfigMap()
func getAPIConfig(sdc *SDConfig, baseDir string, swcFunc ScrapeWorkConstructorFunc) (*apiConfig, error) {
v, err := configMap.Get(sdc, func() (interface{}, error) { return newAPIConfig(sdc, baseDir, swcFunc) })
if err != nil {
return nil, err
}
return v.(*apiConfig), nil
func (ac *apiConfig) mustStop() {
ac.aw.mustStop()
}
func newAPIConfig(sdc *SDConfig, baseDir string, swcFunc ScrapeWorkConstructorFunc) (*apiConfig, error) {

View file

@ -72,6 +72,10 @@ func newAPIWatcher(apiServer string, ac *promauth.Config, sdc *SDConfig, swcFunc
}
}
func (aw *apiWatcher) mustStart() {
aw.gw.startWatchersForRole(aw.role, aw)
}
func (aw *apiWatcher) mustStop() {
aw.gw.unsubscribeAPIWatcher(aw)
aw.swosByNamespaceLock.Lock()
@ -128,7 +132,8 @@ func (aw *apiWatcher) getScrapeWorkObjectsForLabels(labelss []map[string]string)
// getScrapeWorkObjects returns all the ScrapeWork objects for the given aw.
func (aw *apiWatcher) getScrapeWorkObjects() []interface{} {
aw.gw.startWatchersForRole(aw.role, aw)
aw.gw.registerPendingAPIWatchers()
aw.swosByNamespaceLock.Lock()
defer aw.swosByNamespaceLock.Unlock()
@ -272,10 +277,8 @@ func (gw *groupWatcher) getObjectByRole(role, namespace, name string) object {
func (gw *groupWatcher) getCachedObjectByRole(role, namespace, name string) object {
key := namespace + "/" + name
gw.startWatchersForRole(role, nil)
gw.mu.Lock()
defer gw.mu.Unlock()
for _, uw := range gw.m {
uws := gw.getURLWatchers()
for _, uw := range uws {
if uw.role != role {
// Role mismatch
continue
@ -310,7 +313,9 @@ func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) {
uw.reloadObjects()
go uw.watchForUpdates()
}
uw.subscribeAPIWatcher(aw)
if aw != nil {
uw.subscribeAPIWatcher(aw)
}
}
}
@ -326,12 +331,28 @@ func (gw *groupWatcher) doRequest(requestURL string) (*http.Response, error) {
return gw.client.Do(req)
}
func (gw *groupWatcher) unsubscribeAPIWatcher(aw *apiWatcher) {
func (gw *groupWatcher) registerPendingAPIWatchers() {
uws := gw.getURLWatchers()
for _, uw := range uws {
uw.registerPendingAPIWatchers()
}
}
func (gw *groupWatcher) getURLWatchers() []*urlWatcher {
gw.mu.Lock()
uws := make([]*urlWatcher, 0, len(gw.m))
for _, uw := range gw.m {
uw.unsubscribeAPIWatcher(aw)
uws = append(uws, uw)
}
gw.mu.Unlock()
return uws
}
func (gw *groupWatcher) unsubscribeAPIWatcher(aw *apiWatcher) {
uws := gw.getURLWatchers()
for _, uw := range uws {
uw.unsubscribeAPIWatcher(aw)
}
}
// urlWatcher watches for an apiURL and updates object states in objectsByKey.
@ -344,9 +365,16 @@ type urlWatcher struct {
parseObject parseObjectFunc
parseObjectList parseObjectListFunc
// mu protects aws, objectsByKey and resourceVersion
// mu protects aws, awsPending, objectsByKey and resourceVersion
mu sync.Mutex
// awsPending contains pending apiWatcher objects, which are registered in a batch.
// Batch registering saves CPU time needed for registering big number of Kubernetes objects
// shared among big number of scrape jobs, since per-object labels are generated only once
// for all the scrape jobs (each scrape job is associated with a single apiWatcher).
// See reloadScrapeWorksForAPIWatchers for details.
awsPending map[*apiWatcher]struct{}
// aws contains registered apiWatcher objects
aws map[*apiWatcher]struct{}
@ -374,6 +402,7 @@ func newURLWatcher(role, namespace, apiURL string, gw *groupWatcher) *urlWatcher
parseObject: parseObject,
parseObjectList: parseObjectList,
awsPending: make(map[*apiWatcher]struct{}),
aws: make(map[*apiWatcher]struct{}),
objectsByKey: make(map[string]object),
@ -388,30 +417,38 @@ func newURLWatcher(role, namespace, apiURL string, gw *groupWatcher) *urlWatcher
}
func (uw *urlWatcher) subscribeAPIWatcher(aw *apiWatcher) {
if aw == nil {
return
}
uw.mu.Lock()
if _, ok := uw.aws[aw]; !ok {
swosByKey := make(map[string][]interface{})
for key, o := range uw.objectsByKey {
labels := o.getTargetLabels(uw.gw)
swos := aw.getScrapeWorkObjectsForLabels(labels)
if len(swos) > 0 {
swosByKey[key] = swos
}
if _, ok := uw.awsPending[aw]; !ok {
uw.awsPending[aw] = struct{}{}
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Inc()
}
aw.reloadScrapeWorks(uw.namespace, swosByKey)
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q}`, uw.role)).Inc()
}
uw.mu.Unlock()
}
func (uw *urlWatcher) registerPendingAPIWatchers() {
uw.mu.Lock()
awsPending := make([]*apiWatcher, 0, len(uw.awsPending))
for aw := range uw.awsPending {
awsPending = append(awsPending, aw)
delete(uw.awsPending, aw)
}
uw.reloadScrapeWorksForAPIWatchers(awsPending, uw.objectsByKey)
uw.mu.Unlock()
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="working"}`, uw.role)).Add(len(awsPending))
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Add(-len(awsPending))
}
func (uw *urlWatcher) unsubscribeAPIWatcher(aw *apiWatcher) {
uw.mu.Lock()
if _, ok := uw.awsPending[aw]; ok {
delete(uw.awsPending, aw)
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Dec()
}
if _, ok := uw.aws[aw]; ok {
delete(uw.aws, aw)
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q}`, uw.role)).Dec()
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="working"}`, uw.role)).Dec()
}
uw.mu.Unlock()
}

View file

@ -17,6 +17,9 @@ type SDConfig struct {
ProxyURL proxy.URL `yaml:"proxy_url,omitempty"`
Namespaces Namespaces `yaml:"namespaces,omitempty"`
Selectors []Selector `yaml:"selectors,omitempty"`
cfg *apiConfig
startErr error
}
// Namespaces represents namespaces for SDConfig
@ -37,23 +40,33 @@ type Selector struct {
// ScrapeWorkConstructorFunc must construct ScrapeWork object for the given metaLabels.
type ScrapeWorkConstructorFunc func(metaLabels map[string]string) interface{}
// GetScrapeWorkObjects returns ScrapeWork objects for the given sdc and baseDir.
// GetScrapeWorkObjects returns ScrapeWork objects for the given sdc.
//
// This function must be called after MustStart call.
func (sdc *SDConfig) GetScrapeWorkObjects() ([]interface{}, error) {
if sdc.cfg == nil {
return nil, sdc.startErr
}
return sdc.cfg.aw.getScrapeWorkObjects(), nil
}
// MustStart initializes sdc before its usage.
//
// swcFunc is used for constructing such objects.
func (sdc *SDConfig) GetScrapeWorkObjects(baseDir string, swcFunc ScrapeWorkConstructorFunc) ([]interface{}, error) {
cfg, err := getAPIConfig(sdc, baseDir, swcFunc)
func (sdc *SDConfig) MustStart(baseDir string, swcFunc ScrapeWorkConstructorFunc) {
cfg, err := newAPIConfig(sdc, baseDir, swcFunc)
if err != nil {
return nil, fmt.Errorf("cannot create API config: %w", err)
sdc.startErr = fmt.Errorf("cannot create API config for kubernetes: %w", err)
return
}
return cfg.aw.getScrapeWorkObjects(), nil
cfg.aw.mustStart()
sdc.cfg = cfg
}
// MustStop stops further usage for sdc.
func (sdc *SDConfig) MustStop() {
v := configMap.Delete(sdc)
if v != nil {
// v can be nil if GetLabels wasn't called yet.
cfg := v.(*apiConfig)
cfg.mustStop()
if sdc.cfg != nil {
// sdc.cfg can be nil on MustStart error.
sdc.cfg.mustStop()
}
}

View file

@ -93,6 +93,7 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest)
if err != nil {
logger.Fatalf("cannot read %q: %s", configFile, err)
}
cfg.mustStart()
scs := newScrapeConfigs(pushData)
scs.add("static_configs", 0, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getStaticScrapeWork() })
@ -130,6 +131,7 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest)
goto waitForChans
}
cfg.mustStop()
cfgNew.mustStart()
cfg = cfgNew
data = dataNew
case <-tickerCh:
@ -143,6 +145,7 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest)
goto waitForChans
}
cfg.mustStop()
cfgNew.mustStart()
cfg = cfgNew
data = dataNew
case <-globalStopCh: