diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 3cbcb68f2..bdd5081ef 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -64,6 +64,7 @@ The sandbox cluster installation is running under the constant load generated by * BUGFIX: dashboards/vmalert: properly apply time series filter for panel `No data errors`. Before, the panel didn't respect `job` or `instance` filters. * BUGFIX: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): bump hard-coded limit for search query size at `vmstorage` from 1MB to 5MB. The change should be more suitable for real-world scenarios and protect vmstorage from excessive memory usage. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5154) for details * BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): fix error when creating an incremental backup with the `-origin` command-line flag. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5144) for details. +* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly discover Kubernetes targets via [kubernetes_sd_configs](https://docs.victoriametrics.com/sd_configs.html#kubernetes_sd_configs). Previously some targets and some labels could be skipped during service discovery because of the bug introduced in [v1.93.5](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.93.5) when implementing [this feature](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4850). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5216) for more details. * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix vmagent ignoring configuration reload for streaming aggregation if it was started with empty streaming aggregation config. Thanks to @aluode99 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5178). 
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): do not scrape targets if the corresponding [scrape_configs](https://docs.victoriametrics.com/sd_configs.html#scrape_configs) refer to files with invalid auth configs. Previously the targets were scraped without properly set auth headers in this case. Now targets are scraped only after the files are updated with valid auth configs. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5153). * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly parse `ca`, `cert` and `key` options at `tls_config` section inside [http client settings](https://docs.victoriametrics.com/sd_configs.html#http-api-client-options). Previously string values couldn't be parsed for these options, since the parser was mistakenly expecting a list of `uint8` values instead. diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go index 39b017204..5894f6107 100644 --- a/lib/promscrape/discovery/kubernetes/api_watcher.go +++ b/lib/promscrape/discovery/kubernetes/api_watcher.go @@ -216,6 +216,16 @@ type groupWatcher struct { mu sync.Mutex m map[string]*urlWatcher + + // cancel is used for stopping all the urlWatcher instances inside m, + // which must check for ctx.Done() when performing their background watch work. + ctx context.Context + cancel context.CancelFunc + + // noAPIWatchers is set to true when there are no API watchers for the given groupWatcher. + // This field is used for determining when it is safe to stop all the urlWatcher instances + // for the given groupWatcher. 
+ noAPIWatchers bool } func newGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string, selectors []Selector, attachNodeMetadata bool, proxyURL *url.URL) *groupWatcher { @@ -237,6 +247,7 @@ func newGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string, }, Timeout: *apiServerTimeout, } + ctx, cancel := context.WithCancel(context.Background()) return &groupWatcher{ apiServer: apiServer, namespaces: namespaces, @@ -248,6 +259,9 @@ func newGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string, }, client: client, m: make(map[string]*urlWatcher), + + ctx: ctx, + cancel: cancel, } } @@ -298,8 +312,25 @@ func groupWatchersCleaner(gws map[string]*groupWatcher) { groupWatchersLock.Lock() for key, gw := range gws { gw.mu.Lock() - if len(gw.m) == 0 { - delete(gws, key) + // Calculate the number of apiWatcher instances subscribed to gw. + awsTotal := 0 + for _, uw := range gw.m { + awsTotal += len(uw.aws) + len(uw.awsPending) + } + + if awsTotal == 0 { + // There are no API watchers subscribed to gw. + // Stop all the urlWatcher instances at gw and drop gw from gws in this case, + // but do it only on the second iteration in order to reduce urlWatcher churn + // during scrape config reloads. 
+ if gw.noAPIWatchers { + gw.cancel() + delete(gws, key) + } else { + gw.noAPIWatchers = true + } + } else { + gw.noAPIWatchers = false } gw.mu.Unlock() } @@ -448,21 +479,18 @@ func (gw *groupWatcher) doRequest(ctx context.Context, requestURL string) (*http func (gw *groupWatcher) registerPendingAPIWatchers() { gw.mu.Lock() - defer gw.mu.Unlock() for _, uw := range gw.m { uw.registerPendingAPIWatchersLocked() } + gw.mu.Unlock() } func (gw *groupWatcher) unsubscribeAPIWatcher(aw *apiWatcher) { gw.mu.Lock() - defer gw.mu.Unlock() for _, uw := range gw.m { uw.unsubscribeAPIWatcherLocked(aw) - if len(uw.aws)+len(uw.awsPending) == 0 { - time.AfterFunc(10*time.Second, uw.stopIfNoUsers) - } } + gw.mu.Unlock() } // urlWatcher watches for an apiURL and updates object states in objectsByKey. @@ -474,9 +502,6 @@ type urlWatcher struct { apiURL string gw *groupWatcher - ctx context.Context - cancel context.CancelFunc - parseObject parseObjectFunc parseObjectList parseObjectListFunc @@ -508,15 +533,11 @@ func newURLWatcher(role, apiURL string, gw *groupWatcher) *urlWatcher { parseObject, parseObjectList := getObjectParsersForRole(role) metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers{role=%q}`, role)).Inc() - ctx, cancel := context.WithCancel(context.Background()) uw := &urlWatcher{ role: role, apiURL: apiURL, gw: gw, - ctx: ctx, - cancel: cancel, - parseObject: parseObject, parseObjectList: parseObjectList, @@ -534,21 +555,11 @@ func newURLWatcher(role, apiURL string, gw *groupWatcher) *urlWatcher { return uw } -func (uw *urlWatcher) stopIfNoUsers() { - gw := uw.gw - gw.mu.Lock() - if len(uw.aws)+len(uw.awsPending) == 0 { - uw.cancel() - delete(gw.m, uw.apiURL) - } - gw.mu.Unlock() -} - func (uw *urlWatcher) recreateScrapeWorks() { const minSleepTime = 5 * time.Second sleepTime := minSleepTime gw := uw.gw - stopCh := uw.ctx.Done() + stopCh := gw.ctx.Done() for { t := timerpool.Get(sleepTime) select { @@ -641,6 +652,7 @@ func (uw 
*urlWatcher) reloadObjects() string { return uw.resourceVersion } + gw := uw.gw startTime := time.Now() apiURL := uw.apiURL @@ -649,7 +661,7 @@ func (uw *urlWatcher) reloadObjects() string { // and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4855 . delimiter := getQueryArgsDelimiter(apiURL) requestURL := apiURL + delimiter + "resourceVersion=0&resourceVersionMatch=NotOlderThan" - resp, err := uw.gw.doRequest(uw.ctx, requestURL) + resp, err := gw.doRequest(gw.ctx, requestURL) if err != nil { if !errors.Is(err, context.Canceled) { logger.Errorf("cannot perform request to %q: %s", requestURL, err) @@ -669,7 +681,7 @@ func (uw *urlWatcher) reloadObjects() string { return "" } - uw.gw.mu.Lock() + gw.mu.Lock() objectsAdded := make(map[string]object) objectsUpdated := make(map[string]object) var objectsRemoved []string @@ -700,7 +712,7 @@ func (uw *urlWatcher) reloadObjects() string { if len(objectsRemoved) > 0 || len(objectsUpdated) > 0 || len(objectsAdded) > 0 { uw.maybeUpdateDependedScrapeWorksLocked() } - uw.gw.mu.Unlock() + gw.mu.Unlock() uw.objectsUpdated.Add(len(objectsUpdated)) uw.objectsRemoved.Add(len(objectsRemoved)) @@ -717,7 +729,8 @@ func (uw *urlWatcher) reloadObjects() string { // // See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes func (uw *urlWatcher) watchForUpdates() { - stopCh := uw.ctx.Done() + gw := uw.gw + stopCh := gw.ctx.Done() backoffDelay := time.Second maxBackoffDelay := 30 * time.Second backoffSleep := func() { @@ -736,7 +749,7 @@ func (uw *urlWatcher) watchForUpdates() { } apiURL := uw.apiURL delimiter := getQueryArgsDelimiter(apiURL) - timeoutSeconds := time.Duration(0.9 * float64(uw.gw.client.Timeout)).Seconds() + timeoutSeconds := time.Duration(0.9 * float64(gw.client.Timeout)).Seconds() apiURL += delimiter + "watch=1&allowWatchBookmarks=true&timeoutSeconds=" + strconv.Itoa(int(timeoutSeconds)) for { select { @@ -753,7 +766,7 @@ func (uw *urlWatcher) watchForUpdates() { continue 
} requestURL := apiURL + "&resourceVersion=" + url.QueryEscape(resourceVersion) - resp, err := uw.gw.doRequest(uw.ctx, requestURL) + resp, err := gw.doRequest(gw.ctx, requestURL) if err != nil { if !errors.Is(err, context.Canceled) { logger.Errorf("cannot perform request to %q: %s", requestURL, err) @@ -779,7 +792,7 @@ func (uw *urlWatcher) watchForUpdates() { err = uw.readObjectUpdateStream(resp.Body) _ = resp.Body.Close() if err != nil { - if !(errors.Is(err, io.EOF) || errors.Is(err, context.Canceled)) { + if !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) { logger.Errorf("error when reading WatchEvent stream from %q: %s", requestURL, err) uw.resourceVersion = "" } @@ -791,6 +804,7 @@ func (uw *urlWatcher) watchForUpdates() { // readObjectUpdateStream reads Kubernetes watch events from r and updates locally cached objects according to the received events. func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error { + gw := uw.gw d := json.NewDecoder(r) var we WatchEvent for { @@ -804,18 +818,18 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error { return fmt.Errorf("cannot parse %s object: %w", we.Type, err) } key := o.key() - uw.gw.mu.Lock() + gw.mu.Lock() uw.updateObjectLocked(key, o) - uw.gw.mu.Unlock() + gw.mu.Unlock() case "DELETED": o, err := uw.parseObject(we.Object) if err != nil { return fmt.Errorf("cannot parse %s object: %w", we.Type, err) } key := o.key() - uw.gw.mu.Lock() + gw.mu.Lock() uw.removeObjectLocked(key) - uw.gw.mu.Unlock() + gw.mu.Unlock() case "BOOKMARK": // See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks bm, err := parseBookmark(we.Object)