From f9c1fe38520bcf5b259729148dea515e25448754 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Tue, 2 Mar 2021 16:42:48 +0200
Subject: [PATCH] lib/promscrape/discovery/kubernetes: cache ScrapeWork
 objects as soon as the corresponding k8s objects are changed

This should reduce CPU usage and memory usage when Kubernetes contains
tens of thousands of objects
---
 lib/promscrape/config.go                   | 116 +++++-------------
 lib/promscrape/discovery/kubernetes/api.go |  65 +++++++---
 .../discovery/kubernetes/kubernetes.go     |  13 +-
 3 files changed, 83 insertions(+), 111 deletions(-)
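
Note on the shape of this change, with a minimal self-contained sketch of
the new constructor-callback flow (the types below are simplified stand-ins
for the real promscrape types, not the actual implementation): discovery no
longer returns bare label sets that the caller converts on every evaluation
through a TTL cache (the scrapeWorkCache deleted below); instead the caller
passes a ScrapeWorkConstructorFunc down to the watcher, which builds
ScrapeWork objects once per Kubernetes object change and keeps them cached
until the next change.

    package main

    import "fmt"

    // ScrapeWork stands in for the real promscrape.ScrapeWork.
    type ScrapeWork struct {
        Target string
    }

    // ScrapeWorkConstructorFunc mirrors the type added in kubernetes.go:
    // it converts discovery metaLabels into an opaque scrape-work object.
    type ScrapeWorkConstructorFunc func(metaLabels map[string]string) interface{}

    // getScrapeWorkObjectsForLabels mirrors the helper added in api.go:
    // it applies swcFunc to each label set and drops nil results.
    func getScrapeWorkObjectsForLabels(swcFunc ScrapeWorkConstructorFunc, labelss []map[string]string) []interface{} {
        swos := make([]interface{}, 0, len(labelss))
        for _, labels := range labelss {
            if swo := swcFunc(labels); swo != nil {
                swos = append(swos, swo)
            }
        }
        return swos
    }

    func main() {
        swcFunc := func(metaLabels map[string]string) interface{} {
            target := metaLabels["__address__"]
            if target == "" {
                return nil // a constructor may drop a target by returning nil
            }
            return &ScrapeWork{Target: target}
        }
        labelss := []map[string]string{
            {"__address__": "10.0.0.1:8080"},
            {"__meta_kubernetes_pod_name": "pod-without-address"},
        }
        for _, swo := range getScrapeWorkObjectsForLabels(swcFunc, labelss) {
            fmt.Println(swo.(*ScrapeWork).Target) // prints 10.0.0.1:8080
        }
    }
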
diff --git a/lib/promscrape/config.go b/lib/promscrape/config.go
index 5318bd9af5..38b031677d 100644
--- a/lib/promscrape/config.go
+++ b/lib/promscrape/config.go
@@ -14,7 +14,6 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
@@ -241,10 +240,23 @@ func (cfg *Config) getKubernetesSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
 		ok := true
 		for j := range sc.KubernetesSDConfigs {
 			sdc := &sc.KubernetesSDConfigs[j]
-			var okLocal bool
-			dst, okLocal = appendSDScrapeWork(dst, sdc, cfg.baseDir, sc.swc, "kubernetes_sd_config")
-			if ok {
-				ok = okLocal
+			swos, err := sdc.GetScrapeWorkObjects(cfg.baseDir, func(metaLabels map[string]string) interface{} {
+				target := metaLabels["__address__"]
+				sw, err := sc.swc.getScrapeWork(target, nil, metaLabels)
+				if err != nil {
+					logger.Errorf("cannot create kubernetes_sd_config target %q for job_name %q: %s", target, sc.swc.jobName, err)
+					return nil
+				}
+				return sw
+			})
+			if err != nil {
+				logger.Errorf("skipping kubernetes_sd_config targets for job_name %q because of error: %s", sc.swc.jobName, err)
+				ok = false
+				break
+			}
+			for _, swo := range swos {
+				sw := swo.(*ScrapeWork)
+				dst = append(dst, sw)
 			}
 		}
 		if ok {
@@ -252,7 +264,7 @@
 		}
 		swsPrev := swsPrevByJob[sc.swc.jobName]
 		if len(swsPrev) > 0 {
-			logger.Errorf("there were errors when discovering kubernetes targets for job %q, so preserving the previous targets", sc.swc.jobName)
+			logger.Errorf("there were errors when discovering kubernetes_sd_config targets for job %q, so preserving the previous targets", sc.swc.jobName)
 			dst = append(dst[:dstLen], swsPrev...)
 		}
 	}
@@ -555,8 +567,6 @@ func getScrapeWorkConfig(sc *ScrapeConfig, baseDir string, globalCfg *GlobalConf
 		disableKeepAlive:    sc.DisableKeepAlive,
 		streamParse:         sc.StreamParse,
 		scrapeAlignInterval: sc.ScrapeAlignInterval,
-
-		cache: newScrapeWorkCache(),
 	}
 	return swc, nil
 }
@@ -580,64 +590,6 @@ type scrapeWorkConfig struct {
 	disableKeepAlive    bool
 	streamParse         bool
 	scrapeAlignInterval time.Duration
-
-	cache *scrapeWorkCache
 }
-
-type scrapeWorkCache struct {
-	mu              sync.Mutex
-	m               map[string]*scrapeWorkEntry
-	lastCleanupTime uint64
-}
-
-type scrapeWorkEntry struct {
-	sw             *ScrapeWork
-	lastAccessTime uint64
-}
-
-func newScrapeWorkCache() *scrapeWorkCache {
-	return &scrapeWorkCache{
-		m: make(map[string]*scrapeWorkEntry),
-	}
-}
-
-func (swc *scrapeWorkCache) Get(key string) *ScrapeWork {
-	scrapeWorkCacheRequests.Inc()
-	currentTime := fasttime.UnixTimestamp()
-	swc.mu.Lock()
-	swe := swc.m[key]
-	if swe != nil {
-		swe.lastAccessTime = currentTime
-	}
-	swc.mu.Unlock()
-	if swe == nil {
-		return nil
-	}
-	scrapeWorkCacheHits.Inc()
-	return swe.sw
-}
-
-var (
-	scrapeWorkCacheRequests = metrics.NewCounter(`vm_promscrape_scrapework_cache_requests_total`)
-	scrapeWorkCacheHits     = metrics.NewCounter(`vm_promscrape_scrapework_cache_hits_total`)
-)
-
-func (swc *scrapeWorkCache) Set(key string, sw *ScrapeWork) {
-	currentTime := fasttime.UnixTimestamp()
-	swc.mu.Lock()
-	swc.m[key] = &scrapeWorkEntry{
-		sw:             sw,
-		lastAccessTime: currentTime,
-	}
-	if currentTime > swc.lastCleanupTime+10*60 {
-		for k, swe := range swc.m {
-			if currentTime > swe.lastAccessTime+2*60 {
-				delete(swc.m, k)
-			}
-		}
-		swc.lastCleanupTime = currentTime
-	}
-	swc.mu.Unlock()
-}
 
 type targetLabelsGetter interface {
@@ -761,26 +713,6 @@ func (stc *StaticConfig) appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConf
 	return dst
 }
 
-func (swc *scrapeWorkConfig) getScrapeWork(target string, extraLabels, metaLabels map[string]string) (*ScrapeWork, error) {
-	bb := scrapeWorkKeyBufPool.Get()
-	defer scrapeWorkKeyBufPool.Put(bb)
-	bb.B = appendScrapeWorkKey(bb.B[:0], target, extraLabels, metaLabels)
-	keyStrUnsafe := bytesutil.ToUnsafeString(bb.B)
-	if needSkipScrapeWork(keyStrUnsafe) {
-		return nil, nil
-	}
-	if sw := swc.cache.Get(keyStrUnsafe); sw != nil {
-		return sw, nil
-	}
-	sw, err := swc.getScrapeWorkReal(target, extraLabels, metaLabels)
-	if err == nil {
-		swc.cache.Set(string(bb.B), sw)
-	}
-	return sw, err
-}
-
-var scrapeWorkKeyBufPool bytesutil.ByteBufferPool
-
 func appendScrapeWorkKey(dst []byte, target string, extraLabels, metaLabels map[string]string) []byte {
 	dst = append(dst, target...)
 	dst = append(dst, ',')
@@ -814,7 +746,17 @@ func appendSortedKeyValuePairs(dst []byte, m map[string]string) []byte {
 	return dst
 }
 
-func (swc *scrapeWorkConfig) getScrapeWorkReal(target string, extraLabels, metaLabels map[string]string) (*ScrapeWork, error) {
+var scrapeWorkKeyBufPool bytesutil.ByteBufferPool
+
+func (swc *scrapeWorkConfig) getScrapeWork(target string, extraLabels, metaLabels map[string]string) (*ScrapeWork, error) {
+	// Verify whether the scrape work must be skipped.
+	bb := scrapeWorkKeyBufPool.Get()
+	defer scrapeWorkKeyBufPool.Put(bb)
+	bb.B = appendScrapeWorkKey(bb.B[:0], target, extraLabels, metaLabels)
+	if needSkipScrapeWork(bytesutil.ToUnsafeString(bb.B)) {
+		return nil, nil
+	}
+
 	labels := mergeLabels(swc.jobName, swc.scheme, target, swc.metricsPath, extraLabels, swc.externalLabels, metaLabels, swc.params)
 	var originalLabels []prompbmarshal.Label
 	if !*dropOriginalLabels {
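
The relocated getScrapeWork above still derives a deterministic key from the
target plus its sorted label pairs before doing any real work, but the key is
now used only by needSkipScrapeWork (which, as far as the surrounding code
suggests, shards targets between vmagent instances in cluster mode); the TTL
cache keyed by the same string is gone. A simplified, self-contained sketch of
building such a key follows; buildScrapeWorkKey is a hypothetical stand-in,
not the real appendScrapeWorkKey:

    package main

    import (
        "fmt"
        "sort"
    )

    // buildScrapeWorkKey builds a deterministic key for a target and its
    // label maps: identical inputs always yield identical keys, so the key
    // can be hashed consistently across instances.
    func buildScrapeWorkKey(target string, labelMaps ...map[string]string) string {
        dst := []byte(target)
        dst = append(dst, ',')
        for _, m := range labelMaps {
            keys := make([]string, 0, len(m))
            for k := range m {
                keys = append(keys, k)
            }
            sort.Strings(keys) // map iteration order is random; sorting makes the key stable
            for _, k := range keys {
                dst = append(dst, k...)
                dst = append(dst, '=')
                dst = append(dst, m[k]...)
                dst = append(dst, ',')
            }
            dst = append(dst, '\n')
        }
        return string(dst)
    }

    func main() {
        key := buildScrapeWorkKey("10.0.0.1:8080",
            map[string]string{"job": "k8s-pods", "env": "prod"})
        fmt.Printf("%q\n", key)
    }
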
diff --git a/lib/promscrape/discovery/kubernetes/api.go b/lib/promscrape/discovery/kubernetes/api.go
index 97507d75d9..7a517bfd66 100644
--- a/lib/promscrape/discovery/kubernetes/api.go
+++ b/lib/promscrape/discovery/kubernetes/api.go
@@ -36,15 +36,15 @@ func (ac *apiConfig) mustStop() {
 
 var configMap = discoveryutils.NewConfigMap()
 
-func getAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
-	v, err := configMap.Get(sdc, func() (interface{}, error) { return newAPIConfig(sdc, baseDir) })
+func getAPIConfig(sdc *SDConfig, baseDir string, swcFunc ScrapeWorkConstructorFunc) (*apiConfig, error) {
+	v, err := configMap.Get(sdc, func() (interface{}, error) { return newAPIConfig(sdc, baseDir, swcFunc) })
 	if err != nil {
 		return nil, err
 	}
 	return v.(*apiConfig), nil
 }
 
-func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
+func newAPIConfig(sdc *SDConfig, baseDir string, swcFunc ScrapeWorkConstructorFunc) (*apiConfig, error) {
 	ac, err := promauth.NewConfig(baseDir, sdc.BasicAuth, sdc.BearerToken, sdc.BearerTokenFile, sdc.TLSConfig)
 	if err != nil {
 		return nil, fmt.Errorf("cannot parse auth config: %w", err)
@@ -97,7 +97,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
 		},
 		Timeout: *apiServerTimeout,
 	}
-	aw := newAPIWatcher(client, apiServer, ac.Authorization, sdc.Namespaces.Names, sdc.Selectors)
+	aw := newAPIWatcher(client, apiServer, ac.Authorization, sdc.Namespaces.Names, sdc.Selectors, swcFunc)
 	cfg := &apiConfig{
 		aw: aw,
 	}
@@ -141,6 +141,9 @@ type apiWatcher struct {
 	// Selectors to apply during watch
 	selectors []Selector
 
+	// Constructor for creating ScrapeWork objects from labels.
+	swcFunc ScrapeWorkConstructorFunc
+
 	// mu protects watchersByURL
 	mu sync.Mutex
 
@@ -157,7 +160,7 @@ func (aw *apiWatcher) mustStop() {
 	aw.wg.Wait()
 }
 
-func newAPIWatcher(client *http.Client, apiServer, authorization string, namespaces []string, selectors []Selector) *apiWatcher {
+func newAPIWatcher(client *http.Client, apiServer, authorization string, namespaces []string, selectors []Selector, swcFunc ScrapeWorkConstructorFunc) *apiWatcher {
 	stopCtx, stopFunc := context.WithCancel(context.Background())
 	return &apiWatcher{
 		apiServer:     apiServer,
@@ -165,6 +168,7 @@ func newAPIWatcher(client *http.Client, apiServer, authorization string, namespa
 		client:        client,
 		namespaces:    namespaces,
 		selectors:     selectors,
+		swcFunc:       swcFunc,
 
 		watchersByURL: make(map[string]*urlWatcher),
 
@@ -173,23 +177,23 @@
 	}
 }
 
-// getLabelsForRole returns all the sets of labels for the given role.
-func (aw *apiWatcher) getLabelsForRole(role string) []map[string]string {
+// getScrapeWorkObjectsForRole returns all the ScrapeWork objects for the given role.
+func (aw *apiWatcher) getScrapeWorkObjectsForRole(role string) []interface{} {
 	aw.startWatchersForRole(role)
-	var ms []map[string]string
+	var swos []interface{}
 	aw.mu.Lock()
 	for _, uw := range aw.watchersByURL {
 		if uw.role != role {
 			continue
 		}
 		uw.mu.Lock()
-		for _, labels := range uw.labelsByKey {
-			ms = append(ms, labels...)
+		for _, swosLocal := range uw.swosByKey {
+			swos = append(swos, swosLocal...)
 		}
 		uw.mu.Unlock()
 	}
 	aw.mu.Unlock()
-	return ms
+	return swos
 }
 
 // getObjectByRole returns an object with the given (namespace, name) key and the given role.
@@ -288,12 +292,12 @@ type urlWatcher struct {
 	parseObject     parseObjectFunc
 	parseObjectList parseObjectListFunc
 
-	// mu protects objectsByKey and labelsByKey
+	// mu protects objectsByKey and swosByKey
 	mu sync.Mutex
 
 	// objectsByKey contains the latest state for objects obtained from apiURL
 	objectsByKey map[string]object
-	labelsByKey  map[string][]map[string]string
+	swosByKey    map[string][]interface{}
 
 	// the parent apiWatcher
 	aw *apiWatcher
@@ -316,7 +320,7 @@ func (aw *apiWatcher) newURLWatcher(role, apiURL string, parseObject parseObject
 		parseObjectList: parseObjectList,
 
 		objectsByKey: make(map[string]object),
-		labelsByKey:  make(map[string][]map[string]string),
+		swosByKey:    make(map[string][]interface{}),
 
 		aw: aw,
 
@@ -354,20 +358,35 @@ func (uw *urlWatcher) reloadObjects() string {
 		}
 		return ""
 	}
-	labelsByKey := make(map[string][]map[string]string, len(objectsByKey))
+	swosByKey := make(map[string][]interface{}, len(objectsByKey))
 	for k, o := range objectsByKey {
-		labelsByKey[k] = o.getTargetLabels(aw)
+		labels := o.getTargetLabels(aw)
+		swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels)
+		if len(swos) > 0 {
+			swosByKey[k] = swos
+		}
 	}
 	uw.mu.Lock()
 	uw.objectsRemoved.Add(-len(uw.objectsByKey))
 	uw.objectsAdded.Add(len(objectsByKey))
 	uw.objectsCount.Add(len(objectsByKey) - len(uw.objectsByKey))
 	uw.objectsByKey = objectsByKey
-	uw.labelsByKey = labelsByKey
+	uw.swosByKey = swosByKey
 	uw.mu.Unlock()
 	return metadata.ResourceVersion
 }
 
+func getScrapeWorkObjectsForLabels(swcFunc ScrapeWorkConstructorFunc, labelss []map[string]string) []interface{} {
+	swos := make([]interface{}, 0, len(labelss))
+	for _, labels := range labelss {
+		swo := swcFunc(labels)
+		if swo != nil {
+			swos = append(swos, swo)
+		}
+	}
+	return swos
+}
+
 // watchForUpdates watches for object updates starting from resourceVersion and updates the corresponding objects to the latest state.
 //
 // See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
@@ -439,6 +458,7 @@ func (uw *urlWatcher) watchForUpdates(resourceVersion string) {
 
 // readObjectUpdateStream reads Kubernetes watch events from r and updates locally cached objects according to the received events.
 func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
+	aw := uw.aw
 	d := json.NewDecoder(r)
 	var we WatchEvent
 	for {
@@ -459,9 +479,14 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
 			}
 			uw.objectsByKey[key] = o
 			uw.mu.Unlock()
-			labels := o.getTargetLabels(uw.aw)
+			labels := o.getTargetLabels(aw)
+			swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels)
 			uw.mu.Lock()
-			uw.labelsByKey[key] = labels
+			if len(swos) > 0 {
+				uw.swosByKey[key] = swos
+			} else {
+				delete(uw.swosByKey, key)
+			}
 			uw.mu.Unlock()
 		case "DELETED":
 			uw.mu.Lock()
@@ -470,7 +495,7 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
 				uw.objectsCount.Dec()
 			}
 			delete(uw.objectsByKey, key)
-			delete(uw.labelsByKey, key)
+			delete(uw.swosByKey, key)
 			uw.mu.Unlock()
 		default:
 			return fmt.Errorf("unexpected WatchEvent type %q for role %q", we.Type, uw.role)
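
The watch-stream handling above updates swosByKey incrementally: ADDED and
MODIFIED events rebuild the cached scrape-work objects for a single key, and
DELETED events drop them, so a full rebuild only happens in reloadObjects. A
toy, self-contained sketch of that cache discipline follows; the cache type
and its apply method are stand-ins, not the real urlWatcher:

    package main

    import (
        "fmt"
        "sync"
    )

    type cache struct {
        mu        sync.Mutex
        swosByKey map[string][]interface{}
    }

    func (c *cache) apply(eventType, key string, swos []interface{}) error {
        c.mu.Lock()
        defer c.mu.Unlock()
        switch eventType {
        case "ADDED", "MODIFIED":
            if len(swos) > 0 {
                c.swosByKey[key] = swos
            } else {
                // An object that no longer yields scrape targets must not
                // leave stale entries behind.
                delete(c.swosByKey, key)
            }
        case "DELETED":
            delete(c.swosByKey, key)
        default:
            return fmt.Errorf("unexpected WatchEvent type %q", eventType)
        }
        return nil
    }

    func main() {
        c := &cache{swosByKey: make(map[string][]interface{})}
        _ = c.apply("ADDED", "default/pod-1", []interface{}{"sw1", "sw2"})
        _ = c.apply("MODIFIED", "default/pod-1", nil) // drops the key
        fmt.Println(len(c.swosByKey))                 // prints 0
    }
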
diff --git a/lib/promscrape/discovery/kubernetes/kubernetes.go b/lib/promscrape/discovery/kubernetes/kubernetes.go
index 07c2c00ee3..2bab345506 100644
--- a/lib/promscrape/discovery/kubernetes/kubernetes.go
+++ b/lib/promscrape/discovery/kubernetes/kubernetes.go
@@ -37,15 +37,20 @@ type Selector struct {
 	Field string `yaml:"field"`
 }
 
-// GetLabels returns labels for the given sdc and baseDir.
-func (sdc *SDConfig) GetLabels(baseDir string) ([]map[string]string, error) {
-	cfg, err := getAPIConfig(sdc, baseDir)
+// ScrapeWorkConstructorFunc must construct a ScrapeWork object for the given metaLabels.
+type ScrapeWorkConstructorFunc func(metaLabels map[string]string) interface{}
+
+// GetScrapeWorkObjects returns ScrapeWork objects for the given sdc and baseDir.
+//
+// swcFunc is used for constructing such objects.
+func (sdc *SDConfig) GetScrapeWorkObjects(baseDir string, swcFunc ScrapeWorkConstructorFunc) ([]interface{}, error) {
+	cfg, err := getAPIConfig(sdc, baseDir, swcFunc)
 	if err != nil {
 		return nil, fmt.Errorf("cannot create API config: %w", err)
 	}
 	switch sdc.Role {
 	case "node", "pod", "service", "endpoints", "endpointslices", "ingress":
-		return cfg.aw.getLabelsForRole(sdc.Role), nil
+		return cfg.aw.getScrapeWorkObjectsForRole(sdc.Role), nil
 	default:
 		return nil, fmt.Errorf("unexpected `role`: %q; must be one of `node`, `pod`, `service`, `endpoints`, `endpointslices` or `ingress`; skipping it", sdc.Role)
 	}
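
A note on the []interface{} signatures introduced above: lib/promscrape
imports this discovery package, so the discovery side cannot reference
promscrape.ScrapeWork directly without creating an import cycle. The
constructor therefore returns interface{}, and getKubernetesSDScrapeWork
type-asserts each object back to *ScrapeWork. Below is a self-contained
sketch of that round trip; ScrapeWork and discover are hypothetical
stand-ins for the two packages, not the real API:

    package main

    import "fmt"

    type ScrapeWork struct{ Target string }

    // discover plays the role of SDConfig.GetScrapeWorkObjects: it knows
    // nothing about ScrapeWork and only forwards what swcFunc built.
    func discover(swcFunc func(map[string]string) interface{}) []interface{} {
        metaLabelss := []map[string]string{
            {"__address__": "10.0.0.1:8080"},
            {"__address__": "10.0.0.2:8080"},
        }
        var swos []interface{}
        for _, metaLabels := range metaLabelss {
            if swo := swcFunc(metaLabels); swo != nil {
                swos = append(swos, swo)
            }
        }
        return swos
    }

    func main() {
        swos := discover(func(metaLabels map[string]string) interface{} {
            return &ScrapeWork{Target: metaLabels["__address__"]}
        })
        for _, swo := range swos {
            sw := swo.(*ScrapeWork) // same assertion as in getKubernetesSDScrapeWork
            fmt.Println(sw.Target)
        }
    }
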