package kubernetes import ( "crypto/tls" "fmt" "net" "os" "strings" "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/fasthttp" ) // apiConfig contains config for API server type apiConfig struct { client *fasthttp.HostClient server string hostPort string authConfig *promauth.Config namespaces []string selectors []Selector } func getAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { apiConfigMapLock.Lock() defer apiConfigMapLock.Unlock() if !hasAPIConfigMapCleaner { hasAPIConfigMapCleaner = true go apiConfigMapCleaner() } e := apiConfigMap[sdc] if e != nil { e.lastAccessTime = time.Now() return e.cfg, nil } cfg, err := newAPIConfig(sdc, baseDir) if err != nil { return nil, err } apiConfigMap[sdc] = &apiConfigMapEntry{ cfg: cfg, lastAccessTime: time.Now(), } return cfg, nil } func apiConfigMapCleaner() { tc := time.NewTicker(15 * time.Minute) for currentTime := range tc.C { apiConfigMapLock.Lock() for k, e := range apiConfigMap { if currentTime.Sub(e.lastAccessTime) > 10*time.Minute { delete(apiConfigMap, k) } } apiConfigMapLock.Unlock() } } type apiConfigMapEntry struct { cfg *apiConfig lastAccessTime time.Time } var ( apiConfigMap = make(map[*SDConfig]*apiConfigMapEntry) apiConfigMapLock sync.Mutex hasAPIConfigMapCleaner bool ) func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { ac, err := promauth.NewConfig(baseDir, sdc.BasicAuth, sdc.BearerToken, sdc.BearerTokenFile, sdc.TLSConfig) if err != nil { return nil, fmt.Errorf("cannot parse auth config: %s", err) } hcv, err := newHostClient(sdc.APIServer, ac) if err != nil { return nil, fmt.Errorf("cannot create HTTP client for %q: %s", sdc.APIServer, err) } cfg := &apiConfig{ client: hcv.hc, server: hcv.apiServer, hostPort: hcv.hostPort, authConfig: hcv.ac, namespaces: sdc.Namespaces.Names, selectors: sdc.Selectors, } return cfg, nil } func getAPIResponse(cfg *apiConfig, role, path string) ([]byte, error) { query := joinSelectors(role, cfg.namespaces, cfg.selectors) if len(query) > 0 { path += "?" + query } requestURL := cfg.server + path var u fasthttp.URI u.Update(requestURL) var req fasthttp.Request req.SetRequestURIBytes(u.RequestURI()) req.SetHost(cfg.hostPort) req.Header.Set("Accept-Encoding", "gzip") if cfg.authConfig != nil && cfg.authConfig.Authorization != "" { req.Header.Set("Authorization", cfg.authConfig.Authorization) } var resp fasthttp.Response // There is no need in calling DoTimeout, since the timeout is already set in hc.ReadTimeout above. if err := cfg.client.Do(&req, &resp); err != nil { return nil, fmt.Errorf("cannot fetch %q: %s", requestURL, err) } var data []byte if ce := resp.Header.Peek("Content-Encoding"); string(ce) == "gzip" { dst, err := fasthttp.AppendGunzipBytes(nil, resp.Body()) if err != nil { return nil, fmt.Errorf("cannot ungzip response from %q: %s", requestURL, err) } data = dst } else { data = append(data[:0], resp.Body()...) } statusCode := resp.StatusCode() if statusCode != fasthttp.StatusOK { return nil, fmt.Errorf("unexpected status code returned from %q: %d; expecting %d; response body: %q", requestURL, statusCode, fasthttp.StatusOK, data) } return data, nil } type hcValue struct { hc *fasthttp.HostClient ac *promauth.Config apiServer string hostPort string } func newHostClient(apiServer string, ac *promauth.Config) (*hcValue, error) { if len(apiServer) == 0 { // Assume we run at k8s pod. // Discover apiServer and auth config according to k8s docs. // See https://kubernetes.io/docs/reference/access-authn-authz/service-accounts-admin/#service-account-admission-controller host := os.Getenv("KUBERNETES_SERVICE_HOST") port := os.Getenv("KUBERNETES_SERVICE_PORT") if len(host) == 0 { return nil, fmt.Errorf("cannot find KUBERNETES_SERVICE_HOST env var; it must be defined when running in k8s; " + "probably, `kubernetes_sd_config->api_server` is missing in Prometheus configs?") } if len(port) == 0 { return nil, fmt.Errorf("cannot find KUBERNETES_SERVICE_PORT env var; it must be defined when running in k8s; "+ "KUBERNETES_SERVICE_HOST=%q", host) } apiServer = "https://" + net.JoinHostPort(host, port) tlsConfig := promauth.TLSConfig{ CAFile: "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt", } acNew, err := promauth.NewConfig("/", nil, "", "/var/run/secrets/kubernetes.io/serviceaccount/token", &tlsConfig) if err != nil { return nil, fmt.Errorf("cannot initialize service account auth: %s; probably, `kubernetes_sd_config->api_server` is missing in Prometheus configs?", err) } ac = acNew } var u fasthttp.URI u.Update(apiServer) hostPort := string(u.Host()) isTLS := string(u.Scheme()) == "https" var tlsCfg *tls.Config if isTLS && ac != nil { tlsCfg = ac.NewTLSConfig() } if !strings.Contains(hostPort, ":") { port := "80" if isTLS { port = "443" } hostPort = net.JoinHostPort(hostPort, port) } hc := &fasthttp.HostClient{ Addr: hostPort, Name: "vm_promscrape/discovery", DialDualStack: netutil.TCP6Enabled(), IsTLS: isTLS, TLSConfig: tlsCfg, ReadTimeout: time.Minute, WriteTimeout: 10 * time.Second, MaxResponseBodySize: 300 * 1024 * 1024, } return &hcValue{ hc: hc, ac: ac, apiServer: apiServer, hostPort: hostPort, }, nil }