VictoriaMetrics/app/vmalert/recording.go
Roman Khavronenko 2914ce5ca5
vmalert: remove dependency on datasource pkg from config (#2905)
* vmalert: remove dependency on datasource pkg from config

Signed-off-by: hagen1778 <roman@victoriametrics.com>
2022-07-22 10:44:55 +02:00

228 lines
6 KiB
Go

package main
import (
"context"
"fmt"
"sort"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
// RecordingRule is a Rule that supposed
// to evaluate configured Expression and
// return TimeSeries as result.
type RecordingRule struct {
Type config.Type
RuleID uint64
Name string
Expr string
Labels map[string]string
GroupID uint64
q datasource.Querier
// guard status fields
mu sync.RWMutex
// stores last moment of time Exec was called
lastExecTime time.Time
// stores the duration of the last Exec call
lastExecDuration time.Duration
// stores last error that happened in Exec func
// resets on every successful Exec
// may be used as Health state
lastExecError error
// stores the number of samples returned during
// the last evaluation
lastExecSamples int
metrics *recordingRuleMetrics
}
type recordingRuleMetrics struct {
errors *utils.Gauge
samples *utils.Gauge
}
// String implements Stringer interface
func (rr *RecordingRule) String() string {
return rr.Name
}
// ID returns unique Rule ID
// within the parent Group.
func (rr *RecordingRule) ID() uint64 {
return rr.RuleID
}
func newRecordingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rule) *RecordingRule {
rr := &RecordingRule{
Type: group.Type,
RuleID: cfg.ID,
Name: cfg.Record,
Expr: cfg.Expr,
Labels: cfg.Labels,
GroupID: group.ID(),
metrics: &recordingRuleMetrics{},
q: qb.BuildWithParams(datasource.QuerierParams{
DataSourceType: group.Type.String(),
EvaluationInterval: group.Interval,
QueryParams: group.Params,
Headers: group.Headers,
}),
}
labels := fmt.Sprintf(`recording=%q, group=%q, id="%d"`, rr.Name, group.Name, rr.ID())
rr.metrics.errors = utils.GetOrCreateGauge(fmt.Sprintf(`vmalert_recording_rules_error{%s}`, labels),
func() float64 {
rr.mu.RLock()
defer rr.mu.RUnlock()
if rr.lastExecError == nil {
return 0
}
return 1
})
rr.metrics.samples = utils.GetOrCreateGauge(fmt.Sprintf(`vmalert_recording_rules_last_evaluation_samples{%s}`, labels),
func() float64 {
rr.mu.RLock()
defer rr.mu.RUnlock()
return float64(rr.lastExecSamples)
})
return rr
}
// Close unregisters rule metrics
func (rr *RecordingRule) Close() {
rr.metrics.errors.Unregister()
rr.metrics.samples.Unregister()
}
// ExecRange executes recording rule on the given time range similarly to Exec.
// It doesn't update internal states of the Rule and meant to be used just
// to get time series for backfilling.
func (rr *RecordingRule) ExecRange(ctx context.Context, start, end time.Time) ([]prompbmarshal.TimeSeries, error) {
series, err := rr.q.QueryRange(ctx, rr.Expr, start, end)
if err != nil {
return nil, err
}
duplicates := make(map[string]struct{}, len(series))
var tss []prompbmarshal.TimeSeries
for _, s := range series {
ts := rr.toTimeSeries(s)
key := stringifyLabels(ts)
if _, ok := duplicates[key]; ok {
return nil, fmt.Errorf("original metric %v; resulting labels %q: %w", s.Labels, key, errDuplicate)
}
duplicates[key] = struct{}{}
tss = append(tss, ts)
}
return tss, nil
}
// Exec executes RecordingRule expression via the given Querier.
func (rr *RecordingRule) Exec(ctx context.Context, ts time.Time, limit int) ([]prompbmarshal.TimeSeries, error) {
qMetrics, err := rr.q.Query(ctx, rr.Expr, ts)
rr.mu.Lock()
defer rr.mu.Unlock()
rr.lastExecTime = ts
rr.lastExecDuration = time.Since(ts)
rr.lastExecError = err
rr.lastExecSamples = len(qMetrics)
if err != nil {
return nil, fmt.Errorf("failed to execute query %q: %w", rr.Expr, err)
}
numSeries := len(qMetrics)
if limit > 0 && numSeries > limit {
return nil, fmt.Errorf("exec exceeded limit of %d with %d series", limit, numSeries)
}
duplicates := make(map[string]struct{}, len(qMetrics))
var tss []prompbmarshal.TimeSeries
for _, r := range qMetrics {
ts := rr.toTimeSeries(r)
key := stringifyLabels(ts)
if _, ok := duplicates[key]; ok {
rr.lastExecError = errDuplicate
return nil, fmt.Errorf("original metric %v; resulting labels %q: %w", r, key, errDuplicate)
}
duplicates[key] = struct{}{}
tss = append(tss, ts)
}
return tss, nil
}
func stringifyLabels(ts prompbmarshal.TimeSeries) string {
labels := ts.Labels
if len(labels) > 1 {
sort.Slice(labels, func(i, j int) bool {
return labels[i].Name < labels[j].Name
})
}
b := strings.Builder{}
for i, l := range labels {
b.WriteString(l.Name)
b.WriteString("=")
b.WriteString(l.Value)
if i != len(labels)-1 {
b.WriteString(",")
}
}
return b.String()
}
func (rr *RecordingRule) toTimeSeries(m datasource.Metric) prompbmarshal.TimeSeries {
labels := make(map[string]string)
for _, l := range m.Labels {
labels[l.Name] = l.Value
}
labels["__name__"] = rr.Name
// override existing labels with configured ones
for k, v := range rr.Labels {
labels[k] = v
}
return newTimeSeries(m.Values, m.Timestamps, labels)
}
// UpdateWith copies all significant fields.
func (rr *RecordingRule) UpdateWith(r Rule) error {
nr, ok := r.(*RecordingRule)
if !ok {
return fmt.Errorf("BUG: attempt to update recroding rule with wrong type %#v", r)
}
rr.Expr = nr.Expr
rr.Labels = nr.Labels
rr.q = nr.q
return nil
}
// ToAPI returns Rule's representation in form
// of APIRule
func (rr *RecordingRule) ToAPI() APIRule {
r := APIRule{
Type: "recording",
DatasourceType: rr.Type.String(),
Name: rr.Name,
Query: rr.Expr,
Labels: rr.Labels,
LastEvaluation: rr.lastExecTime,
EvaluationTime: rr.lastExecDuration.Seconds(),
Health: "ok",
LastSamples: rr.lastExecSamples,
// encode as strings to avoid rounding
ID: fmt.Sprintf("%d", rr.ID()),
GroupID: fmt.Sprintf("%d", rr.GroupID),
}
if rr.lastExecError != nil {
r.LastError = rr.lastExecError.Error()
r.Health = "err"
}
return r
}