lib/promscrape/discovery/kubernetes: key ScrapeWork objects by urlWatcher instead of namespace

This makes the code less fragile if urlWatcher would depend on additional to namepsace properties.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1170
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
This commit is contained in:
Aliaksandr Valialkin 2021-05-17 23:45:20 +03:00
parent bae4d61ef2
commit 8764b0ae21
2 changed files with 65 additions and 69 deletions

View file

@ -53,9 +53,9 @@ type apiWatcher struct {
gw *groupWatcher
// swos contains per-namepsace maps of ScrapeWork objects for the given apiWatcher
swosByNamespace map[string]map[string][]interface{}
swosByNamespaceLock sync.Mutex
// swos contains per-urlWatcher maps of ScrapeWork objects for the given apiWatcher
swosByURLWatcher map[*urlWatcher]map[string][]interface{}
swosByURLWatcherLock sync.Mutex
swosCount *metrics.Counter
}
@ -66,11 +66,11 @@ func newAPIWatcher(apiServer string, ac *promauth.Config, sdc *SDConfig, swcFunc
proxyURL := sdc.ProxyURL.URL()
gw := getGroupWatcher(apiServer, ac, namespaces, selectors, proxyURL)
return &apiWatcher{
role: sdc.Role,
swcFunc: swcFunc,
gw: gw,
swosByNamespace: make(map[string]map[string][]interface{}),
swosCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_scrape_works{role=%q}`, sdc.Role)),
role: sdc.Role,
swcFunc: swcFunc,
gw: gw,
swosByURLWatcher: make(map[*urlWatcher]map[string][]interface{}),
swosCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_scrape_works{role=%q}`, sdc.Role)),
}
}
@ -80,25 +80,25 @@ func (aw *apiWatcher) mustStart() {
func (aw *apiWatcher) mustStop() {
aw.gw.unsubscribeAPIWatcher(aw)
aw.swosByNamespaceLock.Lock()
aw.swosByNamespace = make(map[string]map[string][]interface{})
aw.swosByNamespaceLock.Unlock()
aw.swosByURLWatcherLock.Lock()
aw.swosByURLWatcher = make(map[*urlWatcher]map[string][]interface{})
aw.swosByURLWatcherLock.Unlock()
}
func (aw *apiWatcher) reloadScrapeWorks(namespace string, swosByKey map[string][]interface{}) {
aw.swosByNamespaceLock.Lock()
aw.swosCount.Add(len(swosByKey) - len(aw.swosByNamespace[namespace]))
aw.swosByNamespace[namespace] = swosByKey
aw.swosByNamespaceLock.Unlock()
func (aw *apiWatcher) reloadScrapeWorks(uw *urlWatcher, swosByKey map[string][]interface{}) {
aw.swosByURLWatcherLock.Lock()
aw.swosCount.Add(len(swosByKey) - len(aw.swosByURLWatcher[uw]))
aw.swosByURLWatcher[uw] = swosByKey
aw.swosByURLWatcherLock.Unlock()
}
func (aw *apiWatcher) setScrapeWorks(namespace, key string, labels []map[string]string) {
func (aw *apiWatcher) setScrapeWorks(uw *urlWatcher, key string, labels []map[string]string) {
swos := aw.getScrapeWorkObjectsForLabels(labels)
aw.swosByNamespaceLock.Lock()
swosByKey := aw.swosByNamespace[namespace]
aw.swosByURLWatcherLock.Lock()
swosByKey := aw.swosByURLWatcher[uw]
if swosByKey == nil {
swosByKey = make(map[string][]interface{})
aw.swosByNamespace[namespace] = swosByKey
aw.swosByURLWatcher[uw] = swosByKey
}
if len(swos) > 0 {
aw.swosCount.Add(len(swos) - len(swosByKey[key]))
@ -107,17 +107,17 @@ func (aw *apiWatcher) setScrapeWorks(namespace, key string, labels []map[string]
aw.swosCount.Add(-len(swosByKey[key]))
delete(swosByKey, key)
}
aw.swosByNamespaceLock.Unlock()
aw.swosByURLWatcherLock.Unlock()
}
func (aw *apiWatcher) removeScrapeWorks(namespace, key string) {
aw.swosByNamespaceLock.Lock()
swosByKey := aw.swosByNamespace[namespace]
func (aw *apiWatcher) removeScrapeWorks(uw *urlWatcher, key string) {
aw.swosByURLWatcherLock.Lock()
swosByKey := aw.swosByURLWatcher[uw]
if len(swosByKey) > 0 {
aw.swosCount.Add(-len(swosByKey[key]))
delete(swosByKey, key)
}
aw.swosByNamespaceLock.Unlock()
aw.swosByURLWatcherLock.Unlock()
}
func (aw *apiWatcher) getScrapeWorkObjectsForLabels(labelss []map[string]string) []interface{} {
@ -136,17 +136,17 @@ func (aw *apiWatcher) getScrapeWorkObjectsForLabels(labelss []map[string]string)
func (aw *apiWatcher) getScrapeWorkObjects() []interface{} {
aw.gw.registerPendingAPIWatchers()
aw.swosByNamespaceLock.Lock()
defer aw.swosByNamespaceLock.Unlock()
aw.swosByURLWatcherLock.Lock()
defer aw.swosByURLWatcherLock.Unlock()
size := 0
for _, swosByKey := range aw.swosByNamespace {
for _, swosByKey := range aw.swosByURLWatcher {
for _, swosLocal := range swosByKey {
size += len(swosLocal)
}
}
swos := make([]interface{}, 0, size)
for _, swosByKey := range aw.swosByNamespace {
for _, swosByKey := range aw.swosByURLWatcher {
for _, swosLocal := range swosByKey {
swos = append(swos, swosLocal...)
}
@ -257,14 +257,14 @@ func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) {
gw.startWatchersForRole("pod", nil)
gw.startWatchersForRole("service", nil)
}
paths, namespaces := getAPIPathsWithNamespaces(role, gw.namespaces, gw.selectors)
for i, path := range paths {
paths := getAPIPathsWithNamespaces(role, gw.namespaces, gw.selectors)
for _, path := range paths {
apiURL := gw.apiServer + path
gw.mu.Lock()
uw := gw.m[apiURL]
needStart := uw == nil
if needStart {
uw = newURLWatcher(role, namespaces[i], apiURL, gw)
uw = newURLWatcher(role, apiURL, gw)
gw.m[apiURL] = uw
}
if aw != nil {
@ -352,14 +352,13 @@ type urlWatcher struct {
staleResourceVersions *metrics.Counter
}
func newURLWatcher(role, namespace, apiURL string, gw *groupWatcher) *urlWatcher {
func newURLWatcher(role, apiURL string, gw *groupWatcher) *urlWatcher {
parseObject, parseObjectList := getObjectParsersForRole(role)
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers{role=%q}`, role)).Inc()
uw := &urlWatcher{
role: role,
namespace: namespace,
apiURL: apiURL,
gw: gw,
role: role,
apiURL: apiURL,
gw: gw,
parseObject: parseObject,
parseObjectList: parseObjectList,
@ -499,7 +498,7 @@ func (uw *urlWatcher) reloadScrapeWorksForAPIWatchersLocked(awsMap map[*apiWatch
}
}
for i, aw := range aws {
aw.reloadScrapeWorks(uw.namespace, swosByKey[i])
aw.reloadScrapeWorks(uw, swosByKey[i])
}
}
@ -589,7 +588,7 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
if len(uw.aws) > 0 {
labels := o.getTargetLabels(uw.gw)
for aw := range uw.aws {
aw.setScrapeWorks(uw.namespace, key, labels)
aw.setScrapeWorks(uw, key, labels)
}
}
uw.gw.mu.Unlock()
@ -606,7 +605,7 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
delete(uw.objectsByKey, key)
}
for aw := range uw.aws {
aw.removeScrapeWorks(uw.namespace, key)
aw.removeScrapeWorks(uw, key)
}
uw.gw.mu.Unlock()
case "BOOKMARK":
@ -663,19 +662,19 @@ func parseError(data []byte) (*Error, error) {
return &em, nil
}
func getAPIPathsWithNamespaces(role string, namespaces []string, selectors []Selector) ([]string, []string) {
func getAPIPathsWithNamespaces(role string, namespaces []string, selectors []Selector) []string {
objectType := getObjectTypeByRole(role)
if objectType == "nodes" || len(namespaces) == 0 {
query := joinSelectors(role, selectors)
path := getAPIPath(objectType, "", query)
return []string{path}, []string{""}
return []string{path}
}
query := joinSelectors(role, selectors)
paths := make([]string, len(namespaces))
for i, namespace := range namespaces {
paths[i] = getAPIPath(objectType, namespace, query)
}
return paths, namespaces
return paths
}
func getAPIPath(objectType, namespace, query string) string {

View file

@ -11,54 +11,51 @@ import (
)
func TestGetAPIPathsWithNamespaces(t *testing.T) {
f := func(role string, namespaces []string, selectors []Selector, expectedPaths, expectedNamespaces []string) {
f := func(role string, namespaces []string, selectors []Selector, expectedPaths []string) {
t.Helper()
paths, resultNamespaces := getAPIPathsWithNamespaces(role, namespaces, selectors)
paths := getAPIPathsWithNamespaces(role, namespaces, selectors)
if !reflect.DeepEqual(paths, expectedPaths) {
t.Fatalf("unexpected paths; got\n%q\nwant\n%q", paths, expectedPaths)
}
if !reflect.DeepEqual(resultNamespaces, expectedNamespaces) {
t.Fatalf("unexpected namespaces; got\n%q\nwant\n%q", resultNamespaces, expectedNamespaces)
}
}
// role=node
f("node", nil, nil, []string{"/api/v1/nodes"}, []string{""})
f("node", []string{"foo", "bar"}, nil, []string{"/api/v1/nodes"}, []string{""})
f("node", nil, nil, []string{"/api/v1/nodes"})
f("node", []string{"foo", "bar"}, nil, []string{"/api/v1/nodes"})
f("node", nil, []Selector{
{
Role: "pod",
Label: "foo",
Field: "bar",
},
}, []string{"/api/v1/nodes"}, []string{""})
}, []string{"/api/v1/nodes"})
f("node", nil, []Selector{
{
Role: "node",
Label: "foo",
Field: "bar",
},
}, []string{"/api/v1/nodes?labelSelector=foo&fieldSelector=bar"}, []string{""})
}, []string{"/api/v1/nodes?labelSelector=foo&fieldSelector=bar"})
f("node", []string{"x", "y"}, []Selector{
{
Role: "node",
Label: "foo",
Field: "bar",
},
}, []string{"/api/v1/nodes?labelSelector=foo&fieldSelector=bar"}, []string{""})
}, []string{"/api/v1/nodes?labelSelector=foo&fieldSelector=bar"})
// role=pod
f("pod", nil, nil, []string{"/api/v1/pods"}, []string{""})
f("pod", nil, nil, []string{"/api/v1/pods"})
f("pod", []string{"foo", "bar"}, nil, []string{
"/api/v1/namespaces/foo/pods",
"/api/v1/namespaces/bar/pods",
}, []string{"foo", "bar"})
})
f("pod", nil, []Selector{
{
Role: "node",
Label: "foo",
},
}, []string{"/api/v1/pods"}, []string{""})
}, []string{"/api/v1/pods"})
f("pod", nil, []Selector{
{
Role: "pod",
@ -69,7 +66,7 @@ func TestGetAPIPathsWithNamespaces(t *testing.T) {
Label: "x",
Field: "y",
},
}, []string{"/api/v1/pods?labelSelector=foo%2Cx&fieldSelector=y"}, []string{""})
}, []string{"/api/v1/pods?labelSelector=foo%2Cx&fieldSelector=y"})
f("pod", []string{"x", "y"}, []Selector{
{
Role: "pod",
@ -83,14 +80,14 @@ func TestGetAPIPathsWithNamespaces(t *testing.T) {
}, []string{
"/api/v1/namespaces/x/pods?labelSelector=foo%2Cx&fieldSelector=y",
"/api/v1/namespaces/y/pods?labelSelector=foo%2Cx&fieldSelector=y",
}, []string{"x", "y"})
})
// role=service
f("service", nil, nil, []string{"/api/v1/services"}, []string{""})
f("service", nil, nil, []string{"/api/v1/services"})
f("service", []string{"x", "y"}, nil, []string{
"/api/v1/namespaces/x/services",
"/api/v1/namespaces/y/services",
}, []string{"x", "y"})
})
f("service", nil, []Selector{
{
Role: "node",
@ -100,7 +97,7 @@ func TestGetAPIPathsWithNamespaces(t *testing.T) {
Role: "service",
Field: "bar",
},
}, []string{"/api/v1/services?fieldSelector=bar"}, []string{""})
}, []string{"/api/v1/services?fieldSelector=bar"})
f("service", []string{"x", "y"}, []Selector{
{
Role: "service",
@ -109,14 +106,14 @@ func TestGetAPIPathsWithNamespaces(t *testing.T) {
}, []string{
"/api/v1/namespaces/x/services?labelSelector=abc%3Dde",
"/api/v1/namespaces/y/services?labelSelector=abc%3Dde",
}, []string{"x", "y"})
})
// role=endpoints
f("endpoints", nil, nil, []string{"/api/v1/endpoints"}, []string{""})
f("endpoints", nil, nil, []string{"/api/v1/endpoints"})
f("endpoints", []string{"x", "y"}, nil, []string{
"/api/v1/namespaces/x/endpoints",
"/api/v1/namespaces/y/endpoints",
}, []string{"x", "y"})
})
f("endpoints", []string{"x", "y"}, []Selector{
{
Role: "endpoints",
@ -129,10 +126,10 @@ func TestGetAPIPathsWithNamespaces(t *testing.T) {
}, []string{
"/api/v1/namespaces/x/endpoints?labelSelector=bbb",
"/api/v1/namespaces/y/endpoints?labelSelector=bbb",
}, []string{"x", "y"})
})
// role=endpointslices
f("endpointslices", nil, nil, []string{"/apis/discovery.k8s.io/v1beta1/endpointslices"}, []string{""})
f("endpointslices", nil, nil, []string{"/apis/discovery.k8s.io/v1beta1/endpointslices"})
f("endpointslices", []string{"x", "y"}, []Selector{
{
Role: "endpointslices",
@ -142,10 +139,10 @@ func TestGetAPIPathsWithNamespaces(t *testing.T) {
}, []string{
"/apis/discovery.k8s.io/v1beta1/namespaces/x/endpointslices?labelSelector=label&fieldSelector=field",
"/apis/discovery.k8s.io/v1beta1/namespaces/y/endpointslices?labelSelector=label&fieldSelector=field",
}, []string{"x", "y"})
})
// role=ingress
f("ingress", nil, nil, []string{"/apis/networking.k8s.io/v1beta1/ingresses"}, []string{""})
f("ingress", nil, nil, []string{"/apis/networking.k8s.io/v1beta1/ingresses"})
f("ingress", []string{"x", "y"}, []Selector{
{
Role: "node",
@ -166,7 +163,7 @@ func TestGetAPIPathsWithNamespaces(t *testing.T) {
}, []string{
"/apis/networking.k8s.io/v1beta1/namespaces/x/ingresses?labelSelector=cde%2Cbaaa&fieldSelector=abc",
"/apis/networking.k8s.io/v1beta1/namespaces/y/ingresses?labelSelector=cde%2Cbaaa&fieldSelector=abc",
}, []string{"x", "y"})
})
}
func TestParseBookmark(t *testing.T) {