lib/promscrape: cache ScrapeWork

This should reduce the time needed for updating a big number of scrape targets.
Aliaksandr Valialkin 2021-02-26 21:41:54 +02:00
parent 815666e6a6
commit ed8441ec52
2 changed files with 96 additions and 7 deletions
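
For context: before this commit a ScrapeWork was rebuilt for every target on every service-discovery refresh, even though refreshes typically re-deliver mostly unchanged label sets, so most of that work was redundant. The commit memoizes the result by the serialized labels. A minimal sketch of the pattern with hypothetical names (not the actual promscrape API):

package main

import "fmt"

var built int

// buildScrapeWork stands in for the expensive label merge + relabeling
// done by getScrapeWorkReal; illustration only.
func buildScrapeWork(labelsKey string) string {
	built++
	return "ScrapeWork(" + labelsKey + ")"
}

var cache = map[string]string{}

func getScrapeWork(labelsKey string) string {
	if sw, ok := cache[labelsKey]; ok {
		return sw // repeat refresh with identical labels: no rebuild
	}
	sw := buildScrapeWork(labelsKey)
	cache[labelsKey] = sw
	return sw
}

func main() {
	for i := 0; i < 3; i++ { // three discovery refreshes of the same target
		getScrapeWork(`"__address__":"10.0.0.1:9100",`)
	}
	fmt.Println("builds:", built) // 1
}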

@@ -6,12 +6,15 @@ import (
 	"io/ioutil"
 	"net/url"
 	"path/filepath"
+	"sort"
+	"strconv"
 	"strings"
 	"sync"
 	"time"

 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
@@ -510,6 +513,8 @@ func getScrapeWorkConfig(sc *ScrapeConfig, baseDir string, globalCfg *GlobalConf
 		disableKeepAlive:    sc.DisableKeepAlive,
 		streamParse:         sc.StreamParse,
 		scrapeAlignInterval: sc.ScrapeAlignInterval,
+		cache:               newScrapeWorkCache(),
 	}
 	return swc, nil
 }
@@ -533,6 +538,52 @@ type scrapeWorkConfig struct {
 	disableKeepAlive    bool
 	streamParse         bool
 	scrapeAlignInterval time.Duration
+	cache               *scrapeWorkCache
+}
+
+// scrapeWorkCache caches ScrapeWork objects keyed by their serialized labels,
+// so repeated service-discovery refreshes with unchanged labels skip
+// the expensive ScrapeWork construction.
+type scrapeWorkCache struct {
+	mu              sync.Mutex
+	m               map[string]*scrapeWorkEntry
+	lastCleanupTime uint64
+}
+
+type scrapeWorkEntry struct {
+	sw             *ScrapeWork
+	lastAccessTime uint64
+}
+
+func newScrapeWorkCache() *scrapeWorkCache {
+	return &scrapeWorkCache{
+		m: make(map[string]*scrapeWorkEntry),
+	}
+}
+
+// Get returns the cached ScrapeWork for the given key or nil on a cache miss.
+func (swc *scrapeWorkCache) Get(key string) *ScrapeWork {
+	currentTime := fasttime.UnixTimestamp()
+	swc.mu.Lock()
+	defer swc.mu.Unlock()
+	swe := swc.m[key]
+	if swe == nil {
+		return nil
+	}
+	swe.lastAccessTime = currentTime
+	return swe.sw
+}
+
+func (swc *scrapeWorkCache) Set(key string, sw *ScrapeWork) {
+	currentTime := fasttime.UnixTimestamp()
+	swc.mu.Lock()
+	swc.m[key] = &scrapeWorkEntry{
+		sw:             sw,
+		lastAccessTime: currentTime,
+	}
+	if currentTime > swc.lastCleanupTime+10*60 {
+		// Drop entries, which weren't accessed for the last 2 minutes.
+		// This cleanup runs at most once per 10 minutes.
+		for k, swe := range swc.m {
+			if currentTime > swe.lastAccessTime+2*60 {
+				delete(swc.m, k)
+			}
+		}
+		swc.lastCleanupTime = currentTime
+	}
+	swc.mu.Unlock()
 }

 type targetLabelsGetter interface {
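
Two properties of this cache are easy to miss: reads refresh lastAccessTime, so hot entries survive cleanup, and the cleanup pass piggybacks on Set, so a cache that is only read never shrinks. A self-contained sketch with a hand-advanced clock standing in for fasttime.UnixTimestamp() (ttlCache, entry and the string payload are illustration-only names, not promscrape types):

package main

import (
	"fmt"
	"sync"
)

// now is a fake unix clock, advanced by hand in main.
var now uint64

type entry struct {
	v              string
	lastAccessTime uint64
}

// ttlCache mirrors scrapeWorkCache above, with a string payload.
type ttlCache struct {
	mu              sync.Mutex
	m               map[string]*entry
	lastCleanupTime uint64
}

func (c *ttlCache) Get(key string) (string, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	e := c.m[key]
	if e == nil {
		return "", false
	}
	e.lastAccessTime = now // reads keep an entry alive
	return e.v, true
}

func (c *ttlCache) Set(key, v string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.m[key] = &entry{v: v, lastAccessTime: now}
	if now > c.lastCleanupTime+10*60 { // cleanup at most once per 10 minutes
		for k, e := range c.m {
			if now > e.lastAccessTime+2*60 { // idle for over 2 minutes
				delete(c.m, k)
			}
		}
		c.lastCleanupTime = now
	}
}

func main() {
	c := &ttlCache{m: make(map[string]*entry)}
	c.Set("a", "1")
	now += 11 * 60  // 11 minutes pass without touching "a"
	c.Set("b", "2") // triggers the cleanup pass; idle "a" is dropped
	_, ok := c.Get("a")
	fmt.Println(ok) // false
}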
@@ -562,7 +613,7 @@ func appendScrapeWorkForTargetLabels(dst []*ScrapeWork, swc *scrapeWorkConfig, t
 	go func() {
 		for metaLabels := range workCh {
 			target := metaLabels["__address__"]
-			sw, err := getScrapeWork(swc, target, nil, metaLabels)
+			sw, err := swc.getScrapeWork(target, nil, metaLabels)
 			if err != nil {
 				err = fmt.Errorf("skipping %s target %q for job_name %q because of error: %w", discoveryType, target, swc.jobName, err)
 			}
@@ -643,7 +694,7 @@ func (stc *StaticConfig) appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConf
 			logger.Errorf("`static_configs` target for `job_name` %q cannot be empty; skipping it", swc.jobName)
 			continue
 		}
-		sw, err := getScrapeWork(swc, target, stc.Labels, metaLabels)
+		sw, err := swc.getScrapeWork(target, stc.Labels, metaLabels)
 		if err != nil {
 			// Do not return this error, since other targets may be valid
 			logger.Errorf("error when parsing `static_configs` target %q for `job_name` %q: %s; skipping it", target, swc.jobName, err)
@@ -656,7 +707,42 @@
 	return dst
 }

-func getScrapeWork(swc *scrapeWorkConfig, target string, extraLabels, metaLabels map[string]string) (*ScrapeWork, error) {
+func (swc *scrapeWorkConfig) getScrapeWork(target string, extraLabels, metaLabels map[string]string) (*ScrapeWork, error) {
+	key := getScrapeWorkKey(extraLabels, metaLabels)
+	if sw := swc.cache.Get(key); sw != nil {
+		return sw, nil
+	}
+	sw, err := swc.getScrapeWorkReal(target, extraLabels, metaLabels)
+	if err == nil {
+		// Cache only successfully built ScrapeWork, so targets with invalid
+		// configs are re-checked on the next refresh.
+		swc.cache.Set(key, sw)
+	}
+	return sw, err
+}
+
+// getScrapeWorkKey serializes extraLabels and metaLabels into a cache key.
+func getScrapeWorkKey(extraLabels, metaLabels map[string]string) string {
+	var b []byte
+	b = appendSortedKeyValuePairs(b, extraLabels)
+	b = appendSortedKeyValuePairs(b, metaLabels)
+	return string(b)
+}
+
+func appendSortedKeyValuePairs(dst []byte, m map[string]string) []byte {
+	keys := make([]string, 0, len(m))
+	for k := range m {
+		keys = append(keys, k)
+	}
+	// Sort the keys, since Go randomizes map iteration order.
+	sort.Strings(keys)
+	for _, k := range keys {
+		dst = strconv.AppendQuote(dst, k)
+		dst = append(dst, ':')
+		dst = strconv.AppendQuote(dst, m[k])
+		dst = append(dst, ',')
+	}
+	dst = append(dst, '\n')
+	return dst
+}
+
+func (swc *scrapeWorkConfig) getScrapeWorkReal(target string, extraLabels, metaLabels map[string]string) (*ScrapeWork, error) {
 	labels := mergeLabels(swc.jobName, swc.scheme, target, swc.metricsPath, extraLabels, swc.externalLabels, metaLabels, swc.params)
 	var originalLabels []prompbmarshal.Label
 	if !*dropOriginalLabels {
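
The cache key is simply the quoted, sorted key:value pairs of both label maps, with each map's group terminated by '\n' so pairs from extraLabels and metaLabels can't collide. A standalone sketch showing the exact key format (the helper is a copy of appendSortedKeyValuePairs above, repeated so this runs on its own):

package main

import (
	"fmt"
	"sort"
	"strconv"
)

// Copy of appendSortedKeyValuePairs from the diff above.
func appendSortedKeyValuePairs(dst []byte, m map[string]string) []byte {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys) // deterministic order -> stable cache keys
	for _, k := range keys {
		dst = strconv.AppendQuote(dst, k)
		dst = append(dst, ':')
		dst = strconv.AppendQuote(dst, m[k])
		dst = append(dst, ',')
	}
	return append(dst, '\n')
}

func main() {
	extraLabels := map[string]string{"env": "prod"}
	metaLabels := map[string]string{
		"__address__":                "10.0.0.1:9100",
		"__meta_kubernetes_pod_name": "web-0",
	}
	b := appendSortedKeyValuePairs(nil, extraLabels)
	b = appendSortedKeyValuePairs(b, metaLabels)
	fmt.Printf("%q\n", string(b))
	// Output:
	// "\"env\":\"prod\",\n\"__address__\":\"10.0.0.1:9100\",\"__meta_kubernetes_pod_name\":\"web-0\",\n"
}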

@@ -16,6 +16,7 @@ import (
 	"sync"
 	"time"

+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
@@ -143,7 +144,7 @@ type apiWatcher struct {
 	// The last time the apiWatcher was queried for cached objects.
 	// It is used for stopping unused watchers.
-	lastAccessTime time.Time
+	lastAccessTime uint64
 }

 func newAPIWatcher(client *http.Client, apiServer, authorization string, namespaces []string, selectors []Selector) *apiWatcher {
@@ -156,7 +157,7 @@ func newAPIWatcher(client *http.Client, apiServer, authorization string, namespa
 		watchersByURL:  make(map[string]*urlWatcher),
-		lastAccessTime: time.Now(),
+		lastAccessTime: fasttime.UnixTimestamp(),
 	}
 }
@@ -174,6 +175,8 @@ func (aw *apiWatcher) getLabelsForRole(role string) []map[string]string {
 		}
 		uw.mu.Unlock()
 	}
+	aw.lastAccessTime = fasttime.UnixTimestamp()
+	aw.mu.Unlock()
 	return ms
 }
@@ -197,7 +200,7 @@ func (aw *apiWatcher) getObjectByRole(role, namespace, name string) object {
 			break
 		}
 	}
-	aw.lastAccessTime = time.Now()
+	aw.lastAccessTime = fasttime.UnixTimestamp()
 	aw.mu.Unlock()
 	return o
 }
@@ -233,7 +236,7 @@ func (aw *apiWatcher) startWatcherForURL(role, apiURL string, parseObject parseO
 func (aw *apiWatcher) needStop() bool {
 	aw.mu.Lock()
 	defer aw.mu.Unlock()
-	return time.Since(aw.lastAccessTime) > 5*time.Minute
+	return fasttime.UnixTimestamp() > aw.lastAccessTime+5*60
 }

 // doRequest performs http request to the given requestURL.
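
The time.Time -> uint64 switch trades sub-second precision for cheaper timestamps: fasttime.UnixTimestamp() serves a cached unix timestamp that is refreshed about once per second, avoiding a time.Now() call on every access. For an idle-watcher check with a 5-minute threshold that granularity is plenty; the two expressions below make the same decision:

package main

import (
	"fmt"
	"time"
)

func main() {
	last := time.Now().Add(-6 * time.Minute)

	// Old check: sub-second precision, needs a time.Time per watcher.
	fmt.Println(time.Since(last) > 5*time.Minute) // true

	// New check: plain unix seconds, like fasttime.UnixTimestamp() returns.
	lastUnix := uint64(last.Unix())
	nowUnix := uint64(time.Now().Unix())
	fmt.Println(nowUnix > lastUnix+5*60) // true: same decision, 1s granularity
}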