From 52fbc7d2f5c9895ed319b438b32494d947662d1a Mon Sep 17 00:00:00 2001 From: Haley Wang Date: Sun, 14 Apr 2024 01:18:45 +0800 Subject: [PATCH] vmalert: avoid blocking APIs when alerting rule uses template functions `query` --- app/vmalert/rule/alerting.go | 131 ++++++++++++++++++------------ app/vmalert/rule/alerting_test.go | 36 ++++---- docs/CHANGELOG.md | 1 + docs/vmalert.md | 1 - 4 files changed, 98 insertions(+), 71 deletions(-) diff --git a/app/vmalert/rule/alerting.go b/app/vmalert/rule/alerting.go index 8f6d0e980..e36eeb57e 100644 --- a/app/vmalert/rule/alerting.go +++ b/app/vmalert/rule/alerting.go @@ -6,7 +6,7 @@ import ( "hash/fnv" "sort" "strings" - "sync" + "sync/atomic" "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config" @@ -36,9 +36,11 @@ type AlertingRule struct { q datasource.Querier - alertsMu sync.RWMutex // stores list of active alerts - alerts map[uint64]*notifier.Alert + alerts atomic.Pointer[map[uint64]*notifier.Alert] + + firingCount atomic.Int64 + pendingCount atomic.Int64 // state stores recent state changes // during evaluations @@ -78,7 +80,6 @@ func NewAlertingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rule Headers: group.Headers, Debug: cfg.Debug, }), - alerts: make(map[uint64]*notifier.Alert), metrics: &alertingRuleMetrics{}, } @@ -96,27 +97,11 @@ func NewAlertingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rule labels := fmt.Sprintf(`alertname=%q, group=%q, file=%q, id="%d"`, ar.Name, group.Name, group.File, ar.ID()) ar.metrics.pending = utils.GetOrCreateGauge(fmt.Sprintf(`vmalert_alerts_pending{%s}`, labels), func() float64 { - ar.alertsMu.RLock() - defer ar.alertsMu.RUnlock() - var num int - for _, a := range ar.alerts { - if a.State == notifier.StatePending { - num++ - } - } - return float64(num) + return float64(ar.pendingCount.Load()) }) ar.metrics.active = utils.GetOrCreateGauge(fmt.Sprintf(`vmalert_alerts_firing{%s}`, labels), func() float64 { - ar.alertsMu.RLock() - defer ar.alertsMu.RUnlock() - var num int - for _, a := range ar.alerts { - if a.State == notifier.StateFiring { - num++ - } - } - return float64(num) + return float64(ar.firingCount.Load()) }) ar.metrics.errors = utils.GetOrCreateCounter(fmt.Sprintf(`vmalert_alerting_rules_errors_total{%s}`, labels)) ar.metrics.samples = utils.GetOrCreateGauge(fmt.Sprintf(`vmalert_alerting_rules_last_evaluation_samples{%s}`, labels), @@ -144,6 +129,7 @@ func NewAlertingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rule // close unregisters rule metrics func (ar *AlertingRule) close() { + ar.storeAlertState(nil) ar.metrics.active.Unregister() ar.metrics.pending.Unregister() ar.metrics.errors.Unregister() @@ -164,10 +150,8 @@ func (ar *AlertingRule) ID() uint64 { // GetAlerts returns active alerts of rule func (ar *AlertingRule) GetAlerts() []*notifier.Alert { - ar.alertsMu.RLock() - defer ar.alertsMu.RUnlock() var alerts []*notifier.Alert - for _, a := range ar.alerts { + for _, a := range ar.loadAlerts() { alerts = append(alerts, a) } return alerts @@ -175,12 +159,7 @@ func (ar *AlertingRule) GetAlerts() []*notifier.Alert { // GetAlert returns alert if id exists func (ar *AlertingRule) GetAlert(id uint64) *notifier.Alert { - ar.alertsMu.RLock() - defer ar.alertsMu.RUnlock() - if ar.alerts == nil { - return nil - } - return ar.alerts[id] + return ar.loadAlert(id) } func (ar *AlertingRule) logDebugf(at time.Time, a *notifier.Alert, format string, args ...interface{}) { @@ -229,6 +208,50 @@ func (ar *AlertingRule) updateWith(r Rule) error { 
return nil } +// loadAlert returns copied alert if id exists +func (ar *AlertingRule) loadAlert(id uint64) *notifier.Alert { + activeAlerts := ar.alerts.Load() + if activeAlerts == nil { + return nil + } + for k, v := range *activeAlerts { + if k == id { + na := *v + return &na + } + } + return nil +} + +// loadAlerts returns copied alerts +func (ar *AlertingRule) loadAlerts() map[uint64]*notifier.Alert { + activeAlerts := ar.alerts.Load() + if activeAlerts == nil { + return map[uint64]*notifier.Alert{} + } + newAlerts := make(map[uint64]*notifier.Alert) + for k, v := range *activeAlerts { + na := *v + newAlerts[k] = &na + } + return newAlerts +} + +// storeAlertState updates firingCount, pendingCount and active alerts +func (ar *AlertingRule) storeAlertState(alerts map[uint64]*notifier.Alert) { + var firingCount, pendingCount int64 + for _, a := range alerts { + if a.State == notifier.StateFiring { + firingCount++ + } else if a.State == notifier.StatePending { + pendingCount++ + } + } + ar.firingCount.Store(firingCount) + ar.pendingCount.Store(pendingCount) + ar.alerts.Store(&alerts) +} + type labelSet struct { // origin labels extracted from received time series // plus extra labels (group labels, service labels like alertNameLabel). @@ -313,6 +336,7 @@ func (ar *AlertingRule) execRange(ctx context.Context, start, end time.Time) ([] qFn := func(_ string) ([]datasource.Metric, error) { return nil, fmt.Errorf("`query` template isn't supported in replay mode") } + activeAlerts := ar.loadAlerts() for _, s := range res.Data { ls, err := ar.toLabels(s, qFn) if err != nil { @@ -329,8 +353,8 @@ func (ar *AlertingRule) execRange(ctx context.Context, start, end time.Time) ([] at := time.Unix(s.Timestamps[i], 0) // try to restore alert's state on the first iteration if at.Equal(start) { - if _, ok := ar.alerts[h]; ok { - a = ar.alerts[h] + if _, ok := activeAlerts[h]; ok { + a = activeAlerts[h] prevT = at } } @@ -356,7 +380,7 @@ func (ar *AlertingRule) execRange(ctx context.Context, start, end time.Time) ([] } } } - ar.alerts = holdAlertState + ar.storeAlertState(holdAlertState) return result, nil } @@ -386,20 +410,22 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr } }() - ar.alertsMu.Lock() - defer ar.alertsMu.Unlock() - if err != nil { return nil, fmt.Errorf("failed to execute query %q: %w", ar.Expr, err) } ar.logDebugf(ts, nil, "query returned %d samples (elapsed: %s)", curState.Samples, curState.Duration) - for h, a := range ar.alerts { + activeAlerts := ar.loadAlerts() + defer func() { + ar.storeAlertState(activeAlerts) + }() + + for h, a := range activeAlerts { // cleanup inactive alerts from previous Exec if a.State == notifier.StateInactive && ts.Sub(a.ResolvedAt) > resolvedRetention { ar.logDebugf(ts, a, "deleted as inactive") - delete(ar.alerts, h) + delete(activeAlerts, h) } } @@ -422,7 +448,7 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr return nil, curState.Err } updated[h] = struct{}{} - if a, ok := ar.alerts[h]; ok { + if a, ok := activeAlerts[h]; ok { if a.State == notifier.StateInactive { // alert could be in inactive state for resolvedRetention // so when we again receive metrics for it - we switch it @@ -447,18 +473,18 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr } a.ID = h a.State = notifier.StatePending - ar.alerts[h] = a + activeAlerts[h] = a ar.logDebugf(ts, a, "created in state PENDING") } var numActivePending int - for h, a := range ar.alerts { + for h, a := 
range activeAlerts { // if alert wasn't updated in this iteration // means it is resolved already if _, ok := updated[h]; !ok { if a.State == notifier.StatePending { // alert was in Pending state - it is not // active anymore - delete(ar.alerts, h) + delete(activeAlerts, h) ar.logDebugf(ts, a, "PENDING => DELETED: is absent in current evaluation round") continue } @@ -490,16 +516,16 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr } } if limit > 0 && numActivePending > limit { - ar.alerts = map[uint64]*notifier.Alert{} + activeAlerts = nil curState.Err = fmt.Errorf("exec exceeded limit of %d with %d alerts", limit, numActivePending) return nil, curState.Err } - return ar.toTimeSeries(ts.Unix()), nil + return ar.toTimeSeries(ts.Unix(), activeAlerts), nil } -func (ar *AlertingRule) toTimeSeries(timestamp int64) []prompbmarshal.TimeSeries { +func (ar *AlertingRule) toTimeSeries(timestamp int64, activeAlerts map[uint64]*notifier.Alert) []prompbmarshal.TimeSeries { var tss []prompbmarshal.TimeSeries - for _, a := range ar.alerts { + for _, a := range activeAlerts { if a.State == notifier.StateInactive { continue } @@ -604,10 +630,8 @@ func (ar *AlertingRule) restore(ctx context.Context, q datasource.Querier, ts ti return nil } - ar.alertsMu.Lock() - defer ar.alertsMu.Unlock() - - if len(ar.alerts) < 1 { + activeAlerts := ar.loadAlerts() + if len(activeAlerts) < 1 { return nil } @@ -638,7 +662,7 @@ func (ar *AlertingRule) restore(ctx context.Context, q datasource.Querier, ts ti labelSet[v.Name] = v.Value } id := hash(labelSet) - a, ok := ar.alerts[id] + a, ok := activeAlerts[id] if !ok { continue } @@ -649,6 +673,7 @@ func (ar *AlertingRule) restore(ctx context.Context, q datasource.Querier, ts ti a.Restored = true logger.Infof("alert %q (%d) restored to state at %v", a.Name, a.ID, a.ActiveAt) } + ar.storeAlertState(activeAlerts) return nil } @@ -671,7 +696,7 @@ func (ar *AlertingRule) alertsToSend(resolveDuration, resendDelay time.Duration) } var alerts []notifier.Alert - for _, a := range ar.alerts { + for _, a := range ar.loadAlerts() { if !needsSending(a) { continue } diff --git a/app/vmalert/rule/alerting_test.go b/app/vmalert/rule/alerting_test.go index 07dda4273..2fd512001 100644 --- a/app/vmalert/rule/alerting_test.go +++ b/app/vmalert/rule/alerting_test.go @@ -121,8 +121,9 @@ func TestAlertingRule_ToTimeSeries(t *testing.T) { } for _, tc := range testCases { t.Run(tc.rule.Name, func(t *testing.T) { - tc.rule.alerts[tc.alert.ID] = tc.alert - tss := tc.rule.toTimeSeries(timestamp.Unix()) + alerts := map[uint64]*notifier.Alert{tc.alert.ID: tc.alert} + tc.rule.storeAlertState(alerts) + tss := tc.rule.toTimeSeries(timestamp.Unix(), alerts) if err := compareTimeSeries(t, tc.expTS, tss); err != nil { t.Fatalf("timeseries missmatch: %s", err) } @@ -346,8 +347,8 @@ func TestAlertingRule_Exec(t *testing.T) { if _, ok := tc.expAlerts[i]; !ok { continue } - if len(tc.rule.alerts) != len(tc.expAlerts[i]) { - t.Fatalf("evalIndex %d: expected %d alerts; got %d", i, len(tc.expAlerts[i]), len(tc.rule.alerts)) + if len(tc.rule.loadAlerts()) != len(tc.expAlerts[i]) { + t.Fatalf("evalIndex %d: expected %d alerts; got %d", i, len(tc.expAlerts[i]), len(tc.rule.loadAlerts())) } expAlerts := make(map[uint64]*notifier.Alert) for _, ta := range tc.expAlerts[i] { @@ -360,8 +361,9 @@ func TestAlertingRule_Exec(t *testing.T) { h := hash(labels) expAlerts[h] = ta.alert } + activeAlerts := tc.rule.loadAlerts() for key, exp := range expAlerts { - got, ok := tc.rule.alerts[key] + got, 
ok := activeAlerts[key] if !ok { t.Fatalf("evalIndex %d: expected to have key %d", i, key) } @@ -640,8 +642,8 @@ func TestAlertingRule_ExecRange(t *testing.T) { } } if tc.expHoldAlertStateAlerts != nil { - if !reflect.DeepEqual(tc.expHoldAlertStateAlerts, tc.rule.alerts) { - t.Fatalf("expected hold alerts state: \n%v but got \n%v", tc.expHoldAlertStateAlerts, tc.rule.alerts) + if !reflect.DeepEqual(tc.expHoldAlertStateAlerts, tc.rule.loadAlerts()) { + t.Fatalf("expected hold alerts state: \n%v but got \n%v", tc.expHoldAlertStateAlerts, tc.rule.loadAlerts()) } } }) @@ -662,17 +664,18 @@ func TestGroup_Restore(t *testing.T) { fg := NewGroup(config.Group{Name: "TestRestore", Rules: rules}, fqr, time.Second, nil) wg := sync.WaitGroup{} wg.Add(1) + ctx, cancel := context.WithCancel(context.Background()) go func() { nts := func() []notifier.Notifier { return []notifier.Notifier{¬ifier.FakeNotifier{}} } - fg.Start(context.Background(), nts, nil, fqr) + fg.Start(ctx, nts, nil, fqr) wg.Done() }() - fg.Close() + cancel() wg.Wait() gotAlerts := make(map[uint64]*notifier.Alert) for _, rs := range fg.Rules { - alerts := rs.(*AlertingRule).alerts + alerts := rs.(*AlertingRule).loadAlerts() for k, v := range alerts { if !v.Restored { // set not restored alerts to predictable timestamp @@ -909,7 +912,6 @@ func TestAlertingRule_Template(t *testing.T) { Annotations: map[string]string{ "summary": `{{ $labels.alertname }}: Too high connection number for "{{ $labels.instance }}"`, }, - alerts: make(map[uint64]*notifier.Alert), }, []datasource.Metric{ metricWithValueAndLabels(t, 1, "instance", "foo"), @@ -948,7 +950,6 @@ func TestAlertingRule_Template(t *testing.T) { "summary": `{{ $labels.__name__ }}: Too high connection number for "{{ $labels.instance }}"`, "description": `{{ $labels.alertname}}: It is {{ $value }} connections for "{{ $labels.instance }}"`, }, - alerts: make(map[uint64]*notifier.Alert), }, []datasource.Metric{ metricWithValueAndLabels(t, 2, "__name__", "first", "instance", "foo", alertNameLabel, "override"), @@ -989,7 +990,6 @@ func TestAlertingRule_Template(t *testing.T) { Annotations: map[string]string{ "summary": `Alert "{{ $labels.alertname }}({{ $labels.alertgroup }})" for instance {{ $labels.instance }}`, }, - alerts: make(map[uint64]*notifier.Alert), }, []datasource.Metric{ metricWithValueAndLabels(t, 1, @@ -1030,8 +1030,9 @@ func TestAlertingRule_Template(t *testing.T) { if _, err := tc.rule.exec(context.TODO(), time.Now(), 0); err != nil { t.Fatalf("unexpected err: %s", err) } + activeAlerts := tc.rule.loadAlerts() for hash, expAlert := range tc.expAlerts { - gotAlert := tc.rule.alerts[hash] + gotAlert := activeAlerts[hash] if gotAlert == nil { t.Fatalf("alert %d is missing; labels: %v; annotations: %v", hash, expAlert.Labels, expAlert.Annotations) } @@ -1050,10 +1051,12 @@ func TestAlertsToSend(t *testing.T) { ts := time.Now() f := func(alerts, expAlerts []*notifier.Alert, resolveDuration, resendDelay time.Duration) { t.Helper() - ar := &AlertingRule{alerts: make(map[uint64]*notifier.Alert)} + ar := &AlertingRule{} + activeAlerts := make(map[uint64]*notifier.Alert) for i, a := range alerts { - ar.alerts[uint64(i)] = a + activeAlerts[uint64(i)] = a } + ar.storeAlertState(activeAlerts) gotAlerts := ar.alertsToSend(resolveDuration, resendDelay) if gotAlerts == nil && expAlerts == nil { return @@ -1116,7 +1119,6 @@ func newTestAlertingRule(name string, waitFor time.Duration) *AlertingRule { Name: name, For: waitFor, EvalInterval: waitFor, - alerts: make(map[uint64]*notifier.Alert), 
 		state:        &ruleState{entries: make([]StateEntry, 10)},
 		metrics: &alertingRuleMetrics{
 			errors: utils.GetOrCreateCounter(fmt.Sprintf(`vmalert_alerting_rules_errors_total{alertname=%q}`, name)),
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index ca543f0cd..5076e26a1 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -30,6 +30,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
 
 ## tip
 
+* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): avoid blocking the `/api/v1/rules`, `/api/v1/alerts` and `/metrics` APIs when an alerting rule uses the `query` template function, which could take a long time to execute. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6079).
 
 ## [v1.100.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.100.1)
 
diff --git a/docs/vmalert.md b/docs/vmalert.md
index 362b8f3ef..d06b756b3 100644
--- a/docs/vmalert.md
+++ b/docs/vmalert.md
@@ -783,7 +783,6 @@ See full description for these flags in `./vmalert -help`.
 ### Limitations
 
 * Graphite engine isn't supported yet;
-* `query` template function is disabled for performance reasons (might be changed in future);
 * `limit` group's param has no effect during replay (might be changed in future);
 * `keep_firing_for` alerting rule param has no effect during replay (might be changed in future).
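
Note (illustrative, not part of the patch): the change above replaces the alerts map guarded by sync.RWMutex with an atomic.Pointer to an immutable map snapshot. Readers such as the API handlers and the vmalert_alerts_firing/pending gauge callbacks simply Load the current snapshot, while the evaluation goroutine builds a private copy and publishes it with Store, so a slow `query` template call can no longer block them on a lock. Below is a minimal, self-contained Go sketch of this copy-on-write pattern; the `rule` and `alert` types here are simplified stand-ins, not the vmalert ones.

package main

import (
	"fmt"
	"sync/atomic"
)

type alert struct {
	ID    uint64
	State string
}

type rule struct {
	// alerts holds a pointer to an immutable snapshot of the active alerts;
	// readers never take a mutex, they just Load the current pointer.
	alerts atomic.Pointer[map[uint64]*alert]
}

// loadAlerts returns a deep copy of the current snapshot so the caller
// (e.g. an evaluation round) can mutate it without racing with readers.
func (r *rule) loadAlerts() map[uint64]*alert {
	p := r.alerts.Load()
	if p == nil {
		return map[uint64]*alert{}
	}
	out := make(map[uint64]*alert, len(*p))
	for k, v := range *p {
		cp := *v
		out[k] = &cp
	}
	return out
}

// storeAlerts atomically publishes a new snapshot.
func (r *rule) storeAlerts(m map[uint64]*alert) {
	r.alerts.Store(&m)
}

func main() {
	var r rule

	// Writer (evaluation loop): work on a private copy, then publish it.
	next := r.loadAlerts()
	next[1] = &alert{ID: 1, State: "firing"}
	r.storeAlerts(next)

	// Reader (API handler or metrics gauge): lock-free snapshot.
	for id, a := range r.loadAlerts() {
		fmt.Println(id, a.State)
	}
}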