lib/promscrape/discovery/kubernetes: log errors and stop service discovery when unexpected updates are received from Kubernetes API server

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
This commit is contained in:
Aliaksandr Valialkin 2021-05-18 15:10:45 +03:00
parent 66aba00549
commit 3166994244

View file

@ -412,20 +412,11 @@ func (uw *urlWatcher) unsubscribeAPIWatcherLocked(aw *apiWatcher) {
}
}
// setResourceVersion stores resourceVersion in uw.resourceVersion.
//
// The assignment is performed while holding uw.gw.mu.
func (uw *urlWatcher) setResourceVersion(resourceVersion string) {
	uw.gw.mu.Lock()
	defer uw.gw.mu.Unlock()

	uw.resourceVersion = resourceVersion
}
// reloadObjects reloads objects to the latest state and returns resourceVersion for the latest state.
func (uw *urlWatcher) reloadObjects() string {
uw.gw.mu.Lock()
resourceVersion := uw.resourceVersion
uw.gw.mu.Unlock()
if resourceVersion != "" {
if uw.resourceVersion != "" {
// Fast path - there is no need to reload the objects.
return resourceVersion
return uw.resourceVersion
}
requestURL := uw.apiURL
@ -464,13 +455,14 @@ func (uw *urlWatcher) reloadObjects() string {
added++
}
}
uw.reloadScrapeWorksForAPIWatchersLocked(uw.aws)
uw.gw.mu.Unlock()
uw.objectsUpdated.Add(updated)
uw.objectsRemoved.Add(removed)
uw.objectsAdded.Add(added)
uw.objectsCount.Add(added - removed)
uw.reloadScrapeWorksForAPIWatchersLocked(uw.aws)
uw.resourceVersion = metadata.ResourceVersion
uw.gw.mu.Unlock()
logger.Infof("reloaded %d objects from %q", len(objectsByKey), requestURL)
return uw.resourceVersion
@ -540,7 +532,7 @@ func (uw *urlWatcher) watchForUpdates() {
// There is no need to sleep on a 410 error. See https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses
backoffDelay = time.Second
uw.staleResourceVersions.Inc()
uw.setResourceVersion("")
uw.resourceVersion = ""
} else {
body, _ := ioutil.ReadAll(resp.Body)
_ = resp.Body.Close()
@ -579,9 +571,15 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
key := o.key()
uw.gw.mu.Lock()
if _, ok := uw.objectsByKey[key]; !ok {
if we.Type == "MODIFIED" {
return fmt.Errorf("cannot update object %q, since it is missing in local cache", key)
}
uw.objectsCount.Inc()
uw.objectsAdded.Inc()
} else {
if we.Type == "ADDED" {
return fmt.Errorf("cannot add object %q, since it already exists in local cache", key)
}
uw.objectsUpdated.Inc()
}
uw.objectsByKey[key] = o
@ -599,11 +597,12 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
}
key := o.key()
uw.gw.mu.Lock()
if _, ok := uw.objectsByKey[key]; ok {
uw.objectsCount.Dec()
uw.objectsRemoved.Inc()
delete(uw.objectsByKey, key)
if _, ok := uw.objectsByKey[key]; !ok {
return fmt.Errorf("cannot delete object %q, since it is missing in local cache", key)
}
uw.objectsCount.Dec()
uw.objectsRemoved.Inc()
delete(uw.objectsByKey, key)
for aw := range uw.aws {
aw.removeScrapeWorks(uw, key)
}
@ -614,7 +613,7 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
if err != nil {
return fmt.Errorf("cannot parse bookmark from %q: %w", we.Object, err)
}
uw.setResourceVersion(bm.Metadata.ResourceVersion)
uw.resourceVersion = bm.Metadata.ResourceVersion
case "ERROR":
em, err := parseError(we.Object)
if err != nil {
@ -623,7 +622,7 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
if em.Code == 410 {
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses
uw.staleResourceVersions.Inc()
uw.setResourceVersion("")
uw.resourceVersion = ""
return nil
}
return fmt.Errorf("unexpected error message: %q", we.Object)