diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 2b2efeb0a..fd73b30bc 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -8,6 +8,8 @@ - `histogram_stddev(buckets)` - returns standard deviation for the given buckets. * FEATURE: vmagent: add ability to replicate scrape targets among `vmagent` instances in the cluster with `-promscrape.cluster.replicationFactor` command-line flag. See [these docs](https://victoriametrics.github.io/vmagent.html#scraping-big-number-of-targets). +* BUGFIX: vmagent: reduce memory usage when Kubernetes service discovery is used in big number of distinct jobs by sharing the cache. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1113 + # [v1.55.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.55.1) diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go index 4ffc621f7..420af9ff8 100644 --- a/lib/promscrape/discovery/kubernetes/api_watcher.go +++ b/lib/promscrape/discovery/kubernetes/api_watcher.go @@ -124,9 +124,7 @@ func (aw *apiWatcher) getObjectByRole(role, namespace, name string) object { if uw.role != role { continue } - uw.mu.Lock() - o = uw.objectsByKey[key] - uw.mu.Unlock() + o = uw.objectsByKey.get(key) if o != nil { break } @@ -164,10 +162,7 @@ func (aw *apiWatcher) startWatcherForURL(role, apiURL string, parseObject parseO logger.Infof("started watcher for %q", apiURL) uw.watchForUpdates(resourceVersion) logger.Infof("stopped watcher for %q", apiURL) - uw.mu.Lock() - uw.objectsCount.Add(-len(uw.objectsByKey)) - uw.objectsRemoved.Add(len(uw.objectsByKey)) - uw.mu.Unlock() + uw.objectsByKey.decRef() aw.mu.Lock() delete(aw.watchersByURL, apiURL) @@ -207,12 +202,12 @@ type urlWatcher struct { parseObject parseObjectFunc parseObjectList parseObjectListFunc - // mu protects objectsByKey and swosByKey - mu sync.Mutex - // objectsByKey contains the latest state for objects obtained from apiURL - objectsByKey map[string]object - swosByKey map[string][]interface{} + objectsByKey *objectsMap + + // mu protects swosByKey + mu sync.Mutex + swosByKey map[string][]interface{} // the parent apiWatcher aw *apiWatcher @@ -220,10 +215,6 @@ type urlWatcher struct { watchersCount *metrics.Counter watchersCreated *metrics.Counter watchersStopped *metrics.Counter - - objectsCount *metrics.Counter - objectsAdded *metrics.Counter - objectsRemoved *metrics.Counter } func (aw *apiWatcher) newURLWatcher(role, apiURL string, parseObject parseObjectFunc, parseObjectList parseObjectListFunc) *urlWatcher { @@ -234,7 +225,7 @@ func (aw *apiWatcher) newURLWatcher(role, apiURL string, parseObject parseObject parseObject: parseObject, parseObjectList: parseObjectList, - objectsByKey: make(map[string]object), + objectsByKey: sharedObjectsGlobal.getByAPIURL(role, apiURL), swosByKey: make(map[string][]interface{}), aw: aw, @@ -242,10 +233,6 @@ func (aw *apiWatcher) newURLWatcher(role, apiURL string, parseObject parseObject watchersCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers{role=%q}`, role)), watchersCreated: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers_created_total{role=%q}`, role)), watchersStopped: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers_stopped_total{role=%q}`, role)), - - objectsCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects{role=%q}`, role)), - objectsAdded: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_added_total{role=%q}`, role)), - objectsRemoved: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_removed_total{role=%q}`, role)), } } @@ -273,6 +260,7 @@ func (uw *urlWatcher) reloadObjects() string { } return "" } + uw.objectsByKey.reload(objectsByKey) swosByKey := make(map[string][]interface{}, len(objectsByKey)) for k, o := range objectsByKey { labels := o.getTargetLabels(aw) @@ -282,10 +270,6 @@ func (uw *urlWatcher) reloadObjects() string { } } uw.mu.Lock() - uw.objectsAdded.Add(len(objectsByKey)) - uw.objectsRemoved.Add(len(uw.objectsByKey)) - uw.objectsCount.Add(len(objectsByKey) - len(uw.objectsByKey)) - uw.objectsByKey = objectsByKey uw.swosByKey = swosByKey uw.mu.Unlock() return metadata.ResourceVersion @@ -388,13 +372,7 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error { key := o.key() switch we.Type { case "ADDED", "MODIFIED": - uw.mu.Lock() - if uw.objectsByKey[key] == nil { - uw.objectsAdded.Inc() - uw.objectsCount.Inc() - } - uw.objectsByKey[key] = o - uw.mu.Unlock() + uw.objectsByKey.update(key, o) labels := o.getTargetLabels(aw) swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels) uw.mu.Lock() @@ -405,12 +383,8 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error { } uw.mu.Unlock() case "DELETED": + uw.objectsByKey.remove(key) uw.mu.Lock() - if uw.objectsByKey[key] != nil { - uw.objectsRemoved.Inc() - uw.objectsCount.Dec() - } - delete(uw.objectsByKey, key) delete(uw.swosByKey, key) uw.mu.Unlock() default: @@ -513,3 +487,105 @@ func getObjectParsersForRole(role string) (parseObjectFunc, parseObjectListFunc) return nil, nil } } + +type objectsMap struct { + mu sync.Mutex + refCount int + m map[string]object + + objectsAdded *metrics.Counter + objectsRemoved *metrics.Counter + objectsCount *metrics.Counter +} + +func (om *objectsMap) incRef() { + om.mu.Lock() + om.refCount++ + om.mu.Unlock() +} + +func (om *objectsMap) decRef() { + om.mu.Lock() + om.refCount-- + if om.refCount < 0 { + logger.Panicf("BUG: refCount cannot be smaller than 0; got %d", om.refCount) + } + if om.refCount == 0 { + // Free up memory occupied by om.m + om.objectsRemoved.Add(len(om.m)) + om.objectsCount.Add(-len(om.m)) + om.m = make(map[string]object) + } + om.mu.Unlock() +} + +func (om *objectsMap) reload(m map[string]object) { + om.mu.Lock() + om.objectsAdded.Add(len(m)) + om.objectsRemoved.Add(len(om.m)) + om.objectsCount.Add(len(m) - len(om.m)) + for k := range om.m { + delete(om.m, k) + } + for k, o := range m { + om.m[k] = o + } + om.mu.Unlock() +} + +func (om *objectsMap) update(key string, o object) { + om.mu.Lock() + if om.m[key] == nil { + om.objectsAdded.Inc() + om.objectsCount.Inc() + } + om.m[key] = o + om.mu.Unlock() +} + +func (om *objectsMap) remove(key string) { + om.mu.Lock() + if om.m[key] != nil { + om.objectsRemoved.Inc() + om.objectsCount.Dec() + delete(om.m, key) + } + om.mu.Unlock() +} + +func (om *objectsMap) get(key string) object { + om.mu.Lock() + o, ok := om.m[key] + om.mu.Unlock() + if !ok { + return nil + } + return o +} + +type sharedObjects struct { + mu sync.Mutex + oms map[string]*objectsMap +} + +func (so *sharedObjects) getByAPIURL(role, apiURL string) *objectsMap { + so.mu.Lock() + om := so.oms[apiURL] + if om == nil { + om = &objectsMap{ + m: make(map[string]object), + + objectsCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects{role=%q}`, role)), + objectsAdded: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_added_total{role=%q}`, role)), + objectsRemoved: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_removed_total{role=%q}`, role)), + } + so.oms[apiURL] = om + } + so.mu.Unlock() + om.incRef() + return om +} + +var sharedObjectsGlobal = &sharedObjects{ + oms: make(map[string]*objectsMap), +}