lib/promscrape/discovery/kubernetes: reload objects on object parse error

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
This commit is contained in:
Aliaksandr Valialkin 2021-05-18 23:25:42 +03:00
parent e8a6c6927d
commit eb8093ca6b

View file

@ -548,6 +548,7 @@ func (uw *urlWatcher) watchForUpdates() {
if err != nil {
if !errors.Is(err, io.EOF) {
logger.Errorf("error when reading WatchEvent stream from %q: %s", requestURL, err)
uw.resourceVersion = ""
}
backoffSleep()
continue
@ -567,19 +568,19 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
case "ADDED", "MODIFIED":
o, err := uw.parseObject(we.Object)
if err != nil {
return err
return fmt.Errorf("cannot parse %s object: %w", we.Type, err)
}
key := o.key()
uw.gw.mu.Lock()
if _, ok := uw.objectsByKey[key]; !ok {
if we.Type == "MODIFIED" {
return fmt.Errorf("cannot update object %q, since it is missing in local cache", key)
// This is expected condition after recovering from the bookmarked resourceVersion.
}
uw.objectsCount.Inc()
uw.objectsAdded.Inc()
} else {
if we.Type == "ADDED" {
return fmt.Errorf("cannot add object %q, since it already exists in local cache", key)
// This is expected condition after recovering from the bookmarked resourceVersion.
}
uw.objectsUpdated.Inc()
}
@ -594,16 +595,17 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
case "DELETED":
o, err := uw.parseObject(we.Object)
if err != nil {
return err
return fmt.Errorf("cannot parse %s object: %w", we.Type, err)
}
key := o.key()
uw.gw.mu.Lock()
if _, ok := uw.objectsByKey[key]; !ok {
return fmt.Errorf("cannot delete object %q, since it is missing in local cache", key)
// This is expected condition after recovering from the bookmarked resourceVersion.
} else {
uw.objectsCount.Dec()
uw.objectsRemoved.Inc()
delete(uw.objectsByKey, key)
}
uw.objectsCount.Dec()
uw.objectsRemoved.Inc()
delete(uw.objectsByKey, key)
for aw := range uw.aws {
aw.removeScrapeWorks(uw, key)
}