Vmalert/rules eval (#400)

* Initial rules evaluation support.

Rules now store alert state in the private field `alerts`. Every evaluation updates
the alerts and their state. Every unique metric received from the datasource represents a unique alert;
uniqueness is guaranteed by hashing the ordered labelset (a minimal sketch of this idea follows the commit metadata below).

* merge with master

* cleanup

* support endAt parameter as 3*evaluationInterval for active alerts

* make golint happy
Roman Khavronenko 2020-04-06 12:44:03 +01:00 committed by GitHub
parent 407bdbf2b9
commit b099d84271
25 changed files with 781 additions and 446 deletions
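
For illustration, a minimal, self-contained sketch of the labelset-hashing idea referenced above. The Label type and hashLabels helper here are simplified stand-ins for datasource.Label and the hash() helper added in app/vmalert/rule.go; they are not part of this commit.

package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// Label is a simplified stand-in for datasource.Label.
type Label struct {
	Name, Value string
}

// hashLabels sorts labels by name and hashes the ordered labelset,
// so the input order of labels does not affect the resulting alert ID.
func hashLabels(labels []Label) uint64 {
	sort.Slice(labels, func(i, j int) bool { return labels[i].Name < labels[j].Name })
	h := fnv.New64a()
	for _, l := range labels {
		h.Write([]byte(l.Name))
		h.Write([]byte(l.Value))
		h.Write([]byte("\xff")) // separator to avoid ambiguous concatenations
	}
	return h.Sum64()
}

func main() {
	a := []Label{{"__name__", "foo"}, {"type", "bar"}}
	b := []Label{{"type", "bar"}, {"__name__", "foo"}}
	// Both metrics describe the same labelset, so they map to the same alert.
	fmt.Println(hashLabels(a) == hashLabels(b)) // true
}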


@ -1,99 +0,0 @@
package common
import (
"reflect"
"sort"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
)
func TestAlertsFromMetrics(t *testing.T) {
now := time.Now()
metrics := []datasource.Metric{
{
Labels: []datasource.Label{
{Name: "__name__", Value: "foo"},
{Name: "label", Value: "value"},
},
Timestamp: 10,
Value: 20,
},
{
Labels: []datasource.Label{
{Name: "__name__", Value: "bar"},
{Name: "label", Value: "value"},
},
Timestamp: 10,
Value: 30,
},
}
rule := Rule{
Name: "alertname",
Expr: "up==0",
Labels: map[string]string{
"label2": "value",
},
Annotations: map[string]string{
"tpl": "{{$value}} {{ $labels.label}}",
},
}
alerts := AlertsFromMetrics(metrics, "group", rule, now, now)
if len(alerts) != 2 {
t.Fatalf("expecting 2 alerts got %d", len(alerts))
}
f := func(got, exp Alert) {
t.Helper()
if got.Group != exp.Group ||
got.Value != exp.Value ||
got.End != exp.End ||
got.Name != exp.Name ||
got.Start != exp.Start {
t.Errorf("alerts are not equal: \nwant %#v \ngot %#v", exp, got)
}
sort.Slice(got.Labels, func(i, j int) bool {
return got.Labels[i].Name < got.Labels[j].Name
})
sort.Slice(exp.Labels, func(i, j int) bool {
return exp.Labels[i].Name < exp.Labels[j].Name
})
if !reflect.DeepEqual(got.Labels, exp.Labels) {
t.Errorf("alerts labels are not equal: want %+v got %+v", exp.Labels, got.Labels)
}
if !reflect.DeepEqual(got.Annotations, exp.Annotations) {
t.Errorf("alerts annotations are not equal: want %+v got %+v", exp.Annotations, got.Annotations)
}
}
f(alerts[0], Alert{
Group: "group",
Name: "alertname",
Labels: []datasource.Label{
{Name: "__name__", Value: "foo"},
{Name: "label", Value: "value"},
{Name: "label2", Value: "value"},
},
Annotations: map[string]string{
"tpl": "20 value",
},
Start: now,
End: now,
Value: 20,
})
f(alerts[1], Alert{
Group: "group",
Name: "alertname",
Labels: []datasource.Label{
{Name: "__name__", Value: "bar"},
{Name: "label", Value: "value"},
{Name: "label2", Value: "value"},
},
Annotations: map[string]string{
"tpl": "30 value",
},
Start: now,
End: now,
Value: 30,
})
}


@ -1,38 +0,0 @@
package common
import (
"errors"
"fmt"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/metricsql"
)
// Rule is the basic alert entity
type Rule struct {
Name string `yaml:"alert"`
Expr string `yaml:"expr"`
For time.Duration `yaml:"for"`
Labels map[string]string `yaml:"labels"`
Annotations map[string]string `yaml:"annotations"`
}
// Validate validates rule
func (r Rule) Validate() error {
if r.Name == "" {
return errors.New("rule name can not be empty")
}
if r.Expr == "" {
return fmt.Errorf("rule %s expression can not be empty", r.Name)
}
if _, err := metricsql.Parse(r.Expr); err != nil {
return fmt.Errorf("rule %s invalid expression: %w", r.Name, err)
}
return nil
}
// Group groups a set of Rules
type Group struct {
Name string
Rules []Rule
}


@ -1,18 +0,0 @@
package common
import "testing"
func TestRule_Validate(t *testing.T) {
if err := (Rule{}).Validate(); err == nil {
t.Errorf("expected empty name error")
}
if err := (Rule{Name: "alert"}).Validate(); err == nil {
t.Errorf("expected empty expr error")
}
if err := (Rule{Name: "alert", Expr: "test{"}).Validate(); err == nil {
t.Errorf("expected invalid expr error")
}
if err := (Rule{Name: "alert", Expr: "test>0"}).Validate(); err != nil {
t.Errorf("expected valid rule got %s", err)
}
}


@ -1,17 +1,17 @@
package config
package main
import (
"fmt"
"gopkg.in/yaml.v2"
"io/ioutil"
"path/filepath"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/common"
"gopkg.in/yaml.v2"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
)
// Parse parses rule configs from given file patterns
func Parse(pathPatterns []string, validateAnnotations bool) ([]common.Group, error) {
func Parse(pathPatterns []string, validateAnnotations bool) ([]Group, error) {
var fp []string
for _, pattern := range pathPatterns {
matches, err := filepath.Glob(pattern)
@ -20,7 +20,7 @@ func Parse(pathPatterns []string, validateAnnotations bool) ([]common.Group, err
}
fp = append(fp, matches...)
}
var groups []common.Group
var groups []Group
for _, file := range fp {
groupsNames := map[string]struct{}{}
gr, err := parseFile(file)
@ -36,11 +36,14 @@ func Parse(pathPatterns []string, validateAnnotations bool) ([]common.Group, err
if err = rule.Validate(); err != nil {
return nil, fmt.Errorf("invalid rule filepath:%s, group %s:%w", file, group.Name, err)
}
// TODO: this init looks weird here
rule.alerts = make(map[uint64]*notifier.Alert)
if validateAnnotations {
if err = common.ValidateAnnotations(rule.Annotations); err != nil {
if err = notifier.ValidateAnnotations(rule.Annotations); err != nil {
return nil, fmt.Errorf("invalida annotations filepath:%s, group %s:%w", file, group.Name, err)
}
}
rule.group = &group
}
}
groups = append(groups, gr...)
@ -51,13 +54,13 @@ func Parse(pathPatterns []string, validateAnnotations bool) ([]common.Group, err
return groups, nil
}
func parseFile(path string) ([]common.Group, error) {
func parseFile(path string) ([]Group, error) {
data, err := ioutil.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("error reading alert rule file: %w", err)
}
g := struct {
Groups []common.Group `yaml:"groups"`
Groups []Group `yaml:"groups"`
}{}
err = yaml.Unmarshal(data, &g)
return g.Groups, err


@ -1,16 +1,16 @@
package config
package main
import (
"net/url"
"os"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
)
func TestMain(m *testing.M) {
u, _ := url.Parse("https://victoriametrics.com/path")
common.InitTemplateFunc(u)
notifier.InitTemplateFunc(u)
os.Exit(m.Run())
}


@ -1,5 +1,14 @@
package datasource
import "context"
// Querier interface wraps the Query method, which
// executes the given query and returns the resulting
// list of Metrics
type Querier interface {
Query(ctx context.Context, query string) ([]Metric, error)
}
// Metric is the basic entity which should be returned by the datasource
// It represents a single data point with a full list of labels
type Metric struct {


@ -8,12 +8,11 @@ import (
"net/url"
"os"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/provider"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
@ -38,6 +37,9 @@ Examples:
externalURL = flag.String("external.url", "", "Reachable external url. URL is used to generate sharable alert url and in annotation templates")
)
// TODO: hot configuration reload
// TODO: alerts state persistence
// TODO: metrics
func main() {
envflag.Parse()
buildinfo.Init()
@ -48,76 +50,65 @@ func main() {
if err != nil {
logger.Fatalf("can not get external url:%s ", err)
}
common.InitTemplateFunc(eu)
notifier.InitTemplateFunc(eu)
logger.Infof("reading alert rules configuration file from %s", strings.Join(*rulePath, ";"))
alertGroups, err := config.Parse(*rulePath, *validateAlertAnnotations)
groups, err := Parse(*rulePath, *validateAlertAnnotations)
if err != nil {
logger.Fatalf("Cannot parse configuration file: %s", err)
}
w := &watchdog{
storage: datasource.NewVMStorage(*datasourceURL, *basicAuthUsername, *basicAuthPassword, &http.Client{}),
alertProvider: provider.NewAlertManager(*providerURL, func(group, name string) string {
alertProvider: notifier.NewAlertManager(*providerURL, func(group, name string) string {
return strings.Replace(fmt.Sprintf("%s/%s/%s/status", eu, group, name), "//", "/", -1)
}, &http.Client{}),
}
for id := range alertGroups {
go func(group common.Group) {
wg := sync.WaitGroup{}
for i := range groups {
wg.Add(1)
go func(group Group) {
w.run(ctx, group, *evaluationInterval)
}(alertGroups[id])
wg.Done()
}(groups[i])
}
go func() {
httpserver.Serve(*httpListenAddr, func(w http.ResponseWriter, r *http.Request) bool {
panic("not implemented")
})
}()
go httpserver.Serve(*httpListenAddr, func(w http.ResponseWriter, r *http.Request) bool {
panic("not implemented")
})
sig := procutil.WaitForSigterm()
logger.Infof("service received signal %s", sig)
if err := httpserver.Stop(*httpListenAddr); err != nil {
logger.Fatalf("cannot stop the webservice: %s", err)
}
cancel()
w.stop()
wg.Wait()
}
type watchdog struct {
storage *datasource.VMStorage
alertProvider provider.AlertProvider
alertProvider notifier.Notifier
}
func (w *watchdog) run(ctx context.Context, a common.Group, evaluationInterval time.Duration) {
logger.Infof("watchdog for %s has been run", a.Name)
func (w *watchdog) run(ctx context.Context, group Group, evaluationInterval time.Duration) {
logger.Infof("watchdog for %s has been run", group.Name)
t := time.NewTicker(evaluationInterval)
var metrics []datasource.Metric
var err error
var alerts []common.Alert
defer t.Stop()
for {
select {
case <-t.C:
start := time.Now()
for _, r := range a.Rules {
if metrics, err = w.storage.Query(ctx, r.Expr); err != nil {
logger.Errorf("error reading metrics %s", err)
for _, rule := range group.Rules {
if err := rule.Exec(ctx, w.storage); err != nil {
logger.Errorf("failed to execute rule %q.%q: %s", group.Name, rule.Name, err)
continue
}
// todo check for and calculate alert states
if len(metrics) < 1 {
continue
if err := rule.Send(ctx, w.alertProvider); err != nil {
logger.Errorf("failed to send alert for rule %q.%q: %s", group.Name, rule.Name, err)
}
// todo define alert end time
alerts = common.AlertsFromMetrics(metrics, a.Name, r, start, time.Time{})
// todo save to storage
if err := w.alertProvider.Send(alerts); err != nil {
logger.Errorf("error sending alerts %s", err)
continue
}
// todo is alert still active/pending?
}
case <-ctx.Done():
logger.Infof("%s receive stop signal", a.Name)
logger.Infof("%s received stop signal", group.Name)
return
}
}
@ -142,10 +133,6 @@ func getExternalURL(externalURL, httpListenAddr string, isSecure bool) (*url.URL
return url.Parse(fmt.Sprintf("%s%s%s", schema, hname, port))
}
func (w *watchdog) stop() {
panic("not implemented")
}
func checkFlags() {
if *providerURL == "" {
flag.PrintDefaults()


@ -1,4 +1,4 @@
package common
package notifier
import (
"bytes"
@ -7,71 +7,57 @@ import (
"strings"
"text/template"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// Alert represents a triggered alert
// TODO: Looks like alert name isn't unique
type Alert struct {
Group string
Name string
Labels []datasource.Label
Labels map[string]string
Annotations map[string]string
State AlertState
Start time.Time
End time.Time
Value float64
}
// AlertState type indicates the Alert state
type AlertState int
const (
// StateInactive is the state of an alert that is neither firing nor pending.
StateInactive AlertState = iota
// StatePending is the state of an alert that has been active for less than
// the configured threshold duration.
StatePending
// StateFiring is the state of an alert that has been active for longer than
// the configured threshold duration.
StateFiring
)
type alertTplData struct {
Labels map[string]string
ExternalLabels map[string]string
Value float64
Labels map[string]string
Value float64
}
const tplHeader = `{{ $value := .Value }}{{ $labels := .Labels }}{{ $externalLabels := .ExternalLabels }}`
const tplHeader = `{{ $value := .Value }}{{ $labels := .Labels }}`
// AlertsFromMetrics converts metrics to alerts by alert Rule
func AlertsFromMetrics(metrics []datasource.Metric, group string, rule Rule, start, end time.Time) []Alert {
alerts := make([]Alert, 0, len(metrics))
var err error
for i, m := range metrics {
a := Alert{
Group: group,
Name: rule.Name,
Start: start,
End: end,
Value: m.Value,
}
tplData := alertTplData{Value: m.Value, ExternalLabels: make(map[string]string)}
tplData.Labels, a.Labels = mergeLabels(metrics[i].Labels, rule.Labels)
a.Annotations, err = templateAnnotations(rule.Annotations, tplHeader, tplData)
if err != nil {
logger.Errorf("%s", err)
}
alerts = append(alerts, a)
}
return alerts
// ExecTemplate executes the Alert template for the given
// map of annotations.
func (a *Alert) ExecTemplate(annotations map[string]string) (map[string]string, error) {
tplData := alertTplData{Value: a.Value, Labels: a.Labels}
return templateAnnotations(annotations, tplHeader, tplData)
}
func mergeLabels(ml []datasource.Label, rl map[string]string) (map[string]string, []datasource.Label) {
set := make(map[string]string, len(ml)+len(rl))
sl := append([]datasource.Label(nil), ml...)
for _, i := range ml {
set[i.Name] = i.Value
}
for name, value := range rl {
if _, ok := set[name]; ok {
continue
}
set[name] = value
sl = append(sl, datasource.Label{
Name: name,
Value: value,
})
}
return set, sl
// ValidateAnnotations validates annotations for possible template errors; it uses empty data for template population
func ValidateAnnotations(annotations map[string]string) error {
_, err := templateAnnotations(annotations, tplHeader, alertTplData{
Labels: map[string]string{},
Value: 0,
})
return err
}
func templateAnnotations(annotations map[string]string, header string, data alertTplData) (map[string]string, error) {
@ -95,16 +81,6 @@ func templateAnnotations(annotations map[string]string, header string, data aler
return r, eg.err()
}
// ValidateAnnotations validate annotations for possible template error, uses empty data for template population
func ValidateAnnotations(annotations map[string]string) error {
_, err := templateAnnotations(annotations, tplHeader, alertTplData{
Labels: map[string]string{},
ExternalLabels: map[string]string{},
Value: 0,
})
return err
}
func templateAnnotation(dst io.Writer, text string, data alertTplData) error {
tpl, err := template.New("").Funcs(tmplFunc).Option("missingkey=zero").Parse(text)
if err != nil {


@ -0,0 +1,65 @@
package notifier
import (
"fmt"
"testing"
)
func TestAlert_ExecTemplate(t *testing.T) {
testCases := []struct {
alert *Alert
annotations map[string]string
expTpl map[string]string
}{
{
alert: &Alert{},
annotations: map[string]string{},
expTpl: map[string]string{},
},
{
alert: &Alert{
Value: 1e4,
Labels: map[string]string{
"instance": "localhost",
},
},
annotations: map[string]string{},
expTpl: map[string]string{},
},
{
alert: &Alert{
Value: 1e4,
Labels: map[string]string{
"job": "staging",
"instance": "localhost",
},
},
annotations: map[string]string{
"summary": "Too high connection number for {{$labels.instance}} for job {{$labels.job}}",
"description": "It is {{ $value }} connections for {{$labels.instance}}",
},
expTpl: map[string]string{
"summary": "Too high connection number for localhost for job staging",
"description": "It is 10000 connections for localhost",
},
},
}
for i, tc := range testCases {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
tpl, err := tc.alert.ExecTemplate(tc.annotations)
if err != nil {
t.Fatal(err)
}
if len(tpl) != len(tc.expTpl) {
t.Fatalf("expected %d elements; got %d", len(tc.expTpl), len(tpl))
}
for k := range tc.expTpl {
got, exp := tpl[k], tc.expTpl[k]
if got != exp {
t.Fatalf("expected %q=%q; got %q=%q", k, exp, k, got)
}
}
})
}
}


@ -1,64 +1,51 @@
package provider
package notifier
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
const alertsPath = "/api/v2/alerts"
// AlertProvider is common interface for alert manager provider
type AlertProvider interface {
Send(alerts []common.Alert) error
}
var pool = sync.Pool{New: func() interface{} {
return &bytes.Buffer{}
}}
// AlertManager represents integration provider with Prometheus alert manager
// https://github.com/prometheus/alertmanager
type AlertManager struct {
alertURL string
argFunc AlertURLGenerator
client *http.Client
}
// AlertURLGenerator returns URL to single alert by given name
type AlertURLGenerator func(group, name string) string
// NewAlertManager is a constructor for AlertManager
func NewAlertManager(alertManagerURL string, fn AlertURLGenerator, c *http.Client) *AlertManager {
return &AlertManager{
alertURL: strings.TrimSuffix(alertManagerURL, "/") + alertsPath,
argFunc: fn,
client: c,
}
}
// Send an alert or resolve message
func (am *AlertManager) Send(alerts []common.Alert) error {
b := pool.Get().(*bytes.Buffer)
b.Reset()
defer pool.Put(b)
func (am *AlertManager) Send(alerts []Alert) error {
b := &bytes.Buffer{}
writeamRequest(b, alerts, am.argFunc)
resp, err := am.client.Post(am.alertURL, "application/json", b)
if err != nil {
return err
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
b.Reset()
if _, err := io.Copy(b, resp.Body); err != nil {
logger.Errorf("unable to copy error response body to buffer %s", err)
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read response from %q: %s", am.alertURL, err)
}
return fmt.Errorf("invalid response from alertmanager %s", b)
return fmt.Errorf("invalid SC %d from %q; response body: %s", resp.StatusCode, am.alertURL, string(body))
}
return nil
}
// AlertURLGenerator returns URL to single alert by given name
type AlertURLGenerator func(group, name string) string
const alertManagerPath = "/api/v2/alerts"
// NewAlertManager is a constructor for AlertManager
func NewAlertManager(alertManagerURL string, fn AlertURLGenerator, c *http.Client) *AlertManager {
return &AlertManager{
alertURL: strings.TrimSuffix(alertManagerURL, "/") + alertManagerPath,
argFunc: fn,
client: c,
}
}


@ -1,10 +1,9 @@
{% import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/common"
) %}
{% stripspace %}
{% func amRequest(alerts []common.Alert, generatorURL func(string, string) string) %}
{% func amRequest(alerts []Alert, generatorURL func(string, string) string) %}
[
{% for i, alert := range alerts %}
{
@ -15,8 +14,8 @@
{% endif %}
"labels": {
"alertname":{%q= alert.Name %}
{% for _,v := range alert.Labels %}
,{%q= v.Name %}:{%q= v.Value %}
{% for k,v := range alert.Labels %}
,{%q= k %}:{%q= v %}
{% endfor %}
},
"annotations": {


@ -0,0 +1,130 @@
// Code generated by qtc from "alertmanager_request.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
//line notifier/alertmanager_request.qtpl:1
package notifier
//line notifier/alertmanager_request.qtpl:1
import (
"time"
)
//line notifier/alertmanager_request.qtpl:6
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line notifier/alertmanager_request.qtpl:6
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line notifier/alertmanager_request.qtpl:6
func streamamRequest(qw422016 *qt422016.Writer, alerts []Alert, generatorURL func(string, string) string) {
//line notifier/alertmanager_request.qtpl:6
qw422016.N().S(`[`)
//line notifier/alertmanager_request.qtpl:8
for i, alert := range alerts {
//line notifier/alertmanager_request.qtpl:8
qw422016.N().S(`{"startsAt":`)
//line notifier/alertmanager_request.qtpl:10
qw422016.N().Q(alert.Start.Format(time.RFC3339Nano))
//line notifier/alertmanager_request.qtpl:10
qw422016.N().S(`,"generatorURL":`)
//line notifier/alertmanager_request.qtpl:11
qw422016.N().Q(generatorURL(alert.Group, alert.Name))
//line notifier/alertmanager_request.qtpl:11
qw422016.N().S(`,`)
//line notifier/alertmanager_request.qtpl:12
if !alert.End.IsZero() {
//line notifier/alertmanager_request.qtpl:12
qw422016.N().S(`"endsAt":`)
//line notifier/alertmanager_request.qtpl:13
qw422016.N().Q(alert.End.Format(time.RFC3339Nano))
//line notifier/alertmanager_request.qtpl:13
qw422016.N().S(`,`)
//line notifier/alertmanager_request.qtpl:14
}
//line notifier/alertmanager_request.qtpl:14
qw422016.N().S(`"labels": {"alertname":`)
//line notifier/alertmanager_request.qtpl:16
qw422016.N().Q(alert.Name)
//line notifier/alertmanager_request.qtpl:17
for k, v := range alert.Labels {
//line notifier/alertmanager_request.qtpl:17
qw422016.N().S(`,`)
//line notifier/alertmanager_request.qtpl:18
qw422016.N().Q(k)
//line notifier/alertmanager_request.qtpl:18
qw422016.N().S(`:`)
//line notifier/alertmanager_request.qtpl:18
qw422016.N().Q(v)
//line notifier/alertmanager_request.qtpl:19
}
//line notifier/alertmanager_request.qtpl:19
qw422016.N().S(`},"annotations": {`)
//line notifier/alertmanager_request.qtpl:22
c := len(alert.Annotations)
//line notifier/alertmanager_request.qtpl:23
for k, v := range alert.Annotations {
//line notifier/alertmanager_request.qtpl:24
c = c - 1
//line notifier/alertmanager_request.qtpl:25
qw422016.N().Q(k)
//line notifier/alertmanager_request.qtpl:25
qw422016.N().S(`:`)
//line notifier/alertmanager_request.qtpl:25
qw422016.N().Q(v)
//line notifier/alertmanager_request.qtpl:25
if c > 0 {
//line notifier/alertmanager_request.qtpl:25
qw422016.N().S(`,`)
//line notifier/alertmanager_request.qtpl:25
}
//line notifier/alertmanager_request.qtpl:26
}
//line notifier/alertmanager_request.qtpl:26
qw422016.N().S(`}}`)
//line notifier/alertmanager_request.qtpl:29
if i != len(alerts)-1 {
//line notifier/alertmanager_request.qtpl:29
qw422016.N().S(`,`)
//line notifier/alertmanager_request.qtpl:29
}
//line notifier/alertmanager_request.qtpl:30
}
//line notifier/alertmanager_request.qtpl:30
qw422016.N().S(`]`)
//line notifier/alertmanager_request.qtpl:32
}
//line notifier/alertmanager_request.qtpl:32
func writeamRequest(qq422016 qtio422016.Writer, alerts []Alert, generatorURL func(string, string) string) {
//line notifier/alertmanager_request.qtpl:32
qw422016 := qt422016.AcquireWriter(qq422016)
//line notifier/alertmanager_request.qtpl:32
streamamRequest(qw422016, alerts, generatorURL)
//line notifier/alertmanager_request.qtpl:32
qt422016.ReleaseWriter(qw422016)
//line notifier/alertmanager_request.qtpl:32
}
//line notifier/alertmanager_request.qtpl:32
func amRequest(alerts []Alert, generatorURL func(string, string) string) string {
//line notifier/alertmanager_request.qtpl:32
qb422016 := qt422016.AcquireByteBuffer()
//line notifier/alertmanager_request.qtpl:32
writeamRequest(qb422016, alerts, generatorURL)
//line notifier/alertmanager_request.qtpl:32
qs422016 := string(qb422016.B)
//line notifier/alertmanager_request.qtpl:32
qt422016.ReleaseByteBuffer(qb422016)
//line notifier/alertmanager_request.qtpl:32
return qs422016
//line notifier/alertmanager_request.qtpl:32
}


@ -1,4 +1,4 @@
package provider
package notifier
import (
"encoding/json"
@ -6,8 +6,6 @@ import (
"net/http/httptest"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/common"
)
func TestAlertManager_Send(t *testing.T) {
@ -16,7 +14,7 @@ func TestAlertManager_Send(t *testing.T) {
t.Errorf("should not be called")
})
c := -1
mux.HandleFunc(alertsPath, func(w http.ResponseWriter, r *http.Request) {
mux.HandleFunc(alertManagerPath, func(w http.ResponseWriter, r *http.Request) {
c++
if r.Method != http.MethodPost {
t.Errorf("expected POST method got %s", r.Method)
@ -61,13 +59,13 @@ func TestAlertManager_Send(t *testing.T) {
am := NewAlertManager(srv.URL, func(group, name string) string {
return group + name
}, srv.Client())
if err := am.Send([]common.Alert{{}, {}}); err == nil {
if err := am.Send([]Alert{{}, {}}); err == nil {
t.Error("expected connection error got nil")
}
if err := am.Send([]common.Alert{}); err == nil {
if err := am.Send([]Alert{}); err == nil {
t.Error("expected wrong http code error got nil")
}
if err := am.Send([]common.Alert{{
if err := am.Send([]Alert{{
Group: "group0",
Name: "alert0",
Start: time.Now().UTC(),


@ -0,0 +1,6 @@
package notifier
// Notifier is the common interface for alert manager providers
type Notifier interface {
Send(alerts []Alert) error
}
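
For illustration, a minimal sketch of a custom implementation of this interface. The logNotifier type is hypothetical and not part of this commit; any type satisfying Send([]Alert) error can be plugged into the watchdog in place of AlertManager.

package notifier

import "log"

// logNotifier is a hypothetical Notifier implementation that
// logs alerts instead of posting them to Alertmanager.
type logNotifier struct{}

// Send implements the Notifier interface by printing each alert.
func (logNotifier) Send(alerts []Alert) error {
	for _, a := range alerts {
		log.Printf("alert %s.%s state=%d value=%v labels=%v", a.Group, a.Name, a.State, a.Value, a.Labels)
	}
	return nil
}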


@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package common
package notifier
import (
"fmt"


@ -1,131 +0,0 @@
// Code generated by qtc from "alert_manager_request.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
//line app/vmalert/provider/alert_manager_request.qtpl:1
package provider
//line app/vmalert/provider/alert_manager_request.qtpl:1
import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/common"
"time"
)
//line app/vmalert/provider/alert_manager_request.qtpl:7
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vmalert/provider/alert_manager_request.qtpl:7
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vmalert/provider/alert_manager_request.qtpl:7
func streamamRequest(qw422016 *qt422016.Writer, alerts []common.Alert, generatorURL func(string, string) string) {
//line app/vmalert/provider/alert_manager_request.qtpl:7
qw422016.N().S(`[`)
//line app/vmalert/provider/alert_manager_request.qtpl:9
for i, alert := range alerts {
//line app/vmalert/provider/alert_manager_request.qtpl:9
qw422016.N().S(`{"startsAt":`)
//line app/vmalert/provider/alert_manager_request.qtpl:11
qw422016.N().Q(alert.Start.Format(time.RFC3339Nano))
//line app/vmalert/provider/alert_manager_request.qtpl:11
qw422016.N().S(`,"generatorURL":`)
//line app/vmalert/provider/alert_manager_request.qtpl:12
qw422016.N().Q(generatorURL(alert.Group, alert.Name))
//line app/vmalert/provider/alert_manager_request.qtpl:12
qw422016.N().S(`,`)
//line app/vmalert/provider/alert_manager_request.qtpl:13
if !alert.End.IsZero() {
//line app/vmalert/provider/alert_manager_request.qtpl:13
qw422016.N().S(`"endsAt":`)
//line app/vmalert/provider/alert_manager_request.qtpl:14
qw422016.N().Q(alert.End.Format(time.RFC3339Nano))
//line app/vmalert/provider/alert_manager_request.qtpl:14
qw422016.N().S(`,`)
//line app/vmalert/provider/alert_manager_request.qtpl:15
}
//line app/vmalert/provider/alert_manager_request.qtpl:15
qw422016.N().S(`"labels": {"alertname":`)
//line app/vmalert/provider/alert_manager_request.qtpl:17
qw422016.N().Q(alert.Name)
//line app/vmalert/provider/alert_manager_request.qtpl:18
for _, v := range alert.Labels {
//line app/vmalert/provider/alert_manager_request.qtpl:18
qw422016.N().S(`,`)
//line app/vmalert/provider/alert_manager_request.qtpl:19
qw422016.N().Q(v.Name)
//line app/vmalert/provider/alert_manager_request.qtpl:19
qw422016.N().S(`:`)
//line app/vmalert/provider/alert_manager_request.qtpl:19
qw422016.N().Q(v.Value)
//line app/vmalert/provider/alert_manager_request.qtpl:20
}
//line app/vmalert/provider/alert_manager_request.qtpl:20
qw422016.N().S(`},"annotations": {`)
//line app/vmalert/provider/alert_manager_request.qtpl:23
c := len(alert.Annotations)
//line app/vmalert/provider/alert_manager_request.qtpl:24
for k, v := range alert.Annotations {
//line app/vmalert/provider/alert_manager_request.qtpl:25
c = c - 1
//line app/vmalert/provider/alert_manager_request.qtpl:26
qw422016.N().Q(k)
//line app/vmalert/provider/alert_manager_request.qtpl:26
qw422016.N().S(`:`)
//line app/vmalert/provider/alert_manager_request.qtpl:26
qw422016.N().Q(v)
//line app/vmalert/provider/alert_manager_request.qtpl:26
if c > 0 {
//line app/vmalert/provider/alert_manager_request.qtpl:26
qw422016.N().S(`,`)
//line app/vmalert/provider/alert_manager_request.qtpl:26
}
//line app/vmalert/provider/alert_manager_request.qtpl:27
}
//line app/vmalert/provider/alert_manager_request.qtpl:27
qw422016.N().S(`}}`)
//line app/vmalert/provider/alert_manager_request.qtpl:30
if i != len(alerts)-1 {
//line app/vmalert/provider/alert_manager_request.qtpl:30
qw422016.N().S(`,`)
//line app/vmalert/provider/alert_manager_request.qtpl:30
}
//line app/vmalert/provider/alert_manager_request.qtpl:31
}
//line app/vmalert/provider/alert_manager_request.qtpl:31
qw422016.N().S(`]`)
//line app/vmalert/provider/alert_manager_request.qtpl:33
}
//line app/vmalert/provider/alert_manager_request.qtpl:33
func writeamRequest(qq422016 qtio422016.Writer, alerts []common.Alert, generatorURL func(string, string) string) {
//line app/vmalert/provider/alert_manager_request.qtpl:33
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmalert/provider/alert_manager_request.qtpl:33
streamamRequest(qw422016, alerts, generatorURL)
//line app/vmalert/provider/alert_manager_request.qtpl:33
qt422016.ReleaseWriter(qw422016)
//line app/vmalert/provider/alert_manager_request.qtpl:33
}
//line app/vmalert/provider/alert_manager_request.qtpl:33
func amRequest(alerts []common.Alert, generatorURL func(string, string) string) string {
//line app/vmalert/provider/alert_manager_request.qtpl:33
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmalert/provider/alert_manager_request.qtpl:33
writeamRequest(qb422016, alerts, generatorURL)
//line app/vmalert/provider/alert_manager_request.qtpl:33
qs422016 := string(qb422016.B)
//line app/vmalert/provider/alert_manager_request.qtpl:33
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmalert/provider/alert_manager_request.qtpl:33
return qs422016
//line app/vmalert/provider/alert_manager_request.qtpl:33
}

app/vmalert/rule.go (new file)

@ -0,0 +1,180 @@
package main
import (
"context"
"errors"
"fmt"
"hash/fnv"
"sort"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/metricsql"
)
// Group groups a set of Rules
type Group struct {
Name string
Rules []*Rule
}
// Rule is the basic alert entity
type Rule struct {
Name string `yaml:"alert"`
Expr string `yaml:"expr"`
For time.Duration `yaml:"for"`
Labels map[string]string `yaml:"labels"`
Annotations map[string]string `yaml:"annotations"`
group *Group
// mu guards the status fields below
mu sync.Mutex
// alerts stores the list of active alerts
alerts map[uint64]*notifier.Alert
// lastExecTime stores the last time Exec was called
lastExecTime time.Time
// lastExecError stores the last error that happened in Exec;
// it is reset on every successful Exec
// and may be used as a health state
lastExecError error
}
// Validate validates rule
func (r *Rule) Validate() error {
if r.Name == "" {
return errors.New("rule name can not be empty")
}
if r.Expr == "" {
return fmt.Errorf("expression for rule %q can't be empty", r.Name)
}
if _, err := metricsql.Parse(r.Expr); err != nil {
return fmt.Errorf("invalid expression for rule %q: %w", r.Name, err)
}
return nil
}
// Exec executes the Rule expression via the given Querier and
// maintains notifier.Alerts based on the Querier results.
func (r *Rule) Exec(ctx context.Context, q datasource.Querier) error {
metrics, err := q.Query(ctx, r.Expr)
r.mu.Lock()
defer r.mu.Unlock()
r.lastExecError = err
r.lastExecTime = time.Now()
if err != nil {
return fmt.Errorf("failed to execute query %q: %s", r.Expr, err)
}
for h, a := range r.alerts {
// cleanup inactive alerts from previous Eval
if a.State == notifier.StateInactive {
delete(r.alerts, h)
}
}
updated := make(map[uint64]struct{})
// update list of active alerts
for _, m := range metrics {
h := hash(m)
updated[h] = struct{}{}
if _, ok := r.alerts[h]; ok {
continue
}
a, err := r.newAlert(m)
if err != nil {
r.lastExecError = err
return fmt.Errorf("failed to create alert: %s", err)
}
a.State = notifier.StatePending
r.alerts[h] = a
}
for h, a := range r.alerts {
// if the alert wasn't updated in this iteration,
// it means it is already resolved
if _, ok := updated[h]; !ok {
a.State = notifier.StateInactive
// set endTime to last execution time
// so it can be sent by notifier on next step
a.End = r.lastExecTime
continue
}
if a.State == notifier.StatePending && time.Since(a.Start) >= r.For {
a.State = notifier.StateFiring
}
if a.State == notifier.StateFiring {
a.End = r.lastExecTime.Add(3 * *evaluationInterval)
}
}
return nil
}
// Send sends the active alerts via the given
// notifier.Notifier.
// See https://prometheus.io/docs/alerting/clients/ for reference.
// TODO: add tests for the endAt value
func (r *Rule) Send(ctx context.Context, ap notifier.Notifier) error {
// copy alerts to a new list to avoid holding the lock during Send
var alertsCopy []notifier.Alert
r.mu.Lock()
for _, a := range r.alerts {
if a.State == notifier.StatePending {
continue
}
// it is safe to dereference instead of deep-copy
// because only simple types may be changed during rule.Exec
alertsCopy = append(alertsCopy, *a)
}
r.mu.Unlock()
if len(alertsCopy) < 1 {
logger.Infof("no alerts to send")
return nil
}
logger.Infof("sending %d alerts", len(alertsCopy))
return ap.Send(alertsCopy)
}
// TODO: consider hashing algorithm in VM
func hash(m datasource.Metric) uint64 {
hash := fnv.New64a()
labels := m.Labels
sort.Slice(labels, func(i, j int) bool {
return labels[i].Name < labels[j].Name
})
for _, l := range labels {
hash.Write([]byte(l.Name))
hash.Write([]byte(l.Value))
hash.Write([]byte("\xff"))
}
return hash.Sum64()
}
func (r *Rule) newAlert(m datasource.Metric) (*notifier.Alert, error) {
a := &notifier.Alert{
Group: r.group.Name,
Name: r.Name,
Labels: map[string]string{},
Value: m.Value,
Start: time.Now(),
// TODO: support End time
}
for _, l := range m.Labels {
a.Labels[l.Name] = l.Value
}
// metric labels may be overridden by
// rule labels
for k, v := range r.Labels {
a.Labels[k] = v
}
var err error
a.Annotations, err = a.ExecTemplate(r.Annotations)
return a, err
}

app/vmalert/rule_test.go (new file)

@ -0,0 +1,282 @@
package main
import (
"context"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
)
func TestRule_Validate(t *testing.T) {
if err := (&Rule{}).Validate(); err == nil {
t.Errorf("exptected empty name error")
}
if err := (&Rule{Name: "alert"}).Validate(); err == nil {
t.Errorf("exptected empty expr error")
}
if err := (&Rule{Name: "alert", Expr: "test{"}).Validate(); err == nil {
t.Errorf("exptected invalid expr error")
}
if err := (&Rule{Name: "alert", Expr: "test>0"}).Validate(); err != nil {
t.Errorf("exptected valid rule got %s", err)
}
}
func newTestRule(name string, waitFor time.Duration) *Rule {
return &Rule{Name: name, alerts: make(map[uint64]*notifier.Alert), For: waitFor}
}
func TestRule_Exec(t *testing.T) {
testCases := []struct {
rule *Rule
steps [][]datasource.Metric
expAlerts map[uint64]*notifier.Alert
}{
{
newTestRule("empty", 0),
[][]datasource.Metric{},
map[uint64]*notifier.Alert{},
},
{
newTestRule("single-firing", 0),
[][]datasource.Metric{
{metricWithLabels(t, "__name__", "foo")},
},
map[uint64]*notifier.Alert{
hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateFiring},
},
},
{
newTestRule("single-firing=>inactive", 0),
[][]datasource.Metric{
{metricWithLabels(t, "__name__", "foo")},
{},
},
map[uint64]*notifier.Alert{
hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateInactive},
},
},
{
newTestRule("single-firing=>inactive=>firing", 0),
[][]datasource.Metric{
{metricWithLabels(t, "__name__", "foo")},
{},
{metricWithLabels(t, "__name__", "foo")},
},
map[uint64]*notifier.Alert{
hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateFiring},
},
},
{
newTestRule("single-firing=>inactive=>firing=>inactive", 0),
[][]datasource.Metric{
{metricWithLabels(t, "__name__", "foo")},
{},
{metricWithLabels(t, "__name__", "foo")},
{},
},
map[uint64]*notifier.Alert{
hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateInactive},
},
},
{
newTestRule("single-firing=>inactive=>firing=>inactive=>empty", 0),
[][]datasource.Metric{
{metricWithLabels(t, "__name__", "foo")},
{},
{metricWithLabels(t, "__name__", "foo")},
{},
{},
},
map[uint64]*notifier.Alert{},
},
{
newTestRule("single-firing=>inactive=>firing=>inactive=>empty=>firing", 0),
[][]datasource.Metric{
{metricWithLabels(t, "__name__", "foo")},
{},
{metricWithLabels(t, "__name__", "foo")},
{},
{},
{metricWithLabels(t, "__name__", "foo")},
},
map[uint64]*notifier.Alert{
hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateFiring},
},
},
{
newTestRule("multiple-firing", 0),
[][]datasource.Metric{
{
metricWithLabels(t, "__name__", "foo"),
metricWithLabels(t, "__name__", "foo1"),
metricWithLabels(t, "__name__", "foo2"),
},
},
map[uint64]*notifier.Alert{
hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateFiring},
hash(metricWithLabels(t, "__name__", "foo1")): {State: notifier.StateFiring},
hash(metricWithLabels(t, "__name__", "foo2")): {State: notifier.StateFiring},
},
},
{
newTestRule("multiple-steps-firing", 0),
[][]datasource.Metric{
{metricWithLabels(t, "__name__", "foo")},
{metricWithLabels(t, "__name__", "foo1")},
{metricWithLabels(t, "__name__", "foo2")},
},
// 1: fire first alert
// 2: fire second alert, set first inactive
// 3: fire third alert, set second inactive, delete first one
map[uint64]*notifier.Alert{
hash(metricWithLabels(t, "__name__", "foo1")): {State: notifier.StateInactive},
hash(metricWithLabels(t, "__name__", "foo2")): {State: notifier.StateFiring},
},
},
{
newTestRule("duplicate", 0),
[][]datasource.Metric{
{
// metrics with the same labelset should result in one alert
metricWithLabels(t, "__name__", "foo", "type", "bar"),
metricWithLabels(t, "type", "bar", "__name__", "foo"),
},
},
map[uint64]*notifier.Alert{
hash(metricWithLabels(t, "__name__", "foo", "type", "bar")): {State: notifier.StateFiring},
},
},
{
newTestRule("for-pending", time.Minute),
[][]datasource.Metric{
{metricWithLabels(t, "__name__", "foo")},
},
map[uint64]*notifier.Alert{
hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StatePending},
},
},
{
newTestRule("for-fired", time.Millisecond),
[][]datasource.Metric{
{metricWithLabels(t, "__name__", "foo")},
{metricWithLabels(t, "__name__", "foo")},
},
map[uint64]*notifier.Alert{
hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateFiring},
},
},
{
newTestRule("for-pending=>inactive", time.Millisecond),
[][]datasource.Metric{
{metricWithLabels(t, "__name__", "foo")},
{metricWithLabels(t, "__name__", "foo")},
// empty step to reset pending alerts
{},
},
map[uint64]*notifier.Alert{
hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateInactive},
},
},
{
newTestRule("for-pending=>firing=>inactive", time.Millisecond),
[][]datasource.Metric{
{metricWithLabels(t, "__name__", "foo")},
{metricWithLabels(t, "__name__", "foo")},
// empty step to reset pending alerts
{},
},
map[uint64]*notifier.Alert{
hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateInactive},
},
},
{
newTestRule("for-pending=>firing=>inactive=>pending", time.Millisecond),
[][]datasource.Metric{
{metricWithLabels(t, "__name__", "foo")},
{metricWithLabels(t, "__name__", "foo")},
// empty step to reset pending alerts
{},
{metricWithLabels(t, "__name__", "foo")},
},
map[uint64]*notifier.Alert{
hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StatePending},
},
},
{
newTestRule("for-pending=>firing=>inactive=>pending=>firing", time.Millisecond),
[][]datasource.Metric{
{metricWithLabels(t, "__name__", "foo")},
{metricWithLabels(t, "__name__", "foo")},
// empty step to reset pending alerts
{},
{metricWithLabels(t, "__name__", "foo")},
{metricWithLabels(t, "__name__", "foo")},
},
map[uint64]*notifier.Alert{
hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateFiring},
},
},
}
fakeGroup := &Group{Name: "TestRule_Exec"}
for _, tc := range testCases {
t.Run(tc.rule.Name, func(t *testing.T) {
fq := &fakeQuerier{}
tc.rule.group = fakeGroup
for _, step := range tc.steps {
fq.reset()
fq.add(t, step...)
if err := tc.rule.Exec(context.TODO(), fq); err != nil {
t.Fatalf("unexpected err: %s", err)
}
// artificial delay between applying steps
time.Sleep(time.Millisecond)
}
if len(tc.rule.alerts) != len(tc.expAlerts) {
t.Fatalf("expected %d alerts; got %d", len(tc.expAlerts), len(tc.rule.alerts))
}
for key, exp := range tc.expAlerts {
got, ok := tc.rule.alerts[key]
if !ok {
t.Fatalf("expected to have key %d", key)
}
if got.State != exp.State {
t.Fatalf("expected state %d; got %d", exp.State, got.State)
}
}
})
}
}
func metricWithLabels(t *testing.T, labels ...string) datasource.Metric {
t.Helper()
if len(labels) == 0 || len(labels)%2 != 0 {
t.Fatalf("expected to get even number of labels")
}
m := datasource.Metric{}
for i := 0; i < len(labels); i += 2 {
m.Labels = append(m.Labels, datasource.Label{
Name: labels[i],
Value: labels[i+1],
})
}
return m
}
type fakeQuerier struct {
metrics []datasource.Metric
}
func (fq *fakeQuerier) reset() {
fq.metrics = fq.metrics[:0]
}
func (fq *fakeQuerier) add(t *testing.T, metrics ...datasource.Metric) {
fq.metrics = append(fq.metrics, metrics...)
}
func (fq fakeQuerier) Query(ctx context.Context, query string) ([]datasource.Metric, error) {
return fq.metrics, nil
}


@ -1 +0,0 @@
package storage