From 0be5b09fb41f5490a18898fc85bc58534ed4d75b Mon Sep 17 00:00:00 2001 From: Roman Khavronenko Date: Sun, 9 Aug 2020 07:41:29 +0100 Subject: [PATCH] app/vmalert: extend metrics set exported by `vmalert` #573 (#654) * app/vmalert: extend metrics set exported by `vmalert` #573 New metrics were added to improve observability: + vmalert_alerts_pending{alertname, group} - number of pending alerts per group per alert; + vmalert_alerts_firing{alertname, group} - number of active (firing) alerts per group per alert; + vmalert_alerts_error{alertname, group} - is 1 if the alerting rule ended up with an error during the previous execution, is 0 if no errors happened; + vmalert_recording_rules_error{recording, group} - is 1 if the recording rule ended up with an error during the previous execution, is 0 if no errors happened; * vmalert_iteration_total{group, file} - now contains group and file name labels. This should improve control over specific groups; * vmalert_iteration_duration_seconds{group, file} - now contains group and file name labels. This should improve control over specific groups; Some collisions for alerts and recording rules are possible, because, for compatibility reasons, neither the group name nor the alert/recording rule name is unique. The commit contains a list of TODOs for unregistering metrics, since groups and rules are ephemeral and could be removed without an application restart. In order to unlock the unregistering feature, the corresponding PR was filed - https://github.com/VictoriaMetrics/metrics/pull/13 * app/vmalert: extend metrics set exported by `vmalert` #573 The changes are as follows: * add an ID label to rules metrics, since `name` collisions within one group are a common case - see the k8s example alerts; * support metrics unregistering on rule updates. Consider the case when one rule was added or removed from the group, or the whole group was added or removed. The change depends on https://github.com/VictoriaMetrics/metrics/pull/16 where a race condition in the Unregister method was fixed. 
--- app/vmalert/alerting.go | 59 ++++++++++++++++++++++++++++++++++++++-- app/vmalert/group.go | 48 +++++++++++++++++++++++--------- app/vmalert/metrics.go | 39 ++++++++++++++++++++++++++ app/vmalert/recording.go | 30 ++++++++++++++++++-- app/vmalert/rule.go | 3 ++ 5 files changed, 160 insertions(+), 19 deletions(-) create mode 100644 app/vmalert/metrics.go diff --git a/app/vmalert/alerting.go b/app/vmalert/alerting.go index eda3d0550..11538e829 100644 --- a/app/vmalert/alerting.go +++ b/app/vmalert/alerting.go @@ -14,6 +14,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + "github.com/VictoriaMetrics/metrics" ) // AlertingRule is basic alert entity @@ -36,19 +37,71 @@ type AlertingRule struct { // resets on every successful Exec // may be used as Health state lastExecError error + + metrics *alertingRuleMetrics } -func newAlertingRule(gID uint64, cfg config.Rule) *AlertingRule { - return &AlertingRule{ +type alertingRuleMetrics struct { + errors *gauge + pending *gauge + active *gauge +} + +func newAlertingRule(group *Group, cfg config.Rule) *AlertingRule { + ar := &AlertingRule{ RuleID: cfg.ID, Name: cfg.Alert, Expr: cfg.Expr, For: cfg.For, Labels: cfg.Labels, Annotations: cfg.Annotations, - GroupID: gID, + GroupID: group.ID(), alerts: make(map[uint64]*notifier.Alert), + metrics: &alertingRuleMetrics{}, } + + labels := fmt.Sprintf(`alertname=%q, group=%q, id="%d"`, ar.Name, group.Name, ar.ID()) + ar.metrics.pending = getOrCreateGauge(fmt.Sprintf(`vmalert_alerts_pending{%s}`, labels), + func() float64 { + ar.mu.Lock() + defer ar.mu.Unlock() + var num int + for _, a := range ar.alerts { + if a.State == notifier.StatePending { + num++ + } + } + return float64(num) + }) + ar.metrics.active = getOrCreateGauge(fmt.Sprintf(`vmalert_alerts_firing{%s}`, labels), + func() float64 { + ar.mu.Lock() + defer ar.mu.Unlock() + var 
num int + for _, a := range ar.alerts { + if a.State == notifier.StateFiring { + num++ + } + } + return float64(num) + }) + ar.metrics.errors = getOrCreateGauge(fmt.Sprintf(`vmalert_alerts_error{%s}`, labels), + func() float64 { + ar.mu.Lock() + defer ar.mu.Unlock() + if ar.lastExecError == nil { + return 0 + } + return 1 + }) + return ar +} + +// Close unregisters rule metrics +func (ar *AlertingRule) Close() { + metrics.UnregisterMetric(ar.metrics.active.name) + metrics.UnregisterMetric(ar.metrics.pending.name) + metrics.UnregisterMetric(ar.metrics.errors.name) } // String implements Stringer interface diff --git a/app/vmalert/group.go b/app/vmalert/group.go index 052815350..2d6f1ad95 100644 --- a/app/vmalert/group.go +++ b/app/vmalert/group.go @@ -30,6 +30,21 @@ type Group struct { // channel accepts new Group obj // which supposed to update current group updateCh chan *Group + + metrics *groupMetrics +} + +type groupMetrics struct { + iterationTotal *counter + iterationDuration *summary +} + +func newGroupMetrics(name, file string) *groupMetrics { + m := &groupMetrics{} + labels := fmt.Sprintf(`group=%q, file=%q`, name, file) + m.iterationTotal = getOrCreateCounter(fmt.Sprintf(`vmalert_iteration_total{%s}`, labels)) + m.iterationDuration = getOrCreateSummary(fmt.Sprintf(`vmalert_iteration_duration_seconds{%s}`, labels)) + return m } func newGroup(cfg config.Group, defaultInterval time.Duration, labels map[string]string) *Group { @@ -42,6 +57,7 @@ func newGroup(cfg config.Group, defaultInterval time.Duration, labels map[string finishedCh: make(chan struct{}), updateCh: make(chan *Group), } + g.metrics = newGroupMetrics(g.Name, g.File) if g.Interval == 0 { g.Interval = defaultInterval } @@ -69,9 +85,9 @@ func newGroup(cfg config.Group, defaultInterval time.Duration, labels map[string func (g *Group) newRule(rule config.Rule) Rule { if rule.Alert != "" { - return newAlertingRule(g.ID(), rule) + return newAlertingRule(g, rule) } - return newRecordingRule(g.ID(), 
rule) + return newRecordingRule(g, rule) } // ID return unique group ID that consists of @@ -117,6 +133,7 @@ func (g *Group) updateWith(newGroup *Group) error { if !ok { // old rule is not present in the new list // so we mark it for removing + g.Rules[i].Close() g.Rules[i] = nil continue } @@ -144,18 +161,9 @@ func (g *Group) updateWith(newGroup *Group) error { } var ( - iterationTotal = metrics.NewCounter(`vmalert_iteration_total`) - iterationDuration = metrics.NewSummary(`vmalert_iteration_duration_seconds`) - - execTotal = metrics.NewCounter(`vmalert_execution_total`) - execErrors = metrics.NewCounter(`vmalert_execution_errors_total`) - execDuration = metrics.NewSummary(`vmalert_execution_duration_seconds`) - alertsFired = metrics.NewCounter(`vmalert_alerts_fired_total`) alertsSent = metrics.NewCounter(`vmalert_alerts_sent_total`) alertsSendErrors = metrics.NewCounter(`vmalert_alerts_send_errors_total`) - - remoteWriteErrors = metrics.NewCounter(`vmalert_remotewrite_errors_total`) ) func (g *Group) close() { @@ -164,6 +172,12 @@ func (g *Group) close() { } close(g.doneCh) <-g.finishedCh + + metrics.UnregisterMetric(g.metrics.iterationDuration.name) + metrics.UnregisterMetric(g.metrics.iterationTotal.name) + for _, rule := range g.Rules { + rule.Close() + } } func (g *Group) start(ctx context.Context, querier datasource.Querier, nts []notifier.Notifier, rw *remotewrite.Client) { @@ -196,7 +210,7 @@ func (g *Group) start(ctx context.Context, querier datasource.Querier, nts []not g.mu.Unlock() logger.Infof("group %q re-started; interval=%v; concurrency=%d", g.Name, g.Interval, g.Concurrency) case <-t.C: - iterationTotal.Inc() + g.metrics.iterationTotal.Inc() iterationStart := time.Now() errs := e.execConcurrently(ctx, g.Rules, g.Concurrency, g.Interval) @@ -206,7 +220,7 @@ func (g *Group) start(ctx context.Context, querier datasource.Querier, nts []not } } - iterationDuration.UpdateDuration(iterationStart) + 
g.metrics.iterationDuration.UpdateDuration(iterationStart) } } } @@ -251,6 +265,14 @@ func (e *executor) execConcurrently(ctx context.Context, rules []Rule, concurren return res } +var ( + execTotal = metrics.NewCounter(`vmalert_execution_total`) + execErrors = metrics.NewCounter(`vmalert_execution_errors_total`) + execDuration = metrics.NewSummary(`vmalert_execution_duration_seconds`) + + remoteWriteErrors = metrics.NewCounter(`vmalert_remotewrite_errors_total`) +) + func (e *executor) exec(ctx context.Context, rule Rule, returnSeries bool, interval time.Duration) error { execTotal.Inc() execStart := time.Now() diff --git a/app/vmalert/metrics.go b/app/vmalert/metrics.go new file mode 100644 index 000000000..011e394f7 --- /dev/null +++ b/app/vmalert/metrics.go @@ -0,0 +1,39 @@ +package main + +import "github.com/VictoriaMetrics/metrics" + +type gauge struct { + name string + *metrics.Gauge +} + +func getOrCreateGauge(name string, f func() float64) *gauge { + return &gauge{ + name: name, + Gauge: metrics.GetOrCreateGauge(name, f), + } +} + +type counter struct { + name string + *metrics.Counter +} + +func getOrCreateCounter(name string) *counter { + return &counter{ + name: name, + Counter: metrics.GetOrCreateCounter(name), + } +} + +type summary struct { + name string + *metrics.Summary +} + +func getOrCreateSummary(name string) *summary { + return &summary{ + name: name, + Summary: metrics.GetOrCreateSummary(name), + } +} diff --git a/app/vmalert/recording.go b/app/vmalert/recording.go index 3aadd2a4e..9dd7b4de2 100644 --- a/app/vmalert/recording.go +++ b/app/vmalert/recording.go @@ -12,6 +12,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + "github.com/VictoriaMetrics/metrics" ) // RecordingRule is a Rule that supposed @@ -32,6 +33,12 @@ type RecordingRule struct { // resets on every successful Exec // 
may be used as Health state lastExecError error + + metrics *recordingRuleMetrics +} + +type recordingRuleMetrics struct { + errors *gauge } // String implements Stringer interface @@ -45,14 +52,31 @@ func (rr *RecordingRule) ID() uint64 { return rr.RuleID } -func newRecordingRule(gID uint64, cfg config.Rule) *RecordingRule { - return &RecordingRule{ +func newRecordingRule(group *Group, cfg config.Rule) *RecordingRule { + rr := &RecordingRule{ RuleID: cfg.ID, Name: cfg.Record, Expr: cfg.Expr, Labels: cfg.Labels, - GroupID: gID, + GroupID: group.ID(), + metrics: &recordingRuleMetrics{}, } + labels := fmt.Sprintf(`recording=%q, group=%q, id="%d"`, rr.Name, group.Name, rr.ID()) + rr.metrics.errors = getOrCreateGauge(fmt.Sprintf(`vmalert_recording_rules_error{%s}`, labels), + func() float64 { + rr.mu.Lock() + defer rr.mu.Unlock() + if rr.lastExecError == nil { + return 0 + } + return 1 + }) + return rr +} + +// Close unregisters rule metrics +func (rr *RecordingRule) Close() { + metrics.UnregisterMetric(rr.metrics.errors.name) } var errDuplicate = errors.New("result contains metrics with the same labelset after applying rule labels") diff --git a/app/vmalert/rule.go b/app/vmalert/rule.go index e5498ac37..1410d2bd6 100644 --- a/app/vmalert/rule.go +++ b/app/vmalert/rule.go @@ -21,4 +21,7 @@ type Rule interface { // UpdateWith performs modification of current Rule // with fields of the given Rule. UpdateWith(Rule) error + // Close performs the shutdown procedures for rule + // such as metrics unregister + Close() }