From 9761ffd1611b1268bda22061a41cc04cbdf3b6ac Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin <valyala@gmail.com>
Date: Fri, 26 Mar 2021 12:28:10 +0200
Subject: [PATCH] lib/promscrape/discovery/kubernetes: properly handle `too old
 resource version` error message from Kubernetes watch API

---
 docs/CHANGELOG.md                             |  2 +
 .../discovery/kubernetes/api_watcher.go       | 54 ++++++++++++++-----
 2 files changed, 43 insertions(+), 13 deletions(-)

diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 22727f9d24..aef0e210e5 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -11,10 +11,12 @@
 * FEATURE: accept and enforce `extra_label=<label_name>=<label_value>` query arg at [Graphite APIs](https://victoriametrics.github.io/#graphite-api-usage).
 * FEATURE: use Influx field as metric name if measurement is empty and `-influxSkipSingleField` command-line is set. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1139
 * FEATURE: vmagent: add `-promscrape.consul.waitTime` command-line flag for tuning the maximum wait time for Consul service discovery. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1144).
+* FEATURE: vmagent: add `vm_promscrape_discovery_kubernetes_stale_resource_versions_total` metric for monitoring the frequency of `too old resource version` errors during Kubernetes service discovery.
 
 * BUGFIX: prevent from infinite loop on `{__graphite__="..."}` filters when a metric name contains `*`, `{` or `[` chars.
 * BUGFIX: prevent from infinite loop in `/metrics/find` and `/metrics/expand` [Graphite Metrics API handlers](https://victoriametrics.github.io/#graphite-metrics-api-usage) when they match metric names or labels with `*`, `{` or `[` chars.
 * BUGFIX: do not merge duplicate time series during requests to `/api/v1/query`. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1141
+* BUGFIX: vmagent: properly handle `too old resource version` error messages from Kubernetes watch API. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1150
 
 
 # [v1.56.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.56.0)
diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go
index d0cc9bb6ca..68902587aa 100644
--- a/lib/promscrape/discovery/kubernetes/api_watcher.go
+++ b/lib/promscrape/discovery/kubernetes/api_watcher.go
@@ -308,10 +308,11 @@ type urlWatcher struct {
 
 	resourceVersion string
 
-	objectsCount   *metrics.Counter
-	objectsAdded   *metrics.Counter
-	objectsRemoved *metrics.Counter
-	objectsUpdated *metrics.Counter
+	objectsCount          *metrics.Counter
+	objectsAdded          *metrics.Counter
+	objectsRemoved        *metrics.Counter
+	objectsUpdated        *metrics.Counter
+	staleResourceVersions *metrics.Counter
 }
 
 func newURLWatcher(role, apiURL string, gw *groupWatcher) *urlWatcher {
@@ -329,10 +330,11 @@ func newURLWatcher(role, apiURL string, gw *groupWatcher) *urlWatcher {
 		awsPending:   make(map[*apiWatcher]struct{}),
 		objectsByKey: make(map[string]object),
 
-		objectsCount:   metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects{role=%q}`, role)),
-		objectsAdded:   metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_added_total{role=%q}`, role)),
-		objectsRemoved: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_removed_total{role=%q}`, role)),
-		objectsUpdated: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_updated_total{role=%q}`, role)),
+		objectsCount:          metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects{role=%q}`, role)),
+		objectsAdded:          metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_added_total{role=%q}`, role)),
+		objectsRemoved:        metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_removed_total{role=%q}`, role)),
+		objectsUpdated:        metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_updated_total{role=%q}`, role)),
+		staleResourceVersions: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_stale_resource_versions_total{role=%q}`, role)),
 	}
 	logger.Infof("started %s watcher for %q", uw.role, uw.apiURL)
 	go uw.watchForUpdates()
@@ -502,14 +504,15 @@ func (uw *urlWatcher) watchForUpdates() {
 			continue
 		}
 		if resp.StatusCode != http.StatusOK {
-			body, _ := ioutil.ReadAll(resp.Body)
-			_ = resp.Body.Close()
-			logger.Errorf("unexpected status code for request to %q: %d; want %d; response: %q", requestURL, resp.StatusCode, http.StatusOK, body)
 			if resp.StatusCode == 410 {
 				// There is no need for sleep on 410 error. See https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses
 				backoffDelay = time.Second
+				uw.staleResourceVersions.Inc()
 				uw.setResourceVersion("")
 			} else {
+				body, _ := ioutil.ReadAll(resp.Body)
+				_ = resp.Body.Close()
+				logger.Errorf("unexpected status code for request to %q: %d; want %d; response: %q", requestURL, resp.StatusCode, http.StatusOK, body)
 				backoffSleep()
 			}
 			continue
@@ -580,13 +583,25 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
 				return fmt.Errorf("cannot parse bookmark from %q: %w", we.Object, err)
 			}
 			uw.setResourceVersion(bm.Metadata.ResourceVersion)
+		case "ERROR":
+			em, err := parseError(we.Object)
+			if err != nil {
+				return fmt.Errorf("cannot parse error message for from %q: %w", we.Object, err)
+			}
+			if em.Code == 410 {
+				// See https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses
+				uw.staleResourceVersions.Inc()
+				uw.setResourceVersion("")
+				return nil
+			}
+			return fmt.Errorf("unexpected error message: %q", we.Object)
 		default:
-			return fmt.Errorf("unexpected WatchEvent type %q for role %q", we.Type, uw.role)
+			return fmt.Errorf("unexpected WatchEvent type %q: %q", we.Type, we.Object)
 		}
 	}
 }
 
-// Bookmark is a bookmark from Kubernetes Watch API.
+// Bookmark is a bookmark message from Kubernetes Watch API.
 // See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks
 type Bookmark struct {
 	Metadata struct {
@@ -602,6 +617,19 @@ func parseBookmark(data []byte) (*Bookmark, error) {
 	return &bm, nil
 }
 
+// Error is an error message from Kubernetes Watch API.
+type Error struct {
+	Code int
+}
+
+func parseError(data []byte) (*Error, error) {
+	var em Error
+	if err := json.Unmarshal(data, &em); err != nil {
+		return nil, err
+	}
+	return &em, nil
+}
+
 func getAPIPaths(role string, namespaces []string, selectors []Selector) []string {
 	objectName := getObjectNameByRole(role)
 	if objectName == "nodes" || len(namespaces) == 0 {