feat: rule limit (#2676)

vmalert: support `limit` param in groups definition

The `limit` param limits the number of time series samples produced by a single
rule during execution.
On reaching the limit, the rule returns an error.

Signed-off-by: lihaowei <haoweili35@gmail.com>
This commit is contained in:
Howie 2022-06-09 14:21:30 +08:00 committed by GitHub
parent 12ac255dae
commit 76f05f8670
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 187 additions and 38 deletions

View file

@ -101,6 +101,10 @@ name: <string>
# How often rules in the group are evaluated. # How often rules in the group are evaluated.
[ interval: <duration> | default = -evaluationInterval flag ] [ interval: <duration> | default = -evaluationInterval flag ]
# Limit the number of alerts an alerting rule and series a recording
# rule can produce. 0 is no limit.
[ limit: <int> | default = 0 ]
# How many rules execute at once within a group. Increasing concurrency may speed # How many rules execute at once within a group. Increasing concurrency may speed
# up round execution speed. # up round execution speed.
[ concurrency: <integer> | default = 1 ] [ concurrency: <integer> | default = 1 ]

View file

@ -193,12 +193,13 @@ func (ar *AlertingRule) toLabels(m datasource.Metric, qFn templates.QueryFn) (*l
// It doesn't update internal states of the Rule and meant to be used just // It doesn't update internal states of the Rule and meant to be used just
// to get time series for backfilling. // to get time series for backfilling.
// It returns ALERT and ALERT_FOR_STATE time series as result. // It returns ALERT and ALERT_FOR_STATE time series as result.
func (ar *AlertingRule) ExecRange(ctx context.Context, start, end time.Time) ([]prompbmarshal.TimeSeries, error) { func (ar *AlertingRule) ExecRange(ctx context.Context, start, end time.Time, limit int) ([]prompbmarshal.TimeSeries, error) {
series, err := ar.q.QueryRange(ctx, ar.Expr, start, end) series, err := ar.q.QueryRange(ctx, ar.Expr, start, end)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var result []prompbmarshal.TimeSeries var result []prompbmarshal.TimeSeries
timestamp2Series := make(map[int64][]prompbmarshal.TimeSeries, 0)
qFn := func(query string) ([]datasource.Metric, error) { qFn := func(query string) ([]datasource.Metric, error) {
return nil, fmt.Errorf("`query` template isn't supported in replay mode") return nil, fmt.Errorf("`query` template isn't supported in replay mode")
} }
@ -210,11 +211,14 @@ func (ar *AlertingRule) ExecRange(ctx context.Context, start, end time.Time) ([]
if ar.For == 0 { // if alert is instant if ar.For == 0 { // if alert is instant
a.State = notifier.StateFiring a.State = notifier.StateFiring
for i := range s.Values { for i := range s.Values {
result = append(result, ar.alertToTimeSeries(a, s.Timestamps[i])...) if limit > 0 {
timestamp2Series[s.Timestamps[i]] = append(timestamp2Series[s.Timestamps[i]], ar.alertToTimeSeries(a, s.Timestamps[i])...)
} else {
result = append(result, ar.alertToTimeSeries(a, s.Timestamps[i])...)
}
} }
continue continue
} }
// if alert with For > 0 // if alert with For > 0
prevT := time.Time{} prevT := time.Time{}
for i := range s.Values { for i := range s.Values {
@ -228,9 +232,28 @@ func (ar *AlertingRule) ExecRange(ctx context.Context, start, end time.Time) ([]
a.Start = at a.Start = at
} }
prevT = at prevT = at
result = append(result, ar.alertToTimeSeries(a, s.Timestamps[i])...) if limit > 0 {
timestamp2Series[s.Timestamps[i]] = append(timestamp2Series[s.Timestamps[i]], ar.alertToTimeSeries(a, s.Timestamps[i])...)
} else {
result = append(result, ar.alertToTimeSeries(a, s.Timestamps[i])...)
}
} }
} }
if limit <= 0 {
return result, nil
}
sortedTimestamp := make([]int64, 0)
for timestamp := range timestamp2Series {
sortedTimestamp = append(sortedTimestamp, timestamp)
}
sort.Slice(sortedTimestamp, func(i, j int) bool { return sortedTimestamp[i] < sortedTimestamp[j] })
for _, timestamp := range sortedTimestamp {
if len(timestamp2Series[timestamp]) > limit {
logger.Errorf("exec exceeded limit of %d with %d alerts", limit, len(timestamp2Series[timestamp]))
continue
}
result = append(result, timestamp2Series[timestamp]...)
}
return result, nil return result, nil
} }
@ -240,7 +263,7 @@ const resolvedRetention = 15 * time.Minute
// Exec executes AlertingRule expression via the given Querier. // Exec executes AlertingRule expression via the given Querier.
// Based on the Querier results AlertingRule maintains notifier.Alerts // Based on the Querier results AlertingRule maintains notifier.Alerts
func (ar *AlertingRule) Exec(ctx context.Context, ts time.Time) ([]prompbmarshal.TimeSeries, error) { func (ar *AlertingRule) Exec(ctx context.Context, ts time.Time, limit int) ([]prompbmarshal.TimeSeries, error) {
start := time.Now() start := time.Now()
qMetrics, err := ar.q.Query(ctx, ar.Expr, ts) qMetrics, err := ar.q.Query(ctx, ar.Expr, ts)
ar.mu.Lock() ar.mu.Lock()
@ -307,7 +330,7 @@ func (ar *AlertingRule) Exec(ctx context.Context, ts time.Time) ([]prompbmarshal
a.ActiveAt = ts a.ActiveAt = ts
ar.alerts[h] = a ar.alerts[h] = a
} }
var numActivePending int
for h, a := range ar.alerts { for h, a := range ar.alerts {
// if alert wasn't updated in this iteration // if alert wasn't updated in this iteration
// means it is resolved already // means it is resolved already
@ -324,12 +347,17 @@ func (ar *AlertingRule) Exec(ctx context.Context, ts time.Time) ([]prompbmarshal
} }
continue continue
} }
numActivePending++
if a.State == notifier.StatePending && ts.Sub(a.ActiveAt) >= ar.For { if a.State == notifier.StatePending && ts.Sub(a.ActiveAt) >= ar.For {
a.State = notifier.StateFiring a.State = notifier.StateFiring
a.Start = ts a.Start = ts
alertsFired.Inc() alertsFired.Inc()
} }
} }
if limit > 0 && numActivePending > limit {
ar.alerts = map[uint64]*notifier.Alert{}
return nil, fmt.Errorf("exec exceeded limit of %d with %d alerts", limit, numActivePending)
}
return ar.toTimeSeries(ts.Unix()), nil return ar.toTimeSeries(ts.Unix()), nil
} }

View file

@ -3,6 +3,7 @@ package main
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"reflect" "reflect"
"sort" "sort"
"strings" "strings"
@ -304,7 +305,7 @@ func TestAlertingRule_Exec(t *testing.T) {
for _, step := range tc.steps { for _, step := range tc.steps {
fq.reset() fq.reset()
fq.add(step...) fq.add(step...)
if _, err := tc.rule.Exec(context.TODO(), time.Now()); err != nil { if _, err := tc.rule.Exec(context.TODO(), time.Now(), 0); err != nil {
t.Fatalf("unexpected err: %s", err) t.Fatalf("unexpected err: %s", err)
} }
// artificial delay between applying steps // artificial delay between applying steps
@ -472,7 +473,7 @@ func TestAlertingRule_ExecRange(t *testing.T) {
tc.rule.q = fq tc.rule.q = fq
tc.rule.GroupID = fakeGroup.ID() tc.rule.GroupID = fakeGroup.ID()
fq.add(tc.data...) fq.add(tc.data...)
gotTS, err := tc.rule.ExecRange(context.TODO(), time.Now(), time.Now()) gotTS, err := tc.rule.ExecRange(context.TODO(), time.Now(), time.Now(), 0)
if err != nil { if err != nil {
t.Fatalf("unexpected err: %s", err) t.Fatalf("unexpected err: %s", err)
} }
@ -624,14 +625,14 @@ func TestAlertingRule_Exec_Negative(t *testing.T) {
// successful attempt // successful attempt
fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "bar")) fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "bar"))
_, err := ar.Exec(context.TODO(), time.Now()) _, err := ar.Exec(context.TODO(), time.Now(), 0)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
// label `job` will collide with rule extra label and will make both time series equal // label `job` will collide with rule extra label and will make both time series equal
fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "baz")) fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "baz"))
_, err = ar.Exec(context.TODO(), time.Now()) _, err = ar.Exec(context.TODO(), time.Now(), 0)
if !errors.Is(err, errDuplicate) { if !errors.Is(err, errDuplicate) {
t.Fatalf("expected to have %s error; got %s", errDuplicate, err) t.Fatalf("expected to have %s error; got %s", errDuplicate, err)
} }
@ -640,7 +641,7 @@ func TestAlertingRule_Exec_Negative(t *testing.T) {
expErr := "connection reset by peer" expErr := "connection reset by peer"
fq.setErr(errors.New(expErr)) fq.setErr(errors.New(expErr))
_, err = ar.Exec(context.TODO(), time.Now()) _, err = ar.Exec(context.TODO(), time.Now(), 0)
if err == nil { if err == nil {
t.Fatalf("expected to get err; got nil") t.Fatalf("expected to get err; got nil")
} }
@ -649,6 +650,59 @@ func TestAlertingRule_Exec_Negative(t *testing.T) {
} }
} }
func TestAlertingRuleLimit(t *testing.T) {
fq := &fakeQuerier{}
ar := newTestAlertingRule("test", 0)
ar.Labels = map[string]string{"job": "test"}
ar.q = fq
ar.For = time.Minute
testCases := []struct {
limit int
err string
tssNum int
}{
{
limit: 0,
tssNum: 4,
},
{
limit: -1,
tssNum: 4,
},
{
limit: 1,
err: "exec exceeded limit of 1 with 2 alerts",
tssNum: 0,
},
{
limit: 4,
tssNum: 4,
},
}
var (
err error
timestamp = time.Now()
)
fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "bar"))
fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "bar", "job"))
for _, testCase := range testCases {
_, err = ar.Exec(context.TODO(), timestamp, testCase.limit)
if err != nil && !strings.EqualFold(err.Error(), testCase.err) {
t.Fatal(err)
}
}
for _, testCase := range testCases {
tss, err := ar.ExecRange(context.TODO(), timestamp, timestamp, testCase.limit)
if err != nil {
t.Fatal(err)
}
if len(tss) != testCase.tssNum {
t.Fatal(fmt.Errorf("tss len %d is not equal to supposed %d", len(tss), testCase.tssNum))
}
}
fq.reset()
}
func TestAlertingRule_Template(t *testing.T) { func TestAlertingRule_Template(t *testing.T) {
testCases := []struct { testCases := []struct {
rule *AlertingRule rule *AlertingRule
@ -761,7 +815,7 @@ func TestAlertingRule_Template(t *testing.T) {
tc.rule.GroupID = fakeGroup.ID() tc.rule.GroupID = fakeGroup.ID()
tc.rule.q = fq tc.rule.q = fq
fq.add(tc.metrics...) fq.add(tc.metrics...)
if _, err := tc.rule.Exec(context.TODO(), time.Now()); err != nil { if _, err := tc.rule.Exec(context.TODO(), time.Now(), 0); err != nil {
t.Fatalf("unexpected err: %s", err) t.Fatalf("unexpected err: %s", err)
} }
for hash, expAlert := range tc.expAlerts { for hash, expAlert := range tc.expAlerts {

View file

@ -27,6 +27,7 @@ type Group struct {
File string File string
Name string `yaml:"name"` Name string `yaml:"name"`
Interval *promutils.Duration `yaml:"interval,omitempty"` Interval *promutils.Duration `yaml:"interval,omitempty"`
Limit int `yaml:"limit,omitempty"`
Rules []Rule `yaml:"rules"` Rules []Rule `yaml:"rules"`
Concurrency int `yaml:"concurrency"` Concurrency int `yaml:"concurrency"`
# ExtraFilterLabels is a list of label filters applied to every rule // ExtraFilterLabels is a list of label filters applied to every rule

View file

@ -10,6 +10,8 @@ import (
"sync" "sync"
"time" "time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
@ -18,7 +20,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/metrics"
) )
// Group is an entity for grouping rules // Group is an entity for grouping rules
@ -29,6 +30,7 @@ type Group struct {
Rules []Rule Rules []Rule
Type datasource.Type Type datasource.Type
Interval time.Duration Interval time.Duration
Limit int
Concurrency int Concurrency int
Checksum string Checksum string
LastEvaluation time.Time LastEvaluation time.Time
@ -90,6 +92,7 @@ func newGroup(cfg config.Group, qb datasource.QuerierBuilder, defaultInterval ti
Name: cfg.Name, Name: cfg.Name,
File: cfg.File, File: cfg.File,
Interval: cfg.Interval.Duration(), Interval: cfg.Interval.Duration(),
Limit: cfg.Limit,
Concurrency: cfg.Concurrency, Concurrency: cfg.Concurrency,
Checksum: cfg.Checksum, Checksum: cfg.Checksum,
Params: cfg.Params, Params: cfg.Params,
@ -282,7 +285,7 @@ func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *r
} }
resolveDuration := getResolveDuration(g.Interval, *resendDelay, *maxResolveDuration) resolveDuration := getResolveDuration(g.Interval, *resendDelay, *maxResolveDuration)
errs := e.execConcurrently(ctx, g.Rules, ts, g.Concurrency, resolveDuration) errs := e.execConcurrently(ctx, g.Rules, ts, g.Concurrency, resolveDuration, g.Limit)
for err := range errs { for err := range errs {
if err != nil { if err != nil {
logger.Errorf("group %q: %s", g.Name, err) logger.Errorf("group %q: %s", g.Name, err)
@ -360,12 +363,12 @@ type executor struct {
previouslySentSeriesToRW map[uint64]map[string][]prompbmarshal.Label previouslySentSeriesToRW map[uint64]map[string][]prompbmarshal.Label
} }
func (e *executor) execConcurrently(ctx context.Context, rules []Rule, ts time.Time, concurrency int, resolveDuration time.Duration) chan error { func (e *executor) execConcurrently(ctx context.Context, rules []Rule, ts time.Time, concurrency int, resolveDuration time.Duration, limit int) chan error {
res := make(chan error, len(rules)) res := make(chan error, len(rules))
if concurrency == 1 { if concurrency == 1 {
// fast path // fast path
for _, rule := range rules { for _, rule := range rules {
res <- e.exec(ctx, rule, ts, resolveDuration) res <- e.exec(ctx, rule, ts, resolveDuration, limit)
} }
close(res) close(res)
return res return res
@ -378,7 +381,7 @@ func (e *executor) execConcurrently(ctx context.Context, rules []Rule, ts time.T
sem <- struct{}{} sem <- struct{}{}
wg.Add(1) wg.Add(1)
go func(r Rule) { go func(r Rule) {
res <- e.exec(ctx, r, ts, resolveDuration) res <- e.exec(ctx, r, ts, resolveDuration, limit)
<-sem <-sem
wg.Done() wg.Done()
}(rule) }(rule)
@ -399,10 +402,10 @@ var (
remoteWriteTotal = metrics.NewCounter(`vmalert_remotewrite_total`) remoteWriteTotal = metrics.NewCounter(`vmalert_remotewrite_total`)
) )
func (e *executor) exec(ctx context.Context, rule Rule, ts time.Time, resolveDuration time.Duration) error { func (e *executor) exec(ctx context.Context, rule Rule, ts time.Time, resolveDuration time.Duration, limit int) error {
execTotal.Inc() execTotal.Inc()
tss, err := rule.Exec(ctx, ts) tss, err := rule.Exec(ctx, ts, limit)
if err != nil { if err != nil {
execErrors.Inc() execErrors.Inc()
return fmt.Errorf("rule %q: failed to execute: %w", rule, err) return fmt.Errorf("rule %q: failed to execute: %w", rule, err)

View file

@ -104,7 +104,7 @@ func (rr *RecordingRule) Close() {
// ExecRange executes recording rule on the given time range similarly to Exec. // ExecRange executes recording rule on the given time range similarly to Exec.
// It doesn't update internal states of the Rule and meant to be used just // It doesn't update internal states of the Rule and meant to be used just
// to get time series for backfilling. // to get time series for backfilling.
func (rr *RecordingRule) ExecRange(ctx context.Context, start, end time.Time) ([]prompbmarshal.TimeSeries, error) { func (rr *RecordingRule) ExecRange(ctx context.Context, start, end time.Time, limit int) ([]prompbmarshal.TimeSeries, error) {
series, err := rr.q.QueryRange(ctx, rr.Expr, start, end) series, err := rr.q.QueryRange(ctx, rr.Expr, start, end)
if err != nil { if err != nil {
return nil, err return nil, err
@ -120,11 +120,14 @@ func (rr *RecordingRule) ExecRange(ctx context.Context, start, end time.Time) ([
duplicates[key] = struct{}{} duplicates[key] = struct{}{}
tss = append(tss, ts) tss = append(tss, ts)
} }
if limit > 0 && len(tss) > limit {
return nil, fmt.Errorf("exec exceeded limit of %d with %d series", limit, len(tss))
}
return tss, nil return tss, nil
} }
// Exec executes RecordingRule expression via the given Querier. // Exec executes RecordingRule expression via the given Querier.
func (rr *RecordingRule) Exec(ctx context.Context, ts time.Time) ([]prompbmarshal.TimeSeries, error) { func (rr *RecordingRule) Exec(ctx context.Context, ts time.Time, limit int) ([]prompbmarshal.TimeSeries, error) {
qMetrics, err := rr.q.Query(ctx, rr.Expr, ts) qMetrics, err := rr.q.Query(ctx, rr.Expr, ts)
rr.mu.Lock() rr.mu.Lock()
defer rr.mu.Unlock() defer rr.mu.Unlock()
@ -137,6 +140,11 @@ func (rr *RecordingRule) Exec(ctx context.Context, ts time.Time) ([]prompbmarsha
return nil, fmt.Errorf("failed to execute query %q: %w", rr.Expr, err) return nil, fmt.Errorf("failed to execute query %q: %w", rr.Expr, err)
} }
numSeries := len(qMetrics)
if limit > 0 && numSeries > limit {
return nil, fmt.Errorf("exec exceeded limit of %d with %d series", limit, numSeries)
}
duplicates := make(map[string]struct{}, len(qMetrics)) duplicates := make(map[string]struct{}, len(qMetrics))
var tss []prompbmarshal.TimeSeries var tss []prompbmarshal.TimeSeries
for _, r := range qMetrics { for _, r := range qMetrics {

View file

@ -11,7 +11,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
) )
func TestRecoridngRule_Exec(t *testing.T) { func TestRecordingRule_Exec(t *testing.T) {
timestamp := time.Now() timestamp := time.Now()
testCases := []struct { testCases := []struct {
rule *RecordingRule rule *RecordingRule
@ -77,7 +77,7 @@ func TestRecoridngRule_Exec(t *testing.T) {
fq := &fakeQuerier{} fq := &fakeQuerier{}
fq.add(tc.metrics...) fq.add(tc.metrics...)
tc.rule.q = fq tc.rule.q = fq
tss, err := tc.rule.Exec(context.TODO(), time.Now()) tss, err := tc.rule.Exec(context.TODO(), time.Now(), 0)
if err != nil { if err != nil {
t.Fatalf("unexpected Exec err: %s", err) t.Fatalf("unexpected Exec err: %s", err)
} }
@ -88,7 +88,7 @@ func TestRecoridngRule_Exec(t *testing.T) {
} }
} }
func TestRecoridngRule_ExecRange(t *testing.T) { func TestRecordingRule_ExecRange(t *testing.T) {
timestamp := time.Now() timestamp := time.Now()
testCases := []struct { testCases := []struct {
rule *RecordingRule rule *RecordingRule
@ -158,7 +158,7 @@ func TestRecoridngRule_ExecRange(t *testing.T) {
fq := &fakeQuerier{} fq := &fakeQuerier{}
fq.add(tc.metrics...) fq.add(tc.metrics...)
tc.rule.q = fq tc.rule.q = fq
tss, err := tc.rule.ExecRange(context.TODO(), time.Now(), time.Now()) tss, err := tc.rule.ExecRange(context.TODO(), time.Now(), time.Now(), 0)
if err != nil { if err != nil {
t.Fatalf("unexpected Exec err: %s", err) t.Fatalf("unexpected Exec err: %s", err)
} }
@ -169,7 +169,52 @@ func TestRecoridngRule_ExecRange(t *testing.T) {
} }
} }
func TestRecoridngRule_ExecNegative(t *testing.T) { func TestRecordingRuleLimit(t *testing.T) {
timestamp := time.Now()
testCases := []struct {
limit int
err string
}{
{
limit: 0,
},
{
limit: -1,
},
{
limit: 1,
err: "exec exceeded limit of 1 with 3 series",
},
{
limit: 2,
err: "exec exceeded limit of 2 with 3 series",
},
}
testMetrics := []datasource.Metric{
metricWithValuesAndLabels(t, []float64{1}, "__name__", "foo", "job", "foo"),
metricWithValuesAndLabels(t, []float64{2, 3}, "__name__", "bar", "job", "bar"),
metricWithValuesAndLabels(t, []float64{4, 5, 6}, "__name__", "baz", "job", "baz"),
}
rule := &RecordingRule{Name: "job:foo", Labels: map[string]string{
"source": "test_limit",
}}
var err error
for _, testCase := range testCases {
fq := &fakeQuerier{}
fq.add(testMetrics...)
rule.q = fq
_, err = rule.Exec(context.TODO(), timestamp, testCase.limit)
if err != nil && !strings.EqualFold(err.Error(), testCase.err) {
t.Fatal(err)
}
_, err = rule.ExecRange(context.TODO(), timestamp.Add(-2*time.Second), timestamp, testCase.limit)
if err != nil && !strings.EqualFold(err.Error(), testCase.err) {
t.Fatal(err)
}
}
}
func TestRecordingRule_ExecNegative(t *testing.T) {
rr := &RecordingRule{Name: "job:foo", Labels: map[string]string{ rr := &RecordingRule{Name: "job:foo", Labels: map[string]string{
"job": "test", "job": "test",
}} }}
@ -178,7 +223,7 @@ func TestRecoridngRule_ExecNegative(t *testing.T) {
expErr := "connection reset by peer" expErr := "connection reset by peer"
fq.setErr(errors.New(expErr)) fq.setErr(errors.New(expErr))
rr.q = fq rr.q = fq
_, err := rr.Exec(context.TODO(), time.Now()) _, err := rr.Exec(context.TODO(), time.Now(), 0)
if err == nil { if err == nil {
t.Fatalf("expected to get err; got nil") t.Fatalf("expected to get err; got nil")
} }
@ -193,7 +238,7 @@ func TestRecoridngRule_ExecNegative(t *testing.T) {
fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "foo")) fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "foo"))
fq.add(metricWithValueAndLabels(t, 2, "__name__", "foo", "job", "bar")) fq.add(metricWithValueAndLabels(t, 2, "__name__", "foo", "job", "bar"))
_, err = rr.Exec(context.TODO(), time.Now()) _, err = rr.Exec(context.TODO(), time.Now(), 0)
if err == nil { if err == nil {
t.Fatalf("expected to get err; got nil") t.Fatalf("expected to get err; got nil")
} }

View file

@ -7,12 +7,13 @@ import (
"strings" "strings"
"time" "time"
"github.com/dmitryk-dk/pb/v3"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/dmitryk-dk/pb/v3"
) )
var ( var (
@ -95,7 +96,7 @@ func (g *Group) replay(start, end time.Time, rw *remotewrite.Client) int {
} }
ri.reset() ri.reset()
for ri.next() { for ri.next() {
n, err := replayRule(rule, ri.s, ri.e, rw) n, err := replayRule(rule, ri.s, ri.e, rw, g.Limit)
if err != nil { if err != nil {
logger.Fatalf("rule %q: %s", rule, err) logger.Fatalf("rule %q: %s", rule, err)
} }
@ -114,11 +115,11 @@ func (g *Group) replay(start, end time.Time, rw *remotewrite.Client) int {
return total return total
} }
func replayRule(rule Rule, start, end time.Time, rw *remotewrite.Client) (int, error) { func replayRule(rule Rule, start, end time.Time, rw *remotewrite.Client, limit int) (int, error) {
var err error var err error
var tss []prompbmarshal.TimeSeries var tss []prompbmarshal.TimeSeries
for i := 0; i < *replayRuleRetryAttempts; i++ { for i := 0; i < *replayRuleRetryAttempts; i++ {
tss, err = rule.ExecRange(context.Background(), start, end) tss, err = rule.ExecRange(context.Background(), start, end, limit)
if err == nil { if err == nil {
break break
} }

View file

@ -15,10 +15,12 @@ type Rule interface {
// ID returns unique ID that may be used for // ID returns unique ID that may be used for
// identifying this Rule among others. // identifying this Rule among others.
ID() uint64 ID() uint64
// Exec executes the rule with given context at the given timestamp // Exec executes the rule with given context at the given timestamp and limit.
Exec(ctx context.Context, ts time.Time) ([]prompbmarshal.TimeSeries, error) // returns an err if number of resulting time series exceeds the limit.
// ExecRange executes the rule on the given time range Exec(ctx context.Context, ts time.Time, limit int) ([]prompbmarshal.TimeSeries, error)
ExecRange(ctx context.Context, start, end time.Time) ([]prompbmarshal.TimeSeries, error) // ExecRange executes the rule on the given time range and limit.
// returns an err if number of resulting time series exceeds the limit.
ExecRange(ctx context.Context, start, end time.Time, limit int) ([]prompbmarshal.TimeSeries, error)
// UpdateWith performs modification of current Rule // UpdateWith performs modification of current Rule
// with fields of the given Rule. // with fields of the given Rule.
UpdateWith(Rule) error UpdateWith(Rule) error

View file

@ -105,6 +105,10 @@ name: <string>
# How often rules in the group are evaluated. # How often rules in the group are evaluated.
[ interval: <duration> | default = -evaluationInterval flag ] [ interval: <duration> | default = -evaluationInterval flag ]
# Limit the number of alerts an alerting rule and series a recording
# rule can produce. 0 is no limit.
[ limit: <int> | default = 0 ]
# How many rules execute at once within a group. Increasing concurrency may speed # How many rules execute at once within a group. Increasing concurrency may speed
# up round execution speed. # up round execution speed.
[ concurrency: <integer> | default = 1 ] [ concurrency: <integer> | default = 1 ]

View file

@ -40,7 +40,6 @@ func Init() {
initTimezone() initTimezone()
go logLimiterCleaner() go logLimiterCleaner()
logAllFlags() logAllFlags()
} }
func initTimezone() { func initTimezone() {
@ -79,7 +78,7 @@ func validateLoggerFormat() {
switch *loggerFormat { switch *loggerFormat {
case "default", "json": case "default", "json":
default: default:
// We cannot use logger.Pancif here, since the logger isn't initialized yet. // We cannot use logger.Panicf here, since the logger isn't initialized yet.
panic(fmt.Errorf("FATAL: unsupported `-loggerFormat` value: %q; supported values are: default, json", *loggerFormat)) panic(fmt.Errorf("FATAL: unsupported `-loggerFormat` value: %q; supported values are: default, json", *loggerFormat))
} }
} }