mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
vmalert: cancel in-flight requests on group's update or close (#3886)
When group's update() or close() method is called, the group still need to wait for its current evaluation to finish. Sometimes, evaluation could take a significant amount of time which slows configuration update or vmalert's graceful shutdown. The change interrupts current evaluation in order to speed up the graceful shutdown or config update procedures. Signed-off-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
parent
2e153b68cd
commit
d6fa4da712
4 changed files with 88 additions and 5 deletions
|
@ -2,6 +2,7 @@ package main
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"hash/fnv"
|
||||
"net/url"
|
||||
|
@ -44,6 +45,9 @@ type Group struct {
|
|||
// channel accepts new Group obj
|
||||
// which supposed to update current group
|
||||
updateCh chan *Group
|
||||
// evalCancel stores the cancel fn for interrupting
|
||||
// rules evaluation. Used on groups update() and close().
|
||||
evalCancel context.CancelFunc
|
||||
|
||||
metrics *groupMetrics
|
||||
}
|
||||
|
@ -233,11 +237,24 @@ func (g *Group) updateWith(newGroup *Group) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// interruptEval interrupts in-flight rules evaluations
|
||||
// within the group. It is expected that g.evalCancel
|
||||
// will be repopulated after the call.
|
||||
func (g *Group) interruptEval() {
|
||||
g.mu.RLock()
|
||||
defer g.mu.RUnlock()
|
||||
|
||||
if g.evalCancel != nil {
|
||||
g.evalCancel()
|
||||
}
|
||||
}
|
||||
|
||||
func (g *Group) close() {
|
||||
if g.doneCh == nil {
|
||||
return
|
||||
}
|
||||
close(g.doneCh)
|
||||
g.interruptEval()
|
||||
<-g.finishedCh
|
||||
|
||||
g.metrics.iterationDuration.Unregister()
|
||||
|
@ -263,7 +280,7 @@ func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *r
|
|||
|
||||
logger.Infof("group %q started; interval=%v; concurrency=%d", g.Name, g.Interval, g.Concurrency)
|
||||
|
||||
eval := func(ts time.Time) {
|
||||
eval := func(ctx context.Context, ts time.Time) {
|
||||
g.metrics.iterationTotal.Inc()
|
||||
|
||||
start := time.Now()
|
||||
|
@ -285,7 +302,13 @@ func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *r
|
|||
g.LastEvaluation = start
|
||||
}
|
||||
|
||||
eval(evalTS)
|
||||
evalCtx, cancel := context.WithCancel(ctx)
|
||||
g.mu.Lock()
|
||||
g.evalCancel = cancel
|
||||
g.mu.Unlock()
|
||||
defer g.evalCancel()
|
||||
|
||||
eval(evalCtx, evalTS)
|
||||
|
||||
t := time.NewTicker(g.Interval)
|
||||
defer t.Stop()
|
||||
|
@ -309,6 +332,14 @@ func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *r
|
|||
return
|
||||
case ng := <-g.updateCh:
|
||||
g.mu.Lock()
|
||||
|
||||
// it is expected that g.evalCancel will be evoked
|
||||
// somewhere else to unblock group from the rules evaluation.
|
||||
// we recreate the evalCtx and g.evalCancel, so it can
|
||||
// be called again.
|
||||
evalCtx, cancel = context.WithCancel(ctx)
|
||||
g.evalCancel = cancel
|
||||
|
||||
err := g.updateWith(ng)
|
||||
if err != nil {
|
||||
logger.Errorf("group %q: failed to update: %s", g.Name, err)
|
||||
|
@ -333,7 +364,7 @@ func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *r
|
|||
}
|
||||
evalTS = evalTS.Add((missed + 1) * g.Interval)
|
||||
|
||||
eval(evalTS)
|
||||
eval(evalCtx, evalTS)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -407,6 +438,11 @@ func (e *executor) exec(ctx context.Context, rule Rule, ts time.Time, resolveDur
|
|||
|
||||
tss, err := rule.Exec(ctx, ts, limit)
|
||||
if err != nil {
|
||||
if errors.Is(err, context.Canceled) {
|
||||
// the context can be cancelled on graceful shutdown
|
||||
// or on group update. So no need to handle the error as usual.
|
||||
return nil
|
||||
}
|
||||
execErrors.Inc()
|
||||
return fmt.Errorf("rule %q: failed to execute: %w", rule, err)
|
||||
}
|
||||
|
|
|
@ -474,3 +474,31 @@ func TestFaultyRW(t *testing.T) {
|
|||
t.Fatalf("expected to get an error from faulty RW client, got nil instead")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCloseWithEvalInterruption(t *testing.T) {
|
||||
groups, err := config.Parse([]string{"config/testdata/rules/rules1-good.rules"}, notifier.ValidateTemplates, true)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to parse rules: %s", err)
|
||||
}
|
||||
|
||||
const delay = time.Second * 2
|
||||
fq := &fakeQuerierWithDelay{delay: delay}
|
||||
|
||||
const evalInterval = time.Millisecond
|
||||
g := newGroup(groups[0], fq, evalInterval, nil)
|
||||
|
||||
go g.start(context.Background(), nil, nil, nil)
|
||||
|
||||
time.Sleep(evalInterval * 20)
|
||||
|
||||
go func() {
|
||||
g.close()
|
||||
}()
|
||||
|
||||
deadline := time.Tick(delay / 2)
|
||||
select {
|
||||
case <-deadline:
|
||||
t.Fatalf("deadline for close exceeded")
|
||||
case <-g.finishedCh:
|
||||
}
|
||||
}
|
||||
|
|
|
@ -104,6 +104,24 @@ func (fqr *fakeQuerierWithRegistry) Query(_ context.Context, expr string, _ time
|
|||
return cp, req, nil
|
||||
}
|
||||
|
||||
type fakeQuerierWithDelay struct {
|
||||
fakeQuerier
|
||||
delay time.Duration
|
||||
}
|
||||
|
||||
func (fqd *fakeQuerierWithDelay) Query(ctx context.Context, expr string, ts time.Time) ([]datasource.Metric, *http.Request, error) {
|
||||
timer := time.NewTimer(fqd.delay)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-timer.C:
|
||||
}
|
||||
return fqd.fakeQuerier.Query(ctx, expr, ts)
|
||||
}
|
||||
|
||||
func (fqd *fakeQuerierWithDelay) BuildWithParams(_ datasource.QuerierParams) datasource.Querier {
|
||||
return fqd
|
||||
}
|
||||
|
||||
type fakeNotifier struct {
|
||||
sync.Mutex
|
||||
alerts []notifier.Alert
|
||||
|
|
|
@ -87,6 +87,7 @@ func (m *manager) startGroup(ctx context.Context, g *Group, restore bool) error
|
|||
m.wg.Add(1)
|
||||
id := g.ID()
|
||||
go func() {
|
||||
defer m.wg.Done()
|
||||
// Spread group rules evaluation over time in order to reduce load on VictoriaMetrics.
|
||||
if !skipRandSleepOnGroupStart {
|
||||
randSleep := uint64(float64(g.Interval) * (float64(g.ID()) / (1 << 64)))
|
||||
|
@ -111,8 +112,6 @@ func (m *manager) startGroup(ctx context.Context, g *Group, restore bool) error
|
|||
} else {
|
||||
g.start(ctx, m.notifiers, m.rw, nil)
|
||||
}
|
||||
|
||||
m.wg.Done()
|
||||
}()
|
||||
m.groups[id] = g
|
||||
return nil
|
||||
|
@ -168,6 +167,7 @@ func (m *manager) update(ctx context.Context, groupsCfg []config.Group, restore
|
|||
}
|
||||
for _, ng := range groupsRegistry {
|
||||
if err := m.startGroup(ctx, ng, restore); err != nil {
|
||||
m.groupsMu.Unlock()
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -181,6 +181,7 @@ func (m *manager) update(ctx context.Context, groupsCfg []config.Group, restore
|
|||
old.updateCh <- new
|
||||
wg.Done()
|
||||
}(item.old, item.new)
|
||||
item.old.interruptEval()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue