From e0d47ab6afbf1b8671a6d76039c3bfd5b294c28c Mon Sep 17 00:00:00 2001
From: Hui Wang <haley@victoriametrics.com>
Date: Fri, 19 Apr 2024 15:16:26 +0800
Subject: [PATCH] =?UTF-8?q?vmalert:=20avoid=20blocking=20APIs=20when=20ale?=
 =?UTF-8?q?rting=20rule=20uses=20template=20functio=E2=80=A6=20(#6129)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* vmalert: avoid blocking APIs when alerting rule uses template function `query`

* app/vmalert: small refactoring

* simplify labels and templates expanding
* simplify `newAlert` interface
* fix `TestGroupStart` which mistakenly skipped annotations
and response labels check

Signed-off-by: hagen1778 <roman@victoriametrics.com>

* reduce alerts lock time when restore

---------

Signed-off-by: hagen1778 <roman@victoriametrics.com>
Co-authored-by: hagen1778 <roman@victoriametrics.com>
---
 app/vmalert/rule/alerting.go   | 142 +++++++++++++++++++--------------
 app/vmalert/rule/group_test.go |  20 +++--
 docs/CHANGELOG.md              |   1 +
 3 files changed, 97 insertions(+), 66 deletions(-)

diff --git a/app/vmalert/rule/alerting.go b/app/vmalert/rule/alerting.go
index 8f6d0e9805..81fbd2cf22 100644
--- a/app/vmalert/rule/alerting.go
+++ b/app/vmalert/rule/alerting.go
@@ -314,23 +314,20 @@ func (ar *AlertingRule) execRange(ctx context.Context, start, end time.Time) ([]
 		return nil, fmt.Errorf("`query` template isn't supported in replay mode")
 	}
 	for _, s := range res.Data {
-		ls, err := ar.toLabels(s, qFn)
+		ls, as, err := ar.expandTemplates(s, qFn, time.Time{})
 		if err != nil {
-			return nil, fmt.Errorf("failed to expand labels: %s", err)
-		}
-		h := hash(ls.processed)
-		a, err := ar.newAlert(s, nil, time.Time{}, qFn) // initial alert
-		if err != nil {
-			return nil, fmt.Errorf("failed to create alert: %w", err)
+			return nil, fmt.Errorf("failed to expand templates: %s", err)
 		}
+		alertID := hash(ls.processed)
+		a := ar.newAlert(s, time.Time{}, ls.processed, as) // initial alert
 
 		prevT := time.Time{}
 		for i := range s.Values {
 			at := time.Unix(s.Timestamps[i], 0)
 			// try to restore alert's state on the first iteration
 			if at.Equal(start) {
-				if _, ok := ar.alerts[h]; ok {
-					a = ar.alerts[h]
+				if _, ok := ar.alerts[alertID]; ok {
+					a = ar.alerts[alertID]
 					prevT = at
 				}
 			}
@@ -352,7 +349,7 @@ func (ar *AlertingRule) execRange(ctx context.Context, start, end time.Time) ([]
 
 			// save alert's state on last iteration, so it can be used on the next execRange call
 			if at.Equal(end) {
-				holdAlertState[h] = a
+				holdAlertState[alertID] = a
 			}
 		}
 	}
@@ -386,15 +383,34 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr
 		}
 	}()
 
-	ar.alertsMu.Lock()
-	defer ar.alertsMu.Unlock()
-
 	if err != nil {
 		return nil, fmt.Errorf("failed to execute query %q: %w", ar.Expr, err)
 	}
-
 	ar.logDebugf(ts, nil, "query returned %d samples (elapsed: %s)", curState.Samples, curState.Duration)
 
+	qFn := func(query string) ([]datasource.Metric, error) {
+		res, _, err := ar.q.Query(ctx, query, ts)
+		return res.Data, err
+	}
+
+	// template labels and annotations before updating ar.alerts,
+	// since they could use `query` function which takes a while to execute,
+	// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6079.
+	expandedLabels := make([]*labelSet, len(res.Data))
+	expandedAnnotations := make([]map[string]string, len(res.Data))
+	for i, m := range res.Data {
+		ls, as, err := ar.expandTemplates(m, qFn, ts)
+		if err != nil {
+			curState.Err = fmt.Errorf("failed to expand templates: %w", err)
+			return nil, curState.Err
+		}
+		expandedLabels[i] = ls
+		expandedAnnotations[i] = as
+	}
+
+	ar.alertsMu.Lock()
+	defer ar.alertsMu.Unlock()
+
 	for h, a := range ar.alerts {
 		// cleanup inactive alerts from previous Exec
 		if a.State == notifier.StateInactive && ts.Sub(a.ResolvedAt) > resolvedRetention {
@@ -403,26 +419,18 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr
 		}
 	}
 
-	qFn := func(query string) ([]datasource.Metric, error) {
-		res, _, err := ar.q.Query(ctx, query, ts)
-		return res.Data, err
-	}
 	updated := make(map[uint64]struct{})
 	// update list of active alerts
-	for _, m := range res.Data {
-		ls, err := ar.toLabels(m, qFn)
-		if err != nil {
-			curState.Err = fmt.Errorf("failed to expand labels: %w", err)
-			return nil, curState.Err
-		}
-		h := hash(ls.processed)
-		if _, ok := updated[h]; ok {
+	for i, m := range res.Data {
+		labels, annotations := expandedLabels[i], expandedAnnotations[i]
+		alertID := hash(labels.processed)
+		if _, ok := updated[alertID]; ok {
 			// duplicate may be caused the removal of `__name__` label
-			curState.Err = fmt.Errorf("labels %v: %w", ls.processed, errDuplicate)
+			curState.Err = fmt.Errorf("labels %v: %w", labels.processed, errDuplicate)
 			return nil, curState.Err
 		}
-		updated[h] = struct{}{}
-		if a, ok := ar.alerts[h]; ok {
+		updated[alertID] = struct{}{}
+		if a, ok := ar.alerts[alertID]; ok {
 			if a.State == notifier.StateInactive {
 				// alert could be in inactive state for resolvedRetention
 				// so when we again receive metrics for it - we switch it
@@ -432,22 +440,17 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr
 				ar.logDebugf(ts, a, "INACTIVE => PENDING")
 			}
 			a.Value = m.Values[0]
-			// re-exec template since Value or query can be used in annotations
-			a.Annotations, err = a.ExecTemplate(qFn, ls.origin, ar.Annotations)
+			a.Annotations = annotations
 			if err != nil {
 				return nil, err
 			}
 			a.KeepFiringSince = time.Time{}
 			continue
 		}
-		a, err := ar.newAlert(m, ls, ts, qFn)
-		if err != nil {
-			curState.Err = fmt.Errorf("failed to create alert: %w", err)
-			return nil, curState.Err
-		}
-		a.ID = h
+		a := ar.newAlert(m, ts, labels.processed, annotations)
+		a.ID = alertID
 		a.State = notifier.StatePending
-		ar.alerts[h] = a
+		ar.alerts[alertID] = a
 		ar.logDebugf(ts, a, "created in state PENDING")
 	}
 	var numActivePending int
@@ -497,6 +500,28 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr
 	return ar.toTimeSeries(ts.Unix()), nil
 }
 
+func (ar *AlertingRule) expandTemplates(m datasource.Metric, qFn templates.QueryFn, ts time.Time) (*labelSet, map[string]string, error) {
+	ls, err := ar.toLabels(m, qFn)
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to expand labels: %w", err)
+	}
+
+	tplData := notifier.AlertTplData{
+		Value:    m.Values[0],
+		Labels:   ls.origin,
+		Expr:     ar.Expr,
+		AlertID:  hash(ls.processed),
+		GroupID:  ar.GroupID,
+		ActiveAt: ts,
+		For:      ar.For,
+	}
+	as, err := notifier.ExecTemplate(qFn, ar.Annotations, tplData)
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to template annotations: %w", err)
+	}
+	return ls, as, nil
+}
+
 func (ar *AlertingRule) toTimeSeries(timestamp int64) []prompbmarshal.TimeSeries {
 	var tss []prompbmarshal.TimeSeries
 	for _, a := range ar.alerts {
@@ -530,25 +555,25 @@ func hash(labels map[string]string) uint64 {
 	return hash.Sum64()
 }
 
-func (ar *AlertingRule) newAlert(m datasource.Metric, ls *labelSet, start time.Time, qFn templates.QueryFn) (*notifier.Alert, error) {
-	var err error
-	if ls == nil {
-		ls, err = ar.toLabels(m, qFn)
-		if err != nil {
-			return nil, fmt.Errorf("failed to expand labels: %w", err)
-		}
+func (ar *AlertingRule) newAlert(m datasource.Metric, start time.Time, labels, annotations map[string]string) *notifier.Alert {
+	as := make(map[string]string)
+	if annotations != nil {
+		as = annotations
 	}
-	a := &notifier.Alert{
-		GroupID:  ar.GroupID,
-		Name:     ar.Name,
-		Labels:   ls.processed,
-		Value:    m.Values[0],
-		ActiveAt: start,
-		Expr:     ar.Expr,
-		For:      ar.For,
+	ls := make(map[string]string)
+	if labels != nil {
+		ls = labels
+	}
+	return &notifier.Alert{
+		GroupID:     ar.GroupID,
+		Name:        ar.Name,
+		Expr:        ar.Expr,
+		For:         ar.For,
+		ActiveAt:    start,
+		Value:       m.Values[0],
+		Labels:      ls,
+		Annotations: as,
 	}
-	a.Annotations, err = a.ExecTemplate(qFn, ls.origin, ar.Annotations)
-	return a, err
 }
 
 const (
@@ -604,9 +629,6 @@ func (ar *AlertingRule) restore(ctx context.Context, q datasource.Querier, ts ti
 		return nil
 	}
 
-	ar.alertsMu.Lock()
-	defer ar.alertsMu.Unlock()
-
 	if len(ar.alerts) < 1 {
 		return nil
 	}
@@ -631,6 +653,10 @@ func (ar *AlertingRule) restore(ctx context.Context, q datasource.Querier, ts ti
 		ar.logDebugf(ts, nil, "no response was received from restore query")
 		return nil
 	}
+
+	ar.alertsMu.Lock()
+	defer ar.alertsMu.Unlock()
+
 	for _, series := range res.Data {
 		series.DelLabel("__name__")
 		labelSet := make(map[string]string, len(series.Labels))
diff --git a/app/vmalert/rule/group_test.go b/app/vmalert/rule/group_test.go
index b7560c1aeb..0ff6adec03 100644
--- a/app/vmalert/rule/group_test.go
+++ b/app/vmalert/rule/group_test.go
@@ -223,13 +223,15 @@ func TestGroupStart(t *testing.T) {
 	m2 := metricWithLabels(t, "instance", inst2, "job", job)
 
 	r := g.Rules[0].(*AlertingRule)
-	alert1, err := r.newAlert(m1, nil, time.Now(), nil)
-	if err != nil {
-		t.Fatalf("faield to create alert: %s", err)
-	}
+	alert1 := r.newAlert(m1, time.Now(), nil, nil)
 	alert1.State = notifier.StateFiring
+	// add annotations
+	alert1.Annotations["summary"] = "1"
 	// add external label
 	alert1.Labels["cluster"] = "east-1"
+	// add labels from response
+	alert1.Labels["job"] = job
+	alert1.Labels["instance"] = inst1
 	// add rule labels
 	alert1.Labels["label"] = "bar"
 	alert1.Labels["host"] = inst1
@@ -238,13 +240,15 @@ func TestGroupStart(t *testing.T) {
 	alert1.Labels[alertGroupNameLabel] = g.Name
 	alert1.ID = hash(alert1.Labels)
 
-	alert2, err := r.newAlert(m2, nil, time.Now(), nil)
-	if err != nil {
-		t.Fatalf("faield to create alert: %s", err)
-	}
+	alert2 := r.newAlert(m2, time.Now(), nil, nil)
 	alert2.State = notifier.StateFiring
+	// add annotations
+	alert2.Annotations["summary"] = "1"
 	// add external label
 	alert2.Labels["cluster"] = "east-1"
+	// add labels from response
+	alert2.Labels["job"] = job
+	alert2.Labels["instance"] = inst2
 	// add rule labels
 	alert2.Labels["label"] = "bar"
 	alert2.Labels["host"] = inst2
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 6e3126ffaa..92a9a90b88 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -40,6 +40,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
 * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): in the Select component, user-entered values are now preserved on blur if they match options in the list.
 
 * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert/): supported any status codes from the range 200-299 from alertmanager. Previously, only 200 status code considered a successful action. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6110).
+* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert/): avoid blocking `/api/v1/rules`, `/api/v1/alerts`, `/metrics` APIs when alerting rule uses template functions `query`, which could takes a while to execute. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6079).
 * BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth/): don't treat concurrency limit hit as an error of the backend. Previously, hitting the concurrency limit would increment both `vmauth_concurrent_requests_limit_reached_total` and `vmauth_user_request_backend_errors_total` counters. Now, only concurrency limit counter is incremented. Updates [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5565).
 
 ## [v1.100.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.100.1)