diff --git a/app/vmalert/group.go b/app/vmalert/group.go index d89f5037c3..3009a76507 100644 --- a/app/vmalert/group.go +++ b/app/vmalert/group.go @@ -303,6 +303,10 @@ func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *r g.mu.Unlock() continue } + + // ensure that staleness is tracked for existing rules only + e.purgeStaleSeries(g.Rules) + if g.Interval != ng.Interval { g.Interval = ng.Interval t.Stop() @@ -457,6 +461,30 @@ func (e *executor) getStaleSeries(rule Rule, tss []prompbmarshal.TimeSeries, tim return staleS } +// purgeStaleSeries deletes references in tracked +// previouslySentSeriesToRW map to Rules which aren't present +// in the given activeRules list. The method is used when the list +// of loaded rules has changed and executor has to remove +// references to non-existing rules. +func (e *executor) purgeStaleSeries(activeRules []Rule) { + newPreviouslySentSeriesToRW := make(map[uint64]map[string][]prompbmarshal.Label) + + e.previouslySentSeriesToRWMu.Lock() + + for _, rule := range activeRules { + id := rule.ID() + prev, ok := e.previouslySentSeriesToRW[id] + if ok { + // keep previous series for staleness detection + newPreviouslySentSeriesToRW[id] = prev + } + } + e.previouslySentSeriesToRW = nil + e.previouslySentSeriesToRW = newPreviouslySentSeriesToRW + + e.previouslySentSeriesToRWMu.Unlock() +} + func labelsToString(labels []prompbmarshal.Label) string { var b strings.Builder b.WriteRune('{') diff --git a/app/vmalert/group_test.go b/app/vmalert/group_test.go index 8322e65d02..ab2cfe9511 100644 --- a/app/vmalert/group_test.go +++ b/app/vmalert/group_test.go @@ -355,3 +355,61 @@ func TestGetStaleSeries(t *testing.T) { [][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "bar")}, nil) } + +func TestPurgeStaleSeries(t *testing.T) { + ts := time.Now() + labels := toPromLabels(t, "__name__", "job:foo", "job", "foo") + tss := []prompbmarshal.TimeSeries{newTimeSeriesPB([]float64{1}, []int64{ts.Unix()}, 
labels)} + + f := func(curRules, newRules, expStaleRules []Rule) { + t.Helper() + e := &executor{ + previouslySentSeriesToRW: make(map[uint64]map[string][]prompbmarshal.Label), + } + // seed executor with series for + // current rules + for _, rule := range curRules { + e.getStaleSeries(rule, tss, ts) + } + + e.purgeStaleSeries(newRules) + + if len(e.previouslySentSeriesToRW) != len(expStaleRules) { + t.Fatalf("expected to get %d stale series, got %d", + len(expStaleRules), len(e.previouslySentSeriesToRW)) + } + + for _, exp := range expStaleRules { + if _, ok := e.previouslySentSeriesToRW[exp.ID()]; !ok { + t.Fatalf("expected to have rule %d; got nil instead", exp.ID()) + } + } + } + + f(nil, nil, nil) + f( + nil, + []Rule{&AlertingRule{RuleID: 1}}, + nil, + ) + f( + []Rule{&AlertingRule{RuleID: 1}}, + nil, + nil, + ) + f( + []Rule{&AlertingRule{RuleID: 1}}, + []Rule{&AlertingRule{RuleID: 2}}, + nil, + ) + f( + []Rule{&AlertingRule{RuleID: 1}, &AlertingRule{RuleID: 2}}, + []Rule{&AlertingRule{RuleID: 2}}, + []Rule{&AlertingRule{RuleID: 2}}, + ) + f( + []Rule{&AlertingRule{RuleID: 1}, &AlertingRule{RuleID: 2}}, + []Rule{&AlertingRule{RuleID: 1}, &AlertingRule{RuleID: 2}}, + []Rule{&AlertingRule{RuleID: 1}, &AlertingRule{RuleID: 2}}, + ) +}