From a51d0ec6ec1d5c01ed322122cf7eab421d3a2b8e Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 5 Apr 2021 20:27:23 +0300 Subject: [PATCH] lib/promscrape/discovery/kubernetes: load objects missing in local cache from api seriver in getObjectByRole() This should fix possible race for `role: endpoints` and `role: endpointslices` service discovery, when the referred `pod` and `service` objects aren't propagated to urlWatcher cache yet. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1182#issuecomment-813353359 for details. --- .../discovery/kubernetes/api_watcher.go | 67 ++++++++++++++++--- .../discovery/kubernetes/endpoints.go | 6 +- 2 files changed, 61 insertions(+), 12 deletions(-) diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go index eb27c1040..aa12daad5 100644 --- a/lib/promscrape/discovery/kubernetes/api_watcher.go +++ b/lib/promscrape/discovery/kubernetes/api_watcher.go @@ -223,6 +223,53 @@ func (gw *groupWatcher) getObjectByRole(role, namespace, name string) object { // this is needed for testing return nil } + o := gw.getCachedObjectByRole(role, namespace, name) + if o != nil { + // Fast path: the object has been found in the cache. + return o + } + + // The object wasn't found in the cache. Try querying it directly from API server. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1182#issuecomment-813353359 for details. + metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_loads_total{role=%q}`, role)).Inc() + objectType := getObjectTypeByRole(role) + path := getAPIPath(objectType, namespace, "") + path += "/" + name + requestURL := gw.apiServer + path + resp, err := gw.doRequest(requestURL) + if err != nil { + metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_load_errors_total{role=%q}`, role)).Inc() + logger.Errorf("cannot obtain data for object %s (namespace=%q, name=%q): %s", role, namespace, name, err) + return nil + } + data, err := ioutil.ReadAll(resp.Body) + _ = resp.Body.Close() + if err != nil { + metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_load_errors_total{role=%q}`, role)).Inc() + logger.Errorf("cannot read response from %q: %s", requestURL, err) + return nil + } + if resp.StatusCode != http.StatusOK { + if resp.StatusCode == http.StatusNotFound { + metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_load_misses_total{role=%q}`, role)).Inc() + return nil + } + metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_load_errors_total{role=%q}`, role)).Inc() + logger.Errorf("unexpected status code when reading response from %q; got %d; want %d; response body: %q", requestURL, resp.StatusCode, http.StatusOK, data) + return nil + } + parseObject, _ := getObjectParsersForRole(role) + o, err = parseObject(data) + if err != nil { + metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_load_errors_total{role=%q}`, role)).Inc() + logger.Errorf("cannot parse object obtained from %q: %s; response body: %q", requestURL, err, data) + return nil + } + // There is no need in storing the object in urlWatcher cache, since it should be eventually populated there by urlWatcher itself. + return o +} + +func (gw *groupWatcher) getCachedObjectByRole(role, namespace, name string) object { key := namespace + "/" + name gw.startWatchersForRole(role, nil) gw.mu.Lock() @@ -625,32 +672,32 @@ func parseError(data []byte) (*Error, error) { } func getAPIPathsWithNamespaces(role string, namespaces []string, selectors []Selector) ([]string, []string) { - objectName := getObjectNameByRole(role) - if objectName == "nodes" || len(namespaces) == 0 { + objectType := getObjectTypeByRole(role) + if objectType == "nodes" || len(namespaces) == 0 { query := joinSelectors(role, selectors) - path := getAPIPath(objectName, "", query) + path := getAPIPath(objectType, "", query) return []string{path}, []string{""} } query := joinSelectors(role, selectors) paths := make([]string, len(namespaces)) for i, namespace := range namespaces { - paths[i] = getAPIPath(objectName, namespace, query) + paths[i] = getAPIPath(objectType, namespace, query) } return paths, namespaces } -func getAPIPath(objectName, namespace, query string) string { - suffix := objectName +func getAPIPath(objectType, namespace, query string) string { + suffix := objectType if namespace != "" { - suffix = "namespaces/" + namespace + "/" + objectName + suffix = "namespaces/" + namespace + "/" + objectType } if len(query) > 0 { suffix += "?" + query } - if objectName == "ingresses" { + if objectType == "ingresses" { return "/apis/networking.k8s.io/v1beta1/" + suffix } - if objectName == "endpointslices" { + if objectType == "endpointslices" { return "/apis/discovery.k8s.io/v1beta1/" + suffix } return "/api/v1/" + suffix @@ -679,7 +726,7 @@ func joinSelectors(role string, selectors []Selector) string { return strings.Join(args, "&") } -func getObjectNameByRole(role string) string { +func getObjectTypeByRole(role string) string { switch role { case "node": return "nodes" diff --git a/lib/promscrape/discovery/kubernetes/endpoints.go b/lib/promscrape/discovery/kubernetes/endpoints.go index 805a88b01..af4b16b5e 100644 --- a/lib/promscrape/discovery/kubernetes/endpoints.go +++ b/lib/promscrape/discovery/kubernetes/endpoints.go @@ -139,8 +139,10 @@ func appendEndpointLabelsForAddresses(ms []map[string]string, gw *groupWatcher, eas []EndpointAddress, epp EndpointPort, svc *Service, ready string) []map[string]string { for _, ea := range eas { var p *Pod - if o := gw.getObjectByRole("pod", ea.TargetRef.Namespace, ea.TargetRef.Name); o != nil { - p = o.(*Pod) + if ea.TargetRef.Name != "" { + if o := gw.getObjectByRole("pod", ea.TargetRef.Namespace, ea.TargetRef.Name); o != nil { + p = o.(*Pod) + } } m := getEndpointLabelsForAddressAndPort(podPortsSeen, eps, ea, epp, p, svc, ready) ms = append(ms, m)