Consul SD - update services on the watcher's start (#2202)

* lib/discovery/consul: update services on the watcher's start

Previously, the watcher's start only spawned goroutines for discovery
but did not wait for the first iteration to finish. As a result, the first
Consul discovery did not return discovered targets until the next iteration.

The change makes the watcher's start block until the first discovery
iteration is done and all registries are updated.
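
As an aside, a minimal hypothetical sketch of this blocking-start pattern
(all names below are made up for illustration and are not the actual
library code):

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    type watcher struct {
        mu      sync.Mutex
        targets []string
    }

    // discoverOnce stands in for a single Consul discovery iteration.
    func (w *watcher) discoverOnce() {
        w.mu.Lock()
        w.targets = []string{"alertmanager-1", "alertmanager-2"} // fake result
        w.mu.Unlock()
    }

    func (w *watcher) getTargets() []string {
        w.mu.Lock()
        defer w.mu.Unlock()
        return append([]string(nil), w.targets...)
    }

    // newWatcher blocks until the background goroutine finishes its first
    // discovery iteration, so targets are available right after construction.
    func newWatcher() *watcher {
        w := &watcher{}
        initCh := make(chan struct{})
        go func() {
            w.discoverOnce() // first iteration
            close(initCh)    // signal that initialization is complete
            ticker := time.NewTicker(time.Second)
            defer ticker.Stop()
            for range ticker.C {
                w.discoverOnce() // periodic refresh
            }
        }()
        <-initCh // block until the first iteration is done
        return w
    }

    func main() {
        w := newWatcher()
        fmt.Println(w.getTargets()) // non-empty right after newWatcher returns
    }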

Signed-off-by: hagen1778 <roman@victoriametrics.com>

* vmalert: remove workarounds for consul SD

Now that the Consul SD lib properly updates services on the first start,
the workarounds in vmalert are no longer needed.
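
For context, the removed workaround was a retry-with-backoff loop around the
first discovery attempt. A hedged, self-contained sketch of that pattern
(illustrative names only, not the exact vmalert code):

    package main

    import (
        "fmt"
        "time"
    )

    const (
        retryBackoff = 100 * time.Millisecond
        retryCount   = 2
    )

    // discoverWithRetry polls discover() until it returns targets or the
    // retry budget is exhausted. Such polling becomes unnecessary once
    // discovery is guaranteed to be complete when the watcher starts.
    func discoverWithRetry(discover func() []string) []string {
        for i := 0; ; i++ {
            targets := discover()
            if len(targets) > 0 || i >= retryCount {
                return targets
            }
            time.Sleep(retryBackoff)
        }
    }

    func main() {
        attempts := 0
        discover := func() []string {
            attempts++
            if attempts < 2 {
                return nil // first attempt finds nothing
            }
            return []string{"alertmanager-1"}
        }
        fmt.Println(discoverWithRetry(discover)) // [alertmanager-1]
    }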

Signed-off-by: hagen1778 <roman@victoriametrics.com>

* lib/discovery/consul: update after review

Signed-off-by: hagen1778 <roman@victoriametrics.com>

* wip

Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
Roman Khavronenko 2022-02-21 15:32:45 +02:00 committed by GitHub
parent f620f159a5
commit 69d1893f4c
4 changed files with 89 additions and 81 deletions


@@ -73,35 +73,18 @@ func (cw *configWatcher) reload(path string) error {
}
// stop existing discovery
close(cw.syncCh)
cw.wg.Wait()
cw.mustStop()
// re-start cw with new config
cw.syncCh = make(chan struct{})
cw.cfg = cfg
cw.resetTargets()
return cw.start()
}
const (
addRetryBackoff = time.Millisecond * 100
addRetryCount = 2
)
func (cw *configWatcher) add(typeK TargetType, interval time.Duration, labelsFn getLabels) error {
var targets []Target
var errors []error
var count int
for { // retry addRetryCount times if first discovery attempts gave no results
targets, errors = targetsFromLabels(labelsFn, cw.cfg, cw.genFn)
for _, err := range errors {
return fmt.Errorf("failed to init notifier for %q: %s", typeK, err)
}
if len(targets) > 0 || count >= addRetryCount {
break
}
time.Sleep(addRetryBackoff)
targets, errors := targetsFromLabels(labelsFn, cw.cfg, cw.genFn)
for _, err := range errors {
return fmt.Errorf("failed to init notifier for %q: %s", typeK, err)
}
cw.setTargets(typeK, targets)
@@ -215,7 +198,10 @@ func (cw *configWatcher) start() error {
return nil
}
func (cw *configWatcher) resetTargets() {
func (cw *configWatcher) mustStop() {
close(cw.syncCh)
cw.wg.Wait()
cw.targetsMu.Lock()
for _, targets := range cw.targets {
for _, t := range targets {
@@ -224,6 +210,11 @@ func (cw *configWatcher) resetTargets() {
}
cw.targets = make(map[TargetType][]Target)
cw.targetsMu.Unlock()
for i := range cw.cfg.ConsulSDConfigs {
cw.cfg.ConsulSDConfigs[i].MustStop()
}
cw.cfg = nil
}
func (cw *configWatcher) setTargets(key TargetType, targets []Target) {


@@ -9,9 +9,6 @@ import (
"os"
"sync"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul"
)
func TestConfigWatcherReload(t *testing.T) {
@@ -31,6 +28,7 @@ static_configs:
if err != nil {
t.Fatalf("failed to start config watcher: %s", err)
}
defer cw.mustStop()
ns := cw.notifiers()
if len(ns) != 2 {
t.Fatalf("expected to have 2 notifiers; got %d %#v", len(ns), ns)
@@ -78,16 +76,11 @@ consul_sd_configs:
- alertmanager
`, consulSDServer.URL))
prevCheckInterval := *consul.SDCheckInterval
defer func() { *consul.SDCheckInterval = prevCheckInterval }()
*consul.SDCheckInterval = time.Millisecond * 100
cw, err := newWatcher(consulSDFile.Name(), nil)
if err != nil {
t.Fatalf("failed to start config watcher: %s", err)
}
time.Sleep(*consul.SDCheckInterval * 2)
defer cw.mustStop()
if len(cw.notifiers()) != 2 {
t.Fatalf("expected to get 2 notifiers; got %d", len(cw.notifiers()))
@@ -161,6 +154,7 @@ consul_sd_configs:
if err != nil {
t.Fatalf("failed to start config watcher: %s", err)
}
defer cw.mustStop()
const workers = 500
const iterations = 10


@@ -25,6 +25,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
* BUGFIX: reduce memory usage during the first three hours after the upgrade from versions older than v1.73.0. The memory usage spike was related to the need of in-memory caches' re-population after the upgrade because of the fix for [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401). Now cache size limits are reduced in order to occupy less memory during the upgrade.
* BUGFIX: fix a bug, which could significantly slow down requests to `/api/v1/labels` and `/api/v1/label/<label_name>/values`. These APIs are used by Grafana for auto-completion of label names and label values. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2200).
* BUGFIX: vmalert: add support for `$externalLabels` and `$externalURL` template vars in the same way as Prometheus does. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2193).
* BUGFIX: vmalert: make sure notifiers are discovered during initialization if they are configured via `consul_sd_configs`. Previously they could be discovered in 30 seconds (the default value for `-promscrape.consulSDCheckInterval` command-line flag) after the initialization. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2202).
* BUGFIX: update default value for `-promscrape.fileSDCheckInterval`, so it matches default duration used by Prometheus for checking for updates in `file_sd_configs`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2187). Thanks to @corporate-gadfly for the fix.


@@ -42,7 +42,7 @@ type serviceWatcher struct {
stopCh chan struct{}
}
// newConsulWatcher creates new watcher and start background service discovery for Consul.
// newConsulWatcher creates new watcher and starts background service discovery for Consul.
func newConsulWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter, namespace string) *consulWatcher {
baseQueryArgs := "?dc=" + url.QueryEscape(datacenter)
if sdc.AllowStale {
@@ -67,7 +67,10 @@ func newConsulWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter,
services: make(map[string]*serviceWatcher),
stopCh: make(chan struct{}),
}
go cw.watchForServicesUpdates()
initCh := make(chan struct{})
go cw.watchForServicesUpdates(initCh)
// wait for initialization to complete
<-initCh
return cw
}
@@ -78,11 +81,57 @@ func (cw *consulWatcher) mustStop() {
// TODO: add ability to cancel blocking requests.
}
func (cw *consulWatcher) updateServices(serviceNames []string) {
var initWG sync.WaitGroup
// Start watchers for new services.
cw.servicesLock.Lock()
for _, serviceName := range serviceNames {
if _, ok := cw.services[serviceName]; ok {
// The watcher for serviceName already exists.
continue
}
sw := &serviceWatcher{
serviceName: serviceName,
stopCh: make(chan struct{}),
}
cw.services[serviceName] = sw
cw.wg.Add(1)
serviceWatchersCreated.Inc()
initWG.Add(1)
go func() {
serviceWatchersCount.Inc()
sw.watchForServiceNodesUpdates(cw, &initWG)
serviceWatchersCount.Dec()
cw.wg.Done()
}()
}
// Stop watchers for removed services.
newServiceNamesMap := make(map[string]struct{}, len(serviceNames))
for _, serviceName := range serviceNames {
newServiceNamesMap[serviceName] = struct{}{}
}
for serviceName, sw := range cw.services {
if _, ok := newServiceNamesMap[serviceName]; ok {
continue
}
close(sw.stopCh)
delete(cw.services, serviceName)
serviceWatchersStopped.Inc()
// Do not wait for the watcher goroutine to exit, since this may take for up to maxWaitTime
// if it is blocked in Consul API request.
}
cw.servicesLock.Unlock()
// Wait for initialization to complete.
initWG.Wait()
}
// watchForServicesUpdates watches for new services and updates it in cw.
func (cw *consulWatcher) watchForServicesUpdates() {
checkInterval := getCheckInterval()
ticker := time.NewTicker(checkInterval / 2)
defer ticker.Stop()
//
// watchForServicesUpdates closes the initCh once the initialization is complete and first discovery iteration is done.
func (cw *consulWatcher) watchForServicesUpdates(initCh chan struct{}) {
index := int64(0)
clientAddr := cw.client.Addr()
f := func() {
@@ -95,51 +144,19 @@ func (cw *consulWatcher) watchForServicesUpdates() {
// Nothing changed.
return
}
cw.servicesLock.Lock()
// Start watchers for new services.
for _, serviceName := range serviceNames {
if _, ok := cw.services[serviceName]; ok {
// The watcher for serviceName already exists.
continue
}
sw := &serviceWatcher{
serviceName: serviceName,
stopCh: make(chan struct{}),
}
cw.services[serviceName] = sw
cw.wg.Add(1)
serviceWatchersCreated.Inc()
go func() {
serviceWatchersCount.Inc()
sw.watchForServiceNodesUpdates(cw)
serviceWatchersCount.Dec()
cw.wg.Done()
}()
}
// Stop watchers for removed services.
newServiceNamesMap := make(map[string]struct{}, len(serviceNames))
for _, serviceName := range serviceNames {
newServiceNamesMap[serviceName] = struct{}{}
}
for serviceName, sw := range cw.services {
if _, ok := newServiceNamesMap[serviceName]; ok {
continue
}
close(sw.stopCh)
delete(cw.services, serviceName)
serviceWatchersStopped.Inc()
// Do not wait for the watcher goroutine to exit, since this may take for up to maxWaitTime
// if it is blocked in Consul API request.
}
cw.servicesLock.Unlock()
cw.updateServices(serviceNames)
index = newIndex
}
logger.Infof("started Consul service watcher for %q", clientAddr)
f()
// send signal that initialization is complete
close(initCh)
checkInterval := getCheckInterval()
ticker := time.NewTicker(checkInterval / 2)
defer ticker.Stop()
for {
select {
case <-ticker.C:
@@ -196,10 +213,9 @@ func (cw *consulWatcher) getBlockingServiceNames(index int64) ([]string, int64,
}
// watchForServiceNodesUpdates watches for Consul serviceNode changes for the given serviceName.
func (sw *serviceWatcher) watchForServiceNodesUpdates(cw *consulWatcher) {
checkInterval := getCheckInterval()
ticker := time.NewTicker(checkInterval / 2)
defer ticker.Stop()
//
// watchForServiceNodesUpdates calls initWG.Done() once the initialization is complete and the first discovery iteration is done.
func (sw *serviceWatcher) watchForServiceNodesUpdates(cw *consulWatcher, initWG *sync.WaitGroup) {
clientAddr := cw.client.Addr()
index := int64(0)
path := "/v1/health/service/" + sw.serviceName + cw.serviceNodesQueryArgs
@@ -227,6 +243,12 @@ func (sw *serviceWatcher) watchForServiceNodesUpdates(cw *consulWatcher) {
}
f()
// Notify caller that initialization is complete
initWG.Done()
checkInterval := getCheckInterval()
ticker := time.NewTicker(checkInterval / 2)
defer ticker.Stop()
for {
select {
case <-ticker.C: