lib/promscrape/discovery/kubernetes: refresh endpoints and endpointslices scrape targets every 5 seconds, since they may depend on changed service and pod objects

This should make endpoints and endpointslices scrape targets eventually consistent with the maximum delay of 5 seconds after the related service or pod object changes.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
This commit is contained in:
Aliaksandr Valialkin 2021-05-12 14:10:10 +03:00
parent 1d32b008c6
commit 027607db3e

View file

@ -75,11 +75,6 @@ func newAPIWatcher(apiServer string, ac *promauth.Config, sdc *SDConfig, swcFunc
}
func (aw *apiWatcher) mustStart() {
if aw.role == "endpoints" || aw.role == "endpointslices" {
// endpoints and endpointslices watchers query pod and service objects. So start watchers for these roles as well.
aw.gw.startWatchersForRole("pod", nil)
aw.gw.startWatchersForRole("service", nil)
}
aw.gw.startWatchersForRole(aw.role, aw)
}
@ -255,37 +250,12 @@ func (gw *groupWatcher) getObjectByRoleLocked(role, namespace, name string) obje
return nil
}
func (gw *groupWatcher) refreshEndpointsLabelsLocked(namespace, key string) {
// Refresh endpoints and endpointslices labels for the corresponding service as Prometheus does.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
gw.refreshObjectLabelsLocked("endpoints", namespace, key)
gw.refreshObjectLabelsLocked("endpointslices", namespace, key)
}
func (gw *groupWatcher) refreshObjectLabelsLocked(role, namespace, key string) {
for _, uw := range gw.m {
if len(uw.aws) == 0 {
// No apiWatchers to notify
continue
}
if uw.role != role {
// Role mismatch
continue
}
if uw.namespace != "" && uw.namespace != namespace {
// Namespace mismatch
continue
}
if o := uw.objectsByKey[key]; o != nil {
labels := o.getTargetLabels(gw)
for aw := range uw.aws {
aw.setScrapeWorks(namespace, key, labels)
}
}
}
}
func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) {
if role == "endpoints" || role == "endpointslices" {
// endpoints and endpointslices watchers query pod and service objects. So start watchers for these roles as well.
gw.startWatchersForRole("pod", nil)
gw.startWatchersForRole("service", nil)
}
paths, namespaces := getAPIPathsWithNamespaces(role, gw.namespaces, gw.selectors)
for i, path := range paths {
apiURL := gw.apiServer + path
@ -303,6 +273,18 @@ func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) {
if needStart {
uw.reloadObjects()
go uw.watchForUpdates()
if role == "endpoints" || role == "endpointslices" {
// Refresh endpoints and enpointslices targets in background, since they depend on other object types such as pod and service.
// This should fix https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240 .
go func() {
for {
time.Sleep(5 * time.Second)
gw.mu.Lock()
uw.reloadScrapeWorksForAPIWatchersLocked(uw.aws)
gw.mu.Unlock()
}
}()
}
}
}
}
@ -488,13 +470,6 @@ func (uw *urlWatcher) reloadObjects() string {
uw.objectsCount.Add(added - removed)
uw.reloadScrapeWorksForAPIWatchersLocked(uw.aws)
uw.resourceVersion = metadata.ResourceVersion
if uw.role == "service" {
// Refresh endpoints labels for the corresponding services as Prometheus does.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
for key := range objectsByKey {
uw.gw.refreshEndpointsLabelsLocked(uw.namespace, key)
}
}
uw.gw.mu.Unlock()
logger.Infof("reloaded %d objects from %q", len(objectsByKey), requestURL)
@ -616,11 +591,6 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
aw.setScrapeWorks(uw.namespace, key, labels)
}
}
if uw.role == "service" {
// Refresh endpoints labels for the corresponding service as Prometheus does.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
uw.gw.refreshEndpointsLabelsLocked(uw.namespace, key)
}
uw.gw.mu.Unlock()
case "DELETED":
o, err := uw.parseObject(we.Object)
@ -637,11 +607,6 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
for aw := range uw.aws {
aw.removeScrapeWorks(uw.namespace, key)
}
if uw.role == "service" {
// Refresh endpoints labels for the corresponding service as Prometheus does.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
uw.gw.refreshEndpointsLabelsLocked(uw.namespace, key)
}
uw.gw.mu.Unlock()
case "BOOKMARK":
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks