mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
0157566fdb
The change introduces a new entity, `manager`, which replaces `watchdog` and decouples requestHandler and groups. The manager is supposed to control the life cycle of groups, rules and config reloads. Groups export an ID method which returns a hash of the filename and the group name. The ID is supposed to be a unique identifier across all loaded groups. Some tests were added to improve coverage. A bug that produced a wrong annotation value when $value was used in templates after metrics had been restored was fixed. The Notifier interface was extended to accept a context. A new set of metrics was introduced for config reload.
109 lines
2.5 KiB
Go
109 lines
2.5 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
)
|
|
|
|
type manager struct {
|
|
storage datasource.Querier
|
|
notifier notifier.Notifier
|
|
|
|
rw *remotewrite.Client
|
|
rr datasource.Querier
|
|
|
|
wg sync.WaitGroup
|
|
|
|
groupsMu sync.RWMutex
|
|
groups map[uint64]*Group
|
|
}
|
|
|
|
// AlertAPI generates APIAlert object from alert by its id(hash)
|
|
func (m *manager) AlertAPI(gID, aID uint64) (*APIAlert, error) {
|
|
m.groupsMu.RLock()
|
|
defer m.groupsMu.RUnlock()
|
|
|
|
g, ok := m.groups[gID]
|
|
if !ok {
|
|
return nil, fmt.Errorf("can't find group with id %q", gID)
|
|
}
|
|
for _, rule := range g.Rules {
|
|
if apiAlert := rule.AlertAPI(aID); apiAlert != nil {
|
|
return apiAlert, nil
|
|
}
|
|
}
|
|
return nil, fmt.Errorf("can't func alert with id %q in group %q", aID, g.Name)
|
|
}
|
|
|
|
func (m *manager) start(ctx context.Context, path []string, validate bool) error {
|
|
return m.update(ctx, path, validate, true)
|
|
}
|
|
|
|
func (m *manager) close() {
|
|
if m.rw != nil {
|
|
err := m.rw.Close()
|
|
if err != nil {
|
|
logger.Fatalf("cannot stop the remotewrite: %s", err)
|
|
}
|
|
}
|
|
m.wg.Wait()
|
|
}
|
|
|
|
func (m *manager) startGroup(ctx context.Context, group Group, restore bool) {
|
|
if restore {
|
|
err := group.Restore(ctx, m.rr, *remoteReadLookBack)
|
|
if err != nil {
|
|
logger.Errorf("error while restoring state for group %q: %s", group.Name, err)
|
|
}
|
|
}
|
|
|
|
m.wg.Add(1)
|
|
id := group.ID()
|
|
go func() {
|
|
group.start(ctx, *evaluationInterval, m.storage, m.notifier, m.rw)
|
|
m.wg.Done()
|
|
}()
|
|
m.groups[id] = &group
|
|
}
|
|
|
|
func (m *manager) update(ctx context.Context, path []string, validate, restore bool) error {
|
|
logger.Infof("reading alert rules configuration file from %q", strings.Join(path, ";"))
|
|
newGroups, err := Parse(path, validate)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot parse configuration file: %s", err)
|
|
}
|
|
|
|
groupsRegistry := make(map[uint64]Group)
|
|
for _, ng := range newGroups {
|
|
groupsRegistry[ng.ID()] = ng
|
|
}
|
|
|
|
m.groupsMu.Lock()
|
|
for _, og := range m.groups {
|
|
id := og.ID()
|
|
ng, ok := groupsRegistry[id]
|
|
if !ok {
|
|
// old group is not present in new list
|
|
// and must be stopped and deleted
|
|
og.close()
|
|
delete(m.groups, og.ID())
|
|
og = nil
|
|
continue
|
|
}
|
|
og.updateWith(ng)
|
|
delete(groupsRegistry, ng.ID())
|
|
}
|
|
|
|
for _, ng := range groupsRegistry {
|
|
m.startGroup(ctx, ng, restore)
|
|
}
|
|
m.groupsMu.Unlock()
|
|
return nil
|
|
}
|