lib/promscrape/discovery/kubernetes: load objects missing in local cache from api seriver in getObjectByRole()

This should fix possible race for `role: endpoints` and `role: endpointslices` service discovery,
when the referred `pod` and `service` objects aren't propagated to urlWatcher cache yet.

See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1182#issuecomment-813353359 for details.
This commit is contained in:
Aliaksandr Valialkin 2021-04-05 20:27:23 +03:00
parent 95dbebf512
commit a51d0ec6ec
2 changed files with 61 additions and 12 deletions

View file

@ -223,6 +223,53 @@ func (gw *groupWatcher) getObjectByRole(role, namespace, name string) object {
// this is needed for testing
return nil
}
o := gw.getCachedObjectByRole(role, namespace, name)
if o != nil {
// Fast path: the object has been found in the cache.
return o
}
// The object wasn't found in the cache. Try querying it directly from API server.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1182#issuecomment-813353359 for details.
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_loads_total{role=%q}`, role)).Inc()
objectType := getObjectTypeByRole(role)
path := getAPIPath(objectType, namespace, "")
path += "/" + name
requestURL := gw.apiServer + path
resp, err := gw.doRequest(requestURL)
if err != nil {
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_load_errors_total{role=%q}`, role)).Inc()
logger.Errorf("cannot obtain data for object %s (namespace=%q, name=%q): %s", role, namespace, name, err)
return nil
}
data, err := ioutil.ReadAll(resp.Body)
_ = resp.Body.Close()
if err != nil {
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_load_errors_total{role=%q}`, role)).Inc()
logger.Errorf("cannot read response from %q: %s", requestURL, err)
return nil
}
if resp.StatusCode != http.StatusOK {
if resp.StatusCode == http.StatusNotFound {
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_load_misses_total{role=%q}`, role)).Inc()
return nil
}
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_load_errors_total{role=%q}`, role)).Inc()
logger.Errorf("unexpected status code when reading response from %q; got %d; want %d; response body: %q", requestURL, resp.StatusCode, http.StatusOK, data)
return nil
}
parseObject, _ := getObjectParsersForRole(role)
o, err = parseObject(data)
if err != nil {
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_load_errors_total{role=%q}`, role)).Inc()
logger.Errorf("cannot parse object obtained from %q: %s; response body: %q", requestURL, err, data)
return nil
}
// There is no need in storing the object in urlWatcher cache, since it should be eventually populated there by urlWatcher itself.
return o
}
func (gw *groupWatcher) getCachedObjectByRole(role, namespace, name string) object {
key := namespace + "/" + name
gw.startWatchersForRole(role, nil)
gw.mu.Lock()
@ -625,32 +672,32 @@ func parseError(data []byte) (*Error, error) {
}
func getAPIPathsWithNamespaces(role string, namespaces []string, selectors []Selector) ([]string, []string) {
objectName := getObjectNameByRole(role)
if objectName == "nodes" || len(namespaces) == 0 {
objectType := getObjectTypeByRole(role)
if objectType == "nodes" || len(namespaces) == 0 {
query := joinSelectors(role, selectors)
path := getAPIPath(objectName, "", query)
path := getAPIPath(objectType, "", query)
return []string{path}, []string{""}
}
query := joinSelectors(role, selectors)
paths := make([]string, len(namespaces))
for i, namespace := range namespaces {
paths[i] = getAPIPath(objectName, namespace, query)
paths[i] = getAPIPath(objectType, namespace, query)
}
return paths, namespaces
}
func getAPIPath(objectName, namespace, query string) string {
suffix := objectName
func getAPIPath(objectType, namespace, query string) string {
suffix := objectType
if namespace != "" {
suffix = "namespaces/" + namespace + "/" + objectName
suffix = "namespaces/" + namespace + "/" + objectType
}
if len(query) > 0 {
suffix += "?" + query
}
if objectName == "ingresses" {
if objectType == "ingresses" {
return "/apis/networking.k8s.io/v1beta1/" + suffix
}
if objectName == "endpointslices" {
if objectType == "endpointslices" {
return "/apis/discovery.k8s.io/v1beta1/" + suffix
}
return "/api/v1/" + suffix
@ -679,7 +726,7 @@ func joinSelectors(role string, selectors []Selector) string {
return strings.Join(args, "&")
}
func getObjectNameByRole(role string) string {
func getObjectTypeByRole(role string) string {
switch role {
case "node":
return "nodes"

View file

@ -139,8 +139,10 @@ func appendEndpointLabelsForAddresses(ms []map[string]string, gw *groupWatcher,
eas []EndpointAddress, epp EndpointPort, svc *Service, ready string) []map[string]string {
for _, ea := range eas {
var p *Pod
if o := gw.getObjectByRole("pod", ea.TargetRef.Namespace, ea.TargetRef.Name); o != nil {
p = o.(*Pod)
if ea.TargetRef.Name != "" {
if o := gw.getObjectByRole("pod", ea.TargetRef.Namespace, ea.TargetRef.Name); o != nil {
p = o.(*Pod)
}
}
m := getEndpointLabelsForAddressAndPort(podPortsSeen, eps, ea, epp, p, svc, ready)
ms = append(ms, m)