feat: rule limit (#2676)

vmalert: support `limit` param in groups definition

`limit` param limits number of time series samples produced by a single rule
during execution.
On reaching the limit rule will return an err.

Signed-off-by: lihaowei <haoweili35@gmail.com>
This commit is contained in:
Howie 2022-06-09 14:21:30 +08:00 committed by Aliaksandr Valialkin
parent 7fe60dad10
commit 4afd7aa695
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
11 changed files with 187 additions and 38 deletions

View file

@ -101,6 +101,10 @@ name: <string>
# How often rules in the group are evaluated.
[ interval: <duration> | default = -evaluationInterval flag ]
# Limit the number of alerts an alerting rule and series a recording
# rule can produce. 0 is no limit.
[ limit: <int> | default = 0 ]
# How many rules execute at once within a group. Increasing concurrency may speed
# up round execution speed.
[ concurrency: <integer> | default = 1 ]

View file

@ -193,12 +193,13 @@ func (ar *AlertingRule) toLabels(m datasource.Metric, qFn templates.QueryFn) (*l
// It doesn't update internal states of the Rule and meant to be used just
// to get time series for backfilling.
// It returns ALERT and ALERT_FOR_STATE time series as result.
func (ar *AlertingRule) ExecRange(ctx context.Context, start, end time.Time) ([]prompbmarshal.TimeSeries, error) {
func (ar *AlertingRule) ExecRange(ctx context.Context, start, end time.Time, limit int) ([]prompbmarshal.TimeSeries, error) {
series, err := ar.q.QueryRange(ctx, ar.Expr, start, end)
if err != nil {
return nil, err
}
var result []prompbmarshal.TimeSeries
timestamp2Series := make(map[int64][]prompbmarshal.TimeSeries, 0)
qFn := func(query string) ([]datasource.Metric, error) {
return nil, fmt.Errorf("`query` template isn't supported in replay mode")
}
@ -210,11 +211,14 @@ func (ar *AlertingRule) ExecRange(ctx context.Context, start, end time.Time) ([]
if ar.For == 0 { // if alert is instant
a.State = notifier.StateFiring
for i := range s.Values {
if limit > 0 {
timestamp2Series[s.Timestamps[i]] = append(timestamp2Series[s.Timestamps[i]], ar.alertToTimeSeries(a, s.Timestamps[i])...)
} else {
result = append(result, ar.alertToTimeSeries(a, s.Timestamps[i])...)
}
}
continue
}
// if alert with For > 0
prevT := time.Time{}
for i := range s.Values {
@ -228,9 +232,28 @@ func (ar *AlertingRule) ExecRange(ctx context.Context, start, end time.Time) ([]
a.Start = at
}
prevT = at
if limit > 0 {
timestamp2Series[s.Timestamps[i]] = append(timestamp2Series[s.Timestamps[i]], ar.alertToTimeSeries(a, s.Timestamps[i])...)
} else {
result = append(result, ar.alertToTimeSeries(a, s.Timestamps[i])...)
}
}
}
if limit <= 0 {
return result, nil
}
sortedTimestamp := make([]int64, 0)
for timestamp := range timestamp2Series {
sortedTimestamp = append(sortedTimestamp, timestamp)
}
sort.Slice(sortedTimestamp, func(i, j int) bool { return sortedTimestamp[i] < sortedTimestamp[j] })
for _, timestamp := range sortedTimestamp {
if len(timestamp2Series[timestamp]) > limit {
logger.Errorf("exec exceeded limit of %d with %d alerts", limit, len(timestamp2Series[timestamp]))
continue
}
result = append(result, timestamp2Series[timestamp]...)
}
return result, nil
}
@ -240,7 +263,7 @@ const resolvedRetention = 15 * time.Minute
// Exec executes AlertingRule expression via the given Querier.
// Based on the Querier results AlertingRule maintains notifier.Alerts
func (ar *AlertingRule) Exec(ctx context.Context, ts time.Time) ([]prompbmarshal.TimeSeries, error) {
func (ar *AlertingRule) Exec(ctx context.Context, ts time.Time, limit int) ([]prompbmarshal.TimeSeries, error) {
start := time.Now()
qMetrics, err := ar.q.Query(ctx, ar.Expr, ts)
ar.mu.Lock()
@ -307,7 +330,7 @@ func (ar *AlertingRule) Exec(ctx context.Context, ts time.Time) ([]prompbmarshal
a.ActiveAt = ts
ar.alerts[h] = a
}
var numActivePending int
for h, a := range ar.alerts {
// if alert wasn't updated in this iteration
// means it is resolved already
@ -324,12 +347,17 @@ func (ar *AlertingRule) Exec(ctx context.Context, ts time.Time) ([]prompbmarshal
}
continue
}
numActivePending++
if a.State == notifier.StatePending && ts.Sub(a.ActiveAt) >= ar.For {
a.State = notifier.StateFiring
a.Start = ts
alertsFired.Inc()
}
}
if limit > 0 && numActivePending > limit {
ar.alerts = map[uint64]*notifier.Alert{}
return nil, fmt.Errorf("exec exceeded limit of %d with %d alerts", limit, numActivePending)
}
return ar.toTimeSeries(ts.Unix()), nil
}

View file

@ -3,6 +3,7 @@ package main
import (
"context"
"errors"
"fmt"
"reflect"
"sort"
"strings"
@ -304,7 +305,7 @@ func TestAlertingRule_Exec(t *testing.T) {
for _, step := range tc.steps {
fq.reset()
fq.add(step...)
if _, err := tc.rule.Exec(context.TODO(), time.Now()); err != nil {
if _, err := tc.rule.Exec(context.TODO(), time.Now(), 0); err != nil {
t.Fatalf("unexpected err: %s", err)
}
// artificial delay between applying steps
@ -472,7 +473,7 @@ func TestAlertingRule_ExecRange(t *testing.T) {
tc.rule.q = fq
tc.rule.GroupID = fakeGroup.ID()
fq.add(tc.data...)
gotTS, err := tc.rule.ExecRange(context.TODO(), time.Now(), time.Now())
gotTS, err := tc.rule.ExecRange(context.TODO(), time.Now(), time.Now(), 0)
if err != nil {
t.Fatalf("unexpected err: %s", err)
}
@ -624,14 +625,14 @@ func TestAlertingRule_Exec_Negative(t *testing.T) {
// successful attempt
fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "bar"))
_, err := ar.Exec(context.TODO(), time.Now())
_, err := ar.Exec(context.TODO(), time.Now(), 0)
if err != nil {
t.Fatal(err)
}
// label `job` will collide with rule extra label and will make both time series equal
fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "baz"))
_, err = ar.Exec(context.TODO(), time.Now())
_, err = ar.Exec(context.TODO(), time.Now(), 0)
if !errors.Is(err, errDuplicate) {
t.Fatalf("expected to have %s error; got %s", errDuplicate, err)
}
@ -640,7 +641,7 @@ func TestAlertingRule_Exec_Negative(t *testing.T) {
expErr := "connection reset by peer"
fq.setErr(errors.New(expErr))
_, err = ar.Exec(context.TODO(), time.Now())
_, err = ar.Exec(context.TODO(), time.Now(), 0)
if err == nil {
t.Fatalf("expected to get err; got nil")
}
@ -649,6 +650,59 @@ func TestAlertingRule_Exec_Negative(t *testing.T) {
}
}
func TestAlertingRuleLimit(t *testing.T) {
fq := &fakeQuerier{}
ar := newTestAlertingRule("test", 0)
ar.Labels = map[string]string{"job": "test"}
ar.q = fq
ar.For = time.Minute
testCases := []struct {
limit int
err string
tssNum int
}{
{
limit: 0,
tssNum: 4,
},
{
limit: -1,
tssNum: 4,
},
{
limit: 1,
err: "exec exceeded limit of 1 with 2 alerts",
tssNum: 0,
},
{
limit: 4,
tssNum: 4,
},
}
var (
err error
timestamp = time.Now()
)
fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "bar"))
fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "bar", "job"))
for _, testCase := range testCases {
_, err = ar.Exec(context.TODO(), timestamp, testCase.limit)
if err != nil && !strings.EqualFold(err.Error(), testCase.err) {
t.Fatal(err)
}
}
for _, testCase := range testCases {
tss, err := ar.ExecRange(context.TODO(), timestamp, timestamp, testCase.limit)
if err != nil {
t.Fatal(err)
}
if len(tss) != testCase.tssNum {
t.Fatal(fmt.Errorf("tss len %d is not equal to supposed %d", len(tss), testCase.tssNum))
}
}
fq.reset()
}
func TestAlertingRule_Template(t *testing.T) {
testCases := []struct {
rule *AlertingRule
@ -761,7 +815,7 @@ func TestAlertingRule_Template(t *testing.T) {
tc.rule.GroupID = fakeGroup.ID()
tc.rule.q = fq
fq.add(tc.metrics...)
if _, err := tc.rule.Exec(context.TODO(), time.Now()); err != nil {
if _, err := tc.rule.Exec(context.TODO(), time.Now(), 0); err != nil {
t.Fatalf("unexpected err: %s", err)
}
for hash, expAlert := range tc.expAlerts {

View file

@ -27,6 +27,7 @@ type Group struct {
File string
Name string `yaml:"name"`
Interval *promutils.Duration `yaml:"interval,omitempty"`
Limit int `yaml:"limit,omitempty"`
Rules []Rule `yaml:"rules"`
Concurrency int `yaml:"concurrency"`
// ExtraFilterLabels is a list label filters applied to every rule

View file

@ -10,6 +10,8 @@ import (
"sync"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
@ -18,7 +20,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/metrics"
)
// Group is an entity for grouping rules
@ -29,6 +30,7 @@ type Group struct {
Rules []Rule
Type datasource.Type
Interval time.Duration
Limit int
Concurrency int
Checksum string
LastEvaluation time.Time
@ -90,6 +92,7 @@ func newGroup(cfg config.Group, qb datasource.QuerierBuilder, defaultInterval ti
Name: cfg.Name,
File: cfg.File,
Interval: cfg.Interval.Duration(),
Limit: cfg.Limit,
Concurrency: cfg.Concurrency,
Checksum: cfg.Checksum,
Params: cfg.Params,
@ -282,7 +285,7 @@ func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *r
}
resolveDuration := getResolveDuration(g.Interval, *resendDelay, *maxResolveDuration)
errs := e.execConcurrently(ctx, g.Rules, ts, g.Concurrency, resolveDuration)
errs := e.execConcurrently(ctx, g.Rules, ts, g.Concurrency, resolveDuration, g.Limit)
for err := range errs {
if err != nil {
logger.Errorf("group %q: %s", g.Name, err)
@ -360,12 +363,12 @@ type executor struct {
previouslySentSeriesToRW map[uint64]map[string][]prompbmarshal.Label
}
func (e *executor) execConcurrently(ctx context.Context, rules []Rule, ts time.Time, concurrency int, resolveDuration time.Duration) chan error {
func (e *executor) execConcurrently(ctx context.Context, rules []Rule, ts time.Time, concurrency int, resolveDuration time.Duration, limit int) chan error {
res := make(chan error, len(rules))
if concurrency == 1 {
// fast path
for _, rule := range rules {
res <- e.exec(ctx, rule, ts, resolveDuration)
res <- e.exec(ctx, rule, ts, resolveDuration, limit)
}
close(res)
return res
@ -378,7 +381,7 @@ func (e *executor) execConcurrently(ctx context.Context, rules []Rule, ts time.T
sem <- struct{}{}
wg.Add(1)
go func(r Rule) {
res <- e.exec(ctx, r, ts, resolveDuration)
res <- e.exec(ctx, r, ts, resolveDuration, limit)
<-sem
wg.Done()
}(rule)
@ -399,10 +402,10 @@ var (
remoteWriteTotal = metrics.NewCounter(`vmalert_remotewrite_total`)
)
func (e *executor) exec(ctx context.Context, rule Rule, ts time.Time, resolveDuration time.Duration) error {
func (e *executor) exec(ctx context.Context, rule Rule, ts time.Time, resolveDuration time.Duration, limit int) error {
execTotal.Inc()
tss, err := rule.Exec(ctx, ts)
tss, err := rule.Exec(ctx, ts, limit)
if err != nil {
execErrors.Inc()
return fmt.Errorf("rule %q: failed to execute: %w", rule, err)

View file

@ -104,7 +104,7 @@ func (rr *RecordingRule) Close() {
// ExecRange executes recording rule on the given time range similarly to Exec.
// It doesn't update internal states of the Rule and meant to be used just
// to get time series for backfilling.
func (rr *RecordingRule) ExecRange(ctx context.Context, start, end time.Time) ([]prompbmarshal.TimeSeries, error) {
func (rr *RecordingRule) ExecRange(ctx context.Context, start, end time.Time, limit int) ([]prompbmarshal.TimeSeries, error) {
series, err := rr.q.QueryRange(ctx, rr.Expr, start, end)
if err != nil {
return nil, err
@ -120,11 +120,14 @@ func (rr *RecordingRule) ExecRange(ctx context.Context, start, end time.Time) ([
duplicates[key] = struct{}{}
tss = append(tss, ts)
}
if limit > 0 && len(tss) > limit {
return nil, fmt.Errorf("exec exceeded limit of %d with %d series", limit, len(tss))
}
return tss, nil
}
// Exec executes RecordingRule expression via the given Querier.
func (rr *RecordingRule) Exec(ctx context.Context, ts time.Time) ([]prompbmarshal.TimeSeries, error) {
func (rr *RecordingRule) Exec(ctx context.Context, ts time.Time, limit int) ([]prompbmarshal.TimeSeries, error) {
qMetrics, err := rr.q.Query(ctx, rr.Expr, ts)
rr.mu.Lock()
defer rr.mu.Unlock()
@ -137,6 +140,11 @@ func (rr *RecordingRule) Exec(ctx context.Context, ts time.Time) ([]prompbmarsha
return nil, fmt.Errorf("failed to execute query %q: %w", rr.Expr, err)
}
numSeries := len(qMetrics)
if limit > 0 && numSeries > limit {
return nil, fmt.Errorf("exec exceeded limit of %d with %d series", limit, numSeries)
}
duplicates := make(map[string]struct{}, len(qMetrics))
var tss []prompbmarshal.TimeSeries
for _, r := range qMetrics {

View file

@ -11,7 +11,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
func TestRecoridngRule_Exec(t *testing.T) {
func TestRecordingRule_Exec(t *testing.T) {
timestamp := time.Now()
testCases := []struct {
rule *RecordingRule
@ -77,7 +77,7 @@ func TestRecoridngRule_Exec(t *testing.T) {
fq := &fakeQuerier{}
fq.add(tc.metrics...)
tc.rule.q = fq
tss, err := tc.rule.Exec(context.TODO(), time.Now())
tss, err := tc.rule.Exec(context.TODO(), time.Now(), 0)
if err != nil {
t.Fatalf("unexpected Exec err: %s", err)
}
@ -88,7 +88,7 @@ func TestRecoridngRule_Exec(t *testing.T) {
}
}
func TestRecoridngRule_ExecRange(t *testing.T) {
func TestRecordingRule_ExecRange(t *testing.T) {
timestamp := time.Now()
testCases := []struct {
rule *RecordingRule
@ -158,7 +158,7 @@ func TestRecoridngRule_ExecRange(t *testing.T) {
fq := &fakeQuerier{}
fq.add(tc.metrics...)
tc.rule.q = fq
tss, err := tc.rule.ExecRange(context.TODO(), time.Now(), time.Now())
tss, err := tc.rule.ExecRange(context.TODO(), time.Now(), time.Now(), 0)
if err != nil {
t.Fatalf("unexpected Exec err: %s", err)
}
@ -169,7 +169,52 @@ func TestRecoridngRule_ExecRange(t *testing.T) {
}
}
func TestRecoridngRule_ExecNegative(t *testing.T) {
func TestRecordingRuleLimit(t *testing.T) {
timestamp := time.Now()
testCases := []struct {
limit int
err string
}{
{
limit: 0,
},
{
limit: -1,
},
{
limit: 1,
err: "exec exceeded limit of 1 with 3 series",
},
{
limit: 2,
err: "exec exceeded limit of 2 with 3 series",
},
}
testMetrics := []datasource.Metric{
metricWithValuesAndLabels(t, []float64{1}, "__name__", "foo", "job", "foo"),
metricWithValuesAndLabels(t, []float64{2, 3}, "__name__", "bar", "job", "bar"),
metricWithValuesAndLabels(t, []float64{4, 5, 6}, "__name__", "baz", "job", "baz"),
}
rule := &RecordingRule{Name: "job:foo", Labels: map[string]string{
"source": "test_limit",
}}
var err error
for _, testCase := range testCases {
fq := &fakeQuerier{}
fq.add(testMetrics...)
rule.q = fq
_, err = rule.Exec(context.TODO(), timestamp, testCase.limit)
if err != nil && !strings.EqualFold(err.Error(), testCase.err) {
t.Fatal(err)
}
_, err = rule.ExecRange(context.TODO(), timestamp.Add(-2*time.Second), timestamp, testCase.limit)
if err != nil && !strings.EqualFold(err.Error(), testCase.err) {
t.Fatal(err)
}
}
}
func TestRecordingRule_ExecNegative(t *testing.T) {
rr := &RecordingRule{Name: "job:foo", Labels: map[string]string{
"job": "test",
}}
@ -178,7 +223,7 @@ func TestRecoridngRule_ExecNegative(t *testing.T) {
expErr := "connection reset by peer"
fq.setErr(errors.New(expErr))
rr.q = fq
_, err := rr.Exec(context.TODO(), time.Now())
_, err := rr.Exec(context.TODO(), time.Now(), 0)
if err == nil {
t.Fatalf("expected to get err; got nil")
}
@ -193,7 +238,7 @@ func TestRecoridngRule_ExecNegative(t *testing.T) {
fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "foo"))
fq.add(metricWithValueAndLabels(t, 2, "__name__", "foo", "job", "bar"))
_, err = rr.Exec(context.TODO(), time.Now())
_, err = rr.Exec(context.TODO(), time.Now(), 0)
if err == nil {
t.Fatalf("expected to get err; got nil")
}

View file

@ -7,12 +7,13 @@ import (
"strings"
"time"
"github.com/dmitryk-dk/pb/v3"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/dmitryk-dk/pb/v3"
)
var (
@ -95,7 +96,7 @@ func (g *Group) replay(start, end time.Time, rw *remotewrite.Client) int {
}
ri.reset()
for ri.next() {
n, err := replayRule(rule, ri.s, ri.e, rw)
n, err := replayRule(rule, ri.s, ri.e, rw, g.Limit)
if err != nil {
logger.Fatalf("rule %q: %s", rule, err)
}
@ -114,11 +115,11 @@ func (g *Group) replay(start, end time.Time, rw *remotewrite.Client) int {
return total
}
func replayRule(rule Rule, start, end time.Time, rw *remotewrite.Client) (int, error) {
func replayRule(rule Rule, start, end time.Time, rw *remotewrite.Client, limit int) (int, error) {
var err error
var tss []prompbmarshal.TimeSeries
for i := 0; i < *replayRuleRetryAttempts; i++ {
tss, err = rule.ExecRange(context.Background(), start, end)
tss, err = rule.ExecRange(context.Background(), start, end, limit)
if err == nil {
break
}

View file

@ -15,10 +15,12 @@ type Rule interface {
// ID returns unique ID that may be used for
// identifying this Rule among others.
ID() uint64
// Exec executes the rule with given context at the given timestamp
Exec(ctx context.Context, ts time.Time) ([]prompbmarshal.TimeSeries, error)
// ExecRange executes the rule on the given time range
ExecRange(ctx context.Context, start, end time.Time) ([]prompbmarshal.TimeSeries, error)
// Exec executes the rule with given context at the given timestamp and limit.
// returns an err if number of resulting time series exceeds the limit.
Exec(ctx context.Context, ts time.Time, limit int) ([]prompbmarshal.TimeSeries, error)
// ExecRange executes the rule on the given time range and limit.
// returns an err if number of resulting time series exceeds the limit.
ExecRange(ctx context.Context, start, end time.Time, limit int) ([]prompbmarshal.TimeSeries, error)
// UpdateWith performs modification of current Rule
// with fields of the given Rule.
UpdateWith(Rule) error

View file

@ -105,6 +105,10 @@ name: <string>
# How often rules in the group are evaluated.
[ interval: <duration> | default = -evaluationInterval flag ]
# Limit the number of alerts an alerting rule and series a recording
# rule can produce. 0 is no limit.
[ limit: <int> | default = 0 ]
# How many rules execute at once within a group. Increasing concurrency may speed
# up round execution speed.
[ concurrency: <integer> | default = 1 ]

View file

@ -40,7 +40,6 @@ func Init() {
initTimezone()
go logLimiterCleaner()
logAllFlags()
}
func initTimezone() {
@ -79,7 +78,7 @@ func validateLoggerFormat() {
switch *loggerFormat {
case "default", "json":
default:
// We cannot use logger.Pancif here, since the logger isn't initialized yet.
// We cannot use logger.Panicf here, since the logger isn't initialized yet.
panic(fmt.Errorf("FATAL: unsupported `-loggerFormat` value: %q; supported values are: default, json", *loggerFormat))
}
}