diff --git a/app/vmalert/README.md b/app/vmalert/README.md
index 86bfcedb6..297186cf2 100644
--- a/app/vmalert/README.md
+++ b/app/vmalert/README.md
@@ -101,6 +101,10 @@ name: <string>
 # How often rules in the group are evaluated.
 [ interval: <duration> | default = -evaluationInterval flag ]
 
+# Limit the number of alerts an alerting rule and series a recording
+# rule can produce. 0 is no limit.
+[ limit: <int> | default = 0 ]
+
 # How many rules execute at once within a group. Increasing concurrency may speed
 # up round execution speed.
 [ concurrency: <integer> | default = 1 ]
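Not part of the patch: a sketch of how the new group-level option is meant to be used. The group name, rules, expressions and the limit value below are illustrative only.

```yaml
groups:
  - name: example-group
    interval: 30s
    # Reject the evaluation result and report an error if a rule in this
    # group produces more than 1000 alerts or recording series at once.
    limit: 1000
    rules:
      - record: job:http_requests:rate5m
        expr: sum(rate(http_requests_total[5m])) by (job)
      - alert: ManyErrors
        expr: sum(rate(http_errors_total[5m])) by (job) > 0
        for: 5m
```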
diff --git a/app/vmalert/alerting.go b/app/vmalert/alerting.go
index 855843df5..1af935c31 100644
--- a/app/vmalert/alerting.go
+++ b/app/vmalert/alerting.go
@@ -193,12 +193,13 @@ func (ar *AlertingRule) toLabels(m datasource.Metric, qFn templates.QueryFn) (*l
 // It doesn't update internal states of the Rule and meant to be used just
 // to get time series for backfilling.
 // It returns ALERT and ALERT_FOR_STATE time series as result.
-func (ar *AlertingRule) ExecRange(ctx context.Context, start, end time.Time) ([]prompbmarshal.TimeSeries, error) {
+func (ar *AlertingRule) ExecRange(ctx context.Context, start, end time.Time, limit int) ([]prompbmarshal.TimeSeries, error) {
 	series, err := ar.q.QueryRange(ctx, ar.Expr, start, end)
 	if err != nil {
 		return nil, err
 	}
 	var result []prompbmarshal.TimeSeries
+	timestamp2Series := make(map[int64][]prompbmarshal.TimeSeries, 0)
 	qFn := func(query string) ([]datasource.Metric, error) {
 		return nil, fmt.Errorf("`query` template isn't supported in replay mode")
 	}
@@ -210,11 +211,14 @@
 		if ar.For == 0 { // if alert is instant
 			a.State = notifier.StateFiring
 			for i := range s.Values {
-				result = append(result, ar.alertToTimeSeries(a, s.Timestamps[i])...)
+				if limit > 0 {
+					timestamp2Series[s.Timestamps[i]] = append(timestamp2Series[s.Timestamps[i]], ar.alertToTimeSeries(a, s.Timestamps[i])...)
+				} else {
+					result = append(result, ar.alertToTimeSeries(a, s.Timestamps[i])...)
+				}
 			}
 			continue
 		}
-
 		// if alert with For > 0
 		prevT := time.Time{}
 		for i := range s.Values {
@@ -228,9 +232,28 @@
 				a.Start = at
 			}
 			prevT = at
-			result = append(result, ar.alertToTimeSeries(a, s.Timestamps[i])...)
+			if limit > 0 {
+				timestamp2Series[s.Timestamps[i]] = append(timestamp2Series[s.Timestamps[i]], ar.alertToTimeSeries(a, s.Timestamps[i])...)
+			} else {
+				result = append(result, ar.alertToTimeSeries(a, s.Timestamps[i])...)
+			}
 		}
 	}
+	if limit <= 0 {
+		return result, nil
+	}
+	sortedTimestamp := make([]int64, 0)
+	for timestamp := range timestamp2Series {
+		sortedTimestamp = append(sortedTimestamp, timestamp)
+	}
+	sort.Slice(sortedTimestamp, func(i, j int) bool { return sortedTimestamp[i] < sortedTimestamp[j] })
+	for _, timestamp := range sortedTimestamp {
+		if len(timestamp2Series[timestamp]) > limit {
+			logger.Errorf("exec exceeded limit of %d with %d alerts", limit, len(timestamp2Series[timestamp]))
+			continue
+		}
+		result = append(result, timestamp2Series[timestamp]...)
+	}
 	return result, nil
 }
 
@@ -240,7 +263,7 @@ const resolvedRetention = 15 * time.Minute
 
 // Exec executes AlertingRule expression via the given Querier.
 // Based on the Querier results AlertingRule maintains notifier.Alerts
-func (ar *AlertingRule) Exec(ctx context.Context, ts time.Time) ([]prompbmarshal.TimeSeries, error) {
+func (ar *AlertingRule) Exec(ctx context.Context, ts time.Time, limit int) ([]prompbmarshal.TimeSeries, error) {
 	start := time.Now()
 	qMetrics, err := ar.q.Query(ctx, ar.Expr, ts)
 	ar.mu.Lock()
@@ -307,7 +330,7 @@
 			a.ActiveAt = ts
 			ar.alerts[h] = a
 		}
-
+	var numActivePending int
 	for h, a := range ar.alerts {
 		// if alert wasn't updated in this iteration
 		// means it is resolved already
@@ -324,12 +347,17 @@
 			}
 			continue
 		}
+		numActivePending++
 		if a.State == notifier.StatePending && ts.Sub(a.ActiveAt) >= ar.For {
 			a.State = notifier.StateFiring
 			a.Start = ts
 			alertsFired.Inc()
 		}
 	}
+	if limit > 0 && numActivePending > limit {
+		ar.alerts = map[uint64]*notifier.Alert{}
+		return nil, fmt.Errorf("exec exceeded limit of %d with %d alerts", limit, numActivePending)
+	}
 	return ar.toTimeSeries(ts.Unix()), nil
 }
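From the caller's side, the alerting-rule change boils down to the extra `limit` argument and the new error. The helper below is hypothetical and is not added by the patch; it only demonstrates the `Exec` signature and error text introduced above, assuming it lives in the same vmalert package.

```go
// evalWithLimit is a hypothetical wrapper around the new AlertingRule.Exec
// signature. When the number of active/pending alerts exceeds a positive
// limit, Exec drops the rule's internal alert state and returns an error
// such as "exec exceeded limit of 5 with 7 alerts", so the caller gets no
// time series to push to remote storage for this evaluation.
func evalWithLimit(ctx context.Context, ar *AlertingRule, limit int) ([]prompbmarshal.TimeSeries, error) {
	tss, err := ar.Exec(ctx, time.Now(), limit)
	if err != nil {
		return nil, fmt.Errorf("rule %q: %w", ar.Name, err)
	}
	return tss, nil
}
```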
diff --git a/app/vmalert/alerting_test.go b/app/vmalert/alerting_test.go
index 6bbd2dff6..f0b1cc4eb 100644
--- a/app/vmalert/alerting_test.go
+++ b/app/vmalert/alerting_test.go
@@ -3,6 +3,7 @@ package main
 import (
 	"context"
 	"errors"
+	"fmt"
 	"reflect"
 	"sort"
 	"strings"
@@ -304,7 +305,7 @@ func TestAlertingRule_Exec(t *testing.T) {
 		for _, step := range tc.steps {
 			fq.reset()
 			fq.add(step...)
-			if _, err := tc.rule.Exec(context.TODO(), time.Now()); err != nil {
+			if _, err := tc.rule.Exec(context.TODO(), time.Now(), 0); err != nil {
 				t.Fatalf("unexpected err: %s", err)
 			}
 			// artificial delay between applying steps
@@ -472,7 +473,7 @@ func TestAlertingRule_ExecRange(t *testing.T) {
 		tc.rule.q = fq
 		tc.rule.GroupID = fakeGroup.ID()
 		fq.add(tc.data...)
-		gotTS, err := tc.rule.ExecRange(context.TODO(), time.Now(), time.Now())
+		gotTS, err := tc.rule.ExecRange(context.TODO(), time.Now(), time.Now(), 0)
 		if err != nil {
 			t.Fatalf("unexpected err: %s", err)
 		}
@@ -624,14 +625,14 @@ func TestAlertingRule_Exec_Negative(t *testing.T) {
 
 	// successful attempt
 	fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "bar"))
-	_, err := ar.Exec(context.TODO(), time.Now())
+	_, err := ar.Exec(context.TODO(), time.Now(), 0)
 	if err != nil {
 		t.Fatal(err)
 	}
 
 	// label `job` will collide with rule extra label and will make both time series equal
 	fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "baz"))
-	_, err = ar.Exec(context.TODO(), time.Now())
+	_, err = ar.Exec(context.TODO(), time.Now(), 0)
 	if !errors.Is(err, errDuplicate) {
 		t.Fatalf("expected to have %s error; got %s", errDuplicate, err)
 	}
@@ -640,7 +641,7 @@
 
 	expErr := "connection reset by peer"
 	fq.setErr(errors.New(expErr))
-	_, err = ar.Exec(context.TODO(), time.Now())
+	_, err = ar.Exec(context.TODO(), time.Now(), 0)
 	if err == nil {
 		t.Fatalf("expected to get err; got nil")
 	}
@@ -649,6 +650,59 @@
 	}
 }
 
+func TestAlertingRuleLimit(t *testing.T) {
+	fq := &fakeQuerier{}
+	ar := newTestAlertingRule("test", 0)
+	ar.Labels = map[string]string{"job": "test"}
+	ar.q = fq
+	ar.For = time.Minute
+	testCases := []struct {
+		limit  int
+		err    string
+		tssNum int
+	}{
+		{
+			limit:  0,
+			tssNum: 4,
+		},
+		{
+			limit:  -1,
+			tssNum: 4,
+		},
+		{
+			limit:  1,
+			err:    "exec exceeded limit of 1 with 2 alerts",
+			tssNum: 0,
+		},
+		{
+			limit:  4,
+			tssNum: 4,
+		},
+	}
+	var (
+		err       error
+		timestamp = time.Now()
+	)
+	fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "bar"))
+	fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "bar", "job"))
+	for _, testCase := range testCases {
+		_, err = ar.Exec(context.TODO(), timestamp, testCase.limit)
+		if err != nil && !strings.EqualFold(err.Error(), testCase.err) {
+			t.Fatal(err)
+		}
+	}
+	for _, testCase := range testCases {
+		tss, err := ar.ExecRange(context.TODO(), timestamp, timestamp, testCase.limit)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if len(tss) != testCase.tssNum {
+			t.Fatal(fmt.Errorf("tss len %d is not equal to supposed %d", len(tss), testCase.tssNum))
+		}
+	}
+	fq.reset()
+}
+
 func TestAlertingRule_Template(t *testing.T) {
 	testCases := []struct {
 		rule      *AlertingRule
@@ -761,7 +815,7 @@ func TestAlertingRule_Template(t *testing.T) {
 		tc.rule.GroupID = fakeGroup.ID()
 		tc.rule.q = fq
 		fq.add(tc.metrics...)
-		if _, err := tc.rule.Exec(context.TODO(), time.Now()); err != nil {
+		if _, err := tc.rule.Exec(context.TODO(), time.Now(), 0); err != nil {
 			t.Fatalf("unexpected err: %s", err)
 		}
 		for hash, expAlert := range tc.expAlerts {
diff --git a/app/vmalert/config/config.go b/app/vmalert/config/config.go
index ec961b092..f79801025 100644
--- a/app/vmalert/config/config.go
+++ b/app/vmalert/config/config.go
@@ -27,6 +27,7 @@ type Group struct {
 	File        string
 	Name        string              `yaml:"name"`
 	Interval    *promutils.Duration `yaml:"interval,omitempty"`
+	Limit       int                 `yaml:"limit,omitempty"`
 	Rules       []Rule              `yaml:"rules"`
 	Concurrency int                 `yaml:"concurrency"`
 	// ExtraFilterLabels is a list label filters applied to every rule
diff --git a/app/vmalert/group.go b/app/vmalert/group.go
index c428537ba..a9983ce1c 100644
--- a/app/vmalert/group.go
+++ b/app/vmalert/group.go
@@ -10,6 +10,8 @@ import (
 	"sync"
 	"time"
 
+	"github.com/VictoriaMetrics/metrics"
+
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
@@ -18,7 +20,6 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
-	"github.com/VictoriaMetrics/metrics"
 )
 
 // Group is an entity for grouping rules
@@ -29,6 +30,7 @@ type Group struct {
 	Rules       []Rule
 	Type        datasource.Type
 	Interval    time.Duration
+	Limit       int
 	Concurrency int
 	Checksum    string
 	LastEvaluation time.Time
@@ -90,6 +92,7 @@ func newGroup(cfg config.Group, qb datasource.QuerierBuilder, defaultInterval ti
 		Name:        cfg.Name,
 		File:        cfg.File,
 		Interval:    cfg.Interval.Duration(),
+		Limit:       cfg.Limit,
 		Concurrency: cfg.Concurrency,
 		Checksum:    cfg.Checksum,
 		Params:      cfg.Params,
@@ -282,7 +285,7 @@
 		}
 
 		resolveDuration := getResolveDuration(g.Interval, *resendDelay, *maxResolveDuration)
-		errs := e.execConcurrently(ctx, g.Rules, ts, g.Concurrency, resolveDuration)
+		errs := e.execConcurrently(ctx, g.Rules, ts, g.Concurrency, resolveDuration, g.Limit)
 		for err := range errs {
 			if err != nil {
 				logger.Errorf("group %q: %s", g.Name, err)
@@ -360,12 +363,12 @@ type executor struct {
 	previouslySentSeriesToRW map[uint64]map[string][]prompbmarshal.Label
 }
 
-func (e *executor) execConcurrently(ctx context.Context, rules []Rule, ts time.Time, concurrency int, resolveDuration time.Duration) chan error {
+func (e *executor) execConcurrently(ctx context.Context, rules []Rule, ts time.Time, concurrency int, resolveDuration time.Duration, limit int) chan error {
 	res := make(chan error, len(rules))
 	if concurrency == 1 {
 		// fast path
 		for _, rule := range rules {
-			res <- e.exec(ctx, rule, ts, resolveDuration)
+			res <- e.exec(ctx, rule, ts, resolveDuration, limit)
 		}
 		close(res)
 		return res
@@ -378,7 +381,7 @@ func (e *executor) execConcurrently(ctx context.Context, rules []Rule, ts time.T
 		sem <- struct{}{}
 		wg.Add(1)
 		go func(r Rule) {
-			res <- e.exec(ctx, r, ts, resolveDuration)
+			res <- e.exec(ctx, r, ts, resolveDuration, limit)
 			<-sem
 			wg.Done()
 		}(rule)
@@ -399,10 +402,10 @@ var (
 	remoteWriteTotal = metrics.NewCounter(`vmalert_remotewrite_total`)
 )
 
-func (e *executor) exec(ctx context.Context, rule Rule, ts time.Time, resolveDuration time.Duration) error {
+func (e *executor) exec(ctx context.Context, rule Rule, ts time.Time, resolveDuration time.Duration, limit int) error {
 	execTotal.Inc()
 
-	tss, err := rule.Exec(ctx, ts)
+	tss, err := rule.Exec(ctx, ts, limit)
 	if err != nil {
 		execErrors.Inc()
 		return fmt.Errorf("rule %q: failed to execute: %w", rule, err)
diff --git a/app/vmalert/recording.go b/app/vmalert/recording.go
index 9a16cd408..cb5ac8e2f 100644
--- a/app/vmalert/recording.go
+++ b/app/vmalert/recording.go
@@ -104,7 +104,7 @@ func (rr *RecordingRule) Close() {
 // ExecRange executes recording rule on the given time range similarly to Exec.
 // It doesn't update internal states of the Rule and meant to be used just
 // to get time series for backfilling.
-func (rr *RecordingRule) ExecRange(ctx context.Context, start, end time.Time) ([]prompbmarshal.TimeSeries, error) {
+func (rr *RecordingRule) ExecRange(ctx context.Context, start, end time.Time, limit int) ([]prompbmarshal.TimeSeries, error) {
 	series, err := rr.q.QueryRange(ctx, rr.Expr, start, end)
 	if err != nil {
 		return nil, err
@@ -120,11 +120,14 @@ func (rr *RecordingRule) ExecRange(ctx context.Context, start, end time.Time) ([
 			duplicates[key] = struct{}{}
 			tss = append(tss, ts)
 		}
+	if limit > 0 && len(tss) > limit {
+		return nil, fmt.Errorf("exec exceeded limit of %d with %d series", limit, len(tss))
+	}
 	return tss, nil
 }
 
 // Exec executes RecordingRule expression via the given Querier.
-func (rr *RecordingRule) Exec(ctx context.Context, ts time.Time) ([]prompbmarshal.TimeSeries, error) {
+func (rr *RecordingRule) Exec(ctx context.Context, ts time.Time, limit int) ([]prompbmarshal.TimeSeries, error) {
 	qMetrics, err := rr.q.Query(ctx, rr.Expr, ts)
 	rr.mu.Lock()
 	defer rr.mu.Unlock()
@@ -137,6 +140,11 @@ func (rr *RecordingRule) Exec(ctx context.Context, ts time.Time) ([]prompbmarsha
 		return nil, fmt.Errorf("failed to execute query %q: %w", rr.Expr, err)
 	}
 
+	numSeries := len(qMetrics)
+	if limit > 0 && numSeries > limit {
+		return nil, fmt.Errorf("exec exceeded limit of %d with %d series", limit, numSeries)
+	}
+
 	duplicates := make(map[string]struct{}, len(qMetrics))
 	var tss []prompbmarshal.TimeSeries
 	for _, r := range qMetrics {
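Both `Exec` and `ExecRange` of the recording rule apply the same guard. A minimal standalone sketch of that invariant follows; it is not part of the patch, and only the error text is copied from the code above.

```go
package main

import "fmt"

// exceedsLimit mirrors the check added to RecordingRule above:
// a non-positive limit disables the guard; otherwise producing more
// series than the limit is an error.
func exceedsLimit(numSeries, limit int) error {
	if limit > 0 && numSeries > limit {
		return fmt.Errorf("exec exceeded limit of %d with %d series", limit, numSeries)
	}
	return nil
}

func main() {
	fmt.Println(exceedsLimit(3, 0)) // <nil>: a limit of 0 means "no limit"
	fmt.Println(exceedsLimit(3, 2)) // exec exceeded limit of 2 with 3 series
}
```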
diff --git a/app/vmalert/recording_test.go b/app/vmalert/recording_test.go
index cbb2b75a7..4fe94d8e8 100644
--- a/app/vmalert/recording_test.go
+++ b/app/vmalert/recording_test.go
@@ -11,7 +11,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 )
 
-func TestRecoridngRule_Exec(t *testing.T) {
+func TestRecordingRule_Exec(t *testing.T) {
 	timestamp := time.Now()
 	testCases := []struct {
 		rule    *RecordingRule
@@ -77,7 +77,7 @@ func TestRecoridngRule_Exec(t *testing.T) {
 			fq := &fakeQuerier{}
 			fq.add(tc.metrics...)
 			tc.rule.q = fq
-			tss, err := tc.rule.Exec(context.TODO(), time.Now())
+			tss, err := tc.rule.Exec(context.TODO(), time.Now(), 0)
 			if err != nil {
 				t.Fatalf("unexpected Exec err: %s", err)
 			}
@@ -88,7 +88,7 @@
 	}
 }
 
-func TestRecoridngRule_ExecRange(t *testing.T) {
+func TestRecordingRule_ExecRange(t *testing.T) {
 	timestamp := time.Now()
 	testCases := []struct {
 		rule    *RecordingRule
@@ -158,7 +158,7 @@ func TestRecoridngRule_ExecRange(t *testing.T) {
 			fq := &fakeQuerier{}
 			fq.add(tc.metrics...)
 			tc.rule.q = fq
-			tss, err := tc.rule.ExecRange(context.TODO(), time.Now(), time.Now())
+			tss, err := tc.rule.ExecRange(context.TODO(), time.Now(), time.Now(), 0)
 			if err != nil {
 				t.Fatalf("unexpected Exec err: %s", err)
 			}
@@ -169,7 +169,52 @@
 	}
 }
 
-func TestRecoridngRule_ExecNegative(t *testing.T) {
+func TestRecordingRuleLimit(t *testing.T) {
+	timestamp := time.Now()
+	testCases := []struct {
+		limit int
+		err   string
+	}{
+		{
+			limit: 0,
+		},
+		{
+			limit: -1,
+		},
+		{
+			limit: 1,
+			err:   "exec exceeded limit of 1 with 3 series",
+		},
+		{
+			limit: 2,
+			err:   "exec exceeded limit of 2 with 3 series",
+		},
+	}
+	testMetrics := []datasource.Metric{
+		metricWithValuesAndLabels(t, []float64{1}, "__name__", "foo", "job", "foo"),
+		metricWithValuesAndLabels(t, []float64{2, 3}, "__name__", "bar", "job", "bar"),
+		metricWithValuesAndLabels(t, []float64{4, 5, 6}, "__name__", "baz", "job", "baz"),
+	}
+	rule := &RecordingRule{Name: "job:foo", Labels: map[string]string{
+		"source": "test_limit",
+	}}
+	var err error
+	for _, testCase := range testCases {
+		fq := &fakeQuerier{}
+		fq.add(testMetrics...)
+		rule.q = fq
+		_, err = rule.Exec(context.TODO(), timestamp, testCase.limit)
+		if err != nil && !strings.EqualFold(err.Error(), testCase.err) {
+			t.Fatal(err)
+		}
+		_, err = rule.ExecRange(context.TODO(), timestamp.Add(-2*time.Second), timestamp, testCase.limit)
+		if err != nil && !strings.EqualFold(err.Error(), testCase.err) {
+			t.Fatal(err)
+		}
+	}
+}
+
+func TestRecordingRule_ExecNegative(t *testing.T) {
 	rr := &RecordingRule{Name: "job:foo", Labels: map[string]string{
 		"job": "test",
 	}}
@@ -178,7 +223,7 @@
 	expErr := "connection reset by peer"
 	fq.setErr(errors.New(expErr))
 	rr.q = fq
-	_, err := rr.Exec(context.TODO(), time.Now())
+	_, err := rr.Exec(context.TODO(), time.Now(), 0)
 	if err == nil {
 		t.Fatalf("expected to get err; got nil")
 	}
@@ -193,7 +238,7 @@
 
 	fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "foo"))
 	fq.add(metricWithValueAndLabels(t, 2, "__name__", "foo", "job", "bar"))
-	_, err = rr.Exec(context.TODO(), time.Now())
+	_, err = rr.Exec(context.TODO(), time.Now(), 0)
 	if err == nil {
 		t.Fatalf("expected to get err; got nil")
 	}
diff --git a/app/vmalert/replay.go b/app/vmalert/replay.go
index 16f182902..2ebfa6997 100644
--- a/app/vmalert/replay.go
+++ b/app/vmalert/replay.go
@@ -7,12 +7,13 @@ import (
 	"strings"
 	"time"
 
+	"github.com/dmitryk-dk/pb/v3"
+
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
-	"github.com/dmitryk-dk/pb/v3"
 )
 
 var (
@@ -95,7 +96,7 @@ func (g *Group) replay(start, end time.Time, rw *remotewrite.Client) int {
 		}
 		ri.reset()
 		for ri.next() {
-			n, err := replayRule(rule, ri.s, ri.e, rw)
+			n, err := replayRule(rule, ri.s, ri.e, rw, g.Limit)
 			if err != nil {
 				logger.Fatalf("rule %q: %s", rule, err)
 			}
@@ -114,11 +115,11 @@
 	return total
 }
 
-func replayRule(rule Rule, start, end time.Time, rw *remotewrite.Client) (int, error) {
+func replayRule(rule Rule, start, end time.Time, rw *remotewrite.Client, limit int) (int, error) {
 	var err error
 	var tss []prompbmarshal.TimeSeries
 	for i := 0; i < *replayRuleRetryAttempts; i++ {
-		tss, err = rule.ExecRange(context.Background(), start, end)
+		tss, err = rule.ExecRange(context.Background(), start, end, limit)
 		if err == nil {
 			break
 		}
diff --git a/app/vmalert/rule.go b/app/vmalert/rule.go
index 04ce44933..a1332005c 100644
--- a/app/vmalert/rule.go
+++ b/app/vmalert/rule.go
@@ -15,10 +15,12 @@ type Rule interface {
 	// ID returns unique ID that may be used for
 	// identifying this Rule among others.
 	ID() uint64
-	// Exec executes the rule with given context at the given timestamp
-	Exec(ctx context.Context, ts time.Time) ([]prompbmarshal.TimeSeries, error)
-	// ExecRange executes the rule on the given time range
-	ExecRange(ctx context.Context, start, end time.Time) ([]prompbmarshal.TimeSeries, error)
+	// Exec executes the rule with given context at the given timestamp and limit.
+	// returns an err if number of resulting time series exceeds the limit.
+	Exec(ctx context.Context, ts time.Time, limit int) ([]prompbmarshal.TimeSeries, error)
+	// ExecRange executes the rule on the given time range and limit.
+	// returns an err if number of resulting time series exceeds the limit.
+	ExecRange(ctx context.Context, start, end time.Time, limit int) ([]prompbmarshal.TimeSeries, error)
 	// UpdateWith performs modification of current Rule
 	// with fields of the given Rule.
 	UpdateWith(Rule) error
diff --git a/docs/vmalert.md b/docs/vmalert.md
index 208552a3f..5e56a886c 100644
--- a/docs/vmalert.md
+++ b/docs/vmalert.md
@@ -105,6 +105,10 @@ name: <string>
 # How often rules in the group are evaluated.
 [ interval: <duration> | default = -evaluationInterval flag ]
 
+# Limit the number of alerts an alerting rule and series a recording
+# rule can produce. 0 is no limit.
+[ limit: <int> | default = 0 ]
+
 # How many rules execute at once within a group. Increasing concurrency may speed
 # up round execution speed.
 [ concurrency: <integer> | default = 1 ]
diff --git a/lib/logger/logger.go b/lib/logger/logger.go
index ab81fb59a..840cf8bf1 100644
--- a/lib/logger/logger.go
+++ b/lib/logger/logger.go
@@ -40,7 +40,6 @@ func Init() {
 	initTimezone()
 	go logLimiterCleaner()
 	logAllFlags()
-
 }
 
 func initTimezone() {
@@ -79,7 +78,7 @@ func validateLoggerFormat() {
 	switch *loggerFormat {
 	case "default", "json":
 	default:
-		// We cannot use logger.Pancif here, since the logger isn't initialized yet.
+		// We cannot use logger.Panicf here, since the logger isn't initialized yet.
 		panic(fmt.Errorf("FATAL: unsupported `-loggerFormat` value: %q; supported values are: default, json", *loggerFormat))
 	}
 }