diff --git a/app/vmalert/Makefile b/app/vmalert/Makefile index d49984ddfe..0d94393985 100644 --- a/app/vmalert/Makefile +++ b/app/vmalert/Makefile @@ -52,7 +52,9 @@ publish-vmalert: APP_NAME=vmalert $(MAKE) publish-via-docker test-vmalert: - go test -race -cover ./app/vmalert + go test -v -race -cover ./app/vmalert -loggerLevel=ERROR + go test -v -race -cover ./app/vmalert/datasource + go test -v -race -cover ./app/vmalert/notifier run-vmalert: vmalert ./bin/vmalert -rule=app/vmalert/testdata/rules0-good.rules \ diff --git a/app/vmalert/config.go b/app/vmalert/config.go index e46c0c4978..b84e7044d6 100644 --- a/app/vmalert/config.go +++ b/app/vmalert/config.go @@ -32,8 +32,9 @@ func Parse(pathPatterns []string, validateAnnotations bool) ([]Group, error) { return nil, fmt.Errorf("one file can not contain groups with the same name %s, filepath:%s", g.Name, file) } g.File = file - g.done = make(chan struct{}) - g.finished = make(chan struct{}) + g.doneCh = make(chan struct{}) + g.finishedCh = make(chan struct{}) + g.updateCh = make(chan Group) groupsNames[g.Name] = struct{}{} for _, rule := range g.Rules { diff --git a/app/vmalert/group.go b/app/vmalert/group.go index d4713496f9..f666c69341 100644 --- a/app/vmalert/group.go +++ b/app/vmalert/group.go @@ -19,13 +19,16 @@ type Group struct { File string Rules []*Rule - done chan struct{} - finished chan struct{} + doneCh chan struct{} + finishedCh chan struct{} + // channel accepts new Group obj + // which supposed to update current group + updateCh chan Group } // ID return unique group ID that consists of // rules file and group name -func (g Group) ID() uint64 { +func (g *Group) ID() uint64 { hash := fnv.New64a() hash.Write([]byte(g.File)) hash.Write([]byte("\xff")) @@ -47,8 +50,8 @@ func (g *Group) Restore(ctx context.Context, q datasource.Querier, lookback time } // updateWith updates existing group with -// passed group object. Must be called -// under mutex lock. +// passed group object. +// Not thread-safe. func (g *Group) updateWith(newGroup Group) { rulesRegistry := make(map[string]*Rule) for _, nr := range newGroup.Rules { @@ -106,11 +109,11 @@ var ( ) func (g *Group) close() { - if g.done == nil { + if g.doneCh == nil { return } - close(g.done) - <-g.finished + close(g.doneCh) + <-g.finishedCh } func (g *Group) start(ctx context.Context, interval time.Duration, @@ -122,12 +125,14 @@ func (g *Group) start(ctx context.Context, interval time.Duration, select { case <-ctx.Done(): logger.Infof("group %q: context cancelled", g.Name) - close(g.finished) + close(g.finishedCh) return - case <-g.done: + case <-g.doneCh: logger.Infof("group %q: received stop signal", g.Name) - close(g.finished) + close(g.finishedCh) return + case ng := <-g.updateCh: + g.updateWith(ng) case <-t.C: iterationTotal.Inc() iterationStart := time.Now() diff --git a/app/vmalert/manager.go b/app/vmalert/manager.go index 38504fd52c..4429ca8d30 100644 --- a/app/vmalert/manager.go +++ b/app/vmalert/manager.go @@ -87,8 +87,7 @@ func (m *manager) update(ctx context.Context, path []string, validate, restore b m.groupsMu.Lock() for _, og := range m.groups { - id := og.ID() - ng, ok := groupsRegistry[id] + ng, ok := groupsRegistry[og.ID()] if !ok { // old group is not present in new list // and must be stopped and deleted @@ -97,7 +96,7 @@ func (m *manager) update(ctx context.Context, path []string, validate, restore b og = nil continue } - og.updateWith(ng) + og.updateCh <- ng delete(groupsRegistry, ng.ID()) } diff --git a/app/vmalert/manager_test.go b/app/vmalert/manager_test.go index afe30d5b73..1e96e6b493 100644 --- a/app/vmalert/manager_test.go +++ b/app/vmalert/manager_test.go @@ -26,24 +26,35 @@ func TestManagerUpdateError(t *testing.T) { // execution of configuration update. // Should be executed with -race flag func TestManagerUpdateConcurrent(t *testing.T) { - m := &manager{groups: make(map[uint64]*Group)} + m := &manager{ + groups: make(map[uint64]*Group), + storage: &fakeQuerier{}, + notifier: &fakeNotifier{}, + } paths := []string{ "testdata/dir/rules0-good.rules", "testdata/dir/rules1-good.rules", "testdata/rules0-good.rules", } + *evaluationInterval = time.Millisecond + if err := m.start(context.Background(), []string{paths[0]}, true); err != nil { + t.Fatalf("failed to start: %s", err) + } - const n = 500 + const workers = 500 + const iterations = 10 wg := sync.WaitGroup{} - wg.Add(n) - for i := 0; i < n; i++ { + wg.Add(workers) + for i := 0; i < workers; i++ { go func() { defer wg.Done() - rnd := rand.Intn(len(paths)) - path := []string{paths[rnd]} - err := m.update(context.Background(), path, true, false) - if err != nil { - t.Errorf("update error: %s", err) + for i := 0; i < iterations; i++ { + rnd := rand.Intn(len(paths)) + path := []string{paths[rnd]} + err := m.update(context.Background(), path, true, false) + if err != nil { + t.Errorf("update error: %s", err) + } } }() } @@ -111,7 +122,7 @@ func TestManagerUpdate(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { ctx, cancel := context.WithCancel(context.TODO()) - m := &manager{groups: make(map[uint64]*Group)} + m := &manager{groups: make(map[uint64]*Group), storage: &fakeQuerier{}} path := []string{tc.initPath} if err := m.update(ctx, path, true, false); err != nil { t.Fatalf("failed to complete initial rules update: %s", err)