From 421a92983ae69234f449021cf964a0cf4e7b2a03 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 29 Apr 2021 10:14:24 +0300 Subject: [PATCH] lib/promscrape/discovery/kubernetes: remove a mutex at urlWatcher - use groupWatcher mutex for accessing all the urlWatcher children This simplifies the code a bit and reduces the probability of improper mutex handling and deadlocks. --- .../discovery/kubernetes/api_watcher.go | 194 ++++++------------ .../discovery/kubernetes/endpoints.go | 4 +- .../discovery/kubernetes/endpointslices.go | 4 +- 3 files changed, 62 insertions(+), 140 deletions(-) diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go index 92a9a6cd6..dd7ba75ca 100644 --- a/lib/promscrape/discovery/kubernetes/api_watcher.go +++ b/lib/promscrape/discovery/kubernetes/api_watcher.go @@ -33,6 +33,8 @@ type WatchEvent struct { // object is any Kubernetes object. type object interface { key() string + + // getTargetLabels must be called under gw.mu lock. getTargetLabels(gw *groupWatcher) []map[string]string } @@ -231,62 +233,13 @@ var ( }) ) -// getObjectByRole returns an object with the given (namespace, name) key and the given role. -func (gw *groupWatcher) getObjectByRole(role, namespace, name string) object { +func (gw *groupWatcher) getObjectByRoleLocked(role, namespace, name string) object { if gw == nil { // this is needed for testing return nil } - o := gw.getCachedObjectByRole(role, namespace, name) - if o != nil { - // Fast path: the object has been found in the cache. - return o - } - - // The object wasn't found in the cache. Try querying it directly from API server. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1182#issuecomment-813353359 for details. - metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_loads_total{role=%q}`, role)).Inc() - objectType := getObjectTypeByRole(role) - path := getAPIPath(objectType, namespace, "") - path += "/" + name - requestURL := gw.apiServer + path - resp, err := gw.doRequest(requestURL) - if err != nil { - metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_load_errors_total{role=%q}`, role)).Inc() - logger.Errorf("cannot obtain data for object %s (namespace=%q, name=%q): %s", role, namespace, name, err) - return nil - } - data, err := ioutil.ReadAll(resp.Body) - _ = resp.Body.Close() - if err != nil { - metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_load_errors_total{role=%q}`, role)).Inc() - logger.Errorf("cannot read response from %q: %s", requestURL, err) - return nil - } - if resp.StatusCode != http.StatusOK { - if resp.StatusCode == http.StatusNotFound { - metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_load_misses_total{role=%q}`, role)).Inc() - return nil - } - metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_load_errors_total{role=%q}`, role)).Inc() - logger.Errorf("unexpected status code when reading response from %q; got %d; want %d; response body: %q", requestURL, resp.StatusCode, http.StatusOK, data) - return nil - } - parseObject, _ := getObjectParsersForRole(role) - o, err = parseObject(data) - if err != nil { - metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_load_errors_total{role=%q}`, role)).Inc() - logger.Errorf("cannot parse object obtained from %q: %s; response body: %q", requestURL, err, data) - return nil - } - // There is no need in storing the object in urlWatcher cache, since it should be eventually populated there by urlWatcher itself. - return o -} - -func (gw *groupWatcher) getCachedObjectByRole(role, namespace, name string) object { key := namespace + "/" + name - uws := gw.getURLWatchers() - for _, uw := range uws { + for _, uw := range gw.m { if uw.role != role { // Role mismatch continue @@ -295,26 +248,26 @@ func (gw *groupWatcher) getCachedObjectByRole(role, namespace, name string) obje // Namespace mismatch continue } - uw.mu.Lock() - o := uw.objectsByKey[key] - uw.mu.Unlock() - if o != nil { + if o := uw.objectsByKey[key]; o != nil { return o } } return nil } -func (gw *groupWatcher) refreshEndpointsLabels(namespace, key string) { +func (gw *groupWatcher) refreshEndpointsLabelsLocked(namespace, key string) { // Refresh endpoints and endpointslices labels for the corresponding service as Prometheus does. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240 - gw.refreshObjectLabels("endpoints", namespace, key) - gw.refreshObjectLabels("endpointslices", namespace, key) + gw.refreshObjectLabelsLocked("endpoints", namespace, key) + gw.refreshObjectLabelsLocked("endpointslices", namespace, key) } -func (gw *groupWatcher) refreshObjectLabels(role, namespace, key string) { - uws := gw.getURLWatchers() - for _, uw := range uws { +func (gw *groupWatcher) refreshObjectLabelsLocked(role, namespace, key string) { + for _, uw := range gw.m { + if len(uw.aws) == 0 { + // No apiWatchers to notify + continue + } if uw.role != role { // Role mismatch continue @@ -323,16 +276,9 @@ func (gw *groupWatcher) refreshObjectLabels(role, namespace, key string) { // Namespace mismatch continue } - var aws []*apiWatcher - uw.mu.Lock() - o := uw.objectsByKey[key] - if o != nil { - aws = uw.getAPIWatchersLocked() - } - uw.mu.Unlock() - if len(aws) > 0 { + if o := uw.objectsByKey[key]; o != nil { labels := o.getTargetLabels(gw) - for _, aw := range aws { + for aw := range uw.aws { aw.setScrapeWorks(namespace, key, labels) } } @@ -350,14 +296,14 @@ func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) { uw = newURLWatcher(role, namespaces[i], apiURL, gw) gw.m[apiURL] = uw } + if aw != nil { + uw.subscribeAPIWatcherLocked(aw) + } gw.mu.Unlock() if needStart { uw.reloadObjects() go uw.watchForUpdates() } - if aw != nil { - uw.subscribeAPIWatcher(aw) - } } } @@ -374,30 +320,24 @@ func (gw *groupWatcher) doRequest(requestURL string) (*http.Response, error) { } func (gw *groupWatcher) registerPendingAPIWatchers() { - uws := gw.getURLWatchers() - for _, uw := range uws { - uw.registerPendingAPIWatchers() - } -} - -func (gw *groupWatcher) getURLWatchers() []*urlWatcher { gw.mu.Lock() - uws := make([]*urlWatcher, 0, len(gw.m)) + defer gw.mu.Unlock() for _, uw := range gw.m { - uws = append(uws, uw) + uw.registerPendingAPIWatchersLocked() } - gw.mu.Unlock() - return uws } func (gw *groupWatcher) unsubscribeAPIWatcher(aw *apiWatcher) { - uws := gw.getURLWatchers() - for _, uw := range uws { - uw.unsubscribeAPIWatcher(aw) + gw.mu.Lock() + defer gw.mu.Unlock() + for _, uw := range gw.m { + uw.unsubscribeAPIWatcherLocked(aw) } } // urlWatcher watches for an apiURL and updates object states in objectsByKey. +// +// urlWatcher fields must be accessed under gw.mu lock. type urlWatcher struct { role string namespace string @@ -407,14 +347,11 @@ type urlWatcher struct { parseObject parseObjectFunc parseObjectList parseObjectListFunc - // mu protects aws, awsPending, objectsByKey and resourceVersion - mu sync.Mutex - // awsPending contains pending apiWatcher objects, which are registered in a batch. // Batch registering saves CPU time needed for registering big number of Kubernetes objects // shared among big number of scrape jobs, since per-object labels are generated only once // for all the scrape jobs (each scrape job is associated with a single apiWatcher). - // See reloadScrapeWorksForAPIWatchers for details. + // See reloadScrapeWorksForAPIWatchersLocked for details. awsPending map[*apiWatcher]struct{} // aws contains registered apiWatcher objects @@ -458,37 +395,31 @@ func newURLWatcher(role, namespace, apiURL string, gw *groupWatcher) *urlWatcher return uw } -func (uw *urlWatcher) subscribeAPIWatcher(aw *apiWatcher) { - uw.mu.Lock() +func (uw *urlWatcher) subscribeAPIWatcherLocked(aw *apiWatcher) { if _, ok := uw.aws[aw]; !ok { if _, ok := uw.awsPending[aw]; !ok { uw.awsPending[aw] = struct{}{} metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Inc() } } - uw.mu.Unlock() } -func (uw *urlWatcher) registerPendingAPIWatchers() { - uw.mu.Lock() +func (uw *urlWatcher) registerPendingAPIWatchersLocked() { if len(uw.awsPending) == 0 { - uw.mu.Unlock() return } awsPending := make([]*apiWatcher, 0, len(uw.awsPending)) for aw := range uw.awsPending { awsPending = append(awsPending, aw) - delete(uw.awsPending, aw) uw.aws[aw] = struct{}{} } - uw.reloadScrapeWorksForAPIWatchers(awsPending, uw.objectsByKey) - uw.mu.Unlock() + uw.reloadScrapeWorksForAPIWatchersLocked(uw.awsPending) + uw.awsPending = make(map[*apiWatcher]struct{}) metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="working"}`, uw.role)).Add(len(awsPending)) metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Add(-len(awsPending)) } -func (uw *urlWatcher) unsubscribeAPIWatcher(aw *apiWatcher) { - uw.mu.Lock() +func (uw *urlWatcher) unsubscribeAPIWatcherLocked(aw *apiWatcher) { if _, ok := uw.awsPending[aw]; ok { delete(uw.awsPending, aw) metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Dec() @@ -497,20 +428,19 @@ func (uw *urlWatcher) unsubscribeAPIWatcher(aw *apiWatcher) { delete(uw.aws, aw) metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="working"}`, uw.role)).Dec() } - uw.mu.Unlock() } func (uw *urlWatcher) setResourceVersion(resourceVersion string) { - uw.mu.Lock() + uw.gw.mu.Lock() uw.resourceVersion = resourceVersion - uw.mu.Unlock() + uw.gw.mu.Unlock() } // reloadObjects reloads objects to the latest state and returns resourceVersion for the latest state. func (uw *urlWatcher) reloadObjects() string { - uw.mu.Lock() + uw.gw.mu.Lock() resourceVersion := uw.resourceVersion - uw.mu.Unlock() + uw.gw.mu.Unlock() if resourceVersion != "" { // Fast path - there is no need in reloading the objects. return resourceVersion @@ -535,7 +465,7 @@ func (uw *urlWatcher) reloadObjects() string { return "" } - uw.mu.Lock() + uw.gw.mu.Lock() var updated, removed, added int for key := range uw.objectsByKey { if o, ok := objectsByKey[key]; ok { @@ -556,31 +486,34 @@ func (uw *urlWatcher) reloadObjects() string { uw.objectsRemoved.Add(removed) uw.objectsAdded.Add(added) uw.objectsCount.Add(added - removed) + uw.reloadScrapeWorksForAPIWatchersLocked(uw.aws) uw.resourceVersion = metadata.ResourceVersion - aws := uw.getAPIWatchersLocked() - uw.mu.Unlock() - - uw.reloadScrapeWorksForAPIWatchers(aws, objectsByKey) if uw.role == "service" { // Refresh endpoints labels for the corresponding services as Prometheus does. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240 for key := range objectsByKey { - uw.gw.refreshEndpointsLabels(uw.namespace, key) + uw.gw.refreshEndpointsLabelsLocked(uw.namespace, key) } } + uw.gw.mu.Unlock() + logger.Infof("reloaded %d objects from %q", len(objectsByKey), requestURL) return uw.resourceVersion } -func (uw *urlWatcher) reloadScrapeWorksForAPIWatchers(aws []*apiWatcher, objectsByKey map[string]object) { - if len(aws) == 0 { +func (uw *urlWatcher) reloadScrapeWorksForAPIWatchersLocked(awsMap map[*apiWatcher]struct{}) { + if len(awsMap) == 0 { return } + aws := make([]*apiWatcher, 0, len(awsMap)) + for aw := range awsMap { + aws = append(aws, aw) + } swosByKey := make([]map[string][]interface{}, len(aws)) for i := range aws { swosByKey[i] = make(map[string][]interface{}) } - for key, o := range objectsByKey { + for key, o := range uw.objectsByKey { labels := o.getTargetLabels(uw.gw) for i, aw := range aws { swos := aw.getScrapeWorkObjectsForLabels(labels) @@ -594,15 +527,6 @@ func (uw *urlWatcher) reloadScrapeWorksForAPIWatchers(aws []*apiWatcher, objects } } -func (uw *urlWatcher) getAPIWatchersLocked() []*apiWatcher { - awsMap := uw.aws - aws := make([]*apiWatcher, 0, len(awsMap)) - for aw := range awsMap { - aws = append(aws, aw) - } - return aws -} - // watchForUpdates watches for object updates starting from uw.resourceVersion and updates the corresponding objects to the latest state. // // See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes @@ -678,7 +602,7 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error { return err } key := o.key() - uw.mu.Lock() + uw.gw.mu.Lock() if _, ok := uw.objectsByKey[key]; !ok { uw.objectsCount.Inc() uw.objectsAdded.Inc() @@ -686,41 +610,39 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error { uw.objectsUpdated.Inc() } uw.objectsByKey[key] = o - aws := uw.getAPIWatchersLocked() - uw.mu.Unlock() - if len(aws) > 0 { + if len(uw.aws) > 0 { labels := o.getTargetLabels(uw.gw) - for _, aw := range aws { + for aw := range uw.aws { aw.setScrapeWorks(uw.namespace, key, labels) } } if uw.role == "service" { // Refresh endpoints labels for the corresponding service as Prometheus does. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240 - uw.gw.refreshEndpointsLabels(uw.namespace, key) + uw.gw.refreshEndpointsLabelsLocked(uw.namespace, key) } + uw.gw.mu.Unlock() case "DELETED": o, err := uw.parseObject(we.Object) if err != nil { return err } key := o.key() - uw.mu.Lock() + uw.gw.mu.Lock() if _, ok := uw.objectsByKey[key]; ok { uw.objectsCount.Dec() uw.objectsRemoved.Inc() delete(uw.objectsByKey, key) } - aws := uw.getAPIWatchersLocked() - uw.mu.Unlock() - for _, aw := range aws { + for aw := range uw.aws { aw.removeScrapeWorks(uw.namespace, key) } if uw.role == "service" { // Refresh endpoints labels for the corresponding service as Prometheus does. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240 - uw.gw.refreshEndpointsLabels(uw.namespace, key) + uw.gw.refreshEndpointsLabelsLocked(uw.namespace, key) } + uw.gw.mu.Unlock() case "BOOKMARK": // See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks bm, err := parseBookmark(we.Object) diff --git a/lib/promscrape/discovery/kubernetes/endpoints.go b/lib/promscrape/discovery/kubernetes/endpoints.go index af4b16b5e..68b5c3c3d 100644 --- a/lib/promscrape/discovery/kubernetes/endpoints.go +++ b/lib/promscrape/discovery/kubernetes/endpoints.go @@ -92,7 +92,7 @@ type EndpointPort struct { // See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#endpoints func (eps *Endpoints) getTargetLabels(gw *groupWatcher) []map[string]string { var svc *Service - if o := gw.getObjectByRole("service", eps.Metadata.Namespace, eps.Metadata.Name); o != nil { + if o := gw.getObjectByRoleLocked("service", eps.Metadata.Namespace, eps.Metadata.Name); o != nil { svc = o.(*Service) } podPortsSeen := make(map[*Pod][]int) @@ -140,7 +140,7 @@ func appendEndpointLabelsForAddresses(ms []map[string]string, gw *groupWatcher, for _, ea := range eas { var p *Pod if ea.TargetRef.Name != "" { - if o := gw.getObjectByRole("pod", ea.TargetRef.Namespace, ea.TargetRef.Name); o != nil { + if o := gw.getObjectByRoleLocked("pod", ea.TargetRef.Namespace, ea.TargetRef.Name); o != nil { p = o.(*Pod) } } diff --git a/lib/promscrape/discovery/kubernetes/endpointslices.go b/lib/promscrape/discovery/kubernetes/endpointslices.go index 5e1961e92..4ac24b475 100644 --- a/lib/promscrape/discovery/kubernetes/endpointslices.go +++ b/lib/promscrape/discovery/kubernetes/endpointslices.go @@ -39,14 +39,14 @@ func parseEndpointSlice(data []byte) (object, error) { // See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#endpointslices func (eps *EndpointSlice) getTargetLabels(gw *groupWatcher) []map[string]string { var svc *Service - if o := gw.getObjectByRole("service", eps.Metadata.Namespace, eps.Metadata.Name); o != nil { + if o := gw.getObjectByRoleLocked("service", eps.Metadata.Namespace, eps.Metadata.Name); o != nil { svc = o.(*Service) } podPortsSeen := make(map[*Pod][]int) var ms []map[string]string for _, ess := range eps.Endpoints { var p *Pod - if o := gw.getObjectByRole("pod", ess.TargetRef.Namespace, ess.TargetRef.Name); o != nil { + if o := gw.getObjectByRoleLocked("pod", ess.TargetRef.Namespace, ess.TargetRef.Name); o != nil { p = o.(*Pod) } for _, epp := range eps.Ports {