From c50fd219dccea9d0c694a800991e43392bb032de Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 4 May 2020 15:53:50 +0300 Subject: [PATCH] lib/promscrape/discovery/kubernetes/: unify apiConfig creation --- lib/promscrape/config.go | 7 +- lib/promscrape/discovery/kubernetes/api.go | 153 ++++++++++-------- .../discovery/kubernetes/endpoints.go | 6 +- .../discovery/kubernetes/ingress.go | 6 +- .../discovery/kubernetes/kubernetes.go | 12 +- lib/promscrape/discovery/kubernetes/pod.go | 6 +- .../discovery/kubernetes/service.go | 6 +- 7 files changed, 101 insertions(+), 95 deletions(-) diff --git a/lib/promscrape/config.go b/lib/promscrape/config.go index 7899f40c6..f41e164e9 100644 --- a/lib/promscrape/config.go +++ b/lib/promscrape/config.go @@ -301,12 +301,7 @@ type scrapeWorkConfig struct { } func appendKubernetesScrapeWork(dst []ScrapeWork, sdc *kubernetes.SDConfig, baseDir string, swc *scrapeWorkConfig) []ScrapeWork { - ac, err := promauth.NewConfig(baseDir, sdc.BasicAuth, sdc.BearerToken, sdc.BearerTokenFile, sdc.TLSConfig) - if err != nil { - logger.Errorf("cannot parse auth config for `kubernetes_sd_config` for `job_name` %q: %s; skipping it", swc.jobName, err) - return dst - } - targetLabels, err := kubernetes.GetLabels(ac, sdc) + targetLabels, err := kubernetes.GetLabels(sdc, baseDir) if err != nil { logger.Errorf("error when discovering kubernetes nodes for `job_name` %q: %s; skipping it", swc.jobName, err) return dst diff --git a/lib/promscrape/discovery/kubernetes/api.go b/lib/promscrape/discovery/kubernetes/api.go index 9f6ae7db6..c12cdcb6f 100644 --- a/lib/promscrape/discovery/kubernetes/api.go +++ b/lib/promscrape/discovery/kubernetes/api.go @@ -16,34 +16,101 @@ import ( // apiConfig contains config for API server type apiConfig struct { - Server string - AuthConfig *promauth.Config - Namespaces []string - Selectors []Selector + client *fasthttp.HostClient + server string + hostPort string + authConfig *promauth.Config + namespaces []string + selectors []Selector } -func getAPIResponse(cfg *apiConfig, role, path string) ([]byte, error) { - hcv, err := getHostClient(cfg.Server, cfg.AuthConfig) +func getAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { + apiConfigMapLock.Lock() + defer apiConfigMapLock.Unlock() + + if !hasAPIConfigMapCleaner { + hasAPIConfigMapCleaner = true + go apiConfigMapCleaner() + } + + e := apiConfigMap[sdc] + if e != nil { + e.lastAccessTime = time.Now() + return e.cfg, nil + } + cfg, err := newAPIConfig(sdc, baseDir) if err != nil { return nil, err } - query := joinSelectors(role, cfg.Namespaces, cfg.Selectors) + apiConfigMap[sdc] = &apiConfigMapEntry{ + cfg: cfg, + lastAccessTime: time.Now(), + } + return cfg, nil +} + +func apiConfigMapCleaner() { + tc := time.NewTicker(15 * time.Minute) + for currentTime := range tc.C { + apiConfigMapLock.Lock() + for k, e := range apiConfigMap { + if currentTime.Sub(e.lastAccessTime) > 10*time.Minute { + delete(apiConfigMap, k) + } + } + apiConfigMapLock.Unlock() + } +} + +type apiConfigMapEntry struct { + cfg *apiConfig + lastAccessTime time.Time +} + +var ( + apiConfigMap = make(map[*SDConfig]*apiConfigMapEntry) + apiConfigMapLock sync.Mutex + hasAPIConfigMapCleaner bool +) + +func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { + ac, err := promauth.NewConfig(baseDir, sdc.BasicAuth, sdc.BearerToken, sdc.BearerTokenFile, sdc.TLSConfig) + if err != nil { + return nil, fmt.Errorf("cannot parse auth config: %s", err) + } + hcv, err := newHostClient(sdc.APIServer, ac) + if err != nil { + return nil, fmt.Errorf("cannot create HTTP client for %q: %s", sdc.APIServer, err) + } + cfg := &apiConfig{ + client: hcv.hc, + server: hcv.apiServer, + hostPort: hcv.hostPort, + authConfig: hcv.ac, + namespaces: sdc.Namespaces.Names, + selectors: sdc.Selectors, + } + return cfg, nil +} + +func getAPIResponse(cfg *apiConfig, role, path string) ([]byte, error) { + query := joinSelectors(role, cfg.namespaces, cfg.selectors) if len(query) > 0 { path += "?" + query } - requestURL := hcv.apiServer + path + requestURL := cfg.server + path var u fasthttp.URI u.Update(requestURL) var req fasthttp.Request req.SetRequestURIBytes(u.RequestURI()) - req.SetHost(hcv.hostPort) + req.SetHost(cfg.hostPort) req.Header.Set("Accept-Encoding", "gzip") - if hcv.ac != nil && hcv.ac.Authorization != "" { - req.Header.Set("Authorization", hcv.ac.Authorization) + if cfg.authConfig != nil && cfg.authConfig.Authorization != "" { + req.Header.Set("Authorization", cfg.authConfig.Authorization) } var resp fasthttp.Response // There is no need in calling DoTimeout, since the timeout is already set in hc.ReadTimeout above. - if err := hcv.hc.Do(&req, &resp); err != nil { + if err := cfg.client.Do(&req, &resp); err != nil { return nil, fmt.Errorf("cannot fetch %q: %s", requestURL, err) } var data []byte @@ -64,67 +131,13 @@ func getAPIResponse(cfg *apiConfig, role, path string) ([]byte, error) { return data, nil } -func getHostClient(apiServer string, ac *promauth.Config) (*hcValue, error) { - if len(apiServer) == 0 { - // ac is ignored when apiServer should be auto-discovered when running inside k8s pod. - ac = nil - } - k := hcKey{ - apiServer: apiServer, - ac: ac, - } - hcMapLock.Lock() - defer hcMapLock.Unlock() - - if !hasHCMapCleaner { - go hcMapCleaner() - hasHCMapCleaner = true - } - hcv := hcMap[k] - if hcv == nil { - hcvNew, err := newHostClient(apiServer, ac) - if err != nil { - return hcv, fmt.Errorf("cannot create new HTTP client for %q: %s", apiServer, err) - } - hcMap[k] = hcvNew - hcv = hcvNew - } - hcv.lastAccessTime = time.Now() - return hcv, nil -} - -func hcMapCleaner() { - tc := time.NewTicker(15 * time.Minute) - for currentTime := range tc.C { - hcMapLock.Lock() - for k, v := range hcMap { - if currentTime.Sub(v.lastAccessTime) > 10*time.Minute { - delete(hcMap, k) - } - } - hcMapLock.Unlock() - } -} - -type hcKey struct { - apiServer string - ac *promauth.Config -} - type hcValue struct { - hc *fasthttp.HostClient - ac *promauth.Config - apiServer string - hostPort string - lastAccessTime time.Time + hc *fasthttp.HostClient + ac *promauth.Config + apiServer string + hostPort string } -var ( - hasHCMapCleaner bool - hcMap = make(map[hcKey]*hcValue) - hcMapLock sync.Mutex -) - func newHostClient(apiServer string, ac *promauth.Config) (*hcValue, error) { if len(apiServer) == 0 { // Assume we run at k8s pod. diff --git a/lib/promscrape/discovery/kubernetes/endpoints.go b/lib/promscrape/discovery/kubernetes/endpoints.go index 9d48b7e09..40b9c6b83 100644 --- a/lib/promscrape/discovery/kubernetes/endpoints.go +++ b/lib/promscrape/discovery/kubernetes/endpoints.go @@ -29,14 +29,14 @@ func getEndpointsLabels(cfg *apiConfig) ([]map[string]string, error) { } func getEndpoints(cfg *apiConfig) ([]Endpoints, error) { - if len(cfg.Namespaces) == 0 { + if len(cfg.namespaces) == 0 { return getEndpointsByPath(cfg, "/api/v1/endpoints") } // Query /api/v1/namespaces/* for each namespace. // This fixes authorization issue at https://github.com/VictoriaMetrics/VictoriaMetrics/issues/432 cfgCopy := *cfg - namespaces := cfgCopy.Namespaces - cfgCopy.Namespaces = nil + namespaces := cfgCopy.namespaces + cfgCopy.namespaces = nil cfg = &cfgCopy var result []Endpoints for _, ns := range namespaces { diff --git a/lib/promscrape/discovery/kubernetes/ingress.go b/lib/promscrape/discovery/kubernetes/ingress.go index c90035913..b8f479772 100644 --- a/lib/promscrape/discovery/kubernetes/ingress.go +++ b/lib/promscrape/discovery/kubernetes/ingress.go @@ -19,14 +19,14 @@ func getIngressesLabels(cfg *apiConfig) ([]map[string]string, error) { } func getIngresses(cfg *apiConfig) ([]Ingress, error) { - if len(cfg.Namespaces) == 0 { + if len(cfg.namespaces) == 0 { return getIngressesByPath(cfg, "/apis/extensions/v1beta1/ingresses") } // Query /api/v1/namespaces/* for each namespace. // This fixes authorization issue at https://github.com/VictoriaMetrics/VictoriaMetrics/issues/432 cfgCopy := *cfg - namespaces := cfgCopy.Namespaces - cfgCopy.Namespaces = nil + namespaces := cfgCopy.namespaces + cfgCopy.namespaces = nil cfg = &cfgCopy var result []Ingress for _, ns := range namespaces { diff --git a/lib/promscrape/discovery/kubernetes/kubernetes.go b/lib/promscrape/discovery/kubernetes/kubernetes.go index fd1af2a3e..320351626 100644 --- a/lib/promscrape/discovery/kubernetes/kubernetes.go +++ b/lib/promscrape/discovery/kubernetes/kubernetes.go @@ -35,13 +35,11 @@ type Selector struct { Field string `yaml:"field"` } -// GetLabels returns labels for the given k8s role and the given ac and sdc. -func GetLabels(ac *promauth.Config, sdc *SDConfig) ([]map[string]string, error) { - cfg := &apiConfig{ - Server: sdc.APIServer, - AuthConfig: ac, - Namespaces: sdc.Namespaces.Names, - Selectors: sdc.Selectors, +// GetLabels returns labels for the given sdc and baseDir. +func GetLabels(sdc *SDConfig, baseDir string) ([]map[string]string, error) { + cfg, err := getAPIConfig(sdc, baseDir) + if err != nil { + return nil, fmt.Errorf("cannot create API config: %s", err) } switch sdc.Role { case "node": diff --git a/lib/promscrape/discovery/kubernetes/pod.go b/lib/promscrape/discovery/kubernetes/pod.go index 3220faab3..521c57a25 100644 --- a/lib/promscrape/discovery/kubernetes/pod.go +++ b/lib/promscrape/discovery/kubernetes/pod.go @@ -23,14 +23,14 @@ func getPodsLabels(cfg *apiConfig) ([]map[string]string, error) { } func getPods(cfg *apiConfig) ([]Pod, error) { - if len(cfg.Namespaces) == 0 { + if len(cfg.namespaces) == 0 { return getPodsByPath(cfg, "/api/v1/pods") } // Query /api/v1/namespaces/* for each namespace. // This fixes authorization issue at https://github.com/VictoriaMetrics/VictoriaMetrics/issues/432 cfgCopy := *cfg - namespaces := cfgCopy.Namespaces - cfgCopy.Namespaces = nil + namespaces := cfgCopy.namespaces + cfgCopy.namespaces = nil cfg = &cfgCopy var result []Pod for _, ns := range namespaces { diff --git a/lib/promscrape/discovery/kubernetes/service.go b/lib/promscrape/discovery/kubernetes/service.go index 53c69172b..f14c8c9d1 100644 --- a/lib/promscrape/discovery/kubernetes/service.go +++ b/lib/promscrape/discovery/kubernetes/service.go @@ -21,14 +21,14 @@ func getServicesLabels(cfg *apiConfig) ([]map[string]string, error) { } func getServices(cfg *apiConfig) ([]Service, error) { - if len(cfg.Namespaces) == 0 { + if len(cfg.namespaces) == 0 { return getServicesByPath(cfg, "/api/v1/services") } // Query /api/v1/namespaces/* for each namespace. // This fixes authorization issue at https://github.com/VictoriaMetrics/VictoriaMetrics/issues/432 cfgCopy := *cfg - namespaces := cfgCopy.Namespaces - cfgCopy.Namespaces = nil + namespaces := cfgCopy.namespaces + cfgCopy.namespaces = nil cfg = &cfgCopy var result []Service for _, ns := range namespaces {