mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
vmalert: add eval_offset
for group (#4693)
Adds `eval_offset` attribute for Groups.
If specified, Group will be evaluated at the exact time offset on the range of [0...evaluationInterval].
The setting might be useful for cron-like rules which must be evaluated at specific moments of time.
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3409
Signed-off-by: Haley Wang <pipilong.25@gmail.com>
Co-authored-by: hagen1778 <roman@victoriametrics.com>
(cherry picked from commit 45c0e4bb31
)
This commit is contained in:
parent
e0923f988e
commit
0212219f6c
16 changed files with 361 additions and 68 deletions
|
@ -112,6 +112,13 @@ name: <string>
|
|||
# How often rules in the group are evaluated.
|
||||
[ interval: <duration> | default = -evaluationInterval flag ]
|
||||
|
||||
# Optional
|
||||
# Group will be evaluated at the exact offset in the range of [0...interval].
|
||||
# E.g. for Group with `interval: 1h` and `eval_offset: 5m` the evaluation will
|
||||
# start at 5th minute of the hour. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3409
|
||||
# `eval_offset` can't be bigger than `interval`.
|
||||
[ eval_offset: <duration> ]
|
||||
|
||||
# Limit the number of alerts an alerting rule and series a recording
|
||||
# rule can produce. 0 is no limit.
|
||||
[ limit: <int> | default = 0 ]
|
||||
|
|
|
@ -72,6 +72,7 @@ func newAlertingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rule
|
|||
q: qb.BuildWithParams(datasource.QuerierParams{
|
||||
DataSourceType: group.Type.String(),
|
||||
EvaluationInterval: group.Interval,
|
||||
EvalOffset: group.EvalOffset,
|
||||
QueryParams: group.Params,
|
||||
Headers: group.Headers,
|
||||
Debug: cfg.Debug,
|
||||
|
|
|
@ -427,7 +427,8 @@ func TestAlertingRule_ExecRange(t *testing.T) {
|
|||
newTestAlertingRule("multi-series-for=>pending=>pending=>firing", 3*time.Second),
|
||||
[]datasource.Metric{
|
||||
{Values: []float64{1, 1, 1}, Timestamps: []int64{1, 3, 5}},
|
||||
{Values: []float64{1, 1}, Timestamps: []int64{1, 5},
|
||||
{
|
||||
Values: []float64{1, 1}, Timestamps: []int64{1, 5},
|
||||
Labels: []datasource.Label{{Name: "foo", Value: "bar"}},
|
||||
},
|
||||
},
|
||||
|
@ -436,21 +437,26 @@ func TestAlertingRule_ExecRange(t *testing.T) {
|
|||
{State: notifier.StatePending, ActiveAt: time.Unix(1, 0)},
|
||||
{State: notifier.StateFiring, ActiveAt: time.Unix(1, 0)},
|
||||
//
|
||||
{State: notifier.StatePending, ActiveAt: time.Unix(1, 0),
|
||||
{
|
||||
State: notifier.StatePending, ActiveAt: time.Unix(1, 0),
|
||||
Labels: map[string]string{
|
||||
"foo": "bar",
|
||||
}},
|
||||
{State: notifier.StatePending, ActiveAt: time.Unix(5, 0),
|
||||
},
|
||||
},
|
||||
{
|
||||
State: notifier.StatePending, ActiveAt: time.Unix(5, 0),
|
||||
Labels: map[string]string{
|
||||
"foo": "bar",
|
||||
}},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
newTestRuleWithLabels("multi-series-firing", "source", "vm"),
|
||||
[]datasource.Metric{
|
||||
{Values: []float64{1, 1}, Timestamps: []int64{1, 100}},
|
||||
{Values: []float64{1, 1}, Timestamps: []int64{1, 5},
|
||||
{
|
||||
Values: []float64{1, 1}, Timestamps: []int64{1, 5},
|
||||
Labels: []datasource.Label{{Name: "foo", Value: "bar"}},
|
||||
},
|
||||
},
|
||||
|
@ -586,7 +592,8 @@ func TestGroup_Restore(t *testing.T) {
|
|||
[]config.Rule{{Alert: "foo", Expr: "foo", For: promutils.NewDuration(time.Second)}},
|
||||
map[uint64]*notifier.Alert{
|
||||
hash(map[string]string{alertNameLabel: "foo", alertGroupNameLabel: "TestRestore"}): {
|
||||
ActiveAt: ts},
|
||||
ActiveAt: ts,
|
||||
},
|
||||
})
|
||||
|
||||
// two rules, two active alerts, one with state restored
|
||||
|
@ -603,7 +610,8 @@ func TestGroup_Restore(t *testing.T) {
|
|||
ActiveAt: defaultTS,
|
||||
},
|
||||
hash(map[string]string{alertNameLabel: "bar", alertGroupNameLabel: "TestRestore"}): {
|
||||
ActiveAt: ts},
|
||||
ActiveAt: ts,
|
||||
},
|
||||
})
|
||||
|
||||
// two rules, two active alerts, two with state restored
|
||||
|
@ -622,7 +630,8 @@ func TestGroup_Restore(t *testing.T) {
|
|||
ActiveAt: ts,
|
||||
},
|
||||
hash(map[string]string{alertNameLabel: "bar", alertGroupNameLabel: "TestRestore"}): {
|
||||
ActiveAt: ts},
|
||||
ActiveAt: ts,
|
||||
},
|
||||
})
|
||||
|
||||
// one active alert but wrong state restore
|
||||
|
@ -844,7 +853,8 @@ func TestAlertingRule_Template(t *testing.T) {
|
|||
hash(map[string]string{
|
||||
alertNameLabel: "OriginLabels",
|
||||
alertGroupNameLabel: "Testing",
|
||||
"instance": "foo"}): {
|
||||
"instance": "foo",
|
||||
}): {
|
||||
Labels: map[string]string{
|
||||
alertNameLabel: "OriginLabels",
|
||||
alertGroupNameLabel: "Testing",
|
||||
|
|
|
@ -23,6 +23,7 @@ type Group struct {
|
|||
File string
|
||||
Name string `yaml:"name"`
|
||||
Interval *promutils.Duration `yaml:"interval,omitempty"`
|
||||
EvalOffset *promutils.Duration `yaml:"eval_offset,omitempty"`
|
||||
Limit int `yaml:"limit,omitempty"`
|
||||
Rules []Rule `yaml:"rules"`
|
||||
Concurrency int `yaml:"concurrency"`
|
||||
|
@ -63,11 +64,27 @@ func (g *Group) UnmarshalYAML(unmarshal func(interface{}) error) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Validate check for internal Group or Rule configuration errors
|
||||
// Validate checks configuration errors for group and internal rules
|
||||
func (g *Group) Validate(validateTplFn ValidateTplFn, validateExpressions bool) error {
|
||||
if g.Name == "" {
|
||||
return fmt.Errorf("group name must be set")
|
||||
}
|
||||
if g.Interval.Duration() < 0 {
|
||||
return fmt.Errorf("interval shouldn't be lower than 0")
|
||||
}
|
||||
if g.EvalOffset.Duration() < 0 {
|
||||
return fmt.Errorf("eval_offset shouldn't be lower than 0")
|
||||
}
|
||||
// if `eval_offset` is set, interval won't use global evaluationInterval flag and must bigger than offset.
|
||||
if g.EvalOffset.Duration() > g.Interval.Duration() {
|
||||
return fmt.Errorf("eval_offset should be smaller than interval; now eval_offset: %v, interval: %v", g.EvalOffset.Duration(), g.Interval.Duration())
|
||||
}
|
||||
if g.Limit < 0 {
|
||||
return fmt.Errorf("invalid limit %d, shouldn't be less than 0", g.Limit)
|
||||
}
|
||||
if g.Concurrency < 0 {
|
||||
return fmt.Errorf("invalid concurrency %d, shouldn't be less than 0", g.Concurrency)
|
||||
}
|
||||
|
||||
uniqueRules := map[uint64]struct{}{}
|
||||
for _, r := range g.Rules {
|
||||
|
@ -76,26 +93,26 @@ func (g *Group) Validate(validateTplFn ValidateTplFn, validateExpressions bool)
|
|||
ruleName = r.Alert
|
||||
}
|
||||
if _, ok := uniqueRules[r.ID]; ok {
|
||||
return fmt.Errorf("%q is a duplicate within the group %q", r.String(), g.Name)
|
||||
return fmt.Errorf("%q is a duplicate in group", r.String())
|
||||
}
|
||||
uniqueRules[r.ID] = struct{}{}
|
||||
if err := r.Validate(); err != nil {
|
||||
return fmt.Errorf("invalid rule %q.%q: %w", g.Name, ruleName, err)
|
||||
return fmt.Errorf("invalid rule %q: %w", ruleName, err)
|
||||
}
|
||||
if validateExpressions {
|
||||
// its needed only for tests.
|
||||
// because correct types must be inherited after unmarshalling.
|
||||
exprValidator := g.Type.ValidateExpr
|
||||
if err := exprValidator(r.Expr); err != nil {
|
||||
return fmt.Errorf("invalid expression for rule %q.%q: %w", g.Name, ruleName, err)
|
||||
return fmt.Errorf("invalid expression for rule %q: %w", ruleName, err)
|
||||
}
|
||||
}
|
||||
if validateTplFn != nil {
|
||||
if err := validateTplFn(r.Annotations); err != nil {
|
||||
return fmt.Errorf("invalid annotations for rule %q.%q: %w", g.Name, ruleName, err)
|
||||
return fmt.Errorf("invalid annotations for rule %q: %w", ruleName, err)
|
||||
}
|
||||
if err := validateTplFn(r.Labels); err != nil {
|
||||
return fmt.Errorf("invalid labels for rule %q.%q: %w", g.Name, ruleName, err)
|
||||
return fmt.Errorf("invalid labels for rule %q: %w", ruleName, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -68,6 +68,10 @@ func TestParseBad(t *testing.T) {
|
|||
path []string
|
||||
expErr string
|
||||
}{
|
||||
{
|
||||
[]string{"testdata/rules/rules_interval_bad.rules"},
|
||||
"eval_offset should be smaller than interval",
|
||||
},
|
||||
{
|
||||
[]string{"testdata/rules/rules0-bad.rules"},
|
||||
"unexpected token",
|
||||
|
@ -141,6 +145,35 @@ func TestGroup_Validate(t *testing.T) {
|
|||
group: &Group{},
|
||||
expErr: "group name must be set",
|
||||
},
|
||||
{
|
||||
group: &Group{
|
||||
Name: "negative interval",
|
||||
Interval: promutils.NewDuration(-1),
|
||||
},
|
||||
expErr: "interval shouldn't be lower than 0",
|
||||
},
|
||||
{
|
||||
group: &Group{
|
||||
Name: "wrong eval_offset",
|
||||
Interval: promutils.NewDuration(time.Minute),
|
||||
EvalOffset: promutils.NewDuration(2 * time.Minute),
|
||||
},
|
||||
expErr: "eval_offset should be smaller than interval",
|
||||
},
|
||||
{
|
||||
group: &Group{
|
||||
Name: "wrong limit",
|
||||
Limit: -1,
|
||||
},
|
||||
expErr: "invalid limit",
|
||||
},
|
||||
{
|
||||
group: &Group{
|
||||
Name: "wrong concurrency",
|
||||
Concurrency: -1,
|
||||
},
|
||||
expErr: "invalid concurrency",
|
||||
},
|
||||
{
|
||||
group: &Group{
|
||||
Name: "test",
|
||||
|
|
13
app/vmalert/config/testdata/rules/rules_interval_bad.rules
vendored
Normal file
13
app/vmalert/config/testdata/rules/rules_interval_bad.rules
vendored
Normal file
|
@ -0,0 +1,13 @@
|
|||
groups:
|
||||
- name: groupTest
|
||||
## default interval is 1min, eval_offset shouldn't be greater than interval
|
||||
eval_offset: 2m
|
||||
rules:
|
||||
- alert: VMRows
|
||||
for: 2s
|
||||
expr: sum(rate(vm_http_request_errors_total[2s])) > 0
|
||||
labels:
|
||||
label: bar
|
||||
host: "{{ $labels.instance }}"
|
||||
annotations:
|
||||
summary: "{{ $value }}"
|
|
@ -44,6 +44,7 @@ type QuerierBuilder interface {
|
|||
type QuerierParams struct {
|
||||
DataSourceType string
|
||||
EvaluationInterval time.Duration
|
||||
EvalOffset *time.Duration
|
||||
QueryParams url.Values
|
||||
Headers map[string]string
|
||||
Debug bool
|
||||
|
|
|
@ -37,11 +37,20 @@ type VMStorage struct {
|
|||
appendTypePrefix bool
|
||||
lookBack time.Duration
|
||||
queryStep time.Duration
|
||||
dataSourceType datasourceType
|
||||
|
||||
dataSourceType datasourceType
|
||||
// evaluationInterval will align the request's timestamp
|
||||
// if `datasource.queryTimeAlignment` is enabled,
|
||||
// will set request's `step` param as well.
|
||||
evaluationInterval time.Duration
|
||||
extraParams url.Values
|
||||
extraHeaders []keyValue
|
||||
// evaluationOffset shifts the request's timestamp, will be equal
|
||||
// to the offset specified evaluationInterval.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4693
|
||||
evaluationOffset *time.Duration
|
||||
// extraParams contains params to be attached to each HTTP request
|
||||
extraParams url.Values
|
||||
// extraHeaders are headers to be attached to each HTTP request
|
||||
extraHeaders []keyValue
|
||||
|
||||
// whether to print additional log messages
|
||||
// for each sent request
|
||||
|
@ -86,6 +95,7 @@ func (s *VMStorage) Clone() *VMStorage {
|
|||
func (s *VMStorage) ApplyParams(params QuerierParams) *VMStorage {
|
||||
s.dataSourceType = toDatasourceType(params.DataSourceType)
|
||||
s.evaluationInterval = params.EvaluationInterval
|
||||
s.evaluationOffset = params.EvalOffset
|
||||
if params.QueryParams != nil {
|
||||
if s.extraParams == nil {
|
||||
s.extraParams = url.Values{}
|
||||
|
|
|
@ -161,13 +161,8 @@ func (s *VMStorage) setPrometheusInstantReqParams(r *http.Request, query string,
|
|||
r.URL.Path += "/api/v1/query"
|
||||
}
|
||||
q := r.URL.Query()
|
||||
if s.lookBack > 0 {
|
||||
timestamp = timestamp.Add(-s.lookBack)
|
||||
}
|
||||
if *queryTimeAlignment && s.evaluationInterval > 0 {
|
||||
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1232
|
||||
timestamp = timestamp.Truncate(s.evaluationInterval)
|
||||
}
|
||||
|
||||
timestamp = s.adjustReqTimestamp(timestamp)
|
||||
q.Set("time", timestamp.Format(time.RFC3339))
|
||||
if !*disableStepParam && s.evaluationInterval > 0 { // set step as evaluationInterval by default
|
||||
// always convert to seconds to keep compatibility with older
|
||||
|
@ -191,6 +186,9 @@ func (s *VMStorage) setPrometheusRangeReqParams(r *http.Request, query string, s
|
|||
r.URL.Path += "/api/v1/query_range"
|
||||
}
|
||||
q := r.URL.Query()
|
||||
if s.evaluationOffset != nil {
|
||||
start = start.Truncate(s.evaluationInterval).Add(*s.evaluationOffset)
|
||||
}
|
||||
q.Add("start", start.Format(time.RFC3339))
|
||||
q.Add("end", end.Format(time.RFC3339))
|
||||
if s.evaluationInterval > 0 { // set step as evaluationInterval by default
|
||||
|
@ -215,3 +213,30 @@ func (s *VMStorage) setPrometheusReqParams(r *http.Request, query string) {
|
|||
q.Set("query", query)
|
||||
r.URL.RawQuery = q.Encode()
|
||||
}
|
||||
|
||||
func (s *VMStorage) adjustReqTimestamp(timestamp time.Time) time.Time {
|
||||
if s.evaluationOffset != nil {
|
||||
// calculate the min timestamp on the evaluationInterval
|
||||
intervalStart := timestamp.Truncate(s.evaluationInterval)
|
||||
ts := intervalStart.Add(*s.evaluationOffset)
|
||||
if timestamp.Before(ts) {
|
||||
// if passed timestamp is before the expected evaluation offset,
|
||||
// then we should adjust it to the previous evaluation round.
|
||||
// E.g. request with evaluationInterval=1h and evaluationOffset=30m
|
||||
// was evaluated at 11:20. Then the timestamp should be adjusted
|
||||
// to 10:30, to the previous evaluationInterval.
|
||||
return ts.Add(-s.evaluationInterval)
|
||||
}
|
||||
// evaluationOffset shouldn't interfere with queryTimeAlignment or lookBack,
|
||||
// so we return it immediately
|
||||
return ts
|
||||
}
|
||||
if *queryTimeAlignment {
|
||||
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1232
|
||||
timestamp = timestamp.Truncate(s.evaluationInterval)
|
||||
}
|
||||
if s.lookBack > 0 {
|
||||
timestamp = timestamp.Add(-s.lookBack)
|
||||
}
|
||||
return timestamp
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package datasource
|
|||
import (
|
||||
"encoding/json"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func BenchmarkMetrics(b *testing.B) {
|
||||
|
@ -18,3 +19,74 @@ func BenchmarkMetrics(b *testing.B) {
|
|||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestGetPrometheusReqTimestamp(t *testing.T) {
|
||||
offset := 30 * time.Minute
|
||||
testCases := []struct {
|
||||
name string
|
||||
s *VMStorage
|
||||
queryTimeAlignment bool
|
||||
originTS, expTS string
|
||||
}{
|
||||
{
|
||||
"with eval_offset, find previous offset point",
|
||||
&VMStorage{
|
||||
evaluationOffset: &offset,
|
||||
evaluationInterval: time.Hour,
|
||||
lookBack: 1 * time.Minute,
|
||||
},
|
||||
false,
|
||||
"2023-08-28T11:11:00+00:00",
|
||||
"2023-08-28T10:30:00+00:00",
|
||||
},
|
||||
{
|
||||
"with eval_offset",
|
||||
&VMStorage{
|
||||
evaluationOffset: &offset,
|
||||
evaluationInterval: time.Hour,
|
||||
},
|
||||
true,
|
||||
"2023-08-28T11:41:00+00:00",
|
||||
"2023-08-28T11:30:00+00:00",
|
||||
},
|
||||
{
|
||||
"with query align",
|
||||
&VMStorage{
|
||||
evaluationInterval: time.Hour,
|
||||
},
|
||||
true,
|
||||
"2023-08-28T11:11:00+00:00",
|
||||
"2023-08-28T11:00:00+00:00",
|
||||
},
|
||||
{
|
||||
"with query align and lookback",
|
||||
&VMStorage{
|
||||
evaluationInterval: time.Hour,
|
||||
lookBack: 1 * time.Minute,
|
||||
},
|
||||
true,
|
||||
"2023-08-28T11:11:00+00:00",
|
||||
"2023-08-28T10:59:00+00:00",
|
||||
},
|
||||
{
|
||||
"without query align",
|
||||
&VMStorage{
|
||||
evaluationInterval: time.Hour,
|
||||
},
|
||||
false,
|
||||
"2023-08-28T11:11:00+00:00",
|
||||
"2023-08-28T11:11:00+00:00",
|
||||
},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
oldAlignPara := *queryTimeAlignment
|
||||
*queryTimeAlignment = tc.queryTimeAlignment
|
||||
originT, _ := time.Parse(time.RFC3339, tc.originTS)
|
||||
expT, _ := time.Parse(time.RFC3339, tc.expTS)
|
||||
gotTS := tc.s.adjustReqTimestamp(originT)
|
||||
if !gotTS.Equal(expT) {
|
||||
t.Fatalf("get wrong prometheus request timestamp, expect %s, got %s", expT, gotTS)
|
||||
}
|
||||
*queryTimeAlignment = oldAlignPara
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ type Group struct {
|
|||
Rules []Rule
|
||||
Type config.Type
|
||||
Interval time.Duration
|
||||
EvalOffset *time.Duration
|
||||
Limit int
|
||||
Concurrency int
|
||||
Checksum string
|
||||
|
@ -116,6 +117,9 @@ func newGroup(cfg config.Group, qb datasource.QuerierBuilder, defaultInterval ti
|
|||
if g.Concurrency < 1 {
|
||||
g.Concurrency = 1
|
||||
}
|
||||
if cfg.EvalOffset != nil {
|
||||
g.EvalOffset = &cfg.EvalOffset.D
|
||||
}
|
||||
for _, h := range cfg.Headers {
|
||||
g.Headers[h.Key] = h.Value
|
||||
}
|
||||
|
@ -163,6 +167,10 @@ func (g *Group) ID() uint64 {
|
|||
hash.Write([]byte("\xff"))
|
||||
hash.Write([]byte(g.Name))
|
||||
hash.Write([]byte(g.Type.Get()))
|
||||
hash.Write([]byte(g.Interval.String()))
|
||||
if g.EvalOffset != nil {
|
||||
hash.Write([]byte(g.EvalOffset.String()))
|
||||
}
|
||||
return hash.Sum64()
|
||||
}
|
||||
|
||||
|
@ -277,15 +285,13 @@ var skipRandSleepOnGroupStart bool
|
|||
func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *remotewrite.Client, rr datasource.QuerierBuilder) {
|
||||
defer func() { close(g.finishedCh) }()
|
||||
|
||||
// Spread group rules evaluation over time in order to reduce load on VictoriaMetrics.
|
||||
// sleep random duration to spread group rules evaluation
|
||||
// over time in order to reduce load on datasource.
|
||||
if !skipRandSleepOnGroupStart {
|
||||
randSleep := uint64(float64(g.Interval) * (float64(g.ID()) / (1 << 64)))
|
||||
sleepOffset := uint64(time.Now().UnixNano()) % uint64(g.Interval)
|
||||
if randSleep < sleepOffset {
|
||||
randSleep += uint64(g.Interval)
|
||||
}
|
||||
randSleep -= sleepOffset
|
||||
sleepTimer := time.NewTimer(time.Duration(randSleep))
|
||||
sleepBeforeStart := delayBeforeStart(time.Now(), g.ID(), g.Interval, g.EvalOffset)
|
||||
g.infof("will start in %v", sleepBeforeStart)
|
||||
|
||||
sleepTimer := time.NewTimer(sleepBeforeStart)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
sleepTimer.Stop()
|
||||
|
@ -297,6 +303,8 @@ func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *r
|
|||
}
|
||||
}
|
||||
|
||||
evalTS := time.Now()
|
||||
|
||||
e := &executor{
|
||||
rw: rw,
|
||||
notifiers: nts,
|
||||
|
@ -304,9 +312,7 @@ func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *r
|
|||
previouslySentSeriesToRW: make(map[uint64]map[string][]prompbmarshal.Label),
|
||||
}
|
||||
|
||||
evalTS := time.Now()
|
||||
|
||||
logger.Infof("group %q started; interval=%v; concurrency=%d", g.Name, g.Interval, g.Concurrency)
|
||||
g.infof("started")
|
||||
|
||||
eval := func(ctx context.Context, ts time.Time) {
|
||||
g.metrics.iterationTotal.Inc()
|
||||
|
@ -375,19 +381,12 @@ func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *r
|
|||
continue
|
||||
}
|
||||
|
||||
// ensure that staleness is tracked or existing rules only
|
||||
// ensure that staleness is tracked for existing rules only
|
||||
e.purgeStaleSeries(g.Rules)
|
||||
|
||||
e.notifierHeaders = g.NotifierHeaders
|
||||
|
||||
if g.Interval != ng.Interval {
|
||||
g.Interval = ng.Interval
|
||||
t.Stop()
|
||||
t = time.NewTicker(g.Interval)
|
||||
evalTS = time.Now()
|
||||
}
|
||||
g.mu.Unlock()
|
||||
logger.Infof("group %q re-started; interval=%v; concurrency=%d", g.Name, g.Interval, g.Concurrency)
|
||||
|
||||
g.infof("re-started")
|
||||
case <-t.C:
|
||||
missed := (time.Since(evalTS) / g.Interval) - 1
|
||||
if missed < 0 {
|
||||
|
@ -405,6 +404,35 @@ func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *r
|
|||
}
|
||||
}
|
||||
|
||||
// delayBeforeStart returns a duration on the interval between [ts..ts+interval].
|
||||
// delayBeforeStart accounts for `offset`, so returned duration should be always
|
||||
// bigger than the `offset`.
|
||||
func delayBeforeStart(ts time.Time, key uint64, interval time.Duration, offset *time.Duration) time.Duration {
|
||||
var randSleep time.Duration
|
||||
randSleep = time.Duration(float64(interval) * (float64(key) / (1 << 64)))
|
||||
sleepOffset := time.Duration(ts.UnixNano() % interval.Nanoseconds())
|
||||
if randSleep < sleepOffset {
|
||||
randSleep += interval
|
||||
}
|
||||
randSleep -= sleepOffset
|
||||
// check if `ts` after randSleep is before `offset`,
|
||||
// if it is, add extra eval_offset to randSleep.
|
||||
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3409.
|
||||
if offset != nil {
|
||||
tmpEvalTS := ts.Add(randSleep)
|
||||
if tmpEvalTS.Before(tmpEvalTS.Truncate(interval).Add(*offset)) {
|
||||
randSleep += *offset
|
||||
}
|
||||
}
|
||||
return randSleep.Truncate(time.Second)
|
||||
}
|
||||
|
||||
func (g *Group) infof(format string, args ...interface{}) {
|
||||
msg := fmt.Sprintf(format, args...)
|
||||
logger.Infof("group %q %s; interval=%v; eval_offset=%v; concurrency=%d",
|
||||
g.Name, msg, g.Interval, g.EvalOffset, g.Concurrency)
|
||||
}
|
||||
|
||||
// getResolveDuration returns the duration after which firing alert
|
||||
// can be considered as resolved.
|
||||
func getResolveDuration(groupInterval, delta, maxDuration time.Duration) time.Duration {
|
||||
|
|
|
@ -3,6 +3,7 @@ package main
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"reflect"
|
||||
"sort"
|
||||
"testing"
|
||||
|
@ -35,18 +36,19 @@ func TestUpdateWith(t *testing.T) {
|
|||
},
|
||||
{
|
||||
"update alerting rule",
|
||||
[]config.Rule{{
|
||||
Alert: "foo",
|
||||
Expr: "up > 0",
|
||||
For: promutils.NewDuration(time.Second),
|
||||
Labels: map[string]string{
|
||||
"bar": "baz",
|
||||
[]config.Rule{
|
||||
{
|
||||
Alert: "foo",
|
||||
Expr: "up > 0",
|
||||
For: promutils.NewDuration(time.Second),
|
||||
Labels: map[string]string{
|
||||
"bar": "baz",
|
||||
},
|
||||
Annotations: map[string]string{
|
||||
"summary": "{{ $value|humanize }}",
|
||||
"description": "{{$labels}}",
|
||||
},
|
||||
},
|
||||
Annotations: map[string]string{
|
||||
"summary": "{{ $value|humanize }}",
|
||||
"description": "{{$labels}}",
|
||||
},
|
||||
},
|
||||
{
|
||||
Alert: "bar",
|
||||
Expr: "up > 0",
|
||||
|
@ -54,7 +56,8 @@ func TestUpdateWith(t *testing.T) {
|
|||
Labels: map[string]string{
|
||||
"bar": "baz",
|
||||
},
|
||||
}},
|
||||
},
|
||||
},
|
||||
[]config.Rule{
|
||||
{
|
||||
Alert: "foo",
|
||||
|
@ -75,7 +78,8 @@ func TestUpdateWith(t *testing.T) {
|
|||
Labels: map[string]string{
|
||||
"bar": "baz",
|
||||
},
|
||||
}},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
"update recording rule",
|
||||
|
@ -520,3 +524,62 @@ func TestCloseWithEvalInterruption(t *testing.T) {
|
|||
case <-g.finishedCh:
|
||||
}
|
||||
}
|
||||
|
||||
func TestGroupStartDelay(t *testing.T) {
|
||||
g := &Group{}
|
||||
// interval of 5min and key generate a static delay of 30s
|
||||
g.Interval = time.Minute * 5
|
||||
key := uint64(math.MaxUint64 / 10)
|
||||
|
||||
f := func(atS, expS string) {
|
||||
t.Helper()
|
||||
at, err := time.Parse(time.DateTime, atS)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
expTS, err := time.Parse(time.DateTime, expS)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
delay := delayBeforeStart(at, key, g.Interval, g.EvalOffset)
|
||||
gotStart := at.Add(delay)
|
||||
if expTS != gotStart {
|
||||
t.Errorf("expected to get %v; got %v instead", expTS, gotStart)
|
||||
}
|
||||
}
|
||||
|
||||
// test group without offset
|
||||
f("2023-01-01 00:00:00", "2023-01-01 00:00:30")
|
||||
f("2023-01-01 00:00:29", "2023-01-01 00:00:30")
|
||||
f("2023-01-01 00:00:31", "2023-01-01 00:05:30")
|
||||
|
||||
// test group with offset smaller than above fixed randSleep,
|
||||
// this way randSleep will always be enough
|
||||
offset := 20 * time.Second
|
||||
g.EvalOffset = &offset
|
||||
|
||||
f("2023-01-01 00:00:00", "2023-01-01 00:00:30")
|
||||
f("2023-01-01 00:00:29", "2023-01-01 00:00:30")
|
||||
f("2023-01-01 00:00:31", "2023-01-01 00:05:30")
|
||||
|
||||
// test group with offset bigger than above fixed randSleep,
|
||||
// this way offset will be added to delay
|
||||
offset = 3 * time.Minute
|
||||
g.EvalOffset = &offset
|
||||
|
||||
f("2023-01-01 00:00:00", "2023-01-01 00:03:30")
|
||||
f("2023-01-01 00:00:29", "2023-01-01 00:03:30")
|
||||
f("2023-01-01 00:01:00", "2023-01-01 00:08:30")
|
||||
f("2023-01-01 00:03:30", "2023-01-01 00:08:30")
|
||||
f("2023-01-01 00:07:30", "2023-01-01 00:13:30")
|
||||
|
||||
offset = 10 * time.Minute
|
||||
g.EvalOffset = &offset
|
||||
// interval of 1h and key generate a static delay of 6m
|
||||
g.Interval = time.Hour
|
||||
|
||||
f("2023-01-01 00:00:00", "2023-01-01 00:16:00")
|
||||
f("2023-01-01 00:05:00", "2023-01-01 00:16:00")
|
||||
f("2023-01-01 00:30:00", "2023-01-01 01:16:00")
|
||||
|
||||
}
|
||||
|
|
|
@ -168,7 +168,8 @@ func TestManagerUpdate(t *testing.T) {
|
|||
Name: "TestGroup", Rules: []Rule{
|
||||
Conns,
|
||||
ExampleAlertAlwaysFiring,
|
||||
}},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -191,7 +192,8 @@ func TestManagerUpdate(t *testing.T) {
|
|||
Rules: []Rule{
|
||||
Conns,
|
||||
ExampleAlertAlwaysFiring,
|
||||
}},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -264,7 +266,8 @@ func TestManagerUpdateNegative(t *testing.T) {
|
|||
{
|
||||
nil,
|
||||
nil,
|
||||
config.Group{Name: "Recording rule only",
|
||||
config.Group{
|
||||
Name: "Recording rule only",
|
||||
Rules: []config.Rule{
|
||||
{Record: "record", Expr: "max(up)"},
|
||||
},
|
||||
|
@ -274,7 +277,8 @@ func TestManagerUpdateNegative(t *testing.T) {
|
|||
{
|
||||
nil,
|
||||
nil,
|
||||
config.Group{Name: "Alerting rule only",
|
||||
config.Group{
|
||||
Name: "Alerting rule only",
|
||||
Rules: []config.Rule{
|
||||
{Alert: "alert", Expr: "up > 0"},
|
||||
},
|
||||
|
@ -284,7 +288,8 @@ func TestManagerUpdateNegative(t *testing.T) {
|
|||
{
|
||||
[]notifier.Notifier{&fakeNotifier{}},
|
||||
nil,
|
||||
config.Group{Name: "Recording and alerting rules",
|
||||
config.Group{
|
||||
Name: "Recording and alerting rules",
|
||||
Rules: []config.Rule{
|
||||
{Alert: "alert1", Expr: "up > 0"},
|
||||
{Alert: "alert2", Expr: "up > 0"},
|
||||
|
@ -296,7 +301,8 @@ func TestManagerUpdateNegative(t *testing.T) {
|
|||
{
|
||||
nil,
|
||||
&remotewrite.Client{},
|
||||
config.Group{Name: "Recording and alerting rules",
|
||||
config.Group{
|
||||
Name: "Recording and alerting rules",
|
||||
Rules: []config.Rule{
|
||||
{Record: "record1", Expr: "max(up)"},
|
||||
{Record: "record2", Expr: "max(up)"},
|
||||
|
|
|
@ -61,6 +61,7 @@ func newRecordingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rul
|
|||
q: qb.BuildWithParams(datasource.QuerierParams{
|
||||
DataSourceType: group.Type.String(),
|
||||
EvaluationInterval: group.Interval,
|
||||
EvalOffset: group.EvalOffset,
|
||||
QueryParams: group.Params,
|
||||
Headers: group.Headers,
|
||||
}),
|
||||
|
|
|
@ -38,7 +38,7 @@ The following `tip` changes can be tested by building VictoriaMetrics components
|
|||
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): improve accessibility score to 100 according to [Google's Lighthouse](https://developer.chrome.com/docs/lighthouse/accessibility/) tests.
|
||||
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): organize `min`, `max`, `median` values on the chart legend and tooltips for better visibility.
|
||||
* FEATURE: dashboards: provide copies of Grafana dashboards alternated with VictoriaMetrics datasource at [dashboards/vm](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/dashboards/vm).
|
||||
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): added ability to set, override and clear request and response headers on a per-user and per-path basis. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4825) and [these docs](https://docs.victoriametrics.com/vmauth.html#auth-config) for details.
|
||||
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): add `eval_offset` attribute for [Groups](https://docs.victoriametrics.com/vmalert.html#groups). If specified, Group will be evaluated at the exact time offset on the range of [0...evaluationInterval]. The setting might be useful for cron-like rules which must be evaluated at specific moments of time. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3409) for details.
|
||||
|
||||
* BUGFIX: [Official Grafana dashboards for VictoriaMetrics](https://grafana.com/orgs/victoriametrics): fix display of ingested rows rate for `Samples ingested/s` and `Samples rate` panels for vmagent's dasbhoard. Previously, not all ingested protocols were accounted in these panels. An extra panel `Rows rate` was added to `Ingestion` section to display the split for rows ingested rate by protocol.
|
||||
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix the bug causing render looping when switching to heatmap.
|
||||
|
@ -124,7 +124,6 @@ The v1.93.x line will be supported for at least 12 months since [v1.93.0](https:
|
|||
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): properly handle client address with `X-Forwarded-For` part at the [Active queries](https://docs.victoriametrics.com/#active-queries) page. See [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4676#issuecomment-1663203424).
|
||||
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): prevent from panic when the lookbehind window in square brackets of [rollup function](https://docs.victoriametrics.com/MetricsQL.html#rollup-functions) is parsed into negative value. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4795).
|
||||
|
||||
|
||||
## [v1.92.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.92.1)
|
||||
|
||||
Released at 2023-07-28
|
||||
|
|
|
@ -123,6 +123,13 @@ name: <string>
|
|||
# How often rules in the group are evaluated.
|
||||
[ interval: <duration> | default = -evaluationInterval flag ]
|
||||
|
||||
# Optional
|
||||
# Group will be evaluated at the exact offset in the range of [0...interval].
|
||||
# E.g. for Group with `interval: 1h` and `eval_offset: 5m` the evaluation will
|
||||
# start at 5th minute of the hour. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3409
|
||||
# `eval_offset` can't be bigger than `interval`.
|
||||
[ eval_offset: <duration> ]
|
||||
|
||||
# Limit the number of alerts an alerting rule and series a recording
|
||||
# rule can produce. 0 is no limit.
|
||||
[ limit: <int> | default = 0 ]
|
||||
|
|
Loading…
Reference in a new issue