From d4098985159ff78cc5ac67a33e84b02bd317b538 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sun, 14 Mar 2021 21:10:35 +0200 Subject: [PATCH] lib/promscrape/discovery/kubernetes: further optimize kubernetes service discovery for the case with many scrape jobs Do not re-calculate labels per each scrape job - reuse them instead for scrape jobs with identical Kubernetes role Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1113 --- .../discovery/kubernetes/api_watcher.go | 433 ++++++++++-------- .../discovery/kubernetes/endpoints.go | 12 +- .../discovery/kubernetes/endpointslices.go | 6 +- .../discovery/kubernetes/ingress.go | 2 +- lib/promscrape/discovery/kubernetes/node.go | 2 +- lib/promscrape/discovery/kubernetes/pod.go | 2 +- .../discovery/kubernetes/service.go | 2 +- 7 files changed, 262 insertions(+), 197 deletions(-) diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go index 7f621f21ab..1e09e8955e 100644 --- a/lib/promscrape/discovery/kubernetes/api_watcher.go +++ b/lib/promscrape/discovery/kubernetes/api_watcher.go @@ -33,7 +33,7 @@ type WatchEvent struct { // object is any Kubernetes object. type object interface { key() string - getTargetLabels(aw *apiWatcher) []map[string]string + getTargetLabels(gw *groupWatcher) []map[string]string } // parseObjectFunc must parse object from the given data. @@ -44,59 +44,42 @@ type parseObjectListFunc func(r io.Reader) (map[string]object, ListMeta, error) // apiWatcher is used for watching for Kuberntes object changes and caching their latest states. type apiWatcher struct { - // Kubenetes API server address in the form http://api-server - apiServer string - - // ac contains auth config for communicating with apiServer - ac *promauth.Config - - // sdc contains the related SDConfig - sdc *SDConfig + role string // Constructor for creating ScrapeWork objects from labels swcFunc ScrapeWorkConstructorFunc + gw *groupWatcher + // swos contains a map of ScrapeWork objects for the given apiWatcher swosByKey map[string][]interface{} swosByKeyLock sync.Mutex - // a map of watchers keyed by request urls - watchersByURL map[string]*urlWatcher - watchersByURLLock sync.Mutex - - stopCh chan struct{} - wg sync.WaitGroup + swosCount *metrics.Counter } func newAPIWatcher(apiServer string, ac *promauth.Config, sdc *SDConfig, swcFunc ScrapeWorkConstructorFunc) *apiWatcher { + namespaces := sdc.Namespaces.Names + selectors := sdc.Selectors + proxyURL := sdc.ProxyURL.URL() + gw := getGroupWatcher(apiServer, ac, namespaces, selectors, proxyURL) return &apiWatcher{ - apiServer: apiServer, - ac: ac, - sdc: sdc, + role: sdc.Role, swcFunc: swcFunc, - - swosByKey: make(map[string][]interface{}), - watchersByURL: make(map[string]*urlWatcher), - - stopCh: make(chan struct{}), + gw: gw, + swosByKey: make(map[string][]interface{}), + swosCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_scrape_works{role=%q}`, sdc.Role)), } } func (aw *apiWatcher) mustStop() { - close(aw.stopCh) - aw.wg.Wait() + aw.gw.unsubscribeAPIWatcher(aw) + aw.reloadScrapeWorks(make(map[string][]interface{})) } -func (aw *apiWatcher) reloadScrapeWorks(objectsByKey map[string]object) { - swosByKey := make(map[string][]interface{}) - for key, o := range objectsByKey { - labels := o.getTargetLabels(aw) - swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels) - if len(swos) > 0 { - swosByKey[key] = swos - } - } +func (aw *apiWatcher) reloadScrapeWorks(swosByKey map[string][]interface{}) { aw.swosByKeyLock.Lock() + aw.swosCount.Add(len(swosByKey) - len(aw.swosByKey)) aw.swosByKey = swosByKey aw.swosByKeyLock.Unlock() } @@ -105,8 +88,10 @@ func (aw *apiWatcher) setScrapeWorks(key string, labels []map[string]string) { swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels) aw.swosByKeyLock.Lock() if len(swos) > 0 { + aw.swosCount.Add(len(swos) - len(aw.swosByKey[key])) aw.swosByKey[key] = swos } else { + aw.swosCount.Add(-len(aw.swosByKey[key])) delete(aw.swosByKey, key) } aw.swosByKeyLock.Unlock() @@ -114,6 +99,7 @@ func (aw *apiWatcher) setScrapeWorks(key string, labels []map[string]string) { func (aw *apiWatcher) removeScrapeWorks(key string) { aw.swosByKeyLock.Lock() + aw.swosCount.Add(-len(aw.swosByKey[key])) delete(aw.swosByKey, key) aw.swosByKeyLock.Unlock() } @@ -132,7 +118,7 @@ func getScrapeWorkObjectsForLabels(swcFunc ScrapeWorkConstructorFunc, labelss [] // getScrapeWorkObjects returns all the ScrapeWork objects for the given aw. func (aw *apiWatcher) getScrapeWorkObjects() []interface{} { - aw.startWatchersForRole(aw.sdc.Role, true) + aw.gw.startWatchersForRole(aw.role, aw) aw.swosByKeyLock.Lock() defer aw.swosByKeyLock.Unlock() @@ -147,115 +133,20 @@ func (aw *apiWatcher) getScrapeWorkObjects() []interface{} { return swos } -// getObjectByRole returns an object with the given (namespace, name) key and the given role. -func (aw *apiWatcher) getObjectByRole(role, namespace, name string) object { - if aw == nil { - // this is needed for testing - return nil - } - key := namespace + "/" + name - aw.startWatchersForRole(role, false) - aw.watchersByURLLock.Lock() - defer aw.watchersByURLLock.Unlock() - - for _, uw := range aw.watchersByURL { - if uw.role != role { - continue - } - uw.mu.Lock() - o := uw.objectsByKey[key] - uw.mu.Unlock() - if o != nil { - return o - } - } - return nil -} - -func (aw *apiWatcher) startWatchersForRole(role string, registerAPIWatcher bool) { - paths := getAPIPaths(role, aw.sdc.Namespaces.Names, aw.sdc.Selectors) - for _, path := range paths { - apiURL := aw.apiServer + path - aw.startWatcherForURL(role, apiURL, registerAPIWatcher) - } -} - -func (aw *apiWatcher) startWatcherForURL(role, apiURL string, registerAPIWatcher bool) { - aw.watchersByURLLock.Lock() - if aw.watchersByURL[apiURL] != nil { - // Watcher for the given path already exists. - aw.watchersByURLLock.Unlock() - return - } - uw := getURLWatcher(role, apiURL, aw.sdc.ProxyURL.URL(), aw.ac) - aw.watchersByURL[apiURL] = uw - aw.watchersByURLLock.Unlock() - uw.initOnce() - if registerAPIWatcher { - uw.addAPIWatcher(aw) - } - - aw.wg.Add(1) - go func() { - defer aw.wg.Done() - <-aw.stopCh - if registerAPIWatcher { - uw.removeAPIWatcher(aw) - } - aw.watchersByURLLock.Lock() - delete(aw.watchersByURL, apiURL) - aw.watchersByURLLock.Unlock() - }() -} - -func getURLWatcher(role, apiURL string, proxyURL *url.URL, ac *promauth.Config) *urlWatcher { - key := fmt.Sprintf("url=%s, proxyURL=%v, authConfig=%s", apiURL, proxyURL, ac.String()) - urlWatchersLock.Lock() - uw := urlWatchers[key] - if uw == nil { - uw = newURLWatcher(role, apiURL, proxyURL, ac) - urlWatchers[key] = uw - } - urlWatchersLock.Unlock() - return uw -} - -var ( - urlWatchersLock sync.Mutex - urlWatchers = make(map[string]*urlWatcher) -) - -// urlWatcher watches for an apiURL and updates object states in objectsByKey. -type urlWatcher struct { - role string - apiURL string +// groupWatcher watches for Kubernetes objects on the given apiServer with the given namespaces, +// selectors and authorization using the given client. +type groupWatcher struct { + apiServer string + namespaces []string + selectors []Selector authorization string client *http.Client - parseObject parseObjectFunc - parseObjectList parseObjectListFunc - - // once is used for initializing the urlWatcher only once - once sync.Once - - // mu protects aws, objectsByKey and resourceVersion mu sync.Mutex - - // aws contains registered apiWatcher objects - aws map[*apiWatcher]struct{} - - // objectsByKey contains the latest state for objects obtained from apiURL - objectsByKey map[string]object - - resourceVersion string - - objectsCount *metrics.Counter - objectsAdded *metrics.Counter - objectsRemoved *metrics.Counter - objectsUpdated *metrics.Counter + m map[string]*urlWatcher } -func newURLWatcher(role, apiURL string, proxyURL *url.URL, ac *promauth.Config) *urlWatcher { +func newGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string, selectors []Selector, proxyURL *url.URL) *groupWatcher { var proxy func(*http.Request) (*url.URL, error) if proxyURL != nil { proxy = http.ProxyURL(proxyURL) @@ -269,18 +160,173 @@ func newURLWatcher(role, apiURL string, proxyURL *url.URL, ac *promauth.Config) }, Timeout: *apiServerTimeout, } + return &groupWatcher{ + apiServer: apiServer, + authorization: ac.Authorization, + namespaces: namespaces, + selectors: selectors, + client: client, + m: make(map[string]*urlWatcher), + } +} + +func getGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string, selectors []Selector, proxyURL *url.URL) *groupWatcher { + key := fmt.Sprintf("apiServer=%s, namespaces=%s, selectors=%s, proxyURL=%v, authConfig=%s", + apiServer, namespaces, selectorsKey(selectors), proxyURL, ac.String()) + groupWatchersLock.Lock() + gw := groupWatchers[key] + if gw == nil { + gw = newGroupWatcher(apiServer, ac, namespaces, selectors, proxyURL) + groupWatchers[key] = gw + } + groupWatchersLock.Unlock() + return gw +} + +func selectorsKey(selectors []Selector) string { + var sb strings.Builder + for _, s := range selectors { + fmt.Fprintf(&sb, "{role=%q, label=%q, field=%q}", s.Role, s.Label, s.Field) + } + return sb.String() +} + +var ( + groupWatchersLock sync.Mutex + groupWatchers = make(map[string]*groupWatcher) + + _ = metrics.NewGauge(`vm_promscrape_discovery_kubernetes_group_watchers`, func() float64 { + groupWatchersLock.Lock() + n := len(groupWatchers) + groupWatchersLock.Unlock() + return float64(n) + }) +) + +// getObjectByRole returns an object with the given (namespace, name) key and the given role. +func (gw *groupWatcher) getObjectByRole(role, namespace, name string) object { + if gw == nil { + // this is needed for testing + return nil + } + key := namespace + "/" + name + gw.startWatchersForRole(role, nil) + gw.mu.Lock() + defer gw.mu.Unlock() + + for _, uw := range gw.m { + if uw.role != role { + continue + } + uw.mu.Lock() + o := uw.objectsByKey[key] + uw.mu.Unlock() + if o != nil { + return o + } + } + return nil +} + +func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) { + paths := getAPIPaths(role, gw.namespaces, gw.selectors) + for _, path := range paths { + apiURL := gw.apiServer + path + gw.mu.Lock() + uw := gw.m[apiURL] + if uw == nil { + uw = newURLWatcher(role, apiURL, gw) + gw.m[apiURL] = uw + } + gw.mu.Unlock() + uw.subscribeAPIWatcher(aw) + } +} + +func (gw *groupWatcher) reloadScrapeWorksForAPIWatchers(aws []*apiWatcher, objectsByKey map[string]object) { + if len(aws) == 0 { + return + } + swosByKey := make([]map[string][]interface{}, len(aws)) + for i := range aws { + swosByKey[i] = make(map[string][]interface{}) + } + for key, o := range objectsByKey { + labels := o.getTargetLabels(gw) + for i, aw := range aws { + swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels) + if len(swos) > 0 { + swosByKey[i][key] = swos + } + } + } + for i, aw := range aws { + aw.reloadScrapeWorks(swosByKey[i]) + } +} + +// doRequest performs http request to the given requestURL. +func (gw *groupWatcher) doRequest(requestURL string) (*http.Response, error) { + req, err := http.NewRequest("GET", requestURL, nil) + if err != nil { + logger.Fatalf("cannot create a request for %q: %s", requestURL, err) + } + if gw.authorization != "" { + req.Header.Set("Authorization", gw.authorization) + } + return gw.client.Do(req) +} + +func (gw *groupWatcher) unsubscribeAPIWatcher(aw *apiWatcher) { + gw.mu.Lock() + for _, uw := range gw.m { + uw.unsubscribeAPIWatcher(aw) + } + gw.mu.Unlock() +} + +// urlWatcher watches for an apiURL and updates object states in objectsByKey. +type urlWatcher struct { + role string + apiURL string + gw *groupWatcher + + parseObject parseObjectFunc + parseObjectList parseObjectListFunc + + // mu protects aws, awsPending, objectsByKey and resourceVersion + mu sync.Mutex + + // aws contains registered apiWatcher objects + aws map[*apiWatcher]struct{} + + // awsPending contains pending apiWatcher objects, which must be moved to aws in a batch + awsPending map[*apiWatcher]struct{} + + // objectsByKey contains the latest state for objects obtained from apiURL + objectsByKey map[string]object + + resourceVersion string + + objectsCount *metrics.Counter + objectsAdded *metrics.Counter + objectsRemoved *metrics.Counter + objectsUpdated *metrics.Counter +} + +func newURLWatcher(role, apiURL string, gw *groupWatcher) *urlWatcher { parseObject, parseObjectList := getObjectParsersForRole(role) metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers{role=%q}`, role)).Inc() uw := &urlWatcher{ - role: role, - apiURL: apiURL, - authorization: ac.Authorization, - client: client, + role: role, + apiURL: apiURL, + gw: gw, parseObject: parseObject, parseObjectList: parseObjectList, aws: make(map[*apiWatcher]struct{}), + awsPending: make(map[*apiWatcher]struct{}), objectsByKey: make(map[string]object), objectsCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects{role=%q}`, role)), @@ -288,45 +334,65 @@ func newURLWatcher(role, apiURL string, proxyURL *url.URL, ac *promauth.Config) objectsRemoved: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_removed_total{role=%q}`, role)), objectsUpdated: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_updated_total{role=%q}`, role)), } + logger.Infof("started %s watcher for %q", uw.role, uw.apiURL) + go uw.watchForUpdates() + go uw.processPendingSubscribers() return uw } -func (uw *urlWatcher) initOnce() { - uw.once.Do(func() { - uw.reloadObjects() - go uw.watchForUpdates() - }) -} - -func (uw *urlWatcher) addAPIWatcher(aw *apiWatcher) { - uw.mu.Lock() - if _, ok := uw.aws[aw]; ok { - logger.Panicf("BUG: aw=%p has been already added", aw) +func (uw *urlWatcher) subscribeAPIWatcher(aw *apiWatcher) { + if aw == nil { + return } - uw.aws[aw] = struct{}{} - aw.reloadScrapeWorks(uw.objectsByKey) - uw.mu.Unlock() -} - -func (uw *urlWatcher) removeAPIWatcher(aw *apiWatcher) { uw.mu.Lock() if _, ok := uw.aws[aw]; !ok { - logger.Panicf("BUG: aw=%p is missing", aw) + if _, ok := uw.awsPending[aw]; !ok { + uw.awsPending[aw] = struct{}{} + metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q,type="pending"}`, uw.role)).Inc() + } } - delete(uw.aws, aw) uw.mu.Unlock() } -// doRequest performs http request to the given requestURL. -func (uw *urlWatcher) doRequest(requestURL string) (*http.Response, error) { - req, err := http.NewRequest("GET", requestURL, nil) - if err != nil { - logger.Fatalf("cannot create a request for %q: %s", requestURL, err) +func (uw *urlWatcher) unsubscribeAPIWatcher(aw *apiWatcher) { + uw.mu.Lock() + if _, ok := uw.aws[aw]; ok { + delete(uw.aws, aw) + metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q,type="permanent"}`, uw.role)).Dec() + } else if _, ok := uw.awsPending[aw]; ok { + delete(uw.awsPending, aw) + metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q,type="pending"}`, uw.role)).Dec() } - if uw.authorization != "" { - req.Header.Set("Authorization", uw.authorization) + uw.mu.Unlock() +} + +func (uw *urlWatcher) processPendingSubscribers() { + t := time.NewTicker(time.Second) + for range t.C { + var awsPending []*apiWatcher + var objectsByKey map[string]object + + uw.mu.Lock() + if len(uw.awsPending) > 0 { + awsPending = getAPIWatchers(uw.awsPending) + for _, aw := range awsPending { + if _, ok := uw.aws[aw]; ok { + logger.Panicf("BUG: aw=%p already exists in uw.aws", aw) + } + uw.aws[aw] = struct{}{} + delete(uw.awsPending, aw) + } + objectsByKey = make(map[string]object, len(uw.objectsByKey)) + for key, o := range uw.objectsByKey { + objectsByKey[key] = o + } + } + metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q,type="pending"}`, uw.role)).Add(-len(awsPending)) + metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q,type="permanent"}`, uw.role)).Add(len(awsPending)) + uw.mu.Unlock() + + uw.gw.reloadScrapeWorksForAPIWatchers(awsPending, objectsByKey) } - return uw.client.Do(req) } func (uw *urlWatcher) setResourceVersion(resourceVersion string) { @@ -346,7 +412,7 @@ func (uw *urlWatcher) reloadObjects() string { } requestURL := uw.apiURL - resp, err := uw.doRequest(requestURL) + resp, err := uw.gw.doRequest(requestURL) if err != nil { logger.Errorf("cannot perform request to %q: %s", requestURL, err) return "" @@ -386,22 +452,19 @@ func (uw *urlWatcher) reloadObjects() string { uw.objectsAdded.Add(added) uw.objectsCount.Add(added - removed) uw.resourceVersion = metadata.ResourceVersion + aws := getAPIWatchers(uw.aws) uw.mu.Unlock() - for _, aw := range uw.getAPIWatchers() { - aw.reloadScrapeWorks(objectsByKey) - } - logger.Infof("loaded %d objects from %q", len(objectsByKey), requestURL) + uw.gw.reloadScrapeWorksForAPIWatchers(aws, objectsByKey) + logger.Infof("reloaded %d objects from %q", len(objectsByKey), requestURL) return metadata.ResourceVersion } -func (uw *urlWatcher) getAPIWatchers() []*apiWatcher { - uw.mu.Lock() - aws := make([]*apiWatcher, 0, len(uw.aws)) - for aw := range uw.aws { +func getAPIWatchers(awsMap map[*apiWatcher]struct{}) []*apiWatcher { + aws := make([]*apiWatcher, 0, len(awsMap)) + for aw := range awsMap { aws = append(aws, aw) } - uw.mu.Unlock() return aws } @@ -423,7 +486,7 @@ func (uw *urlWatcher) watchForUpdates() { if strings.Contains(apiURL, "?") { delimiter = "&" } - timeoutSeconds := time.Duration(0.9 * float64(uw.client.Timeout)).Seconds() + timeoutSeconds := time.Duration(0.9 * float64(uw.gw.client.Timeout)).Seconds() apiURL += delimiter + "watch=1&allowWatchBookmarks=true&timeoutSeconds=" + strconv.Itoa(int(timeoutSeconds)) for { resourceVersion := uw.reloadObjects() @@ -431,7 +494,7 @@ func (uw *urlWatcher) watchForUpdates() { if resourceVersion != "" { requestURL += "&resourceVersion=" + url.QueryEscape(resourceVersion) } - resp, err := uw.doRequest(requestURL) + resp, err := uw.gw.doRequest(requestURL) if err != nil { logger.Errorf("cannot perform request to %q: %s", requestURL, err) backoffSleep() @@ -486,9 +549,10 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error { uw.objectsUpdated.Inc() } uw.objectsByKey[key] = o + aws := getAPIWatchers(uw.aws) uw.mu.Unlock() - for _, aw := range uw.getAPIWatchers() { - labels := o.getTargetLabels(aw) + labels := o.getTargetLabels(uw.gw) + for _, aw := range aws { aw.setScrapeWorks(key, labels) } case "DELETED": @@ -503,8 +567,9 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error { uw.objectsRemoved.Inc() delete(uw.objectsByKey, key) } + aws := getAPIWatchers(uw.aws) uw.mu.Unlock() - for _, aw := range uw.getAPIWatchers() { + for _, aw := range aws { aw.removeScrapeWorks(key) } case "BOOKMARK": diff --git a/lib/promscrape/discovery/kubernetes/endpoints.go b/lib/promscrape/discovery/kubernetes/endpoints.go index 5119f513a0..805a88b01c 100644 --- a/lib/promscrape/discovery/kubernetes/endpoints.go +++ b/lib/promscrape/discovery/kubernetes/endpoints.go @@ -90,17 +90,17 @@ type EndpointPort struct { // getTargetLabels returns labels for each endpoint in eps. // // See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#endpoints -func (eps *Endpoints) getTargetLabels(aw *apiWatcher) []map[string]string { +func (eps *Endpoints) getTargetLabels(gw *groupWatcher) []map[string]string { var svc *Service - if o := aw.getObjectByRole("service", eps.Metadata.Namespace, eps.Metadata.Name); o != nil { + if o := gw.getObjectByRole("service", eps.Metadata.Namespace, eps.Metadata.Name); o != nil { svc = o.(*Service) } podPortsSeen := make(map[*Pod][]int) var ms []map[string]string for _, ess := range eps.Subsets { for _, epp := range ess.Ports { - ms = appendEndpointLabelsForAddresses(ms, aw, podPortsSeen, eps, ess.Addresses, epp, svc, "true") - ms = appendEndpointLabelsForAddresses(ms, aw, podPortsSeen, eps, ess.NotReadyAddresses, epp, svc, "false") + ms = appendEndpointLabelsForAddresses(ms, gw, podPortsSeen, eps, ess.Addresses, epp, svc, "true") + ms = appendEndpointLabelsForAddresses(ms, gw, podPortsSeen, eps, ess.NotReadyAddresses, epp, svc, "false") } } @@ -135,11 +135,11 @@ func (eps *Endpoints) getTargetLabels(aw *apiWatcher) []map[string]string { return ms } -func appendEndpointLabelsForAddresses(ms []map[string]string, aw *apiWatcher, podPortsSeen map[*Pod][]int, eps *Endpoints, +func appendEndpointLabelsForAddresses(ms []map[string]string, gw *groupWatcher, podPortsSeen map[*Pod][]int, eps *Endpoints, eas []EndpointAddress, epp EndpointPort, svc *Service, ready string) []map[string]string { for _, ea := range eas { var p *Pod - if o := aw.getObjectByRole("pod", ea.TargetRef.Namespace, ea.TargetRef.Name); o != nil { + if o := gw.getObjectByRole("pod", ea.TargetRef.Namespace, ea.TargetRef.Name); o != nil { p = o.(*Pod) } m := getEndpointLabelsForAddressAndPort(podPortsSeen, eps, ea, epp, p, svc, ready) diff --git a/lib/promscrape/discovery/kubernetes/endpointslices.go b/lib/promscrape/discovery/kubernetes/endpointslices.go index a8739ee5b6..5e1961e926 100644 --- a/lib/promscrape/discovery/kubernetes/endpointslices.go +++ b/lib/promscrape/discovery/kubernetes/endpointslices.go @@ -37,16 +37,16 @@ func parseEndpointSlice(data []byte) (object, error) { // getTargetLabels returns labels for eps. // // See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#endpointslices -func (eps *EndpointSlice) getTargetLabels(aw *apiWatcher) []map[string]string { +func (eps *EndpointSlice) getTargetLabels(gw *groupWatcher) []map[string]string { var svc *Service - if o := aw.getObjectByRole("service", eps.Metadata.Namespace, eps.Metadata.Name); o != nil { + if o := gw.getObjectByRole("service", eps.Metadata.Namespace, eps.Metadata.Name); o != nil { svc = o.(*Service) } podPortsSeen := make(map[*Pod][]int) var ms []map[string]string for _, ess := range eps.Endpoints { var p *Pod - if o := aw.getObjectByRole("pod", ess.TargetRef.Namespace, ess.TargetRef.Name); o != nil { + if o := gw.getObjectByRole("pod", ess.TargetRef.Namespace, ess.TargetRef.Name); o != nil { p = o.(*Pod) } for _, epp := range eps.Ports { diff --git a/lib/promscrape/discovery/kubernetes/ingress.go b/lib/promscrape/discovery/kubernetes/ingress.go index d7f5df66fe..cca6bdb240 100644 --- a/lib/promscrape/discovery/kubernetes/ingress.go +++ b/lib/promscrape/discovery/kubernetes/ingress.go @@ -87,7 +87,7 @@ type HTTPIngressPath struct { // getTargetLabels returns labels for ig. // // See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ingress -func (ig *Ingress) getTargetLabels(aw *apiWatcher) []map[string]string { +func (ig *Ingress) getTargetLabels(gw *groupWatcher) []map[string]string { tlsHosts := make(map[string]bool) for _, tls := range ig.Spec.TLS { for _, host := range tls.Hosts { diff --git a/lib/promscrape/discovery/kubernetes/node.go b/lib/promscrape/discovery/kubernetes/node.go index a7e392d650..6c990c8461 100644 --- a/lib/promscrape/discovery/kubernetes/node.go +++ b/lib/promscrape/discovery/kubernetes/node.go @@ -76,7 +76,7 @@ type NodeDaemonEndpoints struct { // getTargetLabels returs labels for the given n. // // See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#node -func (n *Node) getTargetLabels(aw *apiWatcher) []map[string]string { +func (n *Node) getTargetLabels(gw *groupWatcher) []map[string]string { addr := getNodeAddr(n.Status.Addresses) if len(addr) == 0 { // Skip node without address diff --git a/lib/promscrape/discovery/kubernetes/pod.go b/lib/promscrape/discovery/kubernetes/pod.go index c3e3f314ae..8a88ffaca4 100644 --- a/lib/promscrape/discovery/kubernetes/pod.go +++ b/lib/promscrape/discovery/kubernetes/pod.go @@ -97,7 +97,7 @@ type PodCondition struct { // getTargetLabels returns labels for each port of the given p. // // See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#pod -func (p *Pod) getTargetLabels(aw *apiWatcher) []map[string]string { +func (p *Pod) getTargetLabels(gw *groupWatcher) []map[string]string { if len(p.Status.PodIP) == 0 { // Skip pod without IP return nil diff --git a/lib/promscrape/discovery/kubernetes/service.go b/lib/promscrape/discovery/kubernetes/service.go index c5c0ddd8cf..b74bd26532 100644 --- a/lib/promscrape/discovery/kubernetes/service.go +++ b/lib/promscrape/discovery/kubernetes/service.go @@ -71,7 +71,7 @@ type ServicePort struct { // getTargetLabels returns labels for each port of the given s. // // See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#service -func (s *Service) getTargetLabels(aw *apiWatcher) []map[string]string { +func (s *Service) getTargetLabels(gw *groupWatcher) []map[string]string { host := fmt.Sprintf("%s.%s.svc", s.Metadata.Name, s.Metadata.Namespace) var ms []map[string]string for _, sp := range s.Spec.Ports {