mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-21 15:45:01 +00:00
app/vmalert: fix possible data race on group checksum
1. Fix a possible data race on the group checksum when reload is called concurrently. Previously, the race had little practical impact, but it could cause the group to be updated one extra time. 2. Remove the unnecessary g.mu.RLock() and compute group.id once at newGroup creation. Any change to group.ID() means the group's type or interval has changed, so the group is treated as new. Related PR: https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8540
This commit is contained in:
parent
acdd6faecf
commit
b97cacad45
10 changed files with 48 additions and 38 deletions
|
@ -83,7 +83,7 @@ func (m *manager) close() {
|
|||
|
||||
func (m *manager) startGroup(ctx context.Context, g *rule.Group, restore bool) error {
|
||||
m.wg.Add(1)
|
||||
id := g.ID()
|
||||
id := g.GetID()
|
||||
go func() {
|
||||
defer m.wg.Done()
|
||||
if restore {
|
||||
|
@ -112,7 +112,7 @@ func (m *manager) update(ctx context.Context, groupsCfg []config.Group, restore
|
|||
}
|
||||
}
|
||||
ng := rule.NewGroup(cfg, m.querierBuilder, *evaluationInterval, m.labels)
|
||||
groupsRegistry[ng.ID()] = ng
|
||||
groupsRegistry[ng.GetID()] = ng
|
||||
}
|
||||
|
||||
if rrPresent && m.rw == nil {
|
||||
|
@ -130,17 +130,17 @@ func (m *manager) update(ctx context.Context, groupsCfg []config.Group, restore
|
|||
|
||||
m.groupsMu.Lock()
|
||||
for _, og := range m.groups {
|
||||
ng, ok := groupsRegistry[og.ID()]
|
||||
ng, ok := groupsRegistry[og.GetID()]
|
||||
if !ok {
|
||||
// old group is not present in new list,
|
||||
// so must be stopped and deleted
|
||||
og.Close()
|
||||
delete(m.groups, og.ID())
|
||||
delete(m.groups, og.GetID())
|
||||
og = nil
|
||||
continue
|
||||
}
|
||||
delete(groupsRegistry, ng.ID())
|
||||
if og.Checksum != ng.Checksum {
|
||||
delete(groupsRegistry, ng.GetID())
|
||||
if og.GetCheckSum() != ng.GetCheckSum() {
|
||||
toUpdate = append(toUpdate, updateItem{old: og, new: ng})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -144,7 +144,7 @@ func TestManagerUpdate_Success(t *testing.T) {
|
|||
}
|
||||
|
||||
for _, wantG := range groupsExpected {
|
||||
gotG, ok := m.groups[wantG.ID()]
|
||||
gotG, ok := m.groups[wantG.CreateID()]
|
||||
if !ok {
|
||||
t.Fatalf("expected to have group %q", wantG.Name)
|
||||
}
|
||||
|
@ -258,7 +258,7 @@ func compareGroups(t *testing.T, a, b *rule.Group) {
|
|||
}
|
||||
for i, r := range a.Rules {
|
||||
got, want := r, b.Rules[i]
|
||||
if a.ID() != b.ID() {
|
||||
if a.CreateID() != b.CreateID() {
|
||||
t.Fatalf("expected to have rule %q; got %q", want.ID(), got.ID())
|
||||
}
|
||||
if err := rule.CompareRules(t, want, got); err != nil {
|
||||
|
|
|
@ -133,7 +133,7 @@ func NewAlertingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rule
|
|||
KeepFiringFor: cfg.KeepFiringFor.Duration(),
|
||||
Labels: cfg.Labels,
|
||||
Annotations: cfg.Annotations,
|
||||
GroupID: group.ID(),
|
||||
GroupID: group.GetID(),
|
||||
GroupName: group.Name,
|
||||
File: group.File,
|
||||
EvalInterval: group.Interval,
|
||||
|
|
|
@ -198,7 +198,7 @@ func TestAlertingRule_Exec(t *testing.T) {
|
|||
fakeGroup := Group{
|
||||
Name: "TestRule_Exec",
|
||||
}
|
||||
rule.GroupID = fakeGroup.ID()
|
||||
rule.GroupID = fakeGroup.GetID()
|
||||
for i, step := range steps {
|
||||
fq.Reset()
|
||||
fq.Add(step...)
|
||||
|
@ -556,7 +556,7 @@ func TestAlertingRuleExecRange(t *testing.T) {
|
|||
|
||||
fq := &datasource.FakeQuerier{}
|
||||
rule.q = fq
|
||||
rule.GroupID = fakeGroup.ID()
|
||||
rule.GroupID = fakeGroup.GetID()
|
||||
fq.Add(data...)
|
||||
gotTS, err := rule.execRange(context.TODO(), time.Unix(1, 0), time.Unix(5, 0))
|
||||
if err != nil {
|
||||
|
@ -625,7 +625,7 @@ func TestAlertingRuleExecRange(t *testing.T) {
|
|||
{State: notifier.StatePending, ActiveAt: time.Unix(5, 0)},
|
||||
}, map[uint64]*notifier.Alert{
|
||||
hash(map[string]string{"alertname": "for-pending"}): {
|
||||
GroupID: fakeGroup.ID(),
|
||||
GroupID: fakeGroup.GetID(),
|
||||
Name: "for-pending",
|
||||
Labels: map[string]string{"alertname": "for-pending"},
|
||||
Annotations: map[string]string{"activeAt": "5000"},
|
||||
|
@ -644,7 +644,7 @@ func TestAlertingRuleExecRange(t *testing.T) {
|
|||
{State: notifier.StateFiring, ActiveAt: time.Unix(1, 0)},
|
||||
}, map[uint64]*notifier.Alert{
|
||||
hash(map[string]string{"alertname": "for-firing"}): {
|
||||
GroupID: fakeGroup.ID(),
|
||||
GroupID: fakeGroup.GetID(),
|
||||
Name: "for-firing",
|
||||
Labels: map[string]string{"alertname": "for-firing"},
|
||||
Annotations: map[string]string{"activeAt": "1000"},
|
||||
|
@ -664,7 +664,7 @@ func TestAlertingRuleExecRange(t *testing.T) {
|
|||
{State: notifier.StatePending, ActiveAt: time.Unix(5, 0)},
|
||||
}, map[uint64]*notifier.Alert{
|
||||
hash(map[string]string{"alertname": "for-hold-pending"}): {
|
||||
GroupID: fakeGroup.ID(),
|
||||
GroupID: fakeGroup.GetID(),
|
||||
Name: "for-hold-pending",
|
||||
Labels: map[string]string{"alertname": "for-hold-pending"},
|
||||
Annotations: map[string]string{"activeAt": "5000"},
|
||||
|
@ -719,7 +719,7 @@ func TestAlertingRuleExecRange(t *testing.T) {
|
|||
},
|
||||
}, map[uint64]*notifier.Alert{
|
||||
hash(map[string]string{"alertname": "multi-series"}): {
|
||||
GroupID: fakeGroup.ID(),
|
||||
GroupID: fakeGroup.GetID(),
|
||||
Name: "multi-series",
|
||||
Labels: map[string]string{"alertname": "multi-series"},
|
||||
Annotations: map[string]string{},
|
||||
|
@ -730,7 +730,7 @@ func TestAlertingRuleExecRange(t *testing.T) {
|
|||
For: 3 * time.Second,
|
||||
},
|
||||
hash(map[string]string{"alertname": "multi-series", "foo": "bar"}): {
|
||||
GroupID: fakeGroup.ID(),
|
||||
GroupID: fakeGroup.GetID(),
|
||||
Name: "multi-series",
|
||||
Labels: map[string]string{"alertname": "multi-series", "foo": "bar"},
|
||||
Annotations: map[string]string{},
|
||||
|
@ -1041,7 +1041,7 @@ func TestAlertingRule_Template(t *testing.T) {
|
|||
Name: "TestRule_Exec",
|
||||
}
|
||||
fq := &datasource.FakeQuerier{}
|
||||
rule.GroupID = fakeGroup.ID()
|
||||
rule.GroupID = fakeGroup.GetID()
|
||||
rule.q = fq
|
||||
rule.state = &ruleState{
|
||||
entries: make([]StateEntry, 10),
|
||||
|
|
|
@ -40,7 +40,9 @@ var (
|
|||
|
||||
// Group is an entity for grouping rules
|
||||
type Group struct {
|
||||
mu sync.RWMutex
|
||||
mu sync.RWMutex
|
||||
// id stores the unique id for the group, and shouldn't change after the group create.
|
||||
id uint64
|
||||
Name string
|
||||
File string
|
||||
Rules []Rule
|
||||
|
@ -49,10 +51,11 @@ type Group struct {
|
|||
EvalOffset *time.Duration
|
||||
// EvalDelay will adjust timestamp for rule evaluation requests to compensate intentional query delay from datasource.
|
||||
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5155
|
||||
EvalDelay *time.Duration
|
||||
Limit int
|
||||
Concurrency int
|
||||
Checksum string
|
||||
EvalDelay *time.Duration
|
||||
Limit int
|
||||
Concurrency int
|
||||
// checksum stores the hash of yaml definition for this group.
|
||||
checksum string
|
||||
LastEvaluation time.Time
|
||||
|
||||
Labels map[string]string
|
||||
|
@ -93,9 +96,7 @@ func newGroupMetrics(g *Group) *groupMetrics {
|
|||
m.iterationDuration = m.set.NewSummary(fmt.Sprintf(`vmalert_iteration_duration_seconds{%s}`, labels))
|
||||
m.iterationMissed = m.set.NewCounter(fmt.Sprintf(`vmalert_iteration_missed_total{%s}`, labels))
|
||||
m.iterationInterval = m.set.NewGauge(fmt.Sprintf(`vmalert_iteration_interval_seconds{%s}`, labels), func() float64 {
|
||||
g.mu.RLock()
|
||||
i := g.Interval.Seconds()
|
||||
g.mu.RUnlock()
|
||||
return i
|
||||
})
|
||||
return m
|
||||
|
@ -135,7 +136,7 @@ func NewGroup(cfg config.Group, qb datasource.QuerierBuilder, defaultInterval ti
|
|||
Interval: cfg.Interval.Duration(),
|
||||
Limit: cfg.Limit,
|
||||
Concurrency: cfg.Concurrency,
|
||||
Checksum: cfg.Checksum,
|
||||
checksum: cfg.Checksum,
|
||||
Params: cfg.Params,
|
||||
Headers: make(map[string]string),
|
||||
NotifierHeaders: make(map[string]string),
|
||||
|
@ -158,6 +159,7 @@ func NewGroup(cfg config.Group, qb datasource.QuerierBuilder, defaultInterval ti
|
|||
if cfg.EvalDelay != nil {
|
||||
g.EvalDelay = &cfg.EvalDelay.D
|
||||
}
|
||||
g.id = g.CreateID()
|
||||
for _, h := range cfg.Headers {
|
||||
g.Headers[h.Key] = h.Value
|
||||
}
|
||||
|
@ -194,12 +196,22 @@ func (g *Group) newRule(qb datasource.QuerierBuilder, r config.Rule) Rule {
|
|||
return NewRecordingRule(qb, g, r)
|
||||
}
|
||||
|
||||
// ID return unique group ID that consists of
|
||||
// rules file and group Name
|
||||
func (g *Group) ID() uint64 {
|
||||
// GetCheckSum returns group checksum
|
||||
func (g *Group) GetCheckSum() string {
|
||||
g.mu.RLock()
|
||||
defer g.mu.RUnlock()
|
||||
return g.checksum
|
||||
}
|
||||
|
||||
// GetID returns the unique group ID
|
||||
func (g *Group) GetID() uint64 {
|
||||
return g.id
|
||||
}
|
||||
|
||||
// CreateID returns the unique ID based on group basic fields.
|
||||
// Should only be called when creating new group,
|
||||
// and use GetID() afterward.
|
||||
func (g *Group) CreateID() uint64 {
|
||||
hash := fnv.New64a()
|
||||
hash.Write([]byte(g.File))
|
||||
hash.Write([]byte("\xff"))
|
||||
|
@ -274,17 +286,14 @@ func (g *Group) updateWith(newGroup *Group) error {
|
|||
nr.registerMetrics(g)
|
||||
newRules = append(newRules, nr)
|
||||
}
|
||||
// note that g.Interval is not updated here
|
||||
// so the value can be compared later in
|
||||
// group.Start function
|
||||
g.Type = newGroup.Type
|
||||
|
||||
g.Concurrency = newGroup.Concurrency
|
||||
g.Params = newGroup.Params
|
||||
g.Headers = newGroup.Headers
|
||||
g.NotifierHeaders = newGroup.NotifierHeaders
|
||||
g.Labels = newGroup.Labels
|
||||
g.Limit = newGroup.Limit
|
||||
g.Checksum = newGroup.Checksum
|
||||
g.checksum = newGroup.checksum
|
||||
g.Rules = newRules
|
||||
return nil
|
||||
}
|
||||
|
@ -326,7 +335,7 @@ func (g *Group) Start(ctx context.Context, nts func() []notifier.Notifier, rw re
|
|||
// sleep random duration to spread group rules evaluation
|
||||
// over time in order to reduce load on datasource.
|
||||
if !SkipRandSleepOnGroupStart {
|
||||
sleepBeforeStart := delayBeforeStart(evalTS, g.ID(), g.Interval, g.EvalOffset)
|
||||
sleepBeforeStart := delayBeforeStart(evalTS, g.GetID(), g.Interval, g.EvalOffset)
|
||||
g.infof("will start in %v", sleepBeforeStart)
|
||||
|
||||
sleepTimer := time.NewTimer(sleepBeforeStart)
|
||||
|
|
|
@ -88,7 +88,7 @@ func NewRecordingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rul
|
|||
Name: cfg.Record,
|
||||
Expr: cfg.Expr,
|
||||
Labels: cfg.Labels,
|
||||
GroupID: group.ID(),
|
||||
GroupID: group.GetID(),
|
||||
GroupName: group.Name,
|
||||
File: group.File,
|
||||
q: qb.BuildWithParams(datasource.QuerierParams{
|
||||
|
|
|
@ -36,7 +36,7 @@ func TestHandler(t *testing.T) {
|
|||
g.ExecOnce(context.Background(), func() []notifier.Notifier { return nil }, nil, time.Time{})
|
||||
|
||||
m := &manager{groups: map[uint64]*rule.Group{
|
||||
g.ID(): g,
|
||||
g.CreateID(): g,
|
||||
}}
|
||||
rh := &requestHandler{m: m}
|
||||
|
||||
|
|
|
@ -325,7 +325,7 @@ func groupToAPI(g *rule.Group) apiGroup {
|
|||
g = g.DeepCopy()
|
||||
ag := apiGroup{
|
||||
// encode as string to avoid rounding
|
||||
ID: fmt.Sprintf("%d", g.ID()),
|
||||
ID: fmt.Sprintf("%d", g.GetID()),
|
||||
|
||||
Name: g.Name,
|
||||
Type: g.Type.String(),
|
||||
|
|
|
@ -41,7 +41,7 @@ func TestRecordingToApi(t *testing.T) {
|
|||
Type: ruleTypeRecording,
|
||||
DatasourceType: "prometheus",
|
||||
ID: "1248",
|
||||
GroupID: fmt.Sprintf("%d", g.ID()),
|
||||
GroupID: fmt.Sprintf("%d", g.CreateID()),
|
||||
GroupName: "group",
|
||||
File: "rules.yaml",
|
||||
MaxUpdates: 44,
|
||||
|
|
|
@ -40,6 +40,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
|
|||
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/single-server-victoriametrics/), [vmagent](https://docs.victoriametrics.com/vmagent/), `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): reduce number of allocations that could increase CPU usage on ingestion. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8501) for details.
|
||||
* BUGFIX: fix typo in metric `vm_mmaped_files` by renaming it to `vm_mmapped_files`.
|
||||
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert/): fix memory leak when sending alerts with `-notifier.blackhole` enabled. Bug was introduced in [v1.112.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.112.0).
|
||||
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert/): properly compare rules `group.checksum` and statically define `group.id` at creation time. See [this PR](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8540) for details.
|
||||
|
||||
## [v1.113.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.113.0)
|
||||
|
||||
|
|
Loading…
Reference in a new issue