diff --git a/lib/promscrape/discovery/kubernetes/api.go b/lib/promscrape/discovery/kubernetes/api.go index cd20d89ee..5596c7f5b 100644 --- a/lib/promscrape/discovery/kubernetes/api.go +++ b/lib/promscrape/discovery/kubernetes/api.go @@ -1,21 +1,15 @@ package kubernetes import ( - "flag" "fmt" "net" - "net/http" - "net/url" "os" "strings" - "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" ) -var apiServerTimeout = flag.Duration("promscrape.kubernetes.apiServerTimeout", 30*time.Minute, "How frequently to reload the full state from Kuberntes API server") - // apiConfig contains config for API server type apiConfig struct { aw *apiWatcher @@ -36,6 +30,11 @@ func getAPIConfig(sdc *SDConfig, baseDir string, swcFunc ScrapeWorkConstructorFu } func newAPIConfig(sdc *SDConfig, baseDir string, swcFunc ScrapeWorkConstructorFunc) (*apiConfig, error) { + switch sdc.Role { + case "node", "pod", "service", "endpoints", "endpointslices", "ingress": + default: + return nil, fmt.Errorf("unexpected `role`: %q; must be one of `node`, `pod`, `service`, `endpoints`, `endpointslices` or `ingress`", sdc.Role) + } ac, err := promauth.NewConfig(baseDir, sdc.BasicAuth, sdc.BearerToken, sdc.BearerTokenFile, sdc.TLSConfig) if err != nil { return nil, fmt.Errorf("cannot parse auth config: %w", err) @@ -75,20 +74,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string, swcFunc ScrapeWorkConstructorFu for strings.HasSuffix(apiServer, "/") { apiServer = apiServer[:len(apiServer)-1] } - var proxy func(*http.Request) (*url.URL, error) - if proxyURL := sdc.ProxyURL.URL(); proxyURL != nil { - proxy = http.ProxyURL(proxyURL) - } - client := &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: ac.NewTLSConfig(), - Proxy: proxy, - TLSHandshakeTimeout: 10 * time.Second, - IdleConnTimeout: *apiServerTimeout, - }, - Timeout: *apiServerTimeout, - } - aw := newAPIWatcher(client, apiServer, ac.Authorization, sdc.Namespaces.Names, sdc.Selectors, swcFunc) + aw := newAPIWatcher(apiServer, ac, sdc, swcFunc) cfg := &apiConfig{ aw: aw, } diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go index 5d4a30273..26e407690 100644 --- a/lib/promscrape/discovery/kubernetes/api_watcher.go +++ b/lib/promscrape/discovery/kubernetes/api_watcher.go @@ -1,9 +1,9 @@ package kubernetes import ( - "context" "encoding/json" "errors" + "flag" "fmt" "io" "io/ioutil" @@ -16,9 +16,12 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/metrics" ) +var apiServerTimeout = flag.Duration("promscrape.kubernetes.apiServerTimeout", 30*time.Minute, "How frequently to reload the full state from Kuberntes API server") + // WatchEvent is a watch event returned from API server endpoints if `watch=1` query arg is set. // // See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes @@ -36,275 +39,83 @@ type object interface { // parseObjectFunc must parse object from the given data. type parseObjectFunc func(data []byte) (object, error) -// parseObjectListFunc must parse objectList from the given data. -type parseObjectListFunc func(data []byte) (map[string]object, ListMeta, error) +// parseObjectListFunc must parse objectList from the given r. +type parseObjectListFunc func(r io.Reader) (map[string]object, ListMeta, error) // apiWatcher is used for watching for Kuberntes object changes and caching their latest states. type apiWatcher struct { - // The client used for watching for object changes - client *http.Client - // Kubenetes API server address in the form http://api-server apiServer string - // The contents for `Authorization` HTTP request header - authorization string + // ac contains auth config for communicating with apiServer + ac *promauth.Config - // Namespaces to watch - namespaces []string + // sdc contains the related SDConfig + sdc *SDConfig - // Selectors to apply during watch - selectors []Selector - - // Constructor for creating ScrapeWork objects from labels. + // Constructor for creating ScrapeWork objects from labels swcFunc ScrapeWorkConstructorFunc - // mu protects watchersByURL - mu sync.Mutex + // swos contains a map of ScrapeWork objects for the given apiWatcher + swosByKey map[string][]interface{} + swosByKeyLock sync.Mutex // a map of watchers keyed by request urls - watchersByURL map[string]*urlWatcher + watchersByURL map[string]*urlWatcher + watchersByURLLock sync.Mutex - stopFunc func() - stopCtx context.Context - wg sync.WaitGroup + stopCh chan struct{} + wg sync.WaitGroup +} + +func newAPIWatcher(apiServer string, ac *promauth.Config, sdc *SDConfig, swcFunc ScrapeWorkConstructorFunc) *apiWatcher { + return &apiWatcher{ + apiServer: apiServer, + ac: ac, + sdc: sdc, + swcFunc: swcFunc, + + swosByKey: make(map[string][]interface{}), + watchersByURL: make(map[string]*urlWatcher), + + stopCh: make(chan struct{}), + } } func (aw *apiWatcher) mustStop() { - aw.stopFunc() + close(aw.stopCh) aw.wg.Wait() } -func newAPIWatcher(client *http.Client, apiServer, authorization string, namespaces []string, selectors []Selector, swcFunc ScrapeWorkConstructorFunc) *apiWatcher { - stopCtx, stopFunc := context.WithCancel(context.Background()) - return &apiWatcher{ - apiServer: apiServer, - authorization: authorization, - client: client, - namespaces: namespaces, - selectors: selectors, - swcFunc: swcFunc, - - watchersByURL: make(map[string]*urlWatcher), - - stopFunc: stopFunc, - stopCtx: stopCtx, - } -} - -// getScrapeWorkObjectsForRole returns all the ScrapeWork objects for the given role. -func (aw *apiWatcher) getScrapeWorkObjectsForRole(role string) []interface{} { - aw.startWatchersForRole(role) - var swos []interface{} - aw.mu.Lock() - for _, uw := range aw.watchersByURL { - if uw.role != role { - continue - } - uw.mu.Lock() - for _, swosLocal := range uw.swosByKey { - swos = append(swos, swosLocal...) - } - uw.mu.Unlock() - } - aw.mu.Unlock() - return swos -} - -// getObjectByRole returns an object with the given (namespace, name) key and the given role. -func (aw *apiWatcher) getObjectByRole(role, namespace, name string) object { - if aw == nil { - return nil - } - key := namespace + "/" + name - aw.startWatchersForRole(role) - var o object - aw.mu.Lock() - for _, uw := range aw.watchersByURL { - if uw.role != role { - continue - } - o = uw.objectsByKey.get(key) - if o != nil { - break - } - } - aw.mu.Unlock() - return o -} - -func (aw *apiWatcher) startWatchersForRole(role string) { - parseObject, parseObjectList := getObjectParsersForRole(role) - paths := getAPIPaths(role, aw.namespaces, aw.selectors) - for _, path := range paths { - apiURL := aw.apiServer + path - aw.startWatcherForURL(role, apiURL, parseObject, parseObjectList) - } -} - -func (aw *apiWatcher) startWatcherForURL(role, apiURL string, parseObject parseObjectFunc, parseObjectList parseObjectListFunc) { - aw.mu.Lock() - if aw.watchersByURL[apiURL] != nil { - // Watcher for the given path already exists. - aw.mu.Unlock() - return - } - uw := aw.newURLWatcher(role, apiURL, parseObject, parseObjectList) - aw.watchersByURL[apiURL] = uw - aw.mu.Unlock() - - uw.watchersCount.Inc() - uw.watchersCreated.Inc() - uw.reloadObjects() - aw.wg.Add(1) - go func() { - defer aw.wg.Done() - uw.watchForUpdates() - uw.objectsByKey.decRef() - - aw.mu.Lock() - delete(aw.watchersByURL, apiURL) - aw.mu.Unlock() - uw.watchersCount.Dec() - uw.watchersStopped.Inc() - }() -} - -// needStop returns true if aw must be stopped. -func (aw *apiWatcher) needStop() bool { - select { - case <-aw.stopCtx.Done(): - return true - default: - return false - } -} - -// doRequest performs http request to the given requestURL. -func (aw *apiWatcher) doRequest(requestURL string) (*http.Response, error) { - req, err := http.NewRequestWithContext(aw.stopCtx, "GET", requestURL, nil) - if err != nil { - logger.Fatalf("cannot create a request for %q: %s", requestURL, err) - } - if aw.authorization != "" { - req.Header.Set("Authorization", aw.authorization) - } - return aw.client.Do(req) -} - -// urlWatcher watches for an apiURL and updates object states in objectsByKey. -type urlWatcher struct { - role string - apiURL string - - parseObject parseObjectFunc - parseObjectList parseObjectListFunc - - // objectsByKey contains the latest state for objects obtained from apiURL - objectsByKey *objectsMap - - // mu protects swosByKey and resourceVersion - mu sync.Mutex - swosByKey map[string][]interface{} - resourceVersion string - - // the parent apiWatcher - aw *apiWatcher - - watchersCount *metrics.Counter - watchersCreated *metrics.Counter - watchersStopped *metrics.Counter -} - -func (aw *apiWatcher) newURLWatcher(role, apiURL string, parseObject parseObjectFunc, parseObjectList parseObjectListFunc) *urlWatcher { - return &urlWatcher{ - role: role, - apiURL: apiURL, - - parseObject: parseObject, - parseObjectList: parseObjectList, - - objectsByKey: sharedObjectsGlobal.getByAPIURL(role, apiURL), - swosByKey: make(map[string][]interface{}), - - aw: aw, - - watchersCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers{role=%q}`, role)), - watchersCreated: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers_created_total{role=%q}`, role)), - watchersStopped: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers_stopped_total{role=%q}`, role)), - } -} - -// Limit the concurrency for per-role objects reloading to 1. -// -// This should reduce memory usage when big number of watchers simultaneously receive an update for objects of the same role. -var reloadObjectsLocksByRole = map[string]*sync.Mutex{ - "node": {}, - "pod": {}, - "service": {}, - "endpoints": {}, - "endpointslices": {}, - "ingress": {}, -} - -func (uw *urlWatcher) setResourceVersion(resourceVersion string) { - uw.mu.Lock() - uw.resourceVersion = resourceVersion - uw.mu.Unlock() -} - -// reloadObjects reloads objects to the latest state and returns resourceVersion for the latest state. -func (uw *urlWatcher) reloadObjects() string { - lock := reloadObjectsLocksByRole[uw.role] - lock.Lock() - defer lock.Unlock() - - uw.mu.Lock() - resourceVersion := uw.resourceVersion - uw.mu.Unlock() - if resourceVersion != "" { - // Fast path - objects have been already reloaded by concurrent goroutines. - return resourceVersion - } - - aw := uw.aw - requestURL := uw.apiURL - resp, err := aw.doRequest(requestURL) - if err != nil { - if !aw.needStop() { - logger.Errorf("error when performing a request to %q: %s", requestURL, err) - } - return "" - } - body, _ := ioutil.ReadAll(resp.Body) - _ = resp.Body.Close() - if resp.StatusCode != http.StatusOK { - logger.Errorf("unexpected status code for request to %q: %d; want %d; response: %q", requestURL, resp.StatusCode, http.StatusOK, body) - return "" - } - objectsByKey, metadata, err := uw.parseObjectList(body) - if err != nil { - if !aw.needStop() { - logger.Errorf("cannot parse response from %q: %s", requestURL, err) - } - return "" - } - logger.Infof("loaded %d objects from %q", len(objectsByKey), requestURL) - uw.objectsByKey.reload(objectsByKey) +func (aw *apiWatcher) reloadScrapeWorks(objectsByKey map[string]object) { swosByKey := make(map[string][]interface{}) - for k, o := range objectsByKey { + for key, o := range objectsByKey { labels := o.getTargetLabels(aw) swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels) if len(swos) > 0 { - swosByKey[k] = swos + swosByKey[key] = swos } } - uw.mu.Lock() - uw.swosByKey = swosByKey - uw.resourceVersion = metadata.ResourceVersion - uw.mu.Unlock() + aw.swosByKeyLock.Lock() + aw.swosByKey = swosByKey + aw.swosByKeyLock.Unlock() +} - return metadata.ResourceVersion +func (aw *apiWatcher) setScrapeWorks(key string, labels []map[string]string) { + swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels) + aw.swosByKeyLock.Lock() + if len(swos) > 0 { + aw.swosByKey[key] = swos + } else { + delete(aw.swosByKey, key) + } + aw.swosByKeyLock.Unlock() +} + +func (aw *apiWatcher) removeScrapeWorks(key string) { + aw.swosByKeyLock.Lock() + delete(aw.swosByKey, key) + aw.swosByKeyLock.Unlock() } func getScrapeWorkObjectsForLabels(swcFunc ScrapeWorkConstructorFunc, labelss []map[string]string) []interface{} { @@ -319,11 +130,279 @@ func getScrapeWorkObjectsForLabels(swcFunc ScrapeWorkConstructorFunc, labelss [] return swos } +// getScrapeWorkObjects returns all the ScrapeWork objects for the given aw. +func (aw *apiWatcher) getScrapeWorkObjects() []interface{} { + aw.startWatchersForRole(aw.sdc.Role) + aw.swosByKeyLock.Lock() + defer aw.swosByKeyLock.Unlock() + + size := 0 + for _, swosLocal := range aw.swosByKey { + size += len(swosLocal) + } + swos := make([]interface{}, 0, size) + for _, swosLocal := range aw.swosByKey { + swos = append(swos, swosLocal...) + } + return swos +} + +// getObjectByRole returns an object with the given (namespace, name) key and the given role. +func (aw *apiWatcher) getObjectByRole(role, namespace, name string) object { + if aw == nil { + // this is needed for testing + return nil + } + key := namespace + "/" + name + aw.startWatchersForRole(role) + aw.watchersByURLLock.Lock() + defer aw.watchersByURLLock.Unlock() + + for _, uw := range aw.watchersByURL { + if uw.role != role { + continue + } + uw.mu.Lock() + o := uw.objectsByKey[key] + uw.mu.Unlock() + if o != nil { + return o + } + } + return nil +} + +func (aw *apiWatcher) startWatchersForRole(role string) { + paths := getAPIPaths(role, aw.sdc.Namespaces.Names, aw.sdc.Selectors) + for _, path := range paths { + apiURL := aw.apiServer + path + aw.startWatcherForURL(role, apiURL) + } +} + +func (aw *apiWatcher) startWatcherForURL(role, apiURL string) { + aw.watchersByURLLock.Lock() + if aw.watchersByURL[apiURL] != nil { + // Watcher for the given path already exists. + aw.watchersByURLLock.Unlock() + return + } + uw := getURLWatcher(role, apiURL, aw.sdc.ProxyURL.URL(), aw.ac) + uw.addAPIWatcher(aw) + aw.watchersByURL[apiURL] = uw + aw.watchersByURLLock.Unlock() + + aw.wg.Add(1) + go func() { + defer aw.wg.Done() + <-aw.stopCh + aw.watchersByURLLock.Lock() + uw.removeAPIWatcher(aw) + delete(aw.watchersByURL, apiURL) + aw.watchersByURLLock.Unlock() + }() +} + +func getURLWatcher(role, apiURL string, proxyURL *url.URL, ac *promauth.Config) *urlWatcher { + key := fmt.Sprintf("url=%s, proxyURL=%v, authConfig=%s", apiURL, proxyURL, ac.String()) + urlWatchersLock.Lock() + uw := urlWatchers[key] + logger.Infof("found watcher for key=%s", key) + if uw == nil { + uw = newURLWatcher(role, apiURL, proxyURL, ac) + urlWatchers[key] = uw + logger.Infof("registered watcher for key=%s", key) + } + urlWatchersLock.Unlock() + return uw +} + +var ( + urlWatchersLock sync.Mutex + urlWatchers = make(map[string]*urlWatcher) +) + +// urlWatcher watches for an apiURL and updates object states in objectsByKey. +type urlWatcher struct { + role string + apiURL string + authorization string + client *http.Client + + parseObject parseObjectFunc + parseObjectList parseObjectListFunc + + // mu protects aws, objectsByKey and resourceVersion + mu sync.Mutex + + // aws contains registered apiWatcher objects + aws map[*apiWatcher]struct{} + + // objectsByKey contains the latest state for objects obtained from apiURL + objectsByKey map[string]object + + resourceVersion string + + objectsCount *metrics.Counter + objectsAdded *metrics.Counter + objectsRemoved *metrics.Counter + objectsUpdated *metrics.Counter +} + +func newURLWatcher(role, apiURL string, proxyURL *url.URL, ac *promauth.Config) *urlWatcher { + var proxy func(*http.Request) (*url.URL, error) + if proxyURL != nil { + proxy = http.ProxyURL(proxyURL) + } + client := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: ac.NewTLSConfig(), + Proxy: proxy, + TLSHandshakeTimeout: 10 * time.Second, + IdleConnTimeout: *apiServerTimeout, + }, + Timeout: *apiServerTimeout, + } + parseObject, parseObjectList := getObjectParsersForRole(role) + metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers{role=%q}`, role)).Inc() + uw := &urlWatcher{ + role: role, + apiURL: apiURL, + authorization: ac.Authorization, + client: client, + + parseObject: parseObject, + parseObjectList: parseObjectList, + + aws: make(map[*apiWatcher]struct{}), + objectsByKey: make(map[string]object), + + objectsCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects{role=%q}`, role)), + objectsAdded: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_added_total{role=%q}`, role)), + objectsRemoved: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_removed_total{role=%q}`, role)), + objectsUpdated: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_updated_total{role=%q}`, role)), + } + uw.reloadObjects() + go uw.watchForUpdates() + return uw +} + +func (uw *urlWatcher) addAPIWatcher(aw *apiWatcher) { + uw.mu.Lock() + if _, ok := uw.aws[aw]; ok { + logger.Panicf("BUG: aw=%p has been already added", aw) + } + uw.aws[aw] = struct{}{} + objectsByKey := make(map[string]object) + for key, o := range uw.objectsByKey { + objectsByKey[key] = o + } + uw.mu.Unlock() + + aw.reloadScrapeWorks(objectsByKey) +} + +func (uw *urlWatcher) removeAPIWatcher(aw *apiWatcher) { + uw.mu.Lock() + if _, ok := uw.aws[aw]; !ok { + logger.Panicf("BUG: aw=%p is missing", aw) + } + delete(uw.aws, aw) + uw.mu.Unlock() +} + +// doRequest performs http request to the given requestURL. +func (uw *urlWatcher) doRequest(requestURL string) (*http.Response, error) { + req, err := http.NewRequest("GET", requestURL, nil) + if err != nil { + logger.Fatalf("cannot create a request for %q: %s", requestURL, err) + } + if uw.authorization != "" { + req.Header.Set("Authorization", uw.authorization) + } + return uw.client.Do(req) +} + +func (uw *urlWatcher) setResourceVersion(resourceVersion string) { + uw.mu.Lock() + uw.resourceVersion = resourceVersion + uw.mu.Unlock() +} + +// reloadObjects reloads objects to the latest state and returns resourceVersion for the latest state. +func (uw *urlWatcher) reloadObjects() string { + uw.mu.Lock() + resourceVersion := uw.resourceVersion + uw.mu.Unlock() + if resourceVersion != "" { + // Fast path - there is no need in reloading the objects. + return resourceVersion + } + + requestURL := uw.apiURL + resp, err := uw.doRequest(requestURL) + if err != nil { + logger.Errorf("cannot perform request to %q: %s", requestURL, err) + return "" + } + if resp.StatusCode != http.StatusOK { + body, _ := ioutil.ReadAll(resp.Body) + _ = resp.Body.Close() + logger.Errorf("unexpected status code for request to %q: %d; want %d; response: %q", requestURL, resp.StatusCode, http.StatusOK, body) + return "" + } + objectsByKey, metadata, err := uw.parseObjectList(resp.Body) + _ = resp.Body.Close() + if err != nil { + logger.Errorf("cannot parse objects from %q: %s", requestURL, err) + return "" + } + + uw.mu.Lock() + var updated, removed, added int + for key := range uw.objectsByKey { + if o, ok := objectsByKey[key]; ok { + uw.objectsByKey[key] = o + updated++ + } else { + delete(uw.objectsByKey, key) + removed++ + } + } + for key, o := range objectsByKey { + if _, ok := uw.objectsByKey[key]; !ok { + uw.objectsByKey[key] = o + added++ + } + } + uw.objectsUpdated.Add(updated) + uw.objectsRemoved.Add(removed) + uw.objectsAdded.Add(added) + uw.objectsCount.Add(added - removed) + uw.resourceVersion = metadata.ResourceVersion + uw.mu.Unlock() + + for _, aw := range uw.getAPIWatchers() { + aw.reloadScrapeWorks(objectsByKey) + } + logger.Infof("loaded %d objects from %q", len(objectsByKey), requestURL) + return metadata.ResourceVersion +} + +func (uw *urlWatcher) getAPIWatchers() []*apiWatcher { + uw.mu.Lock() + aws := make([]*apiWatcher, 0, len(uw.aws)) + for aw := range uw.aws { + aws = append(aws, aw) + } + uw.mu.Unlock() + return aws +} + // watchForUpdates watches for object updates starting from uw.resourceVersion and updates the corresponding objects to the latest state. // // See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes func (uw *urlWatcher) watchForUpdates() { - aw := uw.aw backoffDelay := time.Second maxBackoffDelay := 30 * time.Second backoffSleep := func() { @@ -338,23 +417,17 @@ func (uw *urlWatcher) watchForUpdates() { if strings.Contains(apiURL, "?") { delimiter = "&" } - timeoutSeconds := time.Duration(0.9 * float64(aw.client.Timeout)).Seconds() + timeoutSeconds := time.Duration(0.9 * float64(uw.client.Timeout)).Seconds() apiURL += delimiter + "watch=1&allowWatchBookmarks=true&timeoutSeconds=" + strconv.Itoa(int(timeoutSeconds)) for { - if aw.needStop() { - return - } resourceVersion := uw.reloadObjects() requestURL := apiURL if resourceVersion != "" { requestURL += "&resourceVersion=" + url.QueryEscape(resourceVersion) } - resp, err := aw.doRequest(requestURL) + resp, err := uw.doRequest(requestURL) if err != nil { - if aw.needStop() { - return - } - logger.Errorf("error when performing a request to %q: %s", requestURL, err) + logger.Errorf("cannot performing request to %q: %s", requestURL, err) backoffSleep() continue } @@ -375,9 +448,6 @@ func (uw *urlWatcher) watchForUpdates() { err = uw.readObjectUpdateStream(resp.Body) _ = resp.Body.Close() if err != nil { - if aw.needStop() { - return - } if !errors.Is(err, io.EOF) { logger.Errorf("error when reading WatchEvent stream from %q: %s", requestURL, err) } @@ -389,7 +459,6 @@ func (uw *urlWatcher) watchForUpdates() { // readObjectUpdateStream reads Kuberntes watch events from r and updates locally cached objects according to the received events. func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error { - aw := uw.aw d := json.NewDecoder(r) var we WatchEvent for { @@ -403,26 +472,35 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error { return err } key := o.key() - uw.objectsByKey.update(key, o) - labels := o.getTargetLabels(aw) - swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels) uw.mu.Lock() - if len(swos) > 0 { - uw.swosByKey[key] = swos + if _, ok := uw.objectsByKey[key]; !ok { + uw.objectsCount.Inc() + uw.objectsAdded.Inc() } else { - delete(uw.swosByKey, key) + uw.objectsUpdated.Inc() } + uw.objectsByKey[key] = o uw.mu.Unlock() + for _, aw := range uw.getAPIWatchers() { + labels := o.getTargetLabels(aw) + aw.setScrapeWorks(key, labels) + } case "DELETED": o, err := uw.parseObject(we.Object) if err != nil { return err } key := o.key() - uw.objectsByKey.remove(key) uw.mu.Lock() - delete(uw.swosByKey, key) + if _, ok := uw.objectsByKey[key]; ok { + uw.objectsCount.Dec() + uw.objectsRemoved.Inc() + delete(uw.objectsByKey, key) + } uw.mu.Unlock() + for _, aw := range uw.getAPIWatchers() { + aw.removeScrapeWorks(key) + } case "BOOKMARK": // See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks bm, err := parseBookmark(we.Object) @@ -546,105 +624,3 @@ func getObjectParsersForRole(role string) (parseObjectFunc, parseObjectListFunc) return nil, nil } } - -type objectsMap struct { - mu sync.Mutex - refCount int - m map[string]object - - objectsAdded *metrics.Counter - objectsRemoved *metrics.Counter - objectsCount *metrics.Counter -} - -func (om *objectsMap) incRef() { - om.mu.Lock() - om.refCount++ - om.mu.Unlock() -} - -func (om *objectsMap) decRef() { - om.mu.Lock() - om.refCount-- - if om.refCount < 0 { - logger.Panicf("BUG: refCount cannot be smaller than 0; got %d", om.refCount) - } - if om.refCount == 0 { - // Free up memory occupied by om.m - om.objectsRemoved.Add(len(om.m)) - om.objectsCount.Add(-len(om.m)) - om.m = make(map[string]object) - } - om.mu.Unlock() -} - -func (om *objectsMap) reload(m map[string]object) { - om.mu.Lock() - om.objectsAdded.Add(len(m)) - om.objectsRemoved.Add(len(om.m)) - om.objectsCount.Add(len(m) - len(om.m)) - for k := range om.m { - delete(om.m, k) - } - for k, o := range m { - om.m[k] = o - } - om.mu.Unlock() -} - -func (om *objectsMap) update(key string, o object) { - om.mu.Lock() - if om.m[key] == nil { - om.objectsAdded.Inc() - om.objectsCount.Inc() - } - om.m[key] = o - om.mu.Unlock() -} - -func (om *objectsMap) remove(key string) { - om.mu.Lock() - if om.m[key] != nil { - om.objectsRemoved.Inc() - om.objectsCount.Dec() - delete(om.m, key) - } - om.mu.Unlock() -} - -func (om *objectsMap) get(key string) object { - om.mu.Lock() - o, ok := om.m[key] - om.mu.Unlock() - if !ok { - return nil - } - return o -} - -type sharedObjects struct { - mu sync.Mutex - oms map[string]*objectsMap -} - -func (so *sharedObjects) getByAPIURL(role, apiURL string) *objectsMap { - so.mu.Lock() - om := so.oms[apiURL] - if om == nil { - om = &objectsMap{ - m: make(map[string]object), - - objectsCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects{role=%q}`, role)), - objectsAdded: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_added_total{role=%q}`, role)), - objectsRemoved: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_removed_total{role=%q}`, role)), - } - so.oms[apiURL] = om - } - so.mu.Unlock() - om.incRef() - return om -} - -var sharedObjectsGlobal = &sharedObjects{ - oms: make(map[string]*objectsMap), -} diff --git a/lib/promscrape/discovery/kubernetes/endpoints.go b/lib/promscrape/discovery/kubernetes/endpoints.go index 003cceca5..5119f513a 100644 --- a/lib/promscrape/discovery/kubernetes/endpoints.go +++ b/lib/promscrape/discovery/kubernetes/endpoints.go @@ -3,6 +3,7 @@ package kubernetes import ( "encoding/json" "fmt" + "io" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" ) @@ -11,10 +12,11 @@ func (eps *Endpoints) key() string { return eps.Metadata.key() } -func parseEndpointsList(data []byte) (map[string]object, ListMeta, error) { +func parseEndpointsList(r io.Reader) (map[string]object, ListMeta, error) { var epsl EndpointsList - if err := json.Unmarshal(data, &epsl); err != nil { - return nil, epsl.Metadata, fmt.Errorf("cannot unmarshal EndpointsList from %q: %w", data, err) + d := json.NewDecoder(r) + if err := d.Decode(&epsl); err != nil { + return nil, epsl.Metadata, fmt.Errorf("cannot unmarshal EndpointsList: %w", err) } objectsByKey := make(map[string]object) for _, eps := range epsl.Items { diff --git a/lib/promscrape/discovery/kubernetes/endpoints_test.go b/lib/promscrape/discovery/kubernetes/endpoints_test.go index c9649df92..51b30d091 100644 --- a/lib/promscrape/discovery/kubernetes/endpoints_test.go +++ b/lib/promscrape/discovery/kubernetes/endpoints_test.go @@ -1,6 +1,7 @@ package kubernetes import ( + "bytes" "testing" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" @@ -10,7 +11,8 @@ import ( func TestParseEndpointsListFailure(t *testing.T) { f := func(s string) { t.Helper() - objectsByKey, _, err := parseEndpointsList([]byte(s)) + r := bytes.NewBufferString(s) + objectsByKey, _, err := parseEndpointsList(r) if err == nil { t.Fatalf("expecting non-nil error") } @@ -78,7 +80,8 @@ func TestParseEndpointsListSuccess(t *testing.T) { ] } ` - objectsByKey, meta, err := parseEndpointsList([]byte(data)) + r := bytes.NewBufferString(data) + objectsByKey, meta, err := parseEndpointsList(r) if err != nil { t.Fatalf("unexpected error: %s", err) } diff --git a/lib/promscrape/discovery/kubernetes/endpointslices.go b/lib/promscrape/discovery/kubernetes/endpointslices.go index cec87ce13..a8739ee5b 100644 --- a/lib/promscrape/discovery/kubernetes/endpointslices.go +++ b/lib/promscrape/discovery/kubernetes/endpointslices.go @@ -3,6 +3,7 @@ package kubernetes import ( "encoding/json" "fmt" + "io" "strconv" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" @@ -12,10 +13,11 @@ func (eps *EndpointSlice) key() string { return eps.Metadata.key() } -func parseEndpointSliceList(data []byte) (map[string]object, ListMeta, error) { +func parseEndpointSliceList(r io.Reader) (map[string]object, ListMeta, error) { var epsl EndpointSliceList - if err := json.Unmarshal(data, &epsl); err != nil { - return nil, epsl.Metadata, fmt.Errorf("cannot unmarshal EndpointSliceList from %q: %w", data, err) + d := json.NewDecoder(r) + if err := d.Decode(&epsl); err != nil { + return nil, epsl.Metadata, fmt.Errorf("cannot unmarshal EndpointSliceList: %w", err) } objectsByKey := make(map[string]object) for _, eps := range epsl.Items { diff --git a/lib/promscrape/discovery/kubernetes/endpointslices_test.go b/lib/promscrape/discovery/kubernetes/endpointslices_test.go index 3555e42f2..4114e0cff 100644 --- a/lib/promscrape/discovery/kubernetes/endpointslices_test.go +++ b/lib/promscrape/discovery/kubernetes/endpointslices_test.go @@ -1,6 +1,7 @@ package kubernetes import ( + "bytes" "testing" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" @@ -9,7 +10,8 @@ import ( func TestParseEndpointSliceListFail(t *testing.T) { f := func(data string) { - objectsByKey, _, err := parseEndpointSliceList([]byte(data)) + r := bytes.NewBufferString(data) + objectsByKey, _, err := parseEndpointSliceList(r) if err == nil { t.Errorf("unexpected result, test must fail! data: %s", data) } @@ -175,7 +177,8 @@ func TestParseEndpointSliceListSuccess(t *testing.T) { } ] }` - objectsByKey, meta, err := parseEndpointSliceList([]byte(data)) + r := bytes.NewBufferString(data) + objectsByKey, meta, err := parseEndpointSliceList(r) if err != nil { t.Errorf("cannot parse data for EndpointSliceList: %v", err) return diff --git a/lib/promscrape/discovery/kubernetes/ingress.go b/lib/promscrape/discovery/kubernetes/ingress.go index 38aef79d3..d7f5df66f 100644 --- a/lib/promscrape/discovery/kubernetes/ingress.go +++ b/lib/promscrape/discovery/kubernetes/ingress.go @@ -3,16 +3,18 @@ package kubernetes import ( "encoding/json" "fmt" + "io" ) func (ig *Ingress) key() string { return ig.Metadata.key() } -func parseIngressList(data []byte) (map[string]object, ListMeta, error) { +func parseIngressList(r io.Reader) (map[string]object, ListMeta, error) { var igl IngressList - if err := json.Unmarshal(data, &igl); err != nil { - return nil, igl.Metadata, fmt.Errorf("cannot unmarshal IngressList from %q: %w", data, err) + d := json.NewDecoder(r) + if err := d.Decode(&igl); err != nil { + return nil, igl.Metadata, fmt.Errorf("cannot unmarshal IngressList: %w", err) } objectsByKey := make(map[string]object) for _, ig := range igl.Items { diff --git a/lib/promscrape/discovery/kubernetes/ingress_test.go b/lib/promscrape/discovery/kubernetes/ingress_test.go index 5718758f5..f3440ac23 100644 --- a/lib/promscrape/discovery/kubernetes/ingress_test.go +++ b/lib/promscrape/discovery/kubernetes/ingress_test.go @@ -1,6 +1,7 @@ package kubernetes import ( + "bytes" "testing" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" @@ -10,7 +11,8 @@ import ( func TestParseIngressListFailure(t *testing.T) { f := func(s string) { t.Helper() - objectsByKey, _, err := parseIngressList([]byte(s)) + r := bytes.NewBufferString(s) + objectsByKey, _, err := parseIngressList(r) if err == nil { t.Fatalf("expecting non-nil error") } @@ -70,7 +72,8 @@ func TestParseIngressListSuccess(t *testing.T) { } ] }` - objectsByKey, meta, err := parseIngressList([]byte(data)) + r := bytes.NewBufferString(data) + objectsByKey, meta, err := parseIngressList(r) if err != nil { t.Fatalf("unexpected error: %s", err) } diff --git a/lib/promscrape/discovery/kubernetes/kubernetes.go b/lib/promscrape/discovery/kubernetes/kubernetes.go index 2bab34550..b7dc29b64 100644 --- a/lib/promscrape/discovery/kubernetes/kubernetes.go +++ b/lib/promscrape/discovery/kubernetes/kubernetes.go @@ -48,12 +48,7 @@ func (sdc *SDConfig) GetScrapeWorkObjects(baseDir string, swcFunc ScrapeWorkCons if err != nil { return nil, fmt.Errorf("cannot create API config: %w", err) } - switch sdc.Role { - case "node", "pod", "service", "endpoints", "endpointslices", "ingress": - return cfg.aw.getScrapeWorkObjectsForRole(sdc.Role), nil - default: - return nil, fmt.Errorf("unexpected `role`: %q; must be one of `node`, `pod`, `service`, `endpoints`, `endpointslices` or `ingress`; skipping it", sdc.Role) - } + return cfg.aw.getScrapeWorkObjects(), nil } // MustStop stops further usage for sdc. diff --git a/lib/promscrape/discovery/kubernetes/node.go b/lib/promscrape/discovery/kubernetes/node.go index 653a99e73..a7e392d65 100644 --- a/lib/promscrape/discovery/kubernetes/node.go +++ b/lib/promscrape/discovery/kubernetes/node.go @@ -3,6 +3,7 @@ package kubernetes import ( "encoding/json" "fmt" + "io" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" ) @@ -12,10 +13,11 @@ func (n *Node) key() string { return n.Metadata.key() } -func parseNodeList(data []byte) (map[string]object, ListMeta, error) { +func parseNodeList(r io.Reader) (map[string]object, ListMeta, error) { var nl NodeList - if err := json.Unmarshal(data, &nl); err != nil { - return nil, nl.Metadata, fmt.Errorf("cannot unmarshal NodeList from %q: %w", data, err) + d := json.NewDecoder(r) + if err := d.Decode(&nl); err != nil { + return nil, nl.Metadata, fmt.Errorf("cannot unmarshal NodeList: %w", err) } objectsByKey := make(map[string]object) for _, n := range nl.Items { diff --git a/lib/promscrape/discovery/kubernetes/node_test.go b/lib/promscrape/discovery/kubernetes/node_test.go index 5b957e2f3..1e63ee69a 100644 --- a/lib/promscrape/discovery/kubernetes/node_test.go +++ b/lib/promscrape/discovery/kubernetes/node_test.go @@ -1,6 +1,7 @@ package kubernetes import ( + "bytes" "reflect" "sort" "strconv" @@ -13,7 +14,8 @@ import ( func TestParseNodeListFailure(t *testing.T) { f := func(s string) { t.Helper() - objectsByKey, _, err := parseNodeList([]byte(s)) + r := bytes.NewBufferString(s) + objectsByKey, _, err := parseNodeList(r) if err == nil { t.Fatalf("expecting non-nil error") } @@ -229,7 +231,8 @@ func TestParseNodeListSuccess(t *testing.T) { ] } ` - objectsByKey, meta, err := parseNodeList([]byte(data)) + r := bytes.NewBufferString(data) + objectsByKey, meta, err := parseNodeList(r) if err != nil { t.Fatalf("unexpected error: %s", err) } diff --git a/lib/promscrape/discovery/kubernetes/pod.go b/lib/promscrape/discovery/kubernetes/pod.go index 80864d25e..c3e3f314a 100644 --- a/lib/promscrape/discovery/kubernetes/pod.go +++ b/lib/promscrape/discovery/kubernetes/pod.go @@ -3,6 +3,7 @@ package kubernetes import ( "encoding/json" "fmt" + "io" "strconv" "strings" @@ -13,10 +14,11 @@ func (p *Pod) key() string { return p.Metadata.key() } -func parsePodList(data []byte) (map[string]object, ListMeta, error) { +func parsePodList(r io.Reader) (map[string]object, ListMeta, error) { var pl PodList - if err := json.Unmarshal(data, &pl); err != nil { - return nil, pl.Metadata, fmt.Errorf("cannot unmarshal PodList from %q: %w", data, err) + d := json.NewDecoder(r) + if err := d.Decode(&pl); err != nil { + return nil, pl.Metadata, fmt.Errorf("cannot unmarshal PodList: %w", err) } objectsByKey := make(map[string]object) for _, p := range pl.Items { diff --git a/lib/promscrape/discovery/kubernetes/pod_test.go b/lib/promscrape/discovery/kubernetes/pod_test.go index 54ebdec0f..f697d1b84 100644 --- a/lib/promscrape/discovery/kubernetes/pod_test.go +++ b/lib/promscrape/discovery/kubernetes/pod_test.go @@ -1,6 +1,7 @@ package kubernetes import ( + "bytes" "testing" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" @@ -10,7 +11,8 @@ import ( func TestParsePodListFailure(t *testing.T) { f := func(s string) { t.Helper() - objectsByKey, _, err := parsePodList([]byte(s)) + r := bytes.NewBufferString(s) + objectsByKey, _, err := parsePodList(r) if err == nil { t.Fatalf("expecting non-nil error") } @@ -227,7 +229,8 @@ func TestParsePodListSuccess(t *testing.T) { ] } ` - objectsByKey, meta, err := parsePodList([]byte(data)) + r := bytes.NewBufferString(data) + objectsByKey, meta, err := parsePodList(r) if err != nil { t.Fatalf("unexpected error: %s", err) } diff --git a/lib/promscrape/discovery/kubernetes/service.go b/lib/promscrape/discovery/kubernetes/service.go index c129816e6..c5c0ddd8c 100644 --- a/lib/promscrape/discovery/kubernetes/service.go +++ b/lib/promscrape/discovery/kubernetes/service.go @@ -3,6 +3,7 @@ package kubernetes import ( "encoding/json" "fmt" + "io" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" ) @@ -11,10 +12,11 @@ func (s *Service) key() string { return s.Metadata.key() } -func parseServiceList(data []byte) (map[string]object, ListMeta, error) { +func parseServiceList(r io.Reader) (map[string]object, ListMeta, error) { var sl ServiceList - if err := json.Unmarshal(data, &sl); err != nil { - return nil, sl.Metadata, fmt.Errorf("cannot unmarshal ServiceList from %q: %w", data, err) + d := json.NewDecoder(r) + if err := d.Decode(&sl); err != nil { + return nil, sl.Metadata, fmt.Errorf("cannot unmarshal ServiceList: %w", err) } objectsByKey := make(map[string]object) for _, s := range sl.Items { diff --git a/lib/promscrape/discovery/kubernetes/service_test.go b/lib/promscrape/discovery/kubernetes/service_test.go index ca1570ef7..050b2f70e 100644 --- a/lib/promscrape/discovery/kubernetes/service_test.go +++ b/lib/promscrape/discovery/kubernetes/service_test.go @@ -1,6 +1,7 @@ package kubernetes import ( + "bytes" "testing" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" @@ -10,7 +11,8 @@ import ( func TestParseServiceListFailure(t *testing.T) { f := func(s string) { t.Helper() - objectsByKey, _, err := parseServiceList([]byte(s)) + r := bytes.NewBufferString(s) + objectsByKey, _, err := parseServiceList(r) if err == nil { t.Fatalf("expecting non-nil error") } @@ -88,7 +90,8 @@ func TestParseServiceListSuccess(t *testing.T) { ] } ` - objectsByKey, meta, err := parseServiceList([]byte(data)) + r := bytes.NewBufferString(data) + objectsByKey, meta, err := parseServiceList(r) if err != nil { t.Fatalf("unexpected error: %s", err) }