lib/promscrape/discovery/kubernetes: stop all the url watchers, which belong to a particular groupWatcher, at once

Previously url watchers for pod, service and node objects could be mistakenly closed
when service discovery was set up only for endpoints and endpointslice roles,
since watchers for these roles may start start pod, service and node url watchers
with nil apiWatcher passed to groupWatcher.startWatchersForRole().

Now all the url watchers, which belong to a particular groupWatcher, are stopped at once
when this groupWatcher has no apiWatcher subscribers.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5216

The issue has been introduced in v1.93.5 when addressing https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4850
This commit is contained in:
Aliaksandr Valialkin 2023-10-27 13:35:27 +02:00
parent 3362774ab2
commit 67284377f3
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
2 changed files with 56 additions and 39 deletions

View file

@ -11,6 +11,7 @@ The following `tip` changes can be tested by building VictoriaMetrics components
## v1.93.x long-time support release (LTS)
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly discover Kubernetes targets via [kubernetes_sd_configs](https://docs.victoriametrics.com/sd_configs.html#kubernetes_sd_configs). Previously some targets and some labels could be skipped during service discovery because of the bug introduced in [v1.93.5](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.93.5) when implementing [this feature](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4850). See [tis issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5216) for more details.
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly parse `ca`, `cert` and `key` options at `tls_config` section inside [http client settings](https://docs.victoriametrics.com/sd_configs.html#http-api-client-options). Previously string values couldn't be parsed for these options, since the parser was mistakenly expecting a list of `uint8` values instead.
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly drop samples if `-streamAggr.dropInput` command-line flag is set and `-remoteWrite.streamAggr.config` contains an empty file. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5207).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): do not print redundant error logs when failed to scrape consul or nomad target. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5239).

View file

@ -216,6 +216,16 @@ type groupWatcher struct {
mu sync.Mutex
m map[string]*urlWatcher
// cancel is used for stopping all the urlWatcher instances inside m,
// which must check for ctx.Done() when performing their background watch work.
ctx context.Context
cancel context.CancelFunc
// noAPIWatchers is set to true when there are no API watchers for the given groupWatcher.
// This field is used for determining when it is safe to stop all the urlWatcher instances
// for the given groupWatcher.
noAPIWatchers bool
}
func newGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string, selectors []Selector, attachNodeMetadata bool, proxyURL *url.URL) *groupWatcher {
@ -233,15 +243,21 @@ func newGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string,
},
Timeout: *apiServerTimeout,
}
ctx, cancel := context.WithCancel(context.Background())
return &groupWatcher{
apiServer: apiServer,
namespaces: namespaces,
selectors: selectors,
attachNodeMetadata: attachNodeMetadata,
setHeaders: func(req *http.Request) { ac.SetHeaders(req, true) },
client: client,
m: make(map[string]*urlWatcher),
setHeaders: func(req *http.Request) {
ac.SetHeaders(req, true)
},
client: client,
m: make(map[string]*urlWatcher),
ctx: ctx,
cancel: cancel,
}
}
@ -292,8 +308,25 @@ func groupWatchersCleaner(gws map[string]*groupWatcher) {
groupWatchersLock.Lock()
for key, gw := range gws {
gw.mu.Lock()
if len(gw.m) == 0 {
delete(gws, key)
// Calculate the number of apiWatcher instances subscribed to gw.
awsTotal := 0
for _, uw := range gw.m {
awsTotal += len(uw.aws) + len(uw.awsPending)
}
if awsTotal == 0 {
// There are no API watchers subscribed to gw.
// Stop all the urlWatcher instances at gw and drop gw from gws in this case,
// but do it only on the second iteration in order to reduce urlWatcher churn
// during scrape config reloads.
if gw.noAPIWatchers {
gw.cancel()
delete(gws, key)
} else {
gw.noAPIWatchers = true
}
} else {
gw.noAPIWatchers = false
}
gw.mu.Unlock()
}
@ -440,21 +473,18 @@ func (gw *groupWatcher) doRequest(ctx context.Context, requestURL string) (*http
func (gw *groupWatcher) registerPendingAPIWatchers() {
gw.mu.Lock()
defer gw.mu.Unlock()
for _, uw := range gw.m {
uw.registerPendingAPIWatchersLocked()
}
gw.mu.Unlock()
}
func (gw *groupWatcher) unsubscribeAPIWatcher(aw *apiWatcher) {
gw.mu.Lock()
defer gw.mu.Unlock()
for _, uw := range gw.m {
uw.unsubscribeAPIWatcherLocked(aw)
if len(uw.aws)+len(uw.awsPending) == 0 {
time.AfterFunc(10*time.Second, uw.stopIfNoUsers)
}
}
gw.mu.Unlock()
}
// urlWatcher watches for an apiURL and updates object states in objectsByKey.
@ -466,9 +496,6 @@ type urlWatcher struct {
apiURL string
gw *groupWatcher
ctx context.Context
cancel context.CancelFunc
parseObject parseObjectFunc
parseObjectList parseObjectListFunc
@ -500,15 +527,11 @@ func newURLWatcher(role, apiURL string, gw *groupWatcher) *urlWatcher {
parseObject, parseObjectList := getObjectParsersForRole(role)
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers{role=%q}`, role)).Inc()
ctx, cancel := context.WithCancel(context.Background())
uw := &urlWatcher{
role: role,
apiURL: apiURL,
gw: gw,
ctx: ctx,
cancel: cancel,
parseObject: parseObject,
parseObjectList: parseObjectList,
@ -526,21 +549,11 @@ func newURLWatcher(role, apiURL string, gw *groupWatcher) *urlWatcher {
return uw
}
func (uw *urlWatcher) stopIfNoUsers() {
gw := uw.gw
gw.mu.Lock()
if len(uw.aws)+len(uw.awsPending) == 0 {
uw.cancel()
delete(gw.m, uw.apiURL)
}
gw.mu.Unlock()
}
func (uw *urlWatcher) recreateScrapeWorks() {
const minSleepTime = 5 * time.Second
sleepTime := minSleepTime
gw := uw.gw
stopCh := uw.ctx.Done()
stopCh := gw.ctx.Done()
for {
t := timerpool.Get(sleepTime)
select {
@ -633,6 +646,7 @@ func (uw *urlWatcher) reloadObjects() string {
return uw.resourceVersion
}
gw := uw.gw
startTime := time.Now()
apiURL := uw.apiURL
@ -641,7 +655,7 @@ func (uw *urlWatcher) reloadObjects() string {
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4855 .
delimiter := getQueryArgsDelimiter(apiURL)
requestURL := apiURL + delimiter + "resourceVersion=0&resourceVersionMatch=NotOlderThan"
resp, err := uw.gw.doRequest(uw.ctx, requestURL)
resp, err := gw.doRequest(gw.ctx, requestURL)
if err != nil {
if !errors.Is(err, context.Canceled) {
logger.Errorf("cannot perform request to %q: %s", requestURL, err)
@ -661,7 +675,7 @@ func (uw *urlWatcher) reloadObjects() string {
return ""
}
uw.gw.mu.Lock()
gw.mu.Lock()
objectsAdded := make(map[string]object)
objectsUpdated := make(map[string]object)
var objectsRemoved []string
@ -692,7 +706,7 @@ func (uw *urlWatcher) reloadObjects() string {
if len(objectsRemoved) > 0 || len(objectsUpdated) > 0 || len(objectsAdded) > 0 {
uw.maybeUpdateDependedScrapeWorksLocked()
}
uw.gw.mu.Unlock()
gw.mu.Unlock()
uw.objectsUpdated.Add(len(objectsUpdated))
uw.objectsRemoved.Add(len(objectsRemoved))
@ -709,7 +723,8 @@ func (uw *urlWatcher) reloadObjects() string {
//
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
func (uw *urlWatcher) watchForUpdates() {
stopCh := uw.ctx.Done()
gw := uw.gw
stopCh := gw.ctx.Done()
backoffDelay := time.Second
maxBackoffDelay := 30 * time.Second
backoffSleep := func() {
@ -728,7 +743,7 @@ func (uw *urlWatcher) watchForUpdates() {
}
apiURL := uw.apiURL
delimiter := getQueryArgsDelimiter(apiURL)
timeoutSeconds := time.Duration(0.9 * float64(uw.gw.client.Timeout)).Seconds()
timeoutSeconds := time.Duration(0.9 * float64(gw.client.Timeout)).Seconds()
apiURL += delimiter + "watch=1&allowWatchBookmarks=true&timeoutSeconds=" + strconv.Itoa(int(timeoutSeconds))
for {
select {
@ -745,7 +760,7 @@ func (uw *urlWatcher) watchForUpdates() {
continue
}
requestURL := apiURL + "&resourceVersion=" + url.QueryEscape(resourceVersion)
resp, err := uw.gw.doRequest(uw.ctx, requestURL)
resp, err := gw.doRequest(gw.ctx, requestURL)
if err != nil {
if !errors.Is(err, context.Canceled) {
logger.Errorf("cannot perform request to %q: %s", requestURL, err)
@ -771,7 +786,7 @@ func (uw *urlWatcher) watchForUpdates() {
err = uw.readObjectUpdateStream(resp.Body)
_ = resp.Body.Close()
if err != nil {
if !(errors.Is(err, io.EOF) || errors.Is(err, context.Canceled)) {
if !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) {
logger.Errorf("error when reading WatchEvent stream from %q: %s", requestURL, err)
uw.resourceVersion = ""
}
@ -783,6 +798,7 @@ func (uw *urlWatcher) watchForUpdates() {
// readObjectUpdateStream reads Kubernetes watch events from r and updates locally cached objects according to the received events.
func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
gw := uw.gw
d := json.NewDecoder(r)
var we WatchEvent
for {
@ -796,18 +812,18 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
return fmt.Errorf("cannot parse %s object: %w", we.Type, err)
}
key := o.key()
uw.gw.mu.Lock()
gw.mu.Lock()
uw.updateObjectLocked(key, o)
uw.gw.mu.Unlock()
gw.mu.Unlock()
case "DELETED":
o, err := uw.parseObject(we.Object)
if err != nil {
return fmt.Errorf("cannot parse %s object: %w", we.Type, err)
}
key := o.key()
uw.gw.mu.Lock()
gw.mu.Lock()
uw.removeObjectLocked(key)
uw.gw.mu.Unlock()
gw.mu.Unlock()
case "BOOKMARK":
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks
bm, err := parseBookmark(we.Object)