2020-05-10 16:58:17 +00:00
|
|
|
package main
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"fmt"
|
|
|
|
"hash/fnv"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
|
|
"github.com/VictoriaMetrics/metrics"
|
|
|
|
)
|
|
|
|
|
|
|
|
// Group is an entity for grouping rules
|
|
|
|
type Group struct {
|
|
|
|
Name string
|
|
|
|
File string
|
|
|
|
Rules []*Rule
|
|
|
|
|
2020-05-17 14:12:09 +00:00
|
|
|
doneCh chan struct{}
|
|
|
|
finishedCh chan struct{}
|
|
|
|
// channel accepts new Group obj
|
|
|
|
// which supposed to update current group
|
|
|
|
updateCh chan Group
|
2020-05-10 16:58:17 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// ID return unique group ID that consists of
|
|
|
|
// rules file and group name
|
2020-05-17 14:12:09 +00:00
|
|
|
func (g *Group) ID() uint64 {
|
2020-05-10 16:58:17 +00:00
|
|
|
hash := fnv.New64a()
|
|
|
|
hash.Write([]byte(g.File))
|
|
|
|
hash.Write([]byte("\xff"))
|
|
|
|
hash.Write([]byte(g.Name))
|
|
|
|
return hash.Sum64()
|
|
|
|
}
|
|
|
|
|
|
|
|
// Restore restores alerts state for all group rules with For > 0
|
|
|
|
func (g *Group) Restore(ctx context.Context, q datasource.Querier, lookback time.Duration) error {
|
|
|
|
for _, rule := range g.Rules {
|
|
|
|
if rule.For == 0 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
if err := rule.Restore(ctx, q, lookback); err != nil {
|
|
|
|
return fmt.Errorf("error while restoring rule %q: %s", rule.Name, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// updateWith updates existing group with
|
2020-05-17 14:12:09 +00:00
|
|
|
// passed group object.
|
|
|
|
// Not thread-safe.
|
2020-05-10 16:58:17 +00:00
|
|
|
func (g *Group) updateWith(newGroup Group) {
|
|
|
|
rulesRegistry := make(map[string]*Rule)
|
|
|
|
for _, nr := range newGroup.Rules {
|
|
|
|
rulesRegistry[nr.id()] = nr
|
|
|
|
}
|
|
|
|
|
|
|
|
for i, or := range g.Rules {
|
|
|
|
nr, ok := rulesRegistry[or.id()]
|
|
|
|
if !ok {
|
|
|
|
// old rule is not present in the new list
|
2020-05-15 06:55:22 +00:00
|
|
|
// so we mark it for removing
|
|
|
|
g.Rules[i] = nil
|
2020-05-10 16:58:17 +00:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
// copy all significant fields.
|
|
|
|
// alerts state isn't copied since
|
2020-05-17 14:13:22 +00:00
|
|
|
// it should be updated in next 2 Execs
|
2020-05-10 16:58:17 +00:00
|
|
|
or.For = nr.For
|
|
|
|
or.Expr = nr.Expr
|
|
|
|
or.Labels = nr.Labels
|
|
|
|
or.Annotations = nr.Annotations
|
|
|
|
delete(rulesRegistry, nr.id())
|
|
|
|
}
|
|
|
|
|
2020-05-15 06:55:22 +00:00
|
|
|
var newRules []*Rule
|
|
|
|
for _, r := range g.Rules {
|
|
|
|
if r == nil {
|
|
|
|
// skip nil rules
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
newRules = append(newRules, r)
|
|
|
|
}
|
|
|
|
// add the rest of rules from registry
|
2020-05-10 16:58:17 +00:00
|
|
|
for _, nr := range rulesRegistry {
|
2020-05-15 06:55:22 +00:00
|
|
|
newRules = append(newRules, nr)
|
2020-05-10 16:58:17 +00:00
|
|
|
}
|
2020-05-15 06:55:22 +00:00
|
|
|
g.Rules = newRules
|
2020-05-10 16:58:17 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
var (
|
|
|
|
iterationTotal = metrics.NewCounter(`vmalert_iteration_total`)
|
|
|
|
iterationDuration = metrics.NewSummary(`vmalert_iteration_duration_seconds`)
|
|
|
|
|
|
|
|
execTotal = metrics.NewCounter(`vmalert_execution_total`)
|
|
|
|
execErrors = metrics.NewCounter(`vmalert_execution_errors_total`)
|
|
|
|
execDuration = metrics.NewSummary(`vmalert_execution_duration_seconds`)
|
|
|
|
|
|
|
|
alertsFired = metrics.NewCounter(`vmalert_alerts_fired_total`)
|
|
|
|
alertsSent = metrics.NewCounter(`vmalert_alerts_sent_total`)
|
|
|
|
alertsSendErrors = metrics.NewCounter(`vmalert_alerts_send_errors_total`)
|
|
|
|
|
|
|
|
remoteWriteSent = metrics.NewCounter(`vmalert_remotewrite_sent_total`)
|
|
|
|
remoteWriteErrors = metrics.NewCounter(`vmalert_remotewrite_errors_total`)
|
|
|
|
)
|
|
|
|
|
|
|
|
func (g *Group) close() {
|
2020-05-17 14:12:09 +00:00
|
|
|
if g.doneCh == nil {
|
2020-05-10 16:58:17 +00:00
|
|
|
return
|
|
|
|
}
|
2020-05-17 14:12:09 +00:00
|
|
|
close(g.doneCh)
|
|
|
|
<-g.finishedCh
|
2020-05-10 16:58:17 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (g *Group) start(ctx context.Context, interval time.Duration,
|
|
|
|
querier datasource.Querier, nr notifier.Notifier, rw *remotewrite.Client) {
|
|
|
|
logger.Infof("group %q started", g.Name)
|
|
|
|
t := time.NewTicker(interval)
|
|
|
|
defer t.Stop()
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
logger.Infof("group %q: context cancelled", g.Name)
|
2020-05-17 14:12:09 +00:00
|
|
|
close(g.finishedCh)
|
2020-05-10 16:58:17 +00:00
|
|
|
return
|
2020-05-17 14:12:09 +00:00
|
|
|
case <-g.doneCh:
|
2020-05-10 16:58:17 +00:00
|
|
|
logger.Infof("group %q: received stop signal", g.Name)
|
2020-05-17 14:12:09 +00:00
|
|
|
close(g.finishedCh)
|
2020-05-10 16:58:17 +00:00
|
|
|
return
|
2020-05-17 14:12:09 +00:00
|
|
|
case ng := <-g.updateCh:
|
|
|
|
g.updateWith(ng)
|
2020-05-10 16:58:17 +00:00
|
|
|
case <-t.C:
|
|
|
|
iterationTotal.Inc()
|
|
|
|
iterationStart := time.Now()
|
|
|
|
for _, rule := range g.Rules {
|
|
|
|
execTotal.Inc()
|
|
|
|
|
|
|
|
execStart := time.Now()
|
|
|
|
err := rule.Exec(ctx, querier)
|
|
|
|
execDuration.UpdateDuration(execStart)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
execErrors.Inc()
|
|
|
|
logger.Errorf("failed to execute rule %q.%q: %s", g.Name, rule.Name, err)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
var alertsToSend []notifier.Alert
|
|
|
|
for _, a := range rule.alerts {
|
2020-05-17 14:13:22 +00:00
|
|
|
switch a.State {
|
|
|
|
case notifier.StateFiring:
|
|
|
|
// set End to execStart + 3 intervals
|
|
|
|
// so notifier can resolve it automatically if `vmalert`
|
|
|
|
// won't be able to send resolve for some reason
|
|
|
|
a.End = execStart.Add(3 * interval)
|
|
|
|
alertsToSend = append(alertsToSend, *a)
|
|
|
|
pushToRW(rw, rule, a, execStart)
|
|
|
|
case notifier.StatePending:
|
|
|
|
pushToRW(rw, rule, a, execStart)
|
|
|
|
case notifier.StateInactive:
|
|
|
|
// set End to execStart to notify
|
|
|
|
// that it was just resolved
|
|
|
|
a.End = execStart
|
2020-05-10 16:58:17 +00:00
|
|
|
alertsToSend = append(alertsToSend, *a)
|
|
|
|
}
|
|
|
|
}
|
2020-05-17 14:13:22 +00:00
|
|
|
if len(alertsToSend) == 0 {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
alertsSent.Add(len(alertsToSend))
|
|
|
|
if err := nr.Send(ctx, alertsToSend); err != nil {
|
|
|
|
alertsSendErrors.Inc()
|
|
|
|
logger.Errorf("failed to send alert for rule %q.%q: %s", g.Name, rule.Name, err)
|
2020-05-10 16:58:17 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
iterationDuration.UpdateDuration(iterationStart)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2020-05-17 14:13:22 +00:00
|
|
|
|
|
|
|
func pushToRW(rw *remotewrite.Client, rule *Rule, a *notifier.Alert, timestamp time.Time) {
|
|
|
|
if rw == nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
tss := rule.AlertToTimeSeries(a, timestamp)
|
|
|
|
remoteWriteSent.Add(len(tss))
|
|
|
|
for _, ts := range tss {
|
|
|
|
if err := rw.Push(ts); err != nil {
|
|
|
|
remoteWriteErrors.Inc()
|
|
|
|
logger.Errorf("failed to push timeseries to remotewrite: %s", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|