vmalert: Add recording rules support. (#519)

* vmalert: Add recording rules support.

Recording rules support required additional service refactoring, since
supporting them wasn't planned from the very beginning. The list
of changes is as follows:
* new entity RecordingRule was added for writing results of MetricsQL
expressions into remote storage;
* interface Rule now unites both recording and alerting rules (see the
sketch after this list);
* configuration parser was moved to a separate package and now performs
stricter validation;
* new endpoint for listing all groups and rules in JSON format was added;
* evaluation interval may now be set per group;
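For reference, below is a minimal sketch of the unified Rule interface as it can be inferred from the diffs in this commit (ID, Exec, UpdateWith and the Stringer requirement used in logs); the doc comments are illustrative rather than copied from the source:

package main

import (
	"context"
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)

// Rule is implemented by both AlertingRule and RecordingRule.
type Rule interface {
	// ID returns a unique rule identifier within its parent Group.
	ID() uint64
	// Exec evaluates the rule via the given Querier; when series is true,
	// the resulting timeseries are returned so the caller can remote-write them.
	Exec(ctx context.Context, q datasource.Querier, series bool) ([]prompbmarshal.TimeSeries, error)
	// UpdateWith copies significant fields from a newer version of the same rule.
	UpdateWith(r Rule) error
	// String is used when printing the rule in logs and errors.
	fmt.Stringer
}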

* vmalert: uncomment tests

* vmalert: rm outdated TODO

* vmalert: fix typos in README
Roman Khavronenko 2020-06-01 11:46:37 +01:00 committed by GitHub
parent 32652485e3
commit 270552fde4
37 changed files with 1728 additions and 861 deletions


@ -55,9 +55,10 @@ test-vmalert:
go test -v -race -cover ./app/vmalert -loggerLevel=ERROR go test -v -race -cover ./app/vmalert -loggerLevel=ERROR
go test -v -race -cover ./app/vmalert/datasource go test -v -race -cover ./app/vmalert/datasource
go test -v -race -cover ./app/vmalert/notifier go test -v -race -cover ./app/vmalert/notifier
go test -v -race -cover ./app/vmalert/config
run-vmalert: vmalert run-vmalert: vmalert
./bin/vmalert -rule=app/vmalert/testdata/rules0-good.rules \ ./bin/vmalert -rule=app/vmalert/config/testdata/rules2-good.rules \
-datasource.url=http://localhost:8428 \ -datasource.url=http://localhost:8428 \
-notifier.url=http://localhost:9093 \ -notifier.url=http://localhost:9093 \
-remoteWrite.url=http://localhost:8428 \ -remoteWrite.url=http://localhost:8428 \


@ -1,20 +1,18 @@
## VM Alert ## vmalert
`vmalert` executes a list of given MetricsQL expressions (rules) and `vmalert` executes a list of given [alerting](https://prometheus.io/docs/prometheus/latest/configuration/alerting_rules/)
sends alerts to [Alert Manager](https://github.com/prometheus/alertmanager). or [recording](https://prometheus.io/docs/prometheus/latest/configuration/recording_rules/)
rules against the configured address.
### Features: ### Features:
* Integration with [VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics) TSDB; * Integration with [VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics) TSDB;
* VictoriaMetrics [MetricsQL](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/MetricsQL) * VictoriaMetrics [MetricsQL](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/MetricsQL)
expressions validation; support and expressions validation;
* Prometheus [alerting rules definition format](https://prometheus.io/docs/prometheus/latest/configuration/alerting_rules/#defining-alerting-rules) * Prometheus [alerting rules definition format](https://prometheus.io/docs/prometheus/latest/configuration/alerting_rules/#defining-alerting-rules)
support; support;
* Integration with [Alertmanager](https://github.com/prometheus/alertmanager); * Integration with [Alertmanager](https://github.com/prometheus/alertmanager);
* Lightweight without extra dependencies. * Lightweight without extra dependencies.
### TODO:
* Support recording rules.
### QuickStart ### QuickStart
To build `vmalert` from sources: To build `vmalert` from sources:
@ -26,9 +24,9 @@ make vmalert
The build binary will be placed to `VictoriaMetrics/bin` folder. The build binary will be placed to `VictoriaMetrics/bin` folder.
To start using `vmalert` you will need the following things: To start using `vmalert` you will need the following things:
* list of alert rules - PromQL/MetricsQL expressions to execute; * list of rules - PromQL/MetricsQL expressions to execute;
* datasource address - reachable VictoriaMetrics instance for rules execution; * datasource address - reachable VictoriaMetrics instance for rules execution;
* notifier address - reachable Alertmanager instance for processing, * notifier address - reachable [Alert Manager](https://github.com/prometheus/alertmanager) instance for processing,
aggregating alerts and sending notifications. aggregating alerts and sending notifications.
Then configure `vmalert` accordingly: Then configure `vmalert` accordingly:
@ -38,23 +36,28 @@ Then configure `vmalert` accordingly:
-notifier.url=http://localhost:9093 -notifier.url=http://localhost:9093
``` ```
Example for `.rules` file may be found [here](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/app/vmalert/testdata/rules0-good.rules) Example for `.rules` file may be found [here](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/app/vmalert/testdata).
`vmalert` may be configured with the `-remoteWrite` flag to write recording rule results and
alerts state in the form of timeseries via the remote write protocol. Alerts state will be written
as `ALERTS` timeseries. These timeseries may be used to recover alerts state on `vmalert`
restarts if `-remoteRead` is configured.
`vmalert` runs evaluation for every group in a separate goroutine. `vmalert` runs evaluation for every group in a separate goroutine.
Rules in group evaluated one-by-one sequentially. Rules in group evaluated one-by-one sequentially.
**Important:** while recording rules execution is sequential, writing of timeseries results to remote
storage is asynchronous. Hence, the user shouldn't rely on recording rule chaining, where the result of a previous
recording rule is reused in the next one.
`vmalert` also runs a web-server (`-httpListenAddr`) for serving metrics and alerts endpoints: `vmalert` also runs a web-server (`-httpListenAddr`) for serving metrics and alerts endpoints:
* `http://<vmalert-addr>/api/v1/groups` - list of all loaded groups and rules;
* `http://<vmalert-addr>/api/v1/alerts` - list of all active alerts; * `http://<vmalert-addr>/api/v1/alerts` - list of all active alerts;
* `http://<vmalert-addr>/api/v1/<groupName>/<alertID>/status" ` - get alert status by ID. * `http://<vmalert-addr>/api/v1/<groupName>/<alertID>/status" ` - get alert status by ID.
Used as alert source in AlertManager. Used as alert source in AlertManager.
* `http://<vmalert-addr>/metrics` - application metrics. * `http://<vmalert-addr>/metrics` - application metrics.
* `http://<vmalert-addr>/-/reload` - hot configuration reload. * `http://<vmalert-addr>/-/reload` - hot configuration reload.
`vmalert` may be configured with `-remoteWrite` flag to write alerts state in form of timeseries
via remote write protocol. Alerts state will be written as `ALERTS` timeseries. These timeseries
may be used to recover alerts state on `vmalert` restarts if `-remoteRead` is configured.
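For a quick look at the new groups endpoint listed above, here is a minimal Go sketch that fetches and prints the JSON list of loaded groups and rules; the address assumes the default `-httpListenAddr`, and the response is printed as-is since its exact schema isn't reproduced here:

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
)

func main() {
	// Default -httpListenAddr is ":8880"; adjust for your setup.
	resp, err := http.Get("http://localhost:8880/api/v1/groups")
	if err != nil {
		log.Fatalf("request failed: %s", err)
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Fatalf("failed to read response: %s", err)
	}
	// Prints all loaded groups and their rules in JSON format.
	fmt.Println(string(body))
}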
### Configuration ### Configuration
The shortlist of configuration flags is the following: The shortlist of configuration flags is the following:
@ -66,20 +69,14 @@ Usage of vmalert:
Optional basic auth username for -datasource.url Optional basic auth username for -datasource.url
-datasource.url string -datasource.url string
Victoria Metrics or VMSelect url. Required parameter. E.g. http://127.0.0.1:8428 Victoria Metrics or VMSelect url. Required parameter. E.g. http://127.0.0.1:8428
-enableTCP6
Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP is used
-evaluationInterval duration -evaluationInterval duration
How often to evaluate the rules. Default 1m (default 1m0s) How often to evaluate the rules (default 1m0s)
-external.url string -external.url string
External URL is used as alert's source for sent alerts to the notifier External URL is used as alert's source for sent alerts to the notifier
-http.maxGracefulShutdownDuration duration
The maximum duration for graceful shutdown of HTTP server. Highly loaded server may require increased value for graceful shutdown (default 7s)
-httpAuth.password string
Password for HTTP Basic Auth. The authentication is disabled if -httpAuth.username is empty
-httpAuth.username string
Username for HTTP Basic Auth. The authentication is disabled if empty. See also -httpAuth.password
-httpListenAddr string -httpListenAddr string
Address to listen for http connections (default ":8880") Address to listen for http connections (default ":8880")
-metricsAuthKey string
Auth key for /metrics. It overrides httpAuth settings
-notifier.url string -notifier.url string
Prometheus alertmanager URL. Required parameter. e.g. http://127.0.0.1:9093 Prometheus alertmanager URL. Required parameter. e.g. http://127.0.0.1:9093
-remoteRead.basicAuth.password string -remoteRead.basicAuth.password string
@ -94,8 +91,12 @@ Usage of vmalert:
Optional basic auth password for -remoteWrite.url Optional basic auth password for -remoteWrite.url
-remoteWrite.basicAuth.username string -remoteWrite.basicAuth.username string
Optional basic auth username for -remoteWrite.url Optional basic auth username for -remoteWrite.url
-remoteWrite.maxQueueSize -remoteWrite.concurrency int
Defines the max number of pending datapoints to remote write endpoint Defines number of readers that concurrently write into remote storage (default 1)
-remoteWrite.maxBatchSize int
Defines max number of timeseries to be flushed at once (default 1000)
-remoteWrite.maxQueueSize int
Defines the max number of pending datapoints to remote write endpoint (default 100000)
-remoteWrite.url string -remoteWrite.url string
Optional URL to Victoria Metrics or VMInsert where to persist alerts state in form of timeseries. E.g. http://127.0.0.1:8428 Optional URL to Victoria Metrics or VMInsert where to persist alerts state in form of timeseries. E.g. http://127.0.0.1:8428
-rule value -rule value

app/vmalert/alerting.go (new file, 376 lines)

@ -0,0 +1,376 @@
package main
import (
"context"
"fmt"
"hash/fnv"
"sort"
"strconv"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
// AlertingRule is basic alert entity
type AlertingRule struct {
Name string
Expr string
For time.Duration
Labels map[string]string
Annotations map[string]string
GroupID uint64
// guard status fields
mu sync.RWMutex
// stores list of active alerts
alerts map[uint64]*notifier.Alert
// stores last moment of time Exec was called
lastExecTime time.Time
// stores last error that happened in Exec func
// resets on every successful Exec
// may be used as Health state
lastExecError error
}
func newAlertingRule(gID uint64, cfg config.Rule) *AlertingRule {
return &AlertingRule{
Name: cfg.Alert,
Expr: cfg.Expr,
For: cfg.For,
Labels: cfg.Labels,
Annotations: cfg.Annotations,
GroupID: gID,
alerts: make(map[uint64]*notifier.Alert),
}
}
// String implements Stringer interface
func (ar *AlertingRule) String() string {
return ar.Name
}
// ID returns unique Rule ID
// within the parent Group.
func (ar *AlertingRule) ID() uint64 {
hash := fnv.New64a()
hash.Write([]byte("alerting"))
hash.Write([]byte("\xff"))
hash.Write([]byte(ar.Name))
return hash.Sum64()
}
// Exec executes AlertingRule expression via the given Querier.
// Based on the Querier results AlertingRule maintains notifier.Alerts
func (ar *AlertingRule) Exec(ctx context.Context, q datasource.Querier, series bool) ([]prompbmarshal.TimeSeries, error) {
qMetrics, err := q.Query(ctx, ar.Expr)
ar.mu.Lock()
defer ar.mu.Unlock()
ar.lastExecError = err
ar.lastExecTime = time.Now()
if err != nil {
return nil, fmt.Errorf("failed to execute query %q: %s", ar.Expr, err)
}
for h, a := range ar.alerts {
// cleanup inactive alerts from previous Exec
if a.State == notifier.StateInactive {
delete(ar.alerts, h)
}
}
updated := make(map[uint64]struct{})
// update list of active alerts
for _, m := range qMetrics {
h := hash(m)
updated[h] = struct{}{}
if a, ok := ar.alerts[h]; ok {
if a.Value != m.Value {
// update Value field with latest value
a.Value = m.Value
// and re-exec template since Value can be used
// in templates
err = ar.template(a)
if err != nil {
return nil, err
}
}
continue
}
a, err := ar.newAlert(m, ar.lastExecTime)
if err != nil {
ar.lastExecError = err
return nil, fmt.Errorf("failed to create alert: %s", err)
}
a.ID = h
a.State = notifier.StatePending
ar.alerts[h] = a
}
for h, a := range ar.alerts {
// if alert wasn't updated in this iteration
// means it is resolved already
if _, ok := updated[h]; !ok {
if a.State == notifier.StatePending {
// alert was in Pending state - it is not
// active anymore
delete(ar.alerts, h)
continue
}
a.State = notifier.StateInactive
continue
}
if a.State == notifier.StatePending && time.Since(a.Start) >= ar.For {
a.State = notifier.StateFiring
alertsFired.Inc()
}
}
if series {
return ar.toTimeSeries(ar.lastExecTime), nil
}
return nil, nil
}
func (ar *AlertingRule) toTimeSeries(timestamp time.Time) []prompbmarshal.TimeSeries {
var tss []prompbmarshal.TimeSeries
for _, a := range ar.alerts {
if a.State == notifier.StateInactive {
continue
}
ts := ar.alertToTimeSeries(a, timestamp)
tss = append(tss, ts...)
}
return tss
}
// copy all significant fields.
// alerts state isn't copied since
// it should be updated in next 2 Execs
func (ar *AlertingRule) UpdateWith(r Rule) error {
nr, ok := r.(*AlertingRule)
if !ok {
return fmt.Errorf("BUG: attempt to update alerting rule with wrong type %#v", r)
}
ar.Expr = nr.Expr
ar.For = nr.For
ar.Labels = nr.Labels
ar.Annotations = nr.Annotations
return nil
}
// TODO: consider hashing algorithm in VM
func hash(m datasource.Metric) uint64 {
hash := fnv.New64a()
labels := m.Labels
sort.Slice(labels, func(i, j int) bool {
return labels[i].Name < labels[j].Name
})
for _, l := range labels {
// drop __name__ to be consistent with Prometheus alerting
if l.Name == "__name__" {
continue
}
hash.Write([]byte(l.Name))
hash.Write([]byte(l.Value))
hash.Write([]byte("\xff"))
}
return hash.Sum64()
}
func (ar *AlertingRule) newAlert(m datasource.Metric, start time.Time) (*notifier.Alert, error) {
a := &notifier.Alert{
GroupID: ar.GroupID,
Name: ar.Name,
Labels: map[string]string{},
Value: m.Value,
Start: start,
Expr: ar.Expr,
}
for _, l := range m.Labels {
// drop __name__ to be consistent with Prometheus alerting
if l.Name == "__name__" {
continue
}
a.Labels[l.Name] = l.Value
}
return a, ar.template(a)
}
func (ar *AlertingRule) template(a *notifier.Alert) error {
// 1. template rule labels with data labels
rLabels, err := a.ExecTemplate(ar.Labels)
if err != nil {
return err
}
// 2. merge data labels and rule labels
// metric labels may be overridden by
// rule labels
for k, v := range rLabels {
a.Labels[k] = v
}
// 3. template merged labels
a.Labels, err = a.ExecTemplate(a.Labels)
if err != nil {
return err
}
a.Annotations, err = a.ExecTemplate(ar.Annotations)
return err
}
// AlertAPI generates APIAlert object from alert by its id(hash)
func (ar *AlertingRule) AlertAPI(id uint64) *APIAlert {
ar.mu.RLock()
defer ar.mu.RUnlock()
a, ok := ar.alerts[id]
if !ok {
return nil
}
return ar.newAlertAPI(*a)
}
// RuleAPI returns Rule representation in form
// of APIAlertingRule
func (ar *AlertingRule) RuleAPI() APIAlertingRule {
var lastErr string
if ar.lastExecError != nil {
lastErr = ar.lastExecError.Error()
}
return APIAlertingRule{
// encode as strings to avoid rounding
ID: fmt.Sprintf("%d", ar.ID()),
GroupID: fmt.Sprintf("%d", ar.GroupID),
Name: ar.Name,
Expression: ar.Expr,
For: ar.For.String(),
LastError: lastErr,
LastExec: ar.lastExecTime,
Labels: ar.Labels,
Annotations: ar.Annotations,
}
}
// AlertsAPI generates list of APIAlert objects from existing alerts
func (ar *AlertingRule) AlertsAPI() []*APIAlert {
var alerts []*APIAlert
ar.mu.RLock()
for _, a := range ar.alerts {
alerts = append(alerts, ar.newAlertAPI(*a))
}
ar.mu.RUnlock()
return alerts
}
func (ar *AlertingRule) newAlertAPI(a notifier.Alert) *APIAlert {
return &APIAlert{
// encode as strings to avoid rounding
ID: fmt.Sprintf("%d", a.ID),
GroupID: fmt.Sprintf("%d", a.GroupID),
Name: a.Name,
Expression: ar.Expr,
Labels: a.Labels,
Annotations: a.Annotations,
State: a.State.String(),
ActiveAt: a.Start,
Value: strconv.FormatFloat(a.Value, 'e', -1, 64),
}
}
const (
// AlertMetricName is the metric name for synthetic alert timeseries.
alertMetricName = "ALERTS"
// AlertForStateMetricName is the metric name for 'for' state of alert.
alertForStateMetricName = "ALERTS_FOR_STATE"
// AlertNameLabel is the label name indicating the name of an alert.
alertNameLabel = "alertname"
// AlertStateLabel is the label name indicating the state of an alert.
alertStateLabel = "alertstate"
)
// alertToTimeSeries converts the given alert with the given timestamp to timeseries
func (ar *AlertingRule) alertToTimeSeries(a *notifier.Alert, timestamp time.Time) []prompbmarshal.TimeSeries {
var tss []prompbmarshal.TimeSeries
tss = append(tss, alertToTimeSeries(ar.Name, a, timestamp))
if ar.For > 0 {
tss = append(tss, alertForToTimeSeries(ar.Name, a, timestamp))
}
return tss
}
func alertToTimeSeries(name string, a *notifier.Alert, timestamp time.Time) prompbmarshal.TimeSeries {
labels := make(map[string]string)
for k, v := range a.Labels {
labels[k] = v
}
labels["__name__"] = alertMetricName
labels[alertNameLabel] = name
labels[alertStateLabel] = a.State.String()
return newTimeSeries(1, labels, timestamp)
}
// alertForToTimeSeries returns a timeseries that represents
// state of active alerts, where value is the time when the alert became active
func alertForToTimeSeries(name string, a *notifier.Alert, timestamp time.Time) prompbmarshal.TimeSeries {
labels := make(map[string]string)
for k, v := range a.Labels {
labels[k] = v
}
labels["__name__"] = alertForStateMetricName
labels[alertNameLabel] = name
return newTimeSeries(float64(a.Start.Unix()), labels, timestamp)
}
// Restore restores the state of active alerts based on previously written timeseries.
// Restore restores only Start field. Field State will be always Pending and supposed
// to be updated on next Exec, as well as Value field.
// Only rules with For > 0 will be restored.
func (ar *AlertingRule) Restore(ctx context.Context, q datasource.Querier, lookback time.Duration) error {
if q == nil {
return fmt.Errorf("querier is nil")
}
// Get the last datapoint in range via MetricsQL `last_over_time`.
// We don't use plain PromQL since Prometheus doesn't support
// remote write protocol which is used for state persistence in vmalert.
expr := fmt.Sprintf("last_over_time(%s{alertname=%q}[%ds])",
alertForStateMetricName, ar.Name, int(lookback.Seconds()))
qMetrics, err := q.Query(ctx, expr)
if err != nil {
return err
}
for _, m := range qMetrics {
labels := m.Labels
m.Labels = make([]datasource.Label, 0)
// drop all extra labels, so hash key will
// be identical to timeseries received in Exec
for _, l := range labels {
if l.Name == alertNameLabel {
continue
}
// drop all overridden labels
if _, ok := ar.Labels[l.Name]; ok {
continue
}
m.Labels = append(m.Labels, l)
}
a, err := ar.newAlert(m, time.Unix(int64(m.Value), 0))
if err != nil {
return fmt.Errorf("failed to create alert: %s", err)
}
a.ID = hash(m)
a.State = notifier.StatePending
ar.alerts[a.ID] = a
logger.Infof("alert %q(%d) restored to state at %v", a.Name, a.ID, a.Start)
}
return nil
}
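As a usage illustration for the file above, the sketch below drives a single AlertingRule outside of a Group. It assumes placement in the same package main of app/vmalert; the stub querier mirrors fakeQuerier from the tests, and the sample metric and rule are made up:

package main

import (
	"context"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
)

// stubQuerier returns one fixed metric, similar to fakeQuerier in the tests.
type stubQuerier struct{}

func (stubQuerier) Query(_ context.Context, _ string) ([]datasource.Metric, error) {
	return []datasource.Metric{
		{Labels: []datasource.Label{{Name: "instance", Value: "localhost:8428"}}, Value: 1},
	}, nil
}

func exampleExec() error {
	// Build an alerting rule from a config.Rule, as newGroup does for `alert:` entries.
	r := newAlertingRule(1, config.Rule{Alert: "ExampleAlert", Expr: "vm_rows > 0"})
	// series=false: no timeseries are returned; only the internal alerts state is updated.
	_, err := r.Exec(context.Background(), stubQuerier{}, false)
	return err
}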


@ -2,7 +2,6 @@ package main
import ( import (
"context" "context"
"sync"
"testing" "testing"
"time" "time"
@ -11,30 +10,15 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
) )
func TestRule_Validate(t *testing.T) { func TestAlertingRule_ToTimeSeries(t *testing.T) {
if err := (&Rule{}).Validate(); err == nil {
t.Errorf("exptected empty name error")
}
if err := (&Rule{Name: "alert"}).Validate(); err == nil {
t.Errorf("exptected empty expr error")
}
if err := (&Rule{Name: "alert", Expr: "test{"}).Validate(); err == nil {
t.Errorf("exptected invalid expr error")
}
if err := (&Rule{Name: "alert", Expr: "test>0"}).Validate(); err != nil {
t.Errorf("exptected valid rule got %s", err)
}
}
func TestRule_AlertToTimeSeries(t *testing.T) {
timestamp := time.Now() timestamp := time.Now()
testCases := []struct { testCases := []struct {
rule *Rule rule *AlertingRule
alert *notifier.Alert alert *notifier.Alert
expTS []prompbmarshal.TimeSeries expTS []prompbmarshal.TimeSeries
}{ }{
{ {
newTestRule("instant", 0), newTestAlertingRule("instant", 0),
&notifier.Alert{State: notifier.StateFiring}, &notifier.Alert{State: notifier.StateFiring},
[]prompbmarshal.TimeSeries{ []prompbmarshal.TimeSeries{
newTimeSeries(1, map[string]string{ newTimeSeries(1, map[string]string{
@ -45,7 +29,7 @@ func TestRule_AlertToTimeSeries(t *testing.T) {
}, },
}, },
{ {
newTestRule("instant extra labels", 0), newTestAlertingRule("instant extra labels", 0),
&notifier.Alert{State: notifier.StateFiring, Labels: map[string]string{ &notifier.Alert{State: notifier.StateFiring, Labels: map[string]string{
"job": "foo", "job": "foo",
"instance": "bar", "instance": "bar",
@ -61,7 +45,7 @@ func TestRule_AlertToTimeSeries(t *testing.T) {
}, },
}, },
{ {
newTestRule("instant labels override", 0), newTestAlertingRule("instant labels override", 0),
&notifier.Alert{State: notifier.StateFiring, Labels: map[string]string{ &notifier.Alert{State: notifier.StateFiring, Labels: map[string]string{
alertStateLabel: "foo", alertStateLabel: "foo",
"__name__": "bar", "__name__": "bar",
@ -75,7 +59,7 @@ func TestRule_AlertToTimeSeries(t *testing.T) {
}, },
}, },
{ {
newTestRule("for", time.Second), newTestAlertingRule("for", time.Second),
&notifier.Alert{State: notifier.StateFiring, Start: timestamp.Add(time.Second)}, &notifier.Alert{State: notifier.StateFiring, Start: timestamp.Add(time.Second)},
[]prompbmarshal.TimeSeries{ []prompbmarshal.TimeSeries{
newTimeSeries(1, map[string]string{ newTimeSeries(1, map[string]string{
@ -90,7 +74,7 @@ func TestRule_AlertToTimeSeries(t *testing.T) {
}, },
}, },
{ {
newTestRule("for pending", 10*time.Second), newTestAlertingRule("for pending", 10*time.Second),
&notifier.Alert{State: notifier.StatePending, Start: timestamp.Add(time.Second)}, &notifier.Alert{State: notifier.StatePending, Start: timestamp.Add(time.Second)},
[]prompbmarshal.TimeSeries{ []prompbmarshal.TimeSeries{
newTimeSeries(1, map[string]string{ newTimeSeries(1, map[string]string{
@ -107,58 +91,28 @@ func TestRule_AlertToTimeSeries(t *testing.T) {
} }
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.rule.Name, func(t *testing.T) { t.Run(tc.rule.Name, func(t *testing.T) {
tss := tc.rule.AlertToTimeSeries(tc.alert, timestamp) tc.rule.alerts[tc.alert.ID] = tc.alert
if len(tc.expTS) != len(tss) { tss := tc.rule.toTimeSeries(timestamp)
t.Fatalf("expected number of timeseries %d; got %d", len(tc.expTS), len(tss)) if err := compareTimeSeries(t, tc.expTS, tss); err != nil {
} t.Fatalf("timeseries mismatch: %s", err)
for i := range tc.expTS {
expTS, gotTS := tc.expTS[i], tss[i]
if len(expTS.Samples) != len(gotTS.Samples) {
t.Fatalf("expected number of samples %d; got %d", len(expTS.Samples), len(gotTS.Samples))
}
for i, exp := range expTS.Samples {
got := gotTS.Samples[i]
if got.Value != exp.Value {
t.Errorf("expected value %.2f; got %.2f", exp.Value, got.Value)
}
if got.Timestamp != exp.Timestamp {
t.Errorf("expected timestamp %d; got %d", exp.Timestamp, got.Timestamp)
}
}
if len(expTS.Labels) != len(gotTS.Labels) {
t.Fatalf("expected number of labels %d; got %d", len(expTS.Labels), len(gotTS.Labels))
}
for i, exp := range expTS.Labels {
got := gotTS.Labels[i]
if got.Name != exp.Name {
t.Errorf("expected label name %q; got %q", exp.Name, got.Name)
}
if got.Value != exp.Value {
t.Errorf("expected label value %q; got %q", exp.Value, got.Value)
}
}
} }
}) })
} }
} }
func newTestRule(name string, waitFor time.Duration) *Rule { func TestAlertingRule_Exec(t *testing.T) {
return &Rule{Name: name, alerts: make(map[uint64]*notifier.Alert), For: waitFor}
}
func TestRule_Exec(t *testing.T) {
testCases := []struct { testCases := []struct {
rule *Rule rule *AlertingRule
steps [][]datasource.Metric steps [][]datasource.Metric
expAlerts map[uint64]*notifier.Alert expAlerts map[uint64]*notifier.Alert
}{ }{
{ {
newTestRule("empty", 0), newTestAlertingRule("empty", 0),
[][]datasource.Metric{}, [][]datasource.Metric{},
map[uint64]*notifier.Alert{}, map[uint64]*notifier.Alert{},
}, },
{ {
newTestRule("empty labels", 0), newTestAlertingRule("empty labels", 0),
[][]datasource.Metric{ [][]datasource.Metric{
{datasource.Metric{}}, {datasource.Metric{}},
}, },
@ -167,7 +121,7 @@ func TestRule_Exec(t *testing.T) {
}, },
}, },
{ {
newTestRule("single-firing", 0), newTestAlertingRule("single-firing", 0),
[][]datasource.Metric{ [][]datasource.Metric{
{metricWithLabels(t, "name", "foo")}, {metricWithLabels(t, "name", "foo")},
}, },
@ -176,7 +130,7 @@ func TestRule_Exec(t *testing.T) {
}, },
}, },
{ {
newTestRule("single-firing=>inactive", 0), newTestAlertingRule("single-firing=>inactive", 0),
[][]datasource.Metric{ [][]datasource.Metric{
{metricWithLabels(t, "name", "foo")}, {metricWithLabels(t, "name", "foo")},
{}, {},
@ -186,7 +140,7 @@ func TestRule_Exec(t *testing.T) {
}, },
}, },
{ {
newTestRule("single-firing=>inactive=>firing", 0), newTestAlertingRule("single-firing=>inactive=>firing", 0),
[][]datasource.Metric{ [][]datasource.Metric{
{metricWithLabels(t, "name", "foo")}, {metricWithLabels(t, "name", "foo")},
{}, {},
@ -197,7 +151,7 @@ func TestRule_Exec(t *testing.T) {
}, },
}, },
{ {
newTestRule("single-firing=>inactive=>firing=>inactive", 0), newTestAlertingRule("single-firing=>inactive=>firing=>inactive", 0),
[][]datasource.Metric{ [][]datasource.Metric{
{metricWithLabels(t, "name", "foo")}, {metricWithLabels(t, "name", "foo")},
{}, {},
@ -209,7 +163,7 @@ func TestRule_Exec(t *testing.T) {
}, },
}, },
{ {
newTestRule("single-firing=>inactive=>firing=>inactive=>empty", 0), newTestAlertingRule("single-firing=>inactive=>firing=>inactive=>empty", 0),
[][]datasource.Metric{ [][]datasource.Metric{
{metricWithLabels(t, "name", "foo")}, {metricWithLabels(t, "name", "foo")},
{}, {},
@ -220,7 +174,7 @@ func TestRule_Exec(t *testing.T) {
map[uint64]*notifier.Alert{}, map[uint64]*notifier.Alert{},
}, },
{ {
newTestRule("single-firing=>inactive=>firing=>inactive=>empty=>firing", 0), newTestAlertingRule("single-firing=>inactive=>firing=>inactive=>empty=>firing", 0),
[][]datasource.Metric{ [][]datasource.Metric{
{metricWithLabels(t, "name", "foo")}, {metricWithLabels(t, "name", "foo")},
{}, {},
@ -234,7 +188,7 @@ func TestRule_Exec(t *testing.T) {
}, },
}, },
{ {
newTestRule("multiple-firing", 0), newTestAlertingRule("multiple-firing", 0),
[][]datasource.Metric{ [][]datasource.Metric{
{ {
metricWithLabels(t, "name", "foo"), metricWithLabels(t, "name", "foo"),
@ -249,7 +203,7 @@ func TestRule_Exec(t *testing.T) {
}, },
}, },
{ {
newTestRule("multiple-steps-firing", 0), newTestAlertingRule("multiple-steps-firing", 0),
[][]datasource.Metric{ [][]datasource.Metric{
{metricWithLabels(t, "name", "foo")}, {metricWithLabels(t, "name", "foo")},
{metricWithLabels(t, "name", "foo1")}, {metricWithLabels(t, "name", "foo1")},
@ -264,7 +218,7 @@ func TestRule_Exec(t *testing.T) {
}, },
}, },
{ {
newTestRule("duplicate", 0), newTestAlertingRule("duplicate", 0),
[][]datasource.Metric{ [][]datasource.Metric{
{ {
// metrics with the same labelset should result in one alert // metrics with the same labelset should result in one alert
@ -277,7 +231,7 @@ func TestRule_Exec(t *testing.T) {
}, },
}, },
{ {
newTestRule("for-pending", time.Minute), newTestAlertingRule("for-pending", time.Minute),
[][]datasource.Metric{ [][]datasource.Metric{
{metricWithLabels(t, "name", "foo")}, {metricWithLabels(t, "name", "foo")},
}, },
@ -286,7 +240,7 @@ func TestRule_Exec(t *testing.T) {
}, },
}, },
{ {
newTestRule("for-fired", time.Millisecond), newTestAlertingRule("for-fired", time.Millisecond),
[][]datasource.Metric{ [][]datasource.Metric{
{metricWithLabels(t, "name", "foo")}, {metricWithLabels(t, "name", "foo")},
{metricWithLabels(t, "name", "foo")}, {metricWithLabels(t, "name", "foo")},
@ -296,7 +250,7 @@ func TestRule_Exec(t *testing.T) {
}, },
}, },
{ {
newTestRule("for-pending=>empty", time.Second), newTestAlertingRule("for-pending=>empty", time.Second),
[][]datasource.Metric{ [][]datasource.Metric{
{metricWithLabels(t, "name", "foo")}, {metricWithLabels(t, "name", "foo")},
{metricWithLabels(t, "name", "foo")}, {metricWithLabels(t, "name", "foo")},
@ -306,7 +260,7 @@ func TestRule_Exec(t *testing.T) {
map[uint64]*notifier.Alert{}, map[uint64]*notifier.Alert{},
}, },
{ {
newTestRule("for-pending=>firing=>inactive", time.Millisecond), newTestAlertingRule("for-pending=>firing=>inactive", time.Millisecond),
[][]datasource.Metric{ [][]datasource.Metric{
{metricWithLabels(t, "name", "foo")}, {metricWithLabels(t, "name", "foo")},
{metricWithLabels(t, "name", "foo")}, {metricWithLabels(t, "name", "foo")},
@ -318,10 +272,10 @@ func TestRule_Exec(t *testing.T) {
}, },
}, },
{ {
newTestRule("for-pending=>firing=>inactive=>pending", time.Millisecond), newTestAlertingRule("for-pending=>firing=>inactive=>pending", time.Millisecond),
[][]datasource.Metric{ [][]datasource.Metric{
{metricWithLabels(t, "name", "foo")}, //{metricWithLabels(t, "name", "foo")},
{metricWithLabels(t, "name", "foo")}, //{metricWithLabels(t, "name", "foo")},
// empty step to reset pending alerts // empty step to reset pending alerts
{}, {},
{metricWithLabels(t, "name", "foo")}, {metricWithLabels(t, "name", "foo")},
@ -331,7 +285,7 @@ func TestRule_Exec(t *testing.T) {
}, },
}, },
{ {
newTestRule("for-pending=>firing=>inactive=>pending=>firing", time.Millisecond), newTestAlertingRule("for-pending=>firing=>inactive=>pending=>firing", time.Millisecond),
[][]datasource.Metric{ [][]datasource.Metric{
{metricWithLabels(t, "name", "foo")}, {metricWithLabels(t, "name", "foo")},
{metricWithLabels(t, "name", "foo")}, {metricWithLabels(t, "name", "foo")},
@ -349,11 +303,11 @@ func TestRule_Exec(t *testing.T) {
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.rule.Name, func(t *testing.T) { t.Run(tc.rule.Name, func(t *testing.T) {
fq := &fakeQuerier{} fq := &fakeQuerier{}
tc.rule.group = fakeGroup tc.rule.GroupID = fakeGroup.ID()
for _, step := range tc.steps { for _, step := range tc.steps {
fq.reset() fq.reset()
fq.add(step...) fq.add(step...)
if err := tc.rule.Exec(context.TODO(), fq); err != nil { if _, err := tc.rule.Exec(context.TODO(), fq, false); err != nil {
t.Fatalf("unexpected err: %s", err) t.Fatalf("unexpected err: %s", err)
} }
// artificial delay between applying steps // artificial delay between applying steps
@ -375,49 +329,9 @@ func TestRule_Exec(t *testing.T) {
} }
} }
func metricWithLabels(t *testing.T, labels ...string) datasource.Metric { func TestAlertingRule_Restore(t *testing.T) {
t.Helper()
if len(labels) == 0 || len(labels)%2 != 0 {
t.Fatalf("expected to get even number of labels")
}
m := datasource.Metric{}
for i := 0; i < len(labels); i += 2 {
m.Labels = append(m.Labels, datasource.Label{
Name: labels[i],
Value: labels[i+1],
})
}
return m
}
type fakeQuerier struct {
sync.Mutex
metrics []datasource.Metric
}
func (fq *fakeQuerier) reset() {
fq.Lock()
fq.metrics = fq.metrics[:0]
fq.Unlock()
}
func (fq *fakeQuerier) add(metrics ...datasource.Metric) {
fq.Lock()
fq.metrics = append(fq.metrics, metrics...)
fq.Unlock()
}
func (fq *fakeQuerier) Query(_ context.Context, _ string) ([]datasource.Metric, error) {
fq.Lock()
cpy := make([]datasource.Metric, len(fq.metrics))
copy(cpy, fq.metrics)
fq.Unlock()
return cpy, nil
}
func TestRule_Restore(t *testing.T) {
testCases := []struct { testCases := []struct {
rule *Rule rule *AlertingRule
metrics []datasource.Metric metrics []datasource.Metric
expAlerts map[uint64]*notifier.Alert expAlerts map[uint64]*notifier.Alert
}{ }{
@ -502,7 +416,7 @@ func TestRule_Restore(t *testing.T) {
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.rule.Name, func(t *testing.T) { t.Run(tc.rule.Name, func(t *testing.T) {
fq := &fakeQuerier{} fq := &fakeQuerier{}
tc.rule.group = fakeGroup tc.rule.GroupID = fakeGroup.ID()
fq.add(tc.metrics...) fq.add(tc.metrics...)
if err := tc.rule.Restore(context.TODO(), fq, time.Hour); err != nil { if err := tc.rule.Restore(context.TODO(), fq, time.Hour); err != nil {
t.Fatalf("unexpected err: %s", err) t.Fatalf("unexpected err: %s", err)
@ -526,8 +440,8 @@ func TestRule_Restore(t *testing.T) {
} }
} }
func newTestRuleWithLabels(name string, labels ...string) *Rule { func newTestRuleWithLabels(name string, labels ...string) *AlertingRule {
r := newTestRule(name, 0) r := newTestAlertingRule(name, 0)
r.Labels = make(map[string]string) r.Labels = make(map[string]string)
for i := 0; i < len(labels); i += 2 { for i := 0; i < len(labels); i += 2 {
r.Labels[labels[i]] = labels[i+1] r.Labels[labels[i]] = labels[i+1]
@ -535,9 +449,6 @@ func newTestRuleWithLabels(name string, labels ...string) *Rule {
return r return r
} }
func metricWithValueAndLabels(t *testing.T, value float64, labels ...string) datasource.Metric { func newTestAlertingRule(name string, waitFor time.Duration) *AlertingRule {
t.Helper() return &AlertingRule{Name: name, alerts: make(map[uint64]*notifier.Alert), For: waitFor}
m := metricWithLabels(t, labels...)
m.Value = value
return m
} }


@ -1,74 +0,0 @@
package main
import (
"fmt"
"gopkg.in/yaml.v2"
"io/ioutil"
"path/filepath"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
)
// Parse parses rule configs from given file patterns
func Parse(pathPatterns []string, validateAnnotations bool) ([]Group, error) {
var fp []string
for _, pattern := range pathPatterns {
matches, err := filepath.Glob(pattern)
if err != nil {
return nil, fmt.Errorf("error reading file patther %s:%v", pattern, err)
}
fp = append(fp, matches...)
}
var groups []Group
for _, file := range fp {
groupsNames := map[string]struct{}{}
gr, err := parseFile(file)
if err != nil {
return nil, fmt.Errorf("file %s: %w", file, err)
}
for _, g := range gr {
if _, ok := groupsNames[g.Name]; ok {
return nil, fmt.Errorf("one file can not contain groups with the same name %s, filepath:%s", g.Name, file)
}
g.File = file
g.doneCh = make(chan struct{})
g.finishedCh = make(chan struct{})
g.updateCh = make(chan Group)
groupsNames[g.Name] = struct{}{}
for _, rule := range g.Rules {
if err = rule.Validate(); err != nil {
return nil, fmt.Errorf("invalid rule filepath: %s, group %s: %w", file, g.Name, err)
}
if validateAnnotations {
if err = notifier.ValidateTemplates(rule.Annotations); err != nil {
return nil, fmt.Errorf("invalid annotations filepath: %s, group %s: %w", file, g.Name, err)
}
if err = notifier.ValidateTemplates(rule.Labels); err != nil {
return nil, fmt.Errorf("invalid labels filepath: %s, group %s: %w", file, g.Name, err)
}
}
rule.group = g
rule.alerts = make(map[uint64]*notifier.Alert)
}
groups = append(groups, g)
}
}
if len(groups) < 1 {
return nil, fmt.Errorf("no groups found in %s", strings.Join(pathPatterns, ";"))
}
return groups, nil
}
func parseFile(path string) ([]Group, error) {
data, err := ioutil.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("error reading alert rule file: %w", err)
}
g := struct {
Groups []Group `yaml:"groups"`
}{}
err = yaml.Unmarshal(data, &g)
return g.Groups, err
}


@ -0,0 +1,147 @@
package config
import (
"fmt"
"gopkg.in/yaml.v2"
"io/ioutil"
"path/filepath"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
"github.com/VictoriaMetrics/metricsql"
)
// Group contains list of Rules grouped into
// entity with one name and evaluation interval
type Group struct {
File string
Name string `yaml:"name"`
Interval time.Duration `yaml:"interval,omitempty"`
Rules []Rule `yaml:"rules"`
// Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline"`
}
// Validate check for internal Group or Rule configuration errors
func (g *Group) Validate(validateAnnotations bool) error {
if g.Name == "" {
return fmt.Errorf("group name must be set")
}
if len(g.Rules) == 0 {
return fmt.Errorf("group %q can't contain no rules", g.Name)
}
uniqueRules := map[string]struct{}{}
for _, r := range g.Rules {
ruleName := r.Record
if r.Alert != "" {
ruleName = r.Alert
}
if _, ok := uniqueRules[ruleName]; ok {
return fmt.Errorf("rule name %q duplicate", ruleName)
}
uniqueRules[ruleName] = struct{}{}
if err := r.Validate(); err != nil {
return fmt.Errorf("invalid rule %q.%q: %s", g.Name, ruleName, err)
}
if !validateAnnotations {
continue
}
if err := notifier.ValidateTemplates(r.Annotations); err != nil {
return fmt.Errorf("invalid annotations for rule %q.%q: %s", g.Name, ruleName, err)
}
if err := notifier.ValidateTemplates(r.Labels); err != nil {
return fmt.Errorf("invalid labels for rule %q.%q: %s", g.Name, ruleName, err)
}
}
return checkOverflow(g.XXX, fmt.Sprintf("group %q", g.Name))
}
// Rule describes entity that represent either
// recording rule or alerting rule.
type Rule struct {
Record string `yaml:"record,omitempty"`
Alert string `yaml:"alert,omitempty"`
Expr string `yaml:"expr"`
For time.Duration `yaml:"for,omitempty"`
Labels map[string]string `yaml:"labels,omitempty"`
Annotations map[string]string `yaml:"annotations,omitempty"`
}
// Validate check for Rule configuration errors
func (r *Rule) Validate() error {
if (r.Record == "" && r.Alert == "") || (r.Record != "" && r.Alert != "") {
return fmt.Errorf("either `record` or `alert` must be set")
}
if r.Expr == "" {
return fmt.Errorf("expression can't be empty")
}
if _, err := metricsql.Parse(r.Expr); err != nil {
return fmt.Errorf("invalid expression: %w", err)
}
return nil
}
// Parse parses rule configs from given file patterns
func Parse(pathPatterns []string, validateAnnotations bool) ([]Group, error) {
var fp []string
for _, pattern := range pathPatterns {
matches, err := filepath.Glob(pattern)
if err != nil {
return nil, fmt.Errorf("error reading file pattern %s: %v", pattern, err)
}
fp = append(fp, matches...)
}
var groups []Group
for _, file := range fp {
uniqueGroups := map[string]struct{}{}
gr, err := parseFile(file)
if err != nil {
return nil, fmt.Errorf("failed to parse file %q: %w", file, err)
}
for _, g := range gr {
if err := g.Validate(validateAnnotations); err != nil {
return nil, fmt.Errorf("invalid group %q in file %q: %s", g.Name, file, err)
}
if _, ok := uniqueGroups[g.Name]; ok {
return nil, fmt.Errorf("group name %q duplicate in file %q", g.Name, file)
}
uniqueGroups[g.Name] = struct{}{}
g.File = file
groups = append(groups, g)
}
}
if len(groups) < 1 {
return nil, fmt.Errorf("no groups found in %s", strings.Join(pathPatterns, ";"))
}
return groups, nil
}
func parseFile(path string) ([]Group, error) {
data, err := ioutil.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("error reading alert rule file: %w", err)
}
g := struct {
Groups []Group `yaml:"groups"`
// Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline"`
}{}
err = yaml.Unmarshal(data, &g)
if err != nil {
return nil, err
}
return g.Groups, checkOverflow(g.XXX, "config")
}
func checkOverflow(m map[string]interface{}, ctx string) error {
if len(m) > 0 {
var keys []string
for k := range m {
keys = append(keys, k)
}
return fmt.Errorf("unknown fields in %s: %s", ctx, strings.Join(keys, ", "))
}
return nil
}
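To show how the new config package is consumed, here is a minimal sketch that loads groups from rule files and prints a summary; the glob pattern is a placeholder, and annotation template validation is skipped so that notifier templates don't need to be initialised first:

package main

import (
	"fmt"
	"log"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
)

func main() {
	// Parse rule files matching the given glob patterns.
	// The second argument toggles annotation/label template validation.
	groups, err := config.Parse([]string{"config/testdata/*good.rules"}, false)
	if err != nil {
		log.Fatalf("failed to parse rules: %s", err)
	}
	for _, g := range groups {
		fmt.Printf("group %q (file %q, interval %v) with %d rules\n",
			g.Name, g.File, g.Interval, len(g.Rules))
	}
}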


@ -0,0 +1,83 @@
package config
import (
"net/url"
"os"
"strings"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
)
func TestMain(m *testing.M) {
u, _ := url.Parse("https://victoriametrics.com/path")
notifier.InitTemplateFunc(u)
os.Exit(m.Run())
}
func TestParseGood(t *testing.T) {
if _, err := Parse([]string{"testdata/*good.rules", "testdata/dir/*good.*"}, true); err != nil {
t.Errorf("error parsing files %s", err)
}
}
func TestParseBad(t *testing.T) {
testCases := []struct {
path []string
expErr string
}{
{
[]string{"testdata/rules0-bad.rules"},
"unexpected token",
},
{
[]string{"testdata/dir/rules0-bad.rules"},
"error parsing annotation",
},
{
[]string{"testdata/dir/rules1-bad.rules"},
"duplicate in file",
},
{
[]string{"testdata/dir/rules2-bad.rules"},
"function \"value\" not defined",
},
{
[]string{"testdata/dir/rules3-bad.rules"},
"either `record` or `alert` must be set",
},
{
[]string{"testdata/dir/rules4-bad.rules"},
"either `record` or `alert` must be set",
},
{
[]string{"testdata/*.yaml"},
"no groups found",
},
}
for _, tc := range testCases {
_, err := Parse(tc.path, true)
if err == nil {
t.Errorf("expected to get error")
return
}
if !strings.Contains(err.Error(), tc.expErr) {
t.Errorf("expected err to contain %q; got %q instead", tc.expErr, err)
}
}
}
func TestRule_Validate(t *testing.T) {
if err := (&Rule{}).Validate(); err == nil {
t.Errorf("expected empty name error")
}
if err := (&Rule{Alert: "alert"}).Validate(); err == nil {
t.Errorf("expected empty expr error")
}
if err := (&Rule{Alert: "alert", Expr: "test{"}).Validate(); err == nil {
t.Errorf("expected invalid expr error")
}
if err := (&Rule{Alert: "alert", Expr: "test>0"}).Validate(); err != nil {
t.Errorf("expected valid rule got %s", err)
}
}


@ -9,5 +9,3 @@ groups:
annotations: annotations:
summary: "{{ $value }}" summary: "{{ $value }}"
description: "{{$labels}}" description: "{{$labels}}"


@ -0,0 +1,5 @@
groups:
- name: group
rules:
- for: 5m
expr: vm_rows > 0


@ -0,0 +1,7 @@
groups:
- name: group
rules:
- alert: rows
record: record
for: 5m
expr: vm_rows > 0


@ -0,0 +1,7 @@
groups:
- name: group
rules:
- alert: rows
expr: vm_rows > 0
- record: rows
expr: sum(vm_rows)


@ -0,0 +1,28 @@
groups:
- name: TestGroup
interval: 2s
rules:
- alert: Conns
expr: sum(vm_tcplistener_conns) by(instance) > 1
for: 3m
annotations:
summary: "Too high connection number for {{$labels.instance}}"
description: "It is {{ $value }} connections for {{$labels.instance}}"
- alert: ExampleAlertAlwaysFiring
expr: sum by(job)
(up == 1)
- record: handler:requests:rate5m
expr: sum(rate(prometheus_http_requests_total[5m])) by (handler)
labels:
recording: true
- record: code:requests:rate5m
expr: sum(rate(promhttp_metric_handler_requests_total[5m])) by (code)
labels:
recording: true
- record: successful_requests:ratio_rate5m
labels:
recording: true
expr: |2
sum(code:requests:rate5m{code="200"})
/
sum(code:requests:rate5m)


@ -1,39 +0,0 @@
package main
import (
"net/url"
"os"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
)
func TestMain(m *testing.M) {
u, _ := url.Parse("https://victoriametrics.com/path")
notifier.InitTemplateFunc(u)
os.Exit(m.Run())
}
func TestParseGood(t *testing.T) {
if _, err := Parse([]string{"testdata/*good.rules", "testdata/dir/*good.*"}, true); err != nil {
t.Errorf("error parsing files %s", err)
}
}
func TestParseBad(t *testing.T) {
if _, err := Parse([]string{"testdata/rules0-bad.rules"}, true); err == nil {
t.Errorf("expected syntaxt error")
}
if _, err := Parse([]string{"testdata/dir/rules0-bad.rules"}, true); err == nil {
t.Errorf("expected template annotation error")
}
if _, err := Parse([]string{"testdata/dir/rules1-bad.rules"}, true); err == nil {
t.Errorf("expected same group error")
}
if _, err := Parse([]string{"testdata/dir/rules2-bad.rules"}, true); err == nil {
t.Errorf("expected template label error")
}
if _, err := Parse([]string{"testdata/*.yaml"}, true); err == nil {
t.Errorf("expected empty group")
}
}


@ -4,8 +4,10 @@ import (
"context" "context"
"fmt" "fmt"
"hash/fnv" "hash/fnv"
"sync"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
@ -15,15 +17,44 @@ import (
// Group is an entity for grouping rules // Group is an entity for grouping rules
type Group struct { type Group struct {
Name string Name string
File string File string
Rules []*Rule Rules []Rule
Interval time.Duration
doneCh chan struct{} doneCh chan struct{}
finishedCh chan struct{} finishedCh chan struct{}
// channel accepts new Group obj // channel accepts new Group obj
// which supposed to update current group // which supposed to update current group
updateCh chan Group updateCh chan *Group
mu sync.RWMutex
}
func newGroup(cfg config.Group, defaultInterval time.Duration) *Group {
g := &Group{
Name: cfg.Name,
File: cfg.File,
Interval: cfg.Interval,
doneCh: make(chan struct{}),
finishedCh: make(chan struct{}),
updateCh: make(chan *Group),
}
if g.Interval == 0 {
g.Interval = defaultInterval
}
rules := make([]Rule, len(cfg.Rules))
for i, r := range cfg.Rules {
rules[i] = g.newRule(r)
}
g.Rules = rules
return g
}
func (g *Group) newRule(rule config.Rule) Rule {
if rule.Alert != "" {
return newAlertingRule(g.ID(), rule)
}
return newRecordingRule(g.ID(), rule)
} }
// ID return unique group ID that consists of // ID return unique group ID that consists of
@ -36,48 +67,49 @@ func (g *Group) ID() uint64 {
return hash.Sum64() return hash.Sum64()
} }
// Restore restores alerts state for all group rules with For > 0 // Restore restores alerts state for group rules
func (g *Group) Restore(ctx context.Context, q datasource.Querier, lookback time.Duration) error { func (g *Group) Restore(ctx context.Context, q datasource.Querier, lookback time.Duration) error {
for _, rule := range g.Rules { for _, rule := range g.Rules {
if rule.For == 0 { rr, ok := rule.(*AlertingRule)
return nil if !ok {
continue
} }
if err := rule.Restore(ctx, q, lookback); err != nil { if rr.For < 1 {
return fmt.Errorf("error while restoring rule %q: %s", rule.Name, err) continue
}
if err := rr.Restore(ctx, q, lookback); err != nil {
return fmt.Errorf("error while restoring rule %q: %s", rule, err)
} }
} }
return nil return nil
} }
// updateWith updates existing group with // updateWith updates existing group with
// passed group object. // passed group object. This function ignores group
// evaluation interval change. It is supposed to be updated
// in group.start function.
// Not thread-safe. // Not thread-safe.
func (g *Group) updateWith(newGroup Group) { func (g *Group) updateWith(newGroup *Group) error {
rulesRegistry := make(map[string]*Rule) rulesRegistry := make(map[uint64]Rule)
for _, nr := range newGroup.Rules { for _, nr := range newGroup.Rules {
rulesRegistry[nr.id()] = nr rulesRegistry[nr.ID()] = nr
} }
for i, or := range g.Rules { for i, or := range g.Rules {
nr, ok := rulesRegistry[or.id()] nr, ok := rulesRegistry[or.ID()]
if !ok { if !ok {
// old rule is not present in the new list // old rule is not present in the new list
// so we mark it for removing // so we mark it for removing
g.Rules[i] = nil g.Rules[i] = nil
continue continue
} }
if err := or.UpdateWith(nr); err != nil {
// copy all significant fields. return err
// alerts state isn't copied since }
// it should be updated in next 2 Execs delete(rulesRegistry, nr.ID())
or.For = nr.For
or.Expr = nr.Expr
or.Labels = nr.Labels
or.Annotations = nr.Annotations
delete(rulesRegistry, nr.id())
} }
var newRules []*Rule var newRules []Rule
for _, r := range g.Rules { for _, r := range g.Rules {
if r == nil { if r == nil {
// skip nil rules // skip nil rules
@ -90,6 +122,7 @@ func (g *Group) updateWith(newGroup Group) {
newRules = append(newRules, nr) newRules = append(newRules, nr)
} }
g.Rules = newRules g.Rules = newRules
return nil
} }
var ( var (
@ -116,10 +149,15 @@ func (g *Group) close() {
<-g.finishedCh <-g.finishedCh
} }
func (g *Group) start(ctx context.Context, interval time.Duration, func (g *Group) start(ctx context.Context, querier datasource.Querier, nr notifier.Notifier, rw *remotewrite.Client) {
querier datasource.Querier, nr notifier.Notifier, rw *remotewrite.Client) { logger.Infof("group %q started with interval %v", g.Name, g.Interval)
logger.Infof("group %q started", g.Name)
t := time.NewTicker(interval) var returnSeries bool
if rw != nil {
returnSeries = true
}
t := time.NewTicker(g.Interval)
defer t.Stop() defer t.Stop()
for { for {
select { select {
@ -132,7 +170,20 @@ func (g *Group) start(ctx context.Context, interval time.Duration,
close(g.finishedCh) close(g.finishedCh)
return return
case ng := <-g.updateCh: case ng := <-g.updateCh:
g.updateWith(ng) g.mu.Lock()
err := g.updateWith(ng)
if err != nil {
logger.Errorf("group %q: failed to update: %s", g.Name, err)
g.mu.Unlock()
continue
}
if g.Interval != ng.Interval {
g.Interval = ng.Interval
t.Stop()
t = time.NewTicker(g.Interval)
logger.Infof("group %q: changed evaluation interval to %v", g.Name, g.Interval)
}
g.mu.Unlock()
case <-t.C: case <-t.C:
iterationTotal.Inc() iterationTotal.Inc()
iterationStart := time.Now() iterationStart := time.Now()
@ -140,58 +191,55 @@ func (g *Group) start(ctx context.Context, interval time.Duration,
execTotal.Inc() execTotal.Inc()
execStart := time.Now() execStart := time.Now()
err := rule.Exec(ctx, querier) tss, err := rule.Exec(ctx, querier, returnSeries)
execDuration.UpdateDuration(execStart) execDuration.UpdateDuration(execStart)
if err != nil { if err != nil {
execErrors.Inc() execErrors.Inc()
logger.Errorf("failed to execute rule %q.%q: %s", g.Name, rule.Name, err) logger.Errorf("failed to execute rule %q.%q: %s", g.Name, rule, err)
continue continue
} }
var alertsToSend []notifier.Alert if len(tss) > 0 {
for _, a := range rule.alerts { remoteWriteSent.Add(len(tss))
for _, ts := range tss {
if err := rw.Push(ts); err != nil {
remoteWriteErrors.Inc()
logger.Errorf("failed to remote write for rule %q.%q: %s", g.Name, rule, err)
}
}
}
ar, ok := rule.(*AlertingRule)
if !ok {
continue
}
var alerts []notifier.Alert
for _, a := range ar.alerts {
switch a.State { switch a.State {
case notifier.StateFiring: case notifier.StateFiring:
// set End to execStart + 3 intervals // set End to execStart + 3 intervals
// so notifier can resolve it automatically if `vmalert` // so notifier can resolve it automatically if `vmalert`
// won't be able to send resolve for some reason // won't be able to send resolve for some reason
a.End = execStart.Add(3 * interval) a.End = execStart.Add(3 * g.Interval)
alertsToSend = append(alertsToSend, *a) alerts = append(alerts, *a)
pushToRW(rw, rule, a, execStart)
case notifier.StatePending:
pushToRW(rw, rule, a, execStart)
case notifier.StateInactive: case notifier.StateInactive:
// set End to execStart to notify // set End to execStart to notify
// that it was just resolved // that it was just resolved
a.End = execStart a.End = execStart
alertsToSend = append(alertsToSend, *a) alerts = append(alerts, *a)
} }
} }
if len(alertsToSend) == 0 { if len(alerts) < 1 {
continue continue
} }
alertsSent.Add(len(alertsToSend)) alertsSent.Add(len(alerts))
if err := nr.Send(ctx, alertsToSend); err != nil { if err := nr.Send(ctx, alerts); err != nil {
alertsSendErrors.Inc() alertsSendErrors.Inc()
logger.Errorf("failed to send alert for rule %q.%q: %s", g.Name, rule.Name, err) logger.Errorf("failed to send alert for rule %q.%q: %s", g.Name, rule, err)
} }
} }
iterationDuration.UpdateDuration(iterationStart) iterationDuration.UpdateDuration(iterationStart)
} }
} }
} }
func pushToRW(rw *remotewrite.Client, rule *Rule, a *notifier.Alert, timestamp time.Time) {
if rw == nil {
return
}
tss := rule.AlertToTimeSeries(a, timestamp)
remoteWriteSent.Add(len(tss))
for _, ts := range tss {
if err := rw.Push(ts); err != nil {
remoteWriteErrors.Inc()
logger.Errorf("failed to push timeseries to remotewrite: %s", err)
}
}
}


@ -4,28 +4,28 @@ import (
"context" "context"
"reflect" "reflect"
"sort" "sort"
"sync"
"testing" "testing"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
) )
func TestUpdateWith(t *testing.T) { func TestUpdateWith(t *testing.T) {
testCases := []struct { testCases := []struct {
name string name string
currentRules []*Rule currentRules []Rule
// rules must be sorted by name // rules must be sorted by ID
newRules []*Rule newRules []Rule
}{ }{
{ {
"new rule", "new rule",
[]*Rule{}, []Rule{},
[]*Rule{{Name: "bar"}}, []Rule{&AlertingRule{Name: "bar"}},
}, },
{ {
"update rule", "update alerting rule",
[]*Rule{{ []Rule{&AlertingRule{
Name: "foo", Name: "foo",
Expr: "up > 0", Expr: "up > 0",
For: time.Second, For: time.Second,
@ -37,8 +37,8 @@ func TestUpdateWith(t *testing.T) {
"description": "{{$labels}}", "description": "{{$labels}}",
}, },
}}, }},
[]*Rule{{ []Rule{&AlertingRule{
Name: "bar", Name: "foo",
Expr: "up > 10", Expr: "up > 10",
For: time.Second, For: time.Second,
Labels: map[string]string{ Labels: map[string]string{
@ -49,56 +49,82 @@ func TestUpdateWith(t *testing.T) {
}, },
}}, }},
}, },
{
"update recording rule",
[]Rule{&RecordingRule{
Name: "foo",
Expr: "max(up)",
Labels: map[string]string{
"bar": "baz",
},
}},
[]Rule{&RecordingRule{
Name: "foo",
Expr: "min(up)",
Labels: map[string]string{
"baz": "bar",
},
}},
},
{ {
"empty rule", "empty rule",
[]*Rule{{Name: "foo"}}, []Rule{&AlertingRule{Name: "foo"}, &RecordingRule{Name: "bar"}},
[]*Rule{}, []Rule{},
}, },
{ {
"multiple rules", "multiple rules",
[]*Rule{{Name: "bar"}, {Name: "baz"}, {Name: "foo"}}, []Rule{
[]*Rule{{Name: "baz"}, {Name: "foo"}}, &AlertingRule{Name: "bar"},
&AlertingRule{Name: "baz"},
&RecordingRule{Name: "foo"},
},
[]Rule{
&AlertingRule{Name: "baz"},
&RecordingRule{Name: "foo"},
},
}, },
{ {
"replace rule", "replace rule",
[]*Rule{{Name: "foo1"}}, []Rule{&AlertingRule{Name: "foo1"}},
[]*Rule{{Name: "foo2"}}, []Rule{&AlertingRule{Name: "foo2"}},
}, },
{ {
"replace multiple rules", "replace multiple rules",
[]*Rule{{Name: "foo1"}, {Name: "foo2"}}, []Rule{
[]*Rule{{Name: "foo3"}, {Name: "foo4"}}, &AlertingRule{Name: "foo1"},
&RecordingRule{Name: "foo2"},
&AlertingRule{Name: "foo3"},
},
[]Rule{
&AlertingRule{Name: "foo3"},
&AlertingRule{Name: "foo4"},
&RecordingRule{Name: "foo5"},
},
}, },
} }
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
g := &Group{Rules: tc.currentRules} g := &Group{Rules: tc.currentRules}
g.updateWith(Group{Rules: tc.newRules}) err := g.updateWith(&Group{Rules: tc.newRules})
if err != nil {
t.Fatal(err)
}
if len(g.Rules) != len(tc.newRules) { if len(g.Rules) != len(tc.newRules) {
t.Fatalf("expected to have %d rules; got: %d", t.Fatalf("expected to have %d rules; got: %d",
len(g.Rules), len(tc.newRules)) len(g.Rules), len(tc.newRules))
} }
sort.Slice(g.Rules, func(i, j int) bool { sort.Slice(g.Rules, func(i, j int) bool {
return g.Rules[i].Name < g.Rules[j].Name return g.Rules[i].ID() < g.Rules[j].ID()
}) })
for i, r := range g.Rules { for i, r := range g.Rules {
got, want := r, tc.newRules[i] got, want := r, tc.newRules[i]
if got.Name != want.Name { if got.ID() != want.ID() {
t.Fatalf("expected to have rule %q; got %q", want.Name, got.Name) t.Fatalf("expected to have rule %q; got %q", want, got)
} }
if got.Expr != want.Expr { if err := compareRules(t, got, want); err != nil {
t.Fatalf("expected to have expression %q; got %q", want.Expr, got.Expr) t.Fatalf("comparsion error: %s", err)
}
if got.For != want.For {
t.Fatalf("expected to have for %q; got %q", want.For, got.For)
}
if !reflect.DeepEqual(got.Annotations, want.Annotations) {
t.Fatalf("expected to have annotations %#v; got %#v", want.Annotations, got.Annotations)
}
if !reflect.DeepEqual(got.Labels, want.Labels) {
t.Fatalf("expected to have labels %#v; got %#v", want.Labels, got.Labels)
} }
} }
}) })
@ -107,11 +133,12 @@ func TestUpdateWith(t *testing.T) {
func TestGroupStart(t *testing.T) { func TestGroupStart(t *testing.T) {
// TODO: make parsing from string instead of file // TODO: make parsing from string instead of file
groups, err := Parse([]string{"testdata/rules1-good.rules"}, true) groups, err := config.Parse([]string{"config/testdata/rules1-good.rules"}, true)
if err != nil { if err != nil {
t.Fatalf("failed to parse rules: %s", err) t.Fatalf("failed to parse rules: %s", err)
} }
g := groups[0] const evalInterval = time.Millisecond
g := newGroup(groups[0], evalInterval)
fn := &fakeNotifier{} fn := &fakeNotifier{}
fs := &fakeQuerier{} fs := &fakeQuerier{}
@ -120,27 +147,26 @@ func TestGroupStart(t *testing.T) {
m1 := metricWithLabels(t, "instance", inst1, "job", job) m1 := metricWithLabels(t, "instance", inst1, "job", job)
m2 := metricWithLabels(t, "instance", inst2, "job", job) m2 := metricWithLabels(t, "instance", inst2, "job", job)
r := g.Rules[0] r := g.Rules[0].(*AlertingRule)
alert1, err := r.newAlert(m1) alert1, err := r.newAlert(m1, time.Now())
if err != nil { if err != nil {
t.Fatalf("faield to create alert: %s", err) t.Fatalf("faield to create alert: %s", err)
} }
alert1.State = notifier.StateFiring alert1.State = notifier.StateFiring
alert1.ID = hash(m1) alert1.ID = hash(m1)
alert2, err := r.newAlert(m2) alert2, err := r.newAlert(m2, time.Now())
if err != nil { if err != nil {
t.Fatalf("faield to create alert: %s", err) t.Fatalf("faield to create alert: %s", err)
} }
alert2.State = notifier.StateFiring alert2.State = notifier.StateFiring
alert2.ID = hash(m2) alert2.ID = hash(m2)
const evalInterval = time.Millisecond
finished := make(chan struct{}) finished := make(chan struct{})
fs.add(m1) fs.add(m1)
fs.add(m2) fs.add(m2)
go func() { go func() {
g.start(context.Background(), evalInterval, fs, fn, nil) g.start(context.Background(), fs, fn, nil)
close(finished) close(finished)
}() }()
@ -197,21 +223,3 @@ func compareAlerts(t *testing.T, as, bs []notifier.Alert) {
} }
} }
} }
type fakeNotifier struct {
sync.Mutex
alerts []notifier.Alert
}
func (fn *fakeNotifier) Send(_ context.Context, alerts []notifier.Alert) error {
fn.Lock()
defer fn.Unlock()
fn.alerts = alerts
return nil
}
func (fn *fakeNotifier) getAlerts() []notifier.Alert {
fn.Lock()
defer fn.Unlock()
return fn.alerts
}

200
app/vmalert/helpers_test.go Normal file
View file

@ -0,0 +1,200 @@
package main
import (
"context"
"fmt"
"reflect"
"sync"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
type fakeQuerier struct {
sync.Mutex
metrics []datasource.Metric
err error
}
func (fq *fakeQuerier) setErr(err error) {
fq.Lock()
fq.err = err
fq.Unlock()
}
func (fq *fakeQuerier) reset() {
fq.Lock()
fq.err = nil
fq.metrics = fq.metrics[:0]
fq.Unlock()
}
func (fq *fakeQuerier) add(metrics ...datasource.Metric) {
fq.Lock()
fq.metrics = append(fq.metrics, metrics...)
fq.Unlock()
}
func (fq *fakeQuerier) Query(_ context.Context, _ string) ([]datasource.Metric, error) {
fq.Lock()
defer fq.Unlock()
if fq.err != nil {
return nil, fq.err
}
cp := make([]datasource.Metric, len(fq.metrics))
copy(cp, fq.metrics)
return cp, nil
}
type fakeNotifier struct {
sync.Mutex
alerts []notifier.Alert
}
func (fn *fakeNotifier) Send(_ context.Context, alerts []notifier.Alert) error {
fn.Lock()
defer fn.Unlock()
fn.alerts = alerts
return nil
}
func (fn *fakeNotifier) getAlerts() []notifier.Alert {
fn.Lock()
defer fn.Unlock()
return fn.alerts
}
func metricWithValueAndLabels(t *testing.T, value float64, labels ...string) datasource.Metric {
t.Helper()
m := metricWithLabels(t, labels...)
m.Value = value
return m
}
func metricWithLabels(t *testing.T, labels ...string) datasource.Metric {
t.Helper()
if len(labels) == 0 || len(labels)%2 != 0 {
t.Fatalf("expected to get even number of labels")
}
m := datasource.Metric{}
for i := 0; i < len(labels); i += 2 {
m.Labels = append(m.Labels, datasource.Label{
Name: labels[i],
Value: labels[i+1],
})
}
return m
}
func compareGroups(t *testing.T, a, b *Group) {
t.Helper()
if a.Name != b.Name {
t.Fatalf("expected group name %q; got %q", a.Name, b.Name)
}
if a.File != b.File {
t.Fatalf("expected group %q file name %q; got %q", a.Name, a.File, b.File)
}
if a.Interval != b.Interval {
t.Fatalf("expected group %q interval %v; got %v", a.Name, a.Interval, b.Interval)
}
if len(a.Rules) != len(b.Rules) {
t.Fatalf("expected group %s to have %d rules; got: %d",
a.Name, len(a.Rules), len(b.Rules))
}
for i, r := range a.Rules {
got, want := r, b.Rules[i]
if got.ID() != want.ID() {
t.Fatalf("expected to have rule %q; got %q", want.ID(), got.ID())
}
if err := compareRules(t, want, got); err != nil {
t.Fatalf("comparsion error: %s", err)
}
}
}
func compareRules(t *testing.T, a, b Rule) error {
t.Helper()
switch v := a.(type) {
case *AlertingRule:
br, ok := b.(*AlertingRule)
if !ok {
return fmt.Errorf("rule %q supposed to be of type AlertingRule", b.ID())
}
return compareAlertingRules(t, v, br)
case *RecordingRule:
br, ok := b.(*RecordingRule)
if !ok {
return fmt.Errorf("rule %q supposed to be of type RecordingRule", b.ID())
}
return compareRecordingRules(t, v, br)
default:
return fmt.Errorf("unexpected rule type received %T", a)
}
}
func compareRecordingRules(t *testing.T, a, b *RecordingRule) error {
t.Helper()
if a.Expr != b.Expr {
return fmt.Errorf("expected to have expression %q; got %q", a.Expr, b.Expr)
}
if !reflect.DeepEqual(a.Labels, b.Labels) {
return fmt.Errorf("expected to have labels %#v; got %#v", a.Labels, b.Labels)
}
return nil
}
func compareAlertingRules(t *testing.T, a, b *AlertingRule) error {
t.Helper()
if a.Expr != b.Expr {
return fmt.Errorf("expected to have expression %q; got %q", a.Expr, b.Expr)
}
if a.For != b.For {
return fmt.Errorf("expected to have for %q; got %q", a.For, b.For)
}
if !reflect.DeepEqual(a.Annotations, b.Annotations) {
return fmt.Errorf("expected to have annotations %#v; got %#v", a.Annotations, b.Annotations)
}
if !reflect.DeepEqual(a.Labels, b.Labels) {
return fmt.Errorf("expected to have labels %#v; got %#v", a.Labels, b.Labels)
}
return nil
}
func compareTimeSeries(t *testing.T, a, b []prompbmarshal.TimeSeries) error {
t.Helper()
if len(a) != len(b) {
return fmt.Errorf("expected number of timeseries %d; got %d", len(a), len(b))
}
for i := range a {
expTS, gotTS := a[i], b[i]
if len(expTS.Samples) != len(gotTS.Samples) {
return fmt.Errorf("expected number of samples %d; got %d", len(expTS.Samples), len(gotTS.Samples))
}
for i, exp := range expTS.Samples {
got := gotTS.Samples[i]
if got.Value != exp.Value {
return fmt.Errorf("expected value %.2f; got %.2f", exp.Value, got.Value)
}
// timestamp validation isn't always correct for now.
// this must be improved with a time mock.
/*if got.Timestamp != exp.Timestamp {
return fmt.Errorf("expected timestamp %d; got %d", exp.Timestamp, got.Timestamp)
}*/
}
if len(expTS.Labels) != len(gotTS.Labels) {
return fmt.Errorf("expected number of labels %d; got %d", len(expTS.Labels), len(gotTS.Labels))
}
for i, exp := range expTS.Labels {
got := gotTS.Labels[i]
if got.Name != exp.Name {
return fmt.Errorf("expected label name %q; got %q", exp.Name, got.Name)
}
if got.Value != exp.Value {
return fmt.Errorf("expected label value %q; got %q", exp.Value, got.Value)
}
}
}
return nil
}

View file

@ -42,7 +42,9 @@ absolute path to all .yaml files in root.`)
" in form of timeseries. E.g. http://127.0.0.1:8428") " in form of timeseries. E.g. http://127.0.0.1:8428")
remoteWriteUsername = flag.String("remoteWrite.basicAuth.username", "", "Optional basic auth username for -remoteWrite.url") remoteWriteUsername = flag.String("remoteWrite.basicAuth.username", "", "Optional basic auth username for -remoteWrite.url")
remoteWritePassword = flag.String("remoteWrite.basicAuth.password", "", "Optional basic auth password for -remoteWrite.url") remoteWritePassword = flag.String("remoteWrite.basicAuth.password", "", "Optional basic auth password for -remoteWrite.url")
remoteWriteMaxQueueSize = flag.Int("remoteWrite.maxQueueSize", 10e3, "Defines the max number of pending datapoints to remote write endpoint") remoteWriteMaxQueueSize = flag.Int("remoteWrite.maxQueueSize", 1e5, "Defines the max number of pending datapoints to remote write endpoint")
remoteWriteMaxBatchSize = flag.Int("remoteWrite.maxBatchSize", 1e3, "Defines max number of timeseries to be flushed at once")
remoteWriteConcurrency = flag.Int("remoteWrite.concurrency", 1, "Defines number of readers that concurrently write into remote storage")
remoteReadURL = flag.String("remoteRead.url", "", "Optional URL to Victoria Metrics or VMSelect that will be used to restore alerts"+ remoteReadURL = flag.String("remoteRead.url", "", "Optional URL to Victoria Metrics or VMSelect that will be used to restore alerts"+
" state. This configuration makes sense only if `vmalert` was configured with `remoteWrite.url` before and has been successfully persisted its state."+ " state. This configuration makes sense only if `vmalert` was configured with `remoteWrite.url` before and has been successfully persisted its state."+
@ -52,7 +54,7 @@ absolute path to all .yaml files in root.`)
remoteReadLookBack = flag.Duration("remoteRead.lookback", time.Hour, "Lookback defines how far to look into past for alerts timeseries."+ remoteReadLookBack = flag.Duration("remoteRead.lookback", time.Hour, "Lookback defines how far to look into past for alerts timeseries."+
" For example, if lookback=1h then range from now() to now()-1h will be scanned.") " For example, if lookback=1h then range from now() to now()-1h will be scanned.")
evaluationInterval = flag.Duration("evaluationInterval", time.Minute, "How often to evaluate the rules. Default 1m") evaluationInterval = flag.Duration("evaluationInterval", time.Minute, "How often to evaluate the rules")
notifierURL = flag.String("notifier.url", "", "Prometheus alertmanager URL. Required parameter. e.g. http://127.0.0.1:9093") notifierURL = flag.String("notifier.url", "", "Prometheus alertmanager URL. Required parameter. e.g. http://127.0.0.1:9093")
externalURL = flag.String("external.url", "", "External URL is used as alert's source for sent alerts to the notifier") externalURL = flag.String("external.url", "", "External URL is used as alert's source for sent alerts to the notifier")
) )
@ -81,7 +83,9 @@ func main() {
if *remoteWriteURL != "" { if *remoteWriteURL != "" {
c, err := remotewrite.NewClient(ctx, remotewrite.Config{ c, err := remotewrite.NewClient(ctx, remotewrite.Config{
Addr: *remoteWriteURL, Addr: *remoteWriteURL,
Concurrency: *remoteWriteConcurrency,
MaxQueueSize: *remoteWriteMaxQueueSize, MaxQueueSize: *remoteWriteMaxQueueSize,
MaxBatchSize: *remoteWriteMaxBatchSize,
FlushInterval: *evaluationInterval, FlushInterval: *evaluationInterval,
BasicAuthUser: *remoteWriteUsername, BasicAuthUser: *remoteWriteUsername,
BasicAuthPass: *remoteWritePassword, BasicAuthPass: *remoteWritePassword,

View file

@ -6,12 +6,14 @@ import (
"strings" "strings"
"sync" "sync"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
) )
// manager controls group states
type manager struct { type manager struct {
storage datasource.Querier storage datasource.Querier
notifier notifier.Notifier notifier notifier.Notifier
@ -25,7 +27,7 @@ type manager struct {
groups map[uint64]*Group groups map[uint64]*Group
} }
// AlertAPI generates APIAlert object from alert by its id(hash) // AlertAPI generates APIAlert object from alert by its ID(hash)
func (m *manager) AlertAPI(gID, aID uint64) (*APIAlert, error) { func (m *manager) AlertAPI(gID, aID uint64) (*APIAlert, error) {
m.groupsMu.RLock() m.groupsMu.RLock()
defer m.groupsMu.RUnlock() defer m.groupsMu.RUnlock()
@ -35,11 +37,15 @@ func (m *manager) AlertAPI(gID, aID uint64) (*APIAlert, error) {
return nil, fmt.Errorf("can't find group with id %q", gID) return nil, fmt.Errorf("can't find group with id %q", gID)
} }
for _, rule := range g.Rules { for _, rule := range g.Rules {
if apiAlert := rule.AlertAPI(aID); apiAlert != nil { ar, ok := rule.(*AlertingRule)
if !ok {
continue
}
if apiAlert := ar.AlertAPI(aID); apiAlert != nil {
return apiAlert, nil return apiAlert, nil
} }
} }
return nil, fmt.Errorf("can't func alert with id %q in group %q", aID, g.Name) return nil, fmt.Errorf("can't find alert with id %q in group %q", aID, g.Name)
} }
func (m *manager) start(ctx context.Context, path []string, validate bool) error { func (m *manager) start(ctx context.Context, path []string, validate bool) error {
@ -56,7 +62,7 @@ func (m *manager) close() {
m.wg.Wait() m.wg.Wait()
} }
func (m *manager) startGroup(ctx context.Context, group Group, restore bool) { func (m *manager) startGroup(ctx context.Context, group *Group, restore bool) {
if restore && m.rr != nil { if restore && m.rr != nil {
err := group.Restore(ctx, m.rr, *remoteReadLookBack) err := group.Restore(ctx, m.rr, *remoteReadLookBack)
if err != nil { if err != nil {
@ -67,21 +73,22 @@ func (m *manager) startGroup(ctx context.Context, group Group, restore bool) {
m.wg.Add(1) m.wg.Add(1)
id := group.ID() id := group.ID()
go func() { go func() {
group.start(ctx, *evaluationInterval, m.storage, m.notifier, m.rw) group.start(ctx, m.storage, m.notifier, m.rw)
m.wg.Done() m.wg.Done()
}() }()
m.groups[id] = &group m.groups[id] = group
} }
func (m *manager) update(ctx context.Context, path []string, validate, restore bool) error { func (m *manager) update(ctx context.Context, path []string, validate, restore bool) error {
logger.Infof("reading alert rules configuration file from %q", strings.Join(path, ";")) logger.Infof("reading rules configuration file from %q", strings.Join(path, ";"))
newGroups, err := Parse(path, validate) groupsCfg, err := config.Parse(path, validate)
if err != nil { if err != nil {
return fmt.Errorf("cannot parse configuration file: %s", err) return fmt.Errorf("cannot parse configuration file: %s", err)
} }
groupsRegistry := make(map[uint64]Group) groupsRegistry := make(map[uint64]*Group)
for _, ng := range newGroups { for _, cfg := range groupsCfg {
ng := newGroup(cfg, *evaluationInterval)
groupsRegistry[ng.ID()] = ng groupsRegistry[ng.ID()] = ng
} }
@ -106,3 +113,22 @@ func (m *manager) update(ctx context.Context, path []string, validate, restore b
m.groupsMu.Unlock() m.groupsMu.Unlock()
return nil return nil
} }
func (g *Group) toAPI() APIGroup {
ag := APIGroup{
// encode as strings to avoid rounding
ID: fmt.Sprintf("%d", g.ID()),
Name: g.Name,
File: g.File,
Interval: g.Interval.String(),
}
for _, r := range g.Rules {
switch v := r.(type) {
case *AlertingRule:
ag.AlertingRules = append(ag.AlertingRules, v.RuleAPI())
case *RecordingRule:
ag.RecordingRules = append(ag.RecordingRules, v.RuleAPI())
}
}
return ag
}

View file

@ -3,12 +3,22 @@ package main
import ( import (
"context" "context"
"math/rand" "math/rand"
"net/url"
"os"
"strings" "strings"
"sync" "sync"
"testing" "testing"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
) )
func TestMain(m *testing.M) {
u, _ := url.Parse("https://victoriametrics.com/path")
notifier.InitTemplateFunc(u)
os.Exit(m.Run())
}
func TestManagerUpdateError(t *testing.T) { func TestManagerUpdateError(t *testing.T) {
m := &manager{groups: make(map[uint64]*Group)} m := &manager{groups: make(map[uint64]*Group)}
path := []string{"foo/bar"} path := []string{"foo/bar"}
@ -32,9 +42,13 @@ func TestManagerUpdateConcurrent(t *testing.T) {
notifier: &fakeNotifier{}, notifier: &fakeNotifier{},
} }
paths := []string{ paths := []string{
"testdata/dir/rules0-good.rules", "config/testdata/dir/rules0-good.rules",
"testdata/dir/rules1-good.rules", "config/testdata/dir/rules0-bad.rules",
"testdata/rules0-good.rules", "config/testdata/dir/rules1-good.rules",
"config/testdata/dir/rules1-bad.rules",
"config/testdata/rules0-good.rules",
"config/testdata/rules1-good.rules",
"config/testdata/rules2-good.rules",
} }
*evaluationInterval = time.Millisecond *evaluationInterval = time.Millisecond
if err := m.start(context.Background(), []string{paths[0]}, true); err != nil { if err := m.start(context.Background(), []string{paths[0]}, true); err != nil {
@ -51,10 +65,7 @@ func TestManagerUpdateConcurrent(t *testing.T) {
for i := 0; i < iterations; i++ { for i := 0; i < iterations; i++ {
rnd := rand.Intn(len(paths)) rnd := rand.Intn(len(paths))
path := []string{paths[rnd]} path := []string{paths[rnd]}
err := m.update(context.Background(), path, true, false) _ = m.update(context.Background(), path, true, false)
if err != nil {
t.Errorf("update error: %s", err)
}
} }
}() }()
} }
@ -64,6 +75,41 @@ func TestManagerUpdateConcurrent(t *testing.T) {
// TestManagerUpdate tests sequential configuration // TestManagerUpdate tests sequential configuration
// updates. // updates.
func TestManagerUpdate(t *testing.T) { func TestManagerUpdate(t *testing.T) {
const defaultEvalInterval = time.Second * 30
currentEvalInterval := *evaluationInterval
*evaluationInterval = defaultEvalInterval
defer func() {
*evaluationInterval = currentEvalInterval
}()
var (
VMRows = &AlertingRule{
Name: "VMRows",
Expr: "vm_rows > 0",
For: 10 * time.Second,
Labels: map[string]string{
"label": "bar",
"host": "{{ $labels.instance }}",
},
Annotations: map[string]string{
"summary": "{{ $value|humanize }}",
"description": "{{$labels}}",
},
}
Conns = &AlertingRule{
Name: "Conns",
Expr: "sum(vm_tcplistener_conns) by(instance) > 1",
Annotations: map[string]string{
"summary": "Too high connection number for {{$labels.instance}}",
"description": "It is {{ $value }} connections for {{$labels.instance}}",
},
}
ExampleAlertAlwaysFiring = &AlertingRule{
Name: "ExampleAlertAlwaysFiring",
Expr: "sum by(job) (up == 1)",
}
)
testCases := []struct { testCases := []struct {
name string name string
initPath string initPath string
@ -72,49 +118,65 @@ func TestManagerUpdate(t *testing.T) {
}{ }{
{ {
name: "update good rules", name: "update good rules",
initPath: "testdata/rules0-good.rules", initPath: "config/testdata/rules0-good.rules",
updatePath: "testdata/dir/rules1-good.rules", updatePath: "config/testdata/dir/rules1-good.rules",
want: []*Group{ want: []*Group{
{ {
File: "testdata/dir/rules1-good.rules", File: "config/testdata/dir/rules1-good.rules",
Name: "duplicatedGroupDiffFiles", Name: "duplicatedGroupDiffFiles",
Rules: []*Rule{newTestRule("VMRows", time.Second*10)}, Interval: defaultEvalInterval,
Rules: []Rule{
&AlertingRule{
Name: "VMRows",
Expr: "vm_rows > 0",
For: 5 * time.Minute,
Labels: map[string]string{"label": "bar"},
Annotations: map[string]string{
"summary": "{{ $value }}",
"description": "{{$labels}}",
},
},
},
}, },
}, },
}, },
{ {
name: "update good rules from 1 to 2 groups", name: "update good rules from 1 to 2 groups",
initPath: "testdata/dir/rules1-good.rules", initPath: "config/testdata/dir/rules1-good.rules",
updatePath: "testdata/rules0-good.rules", updatePath: "config/testdata/rules0-good.rules",
want: []*Group{ want: []*Group{
{ {
File: "testdata/rules0-good.rules", File: "config/testdata/rules0-good.rules",
Name: "groupGorSingleAlert", Rules: []*Rule{ Name: "groupGorSingleAlert",
newTestRule("VMRows", time.Second*10), Rules: []Rule{VMRows},
}}, Interval: defaultEvalInterval,
},
{ {
File: "testdata/rules0-good.rules", File: "config/testdata/rules0-good.rules",
Name: "TestGroup", Rules: []*Rule{ Interval: defaultEvalInterval,
newTestRule("Conns", time.Duration(0)), Name: "TestGroup", Rules: []Rule{
newTestRule("ExampleAlertAlwaysFiring", time.Duration(0)), Conns,
ExampleAlertAlwaysFiring,
}}, }},
}, },
}, },
{ {
name: "update with one bad rule file", name: "update with one bad rule file",
initPath: "testdata/rules0-good.rules", initPath: "config/testdata/rules0-good.rules",
updatePath: "testdata/dir/rules2-bad.rules", updatePath: "config/testdata/dir/rules2-bad.rules",
want: []*Group{ want: []*Group{
{ {
File: "testdata/rules0-good.rules", File: "config/testdata/rules0-good.rules",
Name: "groupGorSingleAlert", Rules: []*Rule{ Name: "groupGorSingleAlert",
newTestRule("VMRows", time.Second*10), Interval: defaultEvalInterval,
}}, Rules: []Rule{VMRows},
},
{ {
File: "testdata/rules0-good.rules", File: "config/testdata/rules0-good.rules",
Name: "TestGroup", Rules: []*Rule{ Interval: defaultEvalInterval,
newTestRule("Conns", time.Duration(0)), Name: "TestGroup", Rules: []Rule{
newTestRule("ExampleAlertAlwaysFiring", time.Duration(0)), Conns,
ExampleAlertAlwaysFiring,
}}, }},
}, },
}, },
@ -139,7 +201,7 @@ func TestManagerUpdate(t *testing.T) {
if !ok { if !ok {
t.Fatalf("expected to have group %q", wantG.Name) t.Fatalf("expected to have group %q", wantG.Name)
} }
compareGroups(t, gotG, wantG) compareGroups(t, wantG, gotG)
} }
cancel() cancel()
@ -147,17 +209,3 @@ func TestManagerUpdate(t *testing.T) {
}) })
} }
} }
func compareGroups(t *testing.T, a, b *Group) {
t.Helper()
if len(a.Rules) != len(b.Rules) {
t.Fatalf("expected group %s to have %d rules; got: %d",
a.Name, len(a.Rules), len(b.Rules))
}
for i, r := range a.Rules {
got, want := r, b.Rules[i]
if got.Name != want.Name {
t.Fatalf("expected to have rule %q; got %q", want.Name, got.Name)
}
}
}

View file

@ -87,7 +87,7 @@ func templateAnnotations(annotations map[string]string, header string, data aler
builder.WriteString(header) builder.WriteString(header)
builder.WriteString(text) builder.WriteString(text)
if err := templateAnnotation(&buf, builder.String(), data); err != nil { if err := templateAnnotation(&buf, builder.String(), data); err != nil {
eg.errs = append(eg.errs, fmt.Sprintf("key %s, template %s:%s", key, text, err)) eg.errs = append(eg.errs, fmt.Sprintf("key %q, template %q: %s", key, text, err))
continue continue
} }
r[key] = buf.String() r[key] = buf.String()
@ -98,10 +98,10 @@ func templateAnnotations(annotations map[string]string, header string, data aler
func templateAnnotation(dst io.Writer, text string, data alertTplData) error { func templateAnnotation(dst io.Writer, text string, data alertTplData) error {
tpl, err := template.New("").Funcs(tmplFunc).Option("missingkey=zero").Parse(text) tpl, err := template.New("").Funcs(tmplFunc).Option("missingkey=zero").Parse(text)
if err != nil { if err != nil {
return fmt.Errorf("error parsing annotation:%w", err) return fmt.Errorf("error parsing annotation: %w", err)
} }
if err = tpl.Execute(dst, data); err != nil { if err = tpl.Execute(dst, data); err != nil {
return fmt.Errorf("error evaluating annotation template:%w", err) return fmt.Errorf("error evaluating annotation template: %w", err)
} }
return nil return nil
} }

View file

@ -1,131 +1,131 @@
// Code generated by qtc from "alertmanager_request.qtpl". DO NOT EDIT. // Code generated by qtc from "alertmanager_request.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details. // See https://github.com/valyala/quicktemplate for details.
//line app/vmalert/notifier/alertmanager_request.qtpl:1 //line notifier/alertmanager_request.qtpl:1
package notifier package notifier
//line app/vmalert/notifier/alertmanager_request.qtpl:1 //line notifier/alertmanager_request.qtpl:1
import ( import (
"strconv" "strconv"
"time" "time"
) )
//line app/vmalert/notifier/alertmanager_request.qtpl:7 //line notifier/alertmanager_request.qtpl:7
import ( import (
qtio422016 "io" qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate" qt422016 "github.com/valyala/quicktemplate"
) )
//line app/vmalert/notifier/alertmanager_request.qtpl:7 //line notifier/alertmanager_request.qtpl:7
var ( var (
_ = qtio422016.Copy _ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer _ = qt422016.AcquireByteBuffer
) )
//line app/vmalert/notifier/alertmanager_request.qtpl:7 //line notifier/alertmanager_request.qtpl:7
func streamamRequest(qw422016 *qt422016.Writer, alerts []Alert, generatorURL func(string, string) string) { func streamamRequest(qw422016 *qt422016.Writer, alerts []Alert, generatorURL func(string, string) string) {
//line app/vmalert/notifier/alertmanager_request.qtpl:7 //line notifier/alertmanager_request.qtpl:7
qw422016.N().S(`[`) qw422016.N().S(`[`)
//line app/vmalert/notifier/alertmanager_request.qtpl:9 //line notifier/alertmanager_request.qtpl:9
for i, alert := range alerts { for i, alert := range alerts {
//line app/vmalert/notifier/alertmanager_request.qtpl:9 //line notifier/alertmanager_request.qtpl:9
qw422016.N().S(`{"startsAt":`) qw422016.N().S(`{"startsAt":`)
//line app/vmalert/notifier/alertmanager_request.qtpl:11 //line notifier/alertmanager_request.qtpl:11
qw422016.N().Q(alert.Start.Format(time.RFC3339Nano)) qw422016.N().Q(alert.Start.Format(time.RFC3339Nano))
//line app/vmalert/notifier/alertmanager_request.qtpl:11 //line notifier/alertmanager_request.qtpl:11
qw422016.N().S(`,"generatorURL":`) qw422016.N().S(`,"generatorURL":`)
//line app/vmalert/notifier/alertmanager_request.qtpl:12 //line notifier/alertmanager_request.qtpl:12
qw422016.N().Q(generatorURL(strconv.FormatUint(alert.GroupID, 10), strconv.FormatUint(alert.ID, 10))) qw422016.N().Q(generatorURL(strconv.FormatUint(alert.GroupID, 10), strconv.FormatUint(alert.ID, 10)))
//line app/vmalert/notifier/alertmanager_request.qtpl:12 //line notifier/alertmanager_request.qtpl:12
qw422016.N().S(`,`) qw422016.N().S(`,`)
//line app/vmalert/notifier/alertmanager_request.qtpl:13 //line notifier/alertmanager_request.qtpl:13
if !alert.End.IsZero() { if !alert.End.IsZero() {
//line app/vmalert/notifier/alertmanager_request.qtpl:13 //line notifier/alertmanager_request.qtpl:13
qw422016.N().S(`"endsAt":`) qw422016.N().S(`"endsAt":`)
//line app/vmalert/notifier/alertmanager_request.qtpl:14 //line notifier/alertmanager_request.qtpl:14
qw422016.N().Q(alert.End.Format(time.RFC3339Nano)) qw422016.N().Q(alert.End.Format(time.RFC3339Nano))
//line app/vmalert/notifier/alertmanager_request.qtpl:14 //line notifier/alertmanager_request.qtpl:14
qw422016.N().S(`,`) qw422016.N().S(`,`)
//line app/vmalert/notifier/alertmanager_request.qtpl:15 //line notifier/alertmanager_request.qtpl:15
} }
//line app/vmalert/notifier/alertmanager_request.qtpl:15 //line notifier/alertmanager_request.qtpl:15
qw422016.N().S(`"labels": {"alertname":`) qw422016.N().S(`"labels": {"alertname":`)
//line app/vmalert/notifier/alertmanager_request.qtpl:17 //line notifier/alertmanager_request.qtpl:17
qw422016.N().Q(alert.Name) qw422016.N().Q(alert.Name)
//line app/vmalert/notifier/alertmanager_request.qtpl:18 //line notifier/alertmanager_request.qtpl:18
for k, v := range alert.Labels { for k, v := range alert.Labels {
//line app/vmalert/notifier/alertmanager_request.qtpl:18 //line notifier/alertmanager_request.qtpl:18
qw422016.N().S(`,`) qw422016.N().S(`,`)
//line app/vmalert/notifier/alertmanager_request.qtpl:19 //line notifier/alertmanager_request.qtpl:19
qw422016.N().Q(k) qw422016.N().Q(k)
//line app/vmalert/notifier/alertmanager_request.qtpl:19 //line notifier/alertmanager_request.qtpl:19
qw422016.N().S(`:`) qw422016.N().S(`:`)
//line app/vmalert/notifier/alertmanager_request.qtpl:19 //line notifier/alertmanager_request.qtpl:19
qw422016.N().Q(v) qw422016.N().Q(v)
//line app/vmalert/notifier/alertmanager_request.qtpl:20 //line notifier/alertmanager_request.qtpl:20
} }
//line app/vmalert/notifier/alertmanager_request.qtpl:20 //line notifier/alertmanager_request.qtpl:20
qw422016.N().S(`},"annotations": {`) qw422016.N().S(`},"annotations": {`)
//line app/vmalert/notifier/alertmanager_request.qtpl:23 //line notifier/alertmanager_request.qtpl:23
c := len(alert.Annotations) c := len(alert.Annotations)
//line app/vmalert/notifier/alertmanager_request.qtpl:24 //line notifier/alertmanager_request.qtpl:24
for k, v := range alert.Annotations { for k, v := range alert.Annotations {
//line app/vmalert/notifier/alertmanager_request.qtpl:25 //line notifier/alertmanager_request.qtpl:25
c = c - 1 c = c - 1
//line app/vmalert/notifier/alertmanager_request.qtpl:26 //line notifier/alertmanager_request.qtpl:26
qw422016.N().Q(k) qw422016.N().Q(k)
//line app/vmalert/notifier/alertmanager_request.qtpl:26 //line notifier/alertmanager_request.qtpl:26
qw422016.N().S(`:`) qw422016.N().S(`:`)
//line app/vmalert/notifier/alertmanager_request.qtpl:26 //line notifier/alertmanager_request.qtpl:26
qw422016.N().Q(v) qw422016.N().Q(v)
//line app/vmalert/notifier/alertmanager_request.qtpl:26 //line notifier/alertmanager_request.qtpl:26
if c > 0 { if c > 0 {
//line app/vmalert/notifier/alertmanager_request.qtpl:26 //line notifier/alertmanager_request.qtpl:26
qw422016.N().S(`,`) qw422016.N().S(`,`)
//line app/vmalert/notifier/alertmanager_request.qtpl:26 //line notifier/alertmanager_request.qtpl:26
} }
//line app/vmalert/notifier/alertmanager_request.qtpl:27 //line notifier/alertmanager_request.qtpl:27
} }
//line app/vmalert/notifier/alertmanager_request.qtpl:27 //line notifier/alertmanager_request.qtpl:27
qw422016.N().S(`}}`) qw422016.N().S(`}}`)
//line app/vmalert/notifier/alertmanager_request.qtpl:30 //line notifier/alertmanager_request.qtpl:30
if i != len(alerts)-1 { if i != len(alerts)-1 {
//line app/vmalert/notifier/alertmanager_request.qtpl:30 //line notifier/alertmanager_request.qtpl:30
qw422016.N().S(`,`) qw422016.N().S(`,`)
//line app/vmalert/notifier/alertmanager_request.qtpl:30 //line notifier/alertmanager_request.qtpl:30
} }
//line app/vmalert/notifier/alertmanager_request.qtpl:31 //line notifier/alertmanager_request.qtpl:31
} }
//line app/vmalert/notifier/alertmanager_request.qtpl:31 //line notifier/alertmanager_request.qtpl:31
qw422016.N().S(`]`) qw422016.N().S(`]`)
//line app/vmalert/notifier/alertmanager_request.qtpl:33 //line notifier/alertmanager_request.qtpl:33
} }
//line app/vmalert/notifier/alertmanager_request.qtpl:33 //line notifier/alertmanager_request.qtpl:33
func writeamRequest(qq422016 qtio422016.Writer, alerts []Alert, generatorURL func(string, string) string) { func writeamRequest(qq422016 qtio422016.Writer, alerts []Alert, generatorURL func(string, string) string) {
//line app/vmalert/notifier/alertmanager_request.qtpl:33 //line notifier/alertmanager_request.qtpl:33
qw422016 := qt422016.AcquireWriter(qq422016) qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmalert/notifier/alertmanager_request.qtpl:33 //line notifier/alertmanager_request.qtpl:33
streamamRequest(qw422016, alerts, generatorURL) streamamRequest(qw422016, alerts, generatorURL)
//line app/vmalert/notifier/alertmanager_request.qtpl:33 //line notifier/alertmanager_request.qtpl:33
qt422016.ReleaseWriter(qw422016) qt422016.ReleaseWriter(qw422016)
//line app/vmalert/notifier/alertmanager_request.qtpl:33 //line notifier/alertmanager_request.qtpl:33
} }
//line app/vmalert/notifier/alertmanager_request.qtpl:33 //line notifier/alertmanager_request.qtpl:33
func amRequest(alerts []Alert, generatorURL func(string, string) string) string { func amRequest(alerts []Alert, generatorURL func(string, string) string) string {
//line app/vmalert/notifier/alertmanager_request.qtpl:33 //line notifier/alertmanager_request.qtpl:33
qb422016 := qt422016.AcquireByteBuffer() qb422016 := qt422016.AcquireByteBuffer()
//line app/vmalert/notifier/alertmanager_request.qtpl:33 //line notifier/alertmanager_request.qtpl:33
writeamRequest(qb422016, alerts, generatorURL) writeamRequest(qb422016, alerts, generatorURL)
//line app/vmalert/notifier/alertmanager_request.qtpl:33 //line notifier/alertmanager_request.qtpl:33
qs422016 := string(qb422016.B) qs422016 := string(qb422016.B)
//line app/vmalert/notifier/alertmanager_request.qtpl:33 //line notifier/alertmanager_request.qtpl:33
qt422016.ReleaseByteBuffer(qb422016) qt422016.ReleaseByteBuffer(qb422016)
//line app/vmalert/notifier/alertmanager_request.qtpl:33 //line notifier/alertmanager_request.qtpl:33
return qs422016 return qs422016
//line app/vmalert/notifier/alertmanager_request.qtpl:33 //line notifier/alertmanager_request.qtpl:33
} }

View file

@ -17,5 +17,5 @@ func (eg *errGroup) err() error {
} }
func (eg *errGroup) Error() string { func (eg *errGroup) Error() string {
return fmt.Sprintf("errors:%s", strings.Join(eg.errs, "\n")) return fmt.Sprintf("errors: %s", strings.Join(eg.errs, "\n"))
} }

151
app/vmalert/recording.go Normal file
View file

@ -0,0 +1,151 @@
package main
import (
"context"
"errors"
"fmt"
"hash/fnv"
"sort"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
// RecordingRule is a Rule that is supposed
// to evaluate the configured Expression and
// return TimeSeries as a result.
type RecordingRule struct {
Name string
Expr string
Labels map[string]string
GroupID uint64
// guard status fields
mu sync.RWMutex
// stores last moment of time Exec was called
lastExecTime time.Time
// stores last error that happened in Exec func
// resets on every successful Exec
// may be used as Health state
lastExecError error
}
// String implements Stringer interface
func (rr *RecordingRule) String() string {
return rr.Name
}
// ID returns unique Rule ID
// within the parent Group.
func (rr *RecordingRule) ID() uint64 {
hash := fnv.New64a()
hash.Write([]byte("alerting"))
hash.Write([]byte("\xff"))
hash.Write([]byte(rr.Name))
return hash.Sum64()
}
func newRecordingRule(gID uint64, cfg config.Rule) *RecordingRule {
return &RecordingRule{
Name: cfg.Record,
Expr: cfg.Expr,
Labels: cfg.Labels,
GroupID: gID,
}
}
var errDuplicate = errors.New("result contains metrics with the same labelset after applying rule labels")
// Exec executes RecordingRule expression via the given Querier.
func (rr *RecordingRule) Exec(ctx context.Context, q datasource.Querier, series bool) ([]prompbmarshal.TimeSeries, error) {
if !series {
return nil, nil
}
qMetrics, err := q.Query(ctx, rr.Expr)
rr.mu.Lock()
defer rr.mu.Unlock()
rr.lastExecTime = time.Now()
rr.lastExecError = err
if err != nil {
return nil, fmt.Errorf("failed to execute query %q: %s", rr.Expr, err)
}
duplicates := make(map[uint64]prompbmarshal.TimeSeries, len(qMetrics))
var tss []prompbmarshal.TimeSeries
for _, r := range qMetrics {
ts := rr.toTimeSeries(r, rr.lastExecTime)
h := hashTimeSeries(ts)
if _, ok := duplicates[h]; ok {
rr.lastExecError = errDuplicate
return nil, errDuplicate
}
duplicates[h] = ts
tss = append(tss, ts)
}
return tss, nil
}
func hashTimeSeries(ts prompbmarshal.TimeSeries) uint64 {
hash := fnv.New64a()
labels := ts.Labels
sort.Slice(labels, func(i, j int) bool {
return labels[i].Name < labels[j].Name
})
for _, l := range labels {
hash.Write([]byte(l.Name))
hash.Write([]byte(l.Value))
hash.Write([]byte("\xff"))
}
return hash.Sum64()
}
func (rr *RecordingRule) toTimeSeries(m datasource.Metric, timestamp time.Time) prompbmarshal.TimeSeries {
labels := make(map[string]string)
for _, l := range m.Labels {
labels[l.Name] = l.Value
}
labels["__name__"] = rr.Name
// override existing labels with configured ones
for k, v := range rr.Labels {
labels[k] = v
}
return newTimeSeries(m.Value, labels, timestamp)
}
// UpdateWith copies all significant fields from the given Rule.
// Evaluation state isn't copied since
// it will be refreshed on the next Exec.
func (rr *RecordingRule) UpdateWith(r Rule) error {
nr, ok := r.(*RecordingRule)
if !ok {
return fmt.Errorf("BUG: attempt to update recroding rule with wrong type %#v", r)
}
rr.Expr = nr.Expr
rr.Labels = nr.Labels
return nil
}
// RuleAPI returns Rule representation in form
// of APIRecordingRule
func (rr *RecordingRule) RuleAPI() APIRecordingRule {
var lastErr string
if rr.lastExecError != nil {
lastErr = rr.lastExecError.Error()
}
return APIRecordingRule{
// encode as strings to avoid rounding
ID: fmt.Sprintf("%d", rr.ID()),
GroupID: fmt.Sprintf("%d", rr.GroupID),
Name: rr.Name,
Expression: rr.Expr,
LastError: lastErr,
LastExec: rr.lastExecTime,
Labels: rr.Labels,
}
}
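
For illustration only (not part of this commit): a minimal sketch of exercising the new RecordingRule in isolation. It assumes the snippet sits next to the vmalert `main` test package so that `fakeQuerier` and `metricWithValueAndLabels` from `helpers_test.go` are available; the rule name, expression, and labels below are made up.

```go
package main

import (
	"context"
	"testing"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
)

// TestRecordingRuleSketch evaluates a single recording rule against a fake
// datasource and checks that exactly one timeseries comes back.
func TestRecordingRuleSketch(t *testing.T) {
	// build the rule the same way a Group does for every config.Rule entry
	rr := newRecordingRule(1, config.Rule{
		Record: "job:up:sum",
		Expr:   "sum(up) by (job)",
		Labels: map[string]string{"source": "vmalert"},
	})

	// fakeQuerier from helpers_test.go stands in for -datasource.url
	fq := &fakeQuerier{}
	fq.add(metricWithValueAndLabels(t, 2, "__name__", "up", "job", "foo"))

	// the last argument asks Exec to return the resulting timeseries
	tss, err := rr.Exec(context.Background(), fq, true)
	if err != nil {
		t.Fatalf("unexpected error: %s", err)
	}
	if len(tss) != 1 {
		t.Fatalf("expected 1 timeseries; got %d", len(tss))
	}
	// the returned series is renamed to job:up:sum and carries
	// job="foo" plus the configured source="vmalert" label
}
```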

View file

@ -0,0 +1,121 @@
package main
import (
"context"
"errors"
"strings"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
func TestRecordingRule_ToTimeSeries(t *testing.T) {
timestamp := time.Now()
testCases := []struct {
rule *RecordingRule
metrics []datasource.Metric
expTS []prompbmarshal.TimeSeries
}{
{
&RecordingRule{Name: "foo"},
[]datasource.Metric{metricWithValueAndLabels(t, 10,
"__name__", "bar",
)},
[]prompbmarshal.TimeSeries{
newTimeSeries(10, map[string]string{
"__name__": "foo",
}, timestamp),
},
},
{
&RecordingRule{Name: "foobarbaz"},
[]datasource.Metric{
metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "foo"),
metricWithValueAndLabels(t, 2, "__name__", "bar", "job", "bar"),
metricWithValueAndLabels(t, 3, "__name__", "baz", "job", "baz"),
},
[]prompbmarshal.TimeSeries{
newTimeSeries(1, map[string]string{
"__name__": "foobarbaz",
"job": "foo",
}, timestamp),
newTimeSeries(2, map[string]string{
"__name__": "foobarbaz",
"job": "bar",
}, timestamp),
newTimeSeries(3, map[string]string{
"__name__": "foobarbaz",
"job": "baz",
}, timestamp),
},
},
{
&RecordingRule{Name: "job:foo", Labels: map[string]string{
"source": "test",
}},
[]datasource.Metric{
metricWithValueAndLabels(t, 2, "__name__", "foo", "job", "foo"),
metricWithValueAndLabels(t, 1, "__name__", "bar", "job", "bar")},
[]prompbmarshal.TimeSeries{
newTimeSeries(2, map[string]string{
"__name__": "job:foo",
"job": "foo",
"source": "test",
}, timestamp),
newTimeSeries(1, map[string]string{
"__name__": "job:foo",
"job": "bar",
"source": "test",
}, timestamp),
},
},
}
for _, tc := range testCases {
t.Run(tc.rule.Name, func(t *testing.T) {
fq := &fakeQuerier{}
fq.add(tc.metrics...)
tss, err := tc.rule.Exec(context.TODO(), fq, true)
if err != nil {
t.Fatalf("unexpected Exec err: %s", err)
}
if err := compareTimeSeries(t, tc.expTS, tss); err != nil {
t.Fatalf("timeseries missmatch: %s", err)
}
})
}
}
func TestRecordingRule_ToTimeSeriesNegative(t *testing.T) {
rr := &RecordingRule{Name: "job:foo", Labels: map[string]string{
"job": "test",
}}
fq := &fakeQuerier{}
expErr := "connection reset by peer"
fq.setErr(errors.New(expErr))
_, err := rr.Exec(context.TODO(), fq, true)
if err == nil {
t.Fatalf("expected to get err; got nil")
}
if !strings.Contains(err.Error(), expErr) {
t.Fatalf("expected to get err %q; got %q insterad", expErr, err)
}
fq.reset()
// add metrics which differs only by `job` label
// which will be overridden by rule
fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "foo"))
fq.add(metricWithValueAndLabels(t, 2, "__name__", "foo", "job", "bar"))
_, err = rr.Exec(context.TODO(), fq, true)
if err == nil {
t.Fatalf("expected to get err; got nil")
}
if !strings.Contains(err.Error(), errDuplicate.Error()) {
t.Fatalf("expected to get err %q; got %q insterad", errDuplicate, err)
}
}

View file

@ -38,11 +38,15 @@ type Config struct {
BasicAuthUser string BasicAuthUser string
BasicAuthPass string BasicAuthPass string
// Concurrency defines number of readers that
// concurrently read from the queue and flush data
Concurrency int
// MaxBatchSize defines max number of timeseries // MaxBatchSize defines max number of timeseries
// to be flushed at once // to be flushed at once
MaxBatchSize int MaxBatchSize int
// MaxQueueSize defines max length of input queue // MaxQueueSize defines max length of input queue
// populated by Push method // populated by Push method.
// Push will be rejected once queue is full.
MaxQueueSize int MaxQueueSize int
// FlushInterval defines time interval for flushing batches // FlushInterval defines time interval for flushing batches
FlushInterval time.Duration FlushInterval time.Duration
@ -52,9 +56,10 @@ type Config struct {
} }
const ( const (
defaultConcurrency = 4
defaultMaxBatchSize = 1e3 defaultMaxBatchSize = 1e3
defaultMaxQueueSize = 100 defaultMaxQueueSize = 1e5
defaultFlushInterval = 5 * time.Second defaultFlushInterval = time.Second
defaultWriteTimeout = 30 * time.Second defaultWriteTimeout = 30 * time.Second
) )
@ -90,7 +95,13 @@ func NewClient(ctx context.Context, cfg Config) (*Client, error) {
doneCh: make(chan struct{}), doneCh: make(chan struct{}),
input: make(chan prompbmarshal.TimeSeries, cfg.MaxQueueSize), input: make(chan prompbmarshal.TimeSeries, cfg.MaxQueueSize),
} }
c.run(ctx) cc := defaultConcurrency
if cfg.Concurrency > 0 {
cc = cfg.Concurrency
}
for i := 0; i < cc; i++ {
c.run(ctx)
}
return c, nil return c, nil
} }
@ -128,7 +139,10 @@ func (c *Client) run(ctx context.Context) {
for ts := range c.input { for ts := range c.input {
wr.Timeseries = append(wr.Timeseries, ts) wr.Timeseries = append(wr.Timeseries, ts)
} }
lastCtx, cancel := context.WithTimeout(context.Background(), time.Second*10) if len(wr.Timeseries) < 1 {
return
}
lastCtx, cancel := context.WithTimeout(context.Background(), defaultWriteTimeout)
c.flush(lastCtx, wr) c.flush(lastCtx, wr)
cancel() cancel()
} }
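
A minimal sketch of constructing the remote-write client with the new `Concurrency` and `MaxBatchSize` knobs, mirroring what `main.go` now does with the `-remoteWrite.*` flags; the address is a placeholder and the values are taken from the flag defaults.

```go
package main

import (
	"context"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
)

// newRemoteWriteClient shows the new knobs in one place; values mirror the
// flag defaults from main.go.
func newRemoteWriteClient(ctx context.Context) (*remotewrite.Client, error) {
	return remotewrite.NewClient(ctx, remotewrite.Config{
		Addr:          "http://localhost:8428", // placeholder for -remoteWrite.url
		Concurrency:   1,                       // -remoteWrite.concurrency
		MaxQueueSize:  1e5,                     // -remoteWrite.maxQueueSize
		MaxBatchSize:  1e3,                     // -remoteWrite.maxBatchSize
		FlushInterval: time.Minute,             // -evaluationInterval
	})
}
```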

View file

@ -2,339 +2,23 @@ package main
import ( import (
"context" "context"
"errors"
"fmt"
"hash/fnv"
"sort"
"strconv"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/metricsql"
) )
// Rule is basic alert entity // Rule represents alerting or recording rule
type Rule struct { // that has unique ID, can be Executed and
Name string `yaml:"alert"` // updated with other Rule.
Expr string `yaml:"expr"` type Rule interface {
For time.Duration `yaml:"for"` // Returns unique ID that may be used for
Labels map[string]string `yaml:"labels"` // identifying this Rule among others.
Annotations map[string]string `yaml:"annotations"` ID() uint64
// Exec executes the rule with given context
group Group // and Querier. If returnSeries is true, Exec
// may return TimeSeries as result of execution
// guard status fields Exec(ctx context.Context, q datasource.Querier, returnSeries bool) ([]prompbmarshal.TimeSeries, error)
mu sync.RWMutex // UpdateWith performs modification of current Rule
// stores list of active alerts // with fields of the given Rule.
alerts map[uint64]*notifier.Alert UpdateWith(Rule) error
// stores last moment of time Exec was called
lastExecTime time.Time
// stores last error that happened in Exec func
// resets on every successful Exec
// may be used as Health state
lastExecError error
}
func (r *Rule) id() string {
return r.Name
}
// Validate validates rule
func (r *Rule) Validate() error {
if r.Name == "" {
return errors.New("rule name can not be empty")
}
if r.Expr == "" {
return fmt.Errorf("expression for rule %q can't be empty", r.Name)
}
if _, err := metricsql.Parse(r.Expr); err != nil {
return fmt.Errorf("invalid expression for rule %q: %w", r.Name, err)
}
return nil
}
// Exec executes Rule expression via the given Querier.
// Based on the Querier results Rule maintains notifier.Alerts
func (r *Rule) Exec(ctx context.Context, q datasource.Querier) error {
qMetrics, err := q.Query(ctx, r.Expr)
r.mu.Lock()
defer r.mu.Unlock()
r.lastExecError = err
r.lastExecTime = time.Now()
if err != nil {
return fmt.Errorf("failed to execute query %q: %s", r.Expr, err)
}
for h, a := range r.alerts {
// cleanup inactive alerts from previous Exec
if a.State == notifier.StateInactive {
delete(r.alerts, h)
}
}
updated := make(map[uint64]struct{})
// update list of active alerts
for _, m := range qMetrics {
h := hash(m)
updated[h] = struct{}{}
if a, ok := r.alerts[h]; ok {
if a.Value != m.Value {
// update Value field with latest value
a.Value = m.Value
// and re-exec template since Value can be used
// in templates
err = r.template(a)
if err != nil {
return err
}
}
continue
}
a, err := r.newAlert(m)
if err != nil {
r.lastExecError = err
return fmt.Errorf("failed to create alert: %s", err)
}
a.ID = h
a.State = notifier.StatePending
r.alerts[h] = a
}
for h, a := range r.alerts {
// if alert wasn't updated in this iteration
// means it is resolved already
if _, ok := updated[h]; !ok {
if a.State == notifier.StatePending {
// alert was in Pending state - it is not
// active anymore
delete(r.alerts, h)
continue
}
a.State = notifier.StateInactive
continue
}
if a.State == notifier.StatePending && time.Since(a.Start) >= r.For {
a.State = notifier.StateFiring
alertsFired.Inc()
}
}
return nil
}
// TODO: consider hashing algorithm in VM
func hash(m datasource.Metric) uint64 {
hash := fnv.New64a()
labels := m.Labels
sort.Slice(labels, func(i, j int) bool {
return labels[i].Name < labels[j].Name
})
for _, l := range labels {
// drop __name__ to be consistent with Prometheus alerting
if l.Name == "__name__" {
continue
}
hash.Write([]byte(l.Name))
hash.Write([]byte(l.Value))
hash.Write([]byte("\xff"))
}
return hash.Sum64()
}
func (r *Rule) newAlert(m datasource.Metric) (*notifier.Alert, error) {
a := &notifier.Alert{
GroupID: r.group.ID(),
Name: r.Name,
Labels: map[string]string{},
Value: m.Value,
Start: time.Now(),
Expr: r.Expr,
// TODO: support End time
}
for _, l := range m.Labels {
// drop __name__ to be consistent with Prometheus alerting
if l.Name == "__name__" {
continue
}
a.Labels[l.Name] = l.Value
}
return a, r.template(a)
}
func (r *Rule) template(a *notifier.Alert) error {
// 1. template rule labels with data labels
rLabels, err := a.ExecTemplate(r.Labels)
if err != nil {
return err
}
// 2. merge data labels and rule labels
// metric labels may be overridden by
// rule labels
for k, v := range rLabels {
a.Labels[k] = v
}
// 3. template merged labels
a.Labels, err = a.ExecTemplate(a.Labels)
if err != nil {
return err
}
a.Annotations, err = a.ExecTemplate(r.Annotations)
return err
}
// AlertAPI generates APIAlert object from alert by its id(hash)
func (r *Rule) AlertAPI(id uint64) *APIAlert {
r.mu.RLock()
defer r.mu.RUnlock()
a, ok := r.alerts[id]
if !ok {
return nil
}
return r.newAlertAPI(*a)
}
// AlertsAPI generates list of APIAlert objects from existing alerts
func (r *Rule) AlertsAPI() []*APIAlert {
var alerts []*APIAlert
r.mu.RLock()
for _, a := range r.alerts {
alerts = append(alerts, r.newAlertAPI(*a))
}
r.mu.RUnlock()
return alerts
}
func (r *Rule) newAlertAPI(a notifier.Alert) *APIAlert {
return &APIAlert{
// encode as strings to avoid rounding
ID: fmt.Sprintf("%d", a.ID),
GroupID: fmt.Sprintf("%d", a.GroupID),
Name: a.Name,
Expression: r.Expr,
Labels: a.Labels,
Annotations: a.Annotations,
State: a.State.String(),
ActiveAt: a.Start,
Value: strconv.FormatFloat(a.Value, 'e', -1, 64),
}
}
const (
// AlertMetricName is the metric name for synthetic alert timeseries.
alertMetricName = "ALERTS"
// AlertForStateMetricName is the metric name for 'for' state of alert.
alertForStateMetricName = "ALERTS_FOR_STATE"
// AlertNameLabel is the label name indicating the name of an alert.
alertNameLabel = "alertname"
// AlertStateLabel is the label name indicating the state of an alert.
alertStateLabel = "alertstate"
)
// AlertToTimeSeries converts the given alert with the given timestamp to timeseries
func (r *Rule) AlertToTimeSeries(a *notifier.Alert, timestamp time.Time) []prompbmarshal.TimeSeries {
var tss []prompbmarshal.TimeSeries
tss = append(tss, alertToTimeSeries(r.Name, a, timestamp))
if r.For > 0 {
tss = append(tss, alertForToTimeSeries(r.Name, a, timestamp))
}
return tss
}
func alertToTimeSeries(name string, a *notifier.Alert, timestamp time.Time) prompbmarshal.TimeSeries {
labels := make(map[string]string)
for k, v := range a.Labels {
labels[k] = v
}
labels["__name__"] = alertMetricName
labels[alertNameLabel] = name
labels[alertStateLabel] = a.State.String()
return newTimeSeries(1, labels, timestamp)
}
// alertForToTimeSeries returns a timeseries that represents
// state of active alerts, where value is time when alert become active
func alertForToTimeSeries(name string, a *notifier.Alert, timestamp time.Time) prompbmarshal.TimeSeries {
labels := make(map[string]string)
for k, v := range a.Labels {
labels[k] = v
}
labels["__name__"] = alertForStateMetricName
labels[alertNameLabel] = name
return newTimeSeries(float64(a.Start.Unix()), labels, timestamp)
}
func newTimeSeries(value float64, labels map[string]string, timestamp time.Time) prompbmarshal.TimeSeries {
ts := prompbmarshal.TimeSeries{}
ts.Samples = append(ts.Samples, prompbmarshal.Sample{
Value: value,
Timestamp: timestamp.UnixNano() / 1e6,
})
keys := make([]string, 0, len(labels))
for k := range labels {
keys = append(keys, k)
}
sort.Strings(keys)
for _, key := range keys {
ts.Labels = append(ts.Labels, prompbmarshal.Label{
Name: key,
Value: labels[key],
})
}
return ts
}
// Restore restores the state of active alerts basing on previously written timeseries.
// Restore restores only Start field. Field State will be always Pending and supposed
// to be updated on next Exec, as well as Value field.
func (r *Rule) Restore(ctx context.Context, q datasource.Querier, lookback time.Duration) error {
if q == nil {
return fmt.Errorf("querier is nil")
}
// Get the last datapoint in range via MetricsQL `last_over_time`.
// We don't use plain PromQL since Prometheus doesn't support
// remote write protocol which is used for state persistence in vmalert.
expr := fmt.Sprintf("last_over_time(%s{alertname=%q}[%ds])",
alertForStateMetricName, r.Name, int(lookback.Seconds()))
qMetrics, err := q.Query(ctx, expr)
if err != nil {
return err
}
for _, m := range qMetrics {
labels := m.Labels
m.Labels = make([]datasource.Label, 0)
// drop all extra labels, so hash key will
// be identical to timeseries received in Exec
for _, l := range labels {
if l.Name == alertNameLabel {
continue
}
// drop all overridden labels
if _, ok := r.Labels[l.Name]; ok {
continue
}
m.Labels = append(m.Labels, l)
}
a, err := r.newAlert(m)
if err != nil {
return fmt.Errorf("failed to create alert: %s", err)
}
a.ID = hash(m)
a.State = notifier.StatePending
a.Start = time.Unix(int64(m.Value), 0)
r.alerts[a.ID] = a
logger.Infof("alert %q(%d) restored to state at %v", a.Name, a.ID, a.Start)
}
return nil
} }
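
To show how the new Rule interface lets one loop drive both alerting and recording rules, here is a rough sketch of a single evaluation pass. The real loop lives in `Group.start`; `remotewrite.Client.Push` with exactly this signature is an assumption (the diff only states that a `Push` method populates the input queue).

```go
package main

import (
	"context"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)

// evalRound runs every rule of a group once and forwards any resulting
// timeseries to the remote-write client.
func evalRound(ctx context.Context, rules []Rule, q datasource.Querier, rw *remotewrite.Client) {
	// only ask rules to return series when there is somewhere to write them
	returnSeries := rw != nil
	for _, r := range rules {
		tss, err := r.Exec(ctx, q, returnSeries)
		if err != nil {
			logger.Errorf("rule %d: failed to execute: %s", r.ID(), err)
			continue
		}
		for _, ts := range tss {
			// assumed API: Push enqueues one timeseries and returns an
			// error once the client's input queue is full
			if err := rw.Push(ts); err != nil {
				logger.Errorf("rule %d: failed to push timeseries: %s", r.ID(), err)
			}
		}
	}
}
```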

27
app/vmalert/utils.go Normal file
View file

@ -0,0 +1,27 @@
package main
import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"sort"
"time"
)
func newTimeSeries(value float64, labels map[string]string, timestamp time.Time) prompbmarshal.TimeSeries {
ts := prompbmarshal.TimeSeries{}
ts.Samples = append(ts.Samples, prompbmarshal.Sample{
Value: value,
Timestamp: timestamp.UnixNano() / 1e6,
})
keys := make([]string, 0, len(labels))
for k := range labels {
keys = append(keys, k)
}
sort.Strings(keys)
for _, key := range keys {
ts.Labels = append(ts.Labels, prompbmarshal.Label{
Name: key,
Value: labels[key],
})
}
return ts
}
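
A tiny sketch (assuming it lives in the same package) showing that `newTimeSeries` emits labels sorted by name, which keeps remote-write payloads and test comparisons deterministic.

```go
package main

import (
	"fmt"
	"time"
)

// printSortedLabels demonstrates the deterministic label ordering
// produced by newTimeSeries.
func printSortedLabels() {
	ts := newTimeSeries(42, map[string]string{
		"job":      "node",
		"__name__": "job:up:sum",
		"instance": "host-1",
	}, time.Unix(0, 0))
	for _, l := range ts.Labels {
		fmt.Printf("%s=%q\n", l.Name, l.Value) // __name__, instance, job
	}
}
```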

View file

@ -7,32 +7,18 @@ import (
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
) )
// APIAlert represents an notifier.Alert state
// for WEB view
type APIAlert struct {
ID string `json:"id"`
Name string `json:"name"`
GroupID string `json:"group_id"`
Expression string `json:"expression"`
State string `json:"state"`
Value string `json:"value"`
Labels map[string]string `json:"labels"`
Annotations map[string]string `json:"annotations"`
ActiveAt time.Time `json:"activeAt"`
}
type requestHandler struct { type requestHandler struct {
m *manager m *manager
} }
var pathList = [][]string{ var pathList = [][]string{
{"/api/v1/groups", "list all loaded groups and rules"},
{"/api/v1/alerts", "list all active alerts"}, {"/api/v1/alerts", "list all active alerts"},
{"/api/v1/groupID/alertID/status", "get alert status by ID"}, {"/api/v1/groupID/alertID/status", "get alert status by ID"},
// /metrics is served by httpserver by default // /metrics is served by httpserver by default
@ -49,8 +35,11 @@ func (rh *requestHandler) handler(w http.ResponseWriter, r *http.Request) bool {
fmt.Fprintf(w, "<a href='%s'>%q</a> - %s<br/>", p, p, doc) fmt.Fprintf(w, "<a href='%s'>%q</a> - %s<br/>", p, p, doc)
} }
return true return true
case "/api/v1/groups":
resph.handle(rh.listGroups())
return true
case "/api/v1/alerts": case "/api/v1/alerts":
resph.handle(rh.list()) resph.handle(rh.listAlerts())
return true return true
case "/-/reload": case "/-/reload":
logger.Infof("api config reload was called, sending sighup") logger.Infof("api config reload was called, sending sighup")
@ -67,6 +56,37 @@ func (rh *requestHandler) handler(w http.ResponseWriter, r *http.Request) bool {
} }
} }
type listGroupsResponse struct {
Data struct {
Groups []APIGroup `json:"groups"`
} `json:"data"`
Status string `json:"status"`
}
func (rh *requestHandler) listGroups() ([]byte, error) {
rh.m.groupsMu.RLock()
defer rh.m.groupsMu.RUnlock()
lr := listGroupsResponse{Status: "success"}
for _, g := range rh.m.groups {
lr.Data.Groups = append(lr.Data.Groups, g.toAPI())
}
// sort list of groups for deterministic output
sort.Slice(lr.Data.Groups, func(i, j int) bool {
return lr.Data.Groups[i].Name < lr.Data.Groups[j].Name
})
b, err := json.Marshal(lr)
if err != nil {
return nil, &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf(`error encoding list of groups: %s`, err),
StatusCode: http.StatusInternalServerError,
}
}
return b, nil
}
type listAlertsResponse struct { type listAlertsResponse struct {
Data struct { Data struct {
Alerts []*APIAlert `json:"alerts"` Alerts []*APIAlert `json:"alerts"`
@ -74,13 +94,18 @@ type listAlertsResponse struct {
Status string `json:"status"` Status string `json:"status"`
} }
func (rh *requestHandler) list() ([]byte, error) { func (rh *requestHandler) listAlerts() ([]byte, error) {
rh.m.groupsMu.RLock() rh.m.groupsMu.RLock()
defer rh.m.groupsMu.RUnlock() defer rh.m.groupsMu.RUnlock()
lr := listAlertsResponse{Status: "success"} lr := listAlertsResponse{Status: "success"}
for _, g := range rh.m.groups { for _, g := range rh.m.groups {
for _, r := range g.Rules { for _, r := range g.Rules {
lr.Data.Alerts = append(lr.Data.Alerts, r.AlertsAPI()...) a, ok := r.(*AlertingRule)
if !ok {
continue
}
lr.Data.Alerts = append(lr.Data.Alerts, a.AlertsAPI()...)
} }
} }

View file

@ -11,7 +11,7 @@ import (
) )
func TestHandler(t *testing.T) { func TestHandler(t *testing.T) {
rule := &Rule{ ar := &AlertingRule{
Name: "alert", Name: "alert",
alerts: map[uint64]*notifier.Alert{ alerts: map[uint64]*notifier.Alert{
0: {}, 0: {},
@ -19,7 +19,7 @@ func TestHandler(t *testing.T) {
} }
g := &Group{ g := &Group{
Name: "group", Name: "group",
Rules: []*Rule{rule}, Rules: []Rule{ar},
} }
m := &manager{groups: make(map[uint64]*Group)} m := &manager{groups: make(map[uint64]*Group)}
m.groups[0] = g m.groups[0] = g
@ -54,10 +54,17 @@ func TestHandler(t *testing.T) {
t.Errorf("expected 1 alert got %d", length) t.Errorf("expected 1 alert got %d", length)
} }
}) })
t.Run("/api/v1/groups", func(t *testing.T) {
lr := listGroupsResponse{}
getResp(ts.URL+"/api/v1/groups", &lr, 200)
if length := len(lr.Data.Groups); length != 1 {
t.Errorf("expected 1 group got %d", length)
}
})
t.Run("/api/v1/0/0/status", func(t *testing.T) { t.Run("/api/v1/0/0/status", func(t *testing.T) {
alert := &APIAlert{} alert := &APIAlert{}
getResp(ts.URL+"/api/v1/0/0/status", alert, 200) getResp(ts.URL+"/api/v1/0/0/status", alert, 200)
expAlert := rule.newAlertAPI(*rule.alerts[0]) expAlert := ar.newAlertAPI(*ar.alerts[0])
if !reflect.DeepEqual(alert, expAlert) { if !reflect.DeepEqual(alert, expAlert) {
t.Errorf("expected %v is equal to %v", alert, expAlert) t.Errorf("expected %v is equal to %v", alert, expAlert)
} }

53
app/vmalert/web_types.go Normal file
View file

@ -0,0 +1,53 @@
package main
import (
"time"
)
// APIAlert represents a notifier.Alert state
// for WEB view
type APIAlert struct {
ID string `json:"id"`
Name string `json:"name"`
GroupID string `json:"group_id"`
Expression string `json:"expression"`
State string `json:"state"`
Value string `json:"value"`
Labels map[string]string `json:"labels"`
Annotations map[string]string `json:"annotations"`
ActiveAt time.Time `json:"activeAt"`
}
// APIGroup represents Group for WEB view
type APIGroup struct {
Name string `json:"name"`
ID string `json:"id"`
File string `json:"file"`
Interval string `json:"interval"`
AlertingRules []APIAlertingRule `json:"alerting_rules"`
RecordingRules []APIRecordingRule `json:"recording_rules"`
}
// APIAlertingRule represents AlertingRule for WEB view
type APIAlertingRule struct {
ID string `json:"id"`
Name string `json:"name"`
GroupID string `json:"group_id"`
Expression string `json:"expression"`
For string `json:"for"`
LastError string `json:"last_error"`
LastExec time.Time `json:"last_exec"`
Labels map[string]string `json:"labels"`
Annotations map[string]string `json:"annotations"`
}
// APIRecordingRule represents RecordingRule for WEB view
type APIRecordingRule struct {
ID string `json:"id"`
Name string `json:"name"`
GroupID string `json:"group_id"`
Expression string `json:"expression"`
LastError string `json:"last_error"`
LastExec time.Time `json:"last_exec"`
Labels map[string]string `json:"labels"`
}