From ed8441ec5240f3cd2d70c6372f1b13b8761c83f5 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Fri, 26 Feb 2021 21:41:54 +0200
Subject: [PATCH] lib/promscrape: cache ScrapeWork

This should reduce the time needed for updating big number of scrape
targets.
---
 lib/promscrape/config.go                   | 96 +++++++++++++++++++++-
 lib/promscrape/discovery/kubernetes/api.go | 11 ++-
 2 files changed, 100 insertions(+), 7 deletions(-)

diff --git a/lib/promscrape/config.go b/lib/promscrape/config.go
index b41bcc463..351637cf9 100644
--- a/lib/promscrape/config.go
+++ b/lib/promscrape/config.go
@@ -6,12 +6,15 @@ import (
 	"io/ioutil"
 	"net/url"
 	"path/filepath"
+	"sort"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
@@ -510,6 +513,8 @@ func getScrapeWorkConfig(sc *ScrapeConfig, baseDir string, globalCfg *GlobalConf
 		disableKeepAlive:    sc.DisableKeepAlive,
 		streamParse:         sc.StreamParse,
 		scrapeAlignInterval: sc.ScrapeAlignInterval,
+
+		cache: newScrapeWorkCache(),
 	}
 	return swc, nil
 }
@@ -533,6 +538,56 @@ type scrapeWorkConfig struct {
 	disableKeepAlive    bool
 	streamParse         bool
 	scrapeAlignInterval time.Duration
+
+	cache *scrapeWorkCache
+}
+
+type scrapeWorkCache struct {
+	mu              sync.Mutex
+	m               map[string]*scrapeWorkEntry
+	lastCleanupTime uint64
+}
+
+type scrapeWorkEntry struct {
+	sw             *ScrapeWork
+	lastAccessTime uint64
+}
+
+func newScrapeWorkCache() *scrapeWorkCache {
+	return &scrapeWorkCache{
+		m: make(map[string]*scrapeWorkEntry),
+	}
+}
+
+func (swc *scrapeWorkCache) Get(key string) *ScrapeWork {
+	currentTime := fasttime.UnixTimestamp()
+	swc.mu.Lock()
+	swe := swc.m[key]
+	if swe != nil {
+		swe.lastAccessTime = currentTime
+	}
+	swc.mu.Unlock()
+	if swe == nil {
+		return nil
+	}
+	return swe.sw
+}
+
+func 
(swc *scrapeWorkCache) Set(key string, sw *ScrapeWork) {
+	currentTime := fasttime.UnixTimestamp()
+	swc.mu.Lock()
+	swc.m[key] = &scrapeWorkEntry{
+		sw:             sw,
+		lastAccessTime: currentTime,
+	}
+	if currentTime > swc.lastCleanupTime+10*60 {
+		for k, swe := range swc.m {
+			if currentTime > swe.lastAccessTime+2*60 {
+				delete(swc.m, k)
+			}
+		}
+		swc.lastCleanupTime = currentTime
+	}
+	swc.mu.Unlock()
 }
 
 type targetLabelsGetter interface {
@@ -562,7 +613,7 @@ func appendScrapeWorkForTargetLabels(dst []*ScrapeWork, swc *scrapeWorkConfig, t
 	go func() {
 		for metaLabels := range workCh {
 			target := metaLabels["__address__"]
-			sw, err := getScrapeWork(swc, target, nil, metaLabels)
+			sw, err := swc.getScrapeWork(target, nil, metaLabels)
 			if err != nil {
 				err = fmt.Errorf("skipping %s target %q for job_name %q because of error: %w", discoveryType, target, swc.jobName, err)
 			}
@@ -643,7 +694,7 @@ func (stc *StaticConfig) appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConf
 			logger.Errorf("`static_configs` target for `job_name` %q cannot be empty; skipping it", swc.jobName)
 			continue
 		}
-		sw, err := getScrapeWork(swc, target, stc.Labels, metaLabels)
+		sw, err := swc.getScrapeWork(target, stc.Labels, metaLabels)
 		if err != nil {
 			// Do not return this error, since other targets may be valid
 			logger.Errorf("error when parsing `static_configs` target %q for `job_name` %q: %s; skipping it", target, swc.jobName, err)
@@ -656,7 +707,42 @@ func (stc *StaticConfig) appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConf
 	return dst
 }
 
-func getScrapeWork(swc *scrapeWorkConfig, target string, extraLabels, metaLabels map[string]string) (*ScrapeWork, error) {
+func (swc *scrapeWorkConfig) getScrapeWork(target string, extraLabels, metaLabels map[string]string) (*ScrapeWork, error) {
+	key := getScrapeWorkKey(extraLabels, metaLabels)
+	if sw := swc.cache.Get(key); sw != nil {
+		return sw, nil
+	}
+	sw, err := swc.getScrapeWorkReal(target, extraLabels, metaLabels)
+	if err == nil {
+		swc.cache.Set(key, 
sw)
+	}
+	return sw, err
+}
+
+func getScrapeWorkKey(extraLabels, metaLabels map[string]string) string {
+	var b []byte
+	b = appendSortedKeyValuePairs(b, extraLabels)
+	b = appendSortedKeyValuePairs(b, metaLabels)
+	return string(b)
+}
+
+func appendSortedKeyValuePairs(dst []byte, m map[string]string) []byte {
+	keys := make([]string, 0, len(m))
+	for k := range m {
+		keys = append(keys, k)
+	}
+	sort.Strings(keys)
+	for _, k := range keys {
+		dst = strconv.AppendQuote(dst, k)
+		dst = append(dst, ':')
+		dst = strconv.AppendQuote(dst, m[k])
+		dst = append(dst, ',')
+	}
+	dst = append(dst, '\n')
+	return dst
+}
+
+func (swc *scrapeWorkConfig) getScrapeWorkReal(target string, extraLabels, metaLabels map[string]string) (*ScrapeWork, error) {
 	labels := mergeLabels(swc.jobName, swc.scheme, target, swc.metricsPath, extraLabels, swc.externalLabels, metaLabels, swc.params)
 	var originalLabels []prompbmarshal.Label
 	if !*dropOriginalLabels {
diff --git a/lib/promscrape/discovery/kubernetes/api.go b/lib/promscrape/discovery/kubernetes/api.go
index 3212cf3d3..a11e528c7 100644
--- a/lib/promscrape/discovery/kubernetes/api.go
+++ b/lib/promscrape/discovery/kubernetes/api.go
@@ -16,6 +16,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
@@ -143,7 +144,7 @@ type apiWatcher struct {
 
 	// The last time the apiWatcher was queried for cached objects.
 	// It is used for stopping unused watchers.
-	lastAccessTime time.Time
+	lastAccessTime uint64
 }
 
 func newAPIWatcher(client *http.Client, apiServer, authorization string, namespaces []string, selectors []Selector) *apiWatcher {
@@ -156,7 +157,7 @@ func newAPIWatcher(client *http.Client, apiServer, authorization string, namespa
 
 		watchersByURL: make(map[string]*urlWatcher),
 
-		lastAccessTime: time.Now(),
+		lastAccessTime: fasttime.UnixTimestamp(),
 	}
 }
 
@@ -174,6 +175,8 @@ func (aw *apiWatcher) getLabelsForRole(role string) []map[string]string {
 		}
 		uw.mu.Unlock()
 	}
+	aw.lastAccessTime = fasttime.UnixTimestamp()
+	aw.mu.Unlock()
 	return ms
 }
 
@@ -197,7 +200,7 @@ func (aw *apiWatcher) getObjectByRole(role, namespace, name string) object {
 			break
 		}
 	}
-	aw.lastAccessTime = time.Now()
+	aw.lastAccessTime = fasttime.UnixTimestamp()
 	aw.mu.Unlock()
 	return o
 }
 
@@ -233,7 +236,7 @@ func (aw *apiWatcher) startWatcherForURL(role, apiURL string, parseObject parseO
 func (aw *apiWatcher) needStop() bool {
 	aw.mu.Lock()
 	defer aw.mu.Unlock()
-	return time.Since(aw.lastAccessTime) > 5*time.Minute
+	return fasttime.UnixTimestamp() > aw.lastAccessTime+5*60
 }
 
 // doRequest performs http request to the given requestURL.