mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
0157566fdb
The change introduces a new entity, `manager`, which replaces `watchdog` and decouples requestHandler and groups. The manager is supposed to control the life cycle of groups, rules and config reloads. Groups export an ID method which returns a hash of the filename and group name. The ID is supposed to be a unique identifier across all loaded groups. Some tests were added to improve coverage. A bug that produced a wrong annotation value when $value was used in templates after metrics had been restored was fixed. The Notifier interface was extended to accept a context. A new set of metrics was introduced for config reloads.
165 lines
4.3 KiB
Go
165 lines
4.3 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"hash/fnv"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
)
|
|
|
|
// Group is an entity for grouping rules
|
|
type Group struct {
|
|
Name string
|
|
File string
|
|
Rules []*Rule
|
|
|
|
done chan struct{}
|
|
finished chan struct{}
|
|
}
|
|
|
|
// ID return unique group ID that consists of
|
|
// rules file and group name
|
|
func (g Group) ID() uint64 {
|
|
hash := fnv.New64a()
|
|
hash.Write([]byte(g.File))
|
|
hash.Write([]byte("\xff"))
|
|
hash.Write([]byte(g.Name))
|
|
return hash.Sum64()
|
|
}
|
|
|
|
// Restore restores alerts state for all group rules with For > 0
|
|
func (g *Group) Restore(ctx context.Context, q datasource.Querier, lookback time.Duration) error {
|
|
for _, rule := range g.Rules {
|
|
if rule.For == 0 {
|
|
return nil
|
|
}
|
|
if err := rule.Restore(ctx, q, lookback); err != nil {
|
|
return fmt.Errorf("error while restoring rule %q: %s", rule.Name, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// updateWith updates existing group with
|
|
// passed group object.
|
|
func (g *Group) updateWith(newGroup Group) {
|
|
rulesRegistry := make(map[string]*Rule)
|
|
for _, nr := range newGroup.Rules {
|
|
rulesRegistry[nr.id()] = nr
|
|
}
|
|
|
|
for i, or := range g.Rules {
|
|
nr, ok := rulesRegistry[or.id()]
|
|
if !ok {
|
|
// old rule is not present in the new list
|
|
// and must be removed
|
|
or = nil
|
|
g.Rules = append(g.Rules[:i], g.Rules[i+1:]...)
|
|
continue
|
|
}
|
|
|
|
// copy all significant fields.
|
|
// alerts state isn't copied since
|
|
// it should be updated in next 2 Evals
|
|
or.For = nr.For
|
|
or.Expr = nr.Expr
|
|
or.Labels = nr.Labels
|
|
or.Annotations = nr.Annotations
|
|
delete(rulesRegistry, nr.id())
|
|
}
|
|
|
|
for _, nr := range rulesRegistry {
|
|
g.Rules = append(g.Rules, nr)
|
|
}
|
|
}
|
|
|
|
var (
|
|
iterationTotal = metrics.NewCounter(`vmalert_iteration_total`)
|
|
iterationDuration = metrics.NewSummary(`vmalert_iteration_duration_seconds`)
|
|
|
|
execTotal = metrics.NewCounter(`vmalert_execution_total`)
|
|
execErrors = metrics.NewCounter(`vmalert_execution_errors_total`)
|
|
execDuration = metrics.NewSummary(`vmalert_execution_duration_seconds`)
|
|
|
|
alertsFired = metrics.NewCounter(`vmalert_alerts_fired_total`)
|
|
alertsSent = metrics.NewCounter(`vmalert_alerts_sent_total`)
|
|
alertsSendErrors = metrics.NewCounter(`vmalert_alerts_send_errors_total`)
|
|
|
|
remoteWriteSent = metrics.NewCounter(`vmalert_remotewrite_sent_total`)
|
|
remoteWriteErrors = metrics.NewCounter(`vmalert_remotewrite_errors_total`)
|
|
)
|
|
|
|
func (g *Group) close() {
|
|
if g.done == nil {
|
|
return
|
|
}
|
|
close(g.done)
|
|
<-g.finished
|
|
}
|
|
|
|
func (g *Group) start(ctx context.Context, interval time.Duration,
|
|
querier datasource.Querier, nr notifier.Notifier, rw *remotewrite.Client) {
|
|
logger.Infof("group %q started", g.Name)
|
|
t := time.NewTicker(interval)
|
|
defer t.Stop()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
logger.Infof("group %q: context cancelled", g.Name)
|
|
close(g.finished)
|
|
return
|
|
case <-g.done:
|
|
logger.Infof("group %q: received stop signal", g.Name)
|
|
close(g.finished)
|
|
return
|
|
case <-t.C:
|
|
iterationTotal.Inc()
|
|
iterationStart := time.Now()
|
|
for _, rule := range g.Rules {
|
|
execTotal.Inc()
|
|
|
|
execStart := time.Now()
|
|
err := rule.Exec(ctx, querier)
|
|
execDuration.UpdateDuration(execStart)
|
|
|
|
if err != nil {
|
|
execErrors.Inc()
|
|
logger.Errorf("failed to execute rule %q.%q: %s", g.Name, rule.Name, err)
|
|
continue
|
|
}
|
|
|
|
var alertsToSend []notifier.Alert
|
|
for _, a := range rule.alerts {
|
|
if a.State != notifier.StatePending {
|
|
alertsToSend = append(alertsToSend, *a)
|
|
}
|
|
if a.State == notifier.StateInactive || rw == nil {
|
|
continue
|
|
}
|
|
tss := rule.AlertToTimeSeries(a, execStart)
|
|
for _, ts := range tss {
|
|
remoteWriteSent.Inc()
|
|
if err := rw.Push(ts); err != nil {
|
|
remoteWriteErrors.Inc()
|
|
logger.Errorf("failed to push timeseries to remotewrite: %s", err)
|
|
}
|
|
}
|
|
}
|
|
if len(alertsToSend) > 0 {
|
|
alertsSent.Add(len(alertsToSend))
|
|
if err := nr.Send(ctx, alertsToSend); err != nil {
|
|
alertsSendErrors.Inc()
|
|
logger.Errorf("failed to send alert for rule %q.%q: %s", g.Name, rule.Name, err)
|
|
}
|
|
}
|
|
}
|
|
iterationDuration.UpdateDuration(iterationStart)
|
|
}
|
|
}
|
|
}
|