vmalert: avoid blocking APIs when alerting rule uses template functio… (#6129)

* vmalert: avoid blocking APIs when alerting rule uses template function `query`

* app/vmalert: small refactoring

* simplify labels and templates expanding
* simplify `newAlert` interface
* fix `TestGroupStart` which mistakenly skipped annotations
and response labels check

Signed-off-by: hagen1778 <roman@victoriametrics.com>

* reduce alerts lock time when restore

---------

Signed-off-by: hagen1778 <roman@victoriametrics.com>
Co-authored-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
Hui Wang 2024-04-19 15:16:26 +08:00 committed by GitHub
parent 316b19a5d1
commit a84491324d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 97 additions and 66 deletions

View file

@ -314,23 +314,20 @@ func (ar *AlertingRule) execRange(ctx context.Context, start, end time.Time) ([]
return nil, fmt.Errorf("`query` template isn't supported in replay mode")
}
for _, s := range res.Data {
ls, err := ar.toLabels(s, qFn)
ls, as, err := ar.expandTemplates(s, qFn, time.Time{})
if err != nil {
return nil, fmt.Errorf("failed to expand labels: %s", err)
}
h := hash(ls.processed)
a, err := ar.newAlert(s, nil, time.Time{}, qFn) // initial alert
if err != nil {
return nil, fmt.Errorf("failed to create alert: %w", err)
return nil, fmt.Errorf("failed to expand templates: %s", err)
}
alertID := hash(ls.processed)
a := ar.newAlert(s, time.Time{}, ls.processed, as) // initial alert
prevT := time.Time{}
for i := range s.Values {
at := time.Unix(s.Timestamps[i], 0)
// try to restore alert's state on the first iteration
if at.Equal(start) {
if _, ok := ar.alerts[h]; ok {
a = ar.alerts[h]
if _, ok := ar.alerts[alertID]; ok {
a = ar.alerts[alertID]
prevT = at
}
}
@ -352,7 +349,7 @@ func (ar *AlertingRule) execRange(ctx context.Context, start, end time.Time) ([]
// save alert's state on last iteration, so it can be used on the next execRange call
if at.Equal(end) {
holdAlertState[h] = a
holdAlertState[alertID] = a
}
}
}
@ -386,15 +383,34 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr
}
}()
ar.alertsMu.Lock()
defer ar.alertsMu.Unlock()
if err != nil {
return nil, fmt.Errorf("failed to execute query %q: %w", ar.Expr, err)
}
ar.logDebugf(ts, nil, "query returned %d samples (elapsed: %s)", curState.Samples, curState.Duration)
qFn := func(query string) ([]datasource.Metric, error) {
res, _, err := ar.q.Query(ctx, query, ts)
return res.Data, err
}
// template labels and annotations before updating ar.alerts,
// since they could use `query` function which takes a while to execute,
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6079.
expandedLabels := make([]*labelSet, len(res.Data))
expandedAnnotations := make([]map[string]string, len(res.Data))
for i, m := range res.Data {
ls, as, err := ar.expandTemplates(m, qFn, ts)
if err != nil {
curState.Err = fmt.Errorf("failed to expand templates: %w", err)
return nil, curState.Err
}
expandedLabels[i] = ls
expandedAnnotations[i] = as
}
ar.alertsMu.Lock()
defer ar.alertsMu.Unlock()
for h, a := range ar.alerts {
// cleanup inactive alerts from previous Exec
if a.State == notifier.StateInactive && ts.Sub(a.ResolvedAt) > resolvedRetention {
@ -403,26 +419,18 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr
}
}
qFn := func(query string) ([]datasource.Metric, error) {
res, _, err := ar.q.Query(ctx, query, ts)
return res.Data, err
}
updated := make(map[uint64]struct{})
// update list of active alerts
for _, m := range res.Data {
ls, err := ar.toLabels(m, qFn)
if err != nil {
curState.Err = fmt.Errorf("failed to expand labels: %w", err)
return nil, curState.Err
}
h := hash(ls.processed)
if _, ok := updated[h]; ok {
for i, m := range res.Data {
labels, annotations := expandedLabels[i], expandedAnnotations[i]
alertID := hash(labels.processed)
if _, ok := updated[alertID]; ok {
// duplicate may be caused the removal of `__name__` label
curState.Err = fmt.Errorf("labels %v: %w", ls.processed, errDuplicate)
curState.Err = fmt.Errorf("labels %v: %w", labels.processed, errDuplicate)
return nil, curState.Err
}
updated[h] = struct{}{}
if a, ok := ar.alerts[h]; ok {
updated[alertID] = struct{}{}
if a, ok := ar.alerts[alertID]; ok {
if a.State == notifier.StateInactive {
// alert could be in inactive state for resolvedRetention
// so when we again receive metrics for it - we switch it
@ -432,22 +440,17 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr
ar.logDebugf(ts, a, "INACTIVE => PENDING")
}
a.Value = m.Values[0]
// re-exec template since Value or query can be used in annotations
a.Annotations, err = a.ExecTemplate(qFn, ls.origin, ar.Annotations)
a.Annotations = annotations
if err != nil {
return nil, err
}
a.KeepFiringSince = time.Time{}
continue
}
a, err := ar.newAlert(m, ls, ts, qFn)
if err != nil {
curState.Err = fmt.Errorf("failed to create alert: %w", err)
return nil, curState.Err
}
a.ID = h
a := ar.newAlert(m, ts, labels.processed, annotations)
a.ID = alertID
a.State = notifier.StatePending
ar.alerts[h] = a
ar.alerts[alertID] = a
ar.logDebugf(ts, a, "created in state PENDING")
}
var numActivePending int
@ -497,6 +500,28 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr
return ar.toTimeSeries(ts.Unix()), nil
}
func (ar *AlertingRule) expandTemplates(m datasource.Metric, qFn templates.QueryFn, ts time.Time) (*labelSet, map[string]string, error) {
ls, err := ar.toLabels(m, qFn)
if err != nil {
return nil, nil, fmt.Errorf("failed to expand labels: %w", err)
}
tplData := notifier.AlertTplData{
Value: m.Values[0],
Labels: ls.origin,
Expr: ar.Expr,
AlertID: hash(ls.processed),
GroupID: ar.GroupID,
ActiveAt: ts,
For: ar.For,
}
as, err := notifier.ExecTemplate(qFn, ar.Annotations, tplData)
if err != nil {
return nil, nil, fmt.Errorf("failed to template annotations: %w", err)
}
return ls, as, nil
}
func (ar *AlertingRule) toTimeSeries(timestamp int64) []prompbmarshal.TimeSeries {
var tss []prompbmarshal.TimeSeries
for _, a := range ar.alerts {
@ -530,25 +555,25 @@ func hash(labels map[string]string) uint64 {
return hash.Sum64()
}
func (ar *AlertingRule) newAlert(m datasource.Metric, ls *labelSet, start time.Time, qFn templates.QueryFn) (*notifier.Alert, error) {
var err error
if ls == nil {
ls, err = ar.toLabels(m, qFn)
if err != nil {
return nil, fmt.Errorf("failed to expand labels: %w", err)
}
func (ar *AlertingRule) newAlert(m datasource.Metric, start time.Time, labels, annotations map[string]string) *notifier.Alert {
as := make(map[string]string)
if annotations != nil {
as = annotations
}
a := &notifier.Alert{
GroupID: ar.GroupID,
Name: ar.Name,
Labels: ls.processed,
Value: m.Values[0],
ActiveAt: start,
Expr: ar.Expr,
For: ar.For,
ls := make(map[string]string)
if labels != nil {
ls = labels
}
return &notifier.Alert{
GroupID: ar.GroupID,
Name: ar.Name,
Expr: ar.Expr,
For: ar.For,
ActiveAt: start,
Value: m.Values[0],
Labels: ls,
Annotations: as,
}
a.Annotations, err = a.ExecTemplate(qFn, ls.origin, ar.Annotations)
return a, err
}
const (
@ -604,9 +629,6 @@ func (ar *AlertingRule) restore(ctx context.Context, q datasource.Querier, ts ti
return nil
}
ar.alertsMu.Lock()
defer ar.alertsMu.Unlock()
if len(ar.alerts) < 1 {
return nil
}
@ -631,6 +653,10 @@ func (ar *AlertingRule) restore(ctx context.Context, q datasource.Querier, ts ti
ar.logDebugf(ts, nil, "no response was received from restore query")
return nil
}
ar.alertsMu.Lock()
defer ar.alertsMu.Unlock()
for _, series := range res.Data {
series.DelLabel("__name__")
labelSet := make(map[string]string, len(series.Labels))

View file

@ -223,13 +223,15 @@ func TestGroupStart(t *testing.T) {
m2 := metricWithLabels(t, "instance", inst2, "job", job)
r := g.Rules[0].(*AlertingRule)
alert1, err := r.newAlert(m1, nil, time.Now(), nil)
if err != nil {
t.Fatalf("faield to create alert: %s", err)
}
alert1 := r.newAlert(m1, time.Now(), nil, nil)
alert1.State = notifier.StateFiring
// add annotations
alert1.Annotations["summary"] = "1"
// add external label
alert1.Labels["cluster"] = "east-1"
// add labels from response
alert1.Labels["job"] = job
alert1.Labels["instance"] = inst1
// add rule labels
alert1.Labels["label"] = "bar"
alert1.Labels["host"] = inst1
@ -238,13 +240,15 @@ func TestGroupStart(t *testing.T) {
alert1.Labels[alertGroupNameLabel] = g.Name
alert1.ID = hash(alert1.Labels)
alert2, err := r.newAlert(m2, nil, time.Now(), nil)
if err != nil {
t.Fatalf("faield to create alert: %s", err)
}
alert2 := r.newAlert(m2, time.Now(), nil, nil)
alert2.State = notifier.StateFiring
// add annotations
alert2.Annotations["summary"] = "1"
// add external label
alert2.Labels["cluster"] = "east-1"
// add labels from response
alert2.Labels["job"] = job
alert2.Labels["instance"] = inst2
// add rule labels
alert2.Labels["label"] = "bar"
alert2.Labels["host"] = inst2

View file

@ -40,6 +40,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): in the Select component, user-entered values are now preserved on blur if they match options in the list.
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert/): supported any status codes from the range 200-299 from alertmanager. Previously, only 200 status code considered a successful action. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6110).
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert/): avoid blocking `/api/v1/rules`, `/api/v1/alerts`, `/metrics` APIs when alerting rule uses template functions `query`, which could takes a while to execute. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6079).
* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth/): don't treat concurrency limit hit as an error of the backend. Previously, hitting the concurrency limit would increment both `vmauth_concurrent_requests_limit_reached_total` and `vmauth_user_request_backend_errors_total` counters. Now, only concurrency limit counter is incremented. Updates [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5565).
## [v1.100.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.100.1)