From d5eb6afe262dbbc392b50657b9f2d32dd3fc3cbe Mon Sep 17 00:00:00 2001
From: Roman Khavronenko
Date: Sat, 21 May 2022 00:01:37 +0200
Subject: [PATCH] lib/promscrape/discovery/kubernetes: fixes kubernetes service discovery (#2615)

* lib/promscrape/discovery/kubernetes: properly update discovered scrape works

Previously, added or updated scrape works could override previously
discovered ones. This happened because swosByKey may contain only a small
subset of Kubernetes objects with their labels: the objectsUpdated and
objectsAdded maps include only the changed elements.

* Properly calculate the vm_promscrape_discovery_kubernetes_scrape_works metric

Co-authored-by: f41gh7
Co-authored-by: Aliaksandr Valialkin
---
 docs/CHANGELOG.md                             |   5 +-
 .../discovery/kubernetes/api_watcher.go       | 148 ++++++++++++------
 2 files changed, 100 insertions(+), 53 deletions(-)
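The bug and the fix can be modeled with a minimal, self-contained Go sketch.
The types and names below are simplified assumptions for illustration only:
scrape works are plain strings instead of the real map[string][]interface{}
of ScrapeWork objects, and replaceAll / mergeByKey are hypothetical helpers
standing in for the old whole-map replacement and for the per-key merge done
by the new apiWatcher.updateScrapeWorks.

// Minimal sketch of the bug (hypothetical names, simplified types).
package main

import "fmt"

// replaceAll models the old behavior: entries for keys absent from the
// partial update are dropped together with their scrape works.
func replaceAll(prev, partial map[string][]string) map[string][]string {
	return partial
}

// mergeByKey models the fix: only the keys present in the partial update are
// replaced (or removed when empty); all other keys keep their scrape works.
func mergeByKey(prev, partial map[string][]string) map[string][]string {
	for key, swos := range partial {
		if len(swos) == 0 {
			delete(prev, key)
		} else {
			prev[key] = swos
		}
	}
	return prev
}

func main() {
	discovered := map[string][]string{
		"ns1/pod-a": {"target-a"},
		"ns1/pod-b": {"target-b"},
	}
	// An update event arrives for pod-b only, so the map holds a single key.
	update := map[string][]string{"ns1/pod-b": {"target-b-updated"}}

	fmt.Println(len(replaceAll(discovered, update))) // 1: pod-a's target is lost
	fmt.Println(len(mergeByKey(discovered, update))) // 2: pod-a's target is kept
}

Because an update event carries only the changed keys, replacing the whole map
drops the targets discovered for every other key, while the per-key merge keeps
them. This is why the objectsAdded and objectsUpdated maps in the diff below are
merged into the existing map instead of overwriting it.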
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index b8bcf6243..2f8fb2236 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -19,14 +19,15 @@ The following tip changes can be tested by building VictoriaMetrics components f
 * FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add `influx-prometheus-mode` command-line flag, which allows to restore the original time series written from Prometheus into InfluxDB during data migration from InfluxDB to VictoriaMetrics. See [this feature request](https://github.com/VictoriaMetrics/vmctl/issues/8). Thanks to @mback2k for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2545).
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add ability to specify AWS service name when issuing requests to AWS api. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2605). Thanks to @transacid for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2604).
+* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix a bug, which could lead to incomplete discovery of scrape targets in Kubernetes (aka `kubernetes_sd_config`). The bug was introduced in [v1.77.0](https://docs.victoriametrics.com/CHANGELOG.html#v1770).
 * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): support `scalar` result type in response. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2607).
 * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): support strings in `humanize.*` template function in the same way as Prometheus does. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2569).
 * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): proxy `/rules` requests to vmalert from Grafana's alerting UI. This removes errors in Grafana's UI for Grafana versions older than `8.5.*`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2583)
+* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): do not add `/api/v1/query` suffix to `-datasource.url` if `-remoteRead.disablePathAppend` command-line flag is set. Previously this flag was applied only to `-remoteRead.url`, which could confuse users.
+* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): prevent from possible resource leak on config update, which could lead to the slowdown of `vmalert` over time. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2577).
 * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): do not return values from [label_value()](https://docs.victoriametrics.com/MetricsQL.html#label_value) function if the original time series has no values at the selected timestamps.
 * BUGFIX: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): limit the number of concurrently established connections from vmselect to vmstorage. This should prevent from potentially high spikes in the number of established connections after temporary slowdown in connection handshake procedure between vmselect and vmstorage because of spikes in workload. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2552).
 * BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl.html): fix build for Solaris / SmartOS. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1322#issuecomment-1120276146).
-* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): do not add `/api/v1/query` suffix to `-datasource.url` if `-remoteRead.disablePathAppend` command-line flag is set. Previously this flag was applied only to `-remoteRead.url`, which could confuse users.
-* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): prevent from possible resource leak on config update, which could lead to the slowdown of `vmalert` over time. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2577).
 
 ## [v1.77.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.77.1)
diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go
index f1dbc5002..6f76d5d0e 100644
--- a/lib/promscrape/discovery/kubernetes/api_watcher.go
+++ b/lib/promscrape/discovery/kubernetes/api_watcher.go
@@ -55,7 +55,7 @@ type apiWatcher struct {
 
 	gw *groupWatcher
 
-	// swos contains per-urlWatcher maps of ScrapeWork objects for the given apiWatcher
+	// swosByURLWatcher contains per-urlWatcher maps of ScrapeWork objects for the given apiWatcher
 	swosByURLWatcher     map[*urlWatcher]map[string][]interface{}
 	swosByURLWatcherLock sync.Mutex
 
@@ -91,23 +91,51 @@ func (aw *apiWatcher) mustStart() {
 	aw.gw.startWatchersForRole(aw.role, aw)
 }
 
+func (aw *apiWatcher) updateSwosCount(multiplier int, swosByKey map[string][]interface{}) {
+	n := 0
+	for _, swos := range swosByKey {
+		n += len(swos)
+	}
+	n *= multiplier
+	aw.swosCount.Add(n)
+}
+
 func (aw *apiWatcher) mustStop() {
 	aw.gw.unsubscribeAPIWatcher(aw)
 	aw.swosByURLWatcherLock.Lock()
 	for _, swosByKey := range aw.swosByURLWatcher {
-		aw.swosCount.Add(-len(swosByKey))
+		aw.updateSwosCount(-1, swosByKey)
 	}
 	aw.swosByURLWatcher = make(map[*urlWatcher]map[string][]interface{})
 	aw.swosByURLWatcherLock.Unlock()
 }
 
-func (aw *apiWatcher) reloadScrapeWorks(uw *urlWatcher, swosByKey map[string][]interface{}) {
+func (aw *apiWatcher) replaceScrapeWorks(uw *urlWatcher, swosByKey map[string][]interface{}) {
 	aw.swosByURLWatcherLock.Lock()
-	aw.swosCount.Add(len(swosByKey) - len(aw.swosByURLWatcher[uw]))
+	aw.updateSwosCount(-1, aw.swosByURLWatcher[uw])
+	aw.updateSwosCount(1, swosByKey)
 	aw.swosByURLWatcher[uw] = swosByKey
 	aw.swosByURLWatcherLock.Unlock()
 }
 
+func (aw *apiWatcher) updateScrapeWorks(uw *urlWatcher, swosByKey map[string][]interface{}) {
+	aw.swosByURLWatcherLock.Lock()
+	dst := aw.swosByURLWatcher[uw]
+	if dst == nil {
+		dst = make(map[string][]interface{})
+		aw.swosByURLWatcher[uw] = dst
+	}
+	for key, swos := range swosByKey {
+		aw.swosCount.Add(len(swos) - len(dst[key]))
+		if len(swos) == 0 {
+			delete(dst, key)
+		} else {
+			dst[key] = swos
+		}
+	}
+	aw.swosByURLWatcherLock.Unlock()
+}
+
 func (aw *apiWatcher) setScrapeWorks(uw *urlWatcher, key string, labels []map[string]string) {
 	swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels)
 	aw.swosByURLWatcherLock.Lock()
@@ -117,10 +145,10 @@ func (aw *apiWatcher) setScrapeWorks(uw *urlWatcher, key string, labels []map[st
 		aw.swosByURLWatcher[uw] = swosByKey
 	}
 	aw.swosCount.Add(len(swos) - len(swosByKey[key]))
-	if len(swos) > 0 {
-		swosByKey[key] = swos
-	} else {
+	if len(swos) == 0 {
 		delete(swosByKey, key)
+	} else {
+		swosByKey[key] = swos
 	}
 	aw.swosByURLWatcherLock.Unlock()
 }
@@ -250,6 +278,45 @@ var (
 	})
 )
 
+type swosByKeyWithLock struct {
+	mu        sync.Mutex
+	swosByKey map[string][]interface{}
+}
+
+func (gw *groupWatcher) getScrapeWorkObjectsByAPIWatcherLocked(objectsByKey map[string]object, awsMap map[*apiWatcher]struct{}) map[*apiWatcher]*swosByKeyWithLock {
+	if len(awsMap) == 0 {
+		return nil
+	}
+	swosByAPIWatcher := make(map[*apiWatcher]*swosByKeyWithLock, len(awsMap))
+	for aw := range awsMap {
+		swosByAPIWatcher[aw] = &swosByKeyWithLock{
+			swosByKey: make(map[string][]interface{}),
+		}
+	}
+
+	// Generate ScrapeWork objects in parallel on available CPU cores.
+	// This should reduce the time needed for their generation on systems with many CPU cores.
+	var wg sync.WaitGroup
+	limiterCh := make(chan struct{}, cgroup.AvailableCPUs())
+	for key, o := range objectsByKey {
+		labels := o.getTargetLabels(gw)
+		wg.Add(1)
+		limiterCh <- struct{}{}
+		go func(key string, labels []map[string]string) {
+			for aw, e := range swosByAPIWatcher {
+				swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels)
+				e.mu.Lock()
+				e.swosByKey[key] = swos
+				e.mu.Unlock()
+			}
+			wg.Done()
+			<-limiterCh
+		}(key, labels)
+	}
+	wg.Wait()
+	return swosByAPIWatcher
+}
+
 func (gw *groupWatcher) getObjectByRoleLocked(role, namespace, name string) object {
 	if role == "node" {
 		// Node objects have no namespace
@@ -310,9 +377,9 @@ func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) {
 			time.Sleep(sleepTime)
 			startTime := time.Now()
 			gw.mu.Lock()
-			if uw.needUpdateScrapeWorks {
-				uw.needUpdateScrapeWorks = false
-				uw.updateScrapeWorksLocked(uw.objectsByKey, uw.aws)
+			if uw.needRecreateScrapeWorks {
+				uw.needRecreateScrapeWorks = false
+				uw.recreateScrapeWorksLocked(uw.objectsByKey, uw.aws)
 				sleepTime = time.Since(startTime)
 				if sleepTime < minSleepTime {
 					sleepTime = minSleepTime
@@ -401,7 +468,7 @@ type urlWatcher struct {
 	// objectsByKey contains the latest state for objects obtained from apiURL
 	objectsByKey map[string]object
 
-	needUpdateScrapeWorks bool
+	needRecreateScrapeWorks bool
 
 	resourceVersion string
@@ -450,7 +517,7 @@ func (uw *urlWatcher) registerPendingAPIWatchersLocked() {
 	if len(uw.awsPending) == 0 {
 		return
 	}
-	uw.updateScrapeWorksLocked(uw.objectsByKey, uw.awsPending)
+	uw.recreateScrapeWorksLocked(uw.objectsByKey, uw.awsPending)
 	for aw := range uw.awsPending {
 		uw.aws[aw] = struct{}{}
 	}
@@ -460,44 +527,23 @@
 	metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Add(-awsPendingLen)
 }
 
-func (uw *urlWatcher) updateScrapeWorksLocked(objectsByKey map[string]object, awsMap map[*apiWatcher]struct{}) {
-	if len(objectsByKey) == 0 || len(awsMap) == 0 {
-		return
-	}
-	aws := make([]*apiWatcher, 0, len(awsMap))
-	for aw := range awsMap {
-		aws = append(aws, aw)
-	}
-	swosByKey := make([]map[string][]interface{}, len(aws))
-	for i := range aws {
-		swosByKey[i] = make(map[string][]interface{})
-	}
-
-	// Generate ScrapeWork objects in parallel on available CPU cores.
-	// This should reduce the time needed for their generation on systems with many CPU cores.
-	var swosByKeyLock sync.Mutex
-	var wg sync.WaitGroup
-	limiterCh := make(chan struct{}, cgroup.AvailableCPUs())
-	for key, o := range objectsByKey {
-		labels := o.getTargetLabels(uw.gw)
-		wg.Add(1)
-		limiterCh <- struct{}{}
-		go func(key string, labels []map[string]string) {
-			for i, aw := range aws {
-				swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels)
-				if len(swos) > 0 {
-					swosByKeyLock.Lock()
-					swosByKey[i][key] = swos
-					swosByKeyLock.Unlock()
-				}
+func (uw *urlWatcher) recreateScrapeWorksLocked(objectsByKey map[string]object, awsMap map[*apiWatcher]struct{}) {
+	es := uw.gw.getScrapeWorkObjectsByAPIWatcherLocked(objectsByKey, awsMap)
+	for aw, e := range es {
+		swosByKey := e.swosByKey
+		for key, swos := range swosByKey {
+			if len(swos) == 0 {
+				delete(swosByKey, key)
 			}
-			wg.Done()
-			<-limiterCh
-		}(key, labels)
+		}
+		aw.replaceScrapeWorks(uw, swosByKey)
 	}
-	wg.Wait()
-	for i, aw := range aws {
-		aw.reloadScrapeWorks(uw, swosByKey[i])
+}
+
+func (uw *urlWatcher) updateScrapeWorksLocked(objectsByKey map[string]object, awsMap map[*apiWatcher]struct{}) {
+	es := uw.gw.getScrapeWorkObjectsByAPIWatcherLocked(objectsByKey, awsMap)
+	for aw, e := range es {
+		aw.updateScrapeWorks(uw, e.swosByKey)
 	}
 }
@@ -574,7 +620,7 @@ func (uw *urlWatcher) reloadObjects() string {
 	uw.removeScrapeWorksLocked(objectsRemoved)
 	uw.updateScrapeWorksLocked(objectsUpdated, uw.aws)
 	uw.updateScrapeWorksLocked(objectsAdded, uw.aws)
-	uw.needUpdateScrapeWorks = false
+	uw.needRecreateScrapeWorks = false
 	if len(objectsRemoved) > 0 || len(objectsUpdated) > 0 || len(objectsAdded) > 0 {
 		uw.maybeUpdateDependedScrapeWorksLocked()
 	}
@@ -756,12 +802,12 @@ func (uw *urlWatcher) maybeUpdateDependedScrapeWorksLocked() {
 		}
 		if (role == "pod" || role == "service") && (uwx.role == "endpoints" || uwx.role == "endpointslice") {
 			// endpoints and endpointslice objects depend on pods and service objects
-			uwx.needUpdateScrapeWorks = true
+			uwx.needRecreateScrapeWorks = true
 			continue
 		}
 		if attachNodeMetadata && role == "node" && uwx.role == "pod" {
 			// pod objects depend on node objects if attachNodeMetadata is set
-			uwx.needUpdateScrapeWorks = true
+			uwx.needRecreateScrapeWorks = true
 			continue
 		}
 	}
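Side note: the parallel generation kept by the patch in getScrapeWorkObjectsByAPIWatcherLocked relies on a common Go pattern. A buffered channel acts as a semaphore limiting the number of concurrently running goroutines, and a sync.WaitGroup waits for all of them to finish. Below is a standalone sketch of the same idea; the limit of 4 and the squaring workload are placeholders, whereas the real code sizes the channel with cgroup.AvailableCPUs() and protects each per-apiWatcher map with its own mutex.

// Standalone sketch of the bounded-parallelism pattern (placeholder workload).
package main

import (
	"fmt"
	"sync"
)

func main() {
	const limit = 4 // placeholder; the patch uses cgroup.AvailableCPUs()
	limiterCh := make(chan struct{}, limit)

	var mu sync.Mutex
	results := make(map[int]int)

	var wg sync.WaitGroup
	for i := 0; i < 16; i++ {
		wg.Add(1)
		limiterCh <- struct{}{} // blocks while `limit` goroutines are already running
		go func(i int) {
			defer func() {
				wg.Done()
				<-limiterCh // release the slot for the next goroutine
			}()
			v := i * i // placeholder for the expensive per-object work
			mu.Lock()
			results[i] = v
			mu.Unlock()
		}(i)
	}
	wg.Wait()
	fmt.Println(len(results)) // 16
}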