VictoriaMetrics/app/vmalert/manager.go

162 lines
3.7 KiB
Go
Raw Normal View History

package main
import (
"context"
"fmt"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// manager controls group states
type manager struct {
querierBuilder datasource.QuerierBuilder
notifiers []notifier.Notifier
rw *remotewrite.Client
// remote read builder.
rr datasource.QuerierBuilder
wg sync.WaitGroup
labels map[string]string
groupsMu sync.RWMutex
groups map[uint64]*Group
}
// AlertAPI generates APIAlert object from alert by its ID(hash)
func (m *manager) AlertAPI(gID, aID uint64) (*APIAlert, error) {
m.groupsMu.RLock()
defer m.groupsMu.RUnlock()
g, ok := m.groups[gID]
if !ok {
return nil, fmt.Errorf("can't find group with id %q", gID)
}
for _, rule := range g.Rules {
ar, ok := rule.(*AlertingRule)
if !ok {
continue
}
if apiAlert := ar.AlertAPI(aID); apiAlert != nil {
return apiAlert, nil
}
}
return nil, fmt.Errorf("can't find alert with id %q in group %q", aID, g.Name)
}
func (m *manager) start(ctx context.Context, groupsCfg []config.Group) error {
return m.update(ctx, groupsCfg, true)
}
func (m *manager) close() {
if m.rw != nil {
err := m.rw.Close()
if err != nil {
logger.Fatalf("cannot stop the remotewrite: %s", err)
}
}
m.wg.Wait()
}
func (m *manager) startGroup(ctx context.Context, group *Group, restore bool) error {
if restore && m.rr != nil {
err := group.Restore(ctx, m.rr, *remoteReadLookBack, m.labels)
if err != nil {
if !*remoteReadIgnoreRestoreErrors {
return fmt.Errorf("failed to restore state for group %q: %w", group.Name, err)
}
logger.Errorf("error while restoring state for group %q: %s", group.Name, err)
}
}
m.wg.Add(1)
id := group.ID()
go func() {
group.start(ctx, m.notifiers, m.rw)
m.wg.Done()
}()
m.groups[id] = group
return nil
}
func (m *manager) update(ctx context.Context, groupsCfg []config.Group, restore bool) error {
groupsRegistry := make(map[uint64]*Group)
for _, cfg := range groupsCfg {
ng := newGroup(cfg, m.querierBuilder, *evaluationInterval, m.labels)
groupsRegistry[ng.ID()] = ng
}
type updateItem struct {
old *Group
new *Group
}
var toUpdate []updateItem
m.groupsMu.Lock()
for _, og := range m.groups {
ng, ok := groupsRegistry[og.ID()]
if !ok {
// old group is not present in new list,
// so must be stopped and deleted
og.close()
delete(m.groups, og.ID())
og = nil
continue
}
delete(groupsRegistry, ng.ID())
if og.Checksum != ng.Checksum {
toUpdate = append(toUpdate, updateItem{old: og, new: ng})
}
}
for _, ng := range groupsRegistry {
if err := m.startGroup(ctx, ng, restore); err != nil {
return err
}
}
m.groupsMu.Unlock()
if len(toUpdate) > 0 {
var wg sync.WaitGroup
for _, item := range toUpdate {
wg.Add(1)
go func(old *Group, new *Group) {
old.updateCh <- new
wg.Done()
}(item.old, item.new)
}
wg.Wait()
}
return nil
}
func (g *Group) toAPI() APIGroup {
g.mu.RLock()
defer g.mu.RUnlock()
ag := APIGroup{
// encode as string to avoid rounding
ID: fmt.Sprintf("%d", g.ID()),
Name: g.Name,
Type: g.Type.String(),
File: g.File,
Interval: g.Interval.String(),
Concurrency: g.Concurrency,
ExtraFilterLabels: g.ExtraFilterLabels,
}
for _, r := range g.Rules {
switch v := r.(type) {
case *AlertingRule:
ag.AlertingRules = append(ag.AlertingRules, v.RuleAPI())
case *RecordingRule:
ag.RecordingRules = append(ag.RecordingRules, v.RuleAPI())
}
}
return ag
}