mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
0be5b09fb4
* app/vmalert: extend metrics set exported by `vmalert` #573 New metrics were added to improve observability: + vmalert_alerts_pending{alertname, group} - number of pending alerts per group per alert; + vmalert_alerts_acitve{alertname, group} - number of active alerts per group per alert; + vmalert_alerts_error{alertname, group} - is 1 if alertname ended up with error during prev execution, is 0 if no errors happened; + vmalert_recording_rules_error{recording, group} - is 1 if recording rule ended up with error during prev execution, is 0 if no errors happened; * vmalert_iteration_total{group, file} - now contains group and file name labels. This should improve control over specific groups; * vmalert_iteration_duration_seconds{group, file} - now contains group and file name labels. This should improve control over specific groups; Some collisions for alerts and recording rules are possible, because neither group name nor alert/recording rule name are unique for compatibility reasons. Commit contains list of TODOs for Unregistering metrics since groups and rules are ephemeral and could be removed without application restart. In order to unlock Unregistering feature corresponding PR was filed - https://github.com/VictoriaMetrics/metrics/pull/13 * app/vmalert: extend metrics set exported by `vmalert` #573 The changes are following: * add an ID label to rules metrics, since `name` collisions within one group is a common case - see the k8s example alerts; * supports metrics unregistering on rule updates. Consider the case when one rule was added or removed from the group, or the whole group was added or removed. The change depends on https://github.com/VictoriaMetrics/metrics/pull/16 where race condition for Unregister method was fixed.
173 lines
4.2 KiB
Go
173 lines
4.2 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"hash/fnv"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
)
|
|
|
|
// RecordingRule is a Rule that supposed
|
|
// to evaluate configured Expression and
|
|
// return TimeSeries as result.
|
|
type RecordingRule struct {
|
|
RuleID uint64
|
|
Name string
|
|
Expr string
|
|
Labels map[string]string
|
|
GroupID uint64
|
|
|
|
// guard status fields
|
|
mu sync.RWMutex
|
|
// stores last moment of time Exec was called
|
|
lastExecTime time.Time
|
|
// stores last error that happened in Exec func
|
|
// resets on every successful Exec
|
|
// may be used as Health state
|
|
lastExecError error
|
|
|
|
metrics *recordingRuleMetrics
|
|
}
|
|
|
|
type recordingRuleMetrics struct {
|
|
errors *gauge
|
|
}
|
|
|
|
// String implements Stringer interface
|
|
func (rr *RecordingRule) String() string {
|
|
return rr.Name
|
|
}
|
|
|
|
// ID returns unique Rule ID
|
|
// within the parent Group.
|
|
func (rr *RecordingRule) ID() uint64 {
|
|
return rr.RuleID
|
|
}
|
|
|
|
func newRecordingRule(group *Group, cfg config.Rule) *RecordingRule {
|
|
rr := &RecordingRule{
|
|
RuleID: cfg.ID,
|
|
Name: cfg.Record,
|
|
Expr: cfg.Expr,
|
|
Labels: cfg.Labels,
|
|
GroupID: group.ID(),
|
|
metrics: &recordingRuleMetrics{},
|
|
}
|
|
labels := fmt.Sprintf(`recording=%q, group=%q, id="%d"`, rr.Name, group.Name, rr.ID())
|
|
rr.metrics.errors = getOrCreateGauge(fmt.Sprintf(`vmalert_recording_rules_error{%s}`, labels),
|
|
func() float64 {
|
|
rr.mu.Lock()
|
|
defer rr.mu.Unlock()
|
|
if rr.lastExecError == nil {
|
|
return 0
|
|
}
|
|
return 1
|
|
})
|
|
return rr
|
|
}
|
|
|
|
// Close unregisters rule metrics
|
|
func (rr *RecordingRule) Close() {
|
|
metrics.UnregisterMetric(rr.metrics.errors.name)
|
|
}
|
|
|
|
var errDuplicate = errors.New("result contains metrics with the same labelset after applying rule labels")
|
|
|
|
// Exec executes RecordingRule expression via the given Querier.
|
|
func (rr *RecordingRule) Exec(ctx context.Context, q datasource.Querier, series bool) ([]prompbmarshal.TimeSeries, error) {
|
|
if !series {
|
|
return nil, nil
|
|
}
|
|
|
|
qMetrics, err := q.Query(ctx, rr.Expr)
|
|
|
|
rr.mu.Lock()
|
|
defer rr.mu.Unlock()
|
|
|
|
rr.lastExecTime = time.Now()
|
|
rr.lastExecError = err
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to execute query %q: %w", rr.Expr, err)
|
|
}
|
|
|
|
duplicates := make(map[uint64]prompbmarshal.TimeSeries, len(qMetrics))
|
|
var tss []prompbmarshal.TimeSeries
|
|
for _, r := range qMetrics {
|
|
ts := rr.toTimeSeries(r, rr.lastExecTime)
|
|
h := hashTimeSeries(ts)
|
|
if _, ok := duplicates[h]; ok {
|
|
rr.lastExecError = errDuplicate
|
|
return nil, errDuplicate
|
|
}
|
|
duplicates[h] = ts
|
|
tss = append(tss, ts)
|
|
}
|
|
return tss, nil
|
|
}
|
|
|
|
func hashTimeSeries(ts prompbmarshal.TimeSeries) uint64 {
|
|
hash := fnv.New64a()
|
|
labels := ts.Labels
|
|
sort.Slice(labels, func(i, j int) bool {
|
|
return labels[i].Name < labels[j].Name
|
|
})
|
|
for _, l := range labels {
|
|
hash.Write([]byte(l.Name))
|
|
hash.Write([]byte(l.Value))
|
|
hash.Write([]byte("\xff"))
|
|
}
|
|
return hash.Sum64()
|
|
}
|
|
|
|
func (rr *RecordingRule) toTimeSeries(m datasource.Metric, timestamp time.Time) prompbmarshal.TimeSeries {
|
|
labels := make(map[string]string)
|
|
for _, l := range m.Labels {
|
|
labels[l.Name] = l.Value
|
|
}
|
|
labels["__name__"] = rr.Name
|
|
// override existing labels with configured ones
|
|
for k, v := range rr.Labels {
|
|
labels[k] = v
|
|
}
|
|
return newTimeSeries(m.Value, labels, timestamp)
|
|
}
|
|
|
|
// UpdateWith copies all significant fields.
|
|
// alerts state isn't copied since
|
|
// it should be updated in next 2 Execs
|
|
func (rr *RecordingRule) UpdateWith(r Rule) error {
|
|
nr, ok := r.(*RecordingRule)
|
|
if !ok {
|
|
return fmt.Errorf("BUG: attempt to update recroding rule with wrong type %#v", r)
|
|
}
|
|
rr.Expr = nr.Expr
|
|
rr.Labels = nr.Labels
|
|
return nil
|
|
}
|
|
|
|
// RuleAPI returns Rule representation in form
|
|
// of APIRecordingRule
|
|
func (rr *RecordingRule) RuleAPI() APIRecordingRule {
|
|
var lastErr string
|
|
if rr.lastExecError != nil {
|
|
lastErr = rr.lastExecError.Error()
|
|
}
|
|
return APIRecordingRule{
|
|
// encode as strings to avoid rounding
|
|
ID: fmt.Sprintf("%d", rr.ID()),
|
|
GroupID: fmt.Sprintf("%d", rr.GroupID),
|
|
Name: rr.Name,
|
|
Expression: rr.Expr,
|
|
LastError: lastErr,
|
|
LastExec: rr.lastExecTime,
|
|
Labels: rr.Labels,
|
|
}
|
|
}
|