lib/promscrape/discovery/kubernetes: properly handle too old resource version error message from Kubernetes watch API

This commit is contained in:
Aliaksandr Valialkin 2021-03-26 12:28:10 +02:00
parent 9c2be144cf
commit a920e71809
2 changed files with 43 additions and 13 deletions

View file

@ -11,10 +11,12 @@
* FEATURE: accept and enforce `extra_label=<label_name>=<label_value>` query arg at [Graphite APIs](https://victoriametrics.github.io/#graphite-api-usage).
* FEATURE: use Influx field as metric name if measurement is empty and `-influxSkipSingleField` command-line is set. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1139
* FEATURE: vmagent: add `-promscrape.consul.waitTime` command-line flag for tuning the maximum wait time for Consul service discovery. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1144).
* FEATURE: vmagent: add `vm_promscrape_discovery_kubernetes_stale_resource_versions_total` metric for monitoring the frequency of `too old resource version` errors during Kubernetes service discovery.
* BUGFIX: prevent from infinite loop on `{__graphite__="..."}` filters when a metric name contains `*`, `{` or `[` chars.
* BUGFIX: prevent from infinite loop in `/metrics/find` and `/metrics/expand` [Graphite Metrics API handlers](https://victoriametrics.github.io/#graphite-metrics-api-usage) when they match metric names or labels with `*`, `{` or `[` chars.
* BUGFIX: do not merge duplicate time series during requests to `/api/v1/query`. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1141
* BUGFIX: vmagent: properly handle `too old resource version` error messages from Kubernetes watch API. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1150
# [v1.56.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.56.0)

View file

@ -308,10 +308,11 @@ type urlWatcher struct {
resourceVersion string
objectsCount *metrics.Counter
objectsAdded *metrics.Counter
objectsRemoved *metrics.Counter
objectsUpdated *metrics.Counter
objectsCount *metrics.Counter
objectsAdded *metrics.Counter
objectsRemoved *metrics.Counter
objectsUpdated *metrics.Counter
staleResourceVersions *metrics.Counter
}
func newURLWatcher(role, apiURL string, gw *groupWatcher) *urlWatcher {
@ -329,10 +330,11 @@ func newURLWatcher(role, apiURL string, gw *groupWatcher) *urlWatcher {
awsPending: make(map[*apiWatcher]struct{}),
objectsByKey: make(map[string]object),
objectsCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects{role=%q}`, role)),
objectsAdded: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_added_total{role=%q}`, role)),
objectsRemoved: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_removed_total{role=%q}`, role)),
objectsUpdated: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_updated_total{role=%q}`, role)),
objectsCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects{role=%q}`, role)),
objectsAdded: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_added_total{role=%q}`, role)),
objectsRemoved: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_removed_total{role=%q}`, role)),
objectsUpdated: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_updated_total{role=%q}`, role)),
staleResourceVersions: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_stale_resource_versions_total{role=%q}`, role)),
}
logger.Infof("started %s watcher for %q", uw.role, uw.apiURL)
go uw.watchForUpdates()
@ -502,14 +504,15 @@ func (uw *urlWatcher) watchForUpdates() {
continue
}
if resp.StatusCode != http.StatusOK {
body, _ := ioutil.ReadAll(resp.Body)
_ = resp.Body.Close()
logger.Errorf("unexpected status code for request to %q: %d; want %d; response: %q", requestURL, resp.StatusCode, http.StatusOK, body)
if resp.StatusCode == 410 {
// There is no need for sleep on 410 error. See https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses
backoffDelay = time.Second
uw.staleResourceVersions.Inc()
uw.setResourceVersion("")
} else {
body, _ := ioutil.ReadAll(resp.Body)
_ = resp.Body.Close()
logger.Errorf("unexpected status code for request to %q: %d; want %d; response: %q", requestURL, resp.StatusCode, http.StatusOK, body)
backoffSleep()
}
continue
@ -580,13 +583,25 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
return fmt.Errorf("cannot parse bookmark from %q: %w", we.Object, err)
}
uw.setResourceVersion(bm.Metadata.ResourceVersion)
case "ERROR":
em, err := parseError(we.Object)
if err != nil {
return fmt.Errorf("cannot parse error message for from %q: %w", we.Object, err)
}
if em.Code == 410 {
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses
uw.staleResourceVersions.Inc()
uw.setResourceVersion("")
return nil
}
return fmt.Errorf("unexpected error message: %q", we.Object)
default:
return fmt.Errorf("unexpected WatchEvent type %q for role %q", we.Type, uw.role)
return fmt.Errorf("unexpected WatchEvent type %q: %q", we.Type, we.Object)
}
}
}
// Bookmark is a bookmark from Kubernetes Watch API.
// Bookmark is a bookmark message from Kubernetes Watch API.
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks
type Bookmark struct {
Metadata struct {
@ -602,6 +617,19 @@ func parseBookmark(data []byte) (*Bookmark, error) {
return &bm, nil
}
// Error is an error message from Kubernetes Watch API.
type Error struct {
Code int
}
func parseError(data []byte) (*Error, error) {
var em Error
if err := json.Unmarshal(data, &em); err != nil {
return nil, err
}
return &em, nil
}
func getAPIPaths(role string, namespaces []string, selectors []Selector) []string {
objectName := getObjectNameByRole(role)
if objectName == "nodes" || len(namespaces) == 0 {