lib/promscrape/discovery/kubernetes: use a single watcher per apiURL

Previously multiple scrape jobs could create multiple watchers for the same apiURL. Now only a single watcher is used.
This should reduce load on Kubernetes API server when many scrape job configs use Kubernetes service discovery.
This commit is contained in:
Aliaksandr Valialkin 2021-03-11 16:41:09 +02:00
parent 19af3b4f38
commit 8fc29ffc67
15 changed files with 411 additions and 424 deletions

View file

@ -1,21 +1,15 @@
package kubernetes
import (
"flag"
"fmt"
"net"
"net/http"
"net/url"
"os"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
var apiServerTimeout = flag.Duration("promscrape.kubernetes.apiServerTimeout", 30*time.Minute, "How frequently to reload the full state from Kuberntes API server")
// apiConfig contains config for API server
type apiConfig struct {
aw *apiWatcher
@ -36,6 +30,11 @@ func getAPIConfig(sdc *SDConfig, baseDir string, swcFunc ScrapeWorkConstructorFu
}
func newAPIConfig(sdc *SDConfig, baseDir string, swcFunc ScrapeWorkConstructorFunc) (*apiConfig, error) {
switch sdc.Role {
case "node", "pod", "service", "endpoints", "endpointslices", "ingress":
default:
return nil, fmt.Errorf("unexpected `role`: %q; must be one of `node`, `pod`, `service`, `endpoints`, `endpointslices` or `ingress`", sdc.Role)
}
ac, err := promauth.NewConfig(baseDir, sdc.BasicAuth, sdc.BearerToken, sdc.BearerTokenFile, sdc.TLSConfig)
if err != nil {
return nil, fmt.Errorf("cannot parse auth config: %w", err)
@ -75,20 +74,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string, swcFunc ScrapeWorkConstructorFu
for strings.HasSuffix(apiServer, "/") {
apiServer = apiServer[:len(apiServer)-1]
}
var proxy func(*http.Request) (*url.URL, error)
if proxyURL := sdc.ProxyURL.URL(); proxyURL != nil {
proxy = http.ProxyURL(proxyURL)
}
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: ac.NewTLSConfig(),
Proxy: proxy,
TLSHandshakeTimeout: 10 * time.Second,
IdleConnTimeout: *apiServerTimeout,
},
Timeout: *apiServerTimeout,
}
aw := newAPIWatcher(client, apiServer, ac.Authorization, sdc.Namespaces.Names, sdc.Selectors, swcFunc)
aw := newAPIWatcher(apiServer, ac, sdc, swcFunc)
cfg := &apiConfig{
aw: aw,
}

View file

@ -1,9 +1,9 @@
package kubernetes
import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"io/ioutil"
@ -16,9 +16,12 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/metrics"
)
var apiServerTimeout = flag.Duration("promscrape.kubernetes.apiServerTimeout", 30*time.Minute, "How frequently to reload the full state from Kuberntes API server")
// WatchEvent is a watch event returned from API server endpoints if `watch=1` query arg is set.
//
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
@ -36,275 +39,83 @@ type object interface {
// parseObjectFunc must parse object from the given data.
type parseObjectFunc func(data []byte) (object, error)
// parseObjectListFunc must parse objectList from the given data.
type parseObjectListFunc func(data []byte) (map[string]object, ListMeta, error)
// parseObjectListFunc must parse objectList from the given r.
type parseObjectListFunc func(r io.Reader) (map[string]object, ListMeta, error)
// apiWatcher is used for watching for Kuberntes object changes and caching their latest states.
type apiWatcher struct {
// The client used for watching for object changes
client *http.Client
// Kubenetes API server address in the form http://api-server
apiServer string
// The contents for `Authorization` HTTP request header
authorization string
// ac contains auth config for communicating with apiServer
ac *promauth.Config
// Namespaces to watch
namespaces []string
// sdc contains the related SDConfig
sdc *SDConfig
// Selectors to apply during watch
selectors []Selector
// Constructor for creating ScrapeWork objects from labels.
// Constructor for creating ScrapeWork objects from labels
swcFunc ScrapeWorkConstructorFunc
// mu protects watchersByURL
mu sync.Mutex
// swos contains a map of ScrapeWork objects for the given apiWatcher
swosByKey map[string][]interface{}
swosByKeyLock sync.Mutex
// a map of watchers keyed by request urls
watchersByURL map[string]*urlWatcher
watchersByURL map[string]*urlWatcher
watchersByURLLock sync.Mutex
stopFunc func()
stopCtx context.Context
wg sync.WaitGroup
stopCh chan struct{}
wg sync.WaitGroup
}
func newAPIWatcher(apiServer string, ac *promauth.Config, sdc *SDConfig, swcFunc ScrapeWorkConstructorFunc) *apiWatcher {
return &apiWatcher{
apiServer: apiServer,
ac: ac,
sdc: sdc,
swcFunc: swcFunc,
swosByKey: make(map[string][]interface{}),
watchersByURL: make(map[string]*urlWatcher),
stopCh: make(chan struct{}),
}
}
func (aw *apiWatcher) mustStop() {
aw.stopFunc()
close(aw.stopCh)
aw.wg.Wait()
}
func newAPIWatcher(client *http.Client, apiServer, authorization string, namespaces []string, selectors []Selector, swcFunc ScrapeWorkConstructorFunc) *apiWatcher {
stopCtx, stopFunc := context.WithCancel(context.Background())
return &apiWatcher{
apiServer: apiServer,
authorization: authorization,
client: client,
namespaces: namespaces,
selectors: selectors,
swcFunc: swcFunc,
watchersByURL: make(map[string]*urlWatcher),
stopFunc: stopFunc,
stopCtx: stopCtx,
}
}
// getScrapeWorkObjectsForRole returns all the ScrapeWork objects for the given role.
func (aw *apiWatcher) getScrapeWorkObjectsForRole(role string) []interface{} {
aw.startWatchersForRole(role)
var swos []interface{}
aw.mu.Lock()
for _, uw := range aw.watchersByURL {
if uw.role != role {
continue
}
uw.mu.Lock()
for _, swosLocal := range uw.swosByKey {
swos = append(swos, swosLocal...)
}
uw.mu.Unlock()
}
aw.mu.Unlock()
return swos
}
// getObjectByRole returns an object with the given (namespace, name) key and the given role.
func (aw *apiWatcher) getObjectByRole(role, namespace, name string) object {
if aw == nil {
return nil
}
key := namespace + "/" + name
aw.startWatchersForRole(role)
var o object
aw.mu.Lock()
for _, uw := range aw.watchersByURL {
if uw.role != role {
continue
}
o = uw.objectsByKey.get(key)
if o != nil {
break
}
}
aw.mu.Unlock()
return o
}
func (aw *apiWatcher) startWatchersForRole(role string) {
parseObject, parseObjectList := getObjectParsersForRole(role)
paths := getAPIPaths(role, aw.namespaces, aw.selectors)
for _, path := range paths {
apiURL := aw.apiServer + path
aw.startWatcherForURL(role, apiURL, parseObject, parseObjectList)
}
}
func (aw *apiWatcher) startWatcherForURL(role, apiURL string, parseObject parseObjectFunc, parseObjectList parseObjectListFunc) {
aw.mu.Lock()
if aw.watchersByURL[apiURL] != nil {
// Watcher for the given path already exists.
aw.mu.Unlock()
return
}
uw := aw.newURLWatcher(role, apiURL, parseObject, parseObjectList)
aw.watchersByURL[apiURL] = uw
aw.mu.Unlock()
uw.watchersCount.Inc()
uw.watchersCreated.Inc()
uw.reloadObjects()
aw.wg.Add(1)
go func() {
defer aw.wg.Done()
uw.watchForUpdates()
uw.objectsByKey.decRef()
aw.mu.Lock()
delete(aw.watchersByURL, apiURL)
aw.mu.Unlock()
uw.watchersCount.Dec()
uw.watchersStopped.Inc()
}()
}
// needStop returns true if aw must be stopped.
func (aw *apiWatcher) needStop() bool {
select {
case <-aw.stopCtx.Done():
return true
default:
return false
}
}
// doRequest performs http request to the given requestURL.
func (aw *apiWatcher) doRequest(requestURL string) (*http.Response, error) {
req, err := http.NewRequestWithContext(aw.stopCtx, "GET", requestURL, nil)
if err != nil {
logger.Fatalf("cannot create a request for %q: %s", requestURL, err)
}
if aw.authorization != "" {
req.Header.Set("Authorization", aw.authorization)
}
return aw.client.Do(req)
}
// urlWatcher watches for an apiURL and updates object states in objectsByKey.
type urlWatcher struct {
role string
apiURL string
parseObject parseObjectFunc
parseObjectList parseObjectListFunc
// objectsByKey contains the latest state for objects obtained from apiURL
objectsByKey *objectsMap
// mu protects swosByKey and resourceVersion
mu sync.Mutex
swosByKey map[string][]interface{}
resourceVersion string
// the parent apiWatcher
aw *apiWatcher
watchersCount *metrics.Counter
watchersCreated *metrics.Counter
watchersStopped *metrics.Counter
}
func (aw *apiWatcher) newURLWatcher(role, apiURL string, parseObject parseObjectFunc, parseObjectList parseObjectListFunc) *urlWatcher {
return &urlWatcher{
role: role,
apiURL: apiURL,
parseObject: parseObject,
parseObjectList: parseObjectList,
objectsByKey: sharedObjectsGlobal.getByAPIURL(role, apiURL),
swosByKey: make(map[string][]interface{}),
aw: aw,
watchersCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers{role=%q}`, role)),
watchersCreated: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers_created_total{role=%q}`, role)),
watchersStopped: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers_stopped_total{role=%q}`, role)),
}
}
// Limit the concurrency for per-role objects reloading to 1.
//
// This should reduce memory usage when big number of watchers simultaneously receive an update for objects of the same role.
var reloadObjectsLocksByRole = map[string]*sync.Mutex{
"node": {},
"pod": {},
"service": {},
"endpoints": {},
"endpointslices": {},
"ingress": {},
}
func (uw *urlWatcher) setResourceVersion(resourceVersion string) {
uw.mu.Lock()
uw.resourceVersion = resourceVersion
uw.mu.Unlock()
}
// reloadObjects reloads objects to the latest state and returns resourceVersion for the latest state.
func (uw *urlWatcher) reloadObjects() string {
lock := reloadObjectsLocksByRole[uw.role]
lock.Lock()
defer lock.Unlock()
uw.mu.Lock()
resourceVersion := uw.resourceVersion
uw.mu.Unlock()
if resourceVersion != "" {
// Fast path - objects have been already reloaded by concurrent goroutines.
return resourceVersion
}
aw := uw.aw
requestURL := uw.apiURL
resp, err := aw.doRequest(requestURL)
if err != nil {
if !aw.needStop() {
logger.Errorf("error when performing a request to %q: %s", requestURL, err)
}
return ""
}
body, _ := ioutil.ReadAll(resp.Body)
_ = resp.Body.Close()
if resp.StatusCode != http.StatusOK {
logger.Errorf("unexpected status code for request to %q: %d; want %d; response: %q", requestURL, resp.StatusCode, http.StatusOK, body)
return ""
}
objectsByKey, metadata, err := uw.parseObjectList(body)
if err != nil {
if !aw.needStop() {
logger.Errorf("cannot parse response from %q: %s", requestURL, err)
}
return ""
}
logger.Infof("loaded %d objects from %q", len(objectsByKey), requestURL)
uw.objectsByKey.reload(objectsByKey)
func (aw *apiWatcher) reloadScrapeWorks(objectsByKey map[string]object) {
swosByKey := make(map[string][]interface{})
for k, o := range objectsByKey {
for key, o := range objectsByKey {
labels := o.getTargetLabels(aw)
swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels)
if len(swos) > 0 {
swosByKey[k] = swos
swosByKey[key] = swos
}
}
uw.mu.Lock()
uw.swosByKey = swosByKey
uw.resourceVersion = metadata.ResourceVersion
uw.mu.Unlock()
aw.swosByKeyLock.Lock()
aw.swosByKey = swosByKey
aw.swosByKeyLock.Unlock()
}
return metadata.ResourceVersion
func (aw *apiWatcher) setScrapeWorks(key string, labels []map[string]string) {
swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels)
aw.swosByKeyLock.Lock()
if len(swos) > 0 {
aw.swosByKey[key] = swos
} else {
delete(aw.swosByKey, key)
}
aw.swosByKeyLock.Unlock()
}
func (aw *apiWatcher) removeScrapeWorks(key string) {
aw.swosByKeyLock.Lock()
delete(aw.swosByKey, key)
aw.swosByKeyLock.Unlock()
}
func getScrapeWorkObjectsForLabels(swcFunc ScrapeWorkConstructorFunc, labelss []map[string]string) []interface{} {
@ -319,11 +130,279 @@ func getScrapeWorkObjectsForLabels(swcFunc ScrapeWorkConstructorFunc, labelss []
return swos
}
// getScrapeWorkObjects returns all the ScrapeWork objects for the given aw.
func (aw *apiWatcher) getScrapeWorkObjects() []interface{} {
aw.startWatchersForRole(aw.sdc.Role)
aw.swosByKeyLock.Lock()
defer aw.swosByKeyLock.Unlock()
size := 0
for _, swosLocal := range aw.swosByKey {
size += len(swosLocal)
}
swos := make([]interface{}, 0, size)
for _, swosLocal := range aw.swosByKey {
swos = append(swos, swosLocal...)
}
return swos
}
// getObjectByRole returns an object with the given (namespace, name) key and the given role.
func (aw *apiWatcher) getObjectByRole(role, namespace, name string) object {
if aw == nil {
// this is needed for testing
return nil
}
key := namespace + "/" + name
aw.startWatchersForRole(role)
aw.watchersByURLLock.Lock()
defer aw.watchersByURLLock.Unlock()
for _, uw := range aw.watchersByURL {
if uw.role != role {
continue
}
uw.mu.Lock()
o := uw.objectsByKey[key]
uw.mu.Unlock()
if o != nil {
return o
}
}
return nil
}
func (aw *apiWatcher) startWatchersForRole(role string) {
paths := getAPIPaths(role, aw.sdc.Namespaces.Names, aw.sdc.Selectors)
for _, path := range paths {
apiURL := aw.apiServer + path
aw.startWatcherForURL(role, apiURL)
}
}
func (aw *apiWatcher) startWatcherForURL(role, apiURL string) {
aw.watchersByURLLock.Lock()
if aw.watchersByURL[apiURL] != nil {
// Watcher for the given path already exists.
aw.watchersByURLLock.Unlock()
return
}
uw := getURLWatcher(role, apiURL, aw.sdc.ProxyURL.URL(), aw.ac)
uw.addAPIWatcher(aw)
aw.watchersByURL[apiURL] = uw
aw.watchersByURLLock.Unlock()
aw.wg.Add(1)
go func() {
defer aw.wg.Done()
<-aw.stopCh
aw.watchersByURLLock.Lock()
uw.removeAPIWatcher(aw)
delete(aw.watchersByURL, apiURL)
aw.watchersByURLLock.Unlock()
}()
}
func getURLWatcher(role, apiURL string, proxyURL *url.URL, ac *promauth.Config) *urlWatcher {
key := fmt.Sprintf("url=%s, proxyURL=%v, authConfig=%s", apiURL, proxyURL, ac.String())
urlWatchersLock.Lock()
uw := urlWatchers[key]
logger.Infof("found watcher for key=%s", key)
if uw == nil {
uw = newURLWatcher(role, apiURL, proxyURL, ac)
urlWatchers[key] = uw
logger.Infof("registered watcher for key=%s", key)
}
urlWatchersLock.Unlock()
return uw
}
var (
urlWatchersLock sync.Mutex
urlWatchers = make(map[string]*urlWatcher)
)
// urlWatcher watches for an apiURL and updates object states in objectsByKey.
type urlWatcher struct {
role string
apiURL string
authorization string
client *http.Client
parseObject parseObjectFunc
parseObjectList parseObjectListFunc
// mu protects aws, objectsByKey and resourceVersion
mu sync.Mutex
// aws contains registered apiWatcher objects
aws map[*apiWatcher]struct{}
// objectsByKey contains the latest state for objects obtained from apiURL
objectsByKey map[string]object
resourceVersion string
objectsCount *metrics.Counter
objectsAdded *metrics.Counter
objectsRemoved *metrics.Counter
objectsUpdated *metrics.Counter
}
func newURLWatcher(role, apiURL string, proxyURL *url.URL, ac *promauth.Config) *urlWatcher {
var proxy func(*http.Request) (*url.URL, error)
if proxyURL != nil {
proxy = http.ProxyURL(proxyURL)
}
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: ac.NewTLSConfig(),
Proxy: proxy,
TLSHandshakeTimeout: 10 * time.Second,
IdleConnTimeout: *apiServerTimeout,
},
Timeout: *apiServerTimeout,
}
parseObject, parseObjectList := getObjectParsersForRole(role)
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers{role=%q}`, role)).Inc()
uw := &urlWatcher{
role: role,
apiURL: apiURL,
authorization: ac.Authorization,
client: client,
parseObject: parseObject,
parseObjectList: parseObjectList,
aws: make(map[*apiWatcher]struct{}),
objectsByKey: make(map[string]object),
objectsCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects{role=%q}`, role)),
objectsAdded: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_added_total{role=%q}`, role)),
objectsRemoved: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_removed_total{role=%q}`, role)),
objectsUpdated: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_updated_total{role=%q}`, role)),
}
uw.reloadObjects()
go uw.watchForUpdates()
return uw
}
func (uw *urlWatcher) addAPIWatcher(aw *apiWatcher) {
uw.mu.Lock()
if _, ok := uw.aws[aw]; ok {
logger.Panicf("BUG: aw=%p has been already added", aw)
}
uw.aws[aw] = struct{}{}
objectsByKey := make(map[string]object)
for key, o := range uw.objectsByKey {
objectsByKey[key] = o
}
uw.mu.Unlock()
aw.reloadScrapeWorks(objectsByKey)
}
func (uw *urlWatcher) removeAPIWatcher(aw *apiWatcher) {
uw.mu.Lock()
if _, ok := uw.aws[aw]; !ok {
logger.Panicf("BUG: aw=%p is missing", aw)
}
delete(uw.aws, aw)
uw.mu.Unlock()
}
// doRequest performs http request to the given requestURL.
func (uw *urlWatcher) doRequest(requestURL string) (*http.Response, error) {
req, err := http.NewRequest("GET", requestURL, nil)
if err != nil {
logger.Fatalf("cannot create a request for %q: %s", requestURL, err)
}
if uw.authorization != "" {
req.Header.Set("Authorization", uw.authorization)
}
return uw.client.Do(req)
}
func (uw *urlWatcher) setResourceVersion(resourceVersion string) {
uw.mu.Lock()
uw.resourceVersion = resourceVersion
uw.mu.Unlock()
}
// reloadObjects reloads objects to the latest state and returns resourceVersion for the latest state.
func (uw *urlWatcher) reloadObjects() string {
uw.mu.Lock()
resourceVersion := uw.resourceVersion
uw.mu.Unlock()
if resourceVersion != "" {
// Fast path - there is no need in reloading the objects.
return resourceVersion
}
requestURL := uw.apiURL
resp, err := uw.doRequest(requestURL)
if err != nil {
logger.Errorf("cannot perform request to %q: %s", requestURL, err)
return ""
}
if resp.StatusCode != http.StatusOK {
body, _ := ioutil.ReadAll(resp.Body)
_ = resp.Body.Close()
logger.Errorf("unexpected status code for request to %q: %d; want %d; response: %q", requestURL, resp.StatusCode, http.StatusOK, body)
return ""
}
objectsByKey, metadata, err := uw.parseObjectList(resp.Body)
_ = resp.Body.Close()
if err != nil {
logger.Errorf("cannot parse objects from %q: %s", requestURL, err)
return ""
}
uw.mu.Lock()
var updated, removed, added int
for key := range uw.objectsByKey {
if o, ok := objectsByKey[key]; ok {
uw.objectsByKey[key] = o
updated++
} else {
delete(uw.objectsByKey, key)
removed++
}
}
for key, o := range objectsByKey {
if _, ok := uw.objectsByKey[key]; !ok {
uw.objectsByKey[key] = o
added++
}
}
uw.objectsUpdated.Add(updated)
uw.objectsRemoved.Add(removed)
uw.objectsAdded.Add(added)
uw.objectsCount.Add(added - removed)
uw.resourceVersion = metadata.ResourceVersion
uw.mu.Unlock()
for _, aw := range uw.getAPIWatchers() {
aw.reloadScrapeWorks(objectsByKey)
}
logger.Infof("loaded %d objects from %q", len(objectsByKey), requestURL)
return metadata.ResourceVersion
}
func (uw *urlWatcher) getAPIWatchers() []*apiWatcher {
uw.mu.Lock()
aws := make([]*apiWatcher, 0, len(uw.aws))
for aw := range uw.aws {
aws = append(aws, aw)
}
uw.mu.Unlock()
return aws
}
// watchForUpdates watches for object updates starting from uw.resourceVersion and updates the corresponding objects to the latest state.
//
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
func (uw *urlWatcher) watchForUpdates() {
aw := uw.aw
backoffDelay := time.Second
maxBackoffDelay := 30 * time.Second
backoffSleep := func() {
@ -338,23 +417,17 @@ func (uw *urlWatcher) watchForUpdates() {
if strings.Contains(apiURL, "?") {
delimiter = "&"
}
timeoutSeconds := time.Duration(0.9 * float64(aw.client.Timeout)).Seconds()
timeoutSeconds := time.Duration(0.9 * float64(uw.client.Timeout)).Seconds()
apiURL += delimiter + "watch=1&allowWatchBookmarks=true&timeoutSeconds=" + strconv.Itoa(int(timeoutSeconds))
for {
if aw.needStop() {
return
}
resourceVersion := uw.reloadObjects()
requestURL := apiURL
if resourceVersion != "" {
requestURL += "&resourceVersion=" + url.QueryEscape(resourceVersion)
}
resp, err := aw.doRequest(requestURL)
resp, err := uw.doRequest(requestURL)
if err != nil {
if aw.needStop() {
return
}
logger.Errorf("error when performing a request to %q: %s", requestURL, err)
logger.Errorf("cannot performing request to %q: %s", requestURL, err)
backoffSleep()
continue
}
@ -375,9 +448,6 @@ func (uw *urlWatcher) watchForUpdates() {
err = uw.readObjectUpdateStream(resp.Body)
_ = resp.Body.Close()
if err != nil {
if aw.needStop() {
return
}
if !errors.Is(err, io.EOF) {
logger.Errorf("error when reading WatchEvent stream from %q: %s", requestURL, err)
}
@ -389,7 +459,6 @@ func (uw *urlWatcher) watchForUpdates() {
// readObjectUpdateStream reads Kuberntes watch events from r and updates locally cached objects according to the received events.
func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
aw := uw.aw
d := json.NewDecoder(r)
var we WatchEvent
for {
@ -403,26 +472,35 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
return err
}
key := o.key()
uw.objectsByKey.update(key, o)
labels := o.getTargetLabels(aw)
swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels)
uw.mu.Lock()
if len(swos) > 0 {
uw.swosByKey[key] = swos
if _, ok := uw.objectsByKey[key]; !ok {
uw.objectsCount.Inc()
uw.objectsAdded.Inc()
} else {
delete(uw.swosByKey, key)
uw.objectsUpdated.Inc()
}
uw.objectsByKey[key] = o
uw.mu.Unlock()
for _, aw := range uw.getAPIWatchers() {
labels := o.getTargetLabels(aw)
aw.setScrapeWorks(key, labels)
}
case "DELETED":
o, err := uw.parseObject(we.Object)
if err != nil {
return err
}
key := o.key()
uw.objectsByKey.remove(key)
uw.mu.Lock()
delete(uw.swosByKey, key)
if _, ok := uw.objectsByKey[key]; ok {
uw.objectsCount.Dec()
uw.objectsRemoved.Inc()
delete(uw.objectsByKey, key)
}
uw.mu.Unlock()
for _, aw := range uw.getAPIWatchers() {
aw.removeScrapeWorks(key)
}
case "BOOKMARK":
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks
bm, err := parseBookmark(we.Object)
@ -546,105 +624,3 @@ func getObjectParsersForRole(role string) (parseObjectFunc, parseObjectListFunc)
return nil, nil
}
}
type objectsMap struct {
mu sync.Mutex
refCount int
m map[string]object
objectsAdded *metrics.Counter
objectsRemoved *metrics.Counter
objectsCount *metrics.Counter
}
func (om *objectsMap) incRef() {
om.mu.Lock()
om.refCount++
om.mu.Unlock()
}
func (om *objectsMap) decRef() {
om.mu.Lock()
om.refCount--
if om.refCount < 0 {
logger.Panicf("BUG: refCount cannot be smaller than 0; got %d", om.refCount)
}
if om.refCount == 0 {
// Free up memory occupied by om.m
om.objectsRemoved.Add(len(om.m))
om.objectsCount.Add(-len(om.m))
om.m = make(map[string]object)
}
om.mu.Unlock()
}
func (om *objectsMap) reload(m map[string]object) {
om.mu.Lock()
om.objectsAdded.Add(len(m))
om.objectsRemoved.Add(len(om.m))
om.objectsCount.Add(len(m) - len(om.m))
for k := range om.m {
delete(om.m, k)
}
for k, o := range m {
om.m[k] = o
}
om.mu.Unlock()
}
func (om *objectsMap) update(key string, o object) {
om.mu.Lock()
if om.m[key] == nil {
om.objectsAdded.Inc()
om.objectsCount.Inc()
}
om.m[key] = o
om.mu.Unlock()
}
func (om *objectsMap) remove(key string) {
om.mu.Lock()
if om.m[key] != nil {
om.objectsRemoved.Inc()
om.objectsCount.Dec()
delete(om.m, key)
}
om.mu.Unlock()
}
func (om *objectsMap) get(key string) object {
om.mu.Lock()
o, ok := om.m[key]
om.mu.Unlock()
if !ok {
return nil
}
return o
}
type sharedObjects struct {
mu sync.Mutex
oms map[string]*objectsMap
}
func (so *sharedObjects) getByAPIURL(role, apiURL string) *objectsMap {
so.mu.Lock()
om := so.oms[apiURL]
if om == nil {
om = &objectsMap{
m: make(map[string]object),
objectsCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects{role=%q}`, role)),
objectsAdded: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_added_total{role=%q}`, role)),
objectsRemoved: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_removed_total{role=%q}`, role)),
}
so.oms[apiURL] = om
}
so.mu.Unlock()
om.incRef()
return om
}
var sharedObjectsGlobal = &sharedObjects{
oms: make(map[string]*objectsMap),
}

View file

@ -3,6 +3,7 @@ package kubernetes
import (
"encoding/json"
"fmt"
"io"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
@ -11,10 +12,11 @@ func (eps *Endpoints) key() string {
return eps.Metadata.key()
}
func parseEndpointsList(data []byte) (map[string]object, ListMeta, error) {
func parseEndpointsList(r io.Reader) (map[string]object, ListMeta, error) {
var epsl EndpointsList
if err := json.Unmarshal(data, &epsl); err != nil {
return nil, epsl.Metadata, fmt.Errorf("cannot unmarshal EndpointsList from %q: %w", data, err)
d := json.NewDecoder(r)
if err := d.Decode(&epsl); err != nil {
return nil, epsl.Metadata, fmt.Errorf("cannot unmarshal EndpointsList: %w", err)
}
objectsByKey := make(map[string]object)
for _, eps := range epsl.Items {

View file

@ -1,6 +1,7 @@
package kubernetes
import (
"bytes"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
@ -10,7 +11,8 @@ import (
func TestParseEndpointsListFailure(t *testing.T) {
f := func(s string) {
t.Helper()
objectsByKey, _, err := parseEndpointsList([]byte(s))
r := bytes.NewBufferString(s)
objectsByKey, _, err := parseEndpointsList(r)
if err == nil {
t.Fatalf("expecting non-nil error")
}
@ -78,7 +80,8 @@ func TestParseEndpointsListSuccess(t *testing.T) {
]
}
`
objectsByKey, meta, err := parseEndpointsList([]byte(data))
r := bytes.NewBufferString(data)
objectsByKey, meta, err := parseEndpointsList(r)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}

View file

@ -3,6 +3,7 @@ package kubernetes
import (
"encoding/json"
"fmt"
"io"
"strconv"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
@ -12,10 +13,11 @@ func (eps *EndpointSlice) key() string {
return eps.Metadata.key()
}
func parseEndpointSliceList(data []byte) (map[string]object, ListMeta, error) {
func parseEndpointSliceList(r io.Reader) (map[string]object, ListMeta, error) {
var epsl EndpointSliceList
if err := json.Unmarshal(data, &epsl); err != nil {
return nil, epsl.Metadata, fmt.Errorf("cannot unmarshal EndpointSliceList from %q: %w", data, err)
d := json.NewDecoder(r)
if err := d.Decode(&epsl); err != nil {
return nil, epsl.Metadata, fmt.Errorf("cannot unmarshal EndpointSliceList: %w", err)
}
objectsByKey := make(map[string]object)
for _, eps := range epsl.Items {

View file

@ -1,6 +1,7 @@
package kubernetes
import (
"bytes"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
@ -9,7 +10,8 @@ import (
func TestParseEndpointSliceListFail(t *testing.T) {
f := func(data string) {
objectsByKey, _, err := parseEndpointSliceList([]byte(data))
r := bytes.NewBufferString(data)
objectsByKey, _, err := parseEndpointSliceList(r)
if err == nil {
t.Errorf("unexpected result, test must fail! data: %s", data)
}
@ -175,7 +177,8 @@ func TestParseEndpointSliceListSuccess(t *testing.T) {
}
]
}`
objectsByKey, meta, err := parseEndpointSliceList([]byte(data))
r := bytes.NewBufferString(data)
objectsByKey, meta, err := parseEndpointSliceList(r)
if err != nil {
t.Errorf("cannot parse data for EndpointSliceList: %v", err)
return

View file

@ -3,16 +3,18 @@ package kubernetes
import (
"encoding/json"
"fmt"
"io"
)
func (ig *Ingress) key() string {
return ig.Metadata.key()
}
func parseIngressList(data []byte) (map[string]object, ListMeta, error) {
func parseIngressList(r io.Reader) (map[string]object, ListMeta, error) {
var igl IngressList
if err := json.Unmarshal(data, &igl); err != nil {
return nil, igl.Metadata, fmt.Errorf("cannot unmarshal IngressList from %q: %w", data, err)
d := json.NewDecoder(r)
if err := d.Decode(&igl); err != nil {
return nil, igl.Metadata, fmt.Errorf("cannot unmarshal IngressList: %w", err)
}
objectsByKey := make(map[string]object)
for _, ig := range igl.Items {

View file

@ -1,6 +1,7 @@
package kubernetes
import (
"bytes"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
@ -10,7 +11,8 @@ import (
func TestParseIngressListFailure(t *testing.T) {
f := func(s string) {
t.Helper()
objectsByKey, _, err := parseIngressList([]byte(s))
r := bytes.NewBufferString(s)
objectsByKey, _, err := parseIngressList(r)
if err == nil {
t.Fatalf("expecting non-nil error")
}
@ -70,7 +72,8 @@ func TestParseIngressListSuccess(t *testing.T) {
}
]
}`
objectsByKey, meta, err := parseIngressList([]byte(data))
r := bytes.NewBufferString(data)
objectsByKey, meta, err := parseIngressList(r)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}

View file

@ -48,12 +48,7 @@ func (sdc *SDConfig) GetScrapeWorkObjects(baseDir string, swcFunc ScrapeWorkCons
if err != nil {
return nil, fmt.Errorf("cannot create API config: %w", err)
}
switch sdc.Role {
case "node", "pod", "service", "endpoints", "endpointslices", "ingress":
return cfg.aw.getScrapeWorkObjectsForRole(sdc.Role), nil
default:
return nil, fmt.Errorf("unexpected `role`: %q; must be one of `node`, `pod`, `service`, `endpoints`, `endpointslices` or `ingress`; skipping it", sdc.Role)
}
return cfg.aw.getScrapeWorkObjects(), nil
}
// MustStop stops further usage for sdc.

View file

@ -3,6 +3,7 @@ package kubernetes
import (
"encoding/json"
"fmt"
"io"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
@ -12,10 +13,11 @@ func (n *Node) key() string {
return n.Metadata.key()
}
func parseNodeList(data []byte) (map[string]object, ListMeta, error) {
func parseNodeList(r io.Reader) (map[string]object, ListMeta, error) {
var nl NodeList
if err := json.Unmarshal(data, &nl); err != nil {
return nil, nl.Metadata, fmt.Errorf("cannot unmarshal NodeList from %q: %w", data, err)
d := json.NewDecoder(r)
if err := d.Decode(&nl); err != nil {
return nil, nl.Metadata, fmt.Errorf("cannot unmarshal NodeList: %w", err)
}
objectsByKey := make(map[string]object)
for _, n := range nl.Items {

View file

@ -1,6 +1,7 @@
package kubernetes
import (
"bytes"
"reflect"
"sort"
"strconv"
@ -13,7 +14,8 @@ import (
func TestParseNodeListFailure(t *testing.T) {
f := func(s string) {
t.Helper()
objectsByKey, _, err := parseNodeList([]byte(s))
r := bytes.NewBufferString(s)
objectsByKey, _, err := parseNodeList(r)
if err == nil {
t.Fatalf("expecting non-nil error")
}
@ -229,7 +231,8 @@ func TestParseNodeListSuccess(t *testing.T) {
]
}
`
objectsByKey, meta, err := parseNodeList([]byte(data))
r := bytes.NewBufferString(data)
objectsByKey, meta, err := parseNodeList(r)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}

View file

@ -3,6 +3,7 @@ package kubernetes
import (
"encoding/json"
"fmt"
"io"
"strconv"
"strings"
@ -13,10 +14,11 @@ func (p *Pod) key() string {
return p.Metadata.key()
}
func parsePodList(data []byte) (map[string]object, ListMeta, error) {
func parsePodList(r io.Reader) (map[string]object, ListMeta, error) {
var pl PodList
if err := json.Unmarshal(data, &pl); err != nil {
return nil, pl.Metadata, fmt.Errorf("cannot unmarshal PodList from %q: %w", data, err)
d := json.NewDecoder(r)
if err := d.Decode(&pl); err != nil {
return nil, pl.Metadata, fmt.Errorf("cannot unmarshal PodList: %w", err)
}
objectsByKey := make(map[string]object)
for _, p := range pl.Items {

View file

@ -1,6 +1,7 @@
package kubernetes
import (
"bytes"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
@ -10,7 +11,8 @@ import (
func TestParsePodListFailure(t *testing.T) {
f := func(s string) {
t.Helper()
objectsByKey, _, err := parsePodList([]byte(s))
r := bytes.NewBufferString(s)
objectsByKey, _, err := parsePodList(r)
if err == nil {
t.Fatalf("expecting non-nil error")
}
@ -227,7 +229,8 @@ func TestParsePodListSuccess(t *testing.T) {
]
}
`
objectsByKey, meta, err := parsePodList([]byte(data))
r := bytes.NewBufferString(data)
objectsByKey, meta, err := parsePodList(r)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}

View file

@ -3,6 +3,7 @@ package kubernetes
import (
"encoding/json"
"fmt"
"io"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
@ -11,10 +12,11 @@ func (s *Service) key() string {
return s.Metadata.key()
}
func parseServiceList(data []byte) (map[string]object, ListMeta, error) {
func parseServiceList(r io.Reader) (map[string]object, ListMeta, error) {
var sl ServiceList
if err := json.Unmarshal(data, &sl); err != nil {
return nil, sl.Metadata, fmt.Errorf("cannot unmarshal ServiceList from %q: %w", data, err)
d := json.NewDecoder(r)
if err := d.Decode(&sl); err != nil {
return nil, sl.Metadata, fmt.Errorf("cannot unmarshal ServiceList: %w", err)
}
objectsByKey := make(map[string]object)
for _, s := range sl.Items {

View file

@ -1,6 +1,7 @@
package kubernetes
import (
"bytes"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
@ -10,7 +11,8 @@ import (
func TestParseServiceListFailure(t *testing.T) {
f := func(s string) {
t.Helper()
objectsByKey, _, err := parseServiceList([]byte(s))
r := bytes.NewBufferString(s)
objectsByKey, _, err := parseServiceList(r)
if err == nil {
t.Fatalf("expecting non-nil error")
}
@ -88,7 +90,8 @@ func TestParseServiceListSuccess(t *testing.T) {
]
}
`
objectsByKey, meta, err := parseServiceList([]byte(data))
r := bytes.NewBufferString(data)
objectsByKey, meta, err := parseServiceList(r)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}