From eb8093ca6bd1725c188f2e2da9c9f124bc529e32 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin <valyala@gmail.com> Date: Tue, 18 May 2021 23:25:42 +0300 Subject: [PATCH] lib/promscrape/discovery/kubernetes: reload objects on object parse error Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240 --- .../discovery/kubernetes/api_watcher.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go index 6d2ca76660..98ac71d139 100644 --- a/lib/promscrape/discovery/kubernetes/api_watcher.go +++ b/lib/promscrape/discovery/kubernetes/api_watcher.go @@ -548,6 +548,7 @@ func (uw *urlWatcher) watchForUpdates() { if err != nil { if !errors.Is(err, io.EOF) { logger.Errorf("error when reading WatchEvent stream from %q: %s", requestURL, err) + uw.resourceVersion = "" } backoffSleep() continue @@ -567,19 +568,19 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error { case "ADDED", "MODIFIED": o, err := uw.parseObject(we.Object) if err != nil { - return err + return fmt.Errorf("cannot parse %s object: %w", we.Type, err) } key := o.key() uw.gw.mu.Lock() if _, ok := uw.objectsByKey[key]; !ok { if we.Type == "MODIFIED" { - return fmt.Errorf("cannot update object %q, since it is missing in local cache", key) + // This is expected condition after recovering from the bookmarked resourceVersion. } uw.objectsCount.Inc() uw.objectsAdded.Inc() } else { if we.Type == "ADDED" { - return fmt.Errorf("cannot add object %q, since it already exists in local cache", key) + // This is expected condition after recovering from the bookmarked resourceVersion. } uw.objectsUpdated.Inc() } @@ -594,16 +595,17 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error { case "DELETED": o, err := uw.parseObject(we.Object) if err != nil { - return err + return fmt.Errorf("cannot parse %s object: %w", we.Type, err) } key := o.key() uw.gw.mu.Lock() if _, ok := uw.objectsByKey[key]; !ok { - return fmt.Errorf("cannot delete object %q, since it is missing in local cache", key) + // This is expected condition after recovering from the bookmarked resourceVersion. + } else { + uw.objectsCount.Dec() + uw.objectsRemoved.Inc() + delete(uw.objectsByKey, key) } - uw.objectsCount.Dec() - uw.objectsRemoved.Inc() - delete(uw.objectsByKey, key) for aw := range uw.aws { aw.removeScrapeWorks(uw, key) }