lib/promscrape/discovery/kubernetes: cache ScrapeWork objects as soon as the corresponding k8s objects are changed

This should reduce CPU and memory usage when the Kubernetes cluster contains tens of thousands of objects.
Aliaksandr Valialkin 2021-03-02 16:42:48 +02:00
parent 22b1941cfc
commit 6a7ef768ff
3 changed files with 83 additions and 111 deletions
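
The core idea of the commit, as a standalone sketch (all names below are illustrative stand-ins, not the identifiers from the diffs that follow): instead of caching discovered label sets and converting them into ScrapeWork objects on every scrape-config refresh, the watcher invokes a constructor callback once, at the moment a Kubernetes object is added or modified, and caches the constructed objects:

package main

import (
	"fmt"
	"sync"
)

// ConstructorFunc converts discovered labels into an opaque scrape object.
// A nil result means the labels yield no scrape target.
type ConstructorFunc func(labels map[string]string) interface{}

// watcher caches constructed objects rather than raw label sets, so the
// expensive conversion runs once per Kubernetes object change.
type watcher struct {
	mu        sync.Mutex
	construct ConstructorFunc
	swosByKey map[string][]interface{}
}

// onUpdate is invoked whenever a watched object changes.
func (w *watcher) onUpdate(key string, labelss []map[string]string) {
	swos := make([]interface{}, 0, len(labelss))
	for _, labels := range labelss {
		if swo := w.construct(labels); swo != nil {
			swos = append(swos, swo)
		}
	}
	w.mu.Lock()
	if len(swos) > 0 {
		w.swosByKey[key] = swos
	} else {
		delete(w.swosByKey, key)
	}
	w.mu.Unlock()
}

func main() {
	w := &watcher{
		construct: func(labels map[string]string) interface{} {
			return "target " + labels["__address__"]
		},
		swosByKey: make(map[string][]interface{}),
	}
	w.onUpdate("default/pod-1", []map[string]string{{"__address__": "10.0.0.1:8080"}})
	fmt.Println(w.swosByKey) // map[default/pod-1:[target 10.0.0.1:8080]]
}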

@@ -14,7 +14,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
@@ -241,10 +240,23 @@ func (cfg *Config) getKubernetesSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
ok := true
for j := range sc.KubernetesSDConfigs {
sdc := &sc.KubernetesSDConfigs[j]
var okLocal bool
dst, okLocal = appendSDScrapeWork(dst, sdc, cfg.baseDir, sc.swc, "kubernetes_sd_config")
if ok {
ok = okLocal
swos, err := sdc.GetScrapeWorkObjects(cfg.baseDir, func(metaLabels map[string]string) interface{} {
target := metaLabels["__address__"]
sw, err := sc.swc.getScrapeWork(target, nil, metaLabels)
if err != nil {
logger.Errorf("cannot create kubernetes_sd_config target target %q for job_name %q: %s", target, sc.swc.jobName, err)
return nil
}
return sw
})
if err != nil {
logger.Errorf("skipping kubernetes_sd_config targets for job_name %q because of error: %s", sc.swc.jobName, err)
ok = false
break
}
for _, swo := range swos {
sw := swo.(*ScrapeWork)
dst = append(dst, sw)
}
}
if ok {
@@ -252,7 +264,7 @@ func (cfg *Config) getKubernetesSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
}
swsPrev := swsPrevByJob[sc.swc.jobName]
if len(swsPrev) > 0 {
logger.Errorf("there were errors when discovering kubernetes targets for job %q, so preserving the previous targets", sc.swc.jobName)
logger.Errorf("there were errors when discovering kubernetes_sd_config targets for job %q, so preserving the previous targets", sc.swc.jobName)
dst = append(dst[:dstLen], swsPrev...)
}
}
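
One detail worth calling out in the hunks above: GetScrapeWorkObjects traffics in interface{} values rather than *ScrapeWork, presumably because the discovery package cannot import the ScrapeWork type without an import cycle, so the caller type-asserts the values back. A condensed, annotated view of that round-trip, with sdc, cfg, sc and dst taken from the hunk:

	swos, err := sdc.GetScrapeWorkObjects(cfg.baseDir, func(metaLabels map[string]string) interface{} {
		sw, err := sc.swc.getScrapeWork(metaLabels["__address__"], nil, metaLabels)
		if err != nil {
			return nil // a nil return drops this target instead of failing the whole job
		}
		return sw // *ScrapeWork travels through the discovery API as interface{}
	})
	// ... on success, assert each value back to the concrete type:
	for _, swo := range swos {
		dst = append(dst, swo.(*ScrapeWork))
	}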
@@ -555,8 +567,6 @@ func getScrapeWorkConfig(sc *ScrapeConfig, baseDir string, globalCfg *GlobalConf
disableKeepAlive: sc.DisableKeepAlive,
streamParse: sc.StreamParse,
scrapeAlignInterval: sc.ScrapeAlignInterval,
cache: newScrapeWorkCache(),
}
return swc, nil
}
@@ -580,64 +590,6 @@ type scrapeWorkConfig struct {
disableKeepAlive bool
streamParse bool
scrapeAlignInterval time.Duration
cache *scrapeWorkCache
}
type scrapeWorkCache struct {
mu sync.Mutex
m map[string]*scrapeWorkEntry
lastCleanupTime uint64
}
type scrapeWorkEntry struct {
sw *ScrapeWork
lastAccessTime uint64
}
func newScrapeWorkCache() *scrapeWorkCache {
return &scrapeWorkCache{
m: make(map[string]*scrapeWorkEntry),
}
}
func (swc *scrapeWorkCache) Get(key string) *ScrapeWork {
scrapeWorkCacheRequests.Inc()
currentTime := fasttime.UnixTimestamp()
swc.mu.Lock()
swe := swc.m[key]
if swe != nil {
swe.lastAccessTime = currentTime
}
swc.mu.Unlock()
if swe == nil {
return nil
}
scrapeWorkCacheHits.Inc()
return swe.sw
}
var (
scrapeWorkCacheRequests = metrics.NewCounter(`vm_promscrape_scrapework_cache_requests_total`)
scrapeWorkCacheHits = metrics.NewCounter(`vm_promscrape_scrapework_cache_hits_total`)
)
func (swc *scrapeWorkCache) Set(key string, sw *ScrapeWork) {
currentTime := fasttime.UnixTimestamp()
swc.mu.Lock()
swc.m[key] = &scrapeWorkEntry{
sw: sw,
lastAccessTime: currentTime,
}
if currentTime > swc.lastCleanupTime+10*60 {
for k, swe := range swc.m {
if currentTime > swe.lastAccessTime+2*60 {
delete(swc.m, k)
}
}
swc.lastCleanupTime = currentTime
}
swc.mu.Unlock()
}
type targetLabelsGetter interface {
@@ -761,26 +713,6 @@ func (stc *StaticConfig) appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConf
return dst
}
func (swc *scrapeWorkConfig) getScrapeWork(target string, extraLabels, metaLabels map[string]string) (*ScrapeWork, error) {
bb := scrapeWorkKeyBufPool.Get()
defer scrapeWorkKeyBufPool.Put(bb)
bb.B = appendScrapeWorkKey(bb.B[:0], target, extraLabels, metaLabels)
keyStrUnsafe := bytesutil.ToUnsafeString(bb.B)
if needSkipScrapeWork(keyStrUnsafe) {
return nil, nil
}
if sw := swc.cache.Get(keyStrUnsafe); sw != nil {
return sw, nil
}
sw, err := swc.getScrapeWorkReal(target, extraLabels, metaLabels)
if err == nil {
swc.cache.Set(string(bb.B), sw)
}
return sw, err
}
var scrapeWorkKeyBufPool bytesutil.ByteBufferPool
func appendScrapeWorkKey(dst []byte, target string, extraLabels, metaLabels map[string]string) []byte {
dst = append(dst, target...)
dst = append(dst, ',')
@@ -814,7 +746,17 @@ func appendSortedKeyValuePairs(dst []byte, m map[string]string) []byte {
return dst
}
func (swc *scrapeWorkConfig) getScrapeWorkReal(target string, extraLabels, metaLabels map[string]string) (*ScrapeWork, error) {
var scrapeWorkKeyBufPool bytesutil.ByteBufferPool
func (swc *scrapeWorkConfig) getScrapeWork(target string, extraLabels, metaLabels map[string]string) (*ScrapeWork, error) {
// Verify whether the scrape work must be skipped.
bb := scrapeWorkKeyBufPool.Get()
defer scrapeWorkKeyBufPool.Put(bb)
bb.B = appendScrapeWorkKey(bb.B[:0], target, extraLabels, metaLabels)
if needSkipScrapeWork(bytesutil.ToUnsafeString(bb.B)) {
return nil, nil
}
labels := mergeLabels(swc.jobName, swc.scheme, target, swc.metricsPath, extraLabels, swc.externalLabels, metaLabels, swc.params)
var originalLabels []prompbmarshal.Label
if !*dropOriginalLabels {
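
The key built by appendScrapeWorkKey (the target plus sorted extra and meta label pairs) now serves only the needSkipScrapeWork check, since the cache keyed on it is gone. A hedged sketch of what such a skip check can look like when targets are sharded across cluster members by key hash; the actual VictoriaMetrics flags and hash function may differ:

package main

import (
	"fmt"

	"github.com/cespare/xxhash/v2"
)

// needSkipScrapeWork reports whether this cluster member should skip the
// target identified by key. Hypothetical standalone version: memberNum and
// membersCount stand in for -promscrape.cluster.* style settings.
func needSkipScrapeWork(key string, memberNum, membersCount int) bool {
	if membersCount <= 1 {
		return false // a single-node setup scrapes everything
	}
	h := xxhash.Sum64String(key)
	return int(h%uint64(membersCount)) != memberNum
}

func main() {
	// Illustrative key shape: target, then sorted label pairs.
	key := `10.0.0.1:8080,job="k8s",,__meta_kubernetes_pod_name="pod-1",`
	fmt.Println(needSkipScrapeWork(key, 0, 3))
}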

@@ -36,15 +36,15 @@ func (ac *apiConfig) mustStop() {
var configMap = discoveryutils.NewConfigMap()
func getAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
v, err := configMap.Get(sdc, func() (interface{}, error) { return newAPIConfig(sdc, baseDir) })
func getAPIConfig(sdc *SDConfig, baseDir string, swcFunc ScrapeWorkConstructorFunc) (*apiConfig, error) {
v, err := configMap.Get(sdc, func() (interface{}, error) { return newAPIConfig(sdc, baseDir, swcFunc) })
if err != nil {
return nil, err
}
return v.(*apiConfig), nil
}
func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
func newAPIConfig(sdc *SDConfig, baseDir string, swcFunc ScrapeWorkConstructorFunc) (*apiConfig, error) {
ac, err := promauth.NewConfig(baseDir, sdc.BasicAuth, sdc.BearerToken, sdc.BearerTokenFile, sdc.TLSConfig)
if err != nil {
return nil, fmt.Errorf("cannot parse auth config: %w", err)
@@ -97,7 +97,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
},
Timeout: *apiServerTimeout,
}
aw := newAPIWatcher(client, apiServer, ac.Authorization, sdc.Namespaces.Names, sdc.Selectors)
aw := newAPIWatcher(client, apiServer, ac.Authorization, sdc.Namespaces.Names, sdc.Selectors, swcFunc)
cfg := &apiConfig{
aw: aw,
}
@@ -141,6 +141,9 @@ type apiWatcher struct {
// Selectors to apply during watch
selectors []Selector
// Constructor for creating ScrapeWork objects from labels.
swcFunc ScrapeWorkConstructorFunc
// mu protects watchersByURL
mu sync.Mutex
@@ -157,7 +160,7 @@ func (aw *apiWatcher) mustStop() {
aw.wg.Wait()
}
func newAPIWatcher(client *http.Client, apiServer, authorization string, namespaces []string, selectors []Selector) *apiWatcher {
func newAPIWatcher(client *http.Client, apiServer, authorization string, namespaces []string, selectors []Selector, swcFunc ScrapeWorkConstructorFunc) *apiWatcher {
stopCtx, stopFunc := context.WithCancel(context.Background())
return &apiWatcher{
apiServer: apiServer,
@@ -165,6 +168,7 @@ func newAPIWatcher(client *http.Client, apiServer, authorization string, namespa
client: client,
namespaces: namespaces,
selectors: selectors,
swcFunc: swcFunc,
watchersByURL: make(map[string]*urlWatcher),
@@ -173,23 +177,23 @@ func newAPIWatcher(client *http.Client, apiServer, authorization string, namespa
}
}
// getLabelsForRole returns all the sets of labels for the given role.
func (aw *apiWatcher) getLabelsForRole(role string) []map[string]string {
// getScrapeWorkObjectsForRole returns all the ScrapeWork objects for the given role.
func (aw *apiWatcher) getScrapeWorkObjectsForRole(role string) []interface{} {
aw.startWatchersForRole(role)
var ms []map[string]string
var swos []interface{}
aw.mu.Lock()
for _, uw := range aw.watchersByURL {
if uw.role != role {
continue
}
uw.mu.Lock()
for _, labels := range uw.labelsByKey {
ms = append(ms, labels...)
for _, swosLocal := range uw.swosByKey {
swos = append(swos, swosLocal...)
}
uw.mu.Unlock()
}
aw.mu.Unlock()
return ms
return swos
}
// getObjectByRole returns an object with the given (namespace, name) key and the given role.
@@ -288,12 +292,12 @@ type urlWatcher struct {
parseObject parseObjectFunc
parseObjectList parseObjectListFunc
// mu protects objectsByKey and labelsByKey
// mu protects objectsByKey and swosByKey
mu sync.Mutex
// objectsByKey contains the latest state for objects obtained from apiURL
objectsByKey map[string]object
labelsByKey map[string][]map[string]string
swosByKey map[string][]interface{}
// the parent apiWatcher
aw *apiWatcher
@@ -316,7 +320,7 @@ func (aw *apiWatcher) newURLWatcher(role, apiURL string, parseObject parseObject
parseObjectList: parseObjectList,
objectsByKey: make(map[string]object),
labelsByKey: make(map[string][]map[string]string),
swosByKey: make(map[string][]interface{}),
aw: aw,
@@ -354,20 +358,35 @@ func (uw *urlWatcher) reloadObjects() string {
}
return ""
}
labelsByKey := make(map[string][]map[string]string, len(objectsByKey))
swosByKey := make(map[string][]interface{}, len(objectsByKey))
for k, o := range objectsByKey {
labelsByKey[k] = o.getTargetLabels(aw)
labels := o.getTargetLabels(aw)
swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels)
if len(swos) > 0 {
swosByKey[k] = swos
}
}
uw.mu.Lock()
uw.objectsRemoved.Add(-len(uw.objectsByKey))
uw.objectsAdded.Add(len(objectsByKey))
uw.objectsCount.Add(len(objectsByKey) - len(uw.objectsByKey))
uw.objectsByKey = objectsByKey
uw.labelsByKey = labelsByKey
uw.swosByKey = swosByKey
uw.mu.Unlock()
return metadata.ResourceVersion
}
func getScrapeWorkObjectsForLabels(swcFunc ScrapeWorkConstructorFunc, labelss []map[string]string) []interface{} {
swos := make([]interface{}, 0, len(labelss))
for _, labels := range labelss {
swo := swcFunc(labels)
if swo != nil {
swos = append(swos, swo)
}
}
return swos
}
// watchForUpdates watches for object updates starting from resourceVersion and updates the corresponding objects to the latest state.
//
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
@@ -439,6 +458,7 @@ func (uw *urlWatcher) watchForUpdates(resourceVersion string) {
// readObjectUpdateStream reads Kubernetes watch events from r and updates locally cached objects according to the received events.
func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
aw := uw.aw
d := json.NewDecoder(r)
var we WatchEvent
for {
@@ -459,9 +479,14 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
}
uw.objectsByKey[key] = o
uw.mu.Unlock()
labels := o.getTargetLabels(uw.aw)
labels := o.getTargetLabels(aw)
swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels)
uw.mu.Lock()
uw.labelsByKey[key] = labels
if len(swos) > 0 {
uw.swosByKey[key] = swos
} else {
delete(uw.swosByKey, key)
}
uw.mu.Unlock()
case "DELETED":
uw.mu.Lock()
@@ -470,7 +495,7 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
uw.objectsCount.Dec()
}
delete(uw.objectsByKey, key)
delete(uw.labelsByKey, key)
delete(uw.swosByKey, key)
uw.mu.Unlock()
default:
return fmt.Errorf("unexpected WatchEvent type %q for role %q", we.Type, uw.role)

@@ -37,15 +37,20 @@ type Selector struct {
Field string `yaml:"field"`
}
// GetLabels returns labels for the given sdc and baseDir.
func (sdc *SDConfig) GetLabels(baseDir string) ([]map[string]string, error) {
cfg, err := getAPIConfig(sdc, baseDir)
// ScrapeWorkConstructorFunc must construct ScrapeWork object for the given metaLabels.
type ScrapeWorkConstructorFunc func(metaLabels map[string]string) interface{}
// GetScrapeWorkObjects returns ScrapeWork objects for the given sdc and baseDir.
//
// swcFunc is used for constructing such objects.
func (sdc *SDConfig) GetScrapeWorkObjects(baseDir string, swcFunc ScrapeWorkConstructorFunc) ([]interface{}, error) {
cfg, err := getAPIConfig(sdc, baseDir, swcFunc)
if err != nil {
return nil, fmt.Errorf("cannot create API config: %w", err)
}
switch sdc.Role {
case "node", "pod", "service", "endpoints", "endpointslices", "ingress":
return cfg.aw.getLabelsForRole(sdc.Role), nil
return cfg.aw.getScrapeWorkObjectsForRole(sdc.Role), nil
default:
return nil, fmt.Errorf("unexpected `role`: %q; must be one of `node`, `pod`, `service`, `endpoints`, `endpointslices` or `ingress`; skipping it", sdc.Role)
}
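
A hedged usage sketch for the new API surface (sdc and baseDir are assumed to come from a parsed scrape config; the constructor below builds a throwaway map instead of a real ScrapeWork, just to show the interface{} round-trip):

	swos, err := sdc.GetScrapeWorkObjects(baseDir, func(metaLabels map[string]string) interface{} {
		if metaLabels["__address__"] == "" {
			return nil // returning nil drops the target
		}
		return map[string]string{"addr": metaLabels["__address__"]}
	})
	if err != nil {
		return fmt.Errorf("cannot discover kubernetes targets: %w", err)
	}
	for _, swo := range swos {
		m := swo.(map[string]string) // assert back to the constructor's concrete type
		fmt.Println(m["addr"])
	}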