mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
* app/vmalert: extend metrics set exported by `vmalert` #573 New metrics were added to improve observability: + vmalert_alerts_pending{alertname, group} - number of pending alerts per group per alert; + vmalert_alerts_acitve{alertname, group} - number of active alerts per group per alert; + vmalert_alerts_error{alertname, group} - is 1 if alertname ended up with error during prev execution, is 0 if no errors happened; + vmalert_recording_rules_error{recording, group} - is 1 if recording rule ended up with error during prev execution, is 0 if no errors happened; * vmalert_iteration_total{group, file} - now contains group and file name labels. This should improve control over specific groups; * vmalert_iteration_duration_seconds{group, file} - now contains group and file name labels. This should improve control over specific groups; Some collisions for alerts and recording rules are possible, because neither group name nor alert/recording rule name are unique for compatibility reasons. Commit contains list of TODOs for Unregistering metrics since groups and rules are ephemeral and could be removed without application restart. In order to unlock Unregistering feature corresponding PR was filed - https://github.com/VictoriaMetrics/metrics/pull/13 * app/vmalert: extend metrics set exported by `vmalert` #573 The changes are following: * add an ID label to rules metrics, since `name` collisions within one group is a common case - see the k8s example alerts; * supports metrics unregistering on rule updates. Consider the case when one rule was added or removed from the group, or the whole group was added or removed. The change depends on https://github.com/VictoriaMetrics/metrics/pull/16 where race condition for Unregister method was fixed.
This commit is contained in:
parent
3fea7c39be
commit
78afc61896
5 changed files with 160 additions and 19 deletions
|
@ -14,6 +14,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
// AlertingRule is basic alert entity
|
||||
|
@ -36,19 +37,71 @@ type AlertingRule struct {
|
|||
// resets on every successful Exec
|
||||
// may be used as Health state
|
||||
lastExecError error
|
||||
|
||||
metrics *alertingRuleMetrics
|
||||
}
|
||||
|
||||
func newAlertingRule(gID uint64, cfg config.Rule) *AlertingRule {
|
||||
return &AlertingRule{
|
||||
type alertingRuleMetrics struct {
|
||||
errors *gauge
|
||||
pending *gauge
|
||||
active *gauge
|
||||
}
|
||||
|
||||
func newAlertingRule(group *Group, cfg config.Rule) *AlertingRule {
|
||||
ar := &AlertingRule{
|
||||
RuleID: cfg.ID,
|
||||
Name: cfg.Alert,
|
||||
Expr: cfg.Expr,
|
||||
For: cfg.For,
|
||||
Labels: cfg.Labels,
|
||||
Annotations: cfg.Annotations,
|
||||
GroupID: gID,
|
||||
GroupID: group.ID(),
|
||||
alerts: make(map[uint64]*notifier.Alert),
|
||||
metrics: &alertingRuleMetrics{},
|
||||
}
|
||||
|
||||
labels := fmt.Sprintf(`alertname=%q, group=%q, id="%d"`, ar.Name, group.Name, ar.ID())
|
||||
ar.metrics.pending = getOrCreateGauge(fmt.Sprintf(`vmalert_alerts_pending{%s}`, labels),
|
||||
func() float64 {
|
||||
ar.mu.Lock()
|
||||
defer ar.mu.Unlock()
|
||||
var num int
|
||||
for _, a := range ar.alerts {
|
||||
if a.State == notifier.StatePending {
|
||||
num++
|
||||
}
|
||||
}
|
||||
return float64(num)
|
||||
})
|
||||
ar.metrics.active = getOrCreateGauge(fmt.Sprintf(`vmalert_alerts_firing{%s}`, labels),
|
||||
func() float64 {
|
||||
ar.mu.Lock()
|
||||
defer ar.mu.Unlock()
|
||||
var num int
|
||||
for _, a := range ar.alerts {
|
||||
if a.State == notifier.StateFiring {
|
||||
num++
|
||||
}
|
||||
}
|
||||
return float64(num)
|
||||
})
|
||||
ar.metrics.errors = getOrCreateGauge(fmt.Sprintf(`vmalert_alerts_error{%s}`, labels),
|
||||
func() float64 {
|
||||
ar.mu.Lock()
|
||||
defer ar.mu.Unlock()
|
||||
if ar.lastExecError == nil {
|
||||
return 0
|
||||
}
|
||||
return 1
|
||||
})
|
||||
return ar
|
||||
}
|
||||
|
||||
// Close unregisters rule metrics
|
||||
func (ar *AlertingRule) Close() {
|
||||
metrics.UnregisterMetric(ar.metrics.active.name)
|
||||
metrics.UnregisterMetric(ar.metrics.pending.name)
|
||||
metrics.UnregisterMetric(ar.metrics.errors.name)
|
||||
}
|
||||
|
||||
// String implements Stringer interface
|
||||
|
|
|
@ -30,6 +30,21 @@ type Group struct {
|
|||
// channel accepts new Group obj
|
||||
// which supposed to update current group
|
||||
updateCh chan *Group
|
||||
|
||||
metrics *groupMetrics
|
||||
}
|
||||
|
||||
type groupMetrics struct {
|
||||
iterationTotal *counter
|
||||
iterationDuration *summary
|
||||
}
|
||||
|
||||
func newGroupMetrics(name, file string) *groupMetrics {
|
||||
m := &groupMetrics{}
|
||||
labels := fmt.Sprintf(`group=%q, file=%q`, name, file)
|
||||
m.iterationTotal = getOrCreateCounter(fmt.Sprintf(`vmalert_iteration_total{%s}`, labels))
|
||||
m.iterationDuration = getOrCreateSummary(fmt.Sprintf(`vmalert_iteration_duration_seconds{%s}`, labels))
|
||||
return m
|
||||
}
|
||||
|
||||
func newGroup(cfg config.Group, defaultInterval time.Duration, labels map[string]string) *Group {
|
||||
|
@ -42,6 +57,7 @@ func newGroup(cfg config.Group, defaultInterval time.Duration, labels map[string
|
|||
finishedCh: make(chan struct{}),
|
||||
updateCh: make(chan *Group),
|
||||
}
|
||||
g.metrics = newGroupMetrics(g.Name, g.File)
|
||||
if g.Interval == 0 {
|
||||
g.Interval = defaultInterval
|
||||
}
|
||||
|
@ -69,9 +85,9 @@ func newGroup(cfg config.Group, defaultInterval time.Duration, labels map[string
|
|||
|
||||
func (g *Group) newRule(rule config.Rule) Rule {
|
||||
if rule.Alert != "" {
|
||||
return newAlertingRule(g.ID(), rule)
|
||||
return newAlertingRule(g, rule)
|
||||
}
|
||||
return newRecordingRule(g.ID(), rule)
|
||||
return newRecordingRule(g, rule)
|
||||
}
|
||||
|
||||
// ID return unique group ID that consists of
|
||||
|
@ -117,6 +133,7 @@ func (g *Group) updateWith(newGroup *Group) error {
|
|||
if !ok {
|
||||
// old rule is not present in the new list
|
||||
// so we mark it for removing
|
||||
g.Rules[i].Close()
|
||||
g.Rules[i] = nil
|
||||
continue
|
||||
}
|
||||
|
@ -144,18 +161,9 @@ func (g *Group) updateWith(newGroup *Group) error {
|
|||
}
|
||||
|
||||
var (
|
||||
iterationTotal = metrics.NewCounter(`vmalert_iteration_total`)
|
||||
iterationDuration = metrics.NewSummary(`vmalert_iteration_duration_seconds`)
|
||||
|
||||
execTotal = metrics.NewCounter(`vmalert_execution_total`)
|
||||
execErrors = metrics.NewCounter(`vmalert_execution_errors_total`)
|
||||
execDuration = metrics.NewSummary(`vmalert_execution_duration_seconds`)
|
||||
|
||||
alertsFired = metrics.NewCounter(`vmalert_alerts_fired_total`)
|
||||
alertsSent = metrics.NewCounter(`vmalert_alerts_sent_total`)
|
||||
alertsSendErrors = metrics.NewCounter(`vmalert_alerts_send_errors_total`)
|
||||
|
||||
remoteWriteErrors = metrics.NewCounter(`vmalert_remotewrite_errors_total`)
|
||||
)
|
||||
|
||||
func (g *Group) close() {
|
||||
|
@ -164,6 +172,12 @@ func (g *Group) close() {
|
|||
}
|
||||
close(g.doneCh)
|
||||
<-g.finishedCh
|
||||
|
||||
metrics.UnregisterMetric(g.metrics.iterationDuration.name)
|
||||
metrics.UnregisterMetric(g.metrics.iterationTotal.name)
|
||||
for _, rule := range g.Rules {
|
||||
rule.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func (g *Group) start(ctx context.Context, querier datasource.Querier, nts []notifier.Notifier, rw *remotewrite.Client) {
|
||||
|
@ -196,7 +210,7 @@ func (g *Group) start(ctx context.Context, querier datasource.Querier, nts []not
|
|||
g.mu.Unlock()
|
||||
logger.Infof("group %q re-started; interval=%v; concurrency=%d", g.Name, g.Interval, g.Concurrency)
|
||||
case <-t.C:
|
||||
iterationTotal.Inc()
|
||||
g.metrics.iterationTotal.Inc()
|
||||
iterationStart := time.Now()
|
||||
|
||||
errs := e.execConcurrently(ctx, g.Rules, g.Concurrency, g.Interval)
|
||||
|
@ -206,7 +220,7 @@ func (g *Group) start(ctx context.Context, querier datasource.Querier, nts []not
|
|||
}
|
||||
}
|
||||
|
||||
iterationDuration.UpdateDuration(iterationStart)
|
||||
g.metrics.iterationDuration.UpdateDuration(iterationStart)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -251,6 +265,14 @@ func (e *executor) execConcurrently(ctx context.Context, rules []Rule, concurren
|
|||
return res
|
||||
}
|
||||
|
||||
var (
|
||||
execTotal = metrics.NewCounter(`vmalert_execution_total`)
|
||||
execErrors = metrics.NewCounter(`vmalert_execution_errors_total`)
|
||||
execDuration = metrics.NewSummary(`vmalert_execution_duration_seconds`)
|
||||
|
||||
remoteWriteErrors = metrics.NewCounter(`vmalert_remotewrite_errors_total`)
|
||||
)
|
||||
|
||||
func (e *executor) exec(ctx context.Context, rule Rule, returnSeries bool, interval time.Duration) error {
|
||||
execTotal.Inc()
|
||||
execStart := time.Now()
|
||||
|
|
39
app/vmalert/metrics.go
Normal file
39
app/vmalert/metrics.go
Normal file
|
@ -0,0 +1,39 @@
|
|||
package main
|
||||
|
||||
import "github.com/VictoriaMetrics/metrics"
|
||||
|
||||
type gauge struct {
|
||||
name string
|
||||
*metrics.Gauge
|
||||
}
|
||||
|
||||
func getOrCreateGauge(name string, f func() float64) *gauge {
|
||||
return &gauge{
|
||||
name: name,
|
||||
Gauge: metrics.GetOrCreateGauge(name, f),
|
||||
}
|
||||
}
|
||||
|
||||
type counter struct {
|
||||
name string
|
||||
*metrics.Counter
|
||||
}
|
||||
|
||||
func getOrCreateCounter(name string) *counter {
|
||||
return &counter{
|
||||
name: name,
|
||||
Counter: metrics.GetOrCreateCounter(name),
|
||||
}
|
||||
}
|
||||
|
||||
type summary struct {
|
||||
name string
|
||||
*metrics.Summary
|
||||
}
|
||||
|
||||
func getOrCreateSummary(name string) *summary {
|
||||
return &summary{
|
||||
name: name,
|
||||
Summary: metrics.GetOrCreateSummary(name),
|
||||
}
|
||||
}
|
|
@ -12,6 +12,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
// RecordingRule is a Rule that supposed
|
||||
|
@ -32,6 +33,12 @@ type RecordingRule struct {
|
|||
// resets on every successful Exec
|
||||
// may be used as Health state
|
||||
lastExecError error
|
||||
|
||||
metrics *recordingRuleMetrics
|
||||
}
|
||||
|
||||
type recordingRuleMetrics struct {
|
||||
errors *gauge
|
||||
}
|
||||
|
||||
// String implements Stringer interface
|
||||
|
@ -45,14 +52,31 @@ func (rr *RecordingRule) ID() uint64 {
|
|||
return rr.RuleID
|
||||
}
|
||||
|
||||
func newRecordingRule(gID uint64, cfg config.Rule) *RecordingRule {
|
||||
return &RecordingRule{
|
||||
func newRecordingRule(group *Group, cfg config.Rule) *RecordingRule {
|
||||
rr := &RecordingRule{
|
||||
RuleID: cfg.ID,
|
||||
Name: cfg.Record,
|
||||
Expr: cfg.Expr,
|
||||
Labels: cfg.Labels,
|
||||
GroupID: gID,
|
||||
GroupID: group.ID(),
|
||||
metrics: &recordingRuleMetrics{},
|
||||
}
|
||||
labels := fmt.Sprintf(`recording=%q, group=%q, id="%d"`, rr.Name, group.Name, rr.ID())
|
||||
rr.metrics.errors = getOrCreateGauge(fmt.Sprintf(`vmalert_recording_rules_error{%s}`, labels),
|
||||
func() float64 {
|
||||
rr.mu.Lock()
|
||||
defer rr.mu.Unlock()
|
||||
if rr.lastExecError == nil {
|
||||
return 0
|
||||
}
|
||||
return 1
|
||||
})
|
||||
return rr
|
||||
}
|
||||
|
||||
// Close unregisters rule metrics
|
||||
func (rr *RecordingRule) Close() {
|
||||
metrics.UnregisterMetric(rr.metrics.errors.name)
|
||||
}
|
||||
|
||||
var errDuplicate = errors.New("result contains metrics with the same labelset after applying rule labels")
|
||||
|
|
|
@ -21,4 +21,7 @@ type Rule interface {
|
|||
// UpdateWith performs modification of current Rule
|
||||
// with fields of the given Rule.
|
||||
UpdateWith(Rule) error
|
||||
// Close performs the shutdown procedures for rule
|
||||
// such as metrics unregister
|
||||
Close()
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue