2023-10-13 11:54:33 +00:00
package rule
2020-05-10 16:58:17 +00:00
import (
"context"
2023-10-13 11:54:33 +00:00
"encoding/json"
2023-03-01 14:48:20 +00:00
"errors"
2023-10-13 11:54:33 +00:00
"flag"
2020-05-10 16:58:17 +00:00
"fmt"
"hash/fnv"
2021-12-02 12:45:08 +00:00
"net/url"
2022-03-29 13:09:07 +00:00
"strconv"
2020-06-01 10:46:37 +00:00
"sync"
2020-05-10 16:58:17 +00:00
"time"
2024-05-14 12:43:39 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2023-10-13 11:54:33 +00:00
"github.com/cheggaaa/pb/v3"
2022-06-09 06:21:30 +00:00
2020-06-01 10:46:37 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
2020-05-10 16:58:17 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
2020-06-29 19:21:03 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
2022-03-29 13:09:07 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
2020-05-10 16:58:17 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2022-03-29 13:09:07 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
2023-10-13 11:54:33 +00:00
"github.com/VictoriaMetrics/metrics"
)
// Command-line flags that control rule evaluation, alert resending and state restore.
var (
	ruleUpdateEntriesLimit = flag.Int("rule.updateEntriesLimit", 20, "Defines the max number of rule's state updates stored in-memory. "+
		"Rule's updates are available on rule's Details page and are used for debugging purposes. The number of stored updates can be overridden per rule via update_entries_limit param.")
	resendDelay        = flag.Duration("rule.resendDelay", 0, "Minimum amount of time to wait before resending an alert to notifier")
	maxResolveDuration = flag.Duration("rule.maxResolveDuration", 0, "Limits the maximum duration for automatic alert expiration, "+
		"which by default is 4 times evaluationInterval of the parent group")
	// The trailing space before the continuation keeps the two sentences separated in the help output.
	evalDelay = flag.Duration("rule.evalDelay", 30*time.Second, "Adjustment of the `time` parameter for rule evaluation requests to compensate intentional data delay from the datasource. "+
		"Normally, should be equal to `-search.latencyOffset` (cmd-line flag configured for VictoriaMetrics single-node or vmselect).")
	disableAlertGroupLabel = flag.Bool("disableAlertgroupLabel", false, "Whether to disable adding group's Name as label to generated alerts and time series.")
	remoteReadLookBack     = flag.Duration("remoteRead.lookback", time.Hour, "Lookback defines how far to look into past for alerts timeseries."+
		" For example, if lookback=1h then range from now() to now()-1h will be scanned.")
)
// Group is an entity for grouping rules
type Group struct {
2023-10-25 09:54:18 +00:00
mu sync . RWMutex
Name string
File string
Rules [ ] Rule
Type config . Type
Interval time . Duration
EvalOffset * time . Duration
2023-10-25 11:07:13 +00:00
// EvalDelay will adjust timestamp for rule evaluation requests to compensate intentional query delay from datasource.
2023-10-25 09:54:18 +00:00
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5155
EvalDelay * time . Duration
2022-06-09 06:21:30 +00:00
Limit int
2022-03-15 11:54:53 +00:00
Concurrency int
Checksum string
LastEvaluation time . Time
2021-08-31 11:52:34 +00:00
2023-04-27 10:17:26 +00:00
Labels map [ string ] string
Params url . Values
Headers map [ string ] string
NotifierHeaders map [ string ] string
2020-05-10 16:58:17 +00:00
2020-05-17 14:12:09 +00:00
doneCh chan struct { }
finishedCh chan struct { }
// channel accepts new Group obj
// which supposed to update current group
2020-06-01 10:46:37 +00:00
updateCh chan * Group
2023-03-01 14:48:20 +00:00
// evalCancel stores the cancel fn for interrupting
// rules evaluation. Used on groups update() and close().
evalCancel context . CancelFunc
app/vmalert: extend metrics set exported by `vmalert` #573 (#654)
* app/vmalert: extend metrics set exported by `vmalert` #573
New metrics were added to improve observability:
+ vmalert_alerts_pending{alertname, group} - number of pending alerts per group
per alert;
+ vmalert_alerts_acitve{alertname, group} - number of active alerts per group
per alert;
+ vmalert_alerts_error{alertname, group} - is 1 if alertname ended up with error
during prev execution, is 0 if no errors happened;
+ vmalert_recording_rules_error{recording, group} - is 1 if recording rule
ended up with error during prev execution, is 0 if no errors happened;
* vmalert_iteration_total{group, file} - now contains group and file name labels.
This should improve control over specific groups;
* vmalert_iteration_duration_seconds{group, file} - now contains group and file name labels. This should improve control over specific groups;
Some collisions for alerts and recording rules are possible, because neither
group name nor alert/recording rule name are unique for compatibility reasons.
Commit contains list of TODOs for Unregistering metrics since groups and rules
are ephemeral and could be removed without application restart. In order to
unlock Unregistering feature corresponding PR was filed - https://github.com/VictoriaMetrics/metrics/pull/13
* app/vmalert: extend metrics set exported by `vmalert` #573
The changes are following:
* add an ID label to rules metrics, since `name` collisions within one group is
a common case - see the k8s example alerts;
* supports metrics unregistering on rule updates. Consider the case when one rule
was added or removed from the group, or the whole group was added or removed.
The change depends on https://github.com/VictoriaMetrics/metrics/pull/16
where race condition for Unregister method was fixed.
2020-08-09 06:41:29 +00:00
metrics * groupMetrics
2023-10-10 10:41:19 +00:00
// evalAlignment will make the timestamp of group query
// requests be aligned with interval
evalAlignment * bool
app/vmalert: extend metrics set exported by `vmalert` #573 (#654)
* app/vmalert: extend metrics set exported by `vmalert` #573
New metrics were added to improve observability:
+ vmalert_alerts_pending{alertname, group} - number of pending alerts per group
per alert;
+ vmalert_alerts_acitve{alertname, group} - number of active alerts per group
per alert;
+ vmalert_alerts_error{alertname, group} - is 1 if alertname ended up with error
during prev execution, is 0 if no errors happened;
+ vmalert_recording_rules_error{recording, group} - is 1 if recording rule
ended up with error during prev execution, is 0 if no errors happened;
* vmalert_iteration_total{group, file} - now contains group and file name labels.
This should improve control over specific groups;
* vmalert_iteration_duration_seconds{group, file} - now contains group and file name labels. This should improve control over specific groups;
Some collisions for alerts and recording rules are possible, because neither
group name nor alert/recording rule name are unique for compatibility reasons.
Commit contains list of TODOs for Unregistering metrics since groups and rules
are ephemeral and could be removed without application restart. In order to
unlock Unregistering feature corresponding PR was filed - https://github.com/VictoriaMetrics/metrics/pull/13
* app/vmalert: extend metrics set exported by `vmalert` #573
The changes are following:
* add an ID label to rules metrics, since `name` collisions within one group is
a common case - see the k8s example alerts;
* supports metrics unregistering on rule updates. Consider the case when one rule
was added or removed from the group, or the whole group was added or removed.
The change depends on https://github.com/VictoriaMetrics/metrics/pull/16
where race condition for Unregister method was fixed.
2020-08-09 06:41:29 +00:00
}
type groupMetrics struct {
2022-02-02 12:11:41 +00:00
iterationTotal * utils . Counter
iterationDuration * utils . Summary
2022-03-29 13:09:07 +00:00
iterationMissed * utils . Counter
2022-05-20 15:31:16 +00:00
iterationInterval * utils . Gauge
app/vmalert: extend metrics set exported by `vmalert` #573 (#654)
* app/vmalert: extend metrics set exported by `vmalert` #573
New metrics were added to improve observability:
+ vmalert_alerts_pending{alertname, group} - number of pending alerts per group
per alert;
+ vmalert_alerts_acitve{alertname, group} - number of active alerts per group
per alert;
+ vmalert_alerts_error{alertname, group} - is 1 if alertname ended up with error
during prev execution, is 0 if no errors happened;
+ vmalert_recording_rules_error{recording, group} - is 1 if recording rule
ended up with error during prev execution, is 0 if no errors happened;
* vmalert_iteration_total{group, file} - now contains group and file name labels.
This should improve control over specific groups;
* vmalert_iteration_duration_seconds{group, file} - now contains group and file name labels. This should improve control over specific groups;
Some collisions for alerts and recording rules are possible, because neither
group name nor alert/recording rule name are unique for compatibility reasons.
Commit contains list of TODOs for Unregistering metrics since groups and rules
are ephemeral and could be removed without application restart. In order to
unlock Unregistering feature corresponding PR was filed - https://github.com/VictoriaMetrics/metrics/pull/13
* app/vmalert: extend metrics set exported by `vmalert` #573
The changes are following:
* add an ID label to rules metrics, since `name` collisions within one group is
a common case - see the k8s example alerts;
* supports metrics unregistering on rule updates. Consider the case when one rule
was added or removed from the group, or the whole group was added or removed.
The change depends on https://github.com/VictoriaMetrics/metrics/pull/16
where race condition for Unregister method was fixed.
2020-08-09 06:41:29 +00:00
}
2022-05-20 15:31:16 +00:00
func newGroupMetrics ( g * Group ) * groupMetrics {
app/vmalert: extend metrics set exported by `vmalert` #573 (#654)
* app/vmalert: extend metrics set exported by `vmalert` #573
New metrics were added to improve observability:
+ vmalert_alerts_pending{alertname, group} - number of pending alerts per group
per alert;
+ vmalert_alerts_acitve{alertname, group} - number of active alerts per group
per alert;
+ vmalert_alerts_error{alertname, group} - is 1 if alertname ended up with error
during prev execution, is 0 if no errors happened;
+ vmalert_recording_rules_error{recording, group} - is 1 if recording rule
ended up with error during prev execution, is 0 if no errors happened;
* vmalert_iteration_total{group, file} - now contains group and file name labels.
This should improve control over specific groups;
* vmalert_iteration_duration_seconds{group, file} - now contains group and file name labels. This should improve control over specific groups;
Some collisions for alerts and recording rules are possible, because neither
group name nor alert/recording rule name are unique for compatibility reasons.
Commit contains list of TODOs for Unregistering metrics since groups and rules
are ephemeral and could be removed without application restart. In order to
unlock Unregistering feature corresponding PR was filed - https://github.com/VictoriaMetrics/metrics/pull/13
* app/vmalert: extend metrics set exported by `vmalert` #573
The changes are following:
* add an ID label to rules metrics, since `name` collisions within one group is
a common case - see the k8s example alerts;
* supports metrics unregistering on rule updates. Consider the case when one rule
was added or removed from the group, or the whole group was added or removed.
The change depends on https://github.com/VictoriaMetrics/metrics/pull/16
where race condition for Unregister method was fixed.
2020-08-09 06:41:29 +00:00
m := & groupMetrics { }
2022-05-20 15:31:16 +00:00
labels := fmt . Sprintf ( ` group=%q, file=%q ` , g . Name , g . File )
2022-02-02 12:11:41 +00:00
m . iterationTotal = utils . GetOrCreateCounter ( fmt . Sprintf ( ` vmalert_iteration_total { %s} ` , labels ) )
m . iterationDuration = utils . GetOrCreateSummary ( fmt . Sprintf ( ` vmalert_iteration_duration_seconds { %s} ` , labels ) )
2022-03-29 13:09:07 +00:00
m . iterationMissed = utils . GetOrCreateCounter ( fmt . Sprintf ( ` vmalert_iteration_missed_total { %s} ` , labels ) )
2022-05-20 15:31:16 +00:00
m . iterationInterval = utils . GetOrCreateGauge ( fmt . Sprintf ( ` vmalert_iteration_interval_seconds { %s} ` , labels ) , func ( ) float64 {
g . mu . RLock ( )
i := g . Interval . Seconds ( )
g . mu . RUnlock ( )
return i
} )
app/vmalert: extend metrics set exported by `vmalert` #573 (#654)
* app/vmalert: extend metrics set exported by `vmalert` #573
New metrics were added to improve observability:
+ vmalert_alerts_pending{alertname, group} - number of pending alerts per group
per alert;
+ vmalert_alerts_acitve{alertname, group} - number of active alerts per group
per alert;
+ vmalert_alerts_error{alertname, group} - is 1 if alertname ended up with error
during prev execution, is 0 if no errors happened;
+ vmalert_recording_rules_error{recording, group} - is 1 if recording rule
ended up with error during prev execution, is 0 if no errors happened;
* vmalert_iteration_total{group, file} - now contains group and file name labels.
This should improve control over specific groups;
* vmalert_iteration_duration_seconds{group, file} - now contains group and file name labels. This should improve control over specific groups;
Some collisions for alerts and recording rules are possible, because neither
group name nor alert/recording rule name are unique for compatibility reasons.
Commit contains list of TODOs for Unregistering metrics since groups and rules
are ephemeral and could be removed without application restart. In order to
unlock Unregistering feature corresponding PR was filed - https://github.com/VictoriaMetrics/metrics/pull/13
* app/vmalert: extend metrics set exported by `vmalert` #573
The changes are following:
* add an ID label to rules metrics, since `name` collisions within one group is
a common case - see the k8s example alerts;
* supports metrics unregistering on rule updates. Consider the case when one rule
was added or removed from the group, or the whole group was added or removed.
The change depends on https://github.com/VictoriaMetrics/metrics/pull/16
where race condition for Unregister method was fixed.
2020-08-09 06:41:29 +00:00
return m
2020-06-01 10:46:37 +00:00
}
2021-08-31 11:52:34 +00:00
// mergeLabels returns a new map holding the labels of both set1 and set2.
// On key collision the value from set2 wins and the override is logged
// together with the given group and rule names.
func mergeLabels(groupName, ruleName string, set1, set2 map[string]string) map[string]string {
	merged := make(map[string]string, len(set1)+len(set2))
	for name, value := range set1 {
		merged[name] = value
	}
	for name, value := range set2 {
		previous, exists := merged[name]
		if exists {
			logger.Infof("label %q=%q for rule %q.%q overwritten with external label %q=%q",
				name, previous, groupName, ruleName, name, value)
		}
		merged[name] = value
	}
	return merged
}
2023-10-13 11:54:33 +00:00
// NewGroup returns a new group
func NewGroup ( cfg config . Group , qb datasource . QuerierBuilder , defaultInterval time . Duration , labels map [ string ] string ) * Group {
2020-06-01 10:46:37 +00:00
g := & Group {
2023-04-27 10:17:26 +00:00
Type : cfg . Type ,
Name : cfg . Name ,
File : cfg . File ,
Interval : cfg . Interval . Duration ( ) ,
Limit : cfg . Limit ,
Concurrency : cfg . Concurrency ,
Checksum : cfg . Checksum ,
Params : cfg . Params ,
Headers : make ( map [ string ] string ) ,
NotifierHeaders : make ( map [ string ] string ) ,
Labels : cfg . Labels ,
2023-10-10 10:41:19 +00:00
evalAlignment : cfg . EvalAlignment ,
2021-05-22 21:26:01 +00:00
doneCh : make ( chan struct { } ) ,
finishedCh : make ( chan struct { } ) ,
updateCh : make ( chan * Group ) ,
2020-06-01 10:46:37 +00:00
}
if g . Interval == 0 {
g . Interval = defaultInterval
}
2020-06-09 12:21:20 +00:00
if g . Concurrency < 1 {
g . Concurrency = 1
}
2023-09-06 14:29:59 +00:00
if cfg . EvalOffset != nil {
g . EvalOffset = & cfg . EvalOffset . D
}
2023-10-25 09:54:18 +00:00
if cfg . EvalDelay != nil {
g . EvalDelay = & cfg . EvalDelay . D
}
2022-07-22 08:44:55 +00:00
for _ , h := range cfg . Headers {
g . Headers [ h . Key ] = h . Value
}
2023-04-27 10:17:26 +00:00
for _ , h := range cfg . NotifierHeaders {
g . NotifierHeaders [ h . Key ] = h . Value
}
2022-05-20 15:31:16 +00:00
g . metrics = newGroupMetrics ( g )
2020-06-01 10:46:37 +00:00
rules := make ( [ ] Rule , len ( cfg . Rules ) )
for i , r := range cfg . Rules {
2021-08-31 11:52:34 +00:00
var extraLabels map [ string ] string
// apply external labels
if len ( labels ) > 0 {
extraLabels = labels
}
// apply group labels, it has priority on external labels
if len ( cfg . Labels ) > 0 {
extraLabels = mergeLabels ( g . Name , r . Name ( ) , extraLabels , g . Labels )
2020-07-28 11:20:31 +00:00
}
2021-08-31 11:52:34 +00:00
// apply rules labels, it has priority on other labels
if len ( extraLabels ) > 0 {
r . Labels = mergeLabels ( g . Name , r . Name ( ) , extraLabels , r . Labels )
}
2021-04-28 20:41:15 +00:00
rules [ i ] = g . newRule ( qb , r )
2020-06-01 10:46:37 +00:00
}
g . Rules = rules
return g
}
2023-10-13 11:54:33 +00:00
func ( g * Group ) newRule ( qb datasource . QuerierBuilder , r config . Rule ) Rule {
if r . Alert != "" {
return NewAlertingRule ( qb , g , r )
2020-06-01 10:46:37 +00:00
}
2023-10-13 11:54:33 +00:00
return NewRecordingRule ( qb , g , r )
2020-05-10 16:58:17 +00:00
}
// ID return unique group ID that consists of
2022-02-02 12:11:41 +00:00
// rules file and group Name
2020-05-17 14:12:09 +00:00
func ( g * Group ) ID ( ) uint64 {
2021-10-19 13:44:13 +00:00
g . mu . RLock ( )
defer g . mu . RUnlock ( )
2020-05-10 16:58:17 +00:00
hash := fnv . New64a ( )
hash . Write ( [ ] byte ( g . File ) )
hash . Write ( [ ] byte ( "\xff" ) )
hash . Write ( [ ] byte ( g . Name ) )
2021-02-01 13:02:44 +00:00
hash . Write ( [ ] byte ( g . Type . Get ( ) ) )
2023-09-06 14:29:59 +00:00
hash . Write ( [ ] byte ( g . Interval . String ( ) ) )
if g . EvalOffset != nil {
hash . Write ( [ ] byte ( g . EvalOffset . String ( ) ) )
}
2020-05-10 16:58:17 +00:00
return hash . Sum64 ( )
}
2023-10-13 11:54:33 +00:00
// restore restores alerts state for group rules
func ( g * Group ) restore ( ctx context . Context , qb datasource . QuerierBuilder , ts time . Time , lookback time . Duration ) error {
2020-05-10 16:58:17 +00:00
for _ , rule := range g . Rules {
2023-02-04 03:46:13 +00:00
ar , ok := rule . ( * AlertingRule )
2020-06-01 10:46:37 +00:00
if ! ok {
continue
}
2023-02-04 03:46:13 +00:00
if ar . For < 1 {
2020-06-01 10:46:37 +00:00
continue
2020-05-10 16:58:17 +00:00
}
2022-12-01 08:27:39 +00:00
q := qb . BuildWithParams ( datasource . QuerierParams {
2023-02-04 03:46:13 +00:00
EvaluationInterval : g . Interval ,
QueryParams : g . Params ,
Headers : g . Headers ,
Debug : ar . Debug ,
2022-12-01 08:27:39 +00:00
} )
2023-10-13 11:54:33 +00:00
if err := ar . restore ( ctx , q , ts , lookback ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "error while restoring rule %q: %w" , rule , err )
2020-05-10 16:58:17 +00:00
}
}
return nil
}
// updateWith updates existing group with
2020-06-01 10:46:37 +00:00
// passed group object. This function ignores group
// evaluation interval change. It supposed to be updated
2023-10-13 11:54:33 +00:00
// in group.Start function.
2020-05-17 14:12:09 +00:00
// Not thread-safe.
2020-06-01 10:46:37 +00:00
func ( g * Group ) updateWith ( newGroup * Group ) error {
rulesRegistry := make ( map [ uint64 ] Rule )
2020-05-10 16:58:17 +00:00
for _ , nr := range newGroup . Rules {
2020-06-01 10:46:37 +00:00
rulesRegistry [ nr . ID ( ) ] = nr
2020-05-10 16:58:17 +00:00
}
for i , or := range g . Rules {
2020-06-01 10:46:37 +00:00
nr , ok := rulesRegistry [ or . ID ( ) ]
2020-05-10 16:58:17 +00:00
if ! ok {
// old rule is not present in the new list
2020-05-15 06:55:22 +00:00
// so we mark it for removing
2023-10-13 11:54:33 +00:00
g . Rules [ i ] . close ( )
2020-05-15 06:55:22 +00:00
g . Rules [ i ] = nil
2020-05-10 16:58:17 +00:00
continue
}
2023-10-13 11:54:33 +00:00
if err := or . updateWith ( nr ) ; err != nil {
2020-06-01 10:46:37 +00:00
return err
}
delete ( rulesRegistry , nr . ID ( ) )
2020-05-10 16:58:17 +00:00
}
2020-06-01 10:46:37 +00:00
var newRules [ ] Rule
2020-05-15 06:55:22 +00:00
for _ , r := range g . Rules {
if r == nil {
// skip nil rules
continue
}
newRules = append ( newRules , r )
}
// add the rest of rules from registry
2020-05-10 16:58:17 +00:00
for _ , nr := range rulesRegistry {
2020-05-15 06:55:22 +00:00
newRules = append ( newRules , nr )
2020-05-10 16:58:17 +00:00
}
2021-09-23 14:55:59 +00:00
// note that g.Interval is not updated here
// so the value can be compared later in
// group.Start function
2021-02-01 13:02:44 +00:00
g . Type = newGroup . Type
2020-06-09 12:21:20 +00:00
g . Concurrency = newGroup . Concurrency
2021-12-02 12:45:08 +00:00
g . Params = newGroup . Params
2022-07-21 13:59:55 +00:00
g . Headers = newGroup . Headers
2023-04-27 10:17:26 +00:00
g . NotifierHeaders = newGroup . NotifierHeaders
2021-08-31 11:52:34 +00:00
g . Labels = newGroup . Labels
2022-06-09 06:58:25 +00:00
g . Limit = newGroup . Limit
2020-09-11 19:14:30 +00:00
g . Checksum = newGroup . Checksum
2020-05-15 06:55:22 +00:00
g . Rules = newRules
2020-06-01 10:46:37 +00:00
return nil
2020-05-10 16:58:17 +00:00
}
2023-10-13 11:54:33 +00:00
// InterruptEval interrupts in-flight rules evaluations
2023-03-01 14:48:20 +00:00
// within the group. It is expected that g.evalCancel
// will be repopulated after the call.
2023-10-13 11:54:33 +00:00
func ( g * Group ) InterruptEval ( ) {
2023-03-01 14:48:20 +00:00
g . mu . RLock ( )
defer g . mu . RUnlock ( )
if g . evalCancel != nil {
g . evalCancel ( )
}
}
2023-10-13 11:54:33 +00:00
// Close stops the group and it's rules, unregisters group metrics
func ( g * Group ) Close ( ) {
2020-05-17 14:12:09 +00:00
if g . doneCh == nil {
2020-05-10 16:58:17 +00:00
return
}
2020-05-17 14:12:09 +00:00
close ( g . doneCh )
2023-10-13 11:54:33 +00:00
g . InterruptEval ( )
2020-05-17 14:12:09 +00:00
<- g . finishedCh
app/vmalert: extend metrics set exported by `vmalert` #573 (#654)
* app/vmalert: extend metrics set exported by `vmalert` #573
New metrics were added to improve observability:
+ vmalert_alerts_pending{alertname, group} - number of pending alerts per group
per alert;
+ vmalert_alerts_acitve{alertname, group} - number of active alerts per group
per alert;
+ vmalert_alerts_error{alertname, group} - is 1 if alertname ended up with error
during prev execution, is 0 if no errors happened;
+ vmalert_recording_rules_error{recording, group} - is 1 if recording rule
ended up with error during prev execution, is 0 if no errors happened;
* vmalert_iteration_total{group, file} - now contains group and file name labels.
This should improve control over specific groups;
* vmalert_iteration_duration_seconds{group, file} - now contains group and file name labels. This should improve control over specific groups;
Some collisions for alerts and recording rules are possible, because neither
group name nor alert/recording rule name are unique for compatibility reasons.
Commit contains list of TODOs for Unregistering metrics since groups and rules
are ephemeral and could be removed without application restart. In order to
unlock Unregistering feature corresponding PR was filed - https://github.com/VictoriaMetrics/metrics/pull/13
* app/vmalert: extend metrics set exported by `vmalert` #573
The changes are following:
* add an ID label to rules metrics, since `name` collisions within one group is
a common case - see the k8s example alerts;
* supports metrics unregistering on rule updates. Consider the case when one rule
was added or removed from the group, or the whole group was added or removed.
The change depends on https://github.com/VictoriaMetrics/metrics/pull/16
where race condition for Unregister method was fixed.
2020-08-09 06:41:29 +00:00
2022-02-02 12:11:41 +00:00
g . metrics . iterationDuration . Unregister ( )
g . metrics . iterationTotal . Unregister ( )
2022-05-20 15:31:16 +00:00
g . metrics . iterationMissed . Unregister ( )
g . metrics . iterationInterval . Unregister ( )
app/vmalert: extend metrics set exported by `vmalert` #573 (#654)
* app/vmalert: extend metrics set exported by `vmalert` #573
New metrics were added to improve observability:
+ vmalert_alerts_pending{alertname, group} - number of pending alerts per group
per alert;
+ vmalert_alerts_acitve{alertname, group} - number of active alerts per group
per alert;
+ vmalert_alerts_error{alertname, group} - is 1 if alertname ended up with error
during prev execution, is 0 if no errors happened;
+ vmalert_recording_rules_error{recording, group} - is 1 if recording rule
ended up with error during prev execution, is 0 if no errors happened;
* vmalert_iteration_total{group, file} - now contains group and file name labels.
This should improve control over specific groups;
* vmalert_iteration_duration_seconds{group, file} - now contains group and file name labels. This should improve control over specific groups;
Some collisions for alerts and recording rules are possible, because neither
group name nor alert/recording rule name are unique for compatibility reasons.
Commit contains list of TODOs for Unregistering metrics since groups and rules
are ephemeral and could be removed without application restart. In order to
unlock Unregistering feature corresponding PR was filed - https://github.com/VictoriaMetrics/metrics/pull/13
* app/vmalert: extend metrics set exported by `vmalert` #573
The changes are following:
* add an ID label to rules metrics, since `name` collisions within one group is
a common case - see the k8s example alerts;
* supports metrics unregistering on rule updates. Consider the case when one rule
was added or removed from the group, or the whole group was added or removed.
The change depends on https://github.com/VictoriaMetrics/metrics/pull/16
where race condition for Unregister method was fixed.
2020-08-09 06:41:29 +00:00
for _ , rule := range g . Rules {
2023-10-13 11:54:33 +00:00
rule . close ( )
app/vmalert: extend metrics set exported by `vmalert` #573 (#654)
* app/vmalert: extend metrics set exported by `vmalert` #573
New metrics were added to improve observability:
+ vmalert_alerts_pending{alertname, group} - number of pending alerts per group
per alert;
+ vmalert_alerts_acitve{alertname, group} - number of active alerts per group
per alert;
+ vmalert_alerts_error{alertname, group} - is 1 if alertname ended up with error
during prev execution, is 0 if no errors happened;
+ vmalert_recording_rules_error{recording, group} - is 1 if recording rule
ended up with error during prev execution, is 0 if no errors happened;
* vmalert_iteration_total{group, file} - now contains group and file name labels.
This should improve control over specific groups;
* vmalert_iteration_duration_seconds{group, file} - now contains group and file name labels. This should improve control over specific groups;
Some collisions for alerts and recording rules are possible, because neither
group name nor alert/recording rule name are unique for compatibility reasons.
Commit contains list of TODOs for Unregistering metrics since groups and rules
are ephemeral and could be removed without application restart. In order to
unlock Unregistering feature corresponding PR was filed - https://github.com/VictoriaMetrics/metrics/pull/13
* app/vmalert: extend metrics set exported by `vmalert` #573
The changes are following:
* add an ID label to rules metrics, since `name` collisions within one group is
a common case - see the k8s example alerts;
* supports metrics unregistering on rule updates. Consider the case when one rule
was added or removed from the group, or the whole group was added or removed.
The change depends on https://github.com/VictoriaMetrics/metrics/pull/16
where race condition for Unregister method was fixed.
2020-08-09 06:41:29 +00:00
}
2020-05-10 16:58:17 +00:00
}
2023-10-13 11:54:33 +00:00
// SkipRandSleepOnGroupStart, when set to true, makes Group.Start skip
// the initial sleep delay before the group's first evaluation.
var SkipRandSleepOnGroupStart bool
2020-09-02 22:00:55 +00:00
2023-10-13 11:54:33 +00:00
// Start starts group's evaluation
func ( g * Group ) Start ( ctx context . Context , nts func ( ) [ ] notifier . Notifier , rw remotewrite . RWClient , rr datasource . QuerierBuilder ) {
2020-06-09 12:21:20 +00:00
defer func ( ) { close ( g . finishedCh ) } ( )
2020-09-02 22:00:55 +00:00
2023-10-10 10:41:19 +00:00
evalTS := time . Now ( )
2023-09-06 14:29:59 +00:00
// sleep random duration to spread group rules evaluation
// over time in order to reduce load on datasource.
2023-10-13 11:54:33 +00:00
if ! SkipRandSleepOnGroupStart {
2023-10-10 10:41:19 +00:00
sleepBeforeStart := delayBeforeStart ( evalTS , g . ID ( ) , g . Interval , g . EvalOffset )
2023-09-06 14:29:59 +00:00
g . infof ( "will start in %v" , sleepBeforeStart )
sleepTimer := time . NewTimer ( sleepBeforeStart )
2024-10-18 09:18:24 +00:00
randSleep :
for {
select {
case <- ctx . Done ( ) :
sleepTimer . Stop ( )
return
case <- g . doneCh :
sleepTimer . Stop ( )
return
case ng := <- g . updateCh :
g . mu . Lock ( )
err := g . updateWith ( ng )
if err != nil {
logger . Errorf ( "group %q: failed to update: %s" , g . Name , err )
g . mu . Unlock ( )
continue
}
g . mu . Unlock ( )
g . infof ( "reload successfully" )
case <- sleepTimer . C :
break randSleep
}
2023-03-06 13:04:43 +00:00
}
2023-10-10 10:41:19 +00:00
evalTS = evalTS . Add ( sleepBeforeStart )
2023-03-06 13:04:43 +00:00
}
2022-03-29 13:09:07 +00:00
e := & executor {
2023-10-13 11:54:33 +00:00
Rw : rw ,
Notifiers : nts ,
2023-04-27 11:02:21 +00:00
notifierHeaders : g . NotifierHeaders ,
2023-10-25 11:07:13 +00:00
previouslySentSeriesToRW : make ( map [ uint64 ] map [ string ] [ ] prompbmarshal . Label ) ,
2023-04-27 10:17:26 +00:00
}
2022-03-29 13:09:07 +00:00
2023-09-06 14:29:59 +00:00
g . infof ( "started" )
2022-03-29 13:09:07 +00:00
2023-03-01 14:48:20 +00:00
eval := func ( ctx context . Context , ts time . Time ) {
2022-03-29 13:09:07 +00:00
g . metrics . iterationTotal . Inc ( )
start := time . Now ( )
if len ( g . Rules ) < 1 {
g . metrics . iterationDuration . UpdateDuration ( start )
g . LastEvaluation = start
return
}
resolveDuration := getResolveDuration ( g . Interval , * resendDelay , * maxResolveDuration )
2023-10-10 10:41:19 +00:00
ts = g . adjustReqTimestamp ( ts )
2022-06-09 06:21:30 +00:00
errs := e . execConcurrently ( ctx , g . Rules , ts , g . Concurrency , resolveDuration , g . Limit )
2022-03-29 13:09:07 +00:00
for err := range errs {
if err != nil {
logger . Errorf ( "group %q: %s" , g . Name , err )
}
}
g . metrics . iterationDuration . UpdateDuration ( start )
g . LastEvaluation = start
}
2023-03-01 14:48:20 +00:00
evalCtx , cancel := context . WithCancel ( ctx )
g . mu . Lock ( )
g . evalCancel = cancel
g . mu . Unlock ( )
defer g . evalCancel ( )
eval ( evalCtx , evalTS )
2022-03-29 13:09:07 +00:00
2020-06-01 10:46:37 +00:00
t := time . NewTicker ( g . Interval )
2020-05-10 16:58:17 +00:00
defer t . Stop ( )
2023-02-04 03:46:13 +00:00
// restore the rules state after the first evaluation
// so only active alerts can be restored.
if rr != nil {
2023-10-13 11:54:33 +00:00
err := g . restore ( ctx , rr , evalTS , * remoteReadLookBack )
2023-02-04 03:46:13 +00:00
if err != nil {
logger . Errorf ( "error while restoring ruleState for group %q: %s" , g . Name , err )
}
}
2020-05-10 16:58:17 +00:00
for {
select {
case <- ctx . Done ( ) :
logger . Infof ( "group %q: context cancelled" , g . Name )
return
2020-05-17 14:12:09 +00:00
case <- g . doneCh :
2020-05-10 16:58:17 +00:00
logger . Infof ( "group %q: received stop signal" , g . Name )
return
2020-05-17 14:12:09 +00:00
case ng := <- g . updateCh :
2020-06-01 10:46:37 +00:00
g . mu . Lock ( )
2023-03-01 14:48:20 +00:00
// it is expected that g.evalCancel will be evoked
// somewhere else to unblock group from the rules evaluation.
// we recreate the evalCtx and g.evalCancel, so it can
// be called again.
evalCtx , cancel = context . WithCancel ( ctx )
g . evalCancel = cancel
2020-06-01 10:46:37 +00:00
err := g . updateWith ( ng )
if err != nil {
logger . Errorf ( "group %q: failed to update: %s" , g . Name , err )
g . mu . Unlock ( )
continue
}
2022-05-13 08:04:49 +00:00
2023-09-06 14:29:59 +00:00
// ensure that staleness is tracked for existing rules only
2022-05-13 08:04:49 +00:00
e . purgeStaleSeries ( g . Rules )
2023-04-27 11:02:21 +00:00
e . notifierHeaders = g . NotifierHeaders
2020-06-01 10:46:37 +00:00
g . mu . Unlock ( )
2023-09-06 14:29:59 +00:00
g . infof ( "re-started" )
2020-05-10 16:58:17 +00:00
case <- t . C :
2022-03-29 13:09:07 +00:00
missed := ( time . Since ( evalTS ) / g . Interval ) - 1
2023-07-13 15:11:22 +00:00
if missed < 0 {
// missed can become < 0 due to irregular delays during evaluation
// which can result in time.Since(evalTS) < g.Interval
missed = 0
}
2022-03-29 13:09:07 +00:00
if missed > 0 {
g . metrics . iterationMissed . Inc ( )
2020-06-09 12:21:20 +00:00
}
2022-03-29 13:09:07 +00:00
evalTS = evalTS . Add ( ( missed + 1 ) * g . Interval )
2023-03-01 14:48:20 +00:00
eval ( evalCtx , evalTS )
2020-06-09 12:21:20 +00:00
}
}
}
2020-06-01 10:46:37 +00:00
2023-10-13 11:54:33 +00:00
// UpdateWith hands the given group over to g via g.updateCh.
// The send blocks until the value is accepted by the channel.
func (g *Group) UpdateWith(new *Group) {
	g.updateCh <- new
}
// DeepCopy returns a copy of the group produced by a JSON round-trip,
// so only fields that survive JSON marshalling are duplicated.
//
// Rules are not duplicated: the returned group references the same
// Rules slice as g.
func (g *Group) DeepCopy() *Group {
	// Hold the read lock for the whole copy, including the read of g.Rules,
	// so the snapshot cannot race with writers holding g.mu.
	g.mu.RLock()
	defer g.mu.RUnlock()

	// NOTE(review): marshalling errors are ignored here, as they always were;
	// on failure the copy is left partially populated. Confirm this is intended.
	data, _ := json.Marshal(g)
	newG := Group{}
	_ = json.Unmarshal(data, &newG)
	newG.Rules = g.Rules
	return &newG
}
2023-09-06 14:29:59 +00:00
// delayBeforeStart returns how long to wait after ts before the first evaluation.
//
// key is mapped to a fixed position inside interval (key/2^64 of its length),
// and the returned delay is the time from ts until the next occurrence of that
// position. When offset is set and the resulting evaluation time would fall
// before `interval start + offset`, the delay is extended by offset.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3409.
func delayBeforeStart(ts time.Time, key uint64, interval time.Duration, offset *time.Duration) time.Duration {
	// position of the group inside the interval, derived from key
	fraction := float64(key) / (1 << 64)
	target := time.Duration(float64(interval) * fraction)

	// how far ts already is into its interval (counted from the Unix epoch)
	elapsed := time.Duration(ts.UnixNano() % interval.Nanoseconds())

	delay := target - elapsed
	if target < elapsed {
		// the position has already passed in this interval, wait for the next one
		delay += interval
	}
	if offset == nil {
		return delay
	}

	evalTS := ts.Add(delay)
	if boundary := evalTS.Truncate(interval).Add(*offset); evalTS.Before(boundary) {
		delay += *offset
	}
	return delay
}
2024-07-09 22:14:15 +00:00
func ( g * Group ) infof ( format string , args ... any ) {
2023-09-06 14:29:59 +00:00
msg := fmt . Sprintf ( format , args ... )
logger . Infof ( "group %q %s; interval=%v; eval_offset=%v; concurrency=%d" ,
g . Name , msg , g . Interval , g . EvalOffset , g . Concurrency )
}
2023-10-13 11:54:33 +00:00
// Replay replays every rule of the group over the [start, end] time range
// and returns the sum of the values reported by replayRule.
//
// The range is split into windows of g.Interval*maxDataPoint; each rule is
// replayed window by window via replayRule with replayRuleRetryAttempts.
// A replay error terminates the process via logger.Fatalf.
// After each rule the function sleeps for replayDelay.
// A progress bar is shown per rule unless disableProgressBar is set.
func (g *Group) Replay(start, end time.Time, rw remotewrite.RWClient, maxDataPoint, replayRuleRetryAttempts int, replayDelay time.Duration, disableProgressBar bool) int {
	step := g.Interval * time.Duration(maxDataPoint)
	iterations := int(end.Sub(start)/step) + 1

	fmt.Printf("\nGroup %q"+
		"\ninterval: \t%v"+
		"\nrequests to make: \t%d"+
		"\nmax range per request: \t%v\n",
		g.Name, g.Interval, iterations, step)
	if g.Limit > 0 {
		fmt.Printf("\nPlease note, `limit: %d` param has no effect during replay.\n",
			g.Limit)
	}

	windows := rangeIterator{start: start, end: end, step: step}
	showProgress := !disableProgressBar
	total := 0
	for _, rule := range g.Rules {
		fmt.Printf("> Rule %q (ID: %d)\n", rule, rule.ID())

		var bar *pb.ProgressBar
		if showProgress {
			bar = pb.StartNew(iterations)
		}
		for windows.reset(); windows.next(); {
			n, err := replayRule(rule, windows.s, windows.e, rw, replayRuleRetryAttempts)
			if err != nil {
				logger.Fatalf("rule %q: %s", rule, err)
			}
			total += n
			if bar != nil {
				bar.Increment()
			}
		}
		if bar != nil {
			bar.Finish()
		}

		// give remote storage time to flush data on-disk,
		// so chained rules could be calculated correctly
		time.Sleep(replayDelay)
	}
	return total
}
// ExecOnce evaluates all the rules under group for once with given timestamp.
func ( g * Group ) ExecOnce ( ctx context . Context , nts func ( ) [ ] notifier . Notifier , rw remotewrite . RWClient , evalTS time . Time ) chan error {
e := & executor {
Rw : rw ,
Notifiers : nts ,
notifierHeaders : g . NotifierHeaders ,
2023-10-25 11:07:13 +00:00
previouslySentSeriesToRW : make ( map [ uint64 ] map [ string ] [ ] prompbmarshal . Label ) ,
2023-10-13 11:54:33 +00:00
}
if len ( g . Rules ) < 1 {
return nil
}
resolveDuration := getResolveDuration ( g . Interval , * resendDelay , * maxResolveDuration )
return e . execConcurrently ( ctx , g . Rules , evalTS , g . Concurrency , resolveDuration , g . Limit )
}
// rangeIterator walks the [start, end] time range in consecutive
// windows of at most step length.
type rangeIterator struct {
	// step is the max length of a single window
	step time.Duration
	// start and end are the bounds of the whole range
	start, end time.Time

	// iter is the index of the next window to produce
	iter int
	// s and e are the bounds of the current window, set by next
	s, e time.Time
}

// reset rewinds the iterator to the first window and clears the current bounds.
func (ri *rangeIterator) reset() {
	ri.s = time.Time{}
	ri.e = time.Time{}
	ri.iter = 0
}

// next advances to the following window and stores its bounds in ri.s and ri.e.
// It returns false once the window start reaches end; ri.s is still updated
// in that case. The last window is clamped to end.
func (ri *rangeIterator) next() bool {
	shift := time.Duration(ri.iter) * ri.step
	ri.s = ri.start.Add(shift)
	if !ri.s.Before(ri.end) {
		return false
	}
	upper := ri.s.Add(ri.step)
	if upper.After(ri.end) {
		upper = ri.end
	}
	ri.e = upper
	ri.iter++
	return true
}
2022-03-16 15:26:33 +00:00
// getResolveDuration returns the duration after which a firing alert
// can be considered as resolved.
//
// The result is 4 times the bigger of groupInterval and delta,
// capped by maxDuration when maxDuration is positive.
func getResolveDuration(groupInterval, delta, maxDuration time.Duration) time.Duration {
	base := delta
	if base < groupInterval {
		base = groupInterval
	}
	if d := 4 * base; maxDuration <= 0 || d <= maxDuration {
		return d
	}
	return maxDuration
}
2023-10-10 10:41:19 +00:00
func ( g * Group ) adjustReqTimestamp ( timestamp time . Time ) time . Time {
if g . EvalOffset != nil {
// calculate the min timestamp on the evaluationInterval
intervalStart := timestamp . Truncate ( g . Interval )
ts := intervalStart . Add ( * g . EvalOffset )
if timestamp . Before ( ts ) {
// if passed timestamp is before the expected evaluation offset,
// then we should adjust it to the previous evaluation round.
// E.g. request with evaluationInterval=1h and evaluationOffset=30m
// was evaluated at 11:20. Then the timestamp should be adjusted
// to 10:30, to the previous evaluationInterval.
return ts . Add ( - g . Interval )
}
2023-10-25 09:54:18 +00:00
// when `eval_offset` is using, ts shouldn't be effect by `eval_alignment` and `eval_delay`
// since it should be always aligned.
2023-10-10 10:41:19 +00:00
return ts
}
2023-10-25 11:07:13 +00:00
timestamp = timestamp . Add ( - g . getEvalDelay ( ) )
2023-10-26 13:41:35 +00:00
// always apply the alignment as a last step
2023-10-10 10:41:19 +00:00
if g . evalAlignment == nil || * g . evalAlignment {
// align query time with interval to get similar result with grafana when plotting time series.
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5049
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1232
2023-10-25 11:07:13 +00:00
return timestamp . Truncate ( g . Interval )
2023-10-25 09:54:18 +00:00
}
2023-10-25 11:07:13 +00:00
return timestamp
2023-10-25 09:54:18 +00:00
}
func ( g * Group ) getEvalDelay ( ) time . Duration {
if g . EvalDelay != nil {
return * g . EvalDelay
2023-10-10 10:41:19 +00:00
}
2023-10-25 09:54:18 +00:00
return * evalDelay
2023-10-10 10:41:19 +00:00
}
2023-10-13 11:54:33 +00:00
// executor contains group's notify and rw configs
2020-06-09 12:21:20 +00:00
type executor struct {
2023-10-13 11:54:33 +00:00
Notifiers func ( ) [ ] notifier . Notifier
2023-04-27 11:02:21 +00:00
notifierHeaders map [ string ] string
2023-10-13 11:54:33 +00:00
Rw remotewrite . RWClient
2022-03-29 13:09:07 +00:00
2022-03-30 10:37:27 +00:00
previouslySentSeriesToRWMu sync . Mutex
2023-10-25 11:07:13 +00:00
// previouslySentSeriesToRW stores series sent to RW on previous iteration
2022-03-29 13:09:07 +00:00
// map[ruleID]map[ruleLabels][]prompb.Label
// where `ruleID` is ID of the Rule within a Group
// and `ruleLabels` is []prompb.Label marshalled to a string
2023-10-25 11:07:13 +00:00
previouslySentSeriesToRW map [ uint64 ] map [ string ] [ ] prompbmarshal . Label
2020-06-09 12:21:20 +00:00
}
2023-10-13 11:54:33 +00:00
// execConcurrently executes rules concurrently if concurrency>1
2022-06-09 06:21:30 +00:00
func ( e * executor ) execConcurrently ( ctx context . Context , rules [ ] Rule , ts time . Time , concurrency int , resolveDuration time . Duration , limit int ) chan error {
2020-06-09 12:21:20 +00:00
res := make ( chan error , len ( rules ) )
if concurrency == 1 {
// fast path
for _ , rule := range rules {
2022-06-09 06:21:30 +00:00
res <- e . exec ( ctx , rule , ts , resolveDuration , limit )
2020-06-09 12:21:20 +00:00
}
close ( res )
return res
}
sem := make ( chan struct { } , concurrency )
go func ( ) {
wg := sync . WaitGroup { }
2023-10-13 11:54:33 +00:00
for _ , r := range rules {
2020-06-09 12:21:20 +00:00
sem <- struct { } { }
wg . Add ( 1 )
go func ( r Rule ) {
2022-06-09 06:21:30 +00:00
res <- e . exec ( ctx , r , ts , resolveDuration , limit )
2020-06-09 12:21:20 +00:00
<- sem
wg . Done ( )
2023-10-13 11:54:33 +00:00
} ( r )
2020-06-09 12:21:20 +00:00
}
wg . Wait ( )
close ( res )
} ( )
return res
}
app/vmalert: extend metrics set exported by `vmalert` #573 (#654)
* app/vmalert: extend metrics set exported by `vmalert` #573
New metrics were added to improve observability:
+ vmalert_alerts_pending{alertname, group} - number of pending alerts per group
per alert;
+ vmalert_alerts_active{alertname, group} - number of active alerts per group
per alert;
+ vmalert_alerts_error{alertname, group} - is 1 if alertname ended up with error
during prev execution, is 0 if no errors happened;
+ vmalert_recording_rules_error{recording, group} - is 1 if recording rule
ended up with error during prev execution, is 0 if no errors happened;
* vmalert_iteration_total{group, file} - now contains group and file name labels.
This should improve control over specific groups;
* vmalert_iteration_duration_seconds{group, file} - now contains group and file name labels. This should improve control over specific groups;
Some collisions for alerts and recording rules are possible, because neither
group name nor alert/recording rule name are unique for compatibility reasons.
Commit contains list of TODOs for Unregistering metrics since groups and rules
are ephemeral and could be removed without application restart. In order to
unlock Unregistering feature corresponding PR was filed - https://github.com/VictoriaMetrics/metrics/pull/13
* app/vmalert: extend metrics set exported by `vmalert` #573
The changes are following:
* add an ID label to rules metrics, since `name` collisions within one group is
a common case - see the k8s example alerts;
* supports metrics unregistering on rule updates. Consider the case when one rule
was added or removed from the group, or the whole group was added or removed.
The change depends on https://github.com/VictoriaMetrics/metrics/pull/16
where race condition for Unregister method was fixed.
2020-08-09 06:41:29 +00:00
var (
2021-08-31 09:28:02 +00:00
alertsFired = metrics . NewCounter ( ` vmalert_alerts_fired_total ` )
execTotal = metrics . NewCounter ( ` vmalert_execution_total ` )
execErrors = metrics . NewCounter ( ` vmalert_execution_errors_total ` )
app/vmalert: extend metrics set exported by `vmalert` #573 (#654)
* app/vmalert: extend metrics set exported by `vmalert` #573
New metrics were added to improve observability:
+ vmalert_alerts_pending{alertname, group} - number of pending alerts per group
per alert;
+ vmalert_alerts_acitve{alertname, group} - number of active alerts per group
per alert;
+ vmalert_alerts_error{alertname, group} - is 1 if alertname ended up with error
during prev execution, is 0 if no errors happened;
+ vmalert_recording_rules_error{recording, group} - is 1 if recording rule
ended up with error during prev execution, is 0 if no errors happened;
* vmalert_iteration_total{group, file} - now contains group and file name labels.
This should improve control over specific groups;
* vmalert_iteration_duration_seconds{group, file} - now contains group and file name labels. This should improve control over specific groups;
Some collisions for alerts and recording rules are possible, because neither
group name nor alert/recording rule name are unique for compatibility reasons.
Commit contains list of TODOs for Unregistering metrics since groups and rules
are ephemeral and could be removed without application restart. In order to
unlock Unregistering feature corresponding PR was filed - https://github.com/VictoriaMetrics/metrics/pull/13
* app/vmalert: extend metrics set exported by `vmalert` #573
The changes are following:
* add an ID label to rules metrics, since `name` collisions within one group is
a common case - see the k8s example alerts;
* supports metrics unregistering on rule updates. Consider the case when one rule
was added or removed from the group, or the whole group was added or removed.
The change depends on https://github.com/VictoriaMetrics/metrics/pull/16
where race condition for Unregister method was fixed.
2020-08-09 06:41:29 +00:00
)
2023-10-13 11:54:33 +00:00
func ( e * executor ) exec ( ctx context . Context , r Rule , ts time . Time , resolveDuration time . Duration , limit int ) error {
2020-06-09 12:21:20 +00:00
execTotal . Inc ( )
2023-10-13 11:54:33 +00:00
tss , err := r . exec ( ctx , ts , limit )
2020-06-09 12:21:20 +00:00
if err != nil {
2023-03-01 14:48:20 +00:00
if errors . Is ( err , context . Canceled ) {
// the context can be cancelled on graceful shutdown
// or on group update. So no need to handle the error as usual.
return nil
}
2020-06-09 12:21:20 +00:00
execErrors . Inc ( )
2023-10-13 11:54:33 +00:00
return fmt . Errorf ( "rule %q: failed to execute: %w" , r , err )
2020-06-09 12:21:20 +00:00
}
2023-10-13 11:54:33 +00:00
if e . Rw != nil {
2022-12-06 14:36:46 +00:00
pushToRW := func ( tss [ ] prompbmarshal . TimeSeries ) error {
var lastErr error
2022-03-29 13:09:07 +00:00
for _ , ts := range tss {
2023-10-13 11:54:33 +00:00
if err := e . Rw . Push ( ts ) ; err != nil {
lastErr = fmt . Errorf ( "rule %q: remote write failure: %w" , r , err )
2022-03-29 13:09:07 +00:00
}
2020-05-10 16:58:17 +00:00
}
2022-12-06 14:36:46 +00:00
return lastErr
}
if err := pushToRW ( tss ) ; err != nil {
return err
2020-05-10 16:58:17 +00:00
}
2022-12-06 14:36:46 +00:00
2023-10-13 11:54:33 +00:00
staleSeries := e . getStaleSeries ( r , tss , ts )
2022-12-06 14:36:46 +00:00
if err := pushToRW ( staleSeries ) ; err != nil {
return err
}
2020-05-10 16:58:17 +00:00
}
2020-06-09 12:21:20 +00:00
2023-10-13 11:54:33 +00:00
ar , ok := r . ( * AlertingRule )
2020-06-09 12:21:20 +00:00
if ! ok {
return nil
}
2022-03-16 15:26:33 +00:00
2024-03-28 07:55:10 +00:00
alerts := ar . alertsToSend ( resolveDuration , * resendDelay )
2020-06-09 12:21:20 +00:00
if len ( alerts ) < 1 {
return nil
}
2020-06-29 19:21:03 +00:00
2022-06-18 07:11:37 +00:00
wg := sync . WaitGroup { }
2022-12-06 14:36:46 +00:00
errGr := new ( utils . ErrGroup )
2023-10-13 11:54:33 +00:00
for _ , nt := range e . Notifiers ( ) {
2022-06-18 07:11:37 +00:00
wg . Add ( 1 )
go func ( nt notifier . Notifier ) {
2023-04-27 10:17:26 +00:00
if err := nt . Send ( ctx , alerts , e . notifierHeaders ) ; err != nil {
2023-10-13 11:54:33 +00:00
errGr . Add ( fmt . Errorf ( "rule %q: failed to send alerts to addr %q: %w" , r , nt . Addr ( ) , err ) )
2022-06-18 07:11:37 +00:00
}
wg . Done ( )
} ( nt )
2020-06-09 12:21:20 +00:00
}
2022-06-18 07:11:37 +00:00
wg . Wait ( )
2020-06-29 19:21:03 +00:00
return errGr . Err ( )
2020-05-10 16:58:17 +00:00
}
2022-03-29 13:09:07 +00:00
2024-05-14 12:43:39 +00:00
var bbPool bytesutil . ByteBufferPool
2024-04-08 08:11:12 +00:00
// getStaleSeries checks whether there are stale series from previously sent ones.
2023-10-13 11:54:33 +00:00
func ( e * executor ) getStaleSeries ( r Rule , tss [ ] prompbmarshal . TimeSeries , timestamp time . Time ) [ ] prompbmarshal . TimeSeries {
2024-05-14 12:43:39 +00:00
bb := bbPool . Get ( )
defer bbPool . Put ( bb )
2022-03-30 10:37:27 +00:00
ruleLabels := make ( map [ string ] [ ] prompbmarshal . Label , len ( tss ) )
2022-03-29 13:09:07 +00:00
for _ , ts := range tss {
2024-05-14 12:43:39 +00:00
// convert labels to strings, so we can compare with previously sent series
bb . B = labelsToString ( bb . B , ts . Labels )
ruleLabels [ string ( bb . B ) ] = ts . Labels
bb . Reset ( )
2022-03-29 13:09:07 +00:00
}
2023-10-13 11:54:33 +00:00
rID := r . ID ( )
2022-03-29 13:09:07 +00:00
var staleS [ ] prompbmarshal . TimeSeries
// check whether there are series which disappeared and need to be marked as stale
2022-03-30 10:37:27 +00:00
e . previouslySentSeriesToRWMu . Lock ( )
2023-10-25 11:07:13 +00:00
for key , labels := range e . previouslySentSeriesToRW [ rID ] {
2022-03-29 13:09:07 +00:00
if _ , ok := ruleLabels [ key ] ; ok {
continue
}
// previously sent series are missing in current series, so we mark them as stale
ss := newTimeSeriesPB ( [ ] float64 { decimal . StaleNaN } , [ ] int64 { timestamp . Unix ( ) } , labels )
staleS = append ( staleS , ss )
}
// set previous series to current
2023-10-25 11:07:13 +00:00
e . previouslySentSeriesToRW [ rID ] = ruleLabels
2022-03-30 10:37:27 +00:00
e . previouslySentSeriesToRWMu . Unlock ( )
2022-03-29 13:09:07 +00:00
return staleS
}
2022-05-13 08:04:49 +00:00
// purgeStaleSeries deletes references in tracked
// previouslySentSeriesToRW list to Rules which aren't present
// in the given activeRules list. The method is used when the list
// of loaded rules has changed and executor has to remove
// references to non-existing rules.
func ( e * executor ) purgeStaleSeries ( activeRules [ ] Rule ) {
newPreviouslySentSeriesToRW := make ( map [ uint64 ] map [ string ] [ ] prompbmarshal . Label )
e . previouslySentSeriesToRWMu . Lock ( )
for _ , rule := range activeRules {
id := rule . ID ( )
2023-10-25 11:07:13 +00:00
prev , ok := e . previouslySentSeriesToRW [ id ]
2022-05-13 08:04:49 +00:00
if ok {
// keep previous series for staleness detection
newPreviouslySentSeriesToRW [ id ] = prev
}
}
2023-10-25 11:07:13 +00:00
e . previouslySentSeriesToRW = nil
e . previouslySentSeriesToRW = newPreviouslySentSeriesToRW
2022-05-13 08:04:49 +00:00
e . previouslySentSeriesToRWMu . Unlock ( )
}
2024-05-14 12:43:39 +00:00
func labelsToString ( dst [ ] byte , labels [ ] prompbmarshal . Label ) [ ] byte {
dst = append ( dst , '{' )
2022-03-29 13:09:07 +00:00
for i , label := range labels {
if len ( label . Name ) == 0 {
2024-05-14 12:43:39 +00:00
dst = append ( dst , "__name__" ... )
2022-03-29 13:09:07 +00:00
} else {
2024-05-14 12:43:39 +00:00
dst = append ( dst , label . Name ... )
2022-03-29 13:09:07 +00:00
}
2024-05-14 12:43:39 +00:00
dst = append ( dst , '=' )
dst = strconv . AppendQuote ( dst , label . Value )
2022-03-29 13:09:07 +00:00
if i < len ( labels ) - 1 {
2024-05-14 12:43:39 +00:00
dst = append ( dst , ',' )
2022-03-29 13:09:07 +00:00
}
}
2024-05-14 12:43:39 +00:00
dst = append ( dst , '}' )
return dst
2022-03-29 13:09:07 +00:00
}