From 2eed410466d335598415429a57567ab6a7ac6d1d Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin <valyala@gmail.com> Date: Mon, 17 May 2021 23:45:20 +0300 Subject: [PATCH] lib/promscrape/discovery/kubernetes: key ScrapeWork objects by urlWatcher instead of namespace This makes the code less fragile if urlWatcher would depend on additional to namepsace properties. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1170 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240 --- .../discovery/kubernetes/api_watcher.go | 85 +++++++++---------- .../discovery/kubernetes/api_watcher_test.go | 49 +++++------ 2 files changed, 65 insertions(+), 69 deletions(-) diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go index 266edc0841..0fbfb8f281 100644 --- a/lib/promscrape/discovery/kubernetes/api_watcher.go +++ b/lib/promscrape/discovery/kubernetes/api_watcher.go @@ -53,9 +53,9 @@ type apiWatcher struct { gw *groupWatcher - // swos contains per-namepsace maps of ScrapeWork objects for the given apiWatcher - swosByNamespace map[string]map[string][]interface{} - swosByNamespaceLock sync.Mutex + // swos contains per-urlWatcher maps of ScrapeWork objects for the given apiWatcher + swosByURLWatcher map[*urlWatcher]map[string][]interface{} + swosByURLWatcherLock sync.Mutex swosCount *metrics.Counter } @@ -66,11 +66,11 @@ func newAPIWatcher(apiServer string, ac *promauth.Config, sdc *SDConfig, swcFunc proxyURL := sdc.ProxyURL.URL() gw := getGroupWatcher(apiServer, ac, namespaces, selectors, proxyURL) return &apiWatcher{ - role: sdc.Role, - swcFunc: swcFunc, - gw: gw, - swosByNamespace: make(map[string]map[string][]interface{}), - swosCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_scrape_works{role=%q}`, sdc.Role)), + role: sdc.Role, + swcFunc: swcFunc, + gw: gw, + swosByURLWatcher: make(map[*urlWatcher]map[string][]interface{}), + swosCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_scrape_works{role=%q}`, sdc.Role)), } } @@ -80,25 +80,25 @@ func (aw *apiWatcher) mustStart() { func (aw *apiWatcher) mustStop() { aw.gw.unsubscribeAPIWatcher(aw) - aw.swosByNamespaceLock.Lock() - aw.swosByNamespace = make(map[string]map[string][]interface{}) - aw.swosByNamespaceLock.Unlock() + aw.swosByURLWatcherLock.Lock() + aw.swosByURLWatcher = make(map[*urlWatcher]map[string][]interface{}) + aw.swosByURLWatcherLock.Unlock() } -func (aw *apiWatcher) reloadScrapeWorks(namespace string, swosByKey map[string][]interface{}) { - aw.swosByNamespaceLock.Lock() - aw.swosCount.Add(len(swosByKey) - len(aw.swosByNamespace[namespace])) - aw.swosByNamespace[namespace] = swosByKey - aw.swosByNamespaceLock.Unlock() +func (aw *apiWatcher) reloadScrapeWorks(uw *urlWatcher, swosByKey map[string][]interface{}) { + aw.swosByURLWatcherLock.Lock() + aw.swosCount.Add(len(swosByKey) - len(aw.swosByURLWatcher[uw])) + aw.swosByURLWatcher[uw] = swosByKey + aw.swosByURLWatcherLock.Unlock() } -func (aw *apiWatcher) setScrapeWorks(namespace, key string, labels []map[string]string) { +func (aw *apiWatcher) setScrapeWorks(uw *urlWatcher, key string, labels []map[string]string) { swos := aw.getScrapeWorkObjectsForLabels(labels) - aw.swosByNamespaceLock.Lock() - swosByKey := aw.swosByNamespace[namespace] + aw.swosByURLWatcherLock.Lock() + swosByKey := aw.swosByURLWatcher[uw] if swosByKey == nil { swosByKey = make(map[string][]interface{}) - aw.swosByNamespace[namespace] = swosByKey + aw.swosByURLWatcher[uw] = swosByKey } if len(swos) > 0 { aw.swosCount.Add(len(swos) - len(swosByKey[key])) @@ -107,17 +107,17 @@ func (aw *apiWatcher) setScrapeWorks(namespace, key string, labels []map[string] aw.swosCount.Add(-len(swosByKey[key])) delete(swosByKey, key) } - aw.swosByNamespaceLock.Unlock() + aw.swosByURLWatcherLock.Unlock() } -func (aw *apiWatcher) removeScrapeWorks(namespace, key string) { - aw.swosByNamespaceLock.Lock() - swosByKey := aw.swosByNamespace[namespace] +func (aw *apiWatcher) removeScrapeWorks(uw *urlWatcher, key string) { + aw.swosByURLWatcherLock.Lock() + swosByKey := aw.swosByURLWatcher[uw] if len(swosByKey) > 0 { aw.swosCount.Add(-len(swosByKey[key])) delete(swosByKey, key) } - aw.swosByNamespaceLock.Unlock() + aw.swosByURLWatcherLock.Unlock() } func (aw *apiWatcher) getScrapeWorkObjectsForLabels(labelss []map[string]string) []interface{} { @@ -136,17 +136,17 @@ func (aw *apiWatcher) getScrapeWorkObjectsForLabels(labelss []map[string]string) func (aw *apiWatcher) getScrapeWorkObjects() []interface{} { aw.gw.registerPendingAPIWatchers() - aw.swosByNamespaceLock.Lock() - defer aw.swosByNamespaceLock.Unlock() + aw.swosByURLWatcherLock.Lock() + defer aw.swosByURLWatcherLock.Unlock() size := 0 - for _, swosByKey := range aw.swosByNamespace { + for _, swosByKey := range aw.swosByURLWatcher { for _, swosLocal := range swosByKey { size += len(swosLocal) } } swos := make([]interface{}, 0, size) - for _, swosByKey := range aw.swosByNamespace { + for _, swosByKey := range aw.swosByURLWatcher { for _, swosLocal := range swosByKey { swos = append(swos, swosLocal...) } @@ -257,14 +257,14 @@ func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) { gw.startWatchersForRole("pod", nil) gw.startWatchersForRole("service", nil) } - paths, namespaces := getAPIPathsWithNamespaces(role, gw.namespaces, gw.selectors) - for i, path := range paths { + paths := getAPIPathsWithNamespaces(role, gw.namespaces, gw.selectors) + for _, path := range paths { apiURL := gw.apiServer + path gw.mu.Lock() uw := gw.m[apiURL] needStart := uw == nil if needStart { - uw = newURLWatcher(role, namespaces[i], apiURL, gw) + uw = newURLWatcher(role, apiURL, gw) gw.m[apiURL] = uw } if aw != nil { @@ -352,14 +352,13 @@ type urlWatcher struct { staleResourceVersions *metrics.Counter } -func newURLWatcher(role, namespace, apiURL string, gw *groupWatcher) *urlWatcher { +func newURLWatcher(role, apiURL string, gw *groupWatcher) *urlWatcher { parseObject, parseObjectList := getObjectParsersForRole(role) metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers{role=%q}`, role)).Inc() uw := &urlWatcher{ - role: role, - namespace: namespace, - apiURL: apiURL, - gw: gw, + role: role, + apiURL: apiURL, + gw: gw, parseObject: parseObject, parseObjectList: parseObjectList, @@ -499,7 +498,7 @@ func (uw *urlWatcher) reloadScrapeWorksForAPIWatchersLocked(awsMap map[*apiWatch } } for i, aw := range aws { - aw.reloadScrapeWorks(uw.namespace, swosByKey[i]) + aw.reloadScrapeWorks(uw, swosByKey[i]) } } @@ -589,7 +588,7 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error { if len(uw.aws) > 0 { labels := o.getTargetLabels(uw.gw) for aw := range uw.aws { - aw.setScrapeWorks(uw.namespace, key, labels) + aw.setScrapeWorks(uw, key, labels) } } uw.gw.mu.Unlock() @@ -606,7 +605,7 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error { delete(uw.objectsByKey, key) } for aw := range uw.aws { - aw.removeScrapeWorks(uw.namespace, key) + aw.removeScrapeWorks(uw, key) } uw.gw.mu.Unlock() case "BOOKMARK": @@ -663,19 +662,19 @@ func parseError(data []byte) (*Error, error) { return &em, nil } -func getAPIPathsWithNamespaces(role string, namespaces []string, selectors []Selector) ([]string, []string) { +func getAPIPathsWithNamespaces(role string, namespaces []string, selectors []Selector) []string { objectType := getObjectTypeByRole(role) if objectType == "nodes" || len(namespaces) == 0 { query := joinSelectors(role, selectors) path := getAPIPath(objectType, "", query) - return []string{path}, []string{""} + return []string{path} } query := joinSelectors(role, selectors) paths := make([]string, len(namespaces)) for i, namespace := range namespaces { paths[i] = getAPIPath(objectType, namespace, query) } - return paths, namespaces + return paths } func getAPIPath(objectType, namespace, query string) string { diff --git a/lib/promscrape/discovery/kubernetes/api_watcher_test.go b/lib/promscrape/discovery/kubernetes/api_watcher_test.go index e0262cea56..35b2b181cc 100644 --- a/lib/promscrape/discovery/kubernetes/api_watcher_test.go +++ b/lib/promscrape/discovery/kubernetes/api_watcher_test.go @@ -11,54 +11,51 @@ import ( ) func TestGetAPIPathsWithNamespaces(t *testing.T) { - f := func(role string, namespaces []string, selectors []Selector, expectedPaths, expectedNamespaces []string) { + f := func(role string, namespaces []string, selectors []Selector, expectedPaths []string) { t.Helper() - paths, resultNamespaces := getAPIPathsWithNamespaces(role, namespaces, selectors) + paths := getAPIPathsWithNamespaces(role, namespaces, selectors) if !reflect.DeepEqual(paths, expectedPaths) { t.Fatalf("unexpected paths; got\n%q\nwant\n%q", paths, expectedPaths) } - if !reflect.DeepEqual(resultNamespaces, expectedNamespaces) { - t.Fatalf("unexpected namespaces; got\n%q\nwant\n%q", resultNamespaces, expectedNamespaces) - } } // role=node - f("node", nil, nil, []string{"/api/v1/nodes"}, []string{""}) - f("node", []string{"foo", "bar"}, nil, []string{"/api/v1/nodes"}, []string{""}) + f("node", nil, nil, []string{"/api/v1/nodes"}) + f("node", []string{"foo", "bar"}, nil, []string{"/api/v1/nodes"}) f("node", nil, []Selector{ { Role: "pod", Label: "foo", Field: "bar", }, - }, []string{"/api/v1/nodes"}, []string{""}) + }, []string{"/api/v1/nodes"}) f("node", nil, []Selector{ { Role: "node", Label: "foo", Field: "bar", }, - }, []string{"/api/v1/nodes?labelSelector=foo&fieldSelector=bar"}, []string{""}) + }, []string{"/api/v1/nodes?labelSelector=foo&fieldSelector=bar"}) f("node", []string{"x", "y"}, []Selector{ { Role: "node", Label: "foo", Field: "bar", }, - }, []string{"/api/v1/nodes?labelSelector=foo&fieldSelector=bar"}, []string{""}) + }, []string{"/api/v1/nodes?labelSelector=foo&fieldSelector=bar"}) // role=pod - f("pod", nil, nil, []string{"/api/v1/pods"}, []string{""}) + f("pod", nil, nil, []string{"/api/v1/pods"}) f("pod", []string{"foo", "bar"}, nil, []string{ "/api/v1/namespaces/foo/pods", "/api/v1/namespaces/bar/pods", - }, []string{"foo", "bar"}) + }) f("pod", nil, []Selector{ { Role: "node", Label: "foo", }, - }, []string{"/api/v1/pods"}, []string{""}) + }, []string{"/api/v1/pods"}) f("pod", nil, []Selector{ { Role: "pod", @@ -69,7 +66,7 @@ func TestGetAPIPathsWithNamespaces(t *testing.T) { Label: "x", Field: "y", }, - }, []string{"/api/v1/pods?labelSelector=foo%2Cx&fieldSelector=y"}, []string{""}) + }, []string{"/api/v1/pods?labelSelector=foo%2Cx&fieldSelector=y"}) f("pod", []string{"x", "y"}, []Selector{ { Role: "pod", @@ -83,14 +80,14 @@ func TestGetAPIPathsWithNamespaces(t *testing.T) { }, []string{ "/api/v1/namespaces/x/pods?labelSelector=foo%2Cx&fieldSelector=y", "/api/v1/namespaces/y/pods?labelSelector=foo%2Cx&fieldSelector=y", - }, []string{"x", "y"}) + }) // role=service - f("service", nil, nil, []string{"/api/v1/services"}, []string{""}) + f("service", nil, nil, []string{"/api/v1/services"}) f("service", []string{"x", "y"}, nil, []string{ "/api/v1/namespaces/x/services", "/api/v1/namespaces/y/services", - }, []string{"x", "y"}) + }) f("service", nil, []Selector{ { Role: "node", @@ -100,7 +97,7 @@ func TestGetAPIPathsWithNamespaces(t *testing.T) { Role: "service", Field: "bar", }, - }, []string{"/api/v1/services?fieldSelector=bar"}, []string{""}) + }, []string{"/api/v1/services?fieldSelector=bar"}) f("service", []string{"x", "y"}, []Selector{ { Role: "service", @@ -109,14 +106,14 @@ func TestGetAPIPathsWithNamespaces(t *testing.T) { }, []string{ "/api/v1/namespaces/x/services?labelSelector=abc%3Dde", "/api/v1/namespaces/y/services?labelSelector=abc%3Dde", - }, []string{"x", "y"}) + }) // role=endpoints - f("endpoints", nil, nil, []string{"/api/v1/endpoints"}, []string{""}) + f("endpoints", nil, nil, []string{"/api/v1/endpoints"}) f("endpoints", []string{"x", "y"}, nil, []string{ "/api/v1/namespaces/x/endpoints", "/api/v1/namespaces/y/endpoints", - }, []string{"x", "y"}) + }) f("endpoints", []string{"x", "y"}, []Selector{ { Role: "endpoints", @@ -129,10 +126,10 @@ func TestGetAPIPathsWithNamespaces(t *testing.T) { }, []string{ "/api/v1/namespaces/x/endpoints?labelSelector=bbb", "/api/v1/namespaces/y/endpoints?labelSelector=bbb", - }, []string{"x", "y"}) + }) // role=endpointslices - f("endpointslices", nil, nil, []string{"/apis/discovery.k8s.io/v1beta1/endpointslices"}, []string{""}) + f("endpointslices", nil, nil, []string{"/apis/discovery.k8s.io/v1beta1/endpointslices"}) f("endpointslices", []string{"x", "y"}, []Selector{ { Role: "endpointslices", @@ -142,10 +139,10 @@ func TestGetAPIPathsWithNamespaces(t *testing.T) { }, []string{ "/apis/discovery.k8s.io/v1beta1/namespaces/x/endpointslices?labelSelector=label&fieldSelector=field", "/apis/discovery.k8s.io/v1beta1/namespaces/y/endpointslices?labelSelector=label&fieldSelector=field", - }, []string{"x", "y"}) + }) // role=ingress - f("ingress", nil, nil, []string{"/apis/networking.k8s.io/v1beta1/ingresses"}, []string{""}) + f("ingress", nil, nil, []string{"/apis/networking.k8s.io/v1beta1/ingresses"}) f("ingress", []string{"x", "y"}, []Selector{ { Role: "node", @@ -166,7 +163,7 @@ func TestGetAPIPathsWithNamespaces(t *testing.T) { }, []string{ "/apis/networking.k8s.io/v1beta1/namespaces/x/ingresses?labelSelector=cde%2Cbaaa&fieldSelector=abc", "/apis/networking.k8s.io/v1beta1/namespaces/y/ingresses?labelSelector=cde%2Cbaaa&fieldSelector=abc", - }, []string{"x", "y"}) + }) } func TestParseBookmark(t *testing.T) {