lib/promscrape/discovery/{consul,nomad}: wait until the deleted serviceWatchers are stopped inside updateServices() call

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3468
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3367
This commit is contained in:
Aliaksandr Valialkin 2023-01-05 21:52:31 -08:00
parent f392913d00
commit 463b957e54
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
2 changed files with 50 additions and 28 deletions

View file

@ -34,15 +34,15 @@ type consulWatcher struct {
servicesLock sync.Mutex
services map[string]*serviceWatcher
servicesWG sync.WaitGroup
wg sync.WaitGroup
stopCh chan struct{}
stopCh chan struct{}
stoppedCh chan struct{}
}
type serviceWatcher struct {
serviceName string
serviceNodes []ServiceNode
stopCh chan struct{}
stoppedCh chan struct{}
}
// newConsulWatcher creates new watcher and starts background service discovery for Consul.
@ -72,12 +72,12 @@ func newConsulWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter,
watchTags: sdc.Tags,
services: make(map[string]*serviceWatcher),
stopCh: make(chan struct{}),
stoppedCh: make(chan struct{}),
}
initCh := make(chan struct{})
cw.wg.Add(1)
go func() {
cw.watchForServicesUpdates(initCh)
cw.wg.Done()
close(cw.stoppedCh)
}()
// wait for initialization to complete
<-initCh
@ -87,11 +87,12 @@ func newConsulWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter,
func (cw *consulWatcher) mustStop() {
close(cw.stopCh)
cw.client.Stop()
cw.wg.Wait()
<-cw.stoppedCh
}
func (cw *consulWatcher) updateServices(serviceNames []string) {
var initWG sync.WaitGroup
// Start watchers for new services.
cw.servicesLock.Lock()
for _, serviceName := range serviceNames {
@ -102,16 +103,16 @@ func (cw *consulWatcher) updateServices(serviceNames []string) {
sw := &serviceWatcher{
serviceName: serviceName,
stopCh: make(chan struct{}),
stoppedCh: make(chan struct{}),
}
cw.services[serviceName] = sw
cw.servicesWG.Add(1)
serviceWatchersCreated.Inc()
initWG.Add(1)
go func() {
serviceWatchersCount.Inc()
sw.watchForServiceNodesUpdates(cw, &initWG)
serviceWatchersCount.Dec()
cw.servicesWG.Done()
close(sw.stoppedCh)
}()
}
@ -120,20 +121,24 @@ func (cw *consulWatcher) updateServices(serviceNames []string) {
for _, serviceName := range serviceNames {
newServiceNamesMap[serviceName] = struct{}{}
}
var swsStopped []*serviceWatcher
for serviceName, sw := range cw.services {
if _, ok := newServiceNamesMap[serviceName]; ok {
continue
}
close(sw.stopCh)
delete(cw.services, serviceName)
serviceWatchersStopped.Inc()
// Do not wait for the watcher goroutine to exit, since this may take for up to maxWaitTime
// if it is blocked in Consul API request.
swsStopped = append(swsStopped, sw)
}
cw.servicesLock.Unlock()
// Wait for initialization to complete.
// Wait until deleted service watchers are stopped.
for _, sw := range swsStopped {
<-sw.stoppedCh
serviceWatchersStopped.Inc()
}
// Wait until added service watchers are initialized.
initWG.Wait()
}
@ -175,12 +180,18 @@ func (cw *consulWatcher) watchForServicesUpdates(initCh chan struct{}) {
case <-cw.stopCh:
logger.Infof("stopping Consul service watchers for %q", apiServer)
startTime := time.Now()
var swsStopped []*serviceWatcher
cw.servicesLock.Lock()
for _, sw := range cw.services {
close(sw.stopCh)
swsStopped = append(swsStopped, sw)
}
cw.servicesLock.Unlock()
cw.servicesWG.Wait()
for _, sw := range swsStopped {
<-sw.stoppedCh
}
logger.Infof("stopped Consul service watcher for %q in %.3f seconds", apiServer, time.Since(startTime).Seconds())
return
}

View file

@ -33,15 +33,15 @@ type nomadWatcher struct {
servicesLock sync.Mutex
services map[string]*serviceWatcher
servicesWG sync.WaitGroup
wg sync.WaitGroup
stopCh chan struct{}
stopCh chan struct{}
stoppedCh chan struct{}
}
type serviceWatcher struct {
serviceName string
services []Service
stopCh chan struct{}
stoppedCh chan struct{}
}
// newNomadWatcher creates new watcher and starts background service discovery for Nomad.
@ -62,12 +62,12 @@ func newNomadWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter, n
watchTags: sdc.Tags,
services: make(map[string]*serviceWatcher),
stopCh: make(chan struct{}),
stoppedCh: make(chan struct{}),
}
initCh := make(chan struct{})
cw.wg.Add(1)
go func() {
cw.watchForServicesUpdates(initCh)
cw.wg.Done()
close(cw.stoppedCh)
}()
// wait for initialization to complete
<-initCh
@ -77,11 +77,12 @@ func newNomadWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter, n
func (cw *nomadWatcher) mustStop() {
close(cw.stopCh)
cw.client.Stop()
cw.wg.Wait()
<-cw.stoppedCh
}
func (cw *nomadWatcher) updateServices(serviceNames []string) {
var initWG sync.WaitGroup
// Start watchers for new services.
cw.servicesLock.Lock()
for _, serviceName := range serviceNames {
@ -92,16 +93,16 @@ func (cw *nomadWatcher) updateServices(serviceNames []string) {
sw := &serviceWatcher{
serviceName: serviceName,
stopCh: make(chan struct{}),
stoppedCh: make(chan struct{}),
}
cw.services[serviceName] = sw
cw.servicesWG.Add(1)
serviceWatchersCreated.Inc()
initWG.Add(1)
go func() {
serviceWatchersCount.Inc()
sw.watchForServiceAddressUpdates(cw, &initWG)
serviceWatchersCount.Dec()
cw.servicesWG.Done()
close(sw.stoppedCh)
}()
}
@ -110,20 +111,24 @@ func (cw *nomadWatcher) updateServices(serviceNames []string) {
for _, serviceName := range serviceNames {
newServiceNamesMap[serviceName] = struct{}{}
}
var swsStopped []*serviceWatcher
for serviceName, sw := range cw.services {
if _, ok := newServiceNamesMap[serviceName]; ok {
continue
}
close(sw.stopCh)
delete(cw.services, serviceName)
serviceWatchersStopped.Inc()
// Do not wait for the watcher goroutine to exit, since this may take for up to maxWaitTime
// if it is blocked in Nomad API request.
swsStopped = append(swsStopped, sw)
}
cw.servicesLock.Unlock()
// Wait for initialization to complete.
// Wait until deleted service watchers are stopped.
for _, sw := range swsStopped {
<-sw.stoppedCh
serviceWatchersStopped.Inc()
}
// Wait until added service watchers are initialized.
initWG.Wait()
}
@ -165,12 +170,18 @@ func (cw *nomadWatcher) watchForServicesUpdates(initCh chan struct{}) {
case <-cw.stopCh:
logger.Infof("stopping Nomad service watchers for %q", apiServer)
startTime := time.Now()
var swsStopped []*serviceWatcher
cw.servicesLock.Lock()
for _, sw := range cw.services {
close(sw.stopCh)
swsStopped = append(swsStopped, sw)
}
cw.servicesLock.Unlock()
cw.servicesWG.Wait()
for _, sw := range swsStopped {
<-sw.stoppedCh
}
logger.Infof("stopped Nomad service watcher for %q in %.3f seconds", apiServer, time.Since(startTime).Seconds())
return
}