mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
15609ee447
* changes vmalert Querier with per rule querier it allows to changes some parametrs based on rule setting for instance - alert type, tenant for cluster version or event endpoint url.
176 lines
4.3 KiB
Go
176 lines
4.3 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"hash/fnv"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
)
|
|
|
|
// RecordingRule is a Rule that supposed
|
|
// to evaluate configured Expression and
|
|
// return TimeSeries as result.
|
|
type RecordingRule struct {
|
|
Type datasource.Type
|
|
RuleID uint64
|
|
Name string
|
|
Expr string
|
|
Labels map[string]string
|
|
GroupID uint64
|
|
|
|
q datasource.Querier
|
|
|
|
// guard status fields
|
|
mu sync.RWMutex
|
|
// stores last moment of time Exec was called
|
|
lastExecTime time.Time
|
|
// stores last error that happened in Exec func
|
|
// resets on every successful Exec
|
|
// may be used as Health state
|
|
lastExecError error
|
|
|
|
metrics *recordingRuleMetrics
|
|
}
|
|
|
|
type recordingRuleMetrics struct {
|
|
errors *gauge
|
|
}
|
|
|
|
// String implements Stringer interface
|
|
func (rr *RecordingRule) String() string {
|
|
return rr.Name
|
|
}
|
|
|
|
// ID returns unique Rule ID
|
|
// within the parent Group.
|
|
func (rr *RecordingRule) ID() uint64 {
|
|
return rr.RuleID
|
|
}
|
|
|
|
func newRecordingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rule) *RecordingRule {
|
|
rr := &RecordingRule{
|
|
Type: cfg.Type,
|
|
RuleID: cfg.ID,
|
|
Name: cfg.Record,
|
|
Expr: cfg.Expr,
|
|
Labels: cfg.Labels,
|
|
GroupID: group.ID(),
|
|
metrics: &recordingRuleMetrics{},
|
|
q: qb.BuildWithParams(datasource.QuerierParams{DataSourceType: &cfg.Type}),
|
|
}
|
|
|
|
labels := fmt.Sprintf(`recording=%q, group=%q, id="%d"`, rr.Name, group.Name, rr.ID())
|
|
rr.metrics.errors = getOrCreateGauge(fmt.Sprintf(`vmalert_recording_rules_error{%s}`, labels),
|
|
func() float64 {
|
|
rr.mu.Lock()
|
|
defer rr.mu.Unlock()
|
|
if rr.lastExecError == nil {
|
|
return 0
|
|
}
|
|
return 1
|
|
})
|
|
return rr
|
|
}
|
|
|
|
// Close unregisters rule metrics
|
|
func (rr *RecordingRule) Close() {
|
|
metrics.UnregisterMetric(rr.metrics.errors.name)
|
|
}
|
|
|
|
// Exec executes RecordingRule expression via the given Querier.
|
|
func (rr *RecordingRule) Exec(ctx context.Context, series bool) ([]prompbmarshal.TimeSeries, error) {
|
|
if !series {
|
|
return nil, nil
|
|
}
|
|
|
|
qMetrics, err := rr.q.Query(ctx, rr.Expr)
|
|
rr.mu.Lock()
|
|
defer rr.mu.Unlock()
|
|
|
|
rr.lastExecTime = time.Now()
|
|
rr.lastExecError = err
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to execute query %q: %w", rr.Expr, err)
|
|
}
|
|
|
|
duplicates := make(map[uint64]prompbmarshal.TimeSeries, len(qMetrics))
|
|
var tss []prompbmarshal.TimeSeries
|
|
for _, r := range qMetrics {
|
|
ts := rr.toTimeSeries(r, time.Unix(r.Timestamp, 0))
|
|
h := hashTimeSeries(ts)
|
|
if _, ok := duplicates[h]; ok {
|
|
rr.lastExecError = errDuplicate
|
|
return nil, errDuplicate
|
|
}
|
|
duplicates[h] = ts
|
|
tss = append(tss, ts)
|
|
}
|
|
return tss, nil
|
|
}
|
|
|
|
func hashTimeSeries(ts prompbmarshal.TimeSeries) uint64 {
|
|
hash := fnv.New64a()
|
|
labels := ts.Labels
|
|
sort.Slice(labels, func(i, j int) bool {
|
|
return labels[i].Name < labels[j].Name
|
|
})
|
|
for _, l := range labels {
|
|
hash.Write([]byte(l.Name))
|
|
hash.Write([]byte(l.Value))
|
|
hash.Write([]byte("\xff"))
|
|
}
|
|
return hash.Sum64()
|
|
}
|
|
|
|
func (rr *RecordingRule) toTimeSeries(m datasource.Metric, timestamp time.Time) prompbmarshal.TimeSeries {
|
|
labels := make(map[string]string)
|
|
for _, l := range m.Labels {
|
|
labels[l.Name] = l.Value
|
|
}
|
|
labels["__name__"] = rr.Name
|
|
// override existing labels with configured ones
|
|
for k, v := range rr.Labels {
|
|
labels[k] = v
|
|
}
|
|
return newTimeSeries(m.Value, labels, timestamp)
|
|
}
|
|
|
|
// UpdateWith copies all significant fields.
|
|
// alerts state isn't copied since
|
|
// it should be updated in next 2 Execs
|
|
func (rr *RecordingRule) UpdateWith(r Rule) error {
|
|
nr, ok := r.(*RecordingRule)
|
|
if !ok {
|
|
return fmt.Errorf("BUG: attempt to update recroding rule with wrong type %#v", r)
|
|
}
|
|
rr.Expr = nr.Expr
|
|
rr.Labels = nr.Labels
|
|
return nil
|
|
}
|
|
|
|
// RuleAPI returns Rule representation in form
|
|
// of APIRecordingRule
|
|
func (rr *RecordingRule) RuleAPI() APIRecordingRule {
|
|
var lastErr string
|
|
if rr.lastExecError != nil {
|
|
lastErr = rr.lastExecError.Error()
|
|
}
|
|
return APIRecordingRule{
|
|
// encode as strings to avoid rounding
|
|
ID: fmt.Sprintf("%d", rr.ID()),
|
|
GroupID: fmt.Sprintf("%d", rr.GroupID),
|
|
Name: rr.Name,
|
|
Type: rr.Type.String(),
|
|
Expression: rr.Expr,
|
|
LastError: lastErr,
|
|
LastExec: rr.lastExecTime,
|
|
Labels: rr.Labels,
|
|
}
|
|
}
|