mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-30 15:22:07 +00:00
vmalert: fix potential race during configuration reloads (#497)
Configuration reload and rules evaluation can't be executed in same time now. This may make reload time longer but prevents from potential races.
This commit is contained in:
parent
7a8ef517ae
commit
de60ad0cd6
5 changed files with 45 additions and 27 deletions
|
@ -52,7 +52,9 @@ publish-vmalert:
|
||||||
APP_NAME=vmalert $(MAKE) publish-via-docker
|
APP_NAME=vmalert $(MAKE) publish-via-docker
|
||||||
|
|
||||||
test-vmalert:
|
test-vmalert:
|
||||||
go test -race -cover ./app/vmalert
|
go test -v -race -cover ./app/vmalert -loggerLevel=ERROR
|
||||||
|
go test -v -race -cover ./app/vmalert/datasource
|
||||||
|
go test -v -race -cover ./app/vmalert/notifier
|
||||||
|
|
||||||
run-vmalert: vmalert
|
run-vmalert: vmalert
|
||||||
./bin/vmalert -rule=app/vmalert/testdata/rules0-good.rules \
|
./bin/vmalert -rule=app/vmalert/testdata/rules0-good.rules \
|
||||||
|
|
|
@ -32,8 +32,9 @@ func Parse(pathPatterns []string, validateAnnotations bool) ([]Group, error) {
|
||||||
return nil, fmt.Errorf("one file can not contain groups with the same name %s, filepath:%s", g.Name, file)
|
return nil, fmt.Errorf("one file can not contain groups with the same name %s, filepath:%s", g.Name, file)
|
||||||
}
|
}
|
||||||
g.File = file
|
g.File = file
|
||||||
g.done = make(chan struct{})
|
g.doneCh = make(chan struct{})
|
||||||
g.finished = make(chan struct{})
|
g.finishedCh = make(chan struct{})
|
||||||
|
g.updateCh = make(chan Group)
|
||||||
|
|
||||||
groupsNames[g.Name] = struct{}{}
|
groupsNames[g.Name] = struct{}{}
|
||||||
for _, rule := range g.Rules {
|
for _, rule := range g.Rules {
|
||||||
|
|
|
@ -19,13 +19,16 @@ type Group struct {
|
||||||
File string
|
File string
|
||||||
Rules []*Rule
|
Rules []*Rule
|
||||||
|
|
||||||
done chan struct{}
|
doneCh chan struct{}
|
||||||
finished chan struct{}
|
finishedCh chan struct{}
|
||||||
|
// channel accepts new Group obj
|
||||||
|
// which supposed to update current group
|
||||||
|
updateCh chan Group
|
||||||
}
|
}
|
||||||
|
|
||||||
// ID return unique group ID that consists of
|
// ID return unique group ID that consists of
|
||||||
// rules file and group name
|
// rules file and group name
|
||||||
func (g Group) ID() uint64 {
|
func (g *Group) ID() uint64 {
|
||||||
hash := fnv.New64a()
|
hash := fnv.New64a()
|
||||||
hash.Write([]byte(g.File))
|
hash.Write([]byte(g.File))
|
||||||
hash.Write([]byte("\xff"))
|
hash.Write([]byte("\xff"))
|
||||||
|
@ -47,8 +50,8 @@ func (g *Group) Restore(ctx context.Context, q datasource.Querier, lookback time
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateWith updates existing group with
|
// updateWith updates existing group with
|
||||||
// passed group object. Must be called
|
// passed group object.
|
||||||
// under mutex lock.
|
// Not thread-safe.
|
||||||
func (g *Group) updateWith(newGroup Group) {
|
func (g *Group) updateWith(newGroup Group) {
|
||||||
rulesRegistry := make(map[string]*Rule)
|
rulesRegistry := make(map[string]*Rule)
|
||||||
for _, nr := range newGroup.Rules {
|
for _, nr := range newGroup.Rules {
|
||||||
|
@ -106,11 +109,11 @@ var (
|
||||||
)
|
)
|
||||||
|
|
||||||
func (g *Group) close() {
|
func (g *Group) close() {
|
||||||
if g.done == nil {
|
if g.doneCh == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
close(g.done)
|
close(g.doneCh)
|
||||||
<-g.finished
|
<-g.finishedCh
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *Group) start(ctx context.Context, interval time.Duration,
|
func (g *Group) start(ctx context.Context, interval time.Duration,
|
||||||
|
@ -122,12 +125,14 @@ func (g *Group) start(ctx context.Context, interval time.Duration,
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
logger.Infof("group %q: context cancelled", g.Name)
|
logger.Infof("group %q: context cancelled", g.Name)
|
||||||
close(g.finished)
|
close(g.finishedCh)
|
||||||
return
|
return
|
||||||
case <-g.done:
|
case <-g.doneCh:
|
||||||
logger.Infof("group %q: received stop signal", g.Name)
|
logger.Infof("group %q: received stop signal", g.Name)
|
||||||
close(g.finished)
|
close(g.finishedCh)
|
||||||
return
|
return
|
||||||
|
case ng := <-g.updateCh:
|
||||||
|
g.updateWith(ng)
|
||||||
case <-t.C:
|
case <-t.C:
|
||||||
iterationTotal.Inc()
|
iterationTotal.Inc()
|
||||||
iterationStart := time.Now()
|
iterationStart := time.Now()
|
||||||
|
|
|
@ -87,8 +87,7 @@ func (m *manager) update(ctx context.Context, path []string, validate, restore b
|
||||||
|
|
||||||
m.groupsMu.Lock()
|
m.groupsMu.Lock()
|
||||||
for _, og := range m.groups {
|
for _, og := range m.groups {
|
||||||
id := og.ID()
|
ng, ok := groupsRegistry[og.ID()]
|
||||||
ng, ok := groupsRegistry[id]
|
|
||||||
if !ok {
|
if !ok {
|
||||||
// old group is not present in new list
|
// old group is not present in new list
|
||||||
// and must be stopped and deleted
|
// and must be stopped and deleted
|
||||||
|
@ -97,7 +96,7 @@ func (m *manager) update(ctx context.Context, path []string, validate, restore b
|
||||||
og = nil
|
og = nil
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
og.updateWith(ng)
|
og.updateCh <- ng
|
||||||
delete(groupsRegistry, ng.ID())
|
delete(groupsRegistry, ng.ID())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,24 +26,35 @@ func TestManagerUpdateError(t *testing.T) {
|
||||||
// execution of configuration update.
|
// execution of configuration update.
|
||||||
// Should be executed with -race flag
|
// Should be executed with -race flag
|
||||||
func TestManagerUpdateConcurrent(t *testing.T) {
|
func TestManagerUpdateConcurrent(t *testing.T) {
|
||||||
m := &manager{groups: make(map[uint64]*Group)}
|
m := &manager{
|
||||||
|
groups: make(map[uint64]*Group),
|
||||||
|
storage: &fakeQuerier{},
|
||||||
|
notifier: &fakeNotifier{},
|
||||||
|
}
|
||||||
paths := []string{
|
paths := []string{
|
||||||
"testdata/dir/rules0-good.rules",
|
"testdata/dir/rules0-good.rules",
|
||||||
"testdata/dir/rules1-good.rules",
|
"testdata/dir/rules1-good.rules",
|
||||||
"testdata/rules0-good.rules",
|
"testdata/rules0-good.rules",
|
||||||
}
|
}
|
||||||
|
*evaluationInterval = time.Millisecond
|
||||||
|
if err := m.start(context.Background(), []string{paths[0]}, true); err != nil {
|
||||||
|
t.Fatalf("failed to start: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
const n = 500
|
const workers = 500
|
||||||
|
const iterations = 10
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
wg.Add(n)
|
wg.Add(workers)
|
||||||
for i := 0; i < n; i++ {
|
for i := 0; i < workers; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
rnd := rand.Intn(len(paths))
|
for i := 0; i < iterations; i++ {
|
||||||
path := []string{paths[rnd]}
|
rnd := rand.Intn(len(paths))
|
||||||
err := m.update(context.Background(), path, true, false)
|
path := []string{paths[rnd]}
|
||||||
if err != nil {
|
err := m.update(context.Background(), path, true, false)
|
||||||
t.Errorf("update error: %s", err)
|
if err != nil {
|
||||||
|
t.Errorf("update error: %s", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
@ -111,7 +122,7 @@ func TestManagerUpdate(t *testing.T) {
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
ctx, cancel := context.WithCancel(context.TODO())
|
ctx, cancel := context.WithCancel(context.TODO())
|
||||||
m := &manager{groups: make(map[uint64]*Group)}
|
m := &manager{groups: make(map[uint64]*Group), storage: &fakeQuerier{}}
|
||||||
path := []string{tc.initPath}
|
path := []string{tc.initPath}
|
||||||
if err := m.update(ctx, path, true, false); err != nil {
|
if err := m.update(ctx, path, true, false); err != nil {
|
||||||
t.Fatalf("failed to complete initial rules update: %s", err)
|
t.Fatalf("failed to complete initial rules update: %s", err)
|
||||||
|
|
Loading…
Reference in a new issue