mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/promscrape/discovery/kubernetes: properly track objects with the same names in multiple namespaces
This is a follow-up for 12e4785fe8
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1170
This commit is contained in:
parent
12e4785fe8
commit
5b08e6fb16
8 changed files with 95 additions and 89 deletions
|
@ -32,7 +32,7 @@ type WatchEvent struct {
|
|||
|
||||
// object is any Kubernetes object.
|
||||
type object interface {
|
||||
name() string
|
||||
key() string
|
||||
getTargetLabels(gw *groupWatcher) []map[string]string
|
||||
}
|
||||
|
||||
|
@ -79,37 +79,37 @@ func (aw *apiWatcher) mustStop() {
|
|||
aw.swosByNamespaceLock.Unlock()
|
||||
}
|
||||
|
||||
func (aw *apiWatcher) reloadScrapeWorks(namespace string, swosByName map[string][]interface{}) {
|
||||
func (aw *apiWatcher) reloadScrapeWorks(namespace string, swosByKey map[string][]interface{}) {
|
||||
aw.swosByNamespaceLock.Lock()
|
||||
aw.swosCount.Add(len(swosByName) - len(aw.swosByNamespace[namespace]))
|
||||
aw.swosByNamespace[namespace] = swosByName
|
||||
aw.swosCount.Add(len(swosByKey) - len(aw.swosByNamespace[namespace]))
|
||||
aw.swosByNamespace[namespace] = swosByKey
|
||||
aw.swosByNamespaceLock.Unlock()
|
||||
}
|
||||
|
||||
func (aw *apiWatcher) setScrapeWorks(namespace, name string, labels []map[string]string) {
|
||||
func (aw *apiWatcher) setScrapeWorks(namespace, key string, labels []map[string]string) {
|
||||
swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels)
|
||||
aw.swosByNamespaceLock.Lock()
|
||||
swosByName := aw.swosByNamespace[namespace]
|
||||
if swosByName == nil {
|
||||
swosByName = make(map[string][]interface{})
|
||||
aw.swosByNamespace[namespace] = swosByName
|
||||
swosByKey := aw.swosByNamespace[namespace]
|
||||
if swosByKey == nil {
|
||||
swosByKey = make(map[string][]interface{})
|
||||
aw.swosByNamespace[namespace] = swosByKey
|
||||
}
|
||||
if len(swos) > 0 {
|
||||
aw.swosCount.Add(len(swos) - len(swosByName[name]))
|
||||
swosByName[name] = swos
|
||||
aw.swosCount.Add(len(swos) - len(swosByKey[key]))
|
||||
swosByKey[key] = swos
|
||||
} else {
|
||||
aw.swosCount.Add(-len(swosByName[name]))
|
||||
delete(swosByName, name)
|
||||
aw.swosCount.Add(-len(swosByKey[key]))
|
||||
delete(swosByKey, key)
|
||||
}
|
||||
aw.swosByNamespaceLock.Unlock()
|
||||
}
|
||||
|
||||
func (aw *apiWatcher) removeScrapeWorks(namespace, name string) {
|
||||
func (aw *apiWatcher) removeScrapeWorks(namespace, key string) {
|
||||
aw.swosByNamespaceLock.Lock()
|
||||
swosByName := aw.swosByNamespace[namespace]
|
||||
if len(swosByName) > 0 {
|
||||
aw.swosCount.Add(-len(swosByName[name]))
|
||||
delete(swosByName, name)
|
||||
swosByKey := aw.swosByNamespace[namespace]
|
||||
if len(swosByKey) > 0 {
|
||||
aw.swosCount.Add(-len(swosByKey[key]))
|
||||
delete(swosByKey, key)
|
||||
}
|
||||
aw.swosByNamespaceLock.Unlock()
|
||||
}
|
||||
|
@ -133,14 +133,14 @@ func (aw *apiWatcher) getScrapeWorkObjects() []interface{} {
|
|||
defer aw.swosByNamespaceLock.Unlock()
|
||||
|
||||
size := 0
|
||||
for _, swosByName := range aw.swosByNamespace {
|
||||
for _, swosLocal := range swosByName {
|
||||
for _, swosByKey := range aw.swosByNamespace {
|
||||
for _, swosLocal := range swosByKey {
|
||||
size += len(swosLocal)
|
||||
}
|
||||
}
|
||||
swos := make([]interface{}, 0, size)
|
||||
for _, swosByName := range aw.swosByNamespace {
|
||||
for _, swosLocal := range swosByName {
|
||||
for _, swosByKey := range aw.swosByNamespace {
|
||||
for _, swosLocal := range swosByKey {
|
||||
swos = append(swos, swosLocal...)
|
||||
}
|
||||
}
|
||||
|
@ -223,6 +223,7 @@ func (gw *groupWatcher) getObjectByRole(role, namespace, name string) object {
|
|||
// this is needed for testing
|
||||
return nil
|
||||
}
|
||||
key := namespace + "/" + name
|
||||
gw.startWatchersForRole(role, nil)
|
||||
gw.mu.Lock()
|
||||
defer gw.mu.Unlock()
|
||||
|
@ -237,7 +238,7 @@ func (gw *groupWatcher) getObjectByRole(role, namespace, name string) object {
|
|||
continue
|
||||
}
|
||||
uw.mu.Lock()
|
||||
o := uw.objectsByName[name]
|
||||
o := uw.objectsByKey[key]
|
||||
uw.mu.Unlock()
|
||||
if o != nil {
|
||||
return o
|
||||
|
@ -261,25 +262,25 @@ func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) {
|
|||
}
|
||||
}
|
||||
|
||||
func (gw *groupWatcher) reloadScrapeWorksForAPIWatchers(namespace string, aws []*apiWatcher, objectsByName map[string]object) {
|
||||
func (gw *groupWatcher) reloadScrapeWorksForAPIWatchers(namespace string, aws []*apiWatcher, objectsByKey map[string]object) {
|
||||
if len(aws) == 0 {
|
||||
return
|
||||
}
|
||||
swosByName := make([]map[string][]interface{}, len(aws))
|
||||
swosByKey := make([]map[string][]interface{}, len(aws))
|
||||
for i := range aws {
|
||||
swosByName[i] = make(map[string][]interface{})
|
||||
swosByKey[i] = make(map[string][]interface{})
|
||||
}
|
||||
for name, o := range objectsByName {
|
||||
for key, o := range objectsByKey {
|
||||
labels := o.getTargetLabels(gw)
|
||||
for i, aw := range aws {
|
||||
swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels)
|
||||
if len(swos) > 0 {
|
||||
swosByName[i][name] = swos
|
||||
swosByKey[i][key] = swos
|
||||
}
|
||||
}
|
||||
}
|
||||
for i, aw := range aws {
|
||||
aw.reloadScrapeWorks(namespace, swosByName[i])
|
||||
aw.reloadScrapeWorks(namespace, swosByKey[i])
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -303,7 +304,7 @@ func (gw *groupWatcher) unsubscribeAPIWatcher(aw *apiWatcher) {
|
|||
gw.mu.Unlock()
|
||||
}
|
||||
|
||||
// urlWatcher watches for an apiURL and updates object states in objectsByName.
|
||||
// urlWatcher watches for an apiURL and updates object states in objectsByKey.
|
||||
type urlWatcher struct {
|
||||
role string
|
||||
namespace string
|
||||
|
@ -313,7 +314,7 @@ type urlWatcher struct {
|
|||
parseObject parseObjectFunc
|
||||
parseObjectList parseObjectListFunc
|
||||
|
||||
// mu protects aws, awsPending, objectsByName and resourceVersion
|
||||
// mu protects aws, awsPending, objectsByKey and resourceVersion
|
||||
mu sync.Mutex
|
||||
|
||||
// aws contains registered apiWatcher objects
|
||||
|
@ -322,8 +323,8 @@ type urlWatcher struct {
|
|||
// awsPending contains pending apiWatcher objects, which must be moved to aws in a batch
|
||||
awsPending map[*apiWatcher]struct{}
|
||||
|
||||
// objectsByName contains the latest state for objects obtained from apiURL
|
||||
objectsByName map[string]object
|
||||
// objectsByKey contains the latest state for objects obtained from apiURL, indexed by each object's key()
|
||||
objectsByKey map[string]object
|
||||
|
||||
resourceVersion string
|
||||
|
||||
|
@ -346,9 +347,9 @@ func newURLWatcher(role, namespace, apiURL string, gw *groupWatcher) *urlWatcher
|
|||
parseObject: parseObject,
|
||||
parseObjectList: parseObjectList,
|
||||
|
||||
aws: make(map[*apiWatcher]struct{}),
|
||||
awsPending: make(map[*apiWatcher]struct{}),
|
||||
objectsByName: make(map[string]object),
|
||||
aws: make(map[*apiWatcher]struct{}),
|
||||
awsPending: make(map[*apiWatcher]struct{}),
|
||||
objectsByKey: make(map[string]object),
|
||||
|
||||
objectsCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects{role=%q}`, role)),
|
||||
objectsAdded: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_added_total{role=%q}`, role)),
|
||||
|
@ -392,7 +393,7 @@ func (uw *urlWatcher) processPendingSubscribers() {
|
|||
t := time.NewTicker(time.Second)
|
||||
for range t.C {
|
||||
var awsPending []*apiWatcher
|
||||
var objectsByName map[string]object
|
||||
var objectsByKey map[string]object
|
||||
|
||||
uw.mu.Lock()
|
||||
if len(uw.awsPending) > 0 {
|
||||
|
@ -404,16 +405,16 @@ func (uw *urlWatcher) processPendingSubscribers() {
|
|||
uw.aws[aw] = struct{}{}
|
||||
delete(uw.awsPending, aw)
|
||||
}
|
||||
objectsByName = make(map[string]object, len(uw.objectsByName))
|
||||
for name, o := range uw.objectsByName {
|
||||
objectsByName[name] = o
|
||||
objectsByKey = make(map[string]object, len(uw.objectsByKey))
|
||||
for key, o := range uw.objectsByKey {
|
||||
objectsByKey[key] = o
|
||||
}
|
||||
}
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q,type="pending"}`, uw.role)).Add(-len(awsPending))
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscibers{role=%q,type="permanent"}`, uw.role)).Add(len(awsPending))
|
||||
uw.mu.Unlock()
|
||||
|
||||
uw.gw.reloadScrapeWorksForAPIWatchers(uw.namespace, awsPending, objectsByName)
|
||||
uw.gw.reloadScrapeWorksForAPIWatchers(uw.namespace, awsPending, objectsByKey)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -445,7 +446,7 @@ func (uw *urlWatcher) reloadObjects() string {
|
|||
logger.Errorf("unexpected status code for request to %q: %d; want %d; response: %q", requestURL, resp.StatusCode, http.StatusOK, body)
|
||||
return ""
|
||||
}
|
||||
objectsByName, metadata, err := uw.parseObjectList(resp.Body)
|
||||
objectsByKey, metadata, err := uw.parseObjectList(resp.Body)
|
||||
_ = resp.Body.Close()
|
||||
if err != nil {
|
||||
logger.Errorf("cannot parse objects from %q: %s", requestURL, err)
|
||||
|
@ -454,18 +455,18 @@ func (uw *urlWatcher) reloadObjects() string {
|
|||
|
||||
uw.mu.Lock()
|
||||
var updated, removed, added int
|
||||
for name := range uw.objectsByName {
|
||||
if o, ok := objectsByName[name]; ok {
|
||||
uw.objectsByName[name] = o
|
||||
for key := range uw.objectsByKey {
|
||||
if o, ok := objectsByKey[key]; ok {
|
||||
uw.objectsByKey[key] = o
|
||||
updated++
|
||||
} else {
|
||||
delete(uw.objectsByName, name)
|
||||
delete(uw.objectsByKey, key)
|
||||
removed++
|
||||
}
|
||||
}
|
||||
for name, o := range objectsByName {
|
||||
if _, ok := uw.objectsByName[name]; !ok {
|
||||
uw.objectsByName[name] = o
|
||||
for key, o := range objectsByKey {
|
||||
if _, ok := uw.objectsByKey[key]; !ok {
|
||||
uw.objectsByKey[key] = o
|
||||
added++
|
||||
}
|
||||
}
|
||||
|
@ -477,8 +478,8 @@ func (uw *urlWatcher) reloadObjects() string {
|
|||
aws := getAPIWatchers(uw.aws)
|
||||
uw.mu.Unlock()
|
||||
|
||||
uw.gw.reloadScrapeWorksForAPIWatchers(uw.namespace, aws, objectsByName)
|
||||
logger.Infof("reloaded %d objects from %q", len(objectsByName), requestURL)
|
||||
uw.gw.reloadScrapeWorksForAPIWatchers(uw.namespace, aws, objectsByKey)
|
||||
logger.Infof("reloaded %d objects from %q", len(objectsByKey), requestURL)
|
||||
return metadata.ResourceVersion
|
||||
}
|
||||
|
||||
|
@ -564,37 +565,37 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
name := o.name()
|
||||
key := o.key()
|
||||
uw.mu.Lock()
|
||||
if _, ok := uw.objectsByName[name]; !ok {
|
||||
if _, ok := uw.objectsByKey[key]; !ok {
|
||||
uw.objectsCount.Inc()
|
||||
uw.objectsAdded.Inc()
|
||||
} else {
|
||||
uw.objectsUpdated.Inc()
|
||||
}
|
||||
uw.objectsByName[name] = o
|
||||
uw.objectsByKey[key] = o
|
||||
aws := getAPIWatchers(uw.aws)
|
||||
uw.mu.Unlock()
|
||||
labels := o.getTargetLabels(uw.gw)
|
||||
for _, aw := range aws {
|
||||
aw.setScrapeWorks(uw.namespace, name, labels)
|
||||
aw.setScrapeWorks(uw.namespace, key, labels)
|
||||
}
|
||||
case "DELETED":
|
||||
o, err := uw.parseObject(we.Object)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
name := o.name()
|
||||
key := o.key()
|
||||
uw.mu.Lock()
|
||||
if _, ok := uw.objectsByName[name]; ok {
|
||||
if _, ok := uw.objectsByKey[key]; ok {
|
||||
uw.objectsCount.Dec()
|
||||
uw.objectsRemoved.Inc()
|
||||
delete(uw.objectsByName, name)
|
||||
delete(uw.objectsByKey, key)
|
||||
}
|
||||
aws := getAPIWatchers(uw.aws)
|
||||
uw.mu.Unlock()
|
||||
for _, aw := range aws {
|
||||
aw.removeScrapeWorks(uw.namespace, name)
|
||||
aw.removeScrapeWorks(uw.namespace, key)
|
||||
}
|
||||
case "BOOKMARK":
|
||||
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks
|
||||
|
|
|
@ -16,6 +16,10 @@ type ObjectMeta struct {
|
|||
OwnerReferences []OwnerReference
|
||||
}
|
||||
|
||||
func (om *ObjectMeta) key() string {
|
||||
return om.Namespace + "/" + om.Name
|
||||
}
|
||||
|
||||
// ListMeta is a Kubernetes list metadata
|
||||
// https://v1-17.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#listmeta-v1-meta
|
||||
type ListMeta struct {
|
||||
|
|
|
@ -8,8 +8,8 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||
)
|
||||
|
||||
func (eps *Endpoints) name() string {
|
||||
return eps.Metadata.Name
|
||||
func (eps *Endpoints) key() string {
|
||||
return eps.Metadata.key()
|
||||
}
|
||||
|
||||
func parseEndpointsList(r io.Reader) (map[string]object, ListMeta, error) {
|
||||
|
@ -18,11 +18,11 @@ func parseEndpointsList(r io.Reader) (map[string]object, ListMeta, error) {
|
|||
if err := d.Decode(&epsl); err != nil {
|
||||
return nil, epsl.Metadata, fmt.Errorf("cannot unmarshal EndpointsList: %w", err)
|
||||
}
|
||||
objectsByName := make(map[string]object)
|
||||
objectsByKey := make(map[string]object)
|
||||
for _, eps := range epsl.Items {
|
||||
objectsByName[eps.name()] = eps
|
||||
objectsByKey[eps.key()] = eps
|
||||
}
|
||||
return objectsByName, epsl.Metadata, nil
|
||||
return objectsByKey, epsl.Metadata, nil
|
||||
}
|
||||
|
||||
func parseEndpoints(data []byte) (object, error) {
|
||||
|
|
|
@ -9,8 +9,8 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||
)
|
||||
|
||||
func (eps *EndpointSlice) name() string {
|
||||
return eps.Metadata.Name
|
||||
func (eps *EndpointSlice) key() string {
|
||||
return eps.Metadata.key()
|
||||
}
|
||||
|
||||
func parseEndpointSliceList(r io.Reader) (map[string]object, ListMeta, error) {
|
||||
|
@ -19,11 +19,11 @@ func parseEndpointSliceList(r io.Reader) (map[string]object, ListMeta, error) {
|
|||
if err := d.Decode(&epsl); err != nil {
|
||||
return nil, epsl.Metadata, fmt.Errorf("cannot unmarshal EndpointSliceList: %w", err)
|
||||
}
|
||||
objectsByName := make(map[string]object)
|
||||
objectsByKey := make(map[string]object)
|
||||
for _, eps := range epsl.Items {
|
||||
objectsByName[eps.name()] = eps
|
||||
objectsByKey[eps.key()] = eps
|
||||
}
|
||||
return objectsByName, epsl.Metadata, nil
|
||||
return objectsByKey, epsl.Metadata, nil
|
||||
}
|
||||
|
||||
func parseEndpointSlice(data []byte) (object, error) {
|
||||
|
|
|
@ -6,8 +6,8 @@ import (
|
|||
"io"
|
||||
)
|
||||
|
||||
func (ig *Ingress) name() string {
|
||||
return ig.Metadata.Name
|
||||
func (ig *Ingress) key() string {
|
||||
return ig.Metadata.key()
|
||||
}
|
||||
|
||||
func parseIngressList(r io.Reader) (map[string]object, ListMeta, error) {
|
||||
|
@ -16,11 +16,11 @@ func parseIngressList(r io.Reader) (map[string]object, ListMeta, error) {
|
|||
if err := d.Decode(&igl); err != nil {
|
||||
return nil, igl.Metadata, fmt.Errorf("cannot unmarshal IngressList: %w", err)
|
||||
}
|
||||
objectsByName := make(map[string]object)
|
||||
objectsByKey := make(map[string]object)
|
||||
for _, ig := range igl.Items {
|
||||
objectsByName[ig.name()] = ig
|
||||
objectsByKey[ig.key()] = ig
|
||||
}
|
||||
return objectsByName, igl.Metadata, nil
|
||||
return objectsByKey, igl.Metadata, nil
|
||||
}
|
||||
|
||||
func parseIngress(data []byte) (object, error) {
|
||||
|
|
|
@ -8,8 +8,9 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||
)
|
||||
|
||||
func (n *Node) name() string {
|
||||
return n.Metadata.Name
|
||||
// key returns the key identifying n, as produced by n.Metadata.key().
|
||||
func (n *Node) key() string {
|
||||
return n.Metadata.key()
|
||||
}
|
||||
|
||||
func parseNodeList(r io.Reader) (map[string]object, ListMeta, error) {
|
||||
|
@ -18,11 +19,11 @@ func parseNodeList(r io.Reader) (map[string]object, ListMeta, error) {
|
|||
if err := d.Decode(&nl); err != nil {
|
||||
return nil, nl.Metadata, fmt.Errorf("cannot unmarshal NodeList: %w", err)
|
||||
}
|
||||
objectsByName := make(map[string]object)
|
||||
objectsByKey := make(map[string]object)
|
||||
for _, n := range nl.Items {
|
||||
objectsByName[n.name()] = n
|
||||
objectsByKey[n.key()] = n
|
||||
}
|
||||
return objectsByName, nl.Metadata, nil
|
||||
return objectsByKey, nl.Metadata, nil
|
||||
}
|
||||
|
||||
func parseNode(data []byte) (object, error) {
|
||||
|
|
|
@ -10,8 +10,8 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||
)
|
||||
|
||||
func (p *Pod) name() string {
|
||||
return p.Metadata.Name
|
||||
func (p *Pod) key() string {
|
||||
return p.Metadata.key()
|
||||
}
|
||||
|
||||
func parsePodList(r io.Reader) (map[string]object, ListMeta, error) {
|
||||
|
@ -20,11 +20,11 @@ func parsePodList(r io.Reader) (map[string]object, ListMeta, error) {
|
|||
if err := d.Decode(&pl); err != nil {
|
||||
return nil, pl.Metadata, fmt.Errorf("cannot unmarshal PodList: %w", err)
|
||||
}
|
||||
objectsByName := make(map[string]object)
|
||||
objectsByKey := make(map[string]object)
|
||||
for _, p := range pl.Items {
|
||||
objectsByName[p.name()] = p
|
||||
objectsByKey[p.key()] = p
|
||||
}
|
||||
return objectsByName, pl.Metadata, nil
|
||||
return objectsByKey, pl.Metadata, nil
|
||||
}
|
||||
|
||||
func parsePod(data []byte) (object, error) {
|
||||
|
|
|
@ -8,8 +8,8 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||
)
|
||||
|
||||
func (s *Service) name() string {
|
||||
return s.Metadata.Name
|
||||
func (s *Service) key() string {
|
||||
return s.Metadata.key()
|
||||
}
|
||||
|
||||
func parseServiceList(r io.Reader) (map[string]object, ListMeta, error) {
|
||||
|
@ -18,11 +18,11 @@ func parseServiceList(r io.Reader) (map[string]object, ListMeta, error) {
|
|||
if err := d.Decode(&sl); err != nil {
|
||||
return nil, sl.Metadata, fmt.Errorf("cannot unmarshal ServiceList: %w", err)
|
||||
}
|
||||
objectsByName := make(map[string]object)
|
||||
objectsByKey := make(map[string]object)
|
||||
for _, s := range sl.Items {
|
||||
objectsByName[s.name()] = s
|
||||
objectsByKey[s.key()] = s
|
||||
}
|
||||
return objectsByName, sl.Metadata, nil
|
||||
return objectsByKey, sl.Metadata, nil
|
||||
}
|
||||
|
||||
func parseService(data []byte) (object, error) {
|
||||
|
|
Loading…
Reference in a new issue