From a84491324d7b922341c2b77214f4ff3a229fe514 Mon Sep 17 00:00:00 2001 From: Hui Wang Date: Fri, 19 Apr 2024 15:16:26 +0800 Subject: [PATCH] =?UTF-8?q?vmalert:=20avoid=20blocking=20APIs=20when=20ale?= =?UTF-8?q?rting=20rule=20uses=20template=20functio=E2=80=A6=20(#6129)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * vmalert: avoid blocking APIs when alerting rule uses template function `query` * app/vmalert: small refactoring * simplify labels and templates expanding * simplify `newAlert` interface * fix `TestGroupStart` which mistakenly skipped annotations and response labels check Signed-off-by: hagen1778 * reduce alerts lock time when restore --------- Signed-off-by: hagen1778 Co-authored-by: hagen1778 --- app/vmalert/rule/alerting.go | 142 +++++++++++++++++++-------------- app/vmalert/rule/group_test.go | 20 +++-- docs/CHANGELOG.md | 1 + 3 files changed, 97 insertions(+), 66 deletions(-) diff --git a/app/vmalert/rule/alerting.go b/app/vmalert/rule/alerting.go index 8f6d0e980..81fbd2cf2 100644 --- a/app/vmalert/rule/alerting.go +++ b/app/vmalert/rule/alerting.go @@ -314,23 +314,20 @@ func (ar *AlertingRule) execRange(ctx context.Context, start, end time.Time) ([] return nil, fmt.Errorf("`query` template isn't supported in replay mode") } for _, s := range res.Data { - ls, err := ar.toLabels(s, qFn) + ls, as, err := ar.expandTemplates(s, qFn, time.Time{}) if err != nil { - return nil, fmt.Errorf("failed to expand labels: %s", err) - } - h := hash(ls.processed) - a, err := ar.newAlert(s, nil, time.Time{}, qFn) // initial alert - if err != nil { - return nil, fmt.Errorf("failed to create alert: %w", err) + return nil, fmt.Errorf("failed to expand templates: %s", err) } + alertID := hash(ls.processed) + a := ar.newAlert(s, time.Time{}, ls.processed, as) // initial alert prevT := time.Time{} for i := range s.Values { at := time.Unix(s.Timestamps[i], 0) // try to restore alert's state on the first iteration if at.Equal(start) { - if _, ok := ar.alerts[h]; ok { - a = ar.alerts[h] + if _, ok := ar.alerts[alertID]; ok { + a = ar.alerts[alertID] prevT = at } } @@ -352,7 +349,7 @@ func (ar *AlertingRule) execRange(ctx context.Context, start, end time.Time) ([] // save alert's state on last iteration, so it can be used on the next execRange call if at.Equal(end) { - holdAlertState[h] = a + holdAlertState[alertID] = a } } } @@ -386,15 +383,34 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr } }() - ar.alertsMu.Lock() - defer ar.alertsMu.Unlock() - if err != nil { return nil, fmt.Errorf("failed to execute query %q: %w", ar.Expr, err) } - ar.logDebugf(ts, nil, "query returned %d samples (elapsed: %s)", curState.Samples, curState.Duration) + qFn := func(query string) ([]datasource.Metric, error) { + res, _, err := ar.q.Query(ctx, query, ts) + return res.Data, err + } + + // template labels and annotations before updating ar.alerts, + // since they could use `query` function which takes a while to execute, + // see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6079. + expandedLabels := make([]*labelSet, len(res.Data)) + expandedAnnotations := make([]map[string]string, len(res.Data)) + for i, m := range res.Data { + ls, as, err := ar.expandTemplates(m, qFn, ts) + if err != nil { + curState.Err = fmt.Errorf("failed to expand templates: %w", err) + return nil, curState.Err + } + expandedLabels[i] = ls + expandedAnnotations[i] = as + } + + ar.alertsMu.Lock() + defer ar.alertsMu.Unlock() + for h, a := range ar.alerts { // cleanup inactive alerts from previous Exec if a.State == notifier.StateInactive && ts.Sub(a.ResolvedAt) > resolvedRetention { @@ -403,26 +419,18 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr } } - qFn := func(query string) ([]datasource.Metric, error) { - res, _, err := ar.q.Query(ctx, query, ts) - return res.Data, err - } updated := make(map[uint64]struct{}) // update list of active alerts - for _, m := range res.Data { - ls, err := ar.toLabels(m, qFn) - if err != nil { - curState.Err = fmt.Errorf("failed to expand labels: %w", err) - return nil, curState.Err - } - h := hash(ls.processed) - if _, ok := updated[h]; ok { + for i, m := range res.Data { + labels, annotations := expandedLabels[i], expandedAnnotations[i] + alertID := hash(labels.processed) + if _, ok := updated[alertID]; ok { // duplicate may be caused the removal of `__name__` label - curState.Err = fmt.Errorf("labels %v: %w", ls.processed, errDuplicate) + curState.Err = fmt.Errorf("labels %v: %w", labels.processed, errDuplicate) return nil, curState.Err } - updated[h] = struct{}{} - if a, ok := ar.alerts[h]; ok { + updated[alertID] = struct{}{} + if a, ok := ar.alerts[alertID]; ok { if a.State == notifier.StateInactive { // alert could be in inactive state for resolvedRetention // so when we again receive metrics for it - we switch it @@ -432,22 +440,17 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr ar.logDebugf(ts, a, "INACTIVE => PENDING") } a.Value = m.Values[0] - // re-exec template since Value or query can be used in annotations - a.Annotations, err = a.ExecTemplate(qFn, ls.origin, ar.Annotations) + a.Annotations = annotations if err != nil { return nil, err } a.KeepFiringSince = time.Time{} continue } - a, err := ar.newAlert(m, ls, ts, qFn) - if err != nil { - curState.Err = fmt.Errorf("failed to create alert: %w", err) - return nil, curState.Err - } - a.ID = h + a := ar.newAlert(m, ts, labels.processed, annotations) + a.ID = alertID a.State = notifier.StatePending - ar.alerts[h] = a + ar.alerts[alertID] = a ar.logDebugf(ts, a, "created in state PENDING") } var numActivePending int @@ -497,6 +500,28 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr return ar.toTimeSeries(ts.Unix()), nil } +func (ar *AlertingRule) expandTemplates(m datasource.Metric, qFn templates.QueryFn, ts time.Time) (*labelSet, map[string]string, error) { + ls, err := ar.toLabels(m, qFn) + if err != nil { + return nil, nil, fmt.Errorf("failed to expand labels: %w", err) + } + + tplData := notifier.AlertTplData{ + Value: m.Values[0], + Labels: ls.origin, + Expr: ar.Expr, + AlertID: hash(ls.processed), + GroupID: ar.GroupID, + ActiveAt: ts, + For: ar.For, + } + as, err := notifier.ExecTemplate(qFn, ar.Annotations, tplData) + if err != nil { + return nil, nil, fmt.Errorf("failed to template annotations: %w", err) + } + return ls, as, nil +} + func (ar *AlertingRule) toTimeSeries(timestamp int64) []prompbmarshal.TimeSeries { var tss []prompbmarshal.TimeSeries for _, a := range ar.alerts { @@ -530,25 +555,25 @@ func hash(labels map[string]string) uint64 { return hash.Sum64() } -func (ar *AlertingRule) newAlert(m datasource.Metric, ls *labelSet, start time.Time, qFn templates.QueryFn) (*notifier.Alert, error) { - var err error - if ls == nil { - ls, err = ar.toLabels(m, qFn) - if err != nil { - return nil, fmt.Errorf("failed to expand labels: %w", err) - } +func (ar *AlertingRule) newAlert(m datasource.Metric, start time.Time, labels, annotations map[string]string) *notifier.Alert { + as := make(map[string]string) + if annotations != nil { + as = annotations } - a := ¬ifier.Alert{ - GroupID: ar.GroupID, - Name: ar.Name, - Labels: ls.processed, - Value: m.Values[0], - ActiveAt: start, - Expr: ar.Expr, - For: ar.For, + ls := make(map[string]string) + if labels != nil { + ls = labels + } + return ¬ifier.Alert{ + GroupID: ar.GroupID, + Name: ar.Name, + Expr: ar.Expr, + For: ar.For, + ActiveAt: start, + Value: m.Values[0], + Labels: ls, + Annotations: as, } - a.Annotations, err = a.ExecTemplate(qFn, ls.origin, ar.Annotations) - return a, err } const ( @@ -604,9 +629,6 @@ func (ar *AlertingRule) restore(ctx context.Context, q datasource.Querier, ts ti return nil } - ar.alertsMu.Lock() - defer ar.alertsMu.Unlock() - if len(ar.alerts) < 1 { return nil } @@ -631,6 +653,10 @@ func (ar *AlertingRule) restore(ctx context.Context, q datasource.Querier, ts ti ar.logDebugf(ts, nil, "no response was received from restore query") return nil } + + ar.alertsMu.Lock() + defer ar.alertsMu.Unlock() + for _, series := range res.Data { series.DelLabel("__name__") labelSet := make(map[string]string, len(series.Labels)) diff --git a/app/vmalert/rule/group_test.go b/app/vmalert/rule/group_test.go index b7560c1ae..0ff6adec0 100644 --- a/app/vmalert/rule/group_test.go +++ b/app/vmalert/rule/group_test.go @@ -223,13 +223,15 @@ func TestGroupStart(t *testing.T) { m2 := metricWithLabels(t, "instance", inst2, "job", job) r := g.Rules[0].(*AlertingRule) - alert1, err := r.newAlert(m1, nil, time.Now(), nil) - if err != nil { - t.Fatalf("faield to create alert: %s", err) - } + alert1 := r.newAlert(m1, time.Now(), nil, nil) alert1.State = notifier.StateFiring + // add annotations + alert1.Annotations["summary"] = "1" // add external label alert1.Labels["cluster"] = "east-1" + // add labels from response + alert1.Labels["job"] = job + alert1.Labels["instance"] = inst1 // add rule labels alert1.Labels["label"] = "bar" alert1.Labels["host"] = inst1 @@ -238,13 +240,15 @@ func TestGroupStart(t *testing.T) { alert1.Labels[alertGroupNameLabel] = g.Name alert1.ID = hash(alert1.Labels) - alert2, err := r.newAlert(m2, nil, time.Now(), nil) - if err != nil { - t.Fatalf("faield to create alert: %s", err) - } + alert2 := r.newAlert(m2, time.Now(), nil, nil) alert2.State = notifier.StateFiring + // add annotations + alert2.Annotations["summary"] = "1" // add external label alert2.Labels["cluster"] = "east-1" + // add labels from response + alert2.Labels["job"] = job + alert2.Labels["instance"] = inst2 // add rule labels alert2.Labels["label"] = "bar" alert2.Labels["host"] = inst2 diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 6e3126ffa..92a9a90b8 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -40,6 +40,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): in the Select component, user-entered values are now preserved on blur if they match options in the list. * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert/): supported any status codes from the range 200-299 from alertmanager. Previously, only 200 status code considered a successful action. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6110). +* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert/): avoid blocking `/api/v1/rules`, `/api/v1/alerts`, `/metrics` APIs when alerting rule uses template functions `query`, which could takes a while to execute. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6079). * BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth/): don't treat concurrency limit hit as an error of the backend. Previously, hitting the concurrency limit would increment both `vmauth_concurrent_requests_limit_reached_total` and `vmauth_user_request_backend_errors_total` counters. Now, only concurrency limit counter is incremented. Updates [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5565). ## [v1.100.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.100.1)