lib/promscrape/discovery/kubernetes: reduce load on Kubernetes API server by using watch bookmarks

This allows continuing object watch from the last bookbark instead of reloading all the objects
on watch errors or timeouts.

See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks
This commit is contained in:
Aliaksandr Valialkin 2021-03-10 15:06:33 +02:00
parent bd8b7a88a7
commit 6c9cd3f7c1
8 changed files with 33 additions and 6 deletions

View file

@ -31,6 +31,7 @@ type WatchEvent struct {
type object interface {
key() string
getTargetLabels(aw *apiWatcher) []map[string]string
resourceVersion() string
}
// parseObjectFunc must parse object from the given data.
@ -247,9 +248,9 @@ var reloadObjectsLocksByRole = map[string]*sync.Mutex{
"ingress": {},
}
func (uw *urlWatcher) resetResourceVersion() {
func (uw *urlWatcher) setResourceVersion(resourceVersion string) {
uw.mu.Lock()
uw.resourceVersion = ""
uw.resourceVersion = resourceVersion
uw.mu.Unlock()
}
@ -339,7 +340,7 @@ func (uw *urlWatcher) watchForUpdates() {
delimiter = "&"
}
timeoutSeconds := time.Duration(0.9 * float64(aw.client.Timeout)).Seconds()
apiURL += delimiter + "watch=1&timeoutSeconds=" + strconv.Itoa(int(timeoutSeconds))
apiURL += delimiter + "watch=1&allowWatchBookmarks=true&timeoutSeconds=" + strconv.Itoa(int(timeoutSeconds))
for {
if aw.needStop() {
return
@ -356,7 +357,6 @@ func (uw *urlWatcher) watchForUpdates() {
}
logger.Errorf("error when performing a request to %q: %s", requestURL, err)
backoffSleep()
uw.resetResourceVersion()
continue
}
if resp.StatusCode != http.StatusOK {
@ -366,10 +366,10 @@ func (uw *urlWatcher) watchForUpdates() {
if resp.StatusCode == 410 {
// There is no need for sleep on 410 error. See https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses
backoffDelay = time.Second
uw.setResourceVersion("")
} else {
backoffSleep()
}
uw.resetResourceVersion()
continue
}
backoffDelay = time.Second
@ -383,7 +383,6 @@ func (uw *urlWatcher) watchForUpdates() {
logger.Errorf("error when reading WatchEvent stream from %q: %s", requestURL, err)
}
backoffSleep()
uw.resetResourceVersion()
continue
}
}
@ -420,6 +419,9 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
uw.mu.Lock()
delete(uw.swosByKey, key)
uw.mu.Unlock()
case "BOOKMARK":
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks
uw.setResourceVersion(o.resourceVersion())
default:
return fmt.Errorf("unexpected WatchEvent type %q for role %q", we.Type, uw.role)
}

View file

@ -8,6 +8,7 @@ import (
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#objectmeta-v1-meta
type ObjectMeta struct {
ResourceVersion string
Name string
Namespace string
UID string

View file

@ -47,6 +47,10 @@ type Endpoints struct {
Subsets []EndpointSubset
}
func (eps *Endpoints) resourceVersion() string {
return eps.Metadata.ResourceVersion
}
// EndpointSubset implements k8s endpoint subset.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#endpointsubset-v1-core

View file

@ -161,6 +161,10 @@ type EndpointSlice struct {
Ports []EndpointPort
}
func (eps *EndpointSlice) resourceVersion() string {
return eps.Metadata.ResourceVersion
}
// Endpoint implements kubernetes object endpoint for endpoint slice.
// https://v1-17.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#endpoint-v1beta1-discovery-k8s-io
type Endpoint struct {

View file

@ -45,6 +45,10 @@ type Ingress struct {
Spec IngressSpec
}
func (ig *Ingress) resourceVersion() string {
return ig.Metadata.ResourceVersion
}
// IngressSpec represents ingress spec in k8s.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#ingressspec-v1beta1-extensions

View file

@ -48,6 +48,10 @@ type Node struct {
Status NodeStatus
}
func (n *Node) resourceVersion() string {
return n.Metadata.ResourceVersion
}
// NodeStatus represents NodeStatus from k8s API.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#nodestatus-v1-core

View file

@ -50,6 +50,10 @@ type Pod struct {
Status PodStatus
}
func (p *Pod) resourceVersion() string {
return p.Metadata.ResourceVersion
}
// PodSpec implements k8s pod spec.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#podspec-v1-core

View file

@ -47,6 +47,10 @@ type Service struct {
Spec ServiceSpec
}
func (s *Service) resourceVersion() string {
return s.Metadata.ResourceVersion
}
// ServiceSpec is k8s service spec.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#servicespec-v1-core