VictoriaMetrics/app/vmalert/group.go
Roman Khavronenko a249cd9d22
vmalert: fix the access to rules slice element by wrong index (#486)
During group's update rules deletion was causing slice
mutations while slice index was assumed to be unchanged.
This caused "slice bounds out of range" errors when multiple
rules were deleted sequentially.
2020-05-15 07:55:22 +01:00

175 lines
4.5 KiB
Go

package main
import (
"context"
"fmt"
"hash/fnv"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/metrics"
)
// Group is an entity for grouping rules.
// A group is identified by the (File, Name) pair, see ID.
type Group struct {
// Name is the group name; it is hashed into the group ID
// and used as a prefix in log messages.
Name string
// File is the rules file the group belongs to; it is hashed
// into the group ID together with Name.
File string
// Rules is the list of rules evaluated on every iteration of start.
Rules []*Rule
// done is closed by close() to ask the start() loop to stop.
// A nil value makes close() a no-op.
done chan struct{}
// finished is closed by start() right before it returns;
// close() blocks on it after closing done.
finished chan struct{}
}
// ID returns the group identifier: the 64-bit FNV-1a hash
// of the rules file path and the group name.
func (g Group) ID() uint64 {
	h := fnv.New64a()
	// The "\xff" byte is hashed between the two fields so that
	// e.g. File "ab" + Name "c" and File "a" + Name "bc"
	// do not produce the same byte stream.
	for _, part := range [...]string{g.File, "\xff", g.Name} {
		h.Write([]byte(part))
	}
	return h.Sum64()
}
// Restore restores alerts state for every rule of the group
// that has a non-zero For value.
//
// Rules with For == 0 are skipped. Restoring stops at the first
// rule whose Restore call fails, and that error is returned wrapped
// with the rule name. ctx, q and lookback are passed through
// unchanged to each rule's Restore call.
func (g *Group) Restore(ctx context.Context, q datasource.Querier, lookback time.Duration) error {
	for _, rule := range g.Rules {
		if rule.For == 0 {
			// Skip only this rule. Returning here would leave
			// every following rule of the group unrestored.
			continue
		}
		if err := rule.Restore(ctx, q, lookback); err != nil {
			return fmt.Errorf("error while restoring rule %q: %w", rule.Name, err)
		}
	}
	return nil
}
// updateWith updates existing group with
// passed group object. Must be called
// under mutex lock.
//
// Rules are matched by their id():
//   - a rule present in both groups keeps its existing object (and so its
//     current alerts state) and only gets For, Expr, Labels and Annotations
//     copied from the new definition;
//   - a rule missing from newGroup is dropped;
//   - a rule present only in newGroup is appended after the kept rules,
//     in the order it appears in newGroup.Rules.
//
// The result is stored in a freshly built slice; the slice previously
// referenced by g.Rules is not modified.
func (g *Group) updateWith(newGroup Group) {
	rulesRegistry := make(map[string]*Rule, len(newGroup.Rules))
	for _, nr := range newGroup.Rules {
		rulesRegistry[nr.id()] = nr
	}

	var newRules []*Rule
	for _, old := range g.Rules {
		id := old.id()
		nr, ok := rulesRegistry[id]
		if !ok {
			// old rule is not present in the new list, drop it
			continue
		}
		// copy all significant fields.
		// alerts state isn't copied since
		// it should be updated in next 2 Evals
		old.For = nr.For
		old.Expr = nr.Expr
		old.Labels = nr.Labels
		old.Annotations = nr.Annotations
		newRules = append(newRules, old)
		// mark the new definition as consumed
		delete(rulesRegistry, id)
	}

	// Add the rules that are still in the registry, i.e. the brand new ones.
	// Walk newGroup.Rules instead of the map so that the resulting order is
	// deterministic (map iteration order is random).
	for _, nr := range newGroup.Rules {
		id := nr.id()
		added, ok := rulesRegistry[id]
		if !ok {
			continue
		}
		newRules = append(newRules, added)
		delete(rulesRegistry, id)
	}
	g.Rules = newRules
}
// Metrics updated by the Group.start evaluation loop.
var (
// iterationTotal is incremented on every ticker tick of a group.
iterationTotal = metrics.NewCounter(`vmalert_iteration_total`)
// iterationDuration is updated after all rules of a group were processed
// within a single tick.
iterationDuration = metrics.NewSummary(`vmalert_iteration_duration_seconds`)
// execTotal is incremented before every rule execution.
execTotal = metrics.NewCounter(`vmalert_execution_total`)
// execErrors is incremented when a rule execution returns an error.
execErrors = metrics.NewCounter(`vmalert_execution_errors_total`)
// execDuration is updated right after every rule execution,
// whether it succeeded or not.
execDuration = metrics.NewSummary(`vmalert_execution_duration_seconds`)
// NOTE(review): alertsFired is not referenced in the visible part
// of this file - confirm where it is updated.
alertsFired = metrics.NewCounter(`vmalert_alerts_fired_total`)
// alertsSent is increased by the number of alerts handed to the notifier;
// it is updated before the send is attempted.
alertsSent = metrics.NewCounter(`vmalert_alerts_sent_total`)
// alertsSendErrors is incremented once per failed notifier Send call.
alertsSendErrors = metrics.NewCounter(`vmalert_alerts_send_errors_total`)
// remoteWriteSent is incremented for every time series passed
// to the remote write client; it is updated before the push is attempted.
remoteWriteSent = metrics.NewCounter(`vmalert_remotewrite_sent_total`)
// remoteWriteErrors is incremented for every failed remote write push.
remoteWriteErrors = metrics.NewCounter(`vmalert_remotewrite_errors_total`)
)
// close signals the start loop to stop by closing the done channel
// and then blocks until the finished channel is closed.
// It is a no-op when done is nil.
func (g *Group) close() {
	if g.done != nil {
		close(g.done)
		// wait for the loop to acknowledge the stop
		<-g.finished
	}
}
// start runs the evaluation loop of the group and blocks until either
// ctx is cancelled or the done channel is closed; in both cases the
// finished channel is closed right before returning.
//
// On every tick of interval each rule of the group is executed via querier.
// When execution fails, the error is logged and the rule is skipped for this
// tick. Otherwise:
//   - every alert of the rule that is not in pending state is collected and
//     the collected batch is passed to nr;
//   - if rw is not nil, every alert that is not in inactive state is converted
//     into time series which are pushed to rw one by one.
//
// Send and push failures are logged and counted, they do not stop the loop.
func (g *Group) start(ctx context.Context, interval time.Duration,
	querier datasource.Querier, nr notifier.Notifier, rw *remotewrite.Client) {
	logger.Infof("group %q started", g.Name)
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			logger.Infof("group %q: context cancelled", g.Name)
			close(g.finished)
			return
		case <-g.done:
			logger.Infof("group %q: received stop signal", g.Name)
			close(g.finished)
			return
		case <-ticker.C:
			iterationTotal.Inc()
			iterationBegin := time.Now()
			for _, r := range g.Rules {
				execTotal.Inc()
				evalTime := time.Now()
				execErr := r.Exec(ctx, querier)
				execDuration.UpdateDuration(evalTime)
				if execErr != nil {
					execErrors.Inc()
					logger.Errorf("failed to execute rule %q.%q: %s", g.Name, r.Name, execErr)
					continue
				}

				var toNotify []notifier.Alert
				for _, alert := range r.alerts {
					// everything except pending alerts goes to the notifier
					if alert.State != notifier.StatePending {
						toNotify = append(toNotify, *alert)
					}
					// remote write is optional and skips inactive alerts
					if rw != nil && alert.State != notifier.StateInactive {
						for _, series := range r.AlertToTimeSeries(alert, evalTime) {
							remoteWriteSent.Inc()
							if pushErr := rw.Push(series); pushErr != nil {
								remoteWriteErrors.Inc()
								logger.Errorf("failed to push timeseries to remotewrite: %s", pushErr)
							}
						}
					}
				}

				if len(toNotify) == 0 {
					continue
				}
				alertsSent.Add(len(toNotify))
				if sendErr := nr.Send(ctx, toNotify); sendErr != nil {
					alertsSendErrors.Inc()
					logger.Errorf("failed to send alert for rule %q.%q: %s", g.Name, r.Name, sendErr)
				}
			}
			iterationDuration.UpdateDuration(iterationBegin)
		}
	}
}