diff --git a/app/vmalert/alerting.go b/app/vmalert/alerting.go index eda3d0550..11538e829 100644 --- a/app/vmalert/alerting.go +++ b/app/vmalert/alerting.go @@ -14,6 +14,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + "github.com/VictoriaMetrics/metrics" ) // AlertingRule is basic alert entity @@ -36,19 +37,71 @@ type AlertingRule struct { // resets on every successful Exec // may be used as Health state lastExecError error + + metrics *alertingRuleMetrics } -func newAlertingRule(gID uint64, cfg config.Rule) *AlertingRule { - return &AlertingRule{ +type alertingRuleMetrics struct { + errors *gauge + pending *gauge + active *gauge +} + +func newAlertingRule(group *Group, cfg config.Rule) *AlertingRule { + ar := &AlertingRule{ RuleID: cfg.ID, Name: cfg.Alert, Expr: cfg.Expr, For: cfg.For, Labels: cfg.Labels, Annotations: cfg.Annotations, - GroupID: gID, + GroupID: group.ID(), alerts: make(map[uint64]*notifier.Alert), + metrics: &alertingRuleMetrics{}, } + + labels := fmt.Sprintf(`alertname=%q, group=%q, id="%d"`, ar.Name, group.Name, ar.ID()) + ar.metrics.pending = getOrCreateGauge(fmt.Sprintf(`vmalert_alerts_pending{%s}`, labels), + func() float64 { + ar.mu.Lock() + defer ar.mu.Unlock() + var num int + for _, a := range ar.alerts { + if a.State == notifier.StatePending { + num++ + } + } + return float64(num) + }) + ar.metrics.active = getOrCreateGauge(fmt.Sprintf(`vmalert_alerts_firing{%s}`, labels), + func() float64 { + ar.mu.Lock() + defer ar.mu.Unlock() + var num int + for _, a := range ar.alerts { + if a.State == notifier.StateFiring { + num++ + } + } + return float64(num) + }) + ar.metrics.errors = getOrCreateGauge(fmt.Sprintf(`vmalert_alerts_error{%s}`, labels), + func() float64 { + ar.mu.Lock() + defer ar.mu.Unlock() + if ar.lastExecError == nil { + return 0 + } + return 1 + }) + return ar +} + +// Close unregisters rule metrics +func (ar *AlertingRule) Close() { + metrics.UnregisterMetric(ar.metrics.active.name) + metrics.UnregisterMetric(ar.metrics.pending.name) + metrics.UnregisterMetric(ar.metrics.errors.name) } // String implements Stringer interface diff --git a/app/vmalert/group.go b/app/vmalert/group.go index 052815350..2d6f1ad95 100644 --- a/app/vmalert/group.go +++ b/app/vmalert/group.go @@ -30,6 +30,21 @@ type Group struct { // channel accepts new Group obj // which supposed to update current group updateCh chan *Group + + metrics *groupMetrics +} + +type groupMetrics struct { + iterationTotal *counter + iterationDuration *summary +} + +func newGroupMetrics(name, file string) *groupMetrics { + m := &groupMetrics{} + labels := fmt.Sprintf(`group=%q, file=%q`, name, file) + m.iterationTotal = getOrCreateCounter(fmt.Sprintf(`vmalert_iteration_total{%s}`, labels)) + m.iterationDuration = getOrCreateSummary(fmt.Sprintf(`vmalert_iteration_duration_seconds{%s}`, labels)) + return m } func newGroup(cfg config.Group, defaultInterval time.Duration, labels map[string]string) *Group { @@ -42,6 +57,7 @@ func newGroup(cfg config.Group, defaultInterval time.Duration, labels map[string finishedCh: make(chan struct{}), updateCh: make(chan *Group), } + g.metrics = newGroupMetrics(g.Name, g.File) if g.Interval == 0 { g.Interval = defaultInterval } @@ -69,9 +85,9 @@ func newGroup(cfg config.Group, defaultInterval time.Duration, labels map[string func (g *Group) newRule(rule config.Rule) Rule { if rule.Alert != "" { - return newAlertingRule(g.ID(), rule) + return newAlertingRule(g, rule) } - return newRecordingRule(g.ID(), rule) + return newRecordingRule(g, rule) } // ID return unique group ID that consists of @@ -117,6 +133,7 @@ func (g *Group) updateWith(newGroup *Group) error { if !ok { // old rule is not present in the new list // so we mark it for removing + g.Rules[i].Close() g.Rules[i] = nil continue } @@ -144,18 +161,9 @@ func (g *Group) updateWith(newGroup *Group) error { } var ( - iterationTotal = metrics.NewCounter(`vmalert_iteration_total`) - iterationDuration = metrics.NewSummary(`vmalert_iteration_duration_seconds`) - - execTotal = metrics.NewCounter(`vmalert_execution_total`) - execErrors = metrics.NewCounter(`vmalert_execution_errors_total`) - execDuration = metrics.NewSummary(`vmalert_execution_duration_seconds`) - alertsFired = metrics.NewCounter(`vmalert_alerts_fired_total`) alertsSent = metrics.NewCounter(`vmalert_alerts_sent_total`) alertsSendErrors = metrics.NewCounter(`vmalert_alerts_send_errors_total`) - - remoteWriteErrors = metrics.NewCounter(`vmalert_remotewrite_errors_total`) ) func (g *Group) close() { @@ -164,6 +172,12 @@ func (g *Group) close() { } close(g.doneCh) <-g.finishedCh + + metrics.UnregisterMetric(g.metrics.iterationDuration.name) + metrics.UnregisterMetric(g.metrics.iterationTotal.name) + for _, rule := range g.Rules { + rule.Close() + } } func (g *Group) start(ctx context.Context, querier datasource.Querier, nts []notifier.Notifier, rw *remotewrite.Client) { @@ -196,7 +210,7 @@ func (g *Group) start(ctx context.Context, querier datasource.Querier, nts []not g.mu.Unlock() logger.Infof("group %q re-started; interval=%v; concurrency=%d", g.Name, g.Interval, g.Concurrency) case <-t.C: - iterationTotal.Inc() + g.metrics.iterationTotal.Inc() iterationStart := time.Now() errs := e.execConcurrently(ctx, g.Rules, g.Concurrency, g.Interval) @@ -206,7 +220,7 @@ func (g *Group) start(ctx context.Context, querier datasource.Querier, nts []not } } - iterationDuration.UpdateDuration(iterationStart) + g.metrics.iterationDuration.UpdateDuration(iterationStart) } } } @@ -251,6 +265,14 @@ func (e *executor) execConcurrently(ctx context.Context, rules []Rule, concurren return res } +var ( + execTotal = metrics.NewCounter(`vmalert_execution_total`) + execErrors = metrics.NewCounter(`vmalert_execution_errors_total`) + execDuration = metrics.NewSummary(`vmalert_execution_duration_seconds`) + + remoteWriteErrors = metrics.NewCounter(`vmalert_remotewrite_errors_total`) +) + func (e *executor) exec(ctx context.Context, rule Rule, returnSeries bool, interval time.Duration) error { execTotal.Inc() execStart := time.Now() diff --git a/app/vmalert/metrics.go b/app/vmalert/metrics.go new file mode 100644 index 000000000..011e394f7 --- /dev/null +++ b/app/vmalert/metrics.go @@ -0,0 +1,39 @@ +package main + +import "github.com/VictoriaMetrics/metrics" + +type gauge struct { + name string + *metrics.Gauge +} + +func getOrCreateGauge(name string, f func() float64) *gauge { + return &gauge{ + name: name, + Gauge: metrics.GetOrCreateGauge(name, f), + } +} + +type counter struct { + name string + *metrics.Counter +} + +func getOrCreateCounter(name string) *counter { + return &counter{ + name: name, + Counter: metrics.GetOrCreateCounter(name), + } +} + +type summary struct { + name string + *metrics.Summary +} + +func getOrCreateSummary(name string) *summary { + return &summary{ + name: name, + Summary: metrics.GetOrCreateSummary(name), + } +} diff --git a/app/vmalert/recording.go b/app/vmalert/recording.go index 3aadd2a4e..9dd7b4de2 100644 --- a/app/vmalert/recording.go +++ b/app/vmalert/recording.go @@ -12,6 +12,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + "github.com/VictoriaMetrics/metrics" ) // RecordingRule is a Rule that supposed @@ -32,6 +33,12 @@ type RecordingRule struct { // resets on every successful Exec // may be used as Health state lastExecError error + + metrics *recordingRuleMetrics +} + +type recordingRuleMetrics struct { + errors *gauge } // String implements Stringer interface @@ -45,14 +52,31 @@ func (rr *RecordingRule) ID() uint64 { return rr.RuleID } -func newRecordingRule(gID uint64, cfg config.Rule) *RecordingRule { - return &RecordingRule{ +func newRecordingRule(group *Group, cfg config.Rule) *RecordingRule { + rr := &RecordingRule{ RuleID: cfg.ID, Name: cfg.Record, Expr: cfg.Expr, Labels: cfg.Labels, - GroupID: gID, + GroupID: group.ID(), + metrics: &recordingRuleMetrics{}, } + labels := fmt.Sprintf(`recording=%q, group=%q, id="%d"`, rr.Name, group.Name, rr.ID()) + rr.metrics.errors = getOrCreateGauge(fmt.Sprintf(`vmalert_recording_rules_error{%s}`, labels), + func() float64 { + rr.mu.Lock() + defer rr.mu.Unlock() + if rr.lastExecError == nil { + return 0 + } + return 1 + }) + return rr +} + +// Close unregisters rule metrics +func (rr *RecordingRule) Close() { + metrics.UnregisterMetric(rr.metrics.errors.name) } var errDuplicate = errors.New("result contains metrics with the same labelset after applying rule labels") diff --git a/app/vmalert/rule.go b/app/vmalert/rule.go index e5498ac37..1410d2bd6 100644 --- a/app/vmalert/rule.go +++ b/app/vmalert/rule.go @@ -21,4 +21,7 @@ type Rule interface { // UpdateWith performs modification of current Rule // with fields of the given Rule. UpdateWith(Rule) error + // Close performs the shutdown procedures for rule + // such as metrics unregister + Close() }