From 3f0ecee12882d476dbfa03ddae81ef075e9254a4 Mon Sep 17 00:00:00 2001 From: Roman Khavronenko Date: Fri, 13 May 2022 10:04:49 +0200 Subject: [PATCH] vmalert: properly cleanup stale series tracker on rules update (#2577) Rules executor within group tracks series sent to remote write in order to mark them as stale if they had disappeared in next evaluation round. The executor uses rules ID as a key to identifies series which belong to rule. On config reload, executor remains active but the set of rules could change. Hence, we need to properly cleanup the tracker for rules which has been disappeared on config reload. Signed-off-by: hagen1778 --- app/vmalert/group.go | 28 +++++++++++++++++++ app/vmalert/group_test.go | 58 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 86 insertions(+) diff --git a/app/vmalert/group.go b/app/vmalert/group.go index d89f5037c..3009a7650 100644 --- a/app/vmalert/group.go +++ b/app/vmalert/group.go @@ -303,6 +303,10 @@ func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *r g.mu.Unlock() continue } + + // ensure that staleness is tracked or existing rules only + e.purgeStaleSeries(g.Rules) + if g.Interval != ng.Interval { g.Interval = ng.Interval t.Stop() @@ -457,6 +461,30 @@ func (e *executor) getStaleSeries(rule Rule, tss []prompbmarshal.TimeSeries, tim return staleS } +// purgeStaleSeries deletes references in tracked +// previouslySentSeriesToRW list to Rules which aren't present +// in the given activeRules list. The method is used when the list +// of loaded rules has changed and executor has to remove +// references to non-existing rules. +func (e *executor) purgeStaleSeries(activeRules []Rule) { + newPreviouslySentSeriesToRW := make(map[uint64]map[string][]prompbmarshal.Label) + + e.previouslySentSeriesToRWMu.Lock() + + for _, rule := range activeRules { + id := rule.ID() + prev, ok := e.previouslySentSeriesToRW[id] + if ok { + // keep previous series for staleness detection + newPreviouslySentSeriesToRW[id] = prev + } + } + e.previouslySentSeriesToRW = nil + e.previouslySentSeriesToRW = newPreviouslySentSeriesToRW + + e.previouslySentSeriesToRWMu.Unlock() +} + func labelsToString(labels []prompbmarshal.Label) string { var b strings.Builder b.WriteRune('{') diff --git a/app/vmalert/group_test.go b/app/vmalert/group_test.go index 8322e65d0..ab2cfe951 100644 --- a/app/vmalert/group_test.go +++ b/app/vmalert/group_test.go @@ -355,3 +355,61 @@ func TestGetStaleSeries(t *testing.T) { [][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "bar")}, nil) } + +func TestPurgeStaleSeries(t *testing.T) { + ts := time.Now() + labels := toPromLabels(t, "__name__", "job:foo", "job", "foo") + tss := []prompbmarshal.TimeSeries{newTimeSeriesPB([]float64{1}, []int64{ts.Unix()}, labels)} + + f := func(curRules, newRules, expStaleRules []Rule) { + t.Helper() + e := &executor{ + previouslySentSeriesToRW: make(map[uint64]map[string][]prompbmarshal.Label), + } + // seed executor with series for + // current rules + for _, rule := range curRules { + e.getStaleSeries(rule, tss, ts) + } + + e.purgeStaleSeries(newRules) + + if len(e.previouslySentSeriesToRW) != len(expStaleRules) { + t.Fatalf("expected to get %d stale series, got %d", + len(expStaleRules), len(e.previouslySentSeriesToRW)) + } + + for _, exp := range expStaleRules { + if _, ok := e.previouslySentSeriesToRW[exp.ID()]; !ok { + t.Fatalf("expected to have rule %d; got nil instead", exp.ID()) + } + } + } + + f(nil, nil, nil) + f( + nil, + []Rule{&AlertingRule{RuleID: 1}}, + nil, + ) + f( + []Rule{&AlertingRule{RuleID: 1}}, + nil, + nil, + ) + f( + []Rule{&AlertingRule{RuleID: 1}}, + []Rule{&AlertingRule{RuleID: 2}}, + nil, + ) + f( + []Rule{&AlertingRule{RuleID: 1}, &AlertingRule{RuleID: 2}}, + []Rule{&AlertingRule{RuleID: 2}}, + []Rule{&AlertingRule{RuleID: 2}}, + ) + f( + []Rule{&AlertingRule{RuleID: 1}, &AlertingRule{RuleID: 2}}, + []Rule{&AlertingRule{RuleID: 1}, &AlertingRule{RuleID: 2}}, + []Rule{&AlertingRule{RuleID: 1}, &AlertingRule{RuleID: 2}}, + ) +}