From 5b08e6fb1695429c1785aeac67ff54fb34e3b911 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 2 Apr 2021 14:45:08 +0300 Subject: [PATCH] lib/promscrape/discovery/kubernetes: properly track objects with the same names in multiple namespaces This is a follow-up for 12e4785fe8dc64d6c4422725e833cefdab49d8f6 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1170 --- .../discovery/kubernetes/api_watcher.go | 119 +++++++++--------- .../discovery/kubernetes/common_types.go | 4 + .../discovery/kubernetes/endpoints.go | 10 +- .../discovery/kubernetes/endpointslices.go | 10 +- .../discovery/kubernetes/ingress.go | 10 +- lib/promscrape/discovery/kubernetes/node.go | 11 +- lib/promscrape/discovery/kubernetes/pod.go | 10 +- .../discovery/kubernetes/service.go | 10 +- 8 files changed, 95 insertions(+), 89 deletions(-) diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go index 4397676bc..e9f096e72 100644 --- a/lib/promscrape/discovery/kubernetes/api_watcher.go +++ b/lib/promscrape/discovery/kubernetes/api_watcher.go @@ -32,7 +32,7 @@ type WatchEvent struct { // object is any Kubernetes object. type object interface { - name() string + key() string getTargetLabels(gw *groupWatcher) []map[string]string } @@ -79,37 +79,37 @@ func (aw *apiWatcher) mustStop() { aw.swosByNamespaceLock.Unlock() } -func (aw *apiWatcher) reloadScrapeWorks(namespace string, swosByName map[string][]interface{}) { +func (aw *apiWatcher) reloadScrapeWorks(namespace string, swosByKey map[string][]interface{}) { aw.swosByNamespaceLock.Lock() - aw.swosCount.Add(len(swosByName) - len(aw.swosByNamespace[namespace])) - aw.swosByNamespace[namespace] = swosByName + aw.swosCount.Add(len(swosByKey) - len(aw.swosByNamespace[namespace])) + aw.swosByNamespace[namespace] = swosByKey aw.swosByNamespaceLock.Unlock() } -func (aw *apiWatcher) setScrapeWorks(namespace, name string, labels []map[string]string) { +func (aw *apiWatcher) setScrapeWorks(namespace, key string, labels []map[string]string) { swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels) aw.swosByNamespaceLock.Lock() - swosByName := aw.swosByNamespace[namespace] - if swosByName == nil { - swosByName = make(map[string][]interface{}) - aw.swosByNamespace[namespace] = swosByName + swosByKey := aw.swosByNamespace[namespace] + if swosByKey == nil { + swosByKey = make(map[string][]interface{}) + aw.swosByNamespace[namespace] = swosByKey } if len(swos) > 0 { - aw.swosCount.Add(len(swos) - len(swosByName[name])) - swosByName[name] = swos + aw.swosCount.Add(len(swos) - len(swosByKey[key])) + swosByKey[key] = swos } else { - aw.swosCount.Add(-len(swosByName[name])) - delete(swosByName, name) + aw.swosCount.Add(-len(swosByKey[key])) + delete(swosByKey, key) } aw.swosByNamespaceLock.Unlock() } -func (aw *apiWatcher) removeScrapeWorks(namespace, name string) { +func (aw *apiWatcher) removeScrapeWorks(namespace, key string) { aw.swosByNamespaceLock.Lock() - swosByName := aw.swosByNamespace[namespace] - if len(swosByName) > 0 { - aw.swosCount.Add(-len(swosByName[name])) - delete(swosByName, name) + swosByKey := aw.swosByNamespace[namespace] + if len(swosByKey) > 0 { + aw.swosCount.Add(-len(swosByKey[key])) + delete(swosByKey, key) } aw.swosByNamespaceLock.Unlock() } @@ -133,14 +133,14 @@ func (aw *apiWatcher) getScrapeWorkObjects() []interface{} { defer aw.swosByNamespaceLock.Unlock() size := 0 - for _, swosByName := range aw.swosByNamespace { - for _, swosLocal := range swosByName { + for _, swosByKey := range aw.swosByNamespace { + for _, swosLocal := range swosByKey { size += len(swosLocal) } } swos := make([]interface{}, 0, size) - for _, swosByName := range aw.swosByNamespace { - for _, swosLocal := range swosByName { + for _, swosByKey := range aw.swosByNamespace { + for _, swosLocal := range swosByKey { swos = append(swos, swosLocal...) } } @@ -223,6 +223,7 @@ func (gw *groupWatcher) getObjectByRole(role, namespace, name string) object { // this is needed for testing return nil } + key := namespace + "/" + name gw.startWatchersForRole(role, nil) gw.mu.Lock() defer gw.mu.Unlock() @@ -237,7 +238,7 @@ func (gw *groupWatcher) getObjectByRole(role, namespace, name string) object { continue } uw.mu.Lock() - o := uw.objectsByName[name] + o := uw.objectsByKey[key] uw.mu.Unlock() if o != nil { return o @@ -261,25 +262,25 @@ func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) { } } -func (gw *groupWatcher) reloadScrapeWorksForAPIWatchers(namespace string, aws []*apiWatcher, objectsByName map[string]object) { +func (gw *groupWatcher) reloadScrapeWorksForAPIWatchers(namespace string, aws []*apiWatcher, objectsByKey map[string]object) { if len(aws) == 0 { return } - swosByName := make([]map[string][]interface{}, len(aws)) + swosByKey := make([]map[string][]interface{}, len(aws)) for i := range aws { - swosByName[i] = make(map[string][]interface{}) + swosByKey[i] = make(map[string][]interface{}) } - for name, o := range objectsByName { + for key, o := range objectsByKey { labels := o.getTargetLabels(gw) for i, aw := range aws { swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels) if len(swos) > 0 { - swosByName[i][name] = swos + swosByKey[i][key] = swos } } } for i, aw := range aws { - aw.reloadScrapeWorks(namespace, swosByName[i]) + aw.reloadScrapeWorks(namespace, swosByKey[i]) } } @@ -303,7 +304,7 @@ func (gw *groupWatcher) unsubscribeAPIWatcher(aw *apiWatcher) { gw.mu.Unlock() } -// urlWatcher watches for an apiURL and updates object states in objectsByName. +// urlWatcher watches for an apiURL and updates object states in objectsByKey. type urlWatcher struct { role string namespace string @@ -313,7 +314,7 @@ type urlWatcher struct { parseObject parseObjectFunc parseObjectList parseObjectListFunc - // mu protects aws, awsPending, objectsByName and resourceVersion + // mu protects aws, awsPending, objectsByKey and resourceVersion mu sync.Mutex // aws contains registered apiWatcher objects @@ -322,8 +323,8 @@ type urlWatcher struct { // awsPending contains pending apiWatcher objects, which must be moved to aws in a batch awsPending map[*apiWatcher]struct{} - // objectsByName contains the latest state for objects obtained from apiURL - objectsByName map[string]object + // objectsByKey contains the latest state for objects obtained from apiURL + objectsByKey map[string]object resourceVersion string @@ -346,9 +347,9 @@ func newURLWatcher(role, namespace, apiURL string, gw *groupWatcher) *urlWatcher parseObject: parseObject, parseObjectList: parseObjectList, - aws: make(map[*apiWatcher]struct{}), - awsPending: make(map[*apiWatcher]struct{}), - objectsByName: make(map[string]object), + aws: make(map[*apiWatcher]struct{}), + awsPending: make(map[*apiWatcher]struct{}), + objectsByKey: make(map[string]object), objectsCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects{role=%q}`, role)), objectsAdded: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_added_total{role=%q}`, role)), @@ -392,7 +393,7 @@ func (uw *urlWatcher) processPendingSubscribers() { t := time.NewTicker(time.Second) for range t.C { var awsPending []*apiWatcher - var objectsByName map[string]object + var objectsByKey map[string]object uw.mu.Lock() if len(uw.awsPending) > 0 { @@ -404,16 +405,16 @@ func (uw *urlWatcher) processPendingSubscribers() { uw.aws[aw] = struct{}{} delete(uw.awsPending, aw) } - objectsByName = make(map[string]object, len(uw.objectsByName)) - for name, o := range uw.objectsByName { - objectsByName[name] = o + objectsByKey = make(map[string]object, len(uw.objectsByKey)) + for key, o := range uw.objectsByKey { + objectsByKey[key] = o } } metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q,type="pending"}`, uw.role)).Add(-len(awsPending)) metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q,type="permanent"}`, uw.role)).Add(len(awsPending)) uw.mu.Unlock() - uw.gw.reloadScrapeWorksForAPIWatchers(uw.namespace, awsPending, objectsByName) + uw.gw.reloadScrapeWorksForAPIWatchers(uw.namespace, awsPending, objectsByKey) } } @@ -445,7 +446,7 @@ func (uw *urlWatcher) reloadObjects() string { logger.Errorf("unexpected status code for request to %q: %d; want %d; response: %q", requestURL, resp.StatusCode, http.StatusOK, body) return "" } - objectsByName, metadata, err := uw.parseObjectList(resp.Body) + objectsByKey, metadata, err := uw.parseObjectList(resp.Body) _ = resp.Body.Close() if err != nil { logger.Errorf("cannot parse objects from %q: %s", requestURL, err) @@ -454,18 +455,18 @@ func (uw *urlWatcher) reloadObjects() string { uw.mu.Lock() var updated, removed, added int - for name := range uw.objectsByName { - if o, ok := objectsByName[name]; ok { - uw.objectsByName[name] = o + for key := range uw.objectsByKey { + if o, ok := objectsByKey[key]; ok { + uw.objectsByKey[key] = o updated++ } else { - delete(uw.objectsByName, name) + delete(uw.objectsByKey, key) removed++ } } - for name, o := range objectsByName { - if _, ok := uw.objectsByName[name]; !ok { - uw.objectsByName[name] = o + for key, o := range objectsByKey { + if _, ok := uw.objectsByKey[key]; !ok { + uw.objectsByKey[key] = o added++ } } @@ -477,8 +478,8 @@ func (uw *urlWatcher) reloadObjects() string { aws := getAPIWatchers(uw.aws) uw.mu.Unlock() - uw.gw.reloadScrapeWorksForAPIWatchers(uw.namespace, aws, objectsByName) - logger.Infof("reloaded %d objects from %q", len(objectsByName), requestURL) + uw.gw.reloadScrapeWorksForAPIWatchers(uw.namespace, aws, objectsByKey) + logger.Infof("reloaded %d objects from %q", len(objectsByKey), requestURL) return metadata.ResourceVersion } @@ -564,37 +565,37 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error { if err != nil { return err } - name := o.name() + key := o.key() uw.mu.Lock() - if _, ok := uw.objectsByName[name]; !ok { + if _, ok := uw.objectsByKey[key]; !ok { uw.objectsCount.Inc() uw.objectsAdded.Inc() } else { uw.objectsUpdated.Inc() } - uw.objectsByName[name] = o + uw.objectsByKey[key] = o aws := getAPIWatchers(uw.aws) uw.mu.Unlock() labels := o.getTargetLabels(uw.gw) for _, aw := range aws { - aw.setScrapeWorks(uw.namespace, name, labels) + aw.setScrapeWorks(uw.namespace, key, labels) } case "DELETED": o, err := uw.parseObject(we.Object) if err != nil { return err } - name := o.name() + key := o.key() uw.mu.Lock() - if _, ok := uw.objectsByName[name]; ok { + if _, ok := uw.objectsByKey[key]; ok { uw.objectsCount.Dec() uw.objectsRemoved.Inc() - delete(uw.objectsByName, name) + delete(uw.objectsByKey, key) } aws := getAPIWatchers(uw.aws) uw.mu.Unlock() for _, aw := range aws { - aw.removeScrapeWorks(uw.namespace, name) + aw.removeScrapeWorks(uw.namespace, key) } case "BOOKMARK": // See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks diff --git a/lib/promscrape/discovery/kubernetes/common_types.go b/lib/promscrape/discovery/kubernetes/common_types.go index 5eab6e4d1..be93bbb4a 100644 --- a/lib/promscrape/discovery/kubernetes/common_types.go +++ b/lib/promscrape/discovery/kubernetes/common_types.go @@ -16,6 +16,10 @@ type ObjectMeta struct { OwnerReferences []OwnerReference } +func (om *ObjectMeta) key() string { + return om.Namespace + "/" + om.Name +} + // ListMeta is a Kubernetes list metadata // https://v1-17.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#listmeta-v1-meta type ListMeta struct { diff --git a/lib/promscrape/discovery/kubernetes/endpoints.go b/lib/promscrape/discovery/kubernetes/endpoints.go index 032eebb29..805a88b01 100644 --- a/lib/promscrape/discovery/kubernetes/endpoints.go +++ b/lib/promscrape/discovery/kubernetes/endpoints.go @@ -8,8 +8,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" ) -func (eps *Endpoints) name() string { - return eps.Metadata.Name +func (eps *Endpoints) key() string { + return eps.Metadata.key() } func parseEndpointsList(r io.Reader) (map[string]object, ListMeta, error) { @@ -18,11 +18,11 @@ func parseEndpointsList(r io.Reader) (map[string]object, ListMeta, error) { if err := d.Decode(&epsl); err != nil { return nil, epsl.Metadata, fmt.Errorf("cannot unmarshal EndpointsList: %w", err) } - objectsByName := make(map[string]object) + objectsByKey := make(map[string]object) for _, eps := range epsl.Items { - objectsByName[eps.name()] = eps + objectsByKey[eps.key()] = eps } - return objectsByName, epsl.Metadata, nil + return objectsByKey, epsl.Metadata, nil } func parseEndpoints(data []byte) (object, error) { diff --git a/lib/promscrape/discovery/kubernetes/endpointslices.go b/lib/promscrape/discovery/kubernetes/endpointslices.go index bf4f96f3e..5e1961e92 100644 --- a/lib/promscrape/discovery/kubernetes/endpointslices.go +++ b/lib/promscrape/discovery/kubernetes/endpointslices.go @@ -9,8 +9,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" ) -func (eps *EndpointSlice) name() string { - return eps.Metadata.Name +func (eps *EndpointSlice) key() string { + return eps.Metadata.key() } func parseEndpointSliceList(r io.Reader) (map[string]object, ListMeta, error) { @@ -19,11 +19,11 @@ func parseEndpointSliceList(r io.Reader) (map[string]object, ListMeta, error) { if err := d.Decode(&epsl); err != nil { return nil, epsl.Metadata, fmt.Errorf("cannot unmarshal EndpointSliceList: %w", err) } - objectsByName := make(map[string]object) + objectsByKey := make(map[string]object) for _, eps := range epsl.Items { - objectsByName[eps.name()] = eps + objectsByKey[eps.key()] = eps } - return objectsByName, epsl.Metadata, nil + return objectsByKey, epsl.Metadata, nil } func parseEndpointSlice(data []byte) (object, error) { diff --git a/lib/promscrape/discovery/kubernetes/ingress.go b/lib/promscrape/discovery/kubernetes/ingress.go index 9a99b267f..cca6bdb24 100644 --- a/lib/promscrape/discovery/kubernetes/ingress.go +++ b/lib/promscrape/discovery/kubernetes/ingress.go @@ -6,8 +6,8 @@ import ( "io" ) -func (ig *Ingress) name() string { - return ig.Metadata.Name +func (ig *Ingress) key() string { + return ig.Metadata.key() } func parseIngressList(r io.Reader) (map[string]object, ListMeta, error) { @@ -16,11 +16,11 @@ func parseIngressList(r io.Reader) (map[string]object, ListMeta, error) { if err := d.Decode(&igl); err != nil { return nil, igl.Metadata, fmt.Errorf("cannot unmarshal IngressList: %w", err) } - objectsByName := make(map[string]object) + objectsByKey := make(map[string]object) for _, ig := range igl.Items { - objectsByName[ig.name()] = ig + objectsByKey[ig.key()] = ig } - return objectsByName, igl.Metadata, nil + return objectsByKey, igl.Metadata, nil } func parseIngress(data []byte) (object, error) { diff --git a/lib/promscrape/discovery/kubernetes/node.go b/lib/promscrape/discovery/kubernetes/node.go index a1d07f96d..6c990c846 100644 --- a/lib/promscrape/discovery/kubernetes/node.go +++ b/lib/promscrape/discovery/kubernetes/node.go @@ -8,8 +8,9 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" ) -func (n *Node) name() string { - return n.Metadata.Name +// getNodesLabels returns labels for k8s nodes obtained from the given cfg +func (n *Node) key() string { + return n.Metadata.key() } func parseNodeList(r io.Reader) (map[string]object, ListMeta, error) { @@ -18,11 +19,11 @@ func parseNodeList(r io.Reader) (map[string]object, ListMeta, error) { if err := d.Decode(&nl); err != nil { return nil, nl.Metadata, fmt.Errorf("cannot unmarshal NodeList: %w", err) } - objectsByName := make(map[string]object) + objectsByKey := make(map[string]object) for _, n := range nl.Items { - objectsByName[n.name()] = n + objectsByKey[n.key()] = n } - return objectsByName, nl.Metadata, nil + return objectsByKey, nl.Metadata, nil } func parseNode(data []byte) (object, error) { diff --git a/lib/promscrape/discovery/kubernetes/pod.go b/lib/promscrape/discovery/kubernetes/pod.go index 56280523c..8a88ffaca 100644 --- a/lib/promscrape/discovery/kubernetes/pod.go +++ b/lib/promscrape/discovery/kubernetes/pod.go @@ -10,8 +10,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" ) -func (p *Pod) name() string { - return p.Metadata.Name +func (p *Pod) key() string { + return p.Metadata.key() } func parsePodList(r io.Reader) (map[string]object, ListMeta, error) { @@ -20,11 +20,11 @@ func parsePodList(r io.Reader) (map[string]object, ListMeta, error) { if err := d.Decode(&pl); err != nil { return nil, pl.Metadata, fmt.Errorf("cannot unmarshal PodList: %w", err) } - objectsByName := make(map[string]object) + objectsByKey := make(map[string]object) for _, p := range pl.Items { - objectsByName[p.name()] = p + objectsByKey[p.key()] = p } - return objectsByName, pl.Metadata, nil + return objectsByKey, pl.Metadata, nil } func parsePod(data []byte) (object, error) { diff --git a/lib/promscrape/discovery/kubernetes/service.go b/lib/promscrape/discovery/kubernetes/service.go index d140992b0..b74bd2653 100644 --- a/lib/promscrape/discovery/kubernetes/service.go +++ b/lib/promscrape/discovery/kubernetes/service.go @@ -8,8 +8,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" ) -func (s *Service) name() string { - return s.Metadata.Name +func (s *Service) key() string { + return s.Metadata.key() } func parseServiceList(r io.Reader) (map[string]object, ListMeta, error) { @@ -18,11 +18,11 @@ func parseServiceList(r io.Reader) (map[string]object, ListMeta, error) { if err := d.Decode(&sl); err != nil { return nil, sl.Metadata, fmt.Errorf("cannot unmarshal ServiceList: %w", err) } - objectsByName := make(map[string]object) + objectsByKey := make(map[string]object) for _, s := range sl.Items { - objectsByName[s.name()] = s + objectsByKey[s.key()] = s } - return objectsByName, sl.Metadata, nil + return objectsByKey, sl.Metadata, nil } func parseService(data []byte) (object, error) {