lib/promscrape/discovery/kubernetes: further optimize kubernetes service discovery for the case with many scrape jobs

Do not re-calculate labels per each scrape job - reuse them instead for scrape jobs with identical Kubernetes role

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1113
This commit is contained in:
Aliaksandr Valialkin 2021-03-14 21:10:35 +02:00
parent 7a16e8e3a2
commit d409898515
7 changed files with 262 additions and 197 deletions

View file

@ -33,7 +33,7 @@ type WatchEvent struct {
// object is any Kubernetes object. // object is any Kubernetes object.
type object interface { type object interface {
key() string key() string
getTargetLabels(aw *apiWatcher) []map[string]string getTargetLabels(gw *groupWatcher) []map[string]string
} }
// parseObjectFunc must parse object from the given data. // parseObjectFunc must parse object from the given data.
@ -44,59 +44,42 @@ type parseObjectListFunc func(r io.Reader) (map[string]object, ListMeta, error)
// apiWatcher is used for watching for Kuberntes object changes and caching their latest states. // apiWatcher is used for watching for Kuberntes object changes and caching their latest states.
type apiWatcher struct { type apiWatcher struct {
// Kubenetes API server address in the form http://api-server role string
apiServer string
// ac contains auth config for communicating with apiServer
ac *promauth.Config
// sdc contains the related SDConfig
sdc *SDConfig
// Constructor for creating ScrapeWork objects from labels // Constructor for creating ScrapeWork objects from labels
swcFunc ScrapeWorkConstructorFunc swcFunc ScrapeWorkConstructorFunc
gw *groupWatcher
// swos contains a map of ScrapeWork objects for the given apiWatcher // swos contains a map of ScrapeWork objects for the given apiWatcher
swosByKey map[string][]interface{} swosByKey map[string][]interface{}
swosByKeyLock sync.Mutex swosByKeyLock sync.Mutex
// a map of watchers keyed by request urls swosCount *metrics.Counter
watchersByURL map[string]*urlWatcher
watchersByURLLock sync.Mutex
stopCh chan struct{}
wg sync.WaitGroup
} }
func newAPIWatcher(apiServer string, ac *promauth.Config, sdc *SDConfig, swcFunc ScrapeWorkConstructorFunc) *apiWatcher { func newAPIWatcher(apiServer string, ac *promauth.Config, sdc *SDConfig, swcFunc ScrapeWorkConstructorFunc) *apiWatcher {
namespaces := sdc.Namespaces.Names
selectors := sdc.Selectors
proxyURL := sdc.ProxyURL.URL()
gw := getGroupWatcher(apiServer, ac, namespaces, selectors, proxyURL)
return &apiWatcher{ return &apiWatcher{
apiServer: apiServer, role: sdc.Role,
ac: ac,
sdc: sdc,
swcFunc: swcFunc, swcFunc: swcFunc,
gw: gw,
swosByKey: make(map[string][]interface{}), swosByKey: make(map[string][]interface{}),
watchersByURL: make(map[string]*urlWatcher), swosCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_scrape_works{role=%q}`, sdc.Role)),
stopCh: make(chan struct{}),
} }
} }
func (aw *apiWatcher) mustStop() { func (aw *apiWatcher) mustStop() {
close(aw.stopCh) aw.gw.unsubscribeAPIWatcher(aw)
aw.wg.Wait() aw.reloadScrapeWorks(make(map[string][]interface{}))
} }
func (aw *apiWatcher) reloadScrapeWorks(objectsByKey map[string]object) { func (aw *apiWatcher) reloadScrapeWorks(swosByKey map[string][]interface{}) {
swosByKey := make(map[string][]interface{})
for key, o := range objectsByKey {
labels := o.getTargetLabels(aw)
swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels)
if len(swos) > 0 {
swosByKey[key] = swos
}
}
aw.swosByKeyLock.Lock() aw.swosByKeyLock.Lock()
aw.swosCount.Add(len(swosByKey) - len(aw.swosByKey))
aw.swosByKey = swosByKey aw.swosByKey = swosByKey
aw.swosByKeyLock.Unlock() aw.swosByKeyLock.Unlock()
} }
@ -105,8 +88,10 @@ func (aw *apiWatcher) setScrapeWorks(key string, labels []map[string]string) {
swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels) swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels)
aw.swosByKeyLock.Lock() aw.swosByKeyLock.Lock()
if len(swos) > 0 { if len(swos) > 0 {
aw.swosCount.Add(len(swos) - len(aw.swosByKey[key]))
aw.swosByKey[key] = swos aw.swosByKey[key] = swos
} else { } else {
aw.swosCount.Add(-len(aw.swosByKey[key]))
delete(aw.swosByKey, key) delete(aw.swosByKey, key)
} }
aw.swosByKeyLock.Unlock() aw.swosByKeyLock.Unlock()
@ -114,6 +99,7 @@ func (aw *apiWatcher) setScrapeWorks(key string, labels []map[string]string) {
func (aw *apiWatcher) removeScrapeWorks(key string) { func (aw *apiWatcher) removeScrapeWorks(key string) {
aw.swosByKeyLock.Lock() aw.swosByKeyLock.Lock()
aw.swosCount.Add(-len(aw.swosByKey[key]))
delete(aw.swosByKey, key) delete(aw.swosByKey, key)
aw.swosByKeyLock.Unlock() aw.swosByKeyLock.Unlock()
} }
@ -132,7 +118,7 @@ func getScrapeWorkObjectsForLabels(swcFunc ScrapeWorkConstructorFunc, labelss []
// getScrapeWorkObjects returns all the ScrapeWork objects for the given aw. // getScrapeWorkObjects returns all the ScrapeWork objects for the given aw.
func (aw *apiWatcher) getScrapeWorkObjects() []interface{} { func (aw *apiWatcher) getScrapeWorkObjects() []interface{} {
aw.startWatchersForRole(aw.sdc.Role, true) aw.gw.startWatchersForRole(aw.role, aw)
aw.swosByKeyLock.Lock() aw.swosByKeyLock.Lock()
defer aw.swosByKeyLock.Unlock() defer aw.swosByKeyLock.Unlock()
@ -147,115 +133,20 @@ func (aw *apiWatcher) getScrapeWorkObjects() []interface{} {
return swos return swos
} }
// getObjectByRole returns an object with the given (namespace, name) key and the given role. // groupWatcher watches for Kubernetes objects on the given apiServer with the given namespaces,
func (aw *apiWatcher) getObjectByRole(role, namespace, name string) object { // selectors and authorization using the given client.
if aw == nil { type groupWatcher struct {
// this is needed for testing apiServer string
return nil namespaces []string
} selectors []Selector
key := namespace + "/" + name
aw.startWatchersForRole(role, false)
aw.watchersByURLLock.Lock()
defer aw.watchersByURLLock.Unlock()
for _, uw := range aw.watchersByURL {
if uw.role != role {
continue
}
uw.mu.Lock()
o := uw.objectsByKey[key]
uw.mu.Unlock()
if o != nil {
return o
}
}
return nil
}
func (aw *apiWatcher) startWatchersForRole(role string, registerAPIWatcher bool) {
paths := getAPIPaths(role, aw.sdc.Namespaces.Names, aw.sdc.Selectors)
for _, path := range paths {
apiURL := aw.apiServer + path
aw.startWatcherForURL(role, apiURL, registerAPIWatcher)
}
}
func (aw *apiWatcher) startWatcherForURL(role, apiURL string, registerAPIWatcher bool) {
aw.watchersByURLLock.Lock()
if aw.watchersByURL[apiURL] != nil {
// Watcher for the given path already exists.
aw.watchersByURLLock.Unlock()
return
}
uw := getURLWatcher(role, apiURL, aw.sdc.ProxyURL.URL(), aw.ac)
aw.watchersByURL[apiURL] = uw
aw.watchersByURLLock.Unlock()
uw.initOnce()
if registerAPIWatcher {
uw.addAPIWatcher(aw)
}
aw.wg.Add(1)
go func() {
defer aw.wg.Done()
<-aw.stopCh
if registerAPIWatcher {
uw.removeAPIWatcher(aw)
}
aw.watchersByURLLock.Lock()
delete(aw.watchersByURL, apiURL)
aw.watchersByURLLock.Unlock()
}()
}
func getURLWatcher(role, apiURL string, proxyURL *url.URL, ac *promauth.Config) *urlWatcher {
key := fmt.Sprintf("url=%s, proxyURL=%v, authConfig=%s", apiURL, proxyURL, ac.String())
urlWatchersLock.Lock()
uw := urlWatchers[key]
if uw == nil {
uw = newURLWatcher(role, apiURL, proxyURL, ac)
urlWatchers[key] = uw
}
urlWatchersLock.Unlock()
return uw
}
var (
urlWatchersLock sync.Mutex
urlWatchers = make(map[string]*urlWatcher)
)
// urlWatcher watches for an apiURL and updates object states in objectsByKey.
type urlWatcher struct {
role string
apiURL string
authorization string authorization string
client *http.Client client *http.Client
parseObject parseObjectFunc
parseObjectList parseObjectListFunc
// once is used for initializing the urlWatcher only once
once sync.Once
// mu protects aws, objectsByKey and resourceVersion
mu sync.Mutex mu sync.Mutex
m map[string]*urlWatcher
// aws contains registered apiWatcher objects
aws map[*apiWatcher]struct{}
// objectsByKey contains the latest state for objects obtained from apiURL
objectsByKey map[string]object
resourceVersion string
objectsCount *metrics.Counter
objectsAdded *metrics.Counter
objectsRemoved *metrics.Counter
objectsUpdated *metrics.Counter
} }
func newURLWatcher(role, apiURL string, proxyURL *url.URL, ac *promauth.Config) *urlWatcher { func newGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string, selectors []Selector, proxyURL *url.URL) *groupWatcher {
var proxy func(*http.Request) (*url.URL, error) var proxy func(*http.Request) (*url.URL, error)
if proxyURL != nil { if proxyURL != nil {
proxy = http.ProxyURL(proxyURL) proxy = http.ProxyURL(proxyURL)
@ -269,18 +160,173 @@ func newURLWatcher(role, apiURL string, proxyURL *url.URL, ac *promauth.Config)
}, },
Timeout: *apiServerTimeout, Timeout: *apiServerTimeout,
} }
return &groupWatcher{
apiServer: apiServer,
authorization: ac.Authorization,
namespaces: namespaces,
selectors: selectors,
client: client,
m: make(map[string]*urlWatcher),
}
}
func getGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string, selectors []Selector, proxyURL *url.URL) *groupWatcher {
key := fmt.Sprintf("apiServer=%s, namespaces=%s, selectors=%s, proxyURL=%v, authConfig=%s",
apiServer, namespaces, selectorsKey(selectors), proxyURL, ac.String())
groupWatchersLock.Lock()
gw := groupWatchers[key]
if gw == nil {
gw = newGroupWatcher(apiServer, ac, namespaces, selectors, proxyURL)
groupWatchers[key] = gw
}
groupWatchersLock.Unlock()
return gw
}
func selectorsKey(selectors []Selector) string {
var sb strings.Builder
for _, s := range selectors {
fmt.Fprintf(&sb, "{role=%q, label=%q, field=%q}", s.Role, s.Label, s.Field)
}
return sb.String()
}
var (
groupWatchersLock sync.Mutex
groupWatchers = make(map[string]*groupWatcher)
_ = metrics.NewGauge(`vm_promscrape_discovery_kubernetes_group_watchers`, func() float64 {
groupWatchersLock.Lock()
n := len(groupWatchers)
groupWatchersLock.Unlock()
return float64(n)
})
)
// getObjectByRole returns an object with the given (namespace, name) key and the given role.
func (gw *groupWatcher) getObjectByRole(role, namespace, name string) object {
if gw == nil {
// this is needed for testing
return nil
}
key := namespace + "/" + name
gw.startWatchersForRole(role, nil)
gw.mu.Lock()
defer gw.mu.Unlock()
for _, uw := range gw.m {
if uw.role != role {
continue
}
uw.mu.Lock()
o := uw.objectsByKey[key]
uw.mu.Unlock()
if o != nil {
return o
}
}
return nil
}
func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) {
paths := getAPIPaths(role, gw.namespaces, gw.selectors)
for _, path := range paths {
apiURL := gw.apiServer + path
gw.mu.Lock()
uw := gw.m[apiURL]
if uw == nil {
uw = newURLWatcher(role, apiURL, gw)
gw.m[apiURL] = uw
}
gw.mu.Unlock()
uw.subscribeAPIWatcher(aw)
}
}
func (gw *groupWatcher) reloadScrapeWorksForAPIWatchers(aws []*apiWatcher, objectsByKey map[string]object) {
if len(aws) == 0 {
return
}
swosByKey := make([]map[string][]interface{}, len(aws))
for i := range aws {
swosByKey[i] = make(map[string][]interface{})
}
for key, o := range objectsByKey {
labels := o.getTargetLabels(gw)
for i, aw := range aws {
swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels)
if len(swos) > 0 {
swosByKey[i][key] = swos
}
}
}
for i, aw := range aws {
aw.reloadScrapeWorks(swosByKey[i])
}
}
// doRequest performs http request to the given requestURL.
func (gw *groupWatcher) doRequest(requestURL string) (*http.Response, error) {
req, err := http.NewRequest("GET", requestURL, nil)
if err != nil {
logger.Fatalf("cannot create a request for %q: %s", requestURL, err)
}
if gw.authorization != "" {
req.Header.Set("Authorization", gw.authorization)
}
return gw.client.Do(req)
}
func (gw *groupWatcher) unsubscribeAPIWatcher(aw *apiWatcher) {
gw.mu.Lock()
for _, uw := range gw.m {
uw.unsubscribeAPIWatcher(aw)
}
gw.mu.Unlock()
}
// urlWatcher watches for an apiURL and updates object states in objectsByKey.
type urlWatcher struct {
role string
apiURL string
gw *groupWatcher
parseObject parseObjectFunc
parseObjectList parseObjectListFunc
// mu protects aws, awsPending, objectsByKey and resourceVersion
mu sync.Mutex
// aws contains registered apiWatcher objects
aws map[*apiWatcher]struct{}
// awsPending contains pending apiWatcher objects, which must be moved to aws in a batch
awsPending map[*apiWatcher]struct{}
// objectsByKey contains the latest state for objects obtained from apiURL
objectsByKey map[string]object
resourceVersion string
objectsCount *metrics.Counter
objectsAdded *metrics.Counter
objectsRemoved *metrics.Counter
objectsUpdated *metrics.Counter
}
func newURLWatcher(role, apiURL string, gw *groupWatcher) *urlWatcher {
parseObject, parseObjectList := getObjectParsersForRole(role) parseObject, parseObjectList := getObjectParsersForRole(role)
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers{role=%q}`, role)).Inc() metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers{role=%q}`, role)).Inc()
uw := &urlWatcher{ uw := &urlWatcher{
role: role, role: role,
apiURL: apiURL, apiURL: apiURL,
authorization: ac.Authorization, gw: gw,
client: client,
parseObject: parseObject, parseObject: parseObject,
parseObjectList: parseObjectList, parseObjectList: parseObjectList,
aws: make(map[*apiWatcher]struct{}), aws: make(map[*apiWatcher]struct{}),
awsPending: make(map[*apiWatcher]struct{}),
objectsByKey: make(map[string]object), objectsByKey: make(map[string]object),
objectsCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects{role=%q}`, role)), objectsCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects{role=%q}`, role)),
@ -288,45 +334,65 @@ func newURLWatcher(role, apiURL string, proxyURL *url.URL, ac *promauth.Config)
objectsRemoved: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_removed_total{role=%q}`, role)), objectsRemoved: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_removed_total{role=%q}`, role)),
objectsUpdated: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_updated_total{role=%q}`, role)), objectsUpdated: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_updated_total{role=%q}`, role)),
} }
logger.Infof("started %s watcher for %q", uw.role, uw.apiURL)
go uw.watchForUpdates()
go uw.processPendingSubscribers()
return uw return uw
} }
func (uw *urlWatcher) initOnce() { func (uw *urlWatcher) subscribeAPIWatcher(aw *apiWatcher) {
uw.once.Do(func() { if aw == nil {
uw.reloadObjects() return
go uw.watchForUpdates()
})
}
func (uw *urlWatcher) addAPIWatcher(aw *apiWatcher) {
uw.mu.Lock()
if _, ok := uw.aws[aw]; ok {
logger.Panicf("BUG: aw=%p has been already added", aw)
} }
uw.aws[aw] = struct{}{}
aw.reloadScrapeWorks(uw.objectsByKey)
uw.mu.Unlock()
}
func (uw *urlWatcher) removeAPIWatcher(aw *apiWatcher) {
uw.mu.Lock() uw.mu.Lock()
if _, ok := uw.aws[aw]; !ok { if _, ok := uw.aws[aw]; !ok {
logger.Panicf("BUG: aw=%p is missing", aw) if _, ok := uw.awsPending[aw]; !ok {
uw.awsPending[aw] = struct{}{}
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q,type="pending"}`, uw.role)).Inc()
}
} }
delete(uw.aws, aw)
uw.mu.Unlock() uw.mu.Unlock()
} }
// doRequest performs http request to the given requestURL. func (uw *urlWatcher) unsubscribeAPIWatcher(aw *apiWatcher) {
func (uw *urlWatcher) doRequest(requestURL string) (*http.Response, error) { uw.mu.Lock()
req, err := http.NewRequest("GET", requestURL, nil) if _, ok := uw.aws[aw]; ok {
if err != nil { delete(uw.aws, aw)
logger.Fatalf("cannot create a request for %q: %s", requestURL, err) metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q,type="permanent"}`, uw.role)).Dec()
} else if _, ok := uw.awsPending[aw]; ok {
delete(uw.awsPending, aw)
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q,type="pending"}`, uw.role)).Dec()
} }
if uw.authorization != "" { uw.mu.Unlock()
req.Header.Set("Authorization", uw.authorization) }
func (uw *urlWatcher) processPendingSubscribers() {
t := time.NewTicker(time.Second)
for range t.C {
var awsPending []*apiWatcher
var objectsByKey map[string]object
uw.mu.Lock()
if len(uw.awsPending) > 0 {
awsPending = getAPIWatchers(uw.awsPending)
for _, aw := range awsPending {
if _, ok := uw.aws[aw]; ok {
logger.Panicf("BUG: aw=%p already exists in uw.aws", aw)
}
uw.aws[aw] = struct{}{}
delete(uw.awsPending, aw)
}
objectsByKey = make(map[string]object, len(uw.objectsByKey))
for key, o := range uw.objectsByKey {
objectsByKey[key] = o
}
}
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q,type="pending"}`, uw.role)).Add(-len(awsPending))
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q,type="permanent"}`, uw.role)).Add(len(awsPending))
uw.mu.Unlock()
uw.gw.reloadScrapeWorksForAPIWatchers(awsPending, objectsByKey)
} }
return uw.client.Do(req)
} }
func (uw *urlWatcher) setResourceVersion(resourceVersion string) { func (uw *urlWatcher) setResourceVersion(resourceVersion string) {
@ -346,7 +412,7 @@ func (uw *urlWatcher) reloadObjects() string {
} }
requestURL := uw.apiURL requestURL := uw.apiURL
resp, err := uw.doRequest(requestURL) resp, err := uw.gw.doRequest(requestURL)
if err != nil { if err != nil {
logger.Errorf("cannot perform request to %q: %s", requestURL, err) logger.Errorf("cannot perform request to %q: %s", requestURL, err)
return "" return ""
@ -386,22 +452,19 @@ func (uw *urlWatcher) reloadObjects() string {
uw.objectsAdded.Add(added) uw.objectsAdded.Add(added)
uw.objectsCount.Add(added - removed) uw.objectsCount.Add(added - removed)
uw.resourceVersion = metadata.ResourceVersion uw.resourceVersion = metadata.ResourceVersion
aws := getAPIWatchers(uw.aws)
uw.mu.Unlock() uw.mu.Unlock()
for _, aw := range uw.getAPIWatchers() { uw.gw.reloadScrapeWorksForAPIWatchers(aws, objectsByKey)
aw.reloadScrapeWorks(objectsByKey) logger.Infof("reloaded %d objects from %q", len(objectsByKey), requestURL)
}
logger.Infof("loaded %d objects from %q", len(objectsByKey), requestURL)
return metadata.ResourceVersion return metadata.ResourceVersion
} }
func (uw *urlWatcher) getAPIWatchers() []*apiWatcher { func getAPIWatchers(awsMap map[*apiWatcher]struct{}) []*apiWatcher {
uw.mu.Lock() aws := make([]*apiWatcher, 0, len(awsMap))
aws := make([]*apiWatcher, 0, len(uw.aws)) for aw := range awsMap {
for aw := range uw.aws {
aws = append(aws, aw) aws = append(aws, aw)
} }
uw.mu.Unlock()
return aws return aws
} }
@ -423,7 +486,7 @@ func (uw *urlWatcher) watchForUpdates() {
if strings.Contains(apiURL, "?") { if strings.Contains(apiURL, "?") {
delimiter = "&" delimiter = "&"
} }
timeoutSeconds := time.Duration(0.9 * float64(uw.client.Timeout)).Seconds() timeoutSeconds := time.Duration(0.9 * float64(uw.gw.client.Timeout)).Seconds()
apiURL += delimiter + "watch=1&allowWatchBookmarks=true&timeoutSeconds=" + strconv.Itoa(int(timeoutSeconds)) apiURL += delimiter + "watch=1&allowWatchBookmarks=true&timeoutSeconds=" + strconv.Itoa(int(timeoutSeconds))
for { for {
resourceVersion := uw.reloadObjects() resourceVersion := uw.reloadObjects()
@ -431,7 +494,7 @@ func (uw *urlWatcher) watchForUpdates() {
if resourceVersion != "" { if resourceVersion != "" {
requestURL += "&resourceVersion=" + url.QueryEscape(resourceVersion) requestURL += "&resourceVersion=" + url.QueryEscape(resourceVersion)
} }
resp, err := uw.doRequest(requestURL) resp, err := uw.gw.doRequest(requestURL)
if err != nil { if err != nil {
logger.Errorf("cannot perform request to %q: %s", requestURL, err) logger.Errorf("cannot perform request to %q: %s", requestURL, err)
backoffSleep() backoffSleep()
@ -486,9 +549,10 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
uw.objectsUpdated.Inc() uw.objectsUpdated.Inc()
} }
uw.objectsByKey[key] = o uw.objectsByKey[key] = o
aws := getAPIWatchers(uw.aws)
uw.mu.Unlock() uw.mu.Unlock()
for _, aw := range uw.getAPIWatchers() { labels := o.getTargetLabels(uw.gw)
labels := o.getTargetLabels(aw) for _, aw := range aws {
aw.setScrapeWorks(key, labels) aw.setScrapeWorks(key, labels)
} }
case "DELETED": case "DELETED":
@ -503,8 +567,9 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
uw.objectsRemoved.Inc() uw.objectsRemoved.Inc()
delete(uw.objectsByKey, key) delete(uw.objectsByKey, key)
} }
aws := getAPIWatchers(uw.aws)
uw.mu.Unlock() uw.mu.Unlock()
for _, aw := range uw.getAPIWatchers() { for _, aw := range aws {
aw.removeScrapeWorks(key) aw.removeScrapeWorks(key)
} }
case "BOOKMARK": case "BOOKMARK":

View file

@ -90,17 +90,17 @@ type EndpointPort struct {
// getTargetLabels returns labels for each endpoint in eps. // getTargetLabels returns labels for each endpoint in eps.
// //
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#endpoints // See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#endpoints
func (eps *Endpoints) getTargetLabels(aw *apiWatcher) []map[string]string { func (eps *Endpoints) getTargetLabels(gw *groupWatcher) []map[string]string {
var svc *Service var svc *Service
if o := aw.getObjectByRole("service", eps.Metadata.Namespace, eps.Metadata.Name); o != nil { if o := gw.getObjectByRole("service", eps.Metadata.Namespace, eps.Metadata.Name); o != nil {
svc = o.(*Service) svc = o.(*Service)
} }
podPortsSeen := make(map[*Pod][]int) podPortsSeen := make(map[*Pod][]int)
var ms []map[string]string var ms []map[string]string
for _, ess := range eps.Subsets { for _, ess := range eps.Subsets {
for _, epp := range ess.Ports { for _, epp := range ess.Ports {
ms = appendEndpointLabelsForAddresses(ms, aw, podPortsSeen, eps, ess.Addresses, epp, svc, "true") ms = appendEndpointLabelsForAddresses(ms, gw, podPortsSeen, eps, ess.Addresses, epp, svc, "true")
ms = appendEndpointLabelsForAddresses(ms, aw, podPortsSeen, eps, ess.NotReadyAddresses, epp, svc, "false") ms = appendEndpointLabelsForAddresses(ms, gw, podPortsSeen, eps, ess.NotReadyAddresses, epp, svc, "false")
} }
} }
@ -135,11 +135,11 @@ func (eps *Endpoints) getTargetLabels(aw *apiWatcher) []map[string]string {
return ms return ms
} }
func appendEndpointLabelsForAddresses(ms []map[string]string, aw *apiWatcher, podPortsSeen map[*Pod][]int, eps *Endpoints, func appendEndpointLabelsForAddresses(ms []map[string]string, gw *groupWatcher, podPortsSeen map[*Pod][]int, eps *Endpoints,
eas []EndpointAddress, epp EndpointPort, svc *Service, ready string) []map[string]string { eas []EndpointAddress, epp EndpointPort, svc *Service, ready string) []map[string]string {
for _, ea := range eas { for _, ea := range eas {
var p *Pod var p *Pod
if o := aw.getObjectByRole("pod", ea.TargetRef.Namespace, ea.TargetRef.Name); o != nil { if o := gw.getObjectByRole("pod", ea.TargetRef.Namespace, ea.TargetRef.Name); o != nil {
p = o.(*Pod) p = o.(*Pod)
} }
m := getEndpointLabelsForAddressAndPort(podPortsSeen, eps, ea, epp, p, svc, ready) m := getEndpointLabelsForAddressAndPort(podPortsSeen, eps, ea, epp, p, svc, ready)

View file

@ -37,16 +37,16 @@ func parseEndpointSlice(data []byte) (object, error) {
// getTargetLabels returns labels for eps. // getTargetLabels returns labels for eps.
// //
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#endpointslices // See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#endpointslices
func (eps *EndpointSlice) getTargetLabels(aw *apiWatcher) []map[string]string { func (eps *EndpointSlice) getTargetLabels(gw *groupWatcher) []map[string]string {
var svc *Service var svc *Service
if o := aw.getObjectByRole("service", eps.Metadata.Namespace, eps.Metadata.Name); o != nil { if o := gw.getObjectByRole("service", eps.Metadata.Namespace, eps.Metadata.Name); o != nil {
svc = o.(*Service) svc = o.(*Service)
} }
podPortsSeen := make(map[*Pod][]int) podPortsSeen := make(map[*Pod][]int)
var ms []map[string]string var ms []map[string]string
for _, ess := range eps.Endpoints { for _, ess := range eps.Endpoints {
var p *Pod var p *Pod
if o := aw.getObjectByRole("pod", ess.TargetRef.Namespace, ess.TargetRef.Name); o != nil { if o := gw.getObjectByRole("pod", ess.TargetRef.Namespace, ess.TargetRef.Name); o != nil {
p = o.(*Pod) p = o.(*Pod)
} }
for _, epp := range eps.Ports { for _, epp := range eps.Ports {

View file

@ -87,7 +87,7 @@ type HTTPIngressPath struct {
// getTargetLabels returns labels for ig. // getTargetLabels returns labels for ig.
// //
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ingress // See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ingress
func (ig *Ingress) getTargetLabels(aw *apiWatcher) []map[string]string { func (ig *Ingress) getTargetLabels(gw *groupWatcher) []map[string]string {
tlsHosts := make(map[string]bool) tlsHosts := make(map[string]bool)
for _, tls := range ig.Spec.TLS { for _, tls := range ig.Spec.TLS {
for _, host := range tls.Hosts { for _, host := range tls.Hosts {

View file

@ -76,7 +76,7 @@ type NodeDaemonEndpoints struct {
// getTargetLabels returs labels for the given n. // getTargetLabels returs labels for the given n.
// //
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#node // See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#node
func (n *Node) getTargetLabels(aw *apiWatcher) []map[string]string { func (n *Node) getTargetLabels(gw *groupWatcher) []map[string]string {
addr := getNodeAddr(n.Status.Addresses) addr := getNodeAddr(n.Status.Addresses)
if len(addr) == 0 { if len(addr) == 0 {
// Skip node without address // Skip node without address

View file

@ -97,7 +97,7 @@ type PodCondition struct {
// getTargetLabels returns labels for each port of the given p. // getTargetLabels returns labels for each port of the given p.
// //
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#pod // See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#pod
func (p *Pod) getTargetLabels(aw *apiWatcher) []map[string]string { func (p *Pod) getTargetLabels(gw *groupWatcher) []map[string]string {
if len(p.Status.PodIP) == 0 { if len(p.Status.PodIP) == 0 {
// Skip pod without IP // Skip pod without IP
return nil return nil

View file

@ -71,7 +71,7 @@ type ServicePort struct {
// getTargetLabels returns labels for each port of the given s. // getTargetLabels returns labels for each port of the given s.
// //
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#service // See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#service
func (s *Service) getTargetLabels(aw *apiWatcher) []map[string]string { func (s *Service) getTargetLabels(gw *groupWatcher) []map[string]string {
host := fmt.Sprintf("%s.%s.svc", s.Metadata.Name, s.Metadata.Namespace) host := fmt.Sprintf("%s.%s.svc", s.Metadata.Name, s.Metadata.Namespace)
var ms []map[string]string var ms []map[string]string
for _, sp := range s.Spec.Ports { for _, sp := range s.Spec.Ports {