From 6c9cd3f7c1ec9b43d44e12bcf772b62f0f24f909 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 10 Mar 2021 15:06:33 +0200 Subject: [PATCH] lib/promscrape/discovery/kubernetes: reduce load on Kubernetes API server by using watch bookmarks This allows continuing object watch from the last bookbark instead of reloading all the objects on watch errors or timeouts. See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks --- lib/promscrape/discovery/kubernetes/api_watcher.go | 14 ++++++++------ .../discovery/kubernetes/common_types.go | 1 + lib/promscrape/discovery/kubernetes/endpoints.go | 4 ++++ .../discovery/kubernetes/endpointslices.go | 4 ++++ lib/promscrape/discovery/kubernetes/ingress.go | 4 ++++ lib/promscrape/discovery/kubernetes/node.go | 4 ++++ lib/promscrape/discovery/kubernetes/pod.go | 4 ++++ lib/promscrape/discovery/kubernetes/service.go | 4 ++++ 8 files changed, 33 insertions(+), 6 deletions(-) diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go index b997f2832..17ad2d636 100644 --- a/lib/promscrape/discovery/kubernetes/api_watcher.go +++ b/lib/promscrape/discovery/kubernetes/api_watcher.go @@ -31,6 +31,7 @@ type WatchEvent struct { type object interface { key() string getTargetLabels(aw *apiWatcher) []map[string]string + resourceVersion() string } // parseObjectFunc must parse object from the given data. @@ -247,9 +248,9 @@ var reloadObjectsLocksByRole = map[string]*sync.Mutex{ "ingress": {}, } -func (uw *urlWatcher) resetResourceVersion() { +func (uw *urlWatcher) setResourceVersion(resourceVersion string) { uw.mu.Lock() - uw.resourceVersion = "" + uw.resourceVersion = resourceVersion uw.mu.Unlock() } @@ -339,7 +340,7 @@ func (uw *urlWatcher) watchForUpdates() { delimiter = "&" } timeoutSeconds := time.Duration(0.9 * float64(aw.client.Timeout)).Seconds() - apiURL += delimiter + "watch=1&timeoutSeconds=" + strconv.Itoa(int(timeoutSeconds)) + apiURL += delimiter + "watch=1&allowWatchBookmarks=true&timeoutSeconds=" + strconv.Itoa(int(timeoutSeconds)) for { if aw.needStop() { return @@ -356,7 +357,6 @@ func (uw *urlWatcher) watchForUpdates() { } logger.Errorf("error when performing a request to %q: %s", requestURL, err) backoffSleep() - uw.resetResourceVersion() continue } if resp.StatusCode != http.StatusOK { @@ -366,10 +366,10 @@ func (uw *urlWatcher) watchForUpdates() { if resp.StatusCode == 410 { // There is no need for sleep on 410 error. See https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses backoffDelay = time.Second + uw.setResourceVersion("") } else { backoffSleep() } - uw.resetResourceVersion() continue } backoffDelay = time.Second @@ -383,7 +383,6 @@ func (uw *urlWatcher) watchForUpdates() { logger.Errorf("error when reading WatchEvent stream from %q: %s", requestURL, err) } backoffSleep() - uw.resetResourceVersion() continue } } @@ -420,6 +419,9 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error { uw.mu.Lock() delete(uw.swosByKey, key) uw.mu.Unlock() + case "BOOKMARK": + // See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks + uw.setResourceVersion(o.resourceVersion()) default: return fmt.Errorf("unexpected WatchEvent type %q for role %q", we.Type, uw.role) } diff --git a/lib/promscrape/discovery/kubernetes/common_types.go b/lib/promscrape/discovery/kubernetes/common_types.go index be93bbb4a..0fffe5df4 100644 --- a/lib/promscrape/discovery/kubernetes/common_types.go +++ b/lib/promscrape/discovery/kubernetes/common_types.go @@ -8,6 +8,7 @@ import ( // // See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#objectmeta-v1-meta type ObjectMeta struct { + ResourceVersion string Name string Namespace string UID string diff --git a/lib/promscrape/discovery/kubernetes/endpoints.go b/lib/promscrape/discovery/kubernetes/endpoints.go index 003cceca5..b3f8c2bff 100644 --- a/lib/promscrape/discovery/kubernetes/endpoints.go +++ b/lib/promscrape/discovery/kubernetes/endpoints.go @@ -47,6 +47,10 @@ type Endpoints struct { Subsets []EndpointSubset } +func (eps *Endpoints) resourceVersion() string { + return eps.Metadata.ResourceVersion +} + // EndpointSubset implements k8s endpoint subset. // // See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#endpointsubset-v1-core diff --git a/lib/promscrape/discovery/kubernetes/endpointslices.go b/lib/promscrape/discovery/kubernetes/endpointslices.go index cec87ce13..14629a819 100644 --- a/lib/promscrape/discovery/kubernetes/endpointslices.go +++ b/lib/promscrape/discovery/kubernetes/endpointslices.go @@ -161,6 +161,10 @@ type EndpointSlice struct { Ports []EndpointPort } +func (eps *EndpointSlice) resourceVersion() string { + return eps.Metadata.ResourceVersion +} + // Endpoint implements kubernetes object endpoint for endpoint slice. // https://v1-17.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#endpoint-v1beta1-discovery-k8s-io type Endpoint struct { diff --git a/lib/promscrape/discovery/kubernetes/ingress.go b/lib/promscrape/discovery/kubernetes/ingress.go index 38aef79d3..e71aa234f 100644 --- a/lib/promscrape/discovery/kubernetes/ingress.go +++ b/lib/promscrape/discovery/kubernetes/ingress.go @@ -45,6 +45,10 @@ type Ingress struct { Spec IngressSpec } +func (ig *Ingress) resourceVersion() string { + return ig.Metadata.ResourceVersion +} + // IngressSpec represents ingress spec in k8s. // // See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#ingressspec-v1beta1-extensions diff --git a/lib/promscrape/discovery/kubernetes/node.go b/lib/promscrape/discovery/kubernetes/node.go index 653a99e73..08ca03388 100644 --- a/lib/promscrape/discovery/kubernetes/node.go +++ b/lib/promscrape/discovery/kubernetes/node.go @@ -48,6 +48,10 @@ type Node struct { Status NodeStatus } +func (n *Node) resourceVersion() string { + return n.Metadata.ResourceVersion +} + // NodeStatus represents NodeStatus from k8s API. // // See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#nodestatus-v1-core diff --git a/lib/promscrape/discovery/kubernetes/pod.go b/lib/promscrape/discovery/kubernetes/pod.go index 80864d25e..041f17760 100644 --- a/lib/promscrape/discovery/kubernetes/pod.go +++ b/lib/promscrape/discovery/kubernetes/pod.go @@ -50,6 +50,10 @@ type Pod struct { Status PodStatus } +func (p *Pod) resourceVersion() string { + return p.Metadata.ResourceVersion +} + // PodSpec implements k8s pod spec. // // See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#podspec-v1-core diff --git a/lib/promscrape/discovery/kubernetes/service.go b/lib/promscrape/discovery/kubernetes/service.go index c129816e6..fe209281e 100644 --- a/lib/promscrape/discovery/kubernetes/service.go +++ b/lib/promscrape/discovery/kubernetes/service.go @@ -47,6 +47,10 @@ type Service struct { Spec ServiceSpec } +func (s *Service) resourceVersion() string { + return s.Metadata.ResourceVersion +} + // ServiceSpec is k8s service spec. // // See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#servicespec-v1-core