mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/promscrape/discovery/kubernetes: remove only unused client for API server during cleaning
This commit is contained in:
parent
636e1578de
commit
391fb0903e
1 changed files with 21 additions and 16 deletions
|
@ -56,7 +56,7 @@ func getAPIResponse(cfg *APIConfig, role, path string) ([]byte, error) {
|
||||||
return data, nil
|
return data, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getHostClient(apiServer string, ac *promauth.Config) (hcValue, error) {
|
func getHostClient(apiServer string, ac *promauth.Config) (*hcValue, error) {
|
||||||
k := hcKey{
|
k := hcKey{
|
||||||
apiServer: apiServer,
|
apiServer: apiServer,
|
||||||
ac: ac,
|
ac: ac,
|
||||||
|
@ -68,8 +68,8 @@ func getHostClient(apiServer string, ac *promauth.Config) (hcValue, error) {
|
||||||
go hcMapCleaner()
|
go hcMapCleaner()
|
||||||
hasHCMapCleaner = true
|
hasHCMapCleaner = true
|
||||||
}
|
}
|
||||||
hcv, ok := hcMap[k]
|
hcv := hcMap[k]
|
||||||
if !ok {
|
if hcv == nil {
|
||||||
hcvNew, err := newHostClient(apiServer, ac)
|
hcvNew, err := newHostClient(apiServer, ac)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return hcv, fmt.Errorf("cannot create new HTTP client for %q: %s", apiServer, err)
|
return hcv, fmt.Errorf("cannot create new HTTP client for %q: %s", apiServer, err)
|
||||||
|
@ -77,14 +77,19 @@ func getHostClient(apiServer string, ac *promauth.Config) (hcValue, error) {
|
||||||
hcMap[k] = hcvNew
|
hcMap[k] = hcvNew
|
||||||
hcv = hcvNew
|
hcv = hcvNew
|
||||||
}
|
}
|
||||||
|
hcv.lastAccessTime = time.Now()
|
||||||
return hcv, nil
|
return hcv, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func hcMapCleaner() {
|
func hcMapCleaner() {
|
||||||
tc := time.NewTicker(15 * time.Minute)
|
tc := time.NewTicker(15 * time.Minute)
|
||||||
for range tc.C {
|
for currentTime := range tc.C {
|
||||||
hcMapLock.Lock()
|
hcMapLock.Lock()
|
||||||
hcMap = make(map[hcKey]hcValue)
|
for k, v := range hcMap {
|
||||||
|
if currentTime.Sub(v.lastAccessTime) > 10*time.Minute {
|
||||||
|
delete(hcMap, k)
|
||||||
|
}
|
||||||
|
}
|
||||||
hcMapLock.Unlock()
|
hcMapLock.Unlock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -95,31 +100,31 @@ type hcKey struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type hcValue struct {
|
type hcValue struct {
|
||||||
hc *fasthttp.HostClient
|
hc *fasthttp.HostClient
|
||||||
ac *promauth.Config
|
ac *promauth.Config
|
||||||
apiServer string
|
apiServer string
|
||||||
hostPort string
|
hostPort string
|
||||||
|
lastAccessTime time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
hasHCMapCleaner bool
|
hasHCMapCleaner bool
|
||||||
hcMap = make(map[hcKey]hcValue)
|
hcMap = make(map[hcKey]*hcValue)
|
||||||
hcMapLock sync.Mutex
|
hcMapLock sync.Mutex
|
||||||
)
|
)
|
||||||
|
|
||||||
func newHostClient(apiServer string, ac *promauth.Config) (hcValue, error) {
|
func newHostClient(apiServer string, ac *promauth.Config) (*hcValue, error) {
|
||||||
var hcv hcValue
|
|
||||||
if len(apiServer) == 0 {
|
if len(apiServer) == 0 {
|
||||||
// Assume we run at k8s pod.
|
// Assume we run at k8s pod.
|
||||||
// Discover apiServer and auth config according to k8s docs.
|
// Discover apiServer and auth config according to k8s docs.
|
||||||
// See https://kubernetes.io/docs/reference/access-authn-authz/service-accounts-admin/#service-account-admission-controller
|
// See https://kubernetes.io/docs/reference/access-authn-authz/service-accounts-admin/#service-account-admission-controller
|
||||||
host, port := os.Getenv("KUBERNETES_SERVICE_HOST"), os.Getenv("KUBERNETES_SERVICE_PORT")
|
host, port := os.Getenv("KUBERNETES_SERVICE_HOST"), os.Getenv("KUBERNETES_SERVICE_PORT")
|
||||||
if len(host) == 0 {
|
if len(host) == 0 {
|
||||||
return hcv, fmt.Errorf("cannot find KUBERNETES_SERVICE_HOST env var; it must be defined when running in k8s; " +
|
return nil, fmt.Errorf("cannot find KUBERNETES_SERVICE_HOST env var; it must be defined when running in k8s; " +
|
||||||
"probably, `kubernetes_sd_config->api_server` is missing in Prometheus configs?")
|
"probably, `kubernetes_sd_config->api_server` is missing in Prometheus configs?")
|
||||||
}
|
}
|
||||||
if len(port) == 0 {
|
if len(port) == 0 {
|
||||||
return hcv, fmt.Errorf("cannot find KUBERNETES_SERVICE_PORT env var; it must be defined when running in k8s; "+
|
return nil, fmt.Errorf("cannot find KUBERNETES_SERVICE_PORT env var; it must be defined when running in k8s; "+
|
||||||
"KUBERNETES_SERVICE_HOST=%q", host)
|
"KUBERNETES_SERVICE_HOST=%q", host)
|
||||||
}
|
}
|
||||||
apiServer = "https://" + net.JoinHostPort(host, port)
|
apiServer = "https://" + net.JoinHostPort(host, port)
|
||||||
|
@ -128,7 +133,7 @@ func newHostClient(apiServer string, ac *promauth.Config) (hcValue, error) {
|
||||||
}
|
}
|
||||||
acNew, err := promauth.NewConfig("/", nil, "", "/var/run/secrets/kubernetes.io/serviceaccount/token", &tlsConfig)
|
acNew, err := promauth.NewConfig("/", nil, "", "/var/run/secrets/kubernetes.io/serviceaccount/token", &tlsConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return hcv, fmt.Errorf("cannot initialize service account auth: %s; probably, `kubernetes_sd_config->api_server` is missing in Prometheus configs?", err)
|
return nil, fmt.Errorf("cannot initialize service account auth: %s; probably, `kubernetes_sd_config->api_server` is missing in Prometheus configs?", err)
|
||||||
}
|
}
|
||||||
ac = acNew
|
ac = acNew
|
||||||
}
|
}
|
||||||
|
@ -158,7 +163,7 @@ func newHostClient(apiServer string, ac *promauth.Config) (hcValue, error) {
|
||||||
WriteTimeout: 10 * time.Second,
|
WriteTimeout: 10 * time.Second,
|
||||||
MaxResponseBodySize: 300 * 1024 * 1024,
|
MaxResponseBodySize: 300 * 1024 * 1024,
|
||||||
}
|
}
|
||||||
return hcValue{
|
return &hcValue{
|
||||||
hc: hc,
|
hc: hc,
|
||||||
ac: ac,
|
ac: ac,
|
||||||
apiServer: apiServer,
|
apiServer: apiServer,
|
||||||
|
|
Loading…
Reference in a new issue