app/vmalert: restore alerts state from datasource metrics (#461)

* app/vmalert: restore alerts state from datasource metrics

vmalert will restore the alerts state for rules that have `rule.For` > 0 from timeseries previously written via the `-remotewrite.url` flag, provided `-remoteread.url` is configured.
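To make the mechanism concrete, the sketch below (not part of the diff) shows the shape of the restore query and how a returned sample becomes an alert's Start time. The metric name ALERTS_FOR_STATE, the rule name and the sample value are illustrative assumptions; the diff itself only references alertForStateMetricName.

// Minimal sketch of the state-restore query built by Rule.Restore in this commit.
// Assumptions: the state metric is named "ALERTS_FOR_STATE" and the rule is
// called "ExampleAlert"; both are placeholders for illustration only.
package main

import (
	"fmt"
	"time"
)

func main() {
	const stateMetric = "ALERTS_FOR_STATE" // assumed value of alertForStateMetricName
	ruleName := "ExampleAlert"             // hypothetical rule name
	lookback := time.Hour

	// MetricsQL expression sent to -remoteread.url: the last sample of the
	// state series for this rule within the lookback window.
	expr := fmt.Sprintf("last_over_time(%s{alertname=%q}[%ds])",
		stateMetric, ruleName, int(lookback.Seconds()))
	fmt.Println(expr)
	// last_over_time(ALERTS_FOR_STATE{alertname="ExampleAlert"}[3600s])

	// The sample value stores the unix timestamp of when the alert became
	// active, so the restored alert's Start field is recovered with:
	var sampleValue float64 = 1588612282 // hypothetical value returned by the query
	start := time.Unix(int64(sampleValue), 0)
	fmt.Println(start.UTC())
}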

* app/vmalert: mention remotewrite and remoteread configuration in README
Roman Khavronenko 2020-05-04 22:51:22 +01:00 committed by Aliaksandr Valialkin
parent 89aa6dbf56
commit abce2b092f
8 changed files with 338 additions and 80 deletions


@@ -56,7 +56,10 @@ test-vmalert:
 run-vmalert: vmalert
 	./bin/vmalert -rule=app/vmalert/testdata/rules0-good.rules \
-		-datasource.url=http://localhost:8428 -notifier.url=http://localhost:9093 \
+		-datasource.url=http://localhost:8428 \
+		-notifier.url=http://localhost:9093 \
+		-remotewrite.url=http://localhost:8428 \
+		-remoteread.url=http://localhost:8428 \
 		-evaluationInterval=3s
 vmalert-amd64:


@@ -13,8 +13,6 @@ sends alerts to [Alert Manager](https://github.com/prometheus/alertmanager).
 * Lightweight without extra dependencies.
 ### TODO:
-* Persist alerts state as timeseries in TSDB. Currently, alerts state is stored
-in process memory only and will be lost on restart;
 * Configuration hot reload.
 ### QuickStart
@@ -51,27 +49,52 @@ Rules in group evaluated one-by-one sequentially.
 Used as alert source in AlertManager.
 * `http://<vmalert-addr>/metrics` - application metrics.
+`vmalert` may be configured with the `-remotewrite.url` flag to write alerts state in the form of timeseries
+via the remote write protocol. The alerts state will be written as `ALERTS` timeseries. These timeseries
+may be used to recover the alerts state on `vmalert` restarts if `-remoteread.url` is configured.
 ### Configuration
 The shortlist of configuration flags is the following:
 ```
 Usage of vmalert:
-  -datasource.url string
-	Victoria Metrics or VMSelect url. Required parameter. e.g. http://127.0.0.1:8428
   -datasource.basicAuth.password string
-	Optional basic auth password to use for -datasource.url
+	Optional basic auth password for -datasource.url
   -datasource.basicAuth.username string
-	Optional basic auth username to use for -datasource.url
+	Optional basic auth username for -datasource.url
+  -datasource.url string
+	Victoria Metrics or VMSelect url. Required parameter. E.g. http://127.0.0.1:8428
+  -enableTCP6
+	Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP is used
   -evaluationInterval duration
	How often to evaluate the rules. Default 1m (default 1m0s)
   -external.url string
	External URL is used as alert's source for sent alerts to the notifier
+  -http.maxGracefulShutdownDuration duration
+	The maximum duration for graceful shutdown of HTTP server. Highly loaded server may require increased value for graceful shutdown (default 7s)
+  -httpAuth.password string
+	Password for HTTP Basic Auth. The authentication is disabled if -httpAuth.username is empty
+  -httpAuth.username string
+	Username for HTTP Basic Auth. The authentication is disabled if empty. See also -httpAuth.password
   -httpListenAddr string
	Address to listen for http connections (default ":8880")
   -notifier.url string
	Prometheus alertmanager URL. Required parameter. e.g. http://127.0.0.1:9093
+  -remoteread.basicAuth.password string
+	Optional basic auth password for -remoteread.url
+  -remoteread.basicAuth.username string
+	Optional basic auth username for -remoteread.url
+  -remoteread.lookback duration
+	Lookback defines how far into the past to look for alerts timeseries. For example, if lookback=1h then the range from now() to now()-1h will be scanned. (default 1h0m0s)
+  -remoteread.url string
+	Optional URL to Victoria Metrics or VMSelect that will be used to restore alerts state. This configuration makes sense only if vmalert was configured with -remotewrite.url before and has successfully persisted its state. E.g. http://127.0.0.1:8428
+  -remotewrite.basicAuth.password string
+	Optional basic auth password for -remotewrite.url
+  -remotewrite.basicAuth.username string
+	Optional basic auth username for -remotewrite.url
   -remotewrite.url string
-	Optional URL to remote-write compatible storage where to write timeseriesbased on active alerts. E.g. http://127.0.0.1:8428
+	Optional URL to Victoria Metrics or VMInsert where to persist alerts state in the form of timeseries. E.g. http://127.0.0.1:8428
   -rule value
	Path to the file with alert rules.
	Supports patterns. Flag can be specified multiple times.
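Since the persisted state lands in the storage behind `-remotewrite.url` as ordinary timeseries, it can be inspected through the Prometheus-compatible query API. A minimal sketch, assuming a single-node VictoriaMetrics on localhost:8428 that already received vmalert's remote-write data:

// Hedged sketch: read the persisted ALERTS series back from the storage
// configured in -remotewrite.url. The address is an assumption; adjust it
// for other setups.
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

func main() {
	query := url.QueryEscape(`ALERTS`)
	resp, err := http.Get("http://localhost:8428/api/v1/query?query=" + query)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	// Prints a JSON instant vector of the ALERTS series written by vmalert.
	fmt.Println(string(body))
}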


@@ -46,7 +46,7 @@ func Parse(pathPatterns []string, validateAnnotations bool) ([]Group, error) {
 					return nil, fmt.Errorf("invalid labels filepath:%s, group %s:%w", file, group.Name, err)
 				}
 			}
-			rule.group = &group
+			rule.group = group
 		}
 	}
 	groups = append(groups, gr...)


@@ -32,18 +32,31 @@ Examples:
 absolute path to all .yaml files in root.`)
 	validateTemplates = flag.Bool("rule.validateTemplates", true, "Indicates to validate annotation and label templates")
 	httpListenAddr = flag.String("httpListenAddr", ":8880", "Address to listen for http connections")
-	datasourceURL = flag.String("datasource.url", "", "Victoria Metrics or VMSelect url. Required parameter. e.g. http://127.0.0.1:8428")
-	basicAuthUsername = flag.String("datasource.basicAuth.username", "", "Optional basic auth username to use for -datasource.url")
-	basicAuthPassword = flag.String("datasource.basicAuth.password", "", "Optional basic auth password to use for -datasource.url")
-	remoteWriteURL = flag.String("remotewrite.url", "", "Optional URL to remote-write compatible storage where to write timeseries"+
-		"based on active alerts. E.g. http://127.0.0.1:8428")
-	evaluationInterval = flag.Duration("evaluationInterval", 1*time.Minute, "How often to evaluate the rules. Default 1m")
+	datasourceURL = flag.String("datasource.url", "", "Victoria Metrics or VMSelect url. Required parameter."+
+		" E.g. http://127.0.0.1:8428")
+	basicAuthUsername = flag.String("datasource.basicAuth.username", "", "Optional basic auth username for -datasource.url")
+	basicAuthPassword = flag.String("datasource.basicAuth.password", "", "Optional basic auth password for -datasource.url")
+	remoteWriteURL = flag.String("remotewrite.url", "", "Optional URL to Victoria Metrics or VMInsert where to persist alerts state"+
+		" in the form of timeseries. E.g. http://127.0.0.1:8428")
+	remoteWriteUsername = flag.String("remotewrite.basicAuth.username", "", "Optional basic auth username for -remotewrite.url")
+	remoteWritePassword = flag.String("remotewrite.basicAuth.password", "", "Optional basic auth password for -remotewrite.url")
+	remoteReadURL = flag.String("remoteread.url", "", "Optional URL to Victoria Metrics or VMSelect that will be used to restore alerts"+
+		" state. This configuration makes sense only if vmalert was configured with -remotewrite.url before and has successfully persisted its state."+
+		" E.g. http://127.0.0.1:8428")
+	remoteReadUsername = flag.String("remoteread.basicAuth.username", "", "Optional basic auth username for -remoteread.url")
+	remoteReadPassword = flag.String("remoteread.basicAuth.password", "", "Optional basic auth password for -remoteread.url")
+	remoteReadLookBack = flag.Duration("remoteread.lookback", time.Hour, "Lookback defines how far into the past to look for alerts timeseries."+
+		" For example, if lookback=1h then the range from now() to now()-1h will be scanned.")
+	evaluationInterval = flag.Duration("evaluationInterval", time.Minute, "How often to evaluate the rules. Default 1m")
 	notifierURL = flag.String("notifier.url", "", "Prometheus alertmanager URL. Required parameter. e.g. http://127.0.0.1:9093")
 	externalURL = flag.String("external.url", "", "External URL is used as alert's source for sent alerts to the notifier")
 )
 // TODO: hot configuration reload
-// TODO: alerts state persistence
 func main() {
 	envflag.Parse()
 	buildinfo.Init()
@@ -73,6 +86,8 @@ func main() {
 		c, err := remotewrite.NewClient(ctx, remotewrite.Config{
 			Addr: *remoteWriteURL,
 			FlushInterval: *evaluationInterval,
+			BasicAuthUser: *remoteWriteUsername,
+			BasicAuthPass: *remoteWritePassword,
 		})
 		if err != nil {
 			logger.Fatalf("failed to init remotewrite client: %s", err)
@@ -80,13 +95,24 @@ func main() {
 		w.rw = c
 	}
+	var restoreDS *datasource.VMStorage
+	if *remoteReadURL != "" {
+		restoreDS = datasource.NewVMStorage(*remoteReadURL, *remoteReadUsername, *remoteReadPassword, &http.Client{})
+	}
 	wg := sync.WaitGroup{}
-	for i := range groups {
+	for _, g := range groups {
+		if restoreDS != nil {
+			err := g.Restore(ctx, restoreDS, *remoteReadLookBack)
+			if err != nil {
+				logger.Errorf("error while restoring state for group %q: %s", g.Name, err)
+			}
+		}
 		wg.Add(1)
 		go func(group Group) {
 			w.run(ctx, group, *evaluationInterval)
 			wg.Done()
-		}(groups[i])
+		}(g)
 	}
 	go httpserver.Serve(*httpListenAddr, (&requestHandler{groups: groups}).handler)


@@ -12,6 +12,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 	"github.com/VictoriaMetrics/metricsql"
 )
@@ -22,6 +23,19 @@ type Group struct {
 	Rules []*Rule
 }
+// Restore restores alerts state for all group rules with For > 0
+func (g *Group) Restore(ctx context.Context, q datasource.Querier, lookback time.Duration) error {
+	for _, rule := range g.Rules {
+		if rule.For == 0 {
+			return nil
+		}
+		if err := rule.Restore(ctx, q, lookback); err != nil {
+			return fmt.Errorf("error while restoring rule %q: %s", rule.Name, err)
+		}
+	}
+	return nil
+}
 // Rule is basic alert entity
 type Rule struct {
 	Name string `yaml:"alert"`
@@ -30,7 +44,7 @@ type Rule struct {
 	Labels map[string]string `yaml:"labels"`
 	Annotations map[string]string `yaml:"annotations"`
-	group *Group
+	group Group
 	// guard status fields
 	mu sync.RWMutex
@@ -83,7 +97,9 @@ func (r *Rule) Exec(ctx context.Context, q datasource.Querier) error {
 	for _, m := range qMetrics {
 		h := hash(m)
 		updated[h] = struct{}{}
-		if _, ok := r.alerts[h]; ok {
+		if a, ok := r.alerts[h]; ok {
+			// update the Value field with the latest value
+			a.Value = m.Value
 			continue
 		}
 		a, err := r.newAlert(m)
@@ -125,6 +141,10 @@ func hash(m datasource.Metric) uint64 {
 		return labels[i].Name < labels[j].Name
 	})
 	for _, l := range labels {
+		// drop __name__ to be consistent with Prometheus alerting
+		if l.Name == "__name__" {
+			continue
+		}
 		hash.Write([]byte(l.Name))
 		hash.Write([]byte(l.Value))
 		hash.Write([]byte("\xff"))
@@ -144,6 +164,10 @@ func (r *Rule) newAlert(m datasource.Metric) (*notifier.Alert, error) {
 	// 1. use data labels
 	for _, l := range m.Labels {
+		// drop __name__ to be consistent with Prometheus alerting
+		if l.Name == "__name__" {
+			continue
+		}
 		a.Labels[l.Name] = l.Value
 	}
@@ -194,7 +218,8 @@ func (r *Rule) AlertsAPI() []*APIAlert {
 func (r *Rule) newAlertAPI(a notifier.Alert) *APIAlert {
 	return &APIAlert{
-		ID: a.ID,
+		// encode as string to avoid rounding
+		ID: fmt.Sprintf("%d", a.ID),
 		Name: a.Name,
 		Group: a.Group,
 		Expression: r.Expr,
@@ -239,6 +264,8 @@ func alertToTimeSeries(name string, a *notifier.Alert, timestamp time.Time) prom
 	return newTimeSeries(1, labels, timestamp)
 }
+// alertForToTimeSeries returns a timeseries that represents the state of
+// active alerts, where the value is the time when the alert became active
 func alertForToTimeSeries(name string, a *notifier.Alert, timestamp time.Time) prompbmarshal.TimeSeries {
 	labels := make(map[string]string)
 	for k, v := range a.Labels {
@@ -268,3 +295,46 @@ func newTimeSeries(value float64, labels map[string]string, timestamp time.Time)
 	}
 	return ts
 }
+// Restore restores the state of active alerts based on previously written timeseries.
+// Only the Start field is restored; the State field will always be Pending and is
+// supposed to be updated on the next evaluation, as is the Value field.
+func (r *Rule) Restore(ctx context.Context, q datasource.Querier, lookback time.Duration) error {
+	// Get the last datapoint in range via MetricsQL `last_over_time`.
+	// We don't use plain PromQL since Prometheus doesn't support
+	// the remote write protocol which is used for state persistence in vmalert.
+	expr := fmt.Sprintf("last_over_time(%s{alertname=%q}[%ds])",
+		alertForStateMetricName, r.Name, int(lookback.Seconds()))
+	qMetrics, err := q.Query(ctx, expr)
+	if err != nil {
+		return err
+	}
+	for _, m := range qMetrics {
+		labels := m.Labels
+		m.Labels = make([]datasource.Label, 0)
+		// drop all extra labels, so the hash key will
+		// be identical to the one for timeseries received in Exec
+		for _, l := range labels {
+			if l.Name == alertNameLabel {
+				continue
+			}
+			// drop all overridden labels
+			if _, ok := r.Labels[l.Name]; ok {
+				continue
+			}
+			m.Labels = append(m.Labels, l)
+		}
+		a, err := r.newAlert(m)
+		if err != nil {
+			return fmt.Errorf("failed to create alert: %s", err)
+		}
+		a.ID = hash(m)
+		a.State = notifier.StatePending
+		a.Start = time.Unix(int64(m.Value), 0)
+		r.alerts[a.ID] = a
+		logger.Infof("alert %q.%q restored to state at %v", a.Group, a.Name, a.Start)
+	}
+	return nil
+}
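The label filtering inside `Rule.Restore` exists so that the hash of a restored metric equals the hash `Exec` computes for the same series later. The sketch below mirrors the hashing scheme shown above (labels sorted by name, `__name__` skipped, name/value/0xff fed to the hasher) to show why `alertname` and rule-level labels must be dropped; the choice of FNV-1a and the concrete label names are assumptions for illustration, since the diff only shows the `hash.Write` calls.

// Self-contained sketch of the hash-key matching that Rule.Restore relies on.
// Assumptions: an FNV-1a 64-bit hasher and illustrative label names/values.
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

type label struct{ Name, Value string }

// hashLabels mirrors the hash() function from rule.go: sort labels by name,
// skip __name__, and feed name, value and a 0xff separator into the hasher.
func hashLabels(labels []label) uint64 {
	sort.Slice(labels, func(i, j int) bool { return labels[i].Name < labels[j].Name })
	h := fnv.New64a()
	for _, l := range labels {
		if l.Name == "__name__" {
			continue
		}
		h.Write([]byte(l.Name))
		h.Write([]byte(l.Value))
		h.Write([]byte("\xff"))
	}
	return h.Sum64()
}

func main() {
	// Labels as they arrive in Exec for the alerting expression.
	live := []label{{"job", "node"}, {"instance", "host-1"}}

	// Labels of the persisted state series: the same identity labels plus
	// alertname and a rule-level label, which Restore strips before hashing.
	restored := []label{
		{"__name__", "ALERTS_FOR_STATE"}, // assumed metric name
		{"alertname", "ExampleAlert"},    // dropped: matches alertNameLabel
		{"severity", "warning"},          // dropped: defined in rule.Labels
		{"job", "node"},
		{"instance", "host-1"},
	}
	var filtered []label
	for _, l := range restored {
		switch l.Name {
		case "alertname", "severity":
			continue
		}
		filtered = append(filtered, l)
	}

	fmt.Println(hashLabels(live) == hashLabels(filtered)) // true
}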


@@ -157,53 +157,62 @@ func TestRule_Exec(t *testing.T) {
 			map[uint64]*notifier.Alert{},
 		},
 		{
-			newTestRule("single-firing", 0),
+			newTestRule("empty labels", 0),
 			[][]datasource.Metric{
-				{metricWithLabels(t, "__name__", "foo")},
+				{datasource.Metric{}},
 			},
 			map[uint64]*notifier.Alert{
-				hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateFiring},
+				hash(datasource.Metric{}): {State: notifier.StateFiring},
+			},
+		},
+		{
+			newTestRule("single-firing", 0),
+			[][]datasource.Metric{
+				{metricWithLabels(t, "name", "foo")},
+			},
+			map[uint64]*notifier.Alert{
+				hash(metricWithLabels(t, "name", "foo")): {State: notifier.StateFiring},
 			},
 		},
 		{
 			newTestRule("single-firing=>inactive", 0),
 			[][]datasource.Metric{
-				{metricWithLabels(t, "__name__", "foo")},
+				{metricWithLabels(t, "name", "foo")},
 				{},
 			},
 			map[uint64]*notifier.Alert{
-				hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateInactive},
+				hash(metricWithLabels(t, "name", "foo")): {State: notifier.StateInactive},
 			},
 		},
 		{
 			newTestRule("single-firing=>inactive=>firing", 0),
 			[][]datasource.Metric{
-				{metricWithLabels(t, "__name__", "foo")},
+				{metricWithLabels(t, "name", "foo")},
 				{},
-				{metricWithLabels(t, "__name__", "foo")},
+				{metricWithLabels(t, "name", "foo")},
 			},
 			map[uint64]*notifier.Alert{
-				hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateFiring},
+				hash(metricWithLabels(t, "name", "foo")): {State: notifier.StateFiring},
 			},
 		},
 		{
 			newTestRule("single-firing=>inactive=>firing=>inactive", 0),
 			[][]datasource.Metric{
-				{metricWithLabels(t, "__name__", "foo")},
+				{metricWithLabels(t, "name", "foo")},
 				{},
-				{metricWithLabels(t, "__name__", "foo")},
+				{metricWithLabels(t, "name", "foo")},
 				{},
 			},
 			map[uint64]*notifier.Alert{
-				hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateInactive},
+				hash(metricWithLabels(t, "name", "foo")): {State: notifier.StateInactive},
 			},
 		},
 		{
 			newTestRule("single-firing=>inactive=>firing=>inactive=>empty", 0),
 			[][]datasource.Metric{
-				{metricWithLabels(t, "__name__", "foo")},
+				{metricWithLabels(t, "name", "foo")},
 				{},
-				{metricWithLabels(t, "__name__", "foo")},
+				{metricWithLabels(t, "name", "foo")},
 				{},
 				{},
 			},
@@ -212,45 +221,45 @@ func TestRule_Exec(t *testing.T) {
 		{
 			newTestRule("single-firing=>inactive=>firing=>inactive=>empty=>firing", 0),
 			[][]datasource.Metric{
-				{metricWithLabels(t, "__name__", "foo")},
+				{metricWithLabels(t, "name", "foo")},
 				{},
-				{metricWithLabels(t, "__name__", "foo")},
+				{metricWithLabels(t, "name", "foo")},
 				{},
 				{},
-				{metricWithLabels(t, "__name__", "foo")},
+				{metricWithLabels(t, "name", "foo")},
 			},
 			map[uint64]*notifier.Alert{
-				hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateFiring},
+				hash(metricWithLabels(t, "name", "foo")): {State: notifier.StateFiring},
 			},
 		},
 		{
 			newTestRule("multiple-firing", 0),
 			[][]datasource.Metric{
 				{
-					metricWithLabels(t, "__name__", "foo"),
-					metricWithLabels(t, "__name__", "foo1"),
-					metricWithLabels(t, "__name__", "foo2"),
+					metricWithLabels(t, "name", "foo"),
+					metricWithLabels(t, "name", "foo1"),
+					metricWithLabels(t, "name", "foo2"),
 				},
 			},
 			map[uint64]*notifier.Alert{
-				hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateFiring},
-				hash(metricWithLabels(t, "__name__", "foo1")): {State: notifier.StateFiring},
-				hash(metricWithLabels(t, "__name__", "foo2")): {State: notifier.StateFiring},
+				hash(metricWithLabels(t, "name", "foo")): {State: notifier.StateFiring},
+				hash(metricWithLabels(t, "name", "foo1")): {State: notifier.StateFiring},
+				hash(metricWithLabels(t, "name", "foo2")): {State: notifier.StateFiring},
 			},
 		},
 		{
 			newTestRule("multiple-steps-firing", 0),
 			[][]datasource.Metric{
-				{metricWithLabels(t, "__name__", "foo")},
-				{metricWithLabels(t, "__name__", "foo1")},
-				{metricWithLabels(t, "__name__", "foo2")},
+				{metricWithLabels(t, "name", "foo")},
+				{metricWithLabels(t, "name", "foo1")},
+				{metricWithLabels(t, "name", "foo2")},
 			},
 			// 1: fire first alert
 			// 2: fire second alert, set first inactive
 			// 3: fire third alert, set second inactive, delete first one
 			map[uint64]*notifier.Alert{
-				hash(metricWithLabels(t, "__name__", "foo1")): {State: notifier.StateInactive},
-				hash(metricWithLabels(t, "__name__", "foo2")): {State: notifier.StateFiring},
+				hash(metricWithLabels(t, "name", "foo1")): {State: notifier.StateInactive},
+				hash(metricWithLabels(t, "name", "foo2")): {State: notifier.StateFiring},
 			},
 		},
 		{
@@ -258,93 +267,93 @@ func TestRule_Exec(t *testing.T) {
 			[][]datasource.Metric{
 				{
 					// metrics with the same labelset should result in one alert
-					metricWithLabels(t, "__name__", "foo", "type", "bar"),
-					metricWithLabels(t, "type", "bar", "__name__", "foo"),
+					metricWithLabels(t, "name", "foo", "type", "bar"),
+					metricWithLabels(t, "type", "bar", "name", "foo"),
 				},
 			},
 			map[uint64]*notifier.Alert{
-				hash(metricWithLabels(t, "__name__", "foo", "type", "bar")): {State: notifier.StateFiring},
+				hash(metricWithLabels(t, "name", "foo", "type", "bar")): {State: notifier.StateFiring},
 			},
 		},
 		{
 			newTestRule("for-pending", time.Minute),
 			[][]datasource.Metric{
-				{metricWithLabels(t, "__name__", "foo")},
+				{metricWithLabels(t, "name", "foo")},
 			},
 			map[uint64]*notifier.Alert{
-				hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StatePending},
+				hash(metricWithLabels(t, "name", "foo")): {State: notifier.StatePending},
 			},
 		},
 		{
 			newTestRule("for-fired", time.Millisecond),
 			[][]datasource.Metric{
-				{metricWithLabels(t, "__name__", "foo")},
-				{metricWithLabels(t, "__name__", "foo")},
+				{metricWithLabels(t, "name", "foo")},
+				{metricWithLabels(t, "name", "foo")},
 			},
 			map[uint64]*notifier.Alert{
-				hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateFiring},
+				hash(metricWithLabels(t, "name", "foo")): {State: notifier.StateFiring},
 			},
 		},
 		{
 			newTestRule("for-pending=>inactive", time.Millisecond),
 			[][]datasource.Metric{
-				{metricWithLabels(t, "__name__", "foo")},
-				{metricWithLabels(t, "__name__", "foo")},
+				{metricWithLabels(t, "name", "foo")},
+				{metricWithLabels(t, "name", "foo")},
 				// empty step to reset pending alerts
 				{},
 			},
 			map[uint64]*notifier.Alert{
-				hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateInactive},
+				hash(metricWithLabels(t, "name", "foo")): {State: notifier.StateInactive},
 			},
 		},
 		{
 			newTestRule("for-pending=>firing=>inactive", time.Millisecond),
 			[][]datasource.Metric{
-				{metricWithLabels(t, "__name__", "foo")},
-				{metricWithLabels(t, "__name__", "foo")},
+				{metricWithLabels(t, "name", "foo")},
+				{metricWithLabels(t, "name", "foo")},
 				// empty step to reset pending alerts
 				{},
 			},
 			map[uint64]*notifier.Alert{
-				hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateInactive},
+				hash(metricWithLabels(t, "name", "foo")): {State: notifier.StateInactive},
 			},
 		},
 		{
 			newTestRule("for-pending=>firing=>inactive=>pending", time.Millisecond),
 			[][]datasource.Metric{
-				{metricWithLabels(t, "__name__", "foo")},
-				{metricWithLabels(t, "__name__", "foo")},
+				{metricWithLabels(t, "name", "foo")},
+				{metricWithLabels(t, "name", "foo")},
 				// empty step to reset pending alerts
 				{},
-				{metricWithLabels(t, "__name__", "foo")},
+				{metricWithLabels(t, "name", "foo")},
 			},
 			map[uint64]*notifier.Alert{
-				hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StatePending},
+				hash(metricWithLabels(t, "name", "foo")): {State: notifier.StatePending},
 			},
 		},
 		{
 			newTestRule("for-pending=>firing=>inactive=>pending=>firing", time.Millisecond),
 			[][]datasource.Metric{
-				{metricWithLabels(t, "__name__", "foo")},
-				{metricWithLabels(t, "__name__", "foo")},
+				{metricWithLabels(t, "name", "foo")},
+				{metricWithLabels(t, "name", "foo")},
 				// empty step to reset pending alerts
 				{},
-				{metricWithLabels(t, "__name__", "foo")},
-				{metricWithLabels(t, "__name__", "foo")},
+				{metricWithLabels(t, "name", "foo")},
+				{metricWithLabels(t, "name", "foo")},
 			},
 			map[uint64]*notifier.Alert{
-				hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateFiring},
+				hash(metricWithLabels(t, "name", "foo")): {State: notifier.StateFiring},
 			},
 		},
 	}
-	fakeGroup := &Group{Name: "TestRule_Exec"}
+	fakeGroup := Group{Name: "TestRule_Exec"}
 	for _, tc := range testCases {
 		t.Run(tc.rule.Name, func(t *testing.T) {
 			fq := &fakeQuerier{}
 			tc.rule.group = fakeGroup
 			for _, step := range tc.steps {
 				fq.reset()
-				fq.add(t, step...)
+				fq.add(step...)
 				if err := tc.rule.Exec(context.TODO(), fq); err != nil {
 					t.Fatalf("unexpected err: %s", err)
 				}
@@ -390,10 +399,137 @@ func (fq *fakeQuerier) reset() {
 	fq.metrics = fq.metrics[:0]
 }
-func (fq *fakeQuerier) add(t *testing.T, metrics ...datasource.Metric) {
+func (fq *fakeQuerier) add(metrics ...datasource.Metric) {
 	fq.metrics = append(fq.metrics, metrics...)
 }
 func (fq fakeQuerier) Query(ctx context.Context, query string) ([]datasource.Metric, error) {
 	return fq.metrics, nil
 }
+func TestRule_Restore(t *testing.T) {
+	testCases := []struct {
+		rule *Rule
+		metrics []datasource.Metric
+		expAlerts map[uint64]*notifier.Alert
+	}{
+		{
+			newTestRuleWithLabels("no extra labels"),
+			[]datasource.Metric{
+				metricWithValueAndLabels(t, float64(time.Now().Truncate(time.Hour).Unix()),
+					"__name__", alertForStateMetricName,
+					alertNameLabel, "",
+				),
+			},
+			map[uint64]*notifier.Alert{
+				hash(datasource.Metric{}): {State: notifier.StatePending,
+					Start: time.Now().Truncate(time.Hour)},
+			},
+		},
+		{
+			newTestRuleWithLabels("metric labels"),
+			[]datasource.Metric{
+				metricWithValueAndLabels(t, float64(time.Now().Truncate(time.Hour).Unix()),
+					"__name__", alertForStateMetricName,
+					alertNameLabel, "",
+					"foo", "bar",
+					"namespace", "baz",
+				),
+			},
+			map[uint64]*notifier.Alert{
+				hash(metricWithLabels(t,
+					"foo", "bar",
+					"namespace", "baz",
+				)): {State: notifier.StatePending,
+					Start: time.Now().Truncate(time.Hour)},
+			},
+		},
+		{
+			newTestRuleWithLabels("rule labels", "source", "vm"),
+			[]datasource.Metric{
+				metricWithValueAndLabels(t, float64(time.Now().Truncate(time.Hour).Unix()),
+					"__name__", alertForStateMetricName,
+					alertNameLabel, "",
+					"foo", "bar",
+					"namespace", "baz",
+					// following pair supposed to be dropped
+					"source", "vm",
+				),
+			},
+			map[uint64]*notifier.Alert{
+				hash(metricWithLabels(t,
+					"foo", "bar",
+					"namespace", "baz",
+				)): {State: notifier.StatePending,
+					Start: time.Now().Truncate(time.Hour)},
+			},
+		},
+		{
+			newTestRuleWithLabels("multiple alerts"),
+			[]datasource.Metric{
+				metricWithValueAndLabels(t, float64(time.Now().Truncate(time.Hour).Unix()),
+					"__name__", alertForStateMetricName,
+					"host", "localhost-1",
+				),
+				metricWithValueAndLabels(t, float64(time.Now().Truncate(2*time.Hour).Unix()),
+					"__name__", alertForStateMetricName,
+					"host", "localhost-2",
+				),
+				metricWithValueAndLabels(t, float64(time.Now().Truncate(3*time.Hour).Unix()),
+					"__name__", alertForStateMetricName,
+					"host", "localhost-3",
+				),
+			},
+			map[uint64]*notifier.Alert{
+				hash(metricWithLabels(t, "host", "localhost-1")): {State: notifier.StatePending,
+					Start: time.Now().Truncate(time.Hour)},
+				hash(metricWithLabels(t, "host", "localhost-2")): {State: notifier.StatePending,
+					Start: time.Now().Truncate(2 * time.Hour)},
+				hash(metricWithLabels(t, "host", "localhost-3")): {State: notifier.StatePending,
+					Start: time.Now().Truncate(3 * time.Hour)},
+			},
+		},
+	}
+	fakeGroup := Group{Name: "TestRule_Exec"}
+	for _, tc := range testCases {
+		t.Run(tc.rule.Name, func(t *testing.T) {
+			fq := &fakeQuerier{}
+			tc.rule.group = fakeGroup
+			fq.add(tc.metrics...)
+			if err := tc.rule.Restore(context.TODO(), fq, time.Hour); err != nil {
+				t.Fatalf("unexpected err: %s", err)
+			}
+			if len(tc.rule.alerts) != len(tc.expAlerts) {
+				t.Fatalf("expected %d alerts; got %d", len(tc.expAlerts), len(tc.rule.alerts))
+			}
+			for key, exp := range tc.expAlerts {
+				got, ok := tc.rule.alerts[key]
+				if !ok {
+					t.Fatalf("expected to have key %d", key)
+				}
+				if got.State != exp.State {
+					t.Fatalf("expected state %d; got %d", exp.State, got.State)
+				}
+				if got.Start != exp.Start {
+					t.Fatalf("expected Start %v; got %v", exp.Start, got.Start)
+				}
+			}
+		})
+	}
+}
+func newTestRuleWithLabels(name string, labels ...string) *Rule {
+	r := newTestRule(name, 0)
+	r.Labels = make(map[string]string)
+	for i := 0; i < len(labels); i += 2 {
+		r.Labels[labels[i]] = labels[i+1]
+	}
+	return r
+}
+func metricWithValueAndLabels(t *testing.T, value float64, labels ...string) datasource.Metric {
+	t.Helper()
+	m := metricWithLabels(t, labels...)
+	m.Value = value
+	return m
+}


@@ -6,7 +6,7 @@ groups:
     expr: vm_rows > 0
     labels:
       label: bar
-      template: "{{ $value|humanize }}"
+      host: "{{ $labels.instance }}"
     annotations:
       summary: "{{ $value|humanize }}"
       description: "{{$labels}}"


@@ -14,7 +14,7 @@ import (
 // APIAlert has info for an alert.
 type APIAlert struct {
-	ID uint64 `json:"id"`
+	ID string `json:"id"`
 	Name string `json:"name"`
 	Group string `json:"group"`
 	Expression string `json:"expression"`
@@ -75,7 +75,7 @@ func (rh *requestHandler) list() ([]byte, error) {
 	// sort list of alerts for deterministic output
 	sort.Slice(lr.Data.Alerts, func(i, j int) bool {
-		return lr.Data.Alerts[i].Name < lr.Data.Alerts[j].Name
+		return lr.Data.Alerts[i].ID < lr.Data.Alerts[j].ID
 	})
 	b, err := json.Marshal(lr)
@@ -109,8 +109,8 @@ func (rh *requestHandler) alert(path string) ([]byte, error) {
 		if g.Name != group {
 			continue
 		}
-		for i := range g.Rules {
-			if apiAlert := g.Rules[i].AlertAPI(id); apiAlert != nil {
+		for _, rule := range g.Rules {
+			if apiAlert := rule.AlertAPI(id); apiAlert != nil {
 				return json.Marshal(apiAlert)
 			}
 		}
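The switch of `APIAlert.ID` from `uint64` to `string` (together with `fmt.Sprintf("%d", a.ID)` in rule.go) guards against precision loss: alert IDs are 64-bit hashes, and JSON consumers that parse numbers as IEEE-754 doubles silently round values above 2^53. A standalone demonstration with an arbitrary ID value:

// Demonstrates why the alert ID is encoded as a string in the API response.
// Self-contained example; the ID value is arbitrary, not taken from vmalert.
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	id := uint64(17446744073709551557) // a 64-bit hash-sized value, well above 2^53

	// Marshalling the raw uint64 is exact...
	b, _ := json.Marshal(map[string]uint64{"id": id})
	fmt.Println(string(b))

	// ...but a generic consumer decoding into float64 (interface{}, JavaScript,
	// most dynamic languages) silently rounds it.
	var generic map[string]interface{}
	_ = json.Unmarshal(b, &generic)
	fmt.Printf("original: %d\ndecoded : %.0f\n", id, generic["id"].(float64))

	// Encoding the ID as a string round-trips exactly, which is what the
	// APIAlert change above does.
	s, _ := json.Marshal(map[string]string{"id": fmt.Sprintf("%d", id)})
	fmt.Println(string(s))
}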