mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/promscrape/discovery/kubernetes: fixes kubernetes service discovery (#2615)
* lib/promscrape/discovery/kubernetes: properly update discovered scrape works. Previously, added or updated scrape works could override previously discovered ones, because swosByKey may contain only a small subset of the Kubernetes objects with their labels. This affects the objectsUpdated and objectsAdded maps, which include only the changed elements.
* Properly calculate vm_promscrape_discovery_kubernetes_scrape_works

Co-authored-by: f41gh7 <nik@victoriametrics.com>
Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
parent 2cf586da78
commit d5eb6afe26

2 changed files with 100 additions and 53 deletions
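The gist of the fix, restated from the commit message: on incremental updates the swosByKey map being applied holds only the changed objects (objectsUpdated/objectsAdded), so the per-urlWatcher map of ScrapeWork objects must be merged key by key rather than replaced wholesale, otherwise targets discovered earlier are silently dropped. A minimal standalone Go sketch of the two behaviors, using simplified stand-in types rather than the real apiWatcher internals:

package main

import "fmt"

// scrapeWorks is a simplified stand-in for the per-urlWatcher map of
// ScrapeWork objects keyed by Kubernetes object key (not the real types).
type scrapeWorks map[string][]string

// replaceAll mimics the old behavior: the whole map is swapped for the
// partial update, so keys missing from it are silently dropped.
func replaceAll(dst, partial scrapeWorks) scrapeWorks {
    return partial
}

// mergeUpdate mimics the fixed behavior: only keys present in the partial
// update are touched, and an empty value deletes the key.
func mergeUpdate(dst, partial scrapeWorks) scrapeWorks {
    if dst == nil {
        dst = make(scrapeWorks)
    }
    for key, swos := range partial {
        if len(swos) == 0 {
            delete(dst, key)
        } else {
            dst[key] = swos
        }
    }
    return dst
}

func main() {
    discovered := scrapeWorks{
        "ns1/pod-a": {"target-a"},
        "ns1/pod-b": {"target-b"},
    }
    // objectsUpdated/objectsAdded carry only the changed objects.
    partial := scrapeWorks{"ns1/pod-b": {"target-b-v2"}}

    fmt.Println(replaceAll(discovered, partial))  // ns1/pod-a is lost
    fmt.Println(mergeUpdate(discovered, partial)) // ns1/pod-a is kept, ns1/pod-b updated
}

Under replace semantics the ns1/pod-a entry disappears after a partial update that only touches ns1/pod-b; under merge semantics it survives, which is what the new apiWatcher.updateScrapeWorks in the diff below does.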
docs/CHANGELOG.md
@@ -19,14 +19,15 @@ The following tip changes can be tested by building VictoriaMetrics components f
 * FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add `influx-prometheus-mode` command-line flag, which allows to restore the original time series written from Prometheus into InfluxDB during data migration from InfluxDB to VictoriaMetrics. See [this feature request](https://github.com/VictoriaMetrics/vmctl/issues/8). Thanks to @mback2k for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2545).
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add ability to specify AWS service name when issuing requests to AWS api. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2605). Thanks to @transacid for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2604).
 
+* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix a bug, which could lead to incomplete discovery of scrape targets in Kubernetes (aka `kubernetes_sd_config`). The bug has been introduced in [v1.77.0](https://docs.victoriametrics.com/CHANGELOG.html#v1770).
 * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): support `scalar` result type in response. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2607).
 * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): support strings in `humanize.*` template function in the same way as Prometheus does. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2569).
 * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): proxy `/rules` requests to vmalert from Grafana's alerting UI. This removes errors in Grafana's UI for Grafana versions older than `8.5.*`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2583)
 * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): do not add `/api/v1/query` suffix to `-datasource.url` if `-remoteRead.disablePathAppend` command-line flag is set. Previously this flag was applied only to `-remoteRead.url`, which could confuse users.
 * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): prevent from possible resource leak on config update, which could lead to the slowdown of `vmalert` over time. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2577).
 * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): do not return values from [label_value()](https://docs.victoriametrics.com/MetricsQL.html#label_value) function if the original time series has no values at the selected timestamps.
 * BUGFIX: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): limit the number of concurrently established connections from vmselect to vmstorage. This should prevent from potentially high spikes in the number of established connections after temporary slowdown in connection handshake procedure between vmselect and vmstorage because of spikes in workload. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2552).
 * BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl.html): fix build for Solaris / SmartOS. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1322#issuecomment-1120276146).
 
 ## [v1.77.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.77.1)
lib/promscrape/discovery/kubernetes/api_watcher.go
@@ -55,7 +55,7 @@ type apiWatcher struct {
     gw *groupWatcher
 
-    // swos contains per-urlWatcher maps of ScrapeWork objects for the given apiWatcher
+    // swosByURLWatcher contains per-urlWatcher maps of ScrapeWork objects for the given apiWatcher
     swosByURLWatcher     map[*urlWatcher]map[string][]interface{}
     swosByURLWatcherLock sync.Mutex
 
@@ -91,23 +91,51 @@ func (aw *apiWatcher) mustStart() {
     aw.gw.startWatchersForRole(aw.role, aw)
 }
 
+func (aw *apiWatcher) updateSwosCount(multiplier int, swosByKey map[string][]interface{}) {
+    n := 0
+    for _, swos := range swosByKey {
+        n += len(swos)
+    }
+    n *= multiplier
+    aw.swosCount.Add(n)
+}
+
 func (aw *apiWatcher) mustStop() {
     aw.gw.unsubscribeAPIWatcher(aw)
     aw.swosByURLWatcherLock.Lock()
     for _, swosByKey := range aw.swosByURLWatcher {
-        aw.swosCount.Add(-len(swosByKey))
+        aw.updateSwosCount(-1, swosByKey)
     }
     aw.swosByURLWatcher = make(map[*urlWatcher]map[string][]interface{})
     aw.swosByURLWatcherLock.Unlock()
 }
 
-func (aw *apiWatcher) reloadScrapeWorks(uw *urlWatcher, swosByKey map[string][]interface{}) {
+func (aw *apiWatcher) replaceScrapeWorks(uw *urlWatcher, swosByKey map[string][]interface{}) {
     aw.swosByURLWatcherLock.Lock()
-    aw.swosCount.Add(len(swosByKey) - len(aw.swosByURLWatcher[uw]))
+    aw.updateSwosCount(-1, aw.swosByURLWatcher[uw])
+    aw.updateSwosCount(1, swosByKey)
     aw.swosByURLWatcher[uw] = swosByKey
     aw.swosByURLWatcherLock.Unlock()
 }
 
+func (aw *apiWatcher) updateScrapeWorks(uw *urlWatcher, swosByKey map[string][]interface{}) {
+    aw.swosByURLWatcherLock.Lock()
+    dst := aw.swosByURLWatcher[uw]
+    if dst == nil {
+        dst = make(map[string][]interface{})
+        aw.swosByURLWatcher[uw] = dst
+    }
+    for key, swos := range swosByKey {
+        aw.swosCount.Add(len(swos) - len(dst[key]))
+        if len(swos) == 0 {
+            delete(dst, key)
+        } else {
+            dst[key] = swos
+        }
+    }
+    aw.swosByURLWatcherLock.Unlock()
+}
+
 func (aw *apiWatcher) setScrapeWorks(uw *urlWatcher, key string, labels []map[string]string) {
     swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels)
     aw.swosByURLWatcherLock.Lock()
@@ -117,10 +145,10 @@ func (aw *apiWatcher) setScrapeWorks(uw *urlWatcher, key string, labels []map[st
         aw.swosByURLWatcher[uw] = swosByKey
     }
     aw.swosCount.Add(len(swos) - len(swosByKey[key]))
-    if len(swos) > 0 {
-        swosByKey[key] = swos
-    } else {
+    if len(swos) == 0 {
         delete(swosByKey, key)
+    } else {
+        swosByKey[key] = swos
     }
     aw.swosByURLWatcherLock.Unlock()
 }
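Before this change, mustStop and reloadScrapeWorks adjusted the vm_promscrape_discovery_kubernetes_scrape_works counter by len(swosByKey), i.e. the number of keys, rather than by the number of ScrapeWork objects stored under those keys; the new updateSwosCount helper sums the objects and applies a +1/-1 multiplier. A small self-contained sketch of that accounting, with a toy counter standing in for the real metrics counter:

package main

import "fmt"

// counter is a toy stand-in for the metrics counter behind
// vm_promscrape_discovery_kubernetes_scrape_works.
type counter struct{ n int }

func (c *counter) Add(delta int) { c.n += delta }

// updateCount mirrors apiWatcher.updateSwosCount: sum the ScrapeWork objects
// across all keys, then apply the sum with a +1 or -1 multiplier.
func updateCount(c *counter, multiplier int, swosByKey map[string][]string) {
    n := 0
    for _, swos := range swosByKey {
        n += len(swos)
    }
    c.Add(n * multiplier)
}

func main() {
    c := &counter{}
    oldSwos := map[string][]string{"ns1/pod-a": {"t1"}, "ns1/pod-b": {"t2", "t3"}}
    newSwos := map[string][]string{"ns1/pod-a": {"t1"}}

    updateCount(c, 1, oldSwos) // initial discovery registers 3 scrape works
    // replaceScrapeWorks-style swap: subtract the old totals, add the new ones.
    updateCount(c, -1, oldSwos)
    updateCount(c, 1, newSwos)
    fmt.Println(c.n) // 1, matching the single remaining scrape work
    // Counting len(map) here would have subtracted 2 keys instead of 3 objects.
}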
@@ -250,6 +278,45 @@ var (
     })
 )
 
+type swosByKeyWithLock struct {
+    mu        sync.Mutex
+    swosByKey map[string][]interface{}
+}
+
+func (gw *groupWatcher) getScrapeWorkObjectsByAPIWatcherLocked(objectsByKey map[string]object, awsMap map[*apiWatcher]struct{}) map[*apiWatcher]*swosByKeyWithLock {
+    if len(awsMap) == 0 {
+        return nil
+    }
+    swosByAPIWatcher := make(map[*apiWatcher]*swosByKeyWithLock, len(awsMap))
+    for aw := range awsMap {
+        swosByAPIWatcher[aw] = &swosByKeyWithLock{
+            swosByKey: make(map[string][]interface{}),
+        }
+    }
+
+    // Generate ScrapeWork objects in parallel on available CPU cores.
+    // This should reduce the time needed for their generation on systems with many CPU cores.
+    var wg sync.WaitGroup
+    limiterCh := make(chan struct{}, cgroup.AvailableCPUs())
+    for key, o := range objectsByKey {
+        labels := o.getTargetLabels(gw)
+        wg.Add(1)
+        limiterCh <- struct{}{}
+        go func(key string, labels []map[string]string) {
+            for aw, e := range swosByAPIWatcher {
+                swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels)
+                e.mu.Lock()
+                e.swosByKey[key] = swos
+                e.mu.Unlock()
+            }
+            wg.Done()
+            <-limiterCh
+        }(key, labels)
+    }
+    wg.Wait()
+    return swosByAPIWatcher
+}
+
 func (gw *groupWatcher) getObjectByRoleLocked(role, namespace, name string) object {
     if role == "node" {
         // Node objects have no namespace
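The new getScrapeWorkObjectsByAPIWatcherLocked fans the per-object ScrapeWork generation out across the available CPU cores, using a buffered channel as a concurrency limiter and a mutex-protected result map per apiWatcher. A minimal standalone sketch of that limiter pattern (a generic worker over string keys, not the real ScrapeWork generation; runtime.NumCPU is used here where the real code uses cgroup.AvailableCPUs):

package main

import (
    "fmt"
    "runtime"
    "sync"
)

// processAll runs fn for every key with at most runtime.NumCPU() goroutines
// in flight, mirroring the limiterCh pattern above (the real code writes the
// results into per-apiWatcher maps guarded by their own mutexes).
func processAll(keys []string, fn func(key string) string) map[string]string {
    var (
        mu      sync.Mutex
        wg      sync.WaitGroup
        results = make(map[string]string, len(keys))
    )
    limiterCh := make(chan struct{}, runtime.NumCPU())
    for _, key := range keys {
        wg.Add(1)
        limiterCh <- struct{}{} // blocks while all slots are taken
        go func(key string) {
            defer func() {
                wg.Done()
                <-limiterCh // release the slot
            }()
            v := fn(key)
            mu.Lock()
            results[key] = v
            mu.Unlock()
        }(key)
    }
    wg.Wait()
    return results
}

func main() {
    out := processAll([]string{"ns1/pod-a", "ns1/pod-b", "ns1/pod-c"}, func(k string) string {
        return "scrapework-for-" + k
    })
    fmt.Println(out)
}

The buffered channel doubles as a counting semaphore: sends block once the configured number of goroutines are in flight, and each worker frees its slot when it finishes.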
@@ -310,9 +377,9 @@ func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) {
             time.Sleep(sleepTime)
             startTime := time.Now()
             gw.mu.Lock()
-            if uw.needUpdateScrapeWorks {
-                uw.needUpdateScrapeWorks = false
-                uw.updateScrapeWorksLocked(uw.objectsByKey, uw.aws)
+            if uw.needRecreateScrapeWorks {
+                uw.needRecreateScrapeWorks = false
+                uw.recreateScrapeWorksLocked(uw.objectsByKey, uw.aws)
                 sleepTime = time.Since(startTime)
                 if sleepTime < minSleepTime {
                     sleepTime = minSleepTime
@@ -401,7 +468,7 @@ type urlWatcher struct {
     // objectsByKey contains the latest state for objects obtained from apiURL
     objectsByKey map[string]object
 
-    needUpdateScrapeWorks bool
+    needRecreateScrapeWorks bool
 
     resourceVersion string
 
@@ -450,7 +517,7 @@ func (uw *urlWatcher) registerPendingAPIWatchersLocked() {
     if len(uw.awsPending) == 0 {
         return
     }
-    uw.updateScrapeWorksLocked(uw.objectsByKey, uw.awsPending)
+    uw.recreateScrapeWorksLocked(uw.objectsByKey, uw.awsPending)
     for aw := range uw.awsPending {
         uw.aws[aw] = struct{}{}
     }
@@ -460,44 +527,23 @@ func (uw *urlWatcher) registerPendingAPIWatchersLocked() {
     metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Add(-awsPendingLen)
 }
 
-func (uw *urlWatcher) updateScrapeWorksLocked(objectsByKey map[string]object, awsMap map[*apiWatcher]struct{}) {
-    if len(objectsByKey) == 0 || len(awsMap) == 0 {
-        return
-    }
-    aws := make([]*apiWatcher, 0, len(awsMap))
-    for aw := range awsMap {
-        aws = append(aws, aw)
-    }
-    swosByKey := make([]map[string][]interface{}, len(aws))
-    for i := range aws {
-        swosByKey[i] = make(map[string][]interface{})
-    }
-
-    // Generate ScrapeWork objects in parallel on available CPU cores.
-    // This should reduce the time needed for their generation on systems with many CPU cores.
-    var swosByKeyLock sync.Mutex
-    var wg sync.WaitGroup
-    limiterCh := make(chan struct{}, cgroup.AvailableCPUs())
-    for key, o := range objectsByKey {
-        labels := o.getTargetLabels(uw.gw)
-        wg.Add(1)
-        limiterCh <- struct{}{}
-        go func(key string, labels []map[string]string) {
-            for i, aw := range aws {
-                swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels)
-                if len(swos) > 0 {
-                    swosByKeyLock.Lock()
-                    swosByKey[i][key] = swos
-                    swosByKeyLock.Unlock()
-                }
+func (uw *urlWatcher) recreateScrapeWorksLocked(objectsByKey map[string]object, awsMap map[*apiWatcher]struct{}) {
+    es := uw.gw.getScrapeWorkObjectsByAPIWatcherLocked(objectsByKey, awsMap)
+    for aw, e := range es {
+        swosByKey := e.swosByKey
+        for key, swos := range swosByKey {
+            if len(swos) == 0 {
+                delete(swosByKey, key)
             }
-            wg.Done()
-            <-limiterCh
-        }(key, labels)
+        }
+        aw.replaceScrapeWorks(uw, swosByKey)
     }
-    wg.Wait()
-    for i, aw := range aws {
-        aw.reloadScrapeWorks(uw, swosByKey[i])
+}
+
+func (uw *urlWatcher) updateScrapeWorksLocked(objectsByKey map[string]object, awsMap map[*apiWatcher]struct{}) {
+    es := uw.gw.getScrapeWorkObjectsByAPIWatcherLocked(objectsByKey, awsMap)
+    for aw, e := range es {
+        aw.updateScrapeWorks(uw, e.swosByKey)
     }
 }
@@ -574,7 +620,7 @@ func (uw *urlWatcher) reloadObjects() string {
     uw.removeScrapeWorksLocked(objectsRemoved)
     uw.updateScrapeWorksLocked(objectsUpdated, uw.aws)
     uw.updateScrapeWorksLocked(objectsAdded, uw.aws)
-    uw.needUpdateScrapeWorks = false
+    uw.needRecreateScrapeWorks = false
     if len(objectsRemoved) > 0 || len(objectsUpdated) > 0 || len(objectsAdded) > 0 {
         uw.maybeUpdateDependedScrapeWorksLocked()
     }
@@ -756,12 +802,12 @@ func (uw *urlWatcher) maybeUpdateDependedScrapeWorksLocked() {
         }
         if (role == "pod" || role == "service") && (uwx.role == "endpoints" || uwx.role == "endpointslice") {
             // endpoints and endpointslice objects depend on pods and service objects
-            uwx.needUpdateScrapeWorks = true
+            uwx.needRecreateScrapeWorks = true
             continue
         }
         if attachNodeMetadata && role == "node" && uwx.role == "pod" {
             // pod objects depend on node objects if attachNodeMetadata is set
-            uwx.needUpdateScrapeWorks = true
+            uwx.needRecreateScrapeWorks = true
             continue
         }
     }