mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-21 15:45:01 +00:00
app/vmalert: properly register group and rules metrics
Commit 9ca74d1fff
introduced an issue with metrics registration. Because the metrics.Summary type is always registered in the global state of the metrics package, vmalert had increased memory and CPU usage after multiple configuration reloads.
This commit addresses this issue and properly registers the metrics.Summary metric. Now metrics for a group and its rules must be explicitly registered by calling the group.Init method before group.Start. This simplifies metrics usage and ensures that all needed metrics are registered and the group is ready to start.
Related issue:
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8532
This commit is contained in:
parent
348991d1b3
commit
bd11e00a59
9 changed files with 41 additions and 45 deletions
app/vmalert
docs/changelog
|
@ -169,7 +169,6 @@ groups:
|
|||
checkCfg(nil)
|
||||
groupsLen = lenLocked(m)
|
||||
if groupsLen != 2 {
|
||||
fmt.Println(m.groups)
|
||||
t.Fatalf("expected to have exactly 2 groups loaded; got %d", groupsLen)
|
||||
}
|
||||
|
||||
|
|
|
@ -84,6 +84,7 @@ func (m *manager) close() {
|
|||
func (m *manager) startGroup(ctx context.Context, g *rule.Group, restore bool) error {
|
||||
m.wg.Add(1)
|
||||
id := g.GetID()
|
||||
g.Init()
|
||||
go func() {
|
||||
defer m.wg.Done()
|
||||
if restore {
|
||||
|
|
|
@ -159,12 +159,11 @@ func NewAlertingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rule
|
|||
ar.state = &ruleState{
|
||||
entries: make([]StateEntry, entrySize),
|
||||
}
|
||||
ar.metrics = newAlertingRuleMetrics(group.metrics.set, ar)
|
||||
return ar
|
||||
}
|
||||
|
||||
func (ar *AlertingRule) registerMetrics(g *Group) {
|
||||
ar.metrics = newAlertingRuleMetrics(g.metrics.set, ar)
|
||||
func (ar *AlertingRule) registerMetrics(set *metrics.Set) {
|
||||
ar.metrics = newAlertingRuleMetrics(set, ar)
|
||||
}
|
||||
|
||||
// close unregisters rule metrics
|
||||
|
|
|
@ -789,6 +789,7 @@ func TestGroup_Restore(t *testing.T) {
|
|||
}
|
||||
|
||||
fg := NewGroup(config.Group{Name: "TestRestore", Rules: rules}, fqr, time.Second, nil)
|
||||
fg.Init()
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
|
|
|
@ -71,7 +71,7 @@ type Group struct {
|
|||
// evalCancel stores the cancel fn for interrupting
|
||||
// rules evaluation. Used on groups update() and close().
|
||||
evalCancel context.CancelFunc
|
||||
|
||||
// metrics contains metrics for group and its rules, will be created during Init()
|
||||
metrics *groupMetrics
|
||||
// evalAlignment will make the timestamp of group query
|
||||
// requests be aligned with interval
|
||||
|
@ -87,29 +87,6 @@ type groupMetrics struct {
|
|||
iterationInterval *metrics.Gauge
|
||||
}
|
||||
|
||||
func newGroupMetrics(g *Group) *groupMetrics {
|
||||
m := &groupMetrics{}
|
||||
m.set = metrics.NewSet()
|
||||
|
||||
labels := fmt.Sprintf(`group=%q, file=%q`, g.Name, g.File)
|
||||
m.iterationTotal = m.set.NewCounter(fmt.Sprintf(`vmalert_iteration_total{%s}`, labels))
|
||||
m.iterationDuration = m.set.NewSummary(fmt.Sprintf(`vmalert_iteration_duration_seconds{%s}`, labels))
|
||||
m.iterationMissed = m.set.NewCounter(fmt.Sprintf(`vmalert_iteration_missed_total{%s}`, labels))
|
||||
m.iterationInterval = m.set.NewGauge(fmt.Sprintf(`vmalert_iteration_interval_seconds{%s}`, labels), func() float64 {
|
||||
i := g.Interval.Seconds()
|
||||
return i
|
||||
})
|
||||
return m
|
||||
}
|
||||
|
||||
func (m *groupMetrics) start() {
|
||||
metrics.RegisterSet(m.set)
|
||||
}
|
||||
|
||||
func (m *groupMetrics) close() {
|
||||
metrics.UnregisterSet(m.set, true)
|
||||
}
|
||||
|
||||
// merges group rule labels into result map
|
||||
// set2 has priority over set1.
|
||||
func mergeLabels(groupName, ruleName string, set1, set2 map[string]string) map[string]string {
|
||||
|
@ -166,7 +143,6 @@ func NewGroup(cfg config.Group, qb datasource.QuerierBuilder, defaultInterval ti
|
|||
for _, h := range cfg.NotifierHeaders {
|
||||
g.NotifierHeaders[h.Key] = h.Value
|
||||
}
|
||||
g.metrics = newGroupMetrics(g)
|
||||
rules := make([]Rule, len(cfg.Rules))
|
||||
for i, r := range cfg.Rules {
|
||||
var extraLabels map[string]string
|
||||
|
@ -283,7 +259,7 @@ func (g *Group) updateWith(newGroup *Group) error {
|
|||
}
|
||||
// add the rest of rules from registry
|
||||
for _, nr := range rulesRegistry {
|
||||
nr.registerMetrics(g)
|
||||
nr.registerMetrics(g.metrics.set)
|
||||
newRules = append(newRules, nr)
|
||||
}
|
||||
|
||||
|
@ -319,18 +295,37 @@ func (g *Group) Close() {
|
|||
g.InterruptEval()
|
||||
<-g.finishedCh
|
||||
|
||||
g.metrics.close()
|
||||
g.closeGroupMetrics()
|
||||
}
|
||||
|
||||
func (g *Group) closeGroupMetrics() {
|
||||
metrics.UnregisterSet(g.metrics.set, true)
|
||||
}
|
||||
|
||||
// SkipRandSleepOnGroupStart will skip random sleep delay in group first evaluation
|
||||
var SkipRandSleepOnGroupStart bool
|
||||
|
||||
// Init must be called before group Start()
|
||||
func (g *Group) Init() {
|
||||
ns := metrics.NewSet()
|
||||
g.metrics = &groupMetrics{set: ns}
|
||||
labels := fmt.Sprintf(`group=%q, file=%q`, g.Name, g.File)
|
||||
g.metrics.iterationTotal = g.metrics.set.NewCounter(fmt.Sprintf(`vmalert_iteration_total{%s}`, labels))
|
||||
g.metrics.iterationDuration = g.metrics.set.NewSummary(fmt.Sprintf(`vmalert_iteration_duration_seconds{%s}`, labels))
|
||||
g.metrics.iterationMissed = g.metrics.set.NewCounter(fmt.Sprintf(`vmalert_iteration_missed_total{%s}`, labels))
|
||||
g.metrics.iterationInterval = g.metrics.set.NewGauge(fmt.Sprintf(`vmalert_iteration_interval_seconds{%s}`, labels), func() float64 {
|
||||
i := g.Interval.Seconds()
|
||||
return i
|
||||
})
|
||||
for i := range g.Rules {
|
||||
g.Rules[i].registerMetrics(g.metrics.set)
|
||||
}
|
||||
metrics.RegisterSet(g.metrics.set)
|
||||
}
|
||||
|
||||
// Start starts group's evaluation
|
||||
func (g *Group) Start(ctx context.Context, nts func() []notifier.Notifier, rw remotewrite.RWClient, rr datasource.QuerierBuilder) {
|
||||
defer func() { close(g.finishedCh) }()
|
||||
|
||||
g.metrics.start()
|
||||
|
||||
evalTS := time.Now()
|
||||
// sleep random duration to spread group rules evaluation
|
||||
// over time in order to reduce load on datasource.
|
||||
|
|
|
@ -39,10 +39,11 @@ func TestUpdateWith(t *testing.T) {
|
|||
f := func(currentRules, newRules []config.Rule) {
|
||||
t.Helper()
|
||||
|
||||
ns := metrics.NewSet()
|
||||
g := &Group{
|
||||
Name: "test",
|
||||
Name: "test",
|
||||
metrics: &groupMetrics{set: ns},
|
||||
}
|
||||
g.metrics = newGroupMetrics(g)
|
||||
qb := &datasource.FakeQuerier{}
|
||||
for _, r := range currentRules {
|
||||
r.ID = config.HashRule(r)
|
||||
|
@ -52,7 +53,6 @@ func TestUpdateWith(t *testing.T) {
|
|||
ng := &Group{
|
||||
Name: "test",
|
||||
}
|
||||
ng.metrics = newGroupMetrics(ng)
|
||||
for _, r := range newRules {
|
||||
r.ID = config.HashRule(r)
|
||||
ng.Rules = append(ng.Rules, ng.newRule(qb, r))
|
||||
|
@ -198,7 +198,7 @@ func TestUpdateDuringRandSleep(t *testing.T) {
|
|||
Interval: 100 * time.Hour,
|
||||
updateCh: make(chan *Group),
|
||||
}
|
||||
g.metrics = newGroupMetrics(g)
|
||||
g.Init()
|
||||
go g.Start(context.Background(), nil, nil, nil)
|
||||
|
||||
rule1 := AlertingRule{
|
||||
|
@ -213,7 +213,6 @@ func TestUpdateDuringRandSleep(t *testing.T) {
|
|||
&rule1,
|
||||
},
|
||||
}
|
||||
g1.metrics = newGroupMetrics(g1)
|
||||
g.updateCh <- g1
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
g.mu.RLock()
|
||||
|
@ -237,7 +236,6 @@ func TestUpdateDuringRandSleep(t *testing.T) {
|
|||
&rule2,
|
||||
},
|
||||
}
|
||||
g2.metrics = newGroupMetrics(g2)
|
||||
g.updateCh <- g2
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
g.mu.RLock()
|
||||
|
@ -331,6 +329,7 @@ func TestGroupStart(t *testing.T) {
|
|||
finished := make(chan struct{})
|
||||
fs.Add(m1)
|
||||
fs.Add(m2)
|
||||
g.Init()
|
||||
go func() {
|
||||
g.Start(context.Background(), func() []notifier.Notifier { return []notifier.Notifier{fn} }, nil, fs)
|
||||
close(finished)
|
||||
|
@ -487,6 +486,7 @@ func TestCloseWithEvalInterruption(t *testing.T) {
|
|||
|
||||
const evalInterval = time.Millisecond
|
||||
g := NewGroup(groups[0], fq, evalInterval, nil)
|
||||
g.Init()
|
||||
|
||||
go g.Start(context.Background(), nil, nil, nil)
|
||||
|
||||
|
|
|
@ -110,13 +110,11 @@ func NewRecordingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rul
|
|||
rr.state = &ruleState{
|
||||
entries: make([]StateEntry, entrySize),
|
||||
}
|
||||
rr.metrics = newRecordingRuleMetrics(group.metrics.set, rr)
|
||||
|
||||
return rr
|
||||
}
|
||||
|
||||
func (rr *RecordingRule) registerMetrics(g *Group) {
|
||||
rr.metrics = newRecordingRuleMetrics(g.metrics.set, rr)
|
||||
func (rr *RecordingRule) registerMetrics(set *metrics.Set) {
|
||||
rr.metrics = newRecordingRuleMetrics(set, rr)
|
||||
}
|
||||
|
||||
// close unregisters rule metrics
|
||||
|
|
|
@ -7,6 +7,8 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
|
@ -30,7 +32,7 @@ type Rule interface {
|
|||
// unregister Rule metrics
|
||||
unregisterMetrics()
|
||||
// register Rule metrics with the given group
|
||||
registerMetrics(g *Group)
|
||||
registerMetrics(set *metrics.Set)
|
||||
}
|
||||
|
||||
var errDuplicate = errors.New("result contains metrics with the same labelset during evaluation. See https://docs.victoriametrics.com/vmalert/#series-with-the-same-labelset for details")
|
||||
|
|
|
@ -41,6 +41,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
|
|||
* BUGFIX: fix typo in metric `vm_mmaped_files` by renaming it to `vm_mmapped_files`.
|
||||
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert/): fix memory leak when sending alerts with `-notifier.blackhole` enabled. Bug was introduced in [v1.112.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.112.0).
|
||||
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert/): properly compare rules `group.checksum` and statically define `group.id` at creation time. See [this PR](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8540) for details.
|
||||
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert/): fix memory leak during rule group updates on reload. Bug was introduced in [v1.112.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.112.0). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8532).
|
||||
|
||||
## [v1.113.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.113.0)
|
||||
|
||||
|
|
Loading…
Reference in a new issue