From d6fa4da7128e1ecbeff31d0091ecf843b5da938f Mon Sep 17 00:00:00 2001 From: Roman Khavronenko Date: Wed, 1 Mar 2023 15:48:20 +0100 Subject: [PATCH] vmalert: cancel in-flight requests on group's update or close (#3886) When group's update() or close() method is called, the group still needs to wait for its current evaluation to finish. Sometimes, evaluation could take a significant amount of time which slows down configuration update or vmalert's graceful shutdown. The change interrupts current evaluation in order to speed up the graceful shutdown or config update procedures. Signed-off-by: hagen1778 --- app/vmalert/group.go | 42 ++++++++++++++++++++++++++++++++++--- app/vmalert/group_test.go | 28 +++++++++++++++++++++++++ app/vmalert/helpers_test.go | 18 ++++++++++++++++ app/vmalert/manager.go | 5 +++-- 4 files changed, 88 insertions(+), 5 deletions(-) diff --git a/app/vmalert/group.go b/app/vmalert/group.go index 7bfda9eee..77bb23e30 100644 --- a/app/vmalert/group.go +++ b/app/vmalert/group.go @@ -2,6 +2,7 @@ package main import ( "context" + "errors" "fmt" "hash/fnv" "net/url" @@ -44,6 +45,9 @@ type Group struct { // channel accepts new Group obj // which supposed to update current group updateCh chan *Group + // evalCancel stores the cancel fn for interrupting + // rules evaluation. Used on groups update() and close(). + evalCancel context.CancelFunc metrics *groupMetrics } @@ -233,11 +237,24 @@ func (g *Group) updateWith(newGroup *Group) error { return nil } +// interruptEval interrupts in-flight rules evaluations +// within the group. It is expected that g.evalCancel +// will be repopulated after the call. 
+func (g *Group) interruptEval() { + g.mu.RLock() + defer g.mu.RUnlock() + + if g.evalCancel != nil { + g.evalCancel() + } +} + func (g *Group) close() { if g.doneCh == nil { return } close(g.doneCh) + g.interruptEval() <-g.finishedCh g.metrics.iterationDuration.Unregister() @@ -263,7 +280,7 @@ func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *r logger.Infof("group %q started; interval=%v; concurrency=%d", g.Name, g.Interval, g.Concurrency) - eval := func(ts time.Time) { + eval := func(ctx context.Context, ts time.Time) { g.metrics.iterationTotal.Inc() start := time.Now() @@ -285,7 +302,13 @@ func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *r g.LastEvaluation = start } - eval(evalTS) + evalCtx, cancel := context.WithCancel(ctx) + g.mu.Lock() + g.evalCancel = cancel + g.mu.Unlock() + defer g.evalCancel() + + eval(evalCtx, evalTS) t := time.NewTicker(g.Interval) defer t.Stop() @@ -309,6 +332,14 @@ func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *r return case ng := <-g.updateCh: g.mu.Lock() + + // it is expected that g.evalCancel will be evoked + // somewhere else to unblock group from the rules evaluation. + // we recreate the evalCtx and g.evalCancel, so it can + // be called again. + evalCtx, cancel = context.WithCancel(ctx) + g.evalCancel = cancel + err := g.updateWith(ng) if err != nil { logger.Errorf("group %q: failed to update: %s", g.Name, err) @@ -333,7 +364,7 @@ func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *r } evalTS = evalTS.Add((missed + 1) * g.Interval) - eval(evalTS) + eval(evalCtx, evalTS) } } } @@ -407,6 +438,11 @@ func (e *executor) exec(ctx context.Context, rule Rule, ts time.Time, resolveDur tss, err := rule.Exec(ctx, ts, limit) if err != nil { + if errors.Is(err, context.Canceled) { + // the context can be cancelled on graceful shutdown + // or on group update. So no need to handle the error as usual. 
+ return nil + } execErrors.Inc() return fmt.Errorf("rule %q: failed to execute: %w", rule, err) } diff --git a/app/vmalert/group_test.go b/app/vmalert/group_test.go index b20a0a516..29199a4bd 100644 --- a/app/vmalert/group_test.go +++ b/app/vmalert/group_test.go @@ -474,3 +474,31 @@ func TestFaultyRW(t *testing.T) { t.Fatalf("expected to get an error from faulty RW client, got nil instead") } } + +func TestCloseWithEvalInterruption(t *testing.T) { + groups, err := config.Parse([]string{"config/testdata/rules/rules1-good.rules"}, notifier.ValidateTemplates, true) + if err != nil { + t.Fatalf("failed to parse rules: %s", err) + } + + const delay = time.Second * 2 + fq := &fakeQuerierWithDelay{delay: delay} + + const evalInterval = time.Millisecond + g := newGroup(groups[0], fq, evalInterval, nil) + + go g.start(context.Background(), nil, nil, nil) + + time.Sleep(evalInterval * 20) + + go func() { + g.close() + }() + + deadline := time.Tick(delay / 2) + select { + case <-deadline: + t.Fatalf("deadline for close exceeded") + case <-g.finishedCh: + } +} diff --git a/app/vmalert/helpers_test.go b/app/vmalert/helpers_test.go index 7ef8bd981..60183aad8 100644 --- a/app/vmalert/helpers_test.go +++ b/app/vmalert/helpers_test.go @@ -104,6 +104,24 @@ func (fqr *fakeQuerierWithRegistry) Query(_ context.Context, expr string, _ time return cp, req, nil } +type fakeQuerierWithDelay struct { + fakeQuerier + delay time.Duration +} + +func (fqd *fakeQuerierWithDelay) Query(ctx context.Context, expr string, ts time.Time) ([]datasource.Metric, *http.Request, error) { + timer := time.NewTimer(fqd.delay) + select { + case <-ctx.Done(): + case <-timer.C: + } + return fqd.fakeQuerier.Query(ctx, expr, ts) +} + +func (fqd *fakeQuerierWithDelay) BuildWithParams(_ datasource.QuerierParams) datasource.Querier { + return fqd +} + type fakeNotifier struct { sync.Mutex alerts []notifier.Alert diff --git a/app/vmalert/manager.go b/app/vmalert/manager.go index 028d8df24..0d7b7ba83 100644 --- 
a/app/vmalert/manager.go +++ b/app/vmalert/manager.go @@ -87,6 +87,7 @@ func (m *manager) startGroup(ctx context.Context, g *Group, restore bool) error m.wg.Add(1) id := g.ID() go func() { + defer m.wg.Done() // Spread group rules evaluation over time in order to reduce load on VictoriaMetrics. if !skipRandSleepOnGroupStart { randSleep := uint64(float64(g.Interval) * (float64(g.ID()) / (1 << 64))) @@ -111,8 +112,6 @@ func (m *manager) startGroup(ctx context.Context, g *Group, restore bool) error } else { g.start(ctx, m.notifiers, m.rw, nil) } - - m.wg.Done() }() m.groups[id] = g return nil @@ -168,6 +167,7 @@ func (m *manager) update(ctx context.Context, groupsCfg []config.Group, restore } for _, ng := range groupsRegistry { if err := m.startGroup(ctx, ng, restore); err != nil { + m.groupsMu.Unlock() return err } } @@ -181,6 +181,7 @@ func (m *manager) update(ctx context.Context, groupsCfg []config.Group, restore old.updateCh <- new wg.Done() }(item.old, item.new) + item.old.interruptEval() } wg.Wait() }