mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-01 14:47:38 +00:00
lib/promscrape/discovery/kubernetes: remove a mutex at urlWatcher - use groupWatcher mutex for accessing all the urlWatcher children
This simplifies the code a bit and reduces the probability of improper mutex handling and deadlocks.
This commit is contained in:
parent
535b3ff618
commit
421a92983a
3 changed files with 62 additions and 140 deletions
|
@ -33,6 +33,8 @@ type WatchEvent struct {
|
||||||
// object is any Kubernetes object.
|
// object is any Kubernetes object.
|
||||||
type object interface {
|
type object interface {
|
||||||
key() string
|
key() string
|
||||||
|
|
||||||
|
// getTargetLabels must be called under gw.mu lock.
|
||||||
getTargetLabels(gw *groupWatcher) []map[string]string
|
getTargetLabels(gw *groupWatcher) []map[string]string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -231,62 +233,13 @@ var (
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
|
|
||||||
// getObjectByRole returns an object with the given (namespace, name) key and the given role.
|
func (gw *groupWatcher) getObjectByRoleLocked(role, namespace, name string) object {
|
||||||
func (gw *groupWatcher) getObjectByRole(role, namespace, name string) object {
|
|
||||||
if gw == nil {
|
if gw == nil {
|
||||||
// this is needed for testing
|
// this is needed for testing
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
o := gw.getCachedObjectByRole(role, namespace, name)
|
|
||||||
if o != nil {
|
|
||||||
// Fast path: the object has been found in the cache.
|
|
||||||
return o
|
|
||||||
}
|
|
||||||
|
|
||||||
// The object wasn't found in the cache. Try querying it directly from API server.
|
|
||||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1182#issuecomment-813353359 for details.
|
|
||||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_loads_total{role=%q}`, role)).Inc()
|
|
||||||
objectType := getObjectTypeByRole(role)
|
|
||||||
path := getAPIPath(objectType, namespace, "")
|
|
||||||
path += "/" + name
|
|
||||||
requestURL := gw.apiServer + path
|
|
||||||
resp, err := gw.doRequest(requestURL)
|
|
||||||
if err != nil {
|
|
||||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_load_errors_total{role=%q}`, role)).Inc()
|
|
||||||
logger.Errorf("cannot obtain data for object %s (namespace=%q, name=%q): %s", role, namespace, name, err)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
data, err := ioutil.ReadAll(resp.Body)
|
|
||||||
_ = resp.Body.Close()
|
|
||||||
if err != nil {
|
|
||||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_load_errors_total{role=%q}`, role)).Inc()
|
|
||||||
logger.Errorf("cannot read response from %q: %s", requestURL, err)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
|
||||||
if resp.StatusCode == http.StatusNotFound {
|
|
||||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_load_misses_total{role=%q}`, role)).Inc()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_load_errors_total{role=%q}`, role)).Inc()
|
|
||||||
logger.Errorf("unexpected status code when reading response from %q; got %d; want %d; response body: %q", requestURL, resp.StatusCode, http.StatusOK, data)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
parseObject, _ := getObjectParsersForRole(role)
|
|
||||||
o, err = parseObject(data)
|
|
||||||
if err != nil {
|
|
||||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_load_errors_total{role=%q}`, role)).Inc()
|
|
||||||
logger.Errorf("cannot parse object obtained from %q: %s; response body: %q", requestURL, err, data)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
// There is no need in storing the object in urlWatcher cache, since it should be eventually populated there by urlWatcher itself.
|
|
||||||
return o
|
|
||||||
}
|
|
||||||
|
|
||||||
func (gw *groupWatcher) getCachedObjectByRole(role, namespace, name string) object {
|
|
||||||
key := namespace + "/" + name
|
key := namespace + "/" + name
|
||||||
uws := gw.getURLWatchers()
|
for _, uw := range gw.m {
|
||||||
for _, uw := range uws {
|
|
||||||
if uw.role != role {
|
if uw.role != role {
|
||||||
// Role mismatch
|
// Role mismatch
|
||||||
continue
|
continue
|
||||||
|
@ -295,26 +248,26 @@ func (gw *groupWatcher) getCachedObjectByRole(role, namespace, name string) obje
|
||||||
// Namespace mismatch
|
// Namespace mismatch
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
uw.mu.Lock()
|
if o := uw.objectsByKey[key]; o != nil {
|
||||||
o := uw.objectsByKey[key]
|
|
||||||
uw.mu.Unlock()
|
|
||||||
if o != nil {
|
|
||||||
return o
|
return o
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gw *groupWatcher) refreshEndpointsLabels(namespace, key string) {
|
func (gw *groupWatcher) refreshEndpointsLabelsLocked(namespace, key string) {
|
||||||
// Refresh endpoints and endpointslices labels for the corresponding service as Prometheus does.
|
// Refresh endpoints and endpointslices labels for the corresponding service as Prometheus does.
|
||||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
|
||||||
gw.refreshObjectLabels("endpoints", namespace, key)
|
gw.refreshObjectLabelsLocked("endpoints", namespace, key)
|
||||||
gw.refreshObjectLabels("endpointslices", namespace, key)
|
gw.refreshObjectLabelsLocked("endpointslices", namespace, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gw *groupWatcher) refreshObjectLabels(role, namespace, key string) {
|
func (gw *groupWatcher) refreshObjectLabelsLocked(role, namespace, key string) {
|
||||||
uws := gw.getURLWatchers()
|
for _, uw := range gw.m {
|
||||||
for _, uw := range uws {
|
if len(uw.aws) == 0 {
|
||||||
|
// No apiWatchers to notify
|
||||||
|
continue
|
||||||
|
}
|
||||||
if uw.role != role {
|
if uw.role != role {
|
||||||
// Role mismatch
|
// Role mismatch
|
||||||
continue
|
continue
|
||||||
|
@ -323,16 +276,9 @@ func (gw *groupWatcher) refreshObjectLabels(role, namespace, key string) {
|
||||||
// Namespace mismatch
|
// Namespace mismatch
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
var aws []*apiWatcher
|
if o := uw.objectsByKey[key]; o != nil {
|
||||||
uw.mu.Lock()
|
|
||||||
o := uw.objectsByKey[key]
|
|
||||||
if o != nil {
|
|
||||||
aws = uw.getAPIWatchersLocked()
|
|
||||||
}
|
|
||||||
uw.mu.Unlock()
|
|
||||||
if len(aws) > 0 {
|
|
||||||
labels := o.getTargetLabels(gw)
|
labels := o.getTargetLabels(gw)
|
||||||
for _, aw := range aws {
|
for aw := range uw.aws {
|
||||||
aw.setScrapeWorks(namespace, key, labels)
|
aw.setScrapeWorks(namespace, key, labels)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -350,14 +296,14 @@ func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) {
|
||||||
uw = newURLWatcher(role, namespaces[i], apiURL, gw)
|
uw = newURLWatcher(role, namespaces[i], apiURL, gw)
|
||||||
gw.m[apiURL] = uw
|
gw.m[apiURL] = uw
|
||||||
}
|
}
|
||||||
|
if aw != nil {
|
||||||
|
uw.subscribeAPIWatcherLocked(aw)
|
||||||
|
}
|
||||||
gw.mu.Unlock()
|
gw.mu.Unlock()
|
||||||
if needStart {
|
if needStart {
|
||||||
uw.reloadObjects()
|
uw.reloadObjects()
|
||||||
go uw.watchForUpdates()
|
go uw.watchForUpdates()
|
||||||
}
|
}
|
||||||
if aw != nil {
|
|
||||||
uw.subscribeAPIWatcher(aw)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -374,30 +320,24 @@ func (gw *groupWatcher) doRequest(requestURL string) (*http.Response, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gw *groupWatcher) registerPendingAPIWatchers() {
|
func (gw *groupWatcher) registerPendingAPIWatchers() {
|
||||||
uws := gw.getURLWatchers()
|
|
||||||
for _, uw := range uws {
|
|
||||||
uw.registerPendingAPIWatchers()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (gw *groupWatcher) getURLWatchers() []*urlWatcher {
|
|
||||||
gw.mu.Lock()
|
gw.mu.Lock()
|
||||||
uws := make([]*urlWatcher, 0, len(gw.m))
|
defer gw.mu.Unlock()
|
||||||
for _, uw := range gw.m {
|
for _, uw := range gw.m {
|
||||||
uws = append(uws, uw)
|
uw.registerPendingAPIWatchersLocked()
|
||||||
}
|
}
|
||||||
gw.mu.Unlock()
|
|
||||||
return uws
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gw *groupWatcher) unsubscribeAPIWatcher(aw *apiWatcher) {
|
func (gw *groupWatcher) unsubscribeAPIWatcher(aw *apiWatcher) {
|
||||||
uws := gw.getURLWatchers()
|
gw.mu.Lock()
|
||||||
for _, uw := range uws {
|
defer gw.mu.Unlock()
|
||||||
uw.unsubscribeAPIWatcher(aw)
|
for _, uw := range gw.m {
|
||||||
|
uw.unsubscribeAPIWatcherLocked(aw)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// urlWatcher watches for an apiURL and updates object states in objectsByKey.
|
// urlWatcher watches for an apiURL and updates object states in objectsByKey.
|
||||||
|
//
|
||||||
|
// urlWatcher fields must be accessed under gw.mu lock.
|
||||||
type urlWatcher struct {
|
type urlWatcher struct {
|
||||||
role string
|
role string
|
||||||
namespace string
|
namespace string
|
||||||
|
@ -407,14 +347,11 @@ type urlWatcher struct {
|
||||||
parseObject parseObjectFunc
|
parseObject parseObjectFunc
|
||||||
parseObjectList parseObjectListFunc
|
parseObjectList parseObjectListFunc
|
||||||
|
|
||||||
// mu protects aws, awsPending, objectsByKey and resourceVersion
|
|
||||||
mu sync.Mutex
|
|
||||||
|
|
||||||
// awsPending contains pending apiWatcher objects, which are registered in a batch.
|
// awsPending contains pending apiWatcher objects, which are registered in a batch.
|
||||||
// Batch registering saves CPU time needed for registering big number of Kubernetes objects
|
// Batch registering saves CPU time needed for registering big number of Kubernetes objects
|
||||||
// shared among big number of scrape jobs, since per-object labels are generated only once
|
// shared among big number of scrape jobs, since per-object labels are generated only once
|
||||||
// for all the scrape jobs (each scrape job is associated with a single apiWatcher).
|
// for all the scrape jobs (each scrape job is associated with a single apiWatcher).
|
||||||
// See reloadScrapeWorksForAPIWatchers for details.
|
// See reloadScrapeWorksForAPIWatchersLocked for details.
|
||||||
awsPending map[*apiWatcher]struct{}
|
awsPending map[*apiWatcher]struct{}
|
||||||
|
|
||||||
// aws contains registered apiWatcher objects
|
// aws contains registered apiWatcher objects
|
||||||
|
@ -458,37 +395,31 @@ func newURLWatcher(role, namespace, apiURL string, gw *groupWatcher) *urlWatcher
|
||||||
return uw
|
return uw
|
||||||
}
|
}
|
||||||
|
|
||||||
func (uw *urlWatcher) subscribeAPIWatcher(aw *apiWatcher) {
|
func (uw *urlWatcher) subscribeAPIWatcherLocked(aw *apiWatcher) {
|
||||||
uw.mu.Lock()
|
|
||||||
if _, ok := uw.aws[aw]; !ok {
|
if _, ok := uw.aws[aw]; !ok {
|
||||||
if _, ok := uw.awsPending[aw]; !ok {
|
if _, ok := uw.awsPending[aw]; !ok {
|
||||||
uw.awsPending[aw] = struct{}{}
|
uw.awsPending[aw] = struct{}{}
|
||||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Inc()
|
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Inc()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
uw.mu.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (uw *urlWatcher) registerPendingAPIWatchers() {
|
func (uw *urlWatcher) registerPendingAPIWatchersLocked() {
|
||||||
uw.mu.Lock()
|
|
||||||
if len(uw.awsPending) == 0 {
|
if len(uw.awsPending) == 0 {
|
||||||
uw.mu.Unlock()
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
awsPending := make([]*apiWatcher, 0, len(uw.awsPending))
|
awsPending := make([]*apiWatcher, 0, len(uw.awsPending))
|
||||||
for aw := range uw.awsPending {
|
for aw := range uw.awsPending {
|
||||||
awsPending = append(awsPending, aw)
|
awsPending = append(awsPending, aw)
|
||||||
delete(uw.awsPending, aw)
|
|
||||||
uw.aws[aw] = struct{}{}
|
uw.aws[aw] = struct{}{}
|
||||||
}
|
}
|
||||||
uw.reloadScrapeWorksForAPIWatchers(awsPending, uw.objectsByKey)
|
uw.reloadScrapeWorksForAPIWatchersLocked(uw.awsPending)
|
||||||
uw.mu.Unlock()
|
uw.awsPending = make(map[*apiWatcher]struct{})
|
||||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="working"}`, uw.role)).Add(len(awsPending))
|
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="working"}`, uw.role)).Add(len(awsPending))
|
||||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Add(-len(awsPending))
|
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Add(-len(awsPending))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (uw *urlWatcher) unsubscribeAPIWatcher(aw *apiWatcher) {
|
func (uw *urlWatcher) unsubscribeAPIWatcherLocked(aw *apiWatcher) {
|
||||||
uw.mu.Lock()
|
|
||||||
if _, ok := uw.awsPending[aw]; ok {
|
if _, ok := uw.awsPending[aw]; ok {
|
||||||
delete(uw.awsPending, aw)
|
delete(uw.awsPending, aw)
|
||||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Dec()
|
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Dec()
|
||||||
|
@ -497,20 +428,19 @@ func (uw *urlWatcher) unsubscribeAPIWatcher(aw *apiWatcher) {
|
||||||
delete(uw.aws, aw)
|
delete(uw.aws, aw)
|
||||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="working"}`, uw.role)).Dec()
|
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="working"}`, uw.role)).Dec()
|
||||||
}
|
}
|
||||||
uw.mu.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (uw *urlWatcher) setResourceVersion(resourceVersion string) {
|
func (uw *urlWatcher) setResourceVersion(resourceVersion string) {
|
||||||
uw.mu.Lock()
|
uw.gw.mu.Lock()
|
||||||
uw.resourceVersion = resourceVersion
|
uw.resourceVersion = resourceVersion
|
||||||
uw.mu.Unlock()
|
uw.gw.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// reloadObjects reloads objects to the latest state and returns resourceVersion for the latest state.
|
// reloadObjects reloads objects to the latest state and returns resourceVersion for the latest state.
|
||||||
func (uw *urlWatcher) reloadObjects() string {
|
func (uw *urlWatcher) reloadObjects() string {
|
||||||
uw.mu.Lock()
|
uw.gw.mu.Lock()
|
||||||
resourceVersion := uw.resourceVersion
|
resourceVersion := uw.resourceVersion
|
||||||
uw.mu.Unlock()
|
uw.gw.mu.Unlock()
|
||||||
if resourceVersion != "" {
|
if resourceVersion != "" {
|
||||||
// Fast path - there is no need in reloading the objects.
|
// Fast path - there is no need in reloading the objects.
|
||||||
return resourceVersion
|
return resourceVersion
|
||||||
|
@ -535,7 +465,7 @@ func (uw *urlWatcher) reloadObjects() string {
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
uw.mu.Lock()
|
uw.gw.mu.Lock()
|
||||||
var updated, removed, added int
|
var updated, removed, added int
|
||||||
for key := range uw.objectsByKey {
|
for key := range uw.objectsByKey {
|
||||||
if o, ok := objectsByKey[key]; ok {
|
if o, ok := objectsByKey[key]; ok {
|
||||||
|
@ -556,31 +486,34 @@ func (uw *urlWatcher) reloadObjects() string {
|
||||||
uw.objectsRemoved.Add(removed)
|
uw.objectsRemoved.Add(removed)
|
||||||
uw.objectsAdded.Add(added)
|
uw.objectsAdded.Add(added)
|
||||||
uw.objectsCount.Add(added - removed)
|
uw.objectsCount.Add(added - removed)
|
||||||
|
uw.reloadScrapeWorksForAPIWatchersLocked(uw.aws)
|
||||||
uw.resourceVersion = metadata.ResourceVersion
|
uw.resourceVersion = metadata.ResourceVersion
|
||||||
aws := uw.getAPIWatchersLocked()
|
|
||||||
uw.mu.Unlock()
|
|
||||||
|
|
||||||
uw.reloadScrapeWorksForAPIWatchers(aws, objectsByKey)
|
|
||||||
if uw.role == "service" {
|
if uw.role == "service" {
|
||||||
// Refresh endpoints labels for the corresponding services as Prometheus does.
|
// Refresh endpoints labels for the corresponding services as Prometheus does.
|
||||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
|
||||||
for key := range objectsByKey {
|
for key := range objectsByKey {
|
||||||
uw.gw.refreshEndpointsLabels(uw.namespace, key)
|
uw.gw.refreshEndpointsLabelsLocked(uw.namespace, key)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
uw.gw.mu.Unlock()
|
||||||
|
|
||||||
logger.Infof("reloaded %d objects from %q", len(objectsByKey), requestURL)
|
logger.Infof("reloaded %d objects from %q", len(objectsByKey), requestURL)
|
||||||
return uw.resourceVersion
|
return uw.resourceVersion
|
||||||
}
|
}
|
||||||
|
|
||||||
func (uw *urlWatcher) reloadScrapeWorksForAPIWatchers(aws []*apiWatcher, objectsByKey map[string]object) {
|
func (uw *urlWatcher) reloadScrapeWorksForAPIWatchersLocked(awsMap map[*apiWatcher]struct{}) {
|
||||||
if len(aws) == 0 {
|
if len(awsMap) == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
aws := make([]*apiWatcher, 0, len(awsMap))
|
||||||
|
for aw := range awsMap {
|
||||||
|
aws = append(aws, aw)
|
||||||
|
}
|
||||||
swosByKey := make([]map[string][]interface{}, len(aws))
|
swosByKey := make([]map[string][]interface{}, len(aws))
|
||||||
for i := range aws {
|
for i := range aws {
|
||||||
swosByKey[i] = make(map[string][]interface{})
|
swosByKey[i] = make(map[string][]interface{})
|
||||||
}
|
}
|
||||||
for key, o := range objectsByKey {
|
for key, o := range uw.objectsByKey {
|
||||||
labels := o.getTargetLabels(uw.gw)
|
labels := o.getTargetLabels(uw.gw)
|
||||||
for i, aw := range aws {
|
for i, aw := range aws {
|
||||||
swos := aw.getScrapeWorkObjectsForLabels(labels)
|
swos := aw.getScrapeWorkObjectsForLabels(labels)
|
||||||
|
@ -594,15 +527,6 @@ func (uw *urlWatcher) reloadScrapeWorksForAPIWatchers(aws []*apiWatcher, objects
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (uw *urlWatcher) getAPIWatchersLocked() []*apiWatcher {
|
|
||||||
awsMap := uw.aws
|
|
||||||
aws := make([]*apiWatcher, 0, len(awsMap))
|
|
||||||
for aw := range awsMap {
|
|
||||||
aws = append(aws, aw)
|
|
||||||
}
|
|
||||||
return aws
|
|
||||||
}
|
|
||||||
|
|
||||||
// watchForUpdates watches for object updates starting from uw.resourceVersion and updates the corresponding objects to the latest state.
|
// watchForUpdates watches for object updates starting from uw.resourceVersion and updates the corresponding objects to the latest state.
|
||||||
//
|
//
|
||||||
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
|
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
|
||||||
|
@ -678,7 +602,7 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
key := o.key()
|
key := o.key()
|
||||||
uw.mu.Lock()
|
uw.gw.mu.Lock()
|
||||||
if _, ok := uw.objectsByKey[key]; !ok {
|
if _, ok := uw.objectsByKey[key]; !ok {
|
||||||
uw.objectsCount.Inc()
|
uw.objectsCount.Inc()
|
||||||
uw.objectsAdded.Inc()
|
uw.objectsAdded.Inc()
|
||||||
|
@ -686,41 +610,39 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
|
||||||
uw.objectsUpdated.Inc()
|
uw.objectsUpdated.Inc()
|
||||||
}
|
}
|
||||||
uw.objectsByKey[key] = o
|
uw.objectsByKey[key] = o
|
||||||
aws := uw.getAPIWatchersLocked()
|
if len(uw.aws) > 0 {
|
||||||
uw.mu.Unlock()
|
|
||||||
if len(aws) > 0 {
|
|
||||||
labels := o.getTargetLabels(uw.gw)
|
labels := o.getTargetLabels(uw.gw)
|
||||||
for _, aw := range aws {
|
for aw := range uw.aws {
|
||||||
aw.setScrapeWorks(uw.namespace, key, labels)
|
aw.setScrapeWorks(uw.namespace, key, labels)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if uw.role == "service" {
|
if uw.role == "service" {
|
||||||
// Refresh endpoints labels for the corresponding service as Prometheus does.
|
// Refresh endpoints labels for the corresponding service as Prometheus does.
|
||||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
|
||||||
uw.gw.refreshEndpointsLabels(uw.namespace, key)
|
uw.gw.refreshEndpointsLabelsLocked(uw.namespace, key)
|
||||||
}
|
}
|
||||||
|
uw.gw.mu.Unlock()
|
||||||
case "DELETED":
|
case "DELETED":
|
||||||
o, err := uw.parseObject(we.Object)
|
o, err := uw.parseObject(we.Object)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
key := o.key()
|
key := o.key()
|
||||||
uw.mu.Lock()
|
uw.gw.mu.Lock()
|
||||||
if _, ok := uw.objectsByKey[key]; ok {
|
if _, ok := uw.objectsByKey[key]; ok {
|
||||||
uw.objectsCount.Dec()
|
uw.objectsCount.Dec()
|
||||||
uw.objectsRemoved.Inc()
|
uw.objectsRemoved.Inc()
|
||||||
delete(uw.objectsByKey, key)
|
delete(uw.objectsByKey, key)
|
||||||
}
|
}
|
||||||
aws := uw.getAPIWatchersLocked()
|
for aw := range uw.aws {
|
||||||
uw.mu.Unlock()
|
|
||||||
for _, aw := range aws {
|
|
||||||
aw.removeScrapeWorks(uw.namespace, key)
|
aw.removeScrapeWorks(uw.namespace, key)
|
||||||
}
|
}
|
||||||
if uw.role == "service" {
|
if uw.role == "service" {
|
||||||
// Refresh endpoints labels for the corresponding service as Prometheus does.
|
// Refresh endpoints labels for the corresponding service as Prometheus does.
|
||||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
|
||||||
uw.gw.refreshEndpointsLabels(uw.namespace, key)
|
uw.gw.refreshEndpointsLabelsLocked(uw.namespace, key)
|
||||||
}
|
}
|
||||||
|
uw.gw.mu.Unlock()
|
||||||
case "BOOKMARK":
|
case "BOOKMARK":
|
||||||
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks
|
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks
|
||||||
bm, err := parseBookmark(we.Object)
|
bm, err := parseBookmark(we.Object)
|
||||||
|
|
|
@ -92,7 +92,7 @@ type EndpointPort struct {
|
||||||
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#endpoints
|
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#endpoints
|
||||||
func (eps *Endpoints) getTargetLabels(gw *groupWatcher) []map[string]string {
|
func (eps *Endpoints) getTargetLabels(gw *groupWatcher) []map[string]string {
|
||||||
var svc *Service
|
var svc *Service
|
||||||
if o := gw.getObjectByRole("service", eps.Metadata.Namespace, eps.Metadata.Name); o != nil {
|
if o := gw.getObjectByRoleLocked("service", eps.Metadata.Namespace, eps.Metadata.Name); o != nil {
|
||||||
svc = o.(*Service)
|
svc = o.(*Service)
|
||||||
}
|
}
|
||||||
podPortsSeen := make(map[*Pod][]int)
|
podPortsSeen := make(map[*Pod][]int)
|
||||||
|
@ -140,7 +140,7 @@ func appendEndpointLabelsForAddresses(ms []map[string]string, gw *groupWatcher,
|
||||||
for _, ea := range eas {
|
for _, ea := range eas {
|
||||||
var p *Pod
|
var p *Pod
|
||||||
if ea.TargetRef.Name != "" {
|
if ea.TargetRef.Name != "" {
|
||||||
if o := gw.getObjectByRole("pod", ea.TargetRef.Namespace, ea.TargetRef.Name); o != nil {
|
if o := gw.getObjectByRoleLocked("pod", ea.TargetRef.Namespace, ea.TargetRef.Name); o != nil {
|
||||||
p = o.(*Pod)
|
p = o.(*Pod)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,14 +39,14 @@ func parseEndpointSlice(data []byte) (object, error) {
|
||||||
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#endpointslices
|
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#endpointslices
|
||||||
func (eps *EndpointSlice) getTargetLabels(gw *groupWatcher) []map[string]string {
|
func (eps *EndpointSlice) getTargetLabels(gw *groupWatcher) []map[string]string {
|
||||||
var svc *Service
|
var svc *Service
|
||||||
if o := gw.getObjectByRole("service", eps.Metadata.Namespace, eps.Metadata.Name); o != nil {
|
if o := gw.getObjectByRoleLocked("service", eps.Metadata.Namespace, eps.Metadata.Name); o != nil {
|
||||||
svc = o.(*Service)
|
svc = o.(*Service)
|
||||||
}
|
}
|
||||||
podPortsSeen := make(map[*Pod][]int)
|
podPortsSeen := make(map[*Pod][]int)
|
||||||
var ms []map[string]string
|
var ms []map[string]string
|
||||||
for _, ess := range eps.Endpoints {
|
for _, ess := range eps.Endpoints {
|
||||||
var p *Pod
|
var p *Pod
|
||||||
if o := gw.getObjectByRole("pod", ess.TargetRef.Namespace, ess.TargetRef.Name); o != nil {
|
if o := gw.getObjectByRoleLocked("pod", ess.TargetRef.Namespace, ess.TargetRef.Name); o != nil {
|
||||||
p = o.(*Pod)
|
p = o.(*Pod)
|
||||||
}
|
}
|
||||||
for _, epp := range eps.Ports {
|
for _, epp := range eps.Ports {
|
||||||
|
|
Loading…
Reference in a new issue