mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
df59ac7f0e
* app/vmalert: fix data race during hot-config reload
During hot-reload, the logic evokes the group update and rules evaluation
interruption simultaneously. Falsely assuming that interruption happens before
the update. However, it could happen that group will be updated first and only
after the rules evaluation will be cancelled. Which will result in permanent
interruption for all rules within the group.
The fix caches the cancel context function into local variable first. And only after
performs the group update. With cached cancel function we can safely call it without
worrying that we cancel the evaluation for already updated group.
Signed-off-by: hagen1778 <roman@victoriametrics.com>
* Revert "app/vmalert: fix data race during hot-config reload"
This reverts commit a4bb7e8932
.
* app/vmalert: fix data race during hot-config reload
During hot-reload, the logic evokes the group update and rules evaluation
interruption simultaneously. Falsely assuming that interruption happens before
the update. However, it could happen that group will be updated first and only
after the rules evaluation will be cancelled. Which will result in permanent
interruption for all rules within the group.
The fix cancels the evaulation context before applying the update, making sure
that the context will be cancelled for old group always.
Signed-off-by: hagen1778 <roman@victoriametrics.com>
* wip
Signed-off-by: hagen1778 <roman@victoriametrics.com>
---------
Signed-off-by: hagen1778 <roman@victoriametrics.com>
171 lines
4.2 KiB
Go
171 lines
4.2 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/rule"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
)
|
|
|
|
// manager controls group states
|
|
type manager struct {
|
|
querierBuilder datasource.QuerierBuilder
|
|
notifiers func() []notifier.Notifier
|
|
|
|
rw remotewrite.RWClient
|
|
// remote read builder.
|
|
rr datasource.QuerierBuilder
|
|
|
|
wg sync.WaitGroup
|
|
labels map[string]string
|
|
|
|
groupsMu sync.RWMutex
|
|
groups map[uint64]*rule.Group
|
|
}
|
|
|
|
// ruleAPI generates apiRule object from alert by its ID(hash)
|
|
func (m *manager) ruleAPI(gID, rID uint64) (apiRule, error) {
|
|
m.groupsMu.RLock()
|
|
defer m.groupsMu.RUnlock()
|
|
|
|
g, ok := m.groups[gID]
|
|
if !ok {
|
|
return apiRule{}, fmt.Errorf("can't find group with id %d", gID)
|
|
}
|
|
for _, rule := range g.Rules {
|
|
if rule.ID() == rID {
|
|
return ruleToAPI(rule), nil
|
|
}
|
|
}
|
|
return apiRule{}, fmt.Errorf("can't find rule with id %d in group %q", rID, g.Name)
|
|
}
|
|
|
|
// alertAPI generates apiAlert object from alert by its ID(hash)
|
|
func (m *manager) alertAPI(gID, aID uint64) (*apiAlert, error) {
|
|
m.groupsMu.RLock()
|
|
defer m.groupsMu.RUnlock()
|
|
|
|
g, ok := m.groups[gID]
|
|
if !ok {
|
|
return nil, fmt.Errorf("can't find group with id %d", gID)
|
|
}
|
|
for _, r := range g.Rules {
|
|
ar, ok := r.(*rule.AlertingRule)
|
|
if !ok {
|
|
continue
|
|
}
|
|
if apiAlert := alertToAPI(ar, aID); apiAlert != nil {
|
|
return apiAlert, nil
|
|
}
|
|
}
|
|
return nil, fmt.Errorf("can't find alert with id %d in group %q", aID, g.Name)
|
|
}
|
|
|
|
func (m *manager) start(ctx context.Context, groupsCfg []config.Group) error {
|
|
return m.update(ctx, groupsCfg, true)
|
|
}
|
|
|
|
func (m *manager) close() {
|
|
if m.rw != nil {
|
|
err := m.rw.Close()
|
|
if err != nil {
|
|
logger.Fatalf("cannot stop the remotewrite: %s", err)
|
|
}
|
|
}
|
|
m.wg.Wait()
|
|
}
|
|
|
|
func (m *manager) startGroup(ctx context.Context, g *rule.Group, restore bool) error {
|
|
m.wg.Add(1)
|
|
id := g.ID()
|
|
go func() {
|
|
defer m.wg.Done()
|
|
if restore {
|
|
g.Start(ctx, m.notifiers, m.rw, m.rr)
|
|
} else {
|
|
g.Start(ctx, m.notifiers, m.rw, nil)
|
|
}
|
|
}()
|
|
m.groups[id] = g
|
|
return nil
|
|
}
|
|
|
|
func (m *manager) update(ctx context.Context, groupsCfg []config.Group, restore bool) error {
|
|
var rrPresent, arPresent bool
|
|
groupsRegistry := make(map[uint64]*rule.Group)
|
|
for _, cfg := range groupsCfg {
|
|
for _, r := range cfg.Rules {
|
|
if rrPresent && arPresent {
|
|
continue
|
|
}
|
|
if r.Record != "" {
|
|
rrPresent = true
|
|
}
|
|
if r.Alert != "" {
|
|
arPresent = true
|
|
}
|
|
}
|
|
ng := rule.NewGroup(cfg, m.querierBuilder, *evaluationInterval, m.labels)
|
|
groupsRegistry[ng.ID()] = ng
|
|
}
|
|
|
|
if rrPresent && m.rw == nil {
|
|
return fmt.Errorf("config contains recording rules but `-remoteWrite.url` isn't set")
|
|
}
|
|
if arPresent && m.notifiers == nil {
|
|
return fmt.Errorf("config contains alerting rules but neither `-notifier.url` nor `-notifier.config` nor `-notifier.blackhole` aren't set")
|
|
}
|
|
|
|
type updateItem struct {
|
|
old *rule.Group
|
|
new *rule.Group
|
|
}
|
|
var toUpdate []updateItem
|
|
|
|
m.groupsMu.Lock()
|
|
for _, og := range m.groups {
|
|
ng, ok := groupsRegistry[og.ID()]
|
|
if !ok {
|
|
// old group is not present in new list,
|
|
// so must be stopped and deleted
|
|
og.Close()
|
|
delete(m.groups, og.ID())
|
|
og = nil
|
|
continue
|
|
}
|
|
delete(groupsRegistry, ng.ID())
|
|
if og.Checksum != ng.Checksum {
|
|
toUpdate = append(toUpdate, updateItem{old: og, new: ng})
|
|
}
|
|
}
|
|
for _, ng := range groupsRegistry {
|
|
if err := m.startGroup(ctx, ng, restore); err != nil {
|
|
m.groupsMu.Unlock()
|
|
return err
|
|
}
|
|
}
|
|
m.groupsMu.Unlock()
|
|
|
|
if len(toUpdate) > 0 {
|
|
var wg sync.WaitGroup
|
|
for _, item := range toUpdate {
|
|
wg.Add(1)
|
|
// cancel evaluation so the Update will be applied as fast as possible.
|
|
// it is important to call InterruptEval before the update, because cancel fn
|
|
// can be re-assigned during the update.
|
|
item.old.InterruptEval()
|
|
go func(old *rule.Group, new *rule.Group) {
|
|
old.UpdateWith(new)
|
|
wg.Done()
|
|
}(item.old, item.new)
|
|
}
|
|
wg.Wait()
|
|
}
|
|
return nil
|
|
}
|