lib/promscrape/discovery/kubernetes: add vm_promscrape_discovery_kubernetes_* metrics for monitoring internal state of k8s service discovery

This commit is contained in:
Aliaksandr Valialkin 2021-02-28 16:57:40 +02:00
parent f52bdbe2a3
commit 0ef7a94056

View file

@ -20,6 +20,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
"github.com/VictoriaMetrics/metrics"
) )
var apiServerTimeout = flag.Duration("promscrape.kubernetes.apiServerTimeout", 10*time.Minute, "How frequently to reload the full state from Kuberntes API server") var apiServerTimeout = flag.Duration("promscrape.kubernetes.apiServerTimeout", 10*time.Minute, "How frequently to reload the full state from Kuberntes API server")
@ -226,12 +227,16 @@ func (aw *apiWatcher) startWatcherForURL(role, apiURL string, parseObject parseO
aw.watchersByURL[apiURL] = uw aw.watchersByURL[apiURL] = uw
aw.mu.Unlock() aw.mu.Unlock()
uw.watchersCount.Inc()
uw.watchersCreated.Inc()
resourceVersion := uw.reloadObjects() resourceVersion := uw.reloadObjects()
go func() { go func() {
uw.watchForUpdates(resourceVersion) uw.watchForUpdates(resourceVersion)
aw.mu.Lock() aw.mu.Lock()
delete(aw.watchersByURL, apiURL) delete(aw.watchersByURL, apiURL)
aw.mu.Unlock() aw.mu.Unlock()
uw.watchersCount.Dec()
uw.watchersStopped.Inc()
}() }()
} }
@ -271,6 +276,14 @@ type urlWatcher struct {
// the parent apiWatcher // the parent apiWatcher
aw *apiWatcher aw *apiWatcher
watchersCount *metrics.Counter
watchersCreated *metrics.Counter
watchersStopped *metrics.Counter
objectsCount *metrics.Counter
objectsAdded *metrics.Counter
objectsRemoved *metrics.Counter
} }
func (aw *apiWatcher) newURLWatcher(role, apiURL string, parseObject parseObjectFunc, parseObjectList parseObjectListFunc) *urlWatcher { func (aw *apiWatcher) newURLWatcher(role, apiURL string, parseObject parseObjectFunc, parseObjectList parseObjectListFunc) *urlWatcher {
@ -285,6 +298,14 @@ func (aw *apiWatcher) newURLWatcher(role, apiURL string, parseObject parseObject
labelsByKey: make(map[string][]map[string]string), labelsByKey: make(map[string][]map[string]string),
aw: aw, aw: aw,
watchersCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers{role=%q}`, role)),
watchersCreated: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers_created_total{role=%q}`, role)),
watchersStopped: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers_stopped_total{role=%q}`, role)),
objectsCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects{role=%q}`, role)),
objectsAdded: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_added_total{role=%q}`, role)),
objectsRemoved: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_removed_total{role=%q}`, role)),
} }
} }
@ -312,6 +333,9 @@ func (uw *urlWatcher) reloadObjects() string {
labelsByKey[k] = o.getTargetLabels(uw.aw) labelsByKey[k] = o.getTargetLabels(uw.aw)
} }
uw.mu.Lock() uw.mu.Lock()
uw.objectsRemoved.Add(-len(uw.objectsByKey))
uw.objectsAdded.Add(len(objectsByKey))
uw.objectsCount.Add(len(objectsByKey) - len(uw.objectsByKey))
uw.objectsByKey = objectsByKey uw.objectsByKey = objectsByKey
uw.labelsByKey = labelsByKey uw.labelsByKey = labelsByKey
uw.mu.Unlock() uw.mu.Unlock()
@ -399,6 +423,10 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
switch we.Type { switch we.Type {
case "ADDED", "MODIFIED": case "ADDED", "MODIFIED":
uw.mu.Lock() uw.mu.Lock()
if uw.objectsByKey[key] == nil {
uw.objectsAdded.Inc()
uw.objectsCount.Inc()
}
uw.objectsByKey[key] = o uw.objectsByKey[key] = o
uw.mu.Unlock() uw.mu.Unlock()
labels := o.getTargetLabels(uw.aw) labels := o.getTargetLabels(uw.aw)
@ -407,6 +435,10 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
uw.mu.Unlock() uw.mu.Unlock()
case "DELETED": case "DELETED":
uw.mu.Lock() uw.mu.Lock()
if uw.objectsByKey[key] != nil {
uw.objectsRemoved.Inc()
uw.objectsCount.Dec()
}
delete(uw.objectsByKey, key) delete(uw.objectsByKey, key)
delete(uw.labelsByKey, key) delete(uw.labelsByKey, key)
uw.mu.Unlock() uw.mu.Unlock()