lib/promscrape/discovery/kubernetes: fix leaking api watcher (#4861)

* lib/promscrape/discovery/kubernetes: fix leaking api watcher

goroutine which was polling k8s API had no execution control. This leaded to leaking goroutines during config reload.

See: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4850
Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* lib/promscrape/discovery/kubernetes: use reference counting for urlWatcher cleanup

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* lib/promscrape/discovery/kubernetes: remove waitgroup sync for goroutines polling API server

This is unnecessary since context will is cancelled and new requests will not be sent. Also, using waitgroup will increase time required to perform reload which might result in missed scrapes.

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* lib/promscrape/discovery/kubernetes: clarify comment

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* Apply suggestions from code review

* lib/promscrape/discovery/kubernetes: address review feedback

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

---------

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
Co-authored-by: Nikolay <nik@victoriametrics.com>
This commit is contained in:
Zakhar Bessarab 2023-09-15 21:40:13 +04:00 committed by Aliaksandr Valialkin
parent a583d2df25
commit 669ad1ca55
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
2 changed files with 35 additions and 8 deletions

View file

@ -67,6 +67,7 @@ The v1.93.x line will be supported for at least 12 months since [v1.93.0](https:
* BUGFIX: properly replace `:` chars in label names with `_` when `-usePromCompatibleNaming` command-line flag is passed to `vmagent`, `vminsert` or single-node VictoriaMetrics. This addresses [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3113#issuecomment-1275077071).
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): correctly check if specified `-dst` belongs to specified `-storageDataPath`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4837).
* BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl.html): don't interrupt the migration process if no metrics were found for a specific tenant. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4796).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix the thread leak that occurs during the polling of the Kubernetes API server after a configuration reload. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4850).
## [v1.93.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.93.0)

View file

@ -1,6 +1,7 @@
package kubernetes
import (
"context"
"encoding/json"
"errors"
"flag"
@ -16,11 +17,12 @@ import (
"sync/atomic"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
"github.com/VictoriaMetrics/metrics"
)
var apiServerTimeout = flag.Duration("promscrape.kubernetes.apiServerTimeout", 30*time.Minute, "How frequently to reload the full state from Kubernetes API server")
@ -402,7 +404,7 @@ func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) {
}
// doRequest performs http request to the given requestURL.
func (gw *groupWatcher) doRequest(requestURL string) (*http.Response, error) {
func (gw *groupWatcher) doRequest(ctx context.Context, requestURL string) (*http.Response, error) {
if strings.Contains(requestURL, "/apis/networking.k8s.io/v1/") && atomic.LoadUint32(&gw.useNetworkingV1Beta1) == 1 {
// Update networking URL for old Kubernetes API, which supports only v1beta1 path.
requestURL = strings.Replace(requestURL, "/apis/networking.k8s.io/v1/", "/apis/networking.k8s.io/v1beta1/", 1)
@ -411,7 +413,7 @@ func (gw *groupWatcher) doRequest(requestURL string) (*http.Response, error) {
// Update discovery URL for old Kubernetes API, which supports only v1beta1 path.
requestURL = strings.Replace(requestURL, "/apis/discovery.k8s.io/v1/", "/apis/discovery.k8s.io/v1beta1/", 1)
}
req, err := http.NewRequest(http.MethodGet, requestURL, nil)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, requestURL, nil)
if err != nil {
logger.Fatalf("cannot create a request for %q: %s", requestURL, err)
}
@ -423,11 +425,11 @@ func (gw *groupWatcher) doRequest(requestURL string) (*http.Response, error) {
if resp.StatusCode == http.StatusNotFound {
if strings.Contains(requestURL, "/apis/networking.k8s.io/v1/") && atomic.LoadUint32(&gw.useNetworkingV1Beta1) == 0 {
atomic.StoreUint32(&gw.useNetworkingV1Beta1, 1)
return gw.doRequest(requestURL)
return gw.doRequest(ctx, requestURL)
}
if strings.Contains(requestURL, "/apis/discovery.k8s.io/v1/") && atomic.LoadUint32(&gw.useDiscoveryV1Beta1) == 0 {
atomic.StoreUint32(&gw.useDiscoveryV1Beta1, 1)
return gw.doRequest(requestURL)
return gw.doRequest(ctx, requestURL)
}
}
return resp, nil
@ -444,8 +446,12 @@ func (gw *groupWatcher) registerPendingAPIWatchers() {
func (gw *groupWatcher) unsubscribeAPIWatcher(aw *apiWatcher) {
gw.mu.Lock()
defer gw.mu.Unlock()
for _, uw := range gw.m {
for key, uw := range gw.m {
uw.unsubscribeAPIWatcherLocked(aw)
if (len(uw.aws) + len(uw.awsPending)) == 0 {
uw.cancel()
delete(gw.m, key)
}
}
}
@ -458,6 +464,9 @@ type urlWatcher struct {
apiURL string
gw *groupWatcher
ctx context.Context
cancel context.CancelFunc
parseObject parseObjectFunc
parseObjectList parseObjectListFunc
@ -488,11 +497,17 @@ type urlWatcher struct {
func newURLWatcher(role, apiURL string, gw *groupWatcher) *urlWatcher {
parseObject, parseObjectList := getObjectParsersForRole(role)
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers{role=%q}`, role)).Inc()
ctx, cancel := context.WithCancel(context.Background())
uw := &urlWatcher{
role: role,
apiURL: apiURL,
gw: gw,
refCount: 0,
ctx: ctx,
cancel: cancel,
parseObject: parseObject,
parseObjectList: parseObjectList,
@ -587,7 +602,7 @@ func (uw *urlWatcher) reloadObjects() string {
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4855 .
delimiter := getQueryArgsDelimiter(apiURL)
requestURL := apiURL + delimiter + "resourceVersion=0&resourceVersionMatch=NotOlderThan"
resp, err := uw.gw.doRequest(requestURL)
resp, err := uw.gw.doRequest(uw.ctx, requestURL)
if err != nil {
logger.Errorf("cannot perform request to %q: %s", requestURL, err)
return ""
@ -667,13 +682,19 @@ func (uw *urlWatcher) watchForUpdates() {
timeoutSeconds := time.Duration(0.9 * float64(uw.gw.client.Timeout)).Seconds()
apiURL += delimiter + "watch=1&allowWatchBookmarks=true&timeoutSeconds=" + strconv.Itoa(int(timeoutSeconds))
for {
select {
case <-uw.ctx.Done():
return
default:
}
resourceVersion := uw.reloadObjects()
if resourceVersion == "" {
backoffSleep()
continue
}
requestURL := apiURL + "&resourceVersion=" + url.QueryEscape(resourceVersion)
resp, err := uw.gw.doRequest(requestURL)
resp, err := uw.gw.doRequest(uw.ctx, requestURL)
if err != nil {
logger.Errorf("cannot perform request to %q: %s", requestURL, err)
backoffSleep()
@ -823,6 +844,11 @@ func (uw *urlWatcher) maybeUpdateDependedScrapeWorksLocked() {
}
}
// close cancels context used for API polling
func (uw *urlWatcher) close() {
uw.cancel()
}
// Bookmark is a bookmark message from Kubernetes Watch API.
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks
type Bookmark struct {