vmalert: avoid blocking APIs when alerting rule uses template functions query

Author:  Haley Wang
Date:    2024-04-14 01:18:45 +08:00
Commit:  52fbc7d2f5 (parent: cba2f6dce1)
Signature: GPG key ID C6299A8A1D6CC50C (no known key found for this signature in database)
4 changed files with 98 additions and 71 deletions
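The change swaps the `sync.RWMutex`-guarded `alerts` map for an `atomic.Pointer` to the map plus atomic firing/pending counters: readers (`GetAlert`, `GetAlerts`, the metric gauges) load a published snapshot without taking a lock, while `exec` mutates a private copy and publishes it when done, so a long-running evaluation (for example one expanding a `query` template) no longer blocks the read-only APIs. A minimal, self-contained sketch of this copy-on-write pattern (illustrative `rule`/`alert` types, not the actual vmalert code):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type alert struct {
	ID    uint64
	State string
}

type rule struct {
	// Readers Load() the pointer and never block; the single writer
	// works on a private copy and Store()s it back when finished.
	alerts       atomic.Pointer[map[uint64]*alert]
	firingCount  atomic.Int64
	pendingCount atomic.Int64
}

// loadAlerts returns a copy of the currently published alerts map, never nil.
func (r *rule) loadAlerts() map[uint64]*alert {
	p := r.alerts.Load()
	if p == nil {
		return map[uint64]*alert{}
	}
	cp := make(map[uint64]*alert, len(*p))
	for k, v := range *p {
		a := *v
		cp[k] = &a
	}
	return cp
}

// storeAlerts publishes a new map and refreshes the counters.
func (r *rule) storeAlerts(m map[uint64]*alert) {
	var firing, pending int64
	for _, a := range m {
		switch a.State {
		case "firing":
			firing++
		case "pending":
			pending++
		}
	}
	r.firingCount.Store(firing)
	r.pendingCount.Store(pending)
	r.alerts.Store(&m)
}

func main() {
	var r rule
	m := r.loadAlerts()                    // private copy, may be empty
	m[1] = &alert{ID: 1, State: "pending"} // mutate the copy during evaluation
	r.storeAlerts(m)                       // publish it when done
	fmt.Println(r.pendingCount.Load())     // readers see the counter without locking
}
```

The trade-off is that every reader gets its own copy of the map and of each alert, so nothing a reader does is visible to others until the evaluation loop publishes a new snapshot.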

View file

@@ -6,7 +6,7 @@ import (
     "hash/fnv"
     "sort"
     "strings"
-    "sync"
+    "sync/atomic"
     "time"

     "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
@@ -36,9 +36,11 @@ type AlertingRule struct {
     q datasource.Querier

-    alertsMu sync.RWMutex
     // stores list of active alerts
-    alerts map[uint64]*notifier.Alert
+    alerts       atomic.Pointer[map[uint64]*notifier.Alert]
+    firingCount  atomic.Int64
+    pendingCount atomic.Int64

     // state stores recent state changes
     // during evaluations
@@ -78,7 +80,6 @@ func NewAlertingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rule
             Headers: group.Headers,
             Debug:   cfg.Debug,
         }),
-        alerts:  make(map[uint64]*notifier.Alert),
         metrics: &alertingRuleMetrics{},
     }
@@ -96,27 +97,11 @@ func NewAlertingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rule
     labels := fmt.Sprintf(`alertname=%q, group=%q, file=%q, id="%d"`, ar.Name, group.Name, group.File, ar.ID())
     ar.metrics.pending = utils.GetOrCreateGauge(fmt.Sprintf(`vmalert_alerts_pending{%s}`, labels),
         func() float64 {
-            ar.alertsMu.RLock()
-            defer ar.alertsMu.RUnlock()
-            var num int
-            for _, a := range ar.alerts {
-                if a.State == notifier.StatePending {
-                    num++
-                }
-            }
-            return float64(num)
+            return float64(ar.pendingCount.Load())
         })
     ar.metrics.active = utils.GetOrCreateGauge(fmt.Sprintf(`vmalert_alerts_firing{%s}`, labels),
         func() float64 {
-            ar.alertsMu.RLock()
-            defer ar.alertsMu.RUnlock()
-            var num int
-            for _, a := range ar.alerts {
-                if a.State == notifier.StateFiring {
-                    num++
-                }
-            }
-            return float64(num)
+            return float64(ar.firingCount.Load())
         })
     ar.metrics.errors = utils.GetOrCreateCounter(fmt.Sprintf(`vmalert_alerting_rules_errors_total{%s}`, labels))
     ar.metrics.samples = utils.GetOrCreateGauge(fmt.Sprintf(`vmalert_alerting_rules_last_evaluation_samples{%s}`, labels),
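With the counters in place, the gauge callbacks above reduce to a single atomic load, so scraping `/metrics` is never delayed by an evaluation in progress. A hedged sketch of the same idea using the standalone `github.com/VictoriaMetrics/metrics` package (assumed here to be what vmalert's `utils.GetOrCreateGauge` wraps; the metric name is illustrative):

```go
package main

import (
	"fmt"
	"sync/atomic"

	"github.com/VictoriaMetrics/metrics"
)

func main() {
	var pending atomic.Int64

	// The gauge callback only loads an atomic counter, so the metrics
	// endpoint never waits for a rule evaluation holding a lock.
	metrics.GetOrCreateGauge(`vmalert_alerts_pending{alertname="demo"}`, func() float64 {
		return float64(pending.Load())
	})

	pending.Store(3) // updated by the evaluation loop
	fmt.Println(pending.Load())
}
```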
@@ -144,6 +129,7 @@ func NewAlertingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rule
 // close unregisters rule metrics
 func (ar *AlertingRule) close() {
+    ar.storeAlertState(nil)
     ar.metrics.active.Unregister()
     ar.metrics.pending.Unregister()
     ar.metrics.errors.Unregister()
@@ -164,10 +150,8 @@ func (ar *AlertingRule) ID() uint64 {
 // GetAlerts returns active alerts of rule
 func (ar *AlertingRule) GetAlerts() []*notifier.Alert {
-    ar.alertsMu.RLock()
-    defer ar.alertsMu.RUnlock()
     var alerts []*notifier.Alert
-    for _, a := range ar.alerts {
+    for _, a := range ar.loadAlerts() {
         alerts = append(alerts, a)
     }
     return alerts
@@ -175,12 +159,7 @@ func (ar *AlertingRule) GetAlerts() []*notifier.Alert {
 // GetAlert returns alert if id exists
 func (ar *AlertingRule) GetAlert(id uint64) *notifier.Alert {
-    ar.alertsMu.RLock()
-    defer ar.alertsMu.RUnlock()
-    if ar.alerts == nil {
-        return nil
-    }
-    return ar.alerts[id]
+    return ar.loadAlert(id)
 }

 func (ar *AlertingRule) logDebugf(at time.Time, a *notifier.Alert, format string, args ...interface{}) {
@@ -229,6 +208,50 @@ func (ar *AlertingRule) updateWith(r Rule) error {
     return nil
 }

+// loadAlert returns copied alert if id exists
+func (ar *AlertingRule) loadAlert(id uint64) *notifier.Alert {
+    activeAlerts := ar.alerts.Load()
+    if activeAlerts == nil {
+        return nil
+    }
+    for k, v := range *activeAlerts {
+        if k == id {
+            na := *v
+            return &na
+        }
+    }
+    return nil
+}
+
+// loadAlerts returns copied alerts
+func (ar *AlertingRule) loadAlerts() map[uint64]*notifier.Alert {
+    activeAlerts := ar.alerts.Load()
+    if activeAlerts == nil {
+        return map[uint64]*notifier.Alert{}
+    }
+    newAlerts := make(map[uint64]*notifier.Alert)
+    for k, v := range *activeAlerts {
+        na := *v
+        newAlerts[k] = &na
+    }
+    return newAlerts
+}
+
+// storeAlertState updates firingCount, pendingCount and active alerts
+func (ar *AlertingRule) storeAlertState(alerts map[uint64]*notifier.Alert) {
+    var firingCount, pendingCount int64
+    for _, a := range alerts {
+        if a.State == notifier.StateFiring {
+            firingCount++
+        } else if a.State == notifier.StatePending {
+            pendingCount++
+        }
+    }
+    ar.firingCount.Store(firingCount)
+    ar.pendingCount.Store(pendingCount)
+    ar.alerts.Store(&alerts)
+}
+
 type labelSet struct {
     // origin labels extracted from received time series
     // plus extra labels (group labels, service labels like alertNameLabel).
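`loadAlert` and `loadAlerts` hand out deep copies (the map and every alert in it), so callers can read or even modify what they got back without racing against the evaluation loop; nothing becomes visible to other readers until `storeAlertState` publishes a new map. A small sketch of why the deep copy matters (illustrative types, not vmalert's):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type alert struct{ State string }

func main() {
	var ptr atomic.Pointer[map[uint64]*alert]
	orig := map[uint64]*alert{1: {State: "firing"}}
	ptr.Store(&orig)

	// Copy-on-read, as loadAlerts does: copy the map and every alert in it.
	cur := *ptr.Load()
	cp := make(map[uint64]*alert, len(cur))
	for k, v := range cur {
		a := *v
		cp[k] = &a
	}

	// Mutating the copy leaves the published state untouched
	// until a new map is Store()d back.
	cp[1].State = "inactive"
	delete(cp, 1)

	fmt.Println((*ptr.Load())[1].State) // still "firing"
}
```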
@@ -313,6 +336,7 @@ func (ar *AlertingRule) execRange(ctx context.Context, start, end time.Time) ([]
     qFn := func(_ string) ([]datasource.Metric, error) {
         return nil, fmt.Errorf("`query` template isn't supported in replay mode")
     }
+    activeAlerts := ar.loadAlerts()
     for _, s := range res.Data {
         ls, err := ar.toLabels(s, qFn)
         if err != nil {
@@ -329,8 +353,8 @@ func (ar *AlertingRule) execRange(ctx context.Context, start, end time.Time) ([]
             at := time.Unix(s.Timestamps[i], 0)
             // try to restore alert's state on the first iteration
             if at.Equal(start) {
-                if _, ok := ar.alerts[h]; ok {
-                    a = ar.alerts[h]
+                if _, ok := activeAlerts[h]; ok {
+                    a = activeAlerts[h]
                     prevT = at
                 }
             }
@@ -356,7 +380,7 @@ func (ar *AlertingRule) execRange(ctx context.Context, start, end time.Time) ([]
             }
         }
     }
-    ar.alerts = holdAlertState
+    ar.storeAlertState(holdAlertState)
     return result, nil
 }
@@ -386,20 +410,22 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr
         }
     }()

-    ar.alertsMu.Lock()
-    defer ar.alertsMu.Unlock()
-
     if err != nil {
         return nil, fmt.Errorf("failed to execute query %q: %w", ar.Expr, err)
     }

     ar.logDebugf(ts, nil, "query returned %d samples (elapsed: %s)", curState.Samples, curState.Duration)

-    for h, a := range ar.alerts {
+    activeAlerts := ar.loadAlerts()
+    defer func() {
+        ar.storeAlertState(activeAlerts)
+    }()
+
+    for h, a := range activeAlerts {
         // cleanup inactive alerts from previous Exec
         if a.State == notifier.StateInactive && ts.Sub(a.ResolvedAt) > resolvedRetention {
             ar.logDebugf(ts, a, "deleted as inactive")
-            delete(ar.alerts, h)
+            delete(activeAlerts, h)
         }
     }
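Note that `exec` publishes its state from a deferred closure rather than with `defer ar.storeAlertState(activeAlerts)`. A closure reads `activeAlerts` when `exec` returns, so the later `activeAlerts = nil` in the limit-exceeded branch is what actually gets stored; deferring the call directly would snapshot the argument at the point of the `defer` statement. A minimal illustration of that distinction (hypothetical names, not vmalert code):

```go
package main

import "fmt"

func publish(m map[int]string) {
	fmt.Println("published:", len(m), "entries")
}

func eval(limitExceeded bool) {
	state := map[int]string{1: "pending"}

	// Deferring a closure means `state` is read at return time,
	// so a later reassignment (state = nil) is what gets published.
	defer func() {
		publish(state)
	}()

	// Deferring the call directly would snapshot the argument here,
	// ignoring any later reassignment:
	// defer publish(state)

	if limitExceeded {
		state = nil // drop all alerts, as exec() does when the limit is hit
		return
	}
	state[2] = "firing"
}

func main() {
	eval(false) // published: 2 entries
	eval(true)  // published: 0 entries
}
```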
@@ -422,7 +448,7 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr
             return nil, curState.Err
         }
         updated[h] = struct{}{}
-        if a, ok := ar.alerts[h]; ok {
+        if a, ok := activeAlerts[h]; ok {
             if a.State == notifier.StateInactive {
                 // alert could be in inactive state for resolvedRetention
                 // so when we again receive metrics for it - we switch it
@@ -447,18 +473,18 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr
         }
         a.ID = h
         a.State = notifier.StatePending
-        ar.alerts[h] = a
+        activeAlerts[h] = a
         ar.logDebugf(ts, a, "created in state PENDING")
     }
     var numActivePending int
-    for h, a := range ar.alerts {
+    for h, a := range activeAlerts {
         // if alert wasn't updated in this iteration
         // means it is resolved already
         if _, ok := updated[h]; !ok {
             if a.State == notifier.StatePending {
                 // alert was in Pending state - it is not
                 // active anymore
-                delete(ar.alerts, h)
+                delete(activeAlerts, h)
                 ar.logDebugf(ts, a, "PENDING => DELETED: is absent in current evaluation round")
                 continue
             }
@@ -490,16 +516,16 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr
         }
     }
     if limit > 0 && numActivePending > limit {
-        ar.alerts = map[uint64]*notifier.Alert{}
+        activeAlerts = nil
         curState.Err = fmt.Errorf("exec exceeded limit of %d with %d alerts", limit, numActivePending)
         return nil, curState.Err
     }
-    return ar.toTimeSeries(ts.Unix()), nil
+    return ar.toTimeSeries(ts.Unix(), activeAlerts), nil
 }

-func (ar *AlertingRule) toTimeSeries(timestamp int64) []prompbmarshal.TimeSeries {
+func (ar *AlertingRule) toTimeSeries(timestamp int64, activeAlerts map[uint64]*notifier.Alert) []prompbmarshal.TimeSeries {
     var tss []prompbmarshal.TimeSeries
-    for _, a := range ar.alerts {
+    for _, a := range activeAlerts {
         if a.State == notifier.StateInactive {
             continue
         }
@@ -604,10 +630,8 @@ func (ar *AlertingRule) restore(ctx context.Context, q datasource.Querier, ts ti
         return nil
     }

-    ar.alertsMu.Lock()
-    defer ar.alertsMu.Unlock()
-
-    if len(ar.alerts) < 1 {
+    activeAlerts := ar.loadAlerts()
+    if len(activeAlerts) < 1 {
         return nil
     }
@@ -638,7 +662,7 @@ func (ar *AlertingRule) restore(ctx context.Context, q datasource.Querier, ts ti
             labelSet[v.Name] = v.Value
         }
         id := hash(labelSet)
-        a, ok := ar.alerts[id]
+        a, ok := activeAlerts[id]
         if !ok {
             continue
         }
@@ -649,6 +673,7 @@ func (ar *AlertingRule) restore(ctx context.Context, q datasource.Querier, ts ti
         a.Restored = true
         logger.Infof("alert %q (%d) restored to state at %v", a.Name, a.ID, a.ActiveAt)
     }
+    ar.storeAlertState(activeAlerts)
     return nil
 }
@@ -671,7 +696,7 @@ func (ar *AlertingRule) alertsToSend(resolveDuration, resendDelay time.Duration)
     }

     var alerts []notifier.Alert
-    for _, a := range ar.alerts {
+    for _, a := range ar.loadAlerts() {
         if !needsSending(a) {
             continue
         }

View file

@@ -121,8 +121,9 @@ func TestAlertingRule_ToTimeSeries(t *testing.T) {
     }
     for _, tc := range testCases {
         t.Run(tc.rule.Name, func(t *testing.T) {
-            tc.rule.alerts[tc.alert.ID] = tc.alert
-            tss := tc.rule.toTimeSeries(timestamp.Unix())
+            alerts := map[uint64]*notifier.Alert{tc.alert.ID: tc.alert}
+            tc.rule.storeAlertState(alerts)
+            tss := tc.rule.toTimeSeries(timestamp.Unix(), alerts)
             if err := compareTimeSeries(t, tc.expTS, tss); err != nil {
                 t.Fatalf("timeseries missmatch: %s", err)
             }
@@ -346,8 +347,8 @@ func TestAlertingRule_Exec(t *testing.T) {
                 if _, ok := tc.expAlerts[i]; !ok {
                     continue
                 }
-                if len(tc.rule.alerts) != len(tc.expAlerts[i]) {
-                    t.Fatalf("evalIndex %d: expected %d alerts; got %d", i, len(tc.expAlerts[i]), len(tc.rule.alerts))
+                if len(tc.rule.loadAlerts()) != len(tc.expAlerts[i]) {
+                    t.Fatalf("evalIndex %d: expected %d alerts; got %d", i, len(tc.expAlerts[i]), len(tc.rule.loadAlerts()))
                 }
                 expAlerts := make(map[uint64]*notifier.Alert)
                 for _, ta := range tc.expAlerts[i] {
@@ -360,8 +361,9 @@ func TestAlertingRule_Exec(t *testing.T) {
                     h := hash(labels)
                     expAlerts[h] = ta.alert
                 }
+                activeAlerts := tc.rule.loadAlerts()
                 for key, exp := range expAlerts {
-                    got, ok := tc.rule.alerts[key]
+                    got, ok := activeAlerts[key]
                     if !ok {
                         t.Fatalf("evalIndex %d: expected to have key %d", i, key)
                     }
@@ -640,8 +642,8 @@ func TestAlertingRule_ExecRange(t *testing.T) {
                 }
             }
             if tc.expHoldAlertStateAlerts != nil {
-                if !reflect.DeepEqual(tc.expHoldAlertStateAlerts, tc.rule.alerts) {
-                    t.Fatalf("expected hold alerts state: \n%v but got \n%v", tc.expHoldAlertStateAlerts, tc.rule.alerts)
+                if !reflect.DeepEqual(tc.expHoldAlertStateAlerts, tc.rule.loadAlerts()) {
+                    t.Fatalf("expected hold alerts state: \n%v but got \n%v", tc.expHoldAlertStateAlerts, tc.rule.loadAlerts())
                 }
             }
         })
@@ -662,17 +664,18 @@ func TestGroup_Restore(t *testing.T) {
         fg := NewGroup(config.Group{Name: "TestRestore", Rules: rules}, fqr, time.Second, nil)
         wg := sync.WaitGroup{}
         wg.Add(1)
+        ctx, cancel := context.WithCancel(context.Background())
         go func() {
             nts := func() []notifier.Notifier { return []notifier.Notifier{&notifier.FakeNotifier{}} }
-            fg.Start(context.Background(), nts, nil, fqr)
+            fg.Start(ctx, nts, nil, fqr)
             wg.Done()
         }()
-        fg.Close()
+        cancel()
         wg.Wait()
         gotAlerts := make(map[uint64]*notifier.Alert)
         for _, rs := range fg.Rules {
-            alerts := rs.(*AlertingRule).alerts
+            alerts := rs.(*AlertingRule).loadAlerts()
             for k, v := range alerts {
                 if !v.Restored {
                     // set not restored alerts to predictable timestamp
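The test above now stops the group by cancelling the context passed to `fg.Start` instead of calling `fg.Close()`, then waits on the `WaitGroup` before reading the rules' state. A generic sketch of that start/cancel/wait shape (not the vmalert test itself):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	wg.Add(1)

	go func() {
		defer wg.Done()
		ticker := time.NewTicker(10 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return // the worker exits once the context is cancelled
			case <-ticker.C:
				// evaluate rules...
			}
		}
	}()

	cancel()
	wg.Wait()
	fmt.Println("group stopped, state can be inspected safely")
}
```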
@@ -909,7 +912,6 @@ func TestAlertingRule_Template(t *testing.T) {
             Annotations: map[string]string{
                 "summary": `{{ $labels.alertname }}: Too high connection number for "{{ $labels.instance }}"`,
             },
-            alerts: make(map[uint64]*notifier.Alert),
         },
         []datasource.Metric{
             metricWithValueAndLabels(t, 1, "instance", "foo"),
@@ -948,7 +950,6 @@ func TestAlertingRule_Template(t *testing.T) {
                 "summary":     `{{ $labels.__name__ }}: Too high connection number for "{{ $labels.instance }}"`,
                 "description": `{{ $labels.alertname}}: It is {{ $value }} connections for "{{ $labels.instance }}"`,
             },
-            alerts: make(map[uint64]*notifier.Alert),
         },
         []datasource.Metric{
             metricWithValueAndLabels(t, 2, "__name__", "first", "instance", "foo", alertNameLabel, "override"),
@@ -989,7 +990,6 @@ func TestAlertingRule_Template(t *testing.T) {
             Annotations: map[string]string{
                 "summary": `Alert "{{ $labels.alertname }}({{ $labels.alertgroup }})" for instance {{ $labels.instance }}`,
             },
-            alerts: make(map[uint64]*notifier.Alert),
         },
         []datasource.Metric{
             metricWithValueAndLabels(t, 1,
@@ -1030,8 +1030,9 @@ func TestAlertingRule_Template(t *testing.T) {
             if _, err := tc.rule.exec(context.TODO(), time.Now(), 0); err != nil {
                 t.Fatalf("unexpected err: %s", err)
             }
+            activeAlerts := tc.rule.loadAlerts()
             for hash, expAlert := range tc.expAlerts {
-                gotAlert := tc.rule.alerts[hash]
+                gotAlert := activeAlerts[hash]
                 if gotAlert == nil {
                     t.Fatalf("alert %d is missing; labels: %v; annotations: %v", hash, expAlert.Labels, expAlert.Annotations)
                 }
@@ -1050,10 +1051,12 @@ func TestAlertsToSend(t *testing.T) {
     ts := time.Now()
     f := func(alerts, expAlerts []*notifier.Alert, resolveDuration, resendDelay time.Duration) {
         t.Helper()
-        ar := &AlertingRule{alerts: make(map[uint64]*notifier.Alert)}
+        ar := &AlertingRule{}
+        activeAlerts := make(map[uint64]*notifier.Alert)
         for i, a := range alerts {
-            ar.alerts[uint64(i)] = a
+            activeAlerts[uint64(i)] = a
         }
+        ar.storeAlertState(activeAlerts)
         gotAlerts := ar.alertsToSend(resolveDuration, resendDelay)
         if gotAlerts == nil && expAlerts == nil {
             return
@@ -1116,7 +1119,6 @@ func newTestAlertingRule(name string, waitFor time.Duration) *AlertingRule {
         Name:         name,
         For:          waitFor,
         EvalInterval: waitFor,
-        alerts:       make(map[uint64]*notifier.Alert),
         state:        &ruleState{entries: make([]StateEntry, 10)},
         metrics: &alertingRuleMetrics{
             errors: utils.GetOrCreateCounter(fmt.Sprintf(`vmalert_alerting_rules_errors_total{alertname=%q}`, name)),

View file

@@ -30,6 +30,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).

 ## tip

+* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): avoid blocking `/api/v1/rules`, `/api/v1/alerts`, `/metrics` APIs when alerting rule uses template functions `query`, which could take long time to execute. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6079).

 ## [v1.100.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.100.1)
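For context on the changelog entry: vmalert annotation templates can call a `query` function that executes an expression against the datasource, so expanding a single annotation can take as long as the underlying query. A rough, self-contained illustration using Go's `text/template` with a stand-in `query` function (this is not vmalert's real implementation, which returns datasource metrics rather than a string):

```go
package main

import (
	"fmt"
	"os"
	"text/template"
	"time"
)

// fakeQuery simulates a slow datasource round-trip; vmalert's real `query`
// template function performs an actual request to the configured datasource.
func fakeQuery(expr string) (string, error) {
	time.Sleep(2 * time.Second)
	return "42", nil
}

func main() {
	tmpl := template.Must(template.New("annotation").Funcs(template.FuncMap{
		"query": fakeQuery,
	}).Parse(`current value: {{ query "up" }}`))

	start := time.Now()
	if err := tmpl.Execute(os.Stdout, nil); err != nil {
		panic(err)
	}
	fmt.Println("\nexpanded in", time.Since(start))
}
```

Holding a rule-wide lock across such an expansion is what previously stalled the read-only APIs; the atomic snapshot approach removes that dependency.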

View file

@@ -783,7 +783,6 @@ See full description for these flags in `./vmalert -help`.

 ### Limitations

 * Graphite engine isn't supported yet;
-* `query` template function is disabled for performance reasons (might be changed in future);
 * `limit` group's param has no effect during replay (might be changed in future);
 * `keep_firing_for` alerting rule param has no effect during replay (might be changed in future).