diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index ee953dd3e..612a58fb2 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -10,6 +10,7 @@ sort: 14 * FEATURE: add OpenTSDB migration option to vmctl. See more details [here](https://docs.victoriametrics.com/vmctl#migrating-data-from-opentsdb). Thanks to @johnseekins! +* BUGFIX: vmagent: properly update `role: endpoints` and `role: endpointslices` scrape targets if the underlying service changes in `kubernetes_sd_config`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240). * BUGFIX: vmstorage: remove empty directories on startup. Such directories can be left after unclean shutdown on NFS storage. Previously such directories could lead to crashloop until manually removed. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1142). diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go index 9e1c80830..ed654579a 100644 --- a/lib/promscrape/discovery/kubernetes/api_watcher.go +++ b/lib/promscrape/discovery/kubernetes/api_watcher.go @@ -301,6 +301,36 @@ func (gw *groupWatcher) getCachedObjectByRole(role, namespace, name string) obje return nil } +func (gw *groupWatcher) refreshObjectLabels(role, namespace, key string) { + // There is no need in starting url watcher for the given role, + // since there is no (namespace, key) object yet for this role. + // gw.startWatchersForRole(role, nil) + uws := gw.getURLWatchers() + for _, uw := range uws { + if uw.role != role { + // Role mismatch + continue + } + if uw.namespace != "" && uw.namespace != namespace { + // Namespace mismatch + continue + } + var aws []*apiWatcher + uw.mu.Lock() + o := uw.objectsByKey[key] + if o != nil { + aws = uw.getAPIWatchersLocked() + } + uw.mu.Unlock() + if len(aws) > 0 { + labels := o.getTargetLabels(gw) + for _, aw := range aws { + aw.setScrapeWorks(namespace, key, labels) + } + } + } +} + func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) { paths, namespaces := getAPIPathsWithNamespaces(role, gw.namespaces, gw.selectors) for i, path := range paths { @@ -515,10 +545,20 @@ func (uw *urlWatcher) reloadObjects() string { uw.objectsAdded.Add(added) uw.objectsCount.Add(added - removed) uw.resourceVersion = metadata.ResourceVersion - aws := getAPIWatchers(uw.aws) + aws := uw.getAPIWatchersLocked() uw.mu.Unlock() uw.reloadScrapeWorksForAPIWatchers(aws, objectsByKey) + if uw.role == "service" { + // Update endpoints and endpointslices for the corresponding service as Prometheus does. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240 + gw := uw.gw + namespace := uw.namespace + for key := range objectsByKey { + gw.refreshObjectLabels("endpoints", namespace, key) + gw.refreshObjectLabels("endpointslices", namespace, key) + } + } logger.Infof("reloaded %d objects from %q", len(objectsByKey), requestURL) return uw.resourceVersion } @@ -545,7 +585,8 @@ func (uw *urlWatcher) reloadScrapeWorksForAPIWatchers(aws []*apiWatcher, objects } } -func getAPIWatchers(awsMap map[*apiWatcher]struct{}) []*apiWatcher { +func (uw *urlWatcher) getAPIWatchersLocked() []*apiWatcher { + awsMap := uw.aws aws := make([]*apiWatcher, 0, len(awsMap)) for aw := range awsMap { aws = append(aws, aw) @@ -636,11 +677,19 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error { uw.objectsUpdated.Inc() } uw.objectsByKey[key] = o - aws := getAPIWatchers(uw.aws) + aws := uw.getAPIWatchersLocked() uw.mu.Unlock() - labels := o.getTargetLabels(uw.gw) - for _, aw := range aws { - aw.setScrapeWorks(uw.namespace, key, labels) + if len(aws) > 0 { + labels := o.getTargetLabels(uw.gw) + for _, aw := range aws { + aw.setScrapeWorks(uw.namespace, key, labels) + } + } + if uw.role == "service" { + // Update endpoints and endpointslices for the corresponding service as Prometheus does. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240 + uw.gw.refreshObjectLabels("endpoints", uw.namespace, key) + uw.gw.refreshObjectLabels("endpointslices", uw.namespace, key) } case "DELETED": o, err := uw.parseObject(we.Object) @@ -654,7 +703,7 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error { uw.objectsRemoved.Inc() delete(uw.objectsByKey, key) } - aws := getAPIWatchers(uw.aws) + aws := uw.getAPIWatchersLocked() uw.mu.Unlock() for _, aw := range aws { aw.removeScrapeWorks(uw.namespace, key)