lib/promscrape/discovery/kubernetes: fixes kubernetes service discovery (#2615)

* lib/promscrape/discovery/kubernetes: properly updates discovered scrape works
previously, added or updated scrapeworks may override previuosly
discovered.
it happens because swosByKey may contain small subset of kubernetes
objects with it's labels.
It happens for objectsUpdated and objectsAdded maps, which include only changed elements

* Properly calculate vm_promscrape_discovery_kubernetes_scrape_works

Co-authored-by: f41gh7 <nik@victoriametrics.com>
Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
Roman Khavronenko 2022-05-21 00:01:37 +02:00 committed by Aliaksandr Valialkin
parent 88c4c6f465
commit 7406665fc3
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
2 changed files with 100 additions and 53 deletions

View file

@ -19,14 +19,15 @@ The following tip changes can be tested by building VictoriaMetrics components f
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add `influx-prometheus-mode` command-line flag, which allows to restore the original time series written from Prometheus into InfluxDB during data migration from InfluxDB to VictoriaMetrics. See [this feature request](https://github.com/VictoriaMetrics/vmctl/issues/8). Thanks to @mback2k for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2545).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add ability to specify AWS service name when issuing requests to AWS api. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2605). Thanks to @transacid for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2604).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix a bug, which could lead to incomplete discovery of scrape targets in Kubernetes (aka `kubernetes_sd_config`). the bug has been introduced in [v1.77.0](https://docs.victoriametrics.com/CHANGELOG.html#v1770).
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): support `scalar` result type in response. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2607).
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): support strings in `humanize.*` template function in the same way as Prometheus does. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2569).
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): proxy `/rules` requests to vmalert from Grafana's alerting UI. This removes errors in Grafana's UI for Grafana versions older than `8.5.*`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2583)
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): do not add `/api/v1/query` suffix to `-datasource.url` if `-remoteRead.disablePathAppend` command-line flag is set. Previously this flag was applied only to `-remoteRead.url`, which could confuse users.
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): prevent from possible resource leak on config update, which could lead to the slowdown of `vmalert` over time. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2577).
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): do not return values from [label_value()](https://docs.victoriametrics.com/MetricsQL.html#label_value) function if the original time series has no values at the selected timestamps.
* BUGFIX: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): limit the number of concurrently established connections from vmselect to vmstorage. This should prevent from potentially high spikes in the number of established connections after temporary slowdown in connection handshake procedure between vmselect and vmstorage because of spikes in workload. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2552).
* BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl.html): fix build for Solaris / SmartOS. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1322#issuecomment-1120276146).
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): do not add `/api/v1/query` suffix to `-datasource.url` if `-remoteRead.disablePathAppend` command-line flag is set. Previously this flag was applied only to `-remoteRead.url`, which could confuse users.
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): prevent from possible resource leak on config update, which could lead to the slowdown of `vmalert` over time. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2577).
## [v1.77.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.77.1)

View file

@ -55,7 +55,7 @@ type apiWatcher struct {
gw *groupWatcher
// swos contains per-urlWatcher maps of ScrapeWork objects for the given apiWatcher
// swosByURLWatcher contains per-urlWatcher maps of ScrapeWork objects for the given apiWatcher
swosByURLWatcher map[*urlWatcher]map[string][]interface{}
swosByURLWatcherLock sync.Mutex
@ -91,23 +91,51 @@ func (aw *apiWatcher) mustStart() {
aw.gw.startWatchersForRole(aw.role, aw)
}
func (aw *apiWatcher) updateSwosCount(multiplier int, swosByKey map[string][]interface{}) {
n := 0
for _, swos := range swosByKey {
n += len(swos)
}
n *= multiplier
aw.swosCount.Add(n)
}
func (aw *apiWatcher) mustStop() {
aw.gw.unsubscribeAPIWatcher(aw)
aw.swosByURLWatcherLock.Lock()
for _, swosByKey := range aw.swosByURLWatcher {
aw.swosCount.Add(-len(swosByKey))
aw.updateSwosCount(-1, swosByKey)
}
aw.swosByURLWatcher = make(map[*urlWatcher]map[string][]interface{})
aw.swosByURLWatcherLock.Unlock()
}
func (aw *apiWatcher) reloadScrapeWorks(uw *urlWatcher, swosByKey map[string][]interface{}) {
func (aw *apiWatcher) replaceScrapeWorks(uw *urlWatcher, swosByKey map[string][]interface{}) {
aw.swosByURLWatcherLock.Lock()
aw.swosCount.Add(len(swosByKey) - len(aw.swosByURLWatcher[uw]))
aw.updateSwosCount(-1, aw.swosByURLWatcher[uw])
aw.updateSwosCount(1, swosByKey)
aw.swosByURLWatcher[uw] = swosByKey
aw.swosByURLWatcherLock.Unlock()
}
func (aw *apiWatcher) updateScrapeWorks(uw *urlWatcher, swosByKey map[string][]interface{}) {
aw.swosByURLWatcherLock.Lock()
dst := aw.swosByURLWatcher[uw]
if dst == nil {
dst = make(map[string][]interface{})
aw.swosByURLWatcher[uw] = dst
}
for key, swos := range swosByKey {
aw.swosCount.Add(len(swos) - len(dst[key]))
if len(swos) == 0 {
delete(dst, key)
} else {
dst[key] = swos
}
}
aw.swosByURLWatcherLock.Unlock()
}
func (aw *apiWatcher) setScrapeWorks(uw *urlWatcher, key string, labels []map[string]string) {
swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels)
aw.swosByURLWatcherLock.Lock()
@ -117,10 +145,10 @@ func (aw *apiWatcher) setScrapeWorks(uw *urlWatcher, key string, labels []map[st
aw.swosByURLWatcher[uw] = swosByKey
}
aw.swosCount.Add(len(swos) - len(swosByKey[key]))
if len(swos) > 0 {
swosByKey[key] = swos
} else {
if len(swos) == 0 {
delete(swosByKey, key)
} else {
swosByKey[key] = swos
}
aw.swosByURLWatcherLock.Unlock()
}
@ -250,6 +278,45 @@ var (
})
)
type swosByKeyWithLock struct {
mu sync.Mutex
swosByKey map[string][]interface{}
}
func (gw *groupWatcher) getScrapeWorkObjectsByAPIWatcherLocked(objectsByKey map[string]object, awsMap map[*apiWatcher]struct{}) map[*apiWatcher]*swosByKeyWithLock {
if len(awsMap) == 0 {
return nil
}
swosByAPIWatcher := make(map[*apiWatcher]*swosByKeyWithLock, len(awsMap))
for aw := range awsMap {
swosByAPIWatcher[aw] = &swosByKeyWithLock{
swosByKey: make(map[string][]interface{}),
}
}
// Generate ScrapeWork objects in parallel on available CPU cores.
// This should reduce the time needed for their generation on systems with many CPU cores.
var wg sync.WaitGroup
limiterCh := make(chan struct{}, cgroup.AvailableCPUs())
for key, o := range objectsByKey {
labels := o.getTargetLabels(gw)
wg.Add(1)
limiterCh <- struct{}{}
go func(key string, labels []map[string]string) {
for aw, e := range swosByAPIWatcher {
swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels)
e.mu.Lock()
e.swosByKey[key] = swos
e.mu.Unlock()
}
wg.Done()
<-limiterCh
}(key, labels)
}
wg.Wait()
return swosByAPIWatcher
}
func (gw *groupWatcher) getObjectByRoleLocked(role, namespace, name string) object {
if role == "node" {
// Node objects have no namespace
@ -310,9 +377,9 @@ func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) {
time.Sleep(sleepTime)
startTime := time.Now()
gw.mu.Lock()
if uw.needUpdateScrapeWorks {
uw.needUpdateScrapeWorks = false
uw.updateScrapeWorksLocked(uw.objectsByKey, uw.aws)
if uw.needRecreateScrapeWorks {
uw.needRecreateScrapeWorks = false
uw.recreateScrapeWorksLocked(uw.objectsByKey, uw.aws)
sleepTime = time.Since(startTime)
if sleepTime < minSleepTime {
sleepTime = minSleepTime
@ -401,7 +468,7 @@ type urlWatcher struct {
// objectsByKey contains the latest state for objects obtained from apiURL
objectsByKey map[string]object
needUpdateScrapeWorks bool
needRecreateScrapeWorks bool
resourceVersion string
@ -450,7 +517,7 @@ func (uw *urlWatcher) registerPendingAPIWatchersLocked() {
if len(uw.awsPending) == 0 {
return
}
uw.updateScrapeWorksLocked(uw.objectsByKey, uw.awsPending)
uw.recreateScrapeWorksLocked(uw.objectsByKey, uw.awsPending)
for aw := range uw.awsPending {
uw.aws[aw] = struct{}{}
}
@ -460,44 +527,23 @@ func (uw *urlWatcher) registerPendingAPIWatchersLocked() {
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Add(-awsPendingLen)
}
func (uw *urlWatcher) updateScrapeWorksLocked(objectsByKey map[string]object, awsMap map[*apiWatcher]struct{}) {
if len(objectsByKey) == 0 || len(awsMap) == 0 {
return
}
aws := make([]*apiWatcher, 0, len(awsMap))
for aw := range awsMap {
aws = append(aws, aw)
}
swosByKey := make([]map[string][]interface{}, len(aws))
for i := range aws {
swosByKey[i] = make(map[string][]interface{})
}
// Generate ScrapeWork objects in parallel on available CPU cores.
// This should reduce the time needed for their generation on systems with many CPU cores.
var swosByKeyLock sync.Mutex
var wg sync.WaitGroup
limiterCh := make(chan struct{}, cgroup.AvailableCPUs())
for key, o := range objectsByKey {
labels := o.getTargetLabels(uw.gw)
wg.Add(1)
limiterCh <- struct{}{}
go func(key string, labels []map[string]string) {
for i, aw := range aws {
swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels)
if len(swos) > 0 {
swosByKeyLock.Lock()
swosByKey[i][key] = swos
swosByKeyLock.Unlock()
}
func (uw *urlWatcher) recreateScrapeWorksLocked(objectsByKey map[string]object, awsMap map[*apiWatcher]struct{}) {
es := uw.gw.getScrapeWorkObjectsByAPIWatcherLocked(objectsByKey, awsMap)
for aw, e := range es {
swosByKey := e.swosByKey
for key, swos := range swosByKey {
if len(swos) == 0 {
delete(swosByKey, key)
}
wg.Done()
<-limiterCh
}(key, labels)
}
aw.replaceScrapeWorks(uw, swosByKey)
}
wg.Wait()
for i, aw := range aws {
aw.reloadScrapeWorks(uw, swosByKey[i])
}
func (uw *urlWatcher) updateScrapeWorksLocked(objectsByKey map[string]object, awsMap map[*apiWatcher]struct{}) {
es := uw.gw.getScrapeWorkObjectsByAPIWatcherLocked(objectsByKey, awsMap)
for aw, e := range es {
aw.updateScrapeWorks(uw, e.swosByKey)
}
}
@ -574,7 +620,7 @@ func (uw *urlWatcher) reloadObjects() string {
uw.removeScrapeWorksLocked(objectsRemoved)
uw.updateScrapeWorksLocked(objectsUpdated, uw.aws)
uw.updateScrapeWorksLocked(objectsAdded, uw.aws)
uw.needUpdateScrapeWorks = false
uw.needRecreateScrapeWorks = false
if len(objectsRemoved) > 0 || len(objectsUpdated) > 0 || len(objectsAdded) > 0 {
uw.maybeUpdateDependedScrapeWorksLocked()
}
@ -756,12 +802,12 @@ func (uw *urlWatcher) maybeUpdateDependedScrapeWorksLocked() {
}
if (role == "pod" || role == "service") && (uwx.role == "endpoints" || uwx.role == "endpointslice") {
// endpoints and endpointslice objects depend on pods and service objects
uwx.needUpdateScrapeWorks = true
uwx.needRecreateScrapeWorks = true
continue
}
if attachNodeMetadata && role == "node" && uwx.role == "pod" {
// pod objects depend on node objects if attachNodeMetadata is set
uwx.needUpdateScrapeWorks = true
uwx.needRecreateScrapeWorks = true
continue
}
}