lib/promscrape/discovery/kubernetes: fix leaking api watcher (#4861)

* lib/promscrape/discovery/kubernetes: fix leaking api watcher

goroutine which was polling k8s API had no execution control. This leaded to leaking goroutines during config reload.

See: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4850
Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* lib/promscrape/discovery/kubernetes: use reference counting for urlWatcher cleanup

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* lib/promscrape/discovery/kubernetes: remove waitgroup sync for goroutines polling API server

This is unnecessary since context will is cancelled and new requests will not be sent. Also, using waitgroup will increase time required to perform reload which might result in missed scrapes.

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* lib/promscrape/discovery/kubernetes: clarify comment

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* Apply suggestions from code review

* lib/promscrape/discovery/kubernetes: address review feedback

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

---------

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
Co-authored-by: Nikolay <nik@victoriametrics.com>
This commit is contained in:
Zakhar Bessarab 2023-09-15 21:40:13 +04:00 committed by Aliaksandr Valialkin
parent c1a8a2d54c
commit 55d25fb844
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
2 changed files with 35 additions and 8 deletions

View file

@ -103,6 +103,7 @@ The v1.93.x line will be supported for at least 12 months since [v1.93.0](https:
* BUGFIX: properly replace `:` chars in label names with `_` when `-usePromCompatibleNaming` command-line flag is passed to `vmagent`, `vminsert` or single-node VictoriaMetrics. This addresses [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3113#issuecomment-1275077071). * BUGFIX: properly replace `:` chars in label names with `_` when `-usePromCompatibleNaming` command-line flag is passed to `vmagent`, `vminsert` or single-node VictoriaMetrics. This addresses [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3113#issuecomment-1275077071).
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): correctly check if specified `-dst` belongs to specified `-storageDataPath`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4837). * BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): correctly check if specified `-dst` belongs to specified `-storageDataPath`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4837).
* BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl.html): don't interrupt the migration process if no metrics were found for a specific tenant. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4796). * BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl.html): don't interrupt the migration process if no metrics were found for a specific tenant. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4796).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix the thread leak that occurs during the polling of the Kubernetes API server after a configuration reload. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4850).
## [v1.93.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.93.0) ## [v1.93.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.93.0)

View file

@ -1,6 +1,7 @@
package kubernetes package kubernetes
import ( import (
"context"
"encoding/json" "encoding/json"
"errors" "errors"
"flag" "flag"
@ -16,11 +17,12 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
"github.com/VictoriaMetrics/metrics"
) )
var apiServerTimeout = flag.Duration("promscrape.kubernetes.apiServerTimeout", 30*time.Minute, "How frequently to reload the full state from Kubernetes API server") var apiServerTimeout = flag.Duration("promscrape.kubernetes.apiServerTimeout", 30*time.Minute, "How frequently to reload the full state from Kubernetes API server")
@ -402,7 +404,7 @@ func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) {
} }
// doRequest performs http request to the given requestURL. // doRequest performs http request to the given requestURL.
func (gw *groupWatcher) doRequest(requestURL string) (*http.Response, error) { func (gw *groupWatcher) doRequest(ctx context.Context, requestURL string) (*http.Response, error) {
if strings.Contains(requestURL, "/apis/networking.k8s.io/v1/") && atomic.LoadUint32(&gw.useNetworkingV1Beta1) == 1 { if strings.Contains(requestURL, "/apis/networking.k8s.io/v1/") && atomic.LoadUint32(&gw.useNetworkingV1Beta1) == 1 {
// Update networking URL for old Kubernetes API, which supports only v1beta1 path. // Update networking URL for old Kubernetes API, which supports only v1beta1 path.
requestURL = strings.Replace(requestURL, "/apis/networking.k8s.io/v1/", "/apis/networking.k8s.io/v1beta1/", 1) requestURL = strings.Replace(requestURL, "/apis/networking.k8s.io/v1/", "/apis/networking.k8s.io/v1beta1/", 1)
@ -411,7 +413,7 @@ func (gw *groupWatcher) doRequest(requestURL string) (*http.Response, error) {
// Update discovery URL for old Kubernetes API, which supports only v1beta1 path. // Update discovery URL for old Kubernetes API, which supports only v1beta1 path.
requestURL = strings.Replace(requestURL, "/apis/discovery.k8s.io/v1/", "/apis/discovery.k8s.io/v1beta1/", 1) requestURL = strings.Replace(requestURL, "/apis/discovery.k8s.io/v1/", "/apis/discovery.k8s.io/v1beta1/", 1)
} }
req, err := http.NewRequest(http.MethodGet, requestURL, nil) req, err := http.NewRequestWithContext(ctx, http.MethodGet, requestURL, nil)
if err != nil { if err != nil {
logger.Fatalf("cannot create a request for %q: %s", requestURL, err) logger.Fatalf("cannot create a request for %q: %s", requestURL, err)
} }
@ -423,11 +425,11 @@ func (gw *groupWatcher) doRequest(requestURL string) (*http.Response, error) {
if resp.StatusCode == http.StatusNotFound { if resp.StatusCode == http.StatusNotFound {
if strings.Contains(requestURL, "/apis/networking.k8s.io/v1/") && atomic.LoadUint32(&gw.useNetworkingV1Beta1) == 0 { if strings.Contains(requestURL, "/apis/networking.k8s.io/v1/") && atomic.LoadUint32(&gw.useNetworkingV1Beta1) == 0 {
atomic.StoreUint32(&gw.useNetworkingV1Beta1, 1) atomic.StoreUint32(&gw.useNetworkingV1Beta1, 1)
return gw.doRequest(requestURL) return gw.doRequest(ctx, requestURL)
} }
if strings.Contains(requestURL, "/apis/discovery.k8s.io/v1/") && atomic.LoadUint32(&gw.useDiscoveryV1Beta1) == 0 { if strings.Contains(requestURL, "/apis/discovery.k8s.io/v1/") && atomic.LoadUint32(&gw.useDiscoveryV1Beta1) == 0 {
atomic.StoreUint32(&gw.useDiscoveryV1Beta1, 1) atomic.StoreUint32(&gw.useDiscoveryV1Beta1, 1)
return gw.doRequest(requestURL) return gw.doRequest(ctx, requestURL)
} }
} }
return resp, nil return resp, nil
@ -444,8 +446,12 @@ func (gw *groupWatcher) registerPendingAPIWatchers() {
func (gw *groupWatcher) unsubscribeAPIWatcher(aw *apiWatcher) { func (gw *groupWatcher) unsubscribeAPIWatcher(aw *apiWatcher) {
gw.mu.Lock() gw.mu.Lock()
defer gw.mu.Unlock() defer gw.mu.Unlock()
for _, uw := range gw.m { for key, uw := range gw.m {
uw.unsubscribeAPIWatcherLocked(aw) uw.unsubscribeAPIWatcherLocked(aw)
if (len(uw.aws) + len(uw.awsPending)) == 0 {
uw.cancel()
delete(gw.m, key)
}
} }
} }
@ -458,6 +464,9 @@ type urlWatcher struct {
apiURL string apiURL string
gw *groupWatcher gw *groupWatcher
ctx context.Context
cancel context.CancelFunc
parseObject parseObjectFunc parseObject parseObjectFunc
parseObjectList parseObjectListFunc parseObjectList parseObjectListFunc
@ -488,11 +497,17 @@ type urlWatcher struct {
func newURLWatcher(role, apiURL string, gw *groupWatcher) *urlWatcher { func newURLWatcher(role, apiURL string, gw *groupWatcher) *urlWatcher {
parseObject, parseObjectList := getObjectParsersForRole(role) parseObject, parseObjectList := getObjectParsersForRole(role)
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers{role=%q}`, role)).Inc() metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers{role=%q}`, role)).Inc()
ctx, cancel := context.WithCancel(context.Background())
uw := &urlWatcher{ uw := &urlWatcher{
role: role, role: role,
apiURL: apiURL, apiURL: apiURL,
gw: gw, gw: gw,
refCount: 0,
ctx: ctx,
cancel: cancel,
parseObject: parseObject, parseObject: parseObject,
parseObjectList: parseObjectList, parseObjectList: parseObjectList,
@ -587,7 +602,7 @@ func (uw *urlWatcher) reloadObjects() string {
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4855 . // and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4855 .
delimiter := getQueryArgsDelimiter(apiURL) delimiter := getQueryArgsDelimiter(apiURL)
requestURL := apiURL + delimiter + "resourceVersion=0&resourceVersionMatch=NotOlderThan" requestURL := apiURL + delimiter + "resourceVersion=0&resourceVersionMatch=NotOlderThan"
resp, err := uw.gw.doRequest(requestURL) resp, err := uw.gw.doRequest(uw.ctx, requestURL)
if err != nil { if err != nil {
logger.Errorf("cannot perform request to %q: %s", requestURL, err) logger.Errorf("cannot perform request to %q: %s", requestURL, err)
return "" return ""
@ -667,13 +682,19 @@ func (uw *urlWatcher) watchForUpdates() {
timeoutSeconds := time.Duration(0.9 * float64(uw.gw.client.Timeout)).Seconds() timeoutSeconds := time.Duration(0.9 * float64(uw.gw.client.Timeout)).Seconds()
apiURL += delimiter + "watch=1&allowWatchBookmarks=true&timeoutSeconds=" + strconv.Itoa(int(timeoutSeconds)) apiURL += delimiter + "watch=1&allowWatchBookmarks=true&timeoutSeconds=" + strconv.Itoa(int(timeoutSeconds))
for { for {
select {
case <-uw.ctx.Done():
return
default:
}
resourceVersion := uw.reloadObjects() resourceVersion := uw.reloadObjects()
if resourceVersion == "" { if resourceVersion == "" {
backoffSleep() backoffSleep()
continue continue
} }
requestURL := apiURL + "&resourceVersion=" + url.QueryEscape(resourceVersion) requestURL := apiURL + "&resourceVersion=" + url.QueryEscape(resourceVersion)
resp, err := uw.gw.doRequest(requestURL) resp, err := uw.gw.doRequest(uw.ctx, requestURL)
if err != nil { if err != nil {
logger.Errorf("cannot perform request to %q: %s", requestURL, err) logger.Errorf("cannot perform request to %q: %s", requestURL, err)
backoffSleep() backoffSleep()
@ -823,6 +844,11 @@ func (uw *urlWatcher) maybeUpdateDependedScrapeWorksLocked() {
} }
} }
// close cancels context used for API polling
func (uw *urlWatcher) close() {
uw.cancel()
}
// Bookmark is a bookmark message from Kubernetes Watch API. // Bookmark is a bookmark message from Kubernetes Watch API.
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks // See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks
type Bookmark struct { type Bookmark struct {