mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
88538df267
* app/vmalert: support multiple notifier urls (#584) User now can set multiple notifier URLs in the same fashion as for other vmutils (e.g. vmagent). The same is correct for TLS setting for every configured URL. Alerts sending is done in sequential way for respecting the specified URLs order. * app/vmalert: add basicAuth support for notifier client (#585) The change adds possibility to set basicAuth creds for notifier client in the same fasion as for remote write/read and datasource.
135 lines
3.2 KiB
Go
135 lines
3.2 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
)
|
|
|
|
// manager controls group states
|
|
type manager struct {
|
|
querier datasource.Querier
|
|
notifiers []notifier.Notifier
|
|
|
|
rw *remotewrite.Client
|
|
rr datasource.Querier
|
|
|
|
wg sync.WaitGroup
|
|
|
|
groupsMu sync.RWMutex
|
|
groups map[uint64]*Group
|
|
}
|
|
|
|
// AlertAPI generates APIAlert object from alert by its ID(hash)
|
|
func (m *manager) AlertAPI(gID, aID uint64) (*APIAlert, error) {
|
|
m.groupsMu.RLock()
|
|
defer m.groupsMu.RUnlock()
|
|
|
|
g, ok := m.groups[gID]
|
|
if !ok {
|
|
return nil, fmt.Errorf("can't find group with id %q", gID)
|
|
}
|
|
for _, rule := range g.Rules {
|
|
ar, ok := rule.(*AlertingRule)
|
|
if !ok {
|
|
continue
|
|
}
|
|
if apiAlert := ar.AlertAPI(aID); apiAlert != nil {
|
|
return apiAlert, nil
|
|
}
|
|
}
|
|
return nil, fmt.Errorf("can't find alert with id %q in group %q", aID, g.Name)
|
|
}
|
|
|
|
func (m *manager) start(ctx context.Context, path []string, validateTpl, validateExpr bool) error {
|
|
return m.update(ctx, path, validateTpl, validateExpr, true)
|
|
}
|
|
|
|
func (m *manager) close() {
|
|
if m.rw != nil {
|
|
err := m.rw.Close()
|
|
if err != nil {
|
|
logger.Fatalf("cannot stop the remotewrite: %s", err)
|
|
}
|
|
}
|
|
m.wg.Wait()
|
|
}
|
|
|
|
func (m *manager) startGroup(ctx context.Context, group *Group, restore bool) {
|
|
if restore && m.rr != nil {
|
|
err := group.Restore(ctx, m.rr, *remoteReadLookBack)
|
|
if err != nil {
|
|
logger.Errorf("error while restoring state for group %q: %s", group.Name, err)
|
|
}
|
|
}
|
|
|
|
m.wg.Add(1)
|
|
id := group.ID()
|
|
go func() {
|
|
group.start(ctx, m.querier, m.notifiers, m.rw)
|
|
m.wg.Done()
|
|
}()
|
|
m.groups[id] = group
|
|
}
|
|
|
|
func (m *manager) update(ctx context.Context, path []string, validateTpl, validateExpr, restore bool) error {
|
|
logger.Infof("reading rules configuration file from %q", strings.Join(path, ";"))
|
|
groupsCfg, err := config.Parse(path, validateTpl, validateExpr)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot parse configuration file: %s", err)
|
|
}
|
|
|
|
groupsRegistry := make(map[uint64]*Group)
|
|
for _, cfg := range groupsCfg {
|
|
ng := newGroup(cfg, *evaluationInterval)
|
|
groupsRegistry[ng.ID()] = ng
|
|
}
|
|
|
|
m.groupsMu.Lock()
|
|
for _, og := range m.groups {
|
|
ng, ok := groupsRegistry[og.ID()]
|
|
if !ok {
|
|
// old group is not present in new list
|
|
// and must be stopped and deleted
|
|
og.close()
|
|
delete(m.groups, og.ID())
|
|
og = nil
|
|
continue
|
|
}
|
|
og.updateCh <- ng
|
|
delete(groupsRegistry, ng.ID())
|
|
}
|
|
|
|
for _, ng := range groupsRegistry {
|
|
m.startGroup(ctx, ng, restore)
|
|
}
|
|
m.groupsMu.Unlock()
|
|
return nil
|
|
}
|
|
|
|
func (g *Group) toAPI() APIGroup {
|
|
ag := APIGroup{
|
|
// encode as strings to avoid rounding
|
|
ID: fmt.Sprintf("%d", g.ID()),
|
|
Name: g.Name,
|
|
File: g.File,
|
|
Interval: g.Interval.String(),
|
|
Concurrency: g.Concurrency,
|
|
}
|
|
for _, r := range g.Rules {
|
|
switch v := r.(type) {
|
|
case *AlertingRule:
|
|
ag.AlertingRules = append(ag.AlertingRules, v.RuleAPI())
|
|
case *RecordingRule:
|
|
ag.RecordingRules = append(ag.RecordingRules, v.RuleAPI())
|
|
}
|
|
}
|
|
return ag
|
|
}
|