From 270552fde4cd5c511b4e39d1c814d231de0bab99 Mon Sep 17 00:00:00 2001 From: Roman Khavronenko Date: Mon, 1 Jun 2020 11:46:37 +0100 Subject: [PATCH] vmalert: Add recording rules support. (#519) * vmalert: Add recording rules support. Recording rules support required additional service refactoring since it wasn't planned to support them from the very beginning. The list of changes is following: * new entity RecordingRule was added for writing results of MetricsQL expressions into remote storage; * interface Rule now unites both recording and alerting rules; * configuration parser was moved to separate package and now performs more strict validation; * new endpoint for listing all groups and rules in json format was added; * evaluation interval may be set to every particular group; * vmalert: uncomment tests * vmalert: rm outdated TODO * vmalert: fix typos in README --- app/vmalert/Makefile | 3 +- app/vmalert/README.md | 53 +-- app/vmalert/alerting.go | 376 ++++++++++++++++++ .../{rule_test.go => alerting_test.go} | 171 ++------ app/vmalert/config.go | 74 ---- app/vmalert/config/config.go | 147 +++++++ app/vmalert/config/config_test.go | 83 ++++ .../testdata/dir/rules0-bad.rules | 0 .../testdata/dir/rules0-good.rules | 0 .../testdata/dir/rules1-bad.rules | 0 .../testdata/dir/rules1-good.rules | 2 - .../testdata/dir/rules2-bad.rules | 0 .../config/testdata/dir/rules3-bad.rules | 5 + .../config/testdata/dir/rules4-bad.rules | 7 + .../config/testdata/dir/rules5-bad.rules | 7 + .../{ => config}/testdata/rules0-bad.rules | 0 .../{ => config}/testdata/rules0-good.rules | 0 .../{ => config}/testdata/rules1-good.rules | 0 app/vmalert/config/testdata/rules2-good.rules | 28 ++ app/vmalert/config_test.go | 39 -- app/vmalert/group.go | 162 +++++--- app/vmalert/group_test.go | 124 +++--- app/vmalert/helpers_test.go | 200 ++++++++++ app/vmalert/main.go | 8 +- app/vmalert/manager.go | 46 ++- app/vmalert/manager_test.go | 142 ++++--- app/vmalert/notifier/alert.go | 6 +- .../notifier/alertmanager_request.qtpl.go | 110 ++--- app/vmalert/notifier/utils.go | 2 +- app/vmalert/recording.go | 151 +++++++ app/vmalert/recording_test.go | 121 ++++++ app/vmalert/remotewrite/remotewrite.go | 24 +- app/vmalert/rule.go | 344 +--------------- app/vmalert/utils.go | 27 ++ app/vmalert/web.go | 61 ++- app/vmalert/web_test.go | 13 +- app/vmalert/web_types.go | 53 +++ 37 files changed, 1728 insertions(+), 861 deletions(-) create mode 100644 app/vmalert/alerting.go rename app/vmalert/{rule_test.go => alerting_test.go} (72%) delete mode 100644 app/vmalert/config.go create mode 100644 app/vmalert/config/config.go create mode 100644 app/vmalert/config/config_test.go rename app/vmalert/{ => config}/testdata/dir/rules0-bad.rules (100%) rename app/vmalert/{ => config}/testdata/dir/rules0-good.rules (100%) rename app/vmalert/{ => config}/testdata/dir/rules1-bad.rules (100%) rename app/vmalert/{ => config}/testdata/dir/rules1-good.rules (99%) rename app/vmalert/{ => config}/testdata/dir/rules2-bad.rules (100%) create mode 100644 app/vmalert/config/testdata/dir/rules3-bad.rules create mode 100644 app/vmalert/config/testdata/dir/rules4-bad.rules create mode 100644 app/vmalert/config/testdata/dir/rules5-bad.rules rename app/vmalert/{ => config}/testdata/rules0-bad.rules (100%) rename app/vmalert/{ => config}/testdata/rules0-good.rules (100%) rename app/vmalert/{ => config}/testdata/rules1-good.rules (100%) create mode 100644 app/vmalert/config/testdata/rules2-good.rules delete mode 100644 app/vmalert/config_test.go create mode 100644 
app/vmalert/helpers_test.go create mode 100644 app/vmalert/recording.go create mode 100644 app/vmalert/recording_test.go create mode 100644 app/vmalert/utils.go create mode 100644 app/vmalert/web_types.go diff --git a/app/vmalert/Makefile b/app/vmalert/Makefile index 0d9439398..8f378e67a 100644 --- a/app/vmalert/Makefile +++ b/app/vmalert/Makefile @@ -55,9 +55,10 @@ test-vmalert: go test -v -race -cover ./app/vmalert -loggerLevel=ERROR go test -v -race -cover ./app/vmalert/datasource go test -v -race -cover ./app/vmalert/notifier + go test -v -race -cover ./app/vmalert/config run-vmalert: vmalert - ./bin/vmalert -rule=app/vmalert/testdata/rules0-good.rules \ + ./bin/vmalert -rule=app/vmalert/config/testdata/rules2-good.rules \ -datasource.url=http://localhost:8428 \ -notifier.url=http://localhost:9093 \ -remoteWrite.url=http://localhost:8428 \ diff --git a/app/vmalert/README.md b/app/vmalert/README.md index 34d0b8fef..c783ab08e 100644 --- a/app/vmalert/README.md +++ b/app/vmalert/README.md @@ -1,20 +1,18 @@ -## VM Alert +## vmalert -`vmalert` executes a list of given MetricsQL expressions (rules) and -sends alerts to [Alert Manager](https://github.com/prometheus/alertmanager). +`vmalert` executes a list of given [alerting](https://prometheus.io/docs/prometheus/latest/configuration/alerting_rules/) +or [recording](https://prometheus.io/docs/prometheus/latest/configuration/recording_rules/) +rules against configured address. ### Features: * Integration with [VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics) TSDB; * VictoriaMetrics [MetricsQL](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/MetricsQL) - expressions validation; + support and expressions validation; * Prometheus [alerting rules definition format](https://prometheus.io/docs/prometheus/latest/configuration/alerting_rules/#defining-alerting-rules) support; * Integration with [Alertmanager](https://github.com/prometheus/alertmanager); * Lightweight without extra dependencies. -### TODO: -* Support recording rules. - ### QuickStart To build `vmalert` from sources: @@ -26,9 +24,9 @@ make vmalert The build binary will be placed to `VictoriaMetrics/bin` folder. To start using `vmalert` you will need the following things: -* list of alert rules - PromQL/MetricsQL expressions to execute; +* list of rules - PromQL/MetricsQL expressions to execute; * datasource address - reachable VictoriaMetrics instance for rules execution; -* notifier address - reachable Alertmanager instance for processing, +* notifier address - reachable [Alert Manager](https://github.com/prometheus/alertmanager) instance for processing, aggregating alerts and sending notifications. Then configure `vmalert` accordingly: @@ -38,23 +36,28 @@ Then configure `vmalert` accordingly: -notifier.url=http://localhost:9093 ``` -Example for `.rules` file may be found [here](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/app/vmalert/testdata/rules0-good.rules) +Example for `.rules` file may be found [here](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/app/vmalert/testdata). + +`vmalert` may be configured with `-remoteWrite` flag to write recording rules and +alerts state in form of timeseries via remote write protocol. Alerts state will be written +as `ALERTS` timeseries. These timeseries may be used to recover alerts state on `vmalert` +restarts if `-remoteRead` is configured. `vmalert` runs evaluation for every group in a separate goroutine. Rules in group evaluated one-by-one sequentially. 
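The evaluation flow described just above can be illustrated with a short, self-contained Go sketch. This is not the actual vmalert code: `TimeSeries`, `Group`, and the toy `constRule` below are hypothetical, simplified stand-ins for the patch's `prompbmarshal.TimeSeries`, group and `Rule` entities, and the buffered channel merely models the asynchronous remote-write queue. It shows the shape of the design: each group ticks on its own interval in its own goroutine, executes its rules strictly one after another, and hands any returned series off for writing without waiting for them to be persisted.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// TimeSeries is a simplified stand-in for the prompbmarshal.TimeSeries
// values that rules return for remote write.
type TimeSeries struct {
	Name  string
	Value float64
}

// Rule mirrors the idea behind the patch's Rule interface: alerting and
// recording rules both expose Exec, which may return series to be written
// to remote storage.
type Rule interface {
	fmt.Stringer
	Exec(ctx context.Context) ([]TimeSeries, error)
}

// Group evaluates its rules sequentially on every tick of its own interval.
type Group struct {
	Name     string
	Interval time.Duration
	Rules    []Rule
}

// start is the per-group evaluation loop. Results are handed to a buffered
// write queue, i.e. persisting them is asynchronous relative to evaluation.
func (g *Group) start(ctx context.Context, writeQueue chan<- TimeSeries) {
	t := time.NewTicker(g.Interval)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			for _, r := range g.Rules { // rules run one-by-one, in order
				tss, err := r.Exec(ctx)
				if err != nil {
					fmt.Printf("group %q: rule %q failed: %v\n", g.Name, r, err)
					continue
				}
				for _, ts := range tss {
					writeQueue <- ts
				}
			}
		}
	}
}

// constRule is a toy recording-style rule that always emits a single sample.
type constRule struct {
	name  string
	value float64
}

func (c constRule) String() string { return c.name }

func (c constRule) Exec(_ context.Context) ([]TimeSeries, error) {
	return []TimeSeries{{Name: c.name, Value: c.value}}, nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	writeQueue := make(chan TimeSeries, 100) // models the async remote-write buffer
	go func() {
		for ts := range writeQueue {
			fmt.Printf("remote write: %s = %v\n", ts.Name, ts.Value)
		}
	}()

	g := &Group{
		Name:     "example",
		Interval: 10 * time.Millisecond,
		Rules:    []Rule{constRule{name: "handler:requests:rate5m", value: 1}},
	}
	g.start(ctx, writeQueue) // blocks until ctx expires
}
```

Because the write queue is drained independently of the evaluation loop, a recording rule's output may not yet be queryable when the next rule in the same group runs, which is the motivation for the note below.
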
+**Important:** while recording rules execution is sequential, writing of timeseries results to remote +storage is asynchronous. Hence, user shouldn't rely on recording rules chaining when result of previous +recording rule is reused in next one. + `vmalert` also runs a web-server (`-httpListenAddr`) for serving metrics and alerts endpoints: +* `http:///api/v1/groups` - list of all loaded groups and rules; * `http:///api/v1/alerts` - list of all active alerts; * `http:///api/v1///status" ` - get alert status by ID. Used as alert source in AlertManager. * `http:///metrics` - application metrics. * `http:///-/reload` - hot configuration reload. -`vmalert` may be configured with `-remoteWrite` flag to write alerts state in form of timeseries -via remote write protocol. Alerts state will be written as `ALERTS` timeseries. These timeseries -may be used to recover alerts state on `vmalert` restarts if `-remoteRead` is configured. - - ### Configuration The shortlist of configuration flags is the following: @@ -66,20 +69,14 @@ Usage of vmalert: Optional basic auth username for -datasource.url -datasource.url string Victoria Metrics or VMSelect url. Required parameter. E.g. http://127.0.0.1:8428 - -enableTCP6 - Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP is used -evaluationInterval duration - How often to evaluate the rules. Default 1m (default 1m0s) + How often to evaluate the rules (default 1m0s) -external.url string External URL is used as alert's source for sent alerts to the notifier - -http.maxGracefulShutdownDuration duration - The maximum duration for graceful shutdown of HTTP server. Highly loaded server may require increased value for graceful shutdown (default 7s) - -httpAuth.password string - Password for HTTP Basic Auth. The authentication is disabled if -httpAuth.username is empty - -httpAuth.username string - Username for HTTP Basic Auth. The authentication is disabled if empty. See also -httpAuth.password -httpListenAddr string Address to listen for http connections (default ":8880") + -metricsAuthKey string + Auth key for /metrics. It overrides httpAuth settings -notifier.url string Prometheus alertmanager URL. Required parameter. e.g. http://127.0.0.1:9093 -remoteRead.basicAuth.password string @@ -94,8 +91,12 @@ Usage of vmalert: Optional basic auth password for -remoteWrite.url -remoteWrite.basicAuth.username string Optional basic auth username for -remoteWrite.url - -remoteWrite.maxQueueSize - Defines the max number of pending datapoints to remote write endpoint + -remoteWrite.concurrency int + Defines number of readers that concurrently write into remote storage (default 1) + -remoteWrite.maxBatchSize int + Defines defines max number of timeseries to be flushed at once (default 1000) + -remoteWrite.maxQueueSize int + Defines the max number of pending datapoints to remote write endpoint (default 100000) -remoteWrite.url string Optional URL to Victoria Metrics or VMInsert where to persist alerts state in form of timeseries. E.g. 
http://127.0.0.1:8428 -rule value diff --git a/app/vmalert/alerting.go b/app/vmalert/alerting.go new file mode 100644 index 000000000..978fafa3e --- /dev/null +++ b/app/vmalert/alerting.go @@ -0,0 +1,376 @@ +package main + +import ( + "context" + "fmt" + "hash/fnv" + "sort" + "strconv" + "sync" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" +) + +// AlertingRule is basic alert entity +type AlertingRule struct { + Name string + Expr string + For time.Duration + Labels map[string]string + Annotations map[string]string + GroupID uint64 + + // guard status fields + mu sync.RWMutex + // stores list of active alerts + alerts map[uint64]*notifier.Alert + // stores last moment of time Exec was called + lastExecTime time.Time + // stores last error that happened in Exec func + // resets on every successful Exec + // may be used as Health state + lastExecError error +} + +func newAlertingRule(gID uint64, cfg config.Rule) *AlertingRule { + return &AlertingRule{ + Name: cfg.Alert, + Expr: cfg.Expr, + For: cfg.For, + Labels: cfg.Labels, + Annotations: cfg.Annotations, + GroupID: gID, + alerts: make(map[uint64]*notifier.Alert), + } +} + +// String implements Stringer interface +func (ar *AlertingRule) String() string { + return ar.Name +} + +// ID returns unique Rule ID +// within the parent Group. +func (ar *AlertingRule) ID() uint64 { + hash := fnv.New64a() + hash.Write([]byte("alerting")) + hash.Write([]byte("\xff")) + hash.Write([]byte(ar.Name)) + return hash.Sum64() +} + +// Exec executes AlertingRule expression via the given Querier. 
+// Based on the Querier results AlertingRule maintains notifier.Alerts +func (ar *AlertingRule) Exec(ctx context.Context, q datasource.Querier, series bool) ([]prompbmarshal.TimeSeries, error) { + qMetrics, err := q.Query(ctx, ar.Expr) + ar.mu.Lock() + defer ar.mu.Unlock() + + ar.lastExecError = err + ar.lastExecTime = time.Now() + if err != nil { + return nil, fmt.Errorf("failed to execute query %q: %s", ar.Expr, err) + } + + for h, a := range ar.alerts { + // cleanup inactive alerts from previous Exec + if a.State == notifier.StateInactive { + delete(ar.alerts, h) + } + } + + updated := make(map[uint64]struct{}) + // update list of active alerts + for _, m := range qMetrics { + h := hash(m) + updated[h] = struct{}{} + if a, ok := ar.alerts[h]; ok { + if a.Value != m.Value { + // update Value field with latest value + a.Value = m.Value + // and re-exec template since Value can be used + // in templates + err = ar.template(a) + if err != nil { + return nil, err + } + } + continue + } + a, err := ar.newAlert(m, ar.lastExecTime) + if err != nil { + ar.lastExecError = err + return nil, fmt.Errorf("failed to create alert: %s", err) + } + a.ID = h + a.State = notifier.StatePending + ar.alerts[h] = a + } + + for h, a := range ar.alerts { + // if alert wasn't updated in this iteration + // means it is resolved already + if _, ok := updated[h]; !ok { + if a.State == notifier.StatePending { + // alert was in Pending state - it is not + // active anymore + delete(ar.alerts, h) + continue + } + a.State = notifier.StateInactive + continue + } + if a.State == notifier.StatePending && time.Since(a.Start) >= ar.For { + a.State = notifier.StateFiring + alertsFired.Inc() + } + } + if series { + return ar.toTimeSeries(ar.lastExecTime), nil + } + return nil, nil +} + +func (ar *AlertingRule) toTimeSeries(timestamp time.Time) []prompbmarshal.TimeSeries { + var tss []prompbmarshal.TimeSeries + for _, a := range ar.alerts { + if a.State == notifier.StateInactive { + continue + } + ts := ar.alertToTimeSeries(a, timestamp) + tss = append(tss, ts...) + } + return tss +} + +// copy all significant fields. +// alerts state isn't copied since +// it should be updated in next 2 Execs +func (ar *AlertingRule) UpdateWith(r Rule) error { + nr, ok := r.(*AlertingRule) + if !ok { + return fmt.Errorf("BUG: attempt to update alerting rule with wrong type %#v", r) + } + ar.Expr = nr.Expr + ar.For = nr.For + ar.Labels = nr.Labels + ar.Annotations = nr.Annotations + return nil +} + +// TODO: consider hashing algorithm in VM +func hash(m datasource.Metric) uint64 { + hash := fnv.New64a() + labels := m.Labels + sort.Slice(labels, func(i, j int) bool { + return labels[i].Name < labels[j].Name + }) + for _, l := range labels { + // drop __name__ to be consistent with Prometheus alerting + if l.Name == "__name__" { + continue + } + hash.Write([]byte(l.Name)) + hash.Write([]byte(l.Value)) + hash.Write([]byte("\xff")) + } + return hash.Sum64() +} + +func (ar *AlertingRule) newAlert(m datasource.Metric, start time.Time) (*notifier.Alert, error) { + a := ¬ifier.Alert{ + GroupID: ar.GroupID, + Name: ar.Name, + Labels: map[string]string{}, + Value: m.Value, + Start: start, + Expr: ar.Expr, + } + for _, l := range m.Labels { + // drop __name__ to be consistent with Prometheus alerting + if l.Name == "__name__" { + continue + } + a.Labels[l.Name] = l.Value + } + return a, ar.template(a) +} + +func (ar *AlertingRule) template(a *notifier.Alert) error { + // 1. 
template rule labels with data labels + rLabels, err := a.ExecTemplate(ar.Labels) + if err != nil { + return err + } + + // 2. merge data labels and rule labels + // metric labels may be overridden by + // rule labels + for k, v := range rLabels { + a.Labels[k] = v + } + + // 3. template merged labels + a.Labels, err = a.ExecTemplate(a.Labels) + if err != nil { + return err + } + + a.Annotations, err = a.ExecTemplate(ar.Annotations) + return err +} + +// AlertAPI generates APIAlert object from alert by its id(hash) +func (ar *AlertingRule) AlertAPI(id uint64) *APIAlert { + ar.mu.RLock() + defer ar.mu.RUnlock() + a, ok := ar.alerts[id] + if !ok { + return nil + } + return ar.newAlertAPI(*a) +} + +// RuleAPI returns Rule representation in form +// of APIAlertingRule +func (ar *AlertingRule) RuleAPI() APIAlertingRule { + var lastErr string + if ar.lastExecError != nil { + lastErr = ar.lastExecError.Error() + } + return APIAlertingRule{ + // encode as strings to avoid rounding + ID: fmt.Sprintf("%d", ar.ID()), + GroupID: fmt.Sprintf("%d", ar.GroupID), + Name: ar.Name, + Expression: ar.Expr, + For: ar.For.String(), + LastError: lastErr, + LastExec: ar.lastExecTime, + Labels: ar.Labels, + Annotations: ar.Annotations, + } +} + +// AlertsAPI generates list of APIAlert objects from existing alerts +func (ar *AlertingRule) AlertsAPI() []*APIAlert { + var alerts []*APIAlert + ar.mu.RLock() + for _, a := range ar.alerts { + alerts = append(alerts, ar.newAlertAPI(*a)) + } + ar.mu.RUnlock() + return alerts +} + +func (ar *AlertingRule) newAlertAPI(a notifier.Alert) *APIAlert { + return &APIAlert{ + // encode as strings to avoid rounding + ID: fmt.Sprintf("%d", a.ID), + GroupID: fmt.Sprintf("%d", a.GroupID), + + Name: a.Name, + Expression: ar.Expr, + Labels: a.Labels, + Annotations: a.Annotations, + State: a.State.String(), + ActiveAt: a.Start, + Value: strconv.FormatFloat(a.Value, 'e', -1, 64), + } +} + +const ( + // AlertMetricName is the metric name for synthetic alert timeseries. + alertMetricName = "ALERTS" + // AlertForStateMetricName is the metric name for 'for' state of alert. + alertForStateMetricName = "ALERTS_FOR_STATE" + + // AlertNameLabel is the label name indicating the name of an alert. + alertNameLabel = "alertname" + // AlertStateLabel is the label name indicating the state of an alert. 
+ alertStateLabel = "alertstate" +) + +// alertToTimeSeries converts the given alert with the given timestamp to timeseries +func (ar *AlertingRule) alertToTimeSeries(a *notifier.Alert, timestamp time.Time) []prompbmarshal.TimeSeries { + var tss []prompbmarshal.TimeSeries + tss = append(tss, alertToTimeSeries(ar.Name, a, timestamp)) + if ar.For > 0 { + tss = append(tss, alertForToTimeSeries(ar.Name, a, timestamp)) + } + return tss +} + +func alertToTimeSeries(name string, a *notifier.Alert, timestamp time.Time) prompbmarshal.TimeSeries { + labels := make(map[string]string) + for k, v := range a.Labels { + labels[k] = v + } + labels["__name__"] = alertMetricName + labels[alertNameLabel] = name + labels[alertStateLabel] = a.State.String() + return newTimeSeries(1, labels, timestamp) +} + +// alertForToTimeSeries returns a timeseries that represents +// state of active alerts, where value is time when alert become active +func alertForToTimeSeries(name string, a *notifier.Alert, timestamp time.Time) prompbmarshal.TimeSeries { + labels := make(map[string]string) + for k, v := range a.Labels { + labels[k] = v + } + labels["__name__"] = alertForStateMetricName + labels[alertNameLabel] = name + return newTimeSeries(float64(a.Start.Unix()), labels, timestamp) +} + +// Restore restores the state of active alerts basing on previously written timeseries. +// Restore restores only Start field. Field State will be always Pending and supposed +// to be updated on next Exec, as well as Value field. +// Only rules with For > 0 will be restored. +func (ar *AlertingRule) Restore(ctx context.Context, q datasource.Querier, lookback time.Duration) error { + if q == nil { + return fmt.Errorf("querier is nil") + } + // Get the last datapoint in range via MetricsQL `last_over_time`. + // We don't use plain PromQL since Prometheus doesn't support + // remote write protocol which is used for state persistence in vmalert. 
+ expr := fmt.Sprintf("last_over_time(%s{alertname=%q}[%ds])", + alertForStateMetricName, ar.Name, int(lookback.Seconds())) + qMetrics, err := q.Query(ctx, expr) + if err != nil { + return err + } + + for _, m := range qMetrics { + labels := m.Labels + m.Labels = make([]datasource.Label, 0) + // drop all extra labels, so hash key will + // be identical to timeseries received in Exec + for _, l := range labels { + if l.Name == alertNameLabel { + continue + } + // drop all overridden labels + if _, ok := ar.Labels[l.Name]; ok { + continue + } + m.Labels = append(m.Labels, l) + } + + a, err := ar.newAlert(m, time.Unix(int64(m.Value), 0)) + if err != nil { + return fmt.Errorf("failed to create alert: %s", err) + } + a.ID = hash(m) + a.State = notifier.StatePending + ar.alerts[a.ID] = a + logger.Infof("alert %q(%d) restored to state at %v", a.Name, a.ID, a.Start) + } + return nil +} diff --git a/app/vmalert/rule_test.go b/app/vmalert/alerting_test.go similarity index 72% rename from app/vmalert/rule_test.go rename to app/vmalert/alerting_test.go index 73227b476..a0f661f1f 100644 --- a/app/vmalert/rule_test.go +++ b/app/vmalert/alerting_test.go @@ -2,7 +2,6 @@ package main import ( "context" - "sync" "testing" "time" @@ -11,30 +10,15 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" ) -func TestRule_Validate(t *testing.T) { - if err := (&Rule{}).Validate(); err == nil { - t.Errorf("exptected empty name error") - } - if err := (&Rule{Name: "alert"}).Validate(); err == nil { - t.Errorf("exptected empty expr error") - } - if err := (&Rule{Name: "alert", Expr: "test{"}).Validate(); err == nil { - t.Errorf("exptected invalid expr error") - } - if err := (&Rule{Name: "alert", Expr: "test>0"}).Validate(); err != nil { - t.Errorf("exptected valid rule got %s", err) - } -} - -func TestRule_AlertToTimeSeries(t *testing.T) { +func TestAlertingRule_ToTimeSeries(t *testing.T) { timestamp := time.Now() testCases := []struct { - rule *Rule + rule *AlertingRule alert *notifier.Alert expTS []prompbmarshal.TimeSeries }{ { - newTestRule("instant", 0), + newTestAlertingRule("instant", 0), ¬ifier.Alert{State: notifier.StateFiring}, []prompbmarshal.TimeSeries{ newTimeSeries(1, map[string]string{ @@ -45,7 +29,7 @@ func TestRule_AlertToTimeSeries(t *testing.T) { }, }, { - newTestRule("instant extra labels", 0), + newTestAlertingRule("instant extra labels", 0), ¬ifier.Alert{State: notifier.StateFiring, Labels: map[string]string{ "job": "foo", "instance": "bar", @@ -61,7 +45,7 @@ func TestRule_AlertToTimeSeries(t *testing.T) { }, }, { - newTestRule("instant labels override", 0), + newTestAlertingRule("instant labels override", 0), ¬ifier.Alert{State: notifier.StateFiring, Labels: map[string]string{ alertStateLabel: "foo", "__name__": "bar", @@ -75,7 +59,7 @@ func TestRule_AlertToTimeSeries(t *testing.T) { }, }, { - newTestRule("for", time.Second), + newTestAlertingRule("for", time.Second), ¬ifier.Alert{State: notifier.StateFiring, Start: timestamp.Add(time.Second)}, []prompbmarshal.TimeSeries{ newTimeSeries(1, map[string]string{ @@ -90,7 +74,7 @@ func TestRule_AlertToTimeSeries(t *testing.T) { }, }, { - newTestRule("for pending", 10*time.Second), + newTestAlertingRule("for pending", 10*time.Second), ¬ifier.Alert{State: notifier.StatePending, Start: timestamp.Add(time.Second)}, []prompbmarshal.TimeSeries{ newTimeSeries(1, map[string]string{ @@ -107,58 +91,28 @@ func TestRule_AlertToTimeSeries(t *testing.T) { } for _, tc := range testCases { t.Run(tc.rule.Name, func(t *testing.T) { - tss := 
tc.rule.AlertToTimeSeries(tc.alert, timestamp) - if len(tc.expTS) != len(tss) { - t.Fatalf("expected number of timeseries %d; got %d", len(tc.expTS), len(tss)) - } - for i := range tc.expTS { - expTS, gotTS := tc.expTS[i], tss[i] - if len(expTS.Samples) != len(gotTS.Samples) { - t.Fatalf("expected number of samples %d; got %d", len(expTS.Samples), len(gotTS.Samples)) - } - for i, exp := range expTS.Samples { - got := gotTS.Samples[i] - if got.Value != exp.Value { - t.Errorf("expected value %.2f; got %.2f", exp.Value, got.Value) - } - if got.Timestamp != exp.Timestamp { - t.Errorf("expected timestamp %d; got %d", exp.Timestamp, got.Timestamp) - } - } - if len(expTS.Labels) != len(gotTS.Labels) { - t.Fatalf("expected number of labels %d; got %d", len(expTS.Labels), len(gotTS.Labels)) - } - for i, exp := range expTS.Labels { - got := gotTS.Labels[i] - if got.Name != exp.Name { - t.Errorf("expected label name %q; got %q", exp.Name, got.Name) - } - if got.Value != exp.Value { - t.Errorf("expected label value %q; got %q", exp.Value, got.Value) - } - } + tc.rule.alerts[tc.alert.ID] = tc.alert + tss := tc.rule.toTimeSeries(timestamp) + if err := compareTimeSeries(t, tc.expTS, tss); err != nil { + t.Fatalf("timeseries missmatch: %s", err) } }) } } -func newTestRule(name string, waitFor time.Duration) *Rule { - return &Rule{Name: name, alerts: make(map[uint64]*notifier.Alert), For: waitFor} -} - -func TestRule_Exec(t *testing.T) { +func TestAlertingRule_Exec(t *testing.T) { testCases := []struct { - rule *Rule + rule *AlertingRule steps [][]datasource.Metric expAlerts map[uint64]*notifier.Alert }{ { - newTestRule("empty", 0), + newTestAlertingRule("empty", 0), [][]datasource.Metric{}, map[uint64]*notifier.Alert{}, }, { - newTestRule("empty labels", 0), + newTestAlertingRule("empty labels", 0), [][]datasource.Metric{ {datasource.Metric{}}, }, @@ -167,7 +121,7 @@ func TestRule_Exec(t *testing.T) { }, }, { - newTestRule("single-firing", 0), + newTestAlertingRule("single-firing", 0), [][]datasource.Metric{ {metricWithLabels(t, "name", "foo")}, }, @@ -176,7 +130,7 @@ func TestRule_Exec(t *testing.T) { }, }, { - newTestRule("single-firing=>inactive", 0), + newTestAlertingRule("single-firing=>inactive", 0), [][]datasource.Metric{ {metricWithLabels(t, "name", "foo")}, {}, @@ -186,7 +140,7 @@ func TestRule_Exec(t *testing.T) { }, }, { - newTestRule("single-firing=>inactive=>firing", 0), + newTestAlertingRule("single-firing=>inactive=>firing", 0), [][]datasource.Metric{ {metricWithLabels(t, "name", "foo")}, {}, @@ -197,7 +151,7 @@ func TestRule_Exec(t *testing.T) { }, }, { - newTestRule("single-firing=>inactive=>firing=>inactive", 0), + newTestAlertingRule("single-firing=>inactive=>firing=>inactive", 0), [][]datasource.Metric{ {metricWithLabels(t, "name", "foo")}, {}, @@ -209,7 +163,7 @@ func TestRule_Exec(t *testing.T) { }, }, { - newTestRule("single-firing=>inactive=>firing=>inactive=>empty", 0), + newTestAlertingRule("single-firing=>inactive=>firing=>inactive=>empty", 0), [][]datasource.Metric{ {metricWithLabels(t, "name", "foo")}, {}, @@ -220,7 +174,7 @@ func TestRule_Exec(t *testing.T) { map[uint64]*notifier.Alert{}, }, { - newTestRule("single-firing=>inactive=>firing=>inactive=>empty=>firing", 0), + newTestAlertingRule("single-firing=>inactive=>firing=>inactive=>empty=>firing", 0), [][]datasource.Metric{ {metricWithLabels(t, "name", "foo")}, {}, @@ -234,7 +188,7 @@ func TestRule_Exec(t *testing.T) { }, }, { - newTestRule("multiple-firing", 0), + newTestAlertingRule("multiple-firing", 0), 
[][]datasource.Metric{ { metricWithLabels(t, "name", "foo"), @@ -249,7 +203,7 @@ func TestRule_Exec(t *testing.T) { }, }, { - newTestRule("multiple-steps-firing", 0), + newTestAlertingRule("multiple-steps-firing", 0), [][]datasource.Metric{ {metricWithLabels(t, "name", "foo")}, {metricWithLabels(t, "name", "foo1")}, @@ -264,7 +218,7 @@ func TestRule_Exec(t *testing.T) { }, }, { - newTestRule("duplicate", 0), + newTestAlertingRule("duplicate", 0), [][]datasource.Metric{ { // metrics with the same labelset should result in one alert @@ -277,7 +231,7 @@ func TestRule_Exec(t *testing.T) { }, }, { - newTestRule("for-pending", time.Minute), + newTestAlertingRule("for-pending", time.Minute), [][]datasource.Metric{ {metricWithLabels(t, "name", "foo")}, }, @@ -286,7 +240,7 @@ func TestRule_Exec(t *testing.T) { }, }, { - newTestRule("for-fired", time.Millisecond), + newTestAlertingRule("for-fired", time.Millisecond), [][]datasource.Metric{ {metricWithLabels(t, "name", "foo")}, {metricWithLabels(t, "name", "foo")}, @@ -296,7 +250,7 @@ func TestRule_Exec(t *testing.T) { }, }, { - newTestRule("for-pending=>empty", time.Second), + newTestAlertingRule("for-pending=>empty", time.Second), [][]datasource.Metric{ {metricWithLabels(t, "name", "foo")}, {metricWithLabels(t, "name", "foo")}, @@ -306,7 +260,7 @@ func TestRule_Exec(t *testing.T) { map[uint64]*notifier.Alert{}, }, { - newTestRule("for-pending=>firing=>inactive", time.Millisecond), + newTestAlertingRule("for-pending=>firing=>inactive", time.Millisecond), [][]datasource.Metric{ {metricWithLabels(t, "name", "foo")}, {metricWithLabels(t, "name", "foo")}, @@ -318,10 +272,10 @@ func TestRule_Exec(t *testing.T) { }, }, { - newTestRule("for-pending=>firing=>inactive=>pending", time.Millisecond), + newTestAlertingRule("for-pending=>firing=>inactive=>pending", time.Millisecond), [][]datasource.Metric{ - {metricWithLabels(t, "name", "foo")}, - {metricWithLabels(t, "name", "foo")}, + //{metricWithLabels(t, "name", "foo")}, + //{metricWithLabels(t, "name", "foo")}, // empty step to reset pending alerts {}, {metricWithLabels(t, "name", "foo")}, @@ -331,7 +285,7 @@ func TestRule_Exec(t *testing.T) { }, }, { - newTestRule("for-pending=>firing=>inactive=>pending=>firing", time.Millisecond), + newTestAlertingRule("for-pending=>firing=>inactive=>pending=>firing", time.Millisecond), [][]datasource.Metric{ {metricWithLabels(t, "name", "foo")}, {metricWithLabels(t, "name", "foo")}, @@ -349,11 +303,11 @@ func TestRule_Exec(t *testing.T) { for _, tc := range testCases { t.Run(tc.rule.Name, func(t *testing.T) { fq := &fakeQuerier{} - tc.rule.group = fakeGroup + tc.rule.GroupID = fakeGroup.ID() for _, step := range tc.steps { fq.reset() fq.add(step...) 
- if err := tc.rule.Exec(context.TODO(), fq); err != nil { + if _, err := tc.rule.Exec(context.TODO(), fq, false); err != nil { t.Fatalf("unexpected err: %s", err) } // artificial delay between applying steps @@ -375,49 +329,9 @@ func TestRule_Exec(t *testing.T) { } } -func metricWithLabels(t *testing.T, labels ...string) datasource.Metric { - t.Helper() - if len(labels) == 0 || len(labels)%2 != 0 { - t.Fatalf("expected to get even number of labels") - } - m := datasource.Metric{} - for i := 0; i < len(labels); i += 2 { - m.Labels = append(m.Labels, datasource.Label{ - Name: labels[i], - Value: labels[i+1], - }) - } - return m -} - -type fakeQuerier struct { - sync.Mutex - metrics []datasource.Metric -} - -func (fq *fakeQuerier) reset() { - fq.Lock() - fq.metrics = fq.metrics[:0] - fq.Unlock() -} - -func (fq *fakeQuerier) add(metrics ...datasource.Metric) { - fq.Lock() - fq.metrics = append(fq.metrics, metrics...) - fq.Unlock() -} - -func (fq *fakeQuerier) Query(_ context.Context, _ string) ([]datasource.Metric, error) { - fq.Lock() - cpy := make([]datasource.Metric, len(fq.metrics)) - copy(cpy, fq.metrics) - fq.Unlock() - return cpy, nil -} - -func TestRule_Restore(t *testing.T) { +func TestAlertingRule_Restore(t *testing.T) { testCases := []struct { - rule *Rule + rule *AlertingRule metrics []datasource.Metric expAlerts map[uint64]*notifier.Alert }{ @@ -502,7 +416,7 @@ func TestRule_Restore(t *testing.T) { for _, tc := range testCases { t.Run(tc.rule.Name, func(t *testing.T) { fq := &fakeQuerier{} - tc.rule.group = fakeGroup + tc.rule.GroupID = fakeGroup.ID() fq.add(tc.metrics...) if err := tc.rule.Restore(context.TODO(), fq, time.Hour); err != nil { t.Fatalf("unexpected err: %s", err) @@ -526,8 +440,8 @@ func TestRule_Restore(t *testing.T) { } } -func newTestRuleWithLabels(name string, labels ...string) *Rule { - r := newTestRule(name, 0) +func newTestRuleWithLabels(name string, labels ...string) *AlertingRule { + r := newTestAlertingRule(name, 0) r.Labels = make(map[string]string) for i := 0; i < len(labels); i += 2 { r.Labels[labels[i]] = labels[i+1] @@ -535,9 +449,6 @@ func newTestRuleWithLabels(name string, labels ...string) *Rule { return r } -func metricWithValueAndLabels(t *testing.T, value float64, labels ...string) datasource.Metric { - t.Helper() - m := metricWithLabels(t, labels...) - m.Value = value - return m +func newTestAlertingRule(name string, waitFor time.Duration) *AlertingRule { + return &AlertingRule{Name: name, alerts: make(map[uint64]*notifier.Alert), For: waitFor} } diff --git a/app/vmalert/config.go b/app/vmalert/config.go deleted file mode 100644 index b84e7044d..000000000 --- a/app/vmalert/config.go +++ /dev/null @@ -1,74 +0,0 @@ -package main - -import ( - "fmt" - "gopkg.in/yaml.v2" - "io/ioutil" - "path/filepath" - "strings" - - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" -) - -// Parse parses rule configs from given file patterns -func Parse(pathPatterns []string, validateAnnotations bool) ([]Group, error) { - var fp []string - for _, pattern := range pathPatterns { - matches, err := filepath.Glob(pattern) - if err != nil { - return nil, fmt.Errorf("error reading file patther %s:%v", pattern, err) - } - fp = append(fp, matches...) 
- } - var groups []Group - for _, file := range fp { - groupsNames := map[string]struct{}{} - gr, err := parseFile(file) - if err != nil { - return nil, fmt.Errorf("file %s: %w", file, err) - } - for _, g := range gr { - if _, ok := groupsNames[g.Name]; ok { - return nil, fmt.Errorf("one file can not contain groups with the same name %s, filepath:%s", g.Name, file) - } - g.File = file - g.doneCh = make(chan struct{}) - g.finishedCh = make(chan struct{}) - g.updateCh = make(chan Group) - - groupsNames[g.Name] = struct{}{} - for _, rule := range g.Rules { - if err = rule.Validate(); err != nil { - return nil, fmt.Errorf("invalid rule filepath: %s, group %s: %w", file, g.Name, err) - } - if validateAnnotations { - if err = notifier.ValidateTemplates(rule.Annotations); err != nil { - return nil, fmt.Errorf("invalid annotations filepath: %s, group %s: %w", file, g.Name, err) - } - if err = notifier.ValidateTemplates(rule.Labels); err != nil { - return nil, fmt.Errorf("invalid labels filepath: %s, group %s: %w", file, g.Name, err) - } - } - rule.group = g - rule.alerts = make(map[uint64]*notifier.Alert) - } - groups = append(groups, g) - } - } - if len(groups) < 1 { - return nil, fmt.Errorf("no groups found in %s", strings.Join(pathPatterns, ";")) - } - return groups, nil -} - -func parseFile(path string) ([]Group, error) { - data, err := ioutil.ReadFile(path) - if err != nil { - return nil, fmt.Errorf("error reading alert rule file: %w", err) - } - g := struct { - Groups []Group `yaml:"groups"` - }{} - err = yaml.Unmarshal(data, &g) - return g.Groups, err -} diff --git a/app/vmalert/config/config.go b/app/vmalert/config/config.go new file mode 100644 index 000000000..ee05f03e0 --- /dev/null +++ b/app/vmalert/config/config.go @@ -0,0 +1,147 @@ +package config + +import ( + "fmt" + "gopkg.in/yaml.v2" + "io/ioutil" + "path/filepath" + "strings" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" + "github.com/VictoriaMetrics/metricsql" +) + +// Group contains list of Rules grouped into +// entity with one name and evaluation interval +type Group struct { + File string + Name string `yaml:"name"` + Interval time.Duration `yaml:"interval,omitempty"` + Rules []Rule `yaml:"rules"` + + // Catches all undefined fields and must be empty after parsing. + XXX map[string]interface{} `yaml:",inline"` +} + +// Validate check for internal Group or Rule configuration errors +func (g *Group) Validate(validateAnnotations bool) error { + if g.Name == "" { + return fmt.Errorf("group name must be set") + } + if len(g.Rules) == 0 { + return fmt.Errorf("group %q can't contain no rules", g.Name) + } + uniqueRules := map[string]struct{}{} + for _, r := range g.Rules { + ruleName := r.Record + if r.Alert != "" { + ruleName = r.Alert + } + if _, ok := uniqueRules[ruleName]; ok { + return fmt.Errorf("rule name %q duplicate", ruleName) + } + uniqueRules[ruleName] = struct{}{} + if err := r.Validate(); err != nil { + return fmt.Errorf("invalid rule %q.%q: %s", g.Name, ruleName, err) + } + if !validateAnnotations { + continue + } + if err := notifier.ValidateTemplates(r.Annotations); err != nil { + return fmt.Errorf("invalid annotations for rule %q.%q: %s", g.Name, ruleName, err) + } + if err := notifier.ValidateTemplates(r.Labels); err != nil { + return fmt.Errorf("invalid labels for rule %q.%q: %s", g.Name, ruleName, err) + } + } + return checkOverflow(g.XXX, fmt.Sprintf("group %q", g.Name)) +} + +// Rule describes entity that represent either +// recording rule or alerting rule. 
+type Rule struct { + Record string `yaml:"record,omitempty"` + Alert string `yaml:"alert,omitempty"` + Expr string `yaml:"expr"` + For time.Duration `yaml:"for,omitempty"` + Labels map[string]string `yaml:"labels,omitempty"` + Annotations map[string]string `yaml:"annotations,omitempty"` +} + +// Validate check for Rule configuration errors +func (r *Rule) Validate() error { + if (r.Record == "" && r.Alert == "") || (r.Record != "" && r.Alert != "") { + return fmt.Errorf("either `record` or `alert` must be set") + } + if r.Expr == "" { + return fmt.Errorf("expression can't be empty") + } + if _, err := metricsql.Parse(r.Expr); err != nil { + return fmt.Errorf("invalid expression: %w", err) + } + return nil +} + +// Parse parses rule configs from given file patterns +func Parse(pathPatterns []string, validateAnnotations bool) ([]Group, error) { + var fp []string + for _, pattern := range pathPatterns { + matches, err := filepath.Glob(pattern) + if err != nil { + return nil, fmt.Errorf("error reading file pattern %s: %v", pattern, err) + } + fp = append(fp, matches...) + } + var groups []Group + for _, file := range fp { + uniqueGroups := map[string]struct{}{} + gr, err := parseFile(file) + if err != nil { + return nil, fmt.Errorf("failed to parse file %q: %w", file, err) + } + for _, g := range gr { + if err := g.Validate(validateAnnotations); err != nil { + return nil, fmt.Errorf("invalid group %q in file %q: %s", g.Name, file, err) + } + if _, ok := uniqueGroups[g.Name]; ok { + return nil, fmt.Errorf("group name %q duplicate in file %q", g.Name, file) + } + uniqueGroups[g.Name] = struct{}{} + g.File = file + groups = append(groups, g) + } + } + if len(groups) < 1 { + return nil, fmt.Errorf("no groups found in %s", strings.Join(pathPatterns, ";")) + } + return groups, nil +} + +func parseFile(path string) ([]Group, error) { + data, err := ioutil.ReadFile(path) + if err != nil { + return nil, fmt.Errorf("error reading alert rule file: %w", err) + } + g := struct { + Groups []Group `yaml:"groups"` + // Catches all undefined fields and must be empty after parsing. 
+ XXX map[string]interface{} `yaml:",inline"` + }{} + err = yaml.Unmarshal(data, &g) + if err != nil { + return nil, err + } + return g.Groups, checkOverflow(g.XXX, "config") +} + +func checkOverflow(m map[string]interface{}, ctx string) error { + if len(m) > 0 { + var keys []string + for k := range m { + keys = append(keys, k) + } + return fmt.Errorf("unknown fields in %s: %s", ctx, strings.Join(keys, ", ")) + } + return nil +} diff --git a/app/vmalert/config/config_test.go b/app/vmalert/config/config_test.go new file mode 100644 index 000000000..26b3f9762 --- /dev/null +++ b/app/vmalert/config/config_test.go @@ -0,0 +1,83 @@ +package config + +import ( + "net/url" + "os" + "strings" + "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" +) + +func TestMain(m *testing.M) { + u, _ := url.Parse("https://victoriametrics.com/path") + notifier.InitTemplateFunc(u) + os.Exit(m.Run()) +} + +func TestParseGood(t *testing.T) { + if _, err := Parse([]string{"testdata/*good.rules", "testdata/dir/*good.*"}, true); err != nil { + t.Errorf("error parsing files %s", err) + } +} + +func TestParseBad(t *testing.T) { + testCases := []struct { + path []string + expErr string + }{ + { + []string{"testdata/rules0-bad.rules"}, + "unexpected token", + }, + { + []string{"testdata/dir/rules0-bad.rules"}, + "error parsing annotation", + }, + { + []string{"testdata/dir/rules1-bad.rules"}, + "duplicate in file", + }, + { + []string{"testdata/dir/rules2-bad.rules"}, + "function \"value\" not defined", + }, + { + []string{"testdata/dir/rules3-bad.rules"}, + "either `record` or `alert` must be set", + }, + { + []string{"testdata/dir/rules4-bad.rules"}, + "either `record` or `alert` must be set", + }, + { + []string{"testdata/*.yaml"}, + "no groups found", + }, + } + for _, tc := range testCases { + _, err := Parse(tc.path, true) + if err == nil { + t.Errorf("expected to get error") + return + } + if !strings.Contains(err.Error(), tc.expErr) { + t.Errorf("expected err to contain %q; got %q instead", tc.expErr, err) + } + } +} + +func TestRule_Validate(t *testing.T) { + if err := (&Rule{}).Validate(); err == nil { + t.Errorf("exptected empty name error") + } + if err := (&Rule{Alert: "alert"}).Validate(); err == nil { + t.Errorf("exptected empty expr error") + } + if err := (&Rule{Alert: "alert", Expr: "test{"}).Validate(); err == nil { + t.Errorf("exptected invalid expr error") + } + if err := (&Rule{Alert: "alert", Expr: "test>0"}).Validate(); err != nil { + t.Errorf("exptected valid rule got %s", err) + } +} diff --git a/app/vmalert/testdata/dir/rules0-bad.rules b/app/vmalert/config/testdata/dir/rules0-bad.rules similarity index 100% rename from app/vmalert/testdata/dir/rules0-bad.rules rename to app/vmalert/config/testdata/dir/rules0-bad.rules diff --git a/app/vmalert/testdata/dir/rules0-good.rules b/app/vmalert/config/testdata/dir/rules0-good.rules similarity index 100% rename from app/vmalert/testdata/dir/rules0-good.rules rename to app/vmalert/config/testdata/dir/rules0-good.rules diff --git a/app/vmalert/testdata/dir/rules1-bad.rules b/app/vmalert/config/testdata/dir/rules1-bad.rules similarity index 100% rename from app/vmalert/testdata/dir/rules1-bad.rules rename to app/vmalert/config/testdata/dir/rules1-bad.rules diff --git a/app/vmalert/testdata/dir/rules1-good.rules b/app/vmalert/config/testdata/dir/rules1-good.rules similarity index 99% rename from app/vmalert/testdata/dir/rules1-good.rules rename to app/vmalert/config/testdata/dir/rules1-good.rules index 1e602e031..1d04b1a6c 
100644 --- a/app/vmalert/testdata/dir/rules1-good.rules +++ b/app/vmalert/config/testdata/dir/rules1-good.rules @@ -9,5 +9,3 @@ groups: annotations: summary: "{{ $value }}" description: "{{$labels}}" - - diff --git a/app/vmalert/testdata/dir/rules2-bad.rules b/app/vmalert/config/testdata/dir/rules2-bad.rules similarity index 100% rename from app/vmalert/testdata/dir/rules2-bad.rules rename to app/vmalert/config/testdata/dir/rules2-bad.rules diff --git a/app/vmalert/config/testdata/dir/rules3-bad.rules b/app/vmalert/config/testdata/dir/rules3-bad.rules new file mode 100644 index 000000000..927ca2980 --- /dev/null +++ b/app/vmalert/config/testdata/dir/rules3-bad.rules @@ -0,0 +1,5 @@ +groups: + - name: group + rules: + - for: 5m + expr: vm_rows > 0 diff --git a/app/vmalert/config/testdata/dir/rules4-bad.rules b/app/vmalert/config/testdata/dir/rules4-bad.rules new file mode 100644 index 000000000..64fb2f9e5 --- /dev/null +++ b/app/vmalert/config/testdata/dir/rules4-bad.rules @@ -0,0 +1,7 @@ +groups: + - name: group + rules: + - alert: rows + record: record + for: 5m + expr: vm_rows > 0 diff --git a/app/vmalert/config/testdata/dir/rules5-bad.rules b/app/vmalert/config/testdata/dir/rules5-bad.rules new file mode 100644 index 000000000..bb2e43701 --- /dev/null +++ b/app/vmalert/config/testdata/dir/rules5-bad.rules @@ -0,0 +1,7 @@ +groups: + - name: group + rules: + - alert: rows + expr: vm_rows > 0 + - record: rows + expr: sum(vm_rows) \ No newline at end of file diff --git a/app/vmalert/testdata/rules0-bad.rules b/app/vmalert/config/testdata/rules0-bad.rules similarity index 100% rename from app/vmalert/testdata/rules0-bad.rules rename to app/vmalert/config/testdata/rules0-bad.rules diff --git a/app/vmalert/testdata/rules0-good.rules b/app/vmalert/config/testdata/rules0-good.rules similarity index 100% rename from app/vmalert/testdata/rules0-good.rules rename to app/vmalert/config/testdata/rules0-good.rules diff --git a/app/vmalert/testdata/rules1-good.rules b/app/vmalert/config/testdata/rules1-good.rules similarity index 100% rename from app/vmalert/testdata/rules1-good.rules rename to app/vmalert/config/testdata/rules1-good.rules diff --git a/app/vmalert/config/testdata/rules2-good.rules b/app/vmalert/config/testdata/rules2-good.rules new file mode 100644 index 000000000..ec82c43c6 --- /dev/null +++ b/app/vmalert/config/testdata/rules2-good.rules @@ -0,0 +1,28 @@ +groups: + - name: TestGroup + interval: 2s + rules: + - alert: Conns + expr: sum(vm_tcplistener_conns) by(instance) > 1 + for: 3m + annotations: + summary: "Too high connection number for {{$labels.instance}}" + description: "It is {{ $value }} connections for {{$labels.instance}}" + - alert: ExampleAlertAlwaysFiring + expr: sum by(job) + (up == 1) + - record: handler:requests:rate5m + expr: sum(rate(prometheus_http_requests_total[5m])) by (handler) + labels: + recording: true + - record: code:requests:rate5m + expr: sum(rate(promhttp_metric_handler_requests_total[5m])) by (code) + labels: + recording: true + - record: successful_requests:ratio_rate5m + labels: + recording: true + expr: |2 + sum(code:requests:rate5m{code="200"}) + / + sum(code:requests:rate5m) \ No newline at end of file diff --git a/app/vmalert/config_test.go b/app/vmalert/config_test.go deleted file mode 100644 index 057d1ab8b..000000000 --- a/app/vmalert/config_test.go +++ /dev/null @@ -1,39 +0,0 @@ -package main - -import ( - "net/url" - "os" - "testing" - - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" -) - -func TestMain(m *testing.M) { - 
u, _ := url.Parse("https://victoriametrics.com/path") - notifier.InitTemplateFunc(u) - os.Exit(m.Run()) -} - -func TestParseGood(t *testing.T) { - if _, err := Parse([]string{"testdata/*good.rules", "testdata/dir/*good.*"}, true); err != nil { - t.Errorf("error parsing files %s", err) - } -} - -func TestParseBad(t *testing.T) { - if _, err := Parse([]string{"testdata/rules0-bad.rules"}, true); err == nil { - t.Errorf("expected syntaxt error") - } - if _, err := Parse([]string{"testdata/dir/rules0-bad.rules"}, true); err == nil { - t.Errorf("expected template annotation error") - } - if _, err := Parse([]string{"testdata/dir/rules1-bad.rules"}, true); err == nil { - t.Errorf("expected same group error") - } - if _, err := Parse([]string{"testdata/dir/rules2-bad.rules"}, true); err == nil { - t.Errorf("expected template label error") - } - if _, err := Parse([]string{"testdata/*.yaml"}, true); err == nil { - t.Errorf("expected empty group") - } -} diff --git a/app/vmalert/group.go b/app/vmalert/group.go index 4d5e5a318..0c1b97a08 100644 --- a/app/vmalert/group.go +++ b/app/vmalert/group.go @@ -4,8 +4,10 @@ import ( "context" "fmt" "hash/fnv" + "sync" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite" @@ -15,15 +17,44 @@ import ( // Group is an entity for grouping rules type Group struct { - Name string - File string - Rules []*Rule + Name string + File string + Rules []Rule + Interval time.Duration doneCh chan struct{} finishedCh chan struct{} // channel accepts new Group obj // which supposed to update current group - updateCh chan Group + updateCh chan *Group + mu sync.RWMutex +} + +func newGroup(cfg config.Group, defaultInterval time.Duration) *Group { + g := &Group{ + Name: cfg.Name, + File: cfg.File, + Interval: cfg.Interval, + doneCh: make(chan struct{}), + finishedCh: make(chan struct{}), + updateCh: make(chan *Group), + } + if g.Interval == 0 { + g.Interval = defaultInterval + } + rules := make([]Rule, len(cfg.Rules)) + for i, r := range cfg.Rules { + rules[i] = g.newRule(r) + } + g.Rules = rules + return g +} + +func (g *Group) newRule(rule config.Rule) Rule { + if rule.Alert != "" { + return newAlertingRule(g.ID(), rule) + } + return newRecordingRule(g.ID(), rule) } // ID return unique group ID that consists of @@ -36,48 +67,49 @@ func (g *Group) ID() uint64 { return hash.Sum64() } -// Restore restores alerts state for all group rules with For > 0 +// Restore restores alerts state for group rules func (g *Group) Restore(ctx context.Context, q datasource.Querier, lookback time.Duration) error { for _, rule := range g.Rules { - if rule.For == 0 { - return nil + rr, ok := rule.(*AlertingRule) + if !ok { + continue } - if err := rule.Restore(ctx, q, lookback); err != nil { - return fmt.Errorf("error while restoring rule %q: %s", rule.Name, err) + if rr.For < 1 { + continue + } + if err := rr.Restore(ctx, q, lookback); err != nil { + return fmt.Errorf("error while restoring rule %q: %s", rule, err) } } return nil } // updateWith updates existing group with -// passed group object. +// passed group object. This function ignores group +// evaluation interval change. It supposed to be updated +// in group.start function. // Not thread-safe. 
-func (g *Group) updateWith(newGroup Group) { - rulesRegistry := make(map[string]*Rule) +func (g *Group) updateWith(newGroup *Group) error { + rulesRegistry := make(map[uint64]Rule) for _, nr := range newGroup.Rules { - rulesRegistry[nr.id()] = nr + rulesRegistry[nr.ID()] = nr } for i, or := range g.Rules { - nr, ok := rulesRegistry[or.id()] + nr, ok := rulesRegistry[or.ID()] if !ok { // old rule is not present in the new list // so we mark it for removing g.Rules[i] = nil continue } - - // copy all significant fields. - // alerts state isn't copied since - // it should be updated in next 2 Execs - or.For = nr.For - or.Expr = nr.Expr - or.Labels = nr.Labels - or.Annotations = nr.Annotations - delete(rulesRegistry, nr.id()) + if err := or.UpdateWith(nr); err != nil { + return err + } + delete(rulesRegistry, nr.ID()) } - var newRules []*Rule + var newRules []Rule for _, r := range g.Rules { if r == nil { // skip nil rules @@ -90,6 +122,7 @@ func (g *Group) updateWith(newGroup Group) { newRules = append(newRules, nr) } g.Rules = newRules + return nil } var ( @@ -116,10 +149,15 @@ func (g *Group) close() { <-g.finishedCh } -func (g *Group) start(ctx context.Context, interval time.Duration, - querier datasource.Querier, nr notifier.Notifier, rw *remotewrite.Client) { - logger.Infof("group %q started", g.Name) - t := time.NewTicker(interval) +func (g *Group) start(ctx context.Context, querier datasource.Querier, nr notifier.Notifier, rw *remotewrite.Client) { + logger.Infof("group %q started with interval %v", g.Name, g.Interval) + + var returnSeries bool + if rw != nil { + returnSeries = true + } + + t := time.NewTicker(g.Interval) defer t.Stop() for { select { @@ -132,7 +170,20 @@ func (g *Group) start(ctx context.Context, interval time.Duration, close(g.finishedCh) return case ng := <-g.updateCh: - g.updateWith(ng) + g.mu.Lock() + err := g.updateWith(ng) + if err != nil { + logger.Errorf("group %q: failed to update: %s", g.Name, err) + g.mu.Unlock() + continue + } + if g.Interval != ng.Interval { + g.Interval = ng.Interval + t.Stop() + t = time.NewTicker(g.Interval) + logger.Infof("group %q: changed evaluation interval to %v", g.Name, g.Interval) + } + g.mu.Unlock() case <-t.C: iterationTotal.Inc() iterationStart := time.Now() @@ -140,58 +191,55 @@ func (g *Group) start(ctx context.Context, interval time.Duration, execTotal.Inc() execStart := time.Now() - err := rule.Exec(ctx, querier) + tss, err := rule.Exec(ctx, querier, returnSeries) execDuration.UpdateDuration(execStart) if err != nil { execErrors.Inc() - logger.Errorf("failed to execute rule %q.%q: %s", g.Name, rule.Name, err) + logger.Errorf("failed to execute rule %q.%q: %s", g.Name, rule, err) continue } - var alertsToSend []notifier.Alert - for _, a := range rule.alerts { + if len(tss) > 0 { + remoteWriteSent.Add(len(tss)) + for _, ts := range tss { + if err := rw.Push(ts); err != nil { + remoteWriteErrors.Inc() + logger.Errorf("failed to remote write for rule %q.%q: %s", g.Name, rule, err) + } + } + } + + ar, ok := rule.(*AlertingRule) + if !ok { + continue + } + var alerts []notifier.Alert + for _, a := range ar.alerts { switch a.State { case notifier.StateFiring: // set End to execStart + 3 intervals // so notifier can resolve it automatically if `vmalert` // won't be able to send resolve for some reason - a.End = execStart.Add(3 * interval) - alertsToSend = append(alertsToSend, *a) - pushToRW(rw, rule, a, execStart) - case notifier.StatePending: - pushToRW(rw, rule, a, execStart) + a.End = execStart.Add(3 * g.Interval) + alerts = 
append(alerts, *a) case notifier.StateInactive: // set End to execStart to notify // that it was just resolved a.End = execStart - alertsToSend = append(alertsToSend, *a) + alerts = append(alerts, *a) } } - if len(alertsToSend) == 0 { + if len(alerts) < 1 { continue } - alertsSent.Add(len(alertsToSend)) - if err := nr.Send(ctx, alertsToSend); err != nil { + alertsSent.Add(len(alerts)) + if err := nr.Send(ctx, alerts); err != nil { alertsSendErrors.Inc() - logger.Errorf("failed to send alert for rule %q.%q: %s", g.Name, rule.Name, err) + logger.Errorf("failed to send alert for rule %q.%q: %s", g.Name, rule, err) } } iterationDuration.UpdateDuration(iterationStart) } } } - -func pushToRW(rw *remotewrite.Client, rule *Rule, a *notifier.Alert, timestamp time.Time) { - if rw == nil { - return - } - tss := rule.AlertToTimeSeries(a, timestamp) - remoteWriteSent.Add(len(tss)) - for _, ts := range tss { - if err := rw.Push(ts); err != nil { - remoteWriteErrors.Inc() - logger.Errorf("failed to push timeseries to remotewrite: %s", err) - } - } -} diff --git a/app/vmalert/group_test.go b/app/vmalert/group_test.go index 1f99d8da2..c3c213cdc 100644 --- a/app/vmalert/group_test.go +++ b/app/vmalert/group_test.go @@ -4,28 +4,28 @@ import ( "context" "reflect" "sort" - "sync" "testing" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" ) func TestUpdateWith(t *testing.T) { testCases := []struct { name string - currentRules []*Rule - // rules must be sorted by name - newRules []*Rule + currentRules []Rule + // rules must be sorted by ID + newRules []Rule }{ { "new rule", - []*Rule{}, - []*Rule{{Name: "bar"}}, + []Rule{}, + []Rule{&AlertingRule{Name: "bar"}}, }, { - "update rule", - []*Rule{{ + "update alerting rule", + []Rule{&AlertingRule{ Name: "foo", Expr: "up > 0", For: time.Second, @@ -37,8 +37,8 @@ func TestUpdateWith(t *testing.T) { "description": "{{$labels}}", }, }}, - []*Rule{{ - Name: "bar", + []Rule{&AlertingRule{ + Name: "foo", Expr: "up > 10", For: time.Second, Labels: map[string]string{ @@ -49,56 +49,82 @@ func TestUpdateWith(t *testing.T) { }, }}, }, + { + "update recording rule", + []Rule{&RecordingRule{ + Name: "foo", + Expr: "max(up)", + Labels: map[string]string{ + "bar": "baz", + }, + }}, + []Rule{&RecordingRule{ + Name: "foo", + Expr: "min(up)", + Labels: map[string]string{ + "baz": "bar", + }, + }}, + }, { "empty rule", - []*Rule{{Name: "foo"}}, - []*Rule{}, + []Rule{&AlertingRule{Name: "foo"}, &RecordingRule{Name: "bar"}}, + []Rule{}, }, { "multiple rules", - []*Rule{{Name: "bar"}, {Name: "baz"}, {Name: "foo"}}, - []*Rule{{Name: "baz"}, {Name: "foo"}}, + []Rule{ + &AlertingRule{Name: "bar"}, + &AlertingRule{Name: "baz"}, + &RecordingRule{Name: "foo"}, + }, + []Rule{ + &AlertingRule{Name: "baz"}, + &RecordingRule{Name: "foo"}, + }, }, { "replace rule", - []*Rule{{Name: "foo1"}}, - []*Rule{{Name: "foo2"}}, + []Rule{&AlertingRule{Name: "foo1"}}, + []Rule{&AlertingRule{Name: "foo2"}}, }, { "replace multiple rules", - []*Rule{{Name: "foo1"}, {Name: "foo2"}}, - []*Rule{{Name: "foo3"}, {Name: "foo4"}}, + []Rule{ + &AlertingRule{Name: "foo1"}, + &RecordingRule{Name: "foo2"}, + &AlertingRule{Name: "foo3"}, + }, + []Rule{ + &AlertingRule{Name: "foo3"}, + &AlertingRule{Name: "foo4"}, + &RecordingRule{Name: "foo5"}, + }, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { g := &Group{Rules: tc.currentRules} - g.updateWith(Group{Rules: tc.newRules}) + err := g.updateWith(&Group{Rules: 
tc.newRules}) + if err != nil { + t.Fatal(err) + } if len(g.Rules) != len(tc.newRules) { t.Fatalf("expected to have %d rules; got: %d", len(g.Rules), len(tc.newRules)) } sort.Slice(g.Rules, func(i, j int) bool { - return g.Rules[i].Name < g.Rules[j].Name + return g.Rules[i].ID() < g.Rules[j].ID() }) for i, r := range g.Rules { got, want := r, tc.newRules[i] - if got.Name != want.Name { - t.Fatalf("expected to have rule %q; got %q", want.Name, got.Name) + if got.ID() != want.ID() { + t.Fatalf("expected to have rule %q; got %q", want, got) } - if got.Expr != want.Expr { - t.Fatalf("expected to have expression %q; got %q", want.Expr, got.Expr) - } - if got.For != want.For { - t.Fatalf("expected to have for %q; got %q", want.For, got.For) - } - if !reflect.DeepEqual(got.Annotations, want.Annotations) { - t.Fatalf("expected to have annotations %#v; got %#v", want.Annotations, got.Annotations) - } - if !reflect.DeepEqual(got.Labels, want.Labels) { - t.Fatalf("expected to have labels %#v; got %#v", want.Labels, got.Labels) + if err := compareRules(t, got, want); err != nil { + t.Fatalf("comparsion error: %s", err) } } }) @@ -107,11 +133,12 @@ func TestUpdateWith(t *testing.T) { func TestGroupStart(t *testing.T) { // TODO: make parsing from string instead of file - groups, err := Parse([]string{"testdata/rules1-good.rules"}, true) + groups, err := config.Parse([]string{"config/testdata/rules1-good.rules"}, true) if err != nil { t.Fatalf("failed to parse rules: %s", err) } - g := groups[0] + const evalInterval = time.Millisecond + g := newGroup(groups[0], evalInterval) fn := &fakeNotifier{} fs := &fakeQuerier{} @@ -120,27 +147,26 @@ func TestGroupStart(t *testing.T) { m1 := metricWithLabels(t, "instance", inst1, "job", job) m2 := metricWithLabels(t, "instance", inst2, "job", job) - r := g.Rules[0] - alert1, err := r.newAlert(m1) + r := g.Rules[0].(*AlertingRule) + alert1, err := r.newAlert(m1, time.Now()) if err != nil { t.Fatalf("faield to create alert: %s", err) } alert1.State = notifier.StateFiring alert1.ID = hash(m1) - alert2, err := r.newAlert(m2) + alert2, err := r.newAlert(m2, time.Now()) if err != nil { t.Fatalf("faield to create alert: %s", err) } alert2.State = notifier.StateFiring alert2.ID = hash(m2) - const evalInterval = time.Millisecond finished := make(chan struct{}) fs.add(m1) fs.add(m2) go func() { - g.start(context.Background(), evalInterval, fs, fn, nil) + g.start(context.Background(), fs, fn, nil) close(finished) }() @@ -197,21 +223,3 @@ func compareAlerts(t *testing.T, as, bs []notifier.Alert) { } } } - -type fakeNotifier struct { - sync.Mutex - alerts []notifier.Alert -} - -func (fn *fakeNotifier) Send(_ context.Context, alerts []notifier.Alert) error { - fn.Lock() - defer fn.Unlock() - fn.alerts = alerts - return nil -} - -func (fn *fakeNotifier) getAlerts() []notifier.Alert { - fn.Lock() - defer fn.Unlock() - return fn.alerts -} diff --git a/app/vmalert/helpers_test.go b/app/vmalert/helpers_test.go new file mode 100644 index 000000000..2c2249bc2 --- /dev/null +++ b/app/vmalert/helpers_test.go @@ -0,0 +1,200 @@ +package main + +import ( + "context" + "fmt" + "reflect" + "sync" + "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" +) + +type fakeQuerier struct { + sync.Mutex + metrics []datasource.Metric + err error +} + +func (fq *fakeQuerier) setErr(err error) { + fq.Lock() + fq.err = err + fq.Unlock() +} + +func 
(fq *fakeQuerier) reset() {
+	fq.Lock()
+	fq.err = nil
+	fq.metrics = fq.metrics[:0]
+	fq.Unlock()
+}
+
+func (fq *fakeQuerier) add(metrics ...datasource.Metric) {
+	fq.Lock()
+	fq.metrics = append(fq.metrics, metrics...)
+	fq.Unlock()
+}
+
+func (fq *fakeQuerier) Query(_ context.Context, _ string) ([]datasource.Metric, error) {
+	fq.Lock()
+	defer fq.Unlock()
+	if fq.err != nil {
+		return nil, fq.err
+	}
+	cp := make([]datasource.Metric, len(fq.metrics))
+	copy(cp, fq.metrics)
+	return cp, nil
+}
+
+type fakeNotifier struct {
+	sync.Mutex
+	alerts []notifier.Alert
+}
+
+func (fn *fakeNotifier) Send(_ context.Context, alerts []notifier.Alert) error {
+	fn.Lock()
+	defer fn.Unlock()
+	fn.alerts = alerts
+	return nil
+}
+
+func (fn *fakeNotifier) getAlerts() []notifier.Alert {
+	fn.Lock()
+	defer fn.Unlock()
+	return fn.alerts
+}
+
+func metricWithValueAndLabels(t *testing.T, value float64, labels ...string) datasource.Metric {
+	t.Helper()
+	m := metricWithLabels(t, labels...)
+	m.Value = value
+	return m
+}
+
+func metricWithLabels(t *testing.T, labels ...string) datasource.Metric {
+	t.Helper()
+	if len(labels) == 0 || len(labels)%2 != 0 {
+		t.Fatalf("expected to get even number of labels")
+	}
+	m := datasource.Metric{}
+	for i := 0; i < len(labels); i += 2 {
+		m.Labels = append(m.Labels, datasource.Label{
+			Name:  labels[i],
+			Value: labels[i+1],
+		})
+	}
+	return m
+}
+
+func compareGroups(t *testing.T, a, b *Group) {
+	t.Helper()
+	if a.Name != b.Name {
+		t.Fatalf("expected group name %q; got %q", a.Name, b.Name)
+	}
+	if a.File != b.File {
+		t.Fatalf("expected group %q file name %q; got %q", a.Name, a.File, b.File)
+	}
+	if a.Interval != b.Interval {
+		t.Fatalf("expected group %q interval %v; got %v", a.Name, a.Interval, b.Interval)
+	}
+	if len(a.Rules) != len(b.Rules) {
+		t.Fatalf("expected group %s to have %d rules; got: %d",
+			a.Name, len(a.Rules), len(b.Rules))
+	}
+	for i, r := range a.Rules {
+		got, want := r, b.Rules[i]
+		if got.ID() != want.ID() {
+			t.Fatalf("expected to have rule %d; got %d", want.ID(), got.ID())
+		}
+		if err := compareRules(t, want, got); err != nil {
+			t.Fatalf("comparison error: %s", err)
+		}
+	}
+}
+
+func compareRules(t *testing.T, a, b Rule) error {
+	t.Helper()
+	switch v := a.(type) {
+	case *AlertingRule:
+		br, ok := b.(*AlertingRule)
+		if !ok {
+			return fmt.Errorf("rule %q supposed to be of type AlertingRule", b.ID())
+		}
+		return compareAlertingRules(t, v, br)
+	case *RecordingRule:
+		br, ok := b.(*RecordingRule)
+		if !ok {
+			return fmt.Errorf("rule %q supposed to be of type RecordingRule", b.ID())
+		}
+		return compareRecordingRules(t, v, br)
+	default:
+		return fmt.Errorf("unexpected rule type received %T", a)
+	}
+}
+
+func compareRecordingRules(t *testing.T, a, b *RecordingRule) error {
+	t.Helper()
+	if a.Expr != b.Expr {
+		return fmt.Errorf("expected to have expression %q; got %q", a.Expr, b.Expr)
+	}
+	if !reflect.DeepEqual(a.Labels, b.Labels) {
+		return fmt.Errorf("expected to have labels %#v; got %#v", a.Labels, b.Labels)
+	}
+	return nil
+}
+
+func compareAlertingRules(t *testing.T, a, b *AlertingRule) error {
+	t.Helper()
+	if a.Expr != b.Expr {
+		return fmt.Errorf("expected to have expression %q; got %q", a.Expr, b.Expr)
+	}
+	if a.For != b.For {
+		return fmt.Errorf("expected to have for %q; got %q", a.For, b.For)
+	}
+	if !reflect.DeepEqual(a.Annotations, b.Annotations) {
+		return fmt.Errorf("expected to have annotations %#v; got %#v", a.Annotations, b.Annotations)
+	}
+	if !reflect.DeepEqual(a.Labels, b.Labels) {
+		return 
fmt.Errorf("expected to have labels %#v; got %#v", a.Labels, b.Labels)
+	}
+	return nil
+}
+
+func compareTimeSeries(t *testing.T, a, b []prompbmarshal.TimeSeries) error {
+	t.Helper()
+	if len(a) != len(b) {
+		return fmt.Errorf("expected number of timeseries %d; got %d", len(a), len(b))
+	}
+	for i := range a {
+		expTS, gotTS := a[i], b[i]
+		if len(expTS.Samples) != len(gotTS.Samples) {
+			return fmt.Errorf("expected number of samples %d; got %d", len(expTS.Samples), len(gotTS.Samples))
+		}
+		for i, exp := range expTS.Samples {
+			got := gotTS.Samples[i]
+			if got.Value != exp.Value {
+				return fmt.Errorf("expected value %.2f; got %.2f", exp.Value, got.Value)
+			}
+			// timestamp validation isn't always correct for now.
+			// this must be improved with time mock.
+			/*if got.Timestamp != exp.Timestamp {
+				return fmt.Errorf("expected timestamp %d; got %d", exp.Timestamp, got.Timestamp)
+			}*/
+		}
+		if len(expTS.Labels) != len(gotTS.Labels) {
+			return fmt.Errorf("expected number of labels %d; got %d", len(expTS.Labels), len(gotTS.Labels))
+		}
+		for i, exp := range expTS.Labels {
+			got := gotTS.Labels[i]
+			if got.Name != exp.Name {
+				return fmt.Errorf("expected label name %q; got %q", exp.Name, got.Name)
+			}
+			if got.Value != exp.Value {
+				return fmt.Errorf("expected label value %q; got %q", exp.Value, got.Value)
+			}
+		}
+	}
+	return nil
+}
diff --git a/app/vmalert/main.go b/app/vmalert/main.go
index 35f9a150a..c21737f77 100644
--- a/app/vmalert/main.go
+++ b/app/vmalert/main.go
@@ -42,7 +42,9 @@ absolute path to all .yaml files in root.`)
 		" in form of timeseries. E.g. http://127.0.0.1:8428")
 	remoteWriteUsername = flag.String("remoteWrite.basicAuth.username", "", "Optional basic auth username for -remoteWrite.url")
 	remoteWritePassword = flag.String("remoteWrite.basicAuth.password", "", "Optional basic auth password for -remoteWrite.url")
-	remoteWriteMaxQueueSize = flag.Int("remoteWrite.maxQueueSize", 10e3, "Defines the max number of pending datapoints to remote write endpoint")
+	remoteWriteMaxQueueSize = flag.Int("remoteWrite.maxQueueSize", 1e5, "Defines the max number of pending datapoints to remote write endpoint")
+	remoteWriteMaxBatchSize = flag.Int("remoteWrite.maxBatchSize", 1e3, "Defines max number of timeseries to be flushed at once")
+	remoteWriteConcurrency = flag.Int("remoteWrite.concurrency", 1, "Defines number of readers that concurrently write into remote storage")
 	remoteReadURL = flag.String("remoteRead.url", "", "Optional URL to Victoria Metrics or VMSelect that will be used to restore alerts"+
 		" state. This configuration makes sense only if `vmalert` was configured with `remoteWrite.url` before and has been successfully persisted its state."+
@@ -52,7 +54,7 @@ absolute path to all .yaml files in root.`)
 	remoteReadLookBack = flag.Duration("remoteRead.lookback", time.Hour, "Lookback defines how far to look into past for alerts timeseries."+
 		" For example, if lookback=1h then range from now() to now()-1h will be scanned.")
-	evaluationInterval = flag.Duration("evaluationInterval", time.Minute, "How often to evaluate the rules. Default 1m")
+	evaluationInterval = flag.Duration("evaluationInterval", time.Minute, "How often to evaluate the rules")
 	notifierURL = flag.String("notifier.url", "", "Prometheus alertmanager URL. Required parameter. e.g. 
http://127.0.0.1:9093") externalURL = flag.String("external.url", "", "External URL is used as alert's source for sent alerts to the notifier") ) @@ -81,7 +83,9 @@ func main() { if *remoteWriteURL != "" { c, err := remotewrite.NewClient(ctx, remotewrite.Config{ Addr: *remoteWriteURL, + Concurrency: *remoteWriteConcurrency, MaxQueueSize: *remoteWriteMaxQueueSize, + MaxBatchSize: *remoteWriteMaxBatchSize, FlushInterval: *evaluationInterval, BasicAuthUser: *remoteWriteUsername, BasicAuthPass: *remoteWritePassword, diff --git a/app/vmalert/manager.go b/app/vmalert/manager.go index 4429ca8d3..cad0278a2 100644 --- a/app/vmalert/manager.go +++ b/app/vmalert/manager.go @@ -6,12 +6,14 @@ import ( "strings" "sync" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) +// manager controls group states type manager struct { storage datasource.Querier notifier notifier.Notifier @@ -25,7 +27,7 @@ type manager struct { groups map[uint64]*Group } -// AlertAPI generates APIAlert object from alert by its id(hash) +// AlertAPI generates APIAlert object from alert by its ID(hash) func (m *manager) AlertAPI(gID, aID uint64) (*APIAlert, error) { m.groupsMu.RLock() defer m.groupsMu.RUnlock() @@ -35,11 +37,15 @@ func (m *manager) AlertAPI(gID, aID uint64) (*APIAlert, error) { return nil, fmt.Errorf("can't find group with id %q", gID) } for _, rule := range g.Rules { - if apiAlert := rule.AlertAPI(aID); apiAlert != nil { + ar, ok := rule.(*AlertingRule) + if !ok { + continue + } + if apiAlert := ar.AlertAPI(aID); apiAlert != nil { return apiAlert, nil } } - return nil, fmt.Errorf("can't func alert with id %q in group %q", aID, g.Name) + return nil, fmt.Errorf("can't find alert with id %q in group %q", aID, g.Name) } func (m *manager) start(ctx context.Context, path []string, validate bool) error { @@ -56,7 +62,7 @@ func (m *manager) close() { m.wg.Wait() } -func (m *manager) startGroup(ctx context.Context, group Group, restore bool) { +func (m *manager) startGroup(ctx context.Context, group *Group, restore bool) { if restore && m.rr != nil { err := group.Restore(ctx, m.rr, *remoteReadLookBack) if err != nil { @@ -67,21 +73,22 @@ func (m *manager) startGroup(ctx context.Context, group Group, restore bool) { m.wg.Add(1) id := group.ID() go func() { - group.start(ctx, *evaluationInterval, m.storage, m.notifier, m.rw) + group.start(ctx, m.storage, m.notifier, m.rw) m.wg.Done() }() - m.groups[id] = &group + m.groups[id] = group } func (m *manager) update(ctx context.Context, path []string, validate, restore bool) error { - logger.Infof("reading alert rules configuration file from %q", strings.Join(path, ";")) - newGroups, err := Parse(path, validate) + logger.Infof("reading rules configuration file from %q", strings.Join(path, ";")) + groupsCfg, err := config.Parse(path, validate) if err != nil { return fmt.Errorf("cannot parse configuration file: %s", err) } - groupsRegistry := make(map[uint64]Group) - for _, ng := range newGroups { + groupsRegistry := make(map[uint64]*Group) + for _, cfg := range groupsCfg { + ng := newGroup(cfg, *evaluationInterval) groupsRegistry[ng.ID()] = ng } @@ -106,3 +113,22 @@ func (m *manager) update(ctx context.Context, path []string, validate, restore b m.groupsMu.Unlock() return nil } + +func (g *Group) toAPI() APIGroup 
{ + ag := APIGroup{ + // encode as strings to avoid rounding + ID: fmt.Sprintf("%d", g.ID()), + Name: g.Name, + File: g.File, + Interval: g.Interval.String(), + } + for _, r := range g.Rules { + switch v := r.(type) { + case *AlertingRule: + ag.AlertingRules = append(ag.AlertingRules, v.RuleAPI()) + case *RecordingRule: + ag.RecordingRules = append(ag.RecordingRules, v.RuleAPI()) + } + } + return ag +} diff --git a/app/vmalert/manager_test.go b/app/vmalert/manager_test.go index 1e96e6b49..21239818f 100644 --- a/app/vmalert/manager_test.go +++ b/app/vmalert/manager_test.go @@ -3,12 +3,22 @@ package main import ( "context" "math/rand" + "net/url" + "os" "strings" "sync" "testing" "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" ) +func TestMain(m *testing.M) { + u, _ := url.Parse("https://victoriametrics.com/path") + notifier.InitTemplateFunc(u) + os.Exit(m.Run()) +} + func TestManagerUpdateError(t *testing.T) { m := &manager{groups: make(map[uint64]*Group)} path := []string{"foo/bar"} @@ -32,9 +42,13 @@ func TestManagerUpdateConcurrent(t *testing.T) { notifier: &fakeNotifier{}, } paths := []string{ - "testdata/dir/rules0-good.rules", - "testdata/dir/rules1-good.rules", - "testdata/rules0-good.rules", + "config/testdata/dir/rules0-good.rules", + "config/testdata/dir/rules0-bad.rules", + "config/testdata/dir/rules1-good.rules", + "config/testdata/dir/rules1-bad.rules", + "config/testdata/rules0-good.rules", + "config/testdata/rules1-good.rules", + "config/testdata/rules2-good.rules", } *evaluationInterval = time.Millisecond if err := m.start(context.Background(), []string{paths[0]}, true); err != nil { @@ -51,10 +65,7 @@ func TestManagerUpdateConcurrent(t *testing.T) { for i := 0; i < iterations; i++ { rnd := rand.Intn(len(paths)) path := []string{paths[rnd]} - err := m.update(context.Background(), path, true, false) - if err != nil { - t.Errorf("update error: %s", err) - } + _ = m.update(context.Background(), path, true, false) } }() } @@ -64,6 +75,41 @@ func TestManagerUpdateConcurrent(t *testing.T) { // TestManagerUpdate tests sequential configuration // updates. 
func TestManagerUpdate(t *testing.T) { + const defaultEvalInterval = time.Second * 30 + currentEvalInterval := *evaluationInterval + *evaluationInterval = defaultEvalInterval + defer func() { + *evaluationInterval = currentEvalInterval + }() + + var ( + VMRows = &AlertingRule{ + Name: "VMRows", + Expr: "vm_rows > 0", + For: 10 * time.Second, + Labels: map[string]string{ + "label": "bar", + "host": "{{ $labels.instance }}", + }, + Annotations: map[string]string{ + "summary": "{{ $value|humanize }}", + "description": "{{$labels}}", + }, + } + Conns = &AlertingRule{ + Name: "Conns", + Expr: "sum(vm_tcplistener_conns) by(instance) > 1", + Annotations: map[string]string{ + "summary": "Too high connection number for {{$labels.instance}}", + "description": "It is {{ $value }} connections for {{$labels.instance}}", + }, + } + ExampleAlertAlwaysFiring = &AlertingRule{ + Name: "ExampleAlertAlwaysFiring", + Expr: "sum by(job) (up == 1)", + } + ) + testCases := []struct { name string initPath string @@ -72,49 +118,65 @@ func TestManagerUpdate(t *testing.T) { }{ { name: "update good rules", - initPath: "testdata/rules0-good.rules", - updatePath: "testdata/dir/rules1-good.rules", + initPath: "config/testdata/rules0-good.rules", + updatePath: "config/testdata/dir/rules1-good.rules", want: []*Group{ { - File: "testdata/dir/rules1-good.rules", - Name: "duplicatedGroupDiffFiles", - Rules: []*Rule{newTestRule("VMRows", time.Second*10)}, + File: "config/testdata/dir/rules1-good.rules", + Name: "duplicatedGroupDiffFiles", + Interval: defaultEvalInterval, + Rules: []Rule{ + &AlertingRule{ + Name: "VMRows", + Expr: "vm_rows > 0", + For: 5 * time.Minute, + Labels: map[string]string{"label": "bar"}, + Annotations: map[string]string{ + "summary": "{{ $value }}", + "description": "{{$labels}}", + }, + }, + }, }, }, }, { name: "update good rules from 1 to 2 groups", - initPath: "testdata/dir/rules1-good.rules", - updatePath: "testdata/rules0-good.rules", + initPath: "config/testdata/dir/rules1-good.rules", + updatePath: "config/testdata/rules0-good.rules", want: []*Group{ { - File: "testdata/rules0-good.rules", - Name: "groupGorSingleAlert", Rules: []*Rule{ - newTestRule("VMRows", time.Second*10), - }}, + File: "config/testdata/rules0-good.rules", + Name: "groupGorSingleAlert", + Rules: []Rule{VMRows}, + Interval: defaultEvalInterval, + }, { - File: "testdata/rules0-good.rules", - Name: "TestGroup", Rules: []*Rule{ - newTestRule("Conns", time.Duration(0)), - newTestRule("ExampleAlertAlwaysFiring", time.Duration(0)), + File: "config/testdata/rules0-good.rules", + Interval: defaultEvalInterval, + Name: "TestGroup", Rules: []Rule{ + Conns, + ExampleAlertAlwaysFiring, }}, }, }, { name: "update with one bad rule file", - initPath: "testdata/rules0-good.rules", - updatePath: "testdata/dir/rules2-bad.rules", + initPath: "config/testdata/rules0-good.rules", + updatePath: "config/testdata/dir/rules2-bad.rules", want: []*Group{ { - File: "testdata/rules0-good.rules", - Name: "groupGorSingleAlert", Rules: []*Rule{ - newTestRule("VMRows", time.Second*10), - }}, + File: "config/testdata/rules0-good.rules", + Name: "groupGorSingleAlert", + Interval: defaultEvalInterval, + Rules: []Rule{VMRows}, + }, { - File: "testdata/rules0-good.rules", - Name: "TestGroup", Rules: []*Rule{ - newTestRule("Conns", time.Duration(0)), - newTestRule("ExampleAlertAlwaysFiring", time.Duration(0)), + File: "config/testdata/rules0-good.rules", + Interval: defaultEvalInterval, + Name: "TestGroup", Rules: []Rule{ + Conns, + ExampleAlertAlwaysFiring, }}, }, 
}, @@ -139,7 +201,7 @@ func TestManagerUpdate(t *testing.T) { if !ok { t.Fatalf("expected to have group %q", wantG.Name) } - compareGroups(t, gotG, wantG) + compareGroups(t, wantG, gotG) } cancel() @@ -147,17 +209,3 @@ func TestManagerUpdate(t *testing.T) { }) } } - -func compareGroups(t *testing.T, a, b *Group) { - t.Helper() - if len(a.Rules) != len(b.Rules) { - t.Fatalf("expected group %s to have %d rules; got: %d", - a.Name, len(a.Rules), len(b.Rules)) - } - for i, r := range a.Rules { - got, want := r, b.Rules[i] - if got.Name != want.Name { - t.Fatalf("expected to have rule %q; got %q", want.Name, got.Name) - } - } -} diff --git a/app/vmalert/notifier/alert.go b/app/vmalert/notifier/alert.go index b1cd05018..985bd47d5 100644 --- a/app/vmalert/notifier/alert.go +++ b/app/vmalert/notifier/alert.go @@ -87,7 +87,7 @@ func templateAnnotations(annotations map[string]string, header string, data aler builder.WriteString(header) builder.WriteString(text) if err := templateAnnotation(&buf, builder.String(), data); err != nil { - eg.errs = append(eg.errs, fmt.Sprintf("key %s, template %s:%s", key, text, err)) + eg.errs = append(eg.errs, fmt.Sprintf("key %q, template %q: %s", key, text, err)) continue } r[key] = buf.String() @@ -98,10 +98,10 @@ func templateAnnotations(annotations map[string]string, header string, data aler func templateAnnotation(dst io.Writer, text string, data alertTplData) error { tpl, err := template.New("").Funcs(tmplFunc).Option("missingkey=zero").Parse(text) if err != nil { - return fmt.Errorf("error parsing annotation:%w", err) + return fmt.Errorf("error parsing annotation: %w", err) } if err = tpl.Execute(dst, data); err != nil { - return fmt.Errorf("error evaluating annotation template:%w", err) + return fmt.Errorf("error evaluating annotation template: %w", err) } return nil } diff --git a/app/vmalert/notifier/alertmanager_request.qtpl.go b/app/vmalert/notifier/alertmanager_request.qtpl.go index af4ac2ea5..188fc3f37 100644 --- a/app/vmalert/notifier/alertmanager_request.qtpl.go +++ b/app/vmalert/notifier/alertmanager_request.qtpl.go @@ -1,131 +1,131 @@ // Code generated by qtc from "alertmanager_request.qtpl". DO NOT EDIT. // See https://github.com/valyala/quicktemplate for details. 
-//line app/vmalert/notifier/alertmanager_request.qtpl:1 +//line notifier/alertmanager_request.qtpl:1 package notifier -//line app/vmalert/notifier/alertmanager_request.qtpl:1 +//line notifier/alertmanager_request.qtpl:1 import ( "strconv" "time" ) -//line app/vmalert/notifier/alertmanager_request.qtpl:7 +//line notifier/alertmanager_request.qtpl:7 import ( qtio422016 "io" qt422016 "github.com/valyala/quicktemplate" ) -//line app/vmalert/notifier/alertmanager_request.qtpl:7 +//line notifier/alertmanager_request.qtpl:7 var ( _ = qtio422016.Copy _ = qt422016.AcquireByteBuffer ) -//line app/vmalert/notifier/alertmanager_request.qtpl:7 +//line notifier/alertmanager_request.qtpl:7 func streamamRequest(qw422016 *qt422016.Writer, alerts []Alert, generatorURL func(string, string) string) { -//line app/vmalert/notifier/alertmanager_request.qtpl:7 +//line notifier/alertmanager_request.qtpl:7 qw422016.N().S(`[`) -//line app/vmalert/notifier/alertmanager_request.qtpl:9 +//line notifier/alertmanager_request.qtpl:9 for i, alert := range alerts { -//line app/vmalert/notifier/alertmanager_request.qtpl:9 +//line notifier/alertmanager_request.qtpl:9 qw422016.N().S(`{"startsAt":`) -//line app/vmalert/notifier/alertmanager_request.qtpl:11 +//line notifier/alertmanager_request.qtpl:11 qw422016.N().Q(alert.Start.Format(time.RFC3339Nano)) -//line app/vmalert/notifier/alertmanager_request.qtpl:11 +//line notifier/alertmanager_request.qtpl:11 qw422016.N().S(`,"generatorURL":`) -//line app/vmalert/notifier/alertmanager_request.qtpl:12 +//line notifier/alertmanager_request.qtpl:12 qw422016.N().Q(generatorURL(strconv.FormatUint(alert.GroupID, 10), strconv.FormatUint(alert.ID, 10))) -//line app/vmalert/notifier/alertmanager_request.qtpl:12 +//line notifier/alertmanager_request.qtpl:12 qw422016.N().S(`,`) -//line app/vmalert/notifier/alertmanager_request.qtpl:13 +//line notifier/alertmanager_request.qtpl:13 if !alert.End.IsZero() { -//line app/vmalert/notifier/alertmanager_request.qtpl:13 +//line notifier/alertmanager_request.qtpl:13 qw422016.N().S(`"endsAt":`) -//line app/vmalert/notifier/alertmanager_request.qtpl:14 +//line notifier/alertmanager_request.qtpl:14 qw422016.N().Q(alert.End.Format(time.RFC3339Nano)) -//line app/vmalert/notifier/alertmanager_request.qtpl:14 +//line notifier/alertmanager_request.qtpl:14 qw422016.N().S(`,`) -//line app/vmalert/notifier/alertmanager_request.qtpl:15 +//line notifier/alertmanager_request.qtpl:15 } -//line app/vmalert/notifier/alertmanager_request.qtpl:15 +//line notifier/alertmanager_request.qtpl:15 qw422016.N().S(`"labels": {"alertname":`) -//line app/vmalert/notifier/alertmanager_request.qtpl:17 +//line notifier/alertmanager_request.qtpl:17 qw422016.N().Q(alert.Name) -//line app/vmalert/notifier/alertmanager_request.qtpl:18 +//line notifier/alertmanager_request.qtpl:18 for k, v := range alert.Labels { -//line app/vmalert/notifier/alertmanager_request.qtpl:18 +//line notifier/alertmanager_request.qtpl:18 qw422016.N().S(`,`) -//line app/vmalert/notifier/alertmanager_request.qtpl:19 +//line notifier/alertmanager_request.qtpl:19 qw422016.N().Q(k) -//line app/vmalert/notifier/alertmanager_request.qtpl:19 +//line notifier/alertmanager_request.qtpl:19 qw422016.N().S(`:`) -//line app/vmalert/notifier/alertmanager_request.qtpl:19 +//line notifier/alertmanager_request.qtpl:19 qw422016.N().Q(v) -//line app/vmalert/notifier/alertmanager_request.qtpl:20 +//line notifier/alertmanager_request.qtpl:20 } -//line app/vmalert/notifier/alertmanager_request.qtpl:20 +//line 
notifier/alertmanager_request.qtpl:20 qw422016.N().S(`},"annotations": {`) -//line app/vmalert/notifier/alertmanager_request.qtpl:23 +//line notifier/alertmanager_request.qtpl:23 c := len(alert.Annotations) -//line app/vmalert/notifier/alertmanager_request.qtpl:24 +//line notifier/alertmanager_request.qtpl:24 for k, v := range alert.Annotations { -//line app/vmalert/notifier/alertmanager_request.qtpl:25 +//line notifier/alertmanager_request.qtpl:25 c = c - 1 -//line app/vmalert/notifier/alertmanager_request.qtpl:26 +//line notifier/alertmanager_request.qtpl:26 qw422016.N().Q(k) -//line app/vmalert/notifier/alertmanager_request.qtpl:26 +//line notifier/alertmanager_request.qtpl:26 qw422016.N().S(`:`) -//line app/vmalert/notifier/alertmanager_request.qtpl:26 +//line notifier/alertmanager_request.qtpl:26 qw422016.N().Q(v) -//line app/vmalert/notifier/alertmanager_request.qtpl:26 +//line notifier/alertmanager_request.qtpl:26 if c > 0 { -//line app/vmalert/notifier/alertmanager_request.qtpl:26 +//line notifier/alertmanager_request.qtpl:26 qw422016.N().S(`,`) -//line app/vmalert/notifier/alertmanager_request.qtpl:26 +//line notifier/alertmanager_request.qtpl:26 } -//line app/vmalert/notifier/alertmanager_request.qtpl:27 +//line notifier/alertmanager_request.qtpl:27 } -//line app/vmalert/notifier/alertmanager_request.qtpl:27 +//line notifier/alertmanager_request.qtpl:27 qw422016.N().S(`}}`) -//line app/vmalert/notifier/alertmanager_request.qtpl:30 +//line notifier/alertmanager_request.qtpl:30 if i != len(alerts)-1 { -//line app/vmalert/notifier/alertmanager_request.qtpl:30 +//line notifier/alertmanager_request.qtpl:30 qw422016.N().S(`,`) -//line app/vmalert/notifier/alertmanager_request.qtpl:30 +//line notifier/alertmanager_request.qtpl:30 } -//line app/vmalert/notifier/alertmanager_request.qtpl:31 +//line notifier/alertmanager_request.qtpl:31 } -//line app/vmalert/notifier/alertmanager_request.qtpl:31 +//line notifier/alertmanager_request.qtpl:31 qw422016.N().S(`]`) -//line app/vmalert/notifier/alertmanager_request.qtpl:33 +//line notifier/alertmanager_request.qtpl:33 } -//line app/vmalert/notifier/alertmanager_request.qtpl:33 +//line notifier/alertmanager_request.qtpl:33 func writeamRequest(qq422016 qtio422016.Writer, alerts []Alert, generatorURL func(string, string) string) { -//line app/vmalert/notifier/alertmanager_request.qtpl:33 +//line notifier/alertmanager_request.qtpl:33 qw422016 := qt422016.AcquireWriter(qq422016) -//line app/vmalert/notifier/alertmanager_request.qtpl:33 +//line notifier/alertmanager_request.qtpl:33 streamamRequest(qw422016, alerts, generatorURL) -//line app/vmalert/notifier/alertmanager_request.qtpl:33 +//line notifier/alertmanager_request.qtpl:33 qt422016.ReleaseWriter(qw422016) -//line app/vmalert/notifier/alertmanager_request.qtpl:33 +//line notifier/alertmanager_request.qtpl:33 } -//line app/vmalert/notifier/alertmanager_request.qtpl:33 +//line notifier/alertmanager_request.qtpl:33 func amRequest(alerts []Alert, generatorURL func(string, string) string) string { -//line app/vmalert/notifier/alertmanager_request.qtpl:33 +//line notifier/alertmanager_request.qtpl:33 qb422016 := qt422016.AcquireByteBuffer() -//line app/vmalert/notifier/alertmanager_request.qtpl:33 +//line notifier/alertmanager_request.qtpl:33 writeamRequest(qb422016, alerts, generatorURL) -//line app/vmalert/notifier/alertmanager_request.qtpl:33 +//line notifier/alertmanager_request.qtpl:33 qs422016 := string(qb422016.B) -//line app/vmalert/notifier/alertmanager_request.qtpl:33 +//line 
notifier/alertmanager_request.qtpl:33 qt422016.ReleaseByteBuffer(qb422016) -//line app/vmalert/notifier/alertmanager_request.qtpl:33 +//line notifier/alertmanager_request.qtpl:33 return qs422016 -//line app/vmalert/notifier/alertmanager_request.qtpl:33 +//line notifier/alertmanager_request.qtpl:33 } diff --git a/app/vmalert/notifier/utils.go b/app/vmalert/notifier/utils.go index 8cf7c901c..4bce9485a 100644 --- a/app/vmalert/notifier/utils.go +++ b/app/vmalert/notifier/utils.go @@ -17,5 +17,5 @@ func (eg *errGroup) err() error { } func (eg *errGroup) Error() string { - return fmt.Sprintf("errors:%s", strings.Join(eg.errs, "\n")) + return fmt.Sprintf("errors: %s", strings.Join(eg.errs, "\n")) } diff --git a/app/vmalert/recording.go b/app/vmalert/recording.go new file mode 100644 index 000000000..348041d22 --- /dev/null +++ b/app/vmalert/recording.go @@ -0,0 +1,151 @@ +package main + +import ( + "context" + "errors" + "fmt" + "hash/fnv" + "sort" + "sync" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" +) + +// RecordingRule is a Rule that supposed +// to evaluate configured Expression and +// return TimeSeries as result. +type RecordingRule struct { + Name string + Expr string + Labels map[string]string + GroupID uint64 + + // guard status fields + mu sync.RWMutex + // stores last moment of time Exec was called + lastExecTime time.Time + // stores last error that happened in Exec func + // resets on every successful Exec + // may be used as Health state + lastExecError error +} + +// String implements Stringer interface +func (rr *RecordingRule) String() string { + return rr.Name +} + +// ID returns unique Rule ID +// within the parent Group. +func (rr *RecordingRule) ID() uint64 { + hash := fnv.New64a() + hash.Write([]byte("alerting")) + hash.Write([]byte("\xff")) + hash.Write([]byte(rr.Name)) + return hash.Sum64() +} + +func newRecordingRule(gID uint64, cfg config.Rule) *RecordingRule { + return &RecordingRule{ + Name: cfg.Record, + Expr: cfg.Expr, + Labels: cfg.Labels, + GroupID: gID, + } +} + +var errDuplicate = errors.New("result contains metrics with the same labelset after applying rule labels") + +// Exec executes RecordingRule expression via the given Querier. 
+func (rr *RecordingRule) Exec(ctx context.Context, q datasource.Querier, series bool) ([]prompbmarshal.TimeSeries, error) { + if !series { + return nil, nil + } + + qMetrics, err := q.Query(ctx, rr.Expr) + + rr.mu.Lock() + defer rr.mu.Unlock() + + rr.lastExecTime = time.Now() + rr.lastExecError = err + if err != nil { + return nil, fmt.Errorf("failed to execute query %q: %s", rr.Expr, err) + } + + duplicates := make(map[uint64]prompbmarshal.TimeSeries, len(qMetrics)) + var tss []prompbmarshal.TimeSeries + for _, r := range qMetrics { + ts := rr.toTimeSeries(r, rr.lastExecTime) + h := hashTimeSeries(ts) + if _, ok := duplicates[h]; ok { + rr.lastExecError = errDuplicate + return nil, errDuplicate + } + duplicates[h] = ts + tss = append(tss, ts) + } + return tss, nil +} + +func hashTimeSeries(ts prompbmarshal.TimeSeries) uint64 { + hash := fnv.New64a() + labels := ts.Labels + sort.Slice(labels, func(i, j int) bool { + return labels[i].Name < labels[j].Name + }) + for _, l := range labels { + hash.Write([]byte(l.Name)) + hash.Write([]byte(l.Value)) + hash.Write([]byte("\xff")) + } + return hash.Sum64() +} + +func (rr *RecordingRule) toTimeSeries(m datasource.Metric, timestamp time.Time) prompbmarshal.TimeSeries { + labels := make(map[string]string) + for _, l := range m.Labels { + labels[l.Name] = l.Value + } + labels["__name__"] = rr.Name + // override existing labels with configured ones + for k, v := range rr.Labels { + labels[k] = v + } + return newTimeSeries(m.Value, labels, timestamp) +} + +// copy all significant fields. +// alerts state isn't copied since +// it should be updated in next 2 Execs +func (rr *RecordingRule) UpdateWith(r Rule) error { + nr, ok := r.(*RecordingRule) + if !ok { + return fmt.Errorf("BUG: attempt to update recroding rule with wrong type %#v", r) + } + rr.Expr = nr.Expr + rr.Labels = nr.Labels + return nil +} + +// RuleAPI returns Rule representation in form +// of APIRecordingRule +func (rr *RecordingRule) RuleAPI() APIRecordingRule { + var lastErr string + if rr.lastExecError != nil { + lastErr = rr.lastExecError.Error() + } + return APIRecordingRule{ + // encode as strings to avoid rounding + ID: fmt.Sprintf("%d", rr.ID()), + GroupID: fmt.Sprintf("%d", rr.GroupID), + Name: rr.Name, + Expression: rr.Expr, + LastError: lastErr, + LastExec: rr.lastExecTime, + Labels: rr.Labels, + } +} diff --git a/app/vmalert/recording_test.go b/app/vmalert/recording_test.go new file mode 100644 index 000000000..1ab35295c --- /dev/null +++ b/app/vmalert/recording_test.go @@ -0,0 +1,121 @@ +package main + +import ( + "context" + "errors" + "strings" + "testing" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" +) + +func TestRecoridngRule_ToTimeSeries(t *testing.T) { + timestamp := time.Now() + testCases := []struct { + rule *RecordingRule + metrics []datasource.Metric + expTS []prompbmarshal.TimeSeries + }{ + { + &RecordingRule{Name: "foo"}, + []datasource.Metric{metricWithValueAndLabels(t, 10, + "__name__", "bar", + )}, + []prompbmarshal.TimeSeries{ + newTimeSeries(10, map[string]string{ + "__name__": "foo", + }, timestamp), + }, + }, + { + &RecordingRule{Name: "foobarbaz"}, + []datasource.Metric{ + metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "foo"), + metricWithValueAndLabels(t, 2, "__name__", "bar", "job", "bar"), + metricWithValueAndLabels(t, 3, "__name__", "baz", "job", "baz"), + }, + []prompbmarshal.TimeSeries{ + newTimeSeries(1, map[string]string{ + "__name__": 
"foobarbaz", + "job": "foo", + }, timestamp), + newTimeSeries(2, map[string]string{ + "__name__": "foobarbaz", + "job": "bar", + }, timestamp), + newTimeSeries(3, map[string]string{ + "__name__": "foobarbaz", + "job": "baz", + }, timestamp), + }, + }, + { + &RecordingRule{Name: "job:foo", Labels: map[string]string{ + "source": "test", + }}, + []datasource.Metric{ + metricWithValueAndLabels(t, 2, "__name__", "foo", "job", "foo"), + metricWithValueAndLabels(t, 1, "__name__", "bar", "job", "bar")}, + []prompbmarshal.TimeSeries{ + newTimeSeries(2, map[string]string{ + "__name__": "job:foo", + "job": "foo", + "source": "test", + }, timestamp), + newTimeSeries(1, map[string]string{ + "__name__": "job:foo", + "job": "bar", + "source": "test", + }, timestamp), + }, + }, + } + for _, tc := range testCases { + t.Run(tc.rule.Name, func(t *testing.T) { + fq := &fakeQuerier{} + fq.add(tc.metrics...) + tss, err := tc.rule.Exec(context.TODO(), fq, true) + if err != nil { + t.Fatalf("unexpected Exec err: %s", err) + } + if err := compareTimeSeries(t, tc.expTS, tss); err != nil { + t.Fatalf("timeseries missmatch: %s", err) + } + }) + } +} + +func TestRecoridngRule_ToTimeSeriesNegative(t *testing.T) { + rr := &RecordingRule{Name: "job:foo", Labels: map[string]string{ + "job": "test", + }} + + fq := &fakeQuerier{} + expErr := "connection reset by peer" + fq.setErr(errors.New(expErr)) + + _, err := rr.Exec(context.TODO(), fq, true) + if err == nil { + t.Fatalf("expected to get err; got nil") + } + if !strings.Contains(err.Error(), expErr) { + t.Fatalf("expected to get err %q; got %q insterad", expErr, err) + } + + fq.reset() + + // add metrics which differs only by `job` label + // which will be overridden by rule + fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "foo")) + fq.add(metricWithValueAndLabels(t, 2, "__name__", "foo", "job", "bar")) + + _, err = rr.Exec(context.TODO(), fq, true) + if err == nil { + t.Fatalf("expected to get err; got nil") + } + if !strings.Contains(err.Error(), errDuplicate.Error()) { + t.Fatalf("expected to get err %q; got %q insterad", errDuplicate, err) + } +} diff --git a/app/vmalert/remotewrite/remotewrite.go b/app/vmalert/remotewrite/remotewrite.go index b586a2261..8ac8733ae 100644 --- a/app/vmalert/remotewrite/remotewrite.go +++ b/app/vmalert/remotewrite/remotewrite.go @@ -38,11 +38,15 @@ type Config struct { BasicAuthUser string BasicAuthPass string + // Concurrency defines number of readers that + // concurrently read from the queue and flush data + Concurrency int // MaxBatchSize defines max number of timeseries // to be flushed at once MaxBatchSize int // MaxQueueSize defines max length of input queue - // populated by Push method + // populated by Push method. + // Push will be rejected once queue is full. 
MaxQueueSize int // FlushInterval defines time interval for flushing batches FlushInterval time.Duration @@ -52,9 +56,10 @@ type Config struct { } const ( + defaultConcurrency = 4 defaultMaxBatchSize = 1e3 - defaultMaxQueueSize = 100 - defaultFlushInterval = 5 * time.Second + defaultMaxQueueSize = 1e5 + defaultFlushInterval = time.Second defaultWriteTimeout = 30 * time.Second ) @@ -90,7 +95,13 @@ func NewClient(ctx context.Context, cfg Config) (*Client, error) { doneCh: make(chan struct{}), input: make(chan prompbmarshal.TimeSeries, cfg.MaxQueueSize), } - c.run(ctx) + cc := defaultConcurrency + if cfg.Concurrency > 0 { + cc = cfg.Concurrency + } + for i := 0; i < cc; i++ { + c.run(ctx) + } return c, nil } @@ -128,7 +139,10 @@ func (c *Client) run(ctx context.Context) { for ts := range c.input { wr.Timeseries = append(wr.Timeseries, ts) } - lastCtx, cancel := context.WithTimeout(context.Background(), time.Second*10) + if len(wr.Timeseries) < 1 { + return + } + lastCtx, cancel := context.WithTimeout(context.Background(), defaultWriteTimeout) c.flush(lastCtx, wr) cancel() } diff --git a/app/vmalert/rule.go b/app/vmalert/rule.go index c604361cd..e5498ac37 100644 --- a/app/vmalert/rule.go +++ b/app/vmalert/rule.go @@ -2,339 +2,23 @@ package main import ( "context" - "errors" - "fmt" - "hash/fnv" - "sort" - "strconv" - "sync" - "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" - "github.com/VictoriaMetrics/metricsql" ) -// Rule is basic alert entity -type Rule struct { - Name string `yaml:"alert"` - Expr string `yaml:"expr"` - For time.Duration `yaml:"for"` - Labels map[string]string `yaml:"labels"` - Annotations map[string]string `yaml:"annotations"` - - group Group - - // guard status fields - mu sync.RWMutex - // stores list of active alerts - alerts map[uint64]*notifier.Alert - // stores last moment of time Exec was called - lastExecTime time.Time - // stores last error that happened in Exec func - // resets on every successful Exec - // may be used as Health state - lastExecError error -} - -func (r *Rule) id() string { - return r.Name -} - -// Validate validates rule -func (r *Rule) Validate() error { - if r.Name == "" { - return errors.New("rule name can not be empty") - } - if r.Expr == "" { - return fmt.Errorf("expression for rule %q can't be empty", r.Name) - } - if _, err := metricsql.Parse(r.Expr); err != nil { - return fmt.Errorf("invalid expression for rule %q: %w", r.Name, err) - } - return nil -} - -// Exec executes Rule expression via the given Querier. 
-// Based on the Querier results Rule maintains notifier.Alerts -func (r *Rule) Exec(ctx context.Context, q datasource.Querier) error { - qMetrics, err := q.Query(ctx, r.Expr) - r.mu.Lock() - defer r.mu.Unlock() - - r.lastExecError = err - r.lastExecTime = time.Now() - if err != nil { - return fmt.Errorf("failed to execute query %q: %s", r.Expr, err) - } - - for h, a := range r.alerts { - // cleanup inactive alerts from previous Exec - if a.State == notifier.StateInactive { - delete(r.alerts, h) - } - } - - updated := make(map[uint64]struct{}) - // update list of active alerts - for _, m := range qMetrics { - h := hash(m) - updated[h] = struct{}{} - if a, ok := r.alerts[h]; ok { - if a.Value != m.Value { - // update Value field with latest value - a.Value = m.Value - // and re-exec template since Value can be used - // in templates - err = r.template(a) - if err != nil { - return err - } - } - continue - } - a, err := r.newAlert(m) - if err != nil { - r.lastExecError = err - return fmt.Errorf("failed to create alert: %s", err) - } - a.ID = h - a.State = notifier.StatePending - r.alerts[h] = a - } - - for h, a := range r.alerts { - // if alert wasn't updated in this iteration - // means it is resolved already - if _, ok := updated[h]; !ok { - if a.State == notifier.StatePending { - // alert was in Pending state - it is not - // active anymore - delete(r.alerts, h) - continue - } - a.State = notifier.StateInactive - continue - } - if a.State == notifier.StatePending && time.Since(a.Start) >= r.For { - a.State = notifier.StateFiring - alertsFired.Inc() - } - } - return nil -} - -// TODO: consider hashing algorithm in VM -func hash(m datasource.Metric) uint64 { - hash := fnv.New64a() - labels := m.Labels - sort.Slice(labels, func(i, j int) bool { - return labels[i].Name < labels[j].Name - }) - for _, l := range labels { - // drop __name__ to be consistent with Prometheus alerting - if l.Name == "__name__" { - continue - } - hash.Write([]byte(l.Name)) - hash.Write([]byte(l.Value)) - hash.Write([]byte("\xff")) - } - return hash.Sum64() -} - -func (r *Rule) newAlert(m datasource.Metric) (*notifier.Alert, error) { - a := ¬ifier.Alert{ - GroupID: r.group.ID(), - Name: r.Name, - Labels: map[string]string{}, - Value: m.Value, - Start: time.Now(), - Expr: r.Expr, - // TODO: support End time - } - for _, l := range m.Labels { - // drop __name__ to be consistent with Prometheus alerting - if l.Name == "__name__" { - continue - } - a.Labels[l.Name] = l.Value - } - return a, r.template(a) -} - -func (r *Rule) template(a *notifier.Alert) error { - // 1. template rule labels with data labels - rLabels, err := a.ExecTemplate(r.Labels) - if err != nil { - return err - } - - // 2. merge data labels and rule labels - // metric labels may be overridden by - // rule labels - for k, v := range rLabels { - a.Labels[k] = v - } - - // 3. 
template merged labels - a.Labels, err = a.ExecTemplate(a.Labels) - if err != nil { - return err - } - - a.Annotations, err = a.ExecTemplate(r.Annotations) - return err -} - -// AlertAPI generates APIAlert object from alert by its id(hash) -func (r *Rule) AlertAPI(id uint64) *APIAlert { - r.mu.RLock() - defer r.mu.RUnlock() - a, ok := r.alerts[id] - if !ok { - return nil - } - return r.newAlertAPI(*a) -} - -// AlertsAPI generates list of APIAlert objects from existing alerts -func (r *Rule) AlertsAPI() []*APIAlert { - var alerts []*APIAlert - r.mu.RLock() - for _, a := range r.alerts { - alerts = append(alerts, r.newAlertAPI(*a)) - } - r.mu.RUnlock() - return alerts -} - -func (r *Rule) newAlertAPI(a notifier.Alert) *APIAlert { - return &APIAlert{ - // encode as strings to avoid rounding - ID: fmt.Sprintf("%d", a.ID), - GroupID: fmt.Sprintf("%d", a.GroupID), - - Name: a.Name, - Expression: r.Expr, - Labels: a.Labels, - Annotations: a.Annotations, - State: a.State.String(), - ActiveAt: a.Start, - Value: strconv.FormatFloat(a.Value, 'e', -1, 64), - } -} - -const ( - // AlertMetricName is the metric name for synthetic alert timeseries. - alertMetricName = "ALERTS" - // AlertForStateMetricName is the metric name for 'for' state of alert. - alertForStateMetricName = "ALERTS_FOR_STATE" - - // AlertNameLabel is the label name indicating the name of an alert. - alertNameLabel = "alertname" - // AlertStateLabel is the label name indicating the state of an alert. - alertStateLabel = "alertstate" -) - -// AlertToTimeSeries converts the given alert with the given timestamp to timeseries -func (r *Rule) AlertToTimeSeries(a *notifier.Alert, timestamp time.Time) []prompbmarshal.TimeSeries { - var tss []prompbmarshal.TimeSeries - tss = append(tss, alertToTimeSeries(r.Name, a, timestamp)) - if r.For > 0 { - tss = append(tss, alertForToTimeSeries(r.Name, a, timestamp)) - } - return tss -} - -func alertToTimeSeries(name string, a *notifier.Alert, timestamp time.Time) prompbmarshal.TimeSeries { - labels := make(map[string]string) - for k, v := range a.Labels { - labels[k] = v - } - labels["__name__"] = alertMetricName - labels[alertNameLabel] = name - labels[alertStateLabel] = a.State.String() - return newTimeSeries(1, labels, timestamp) -} - -// alertForToTimeSeries returns a timeseries that represents -// state of active alerts, where value is time when alert become active -func alertForToTimeSeries(name string, a *notifier.Alert, timestamp time.Time) prompbmarshal.TimeSeries { - labels := make(map[string]string) - for k, v := range a.Labels { - labels[k] = v - } - labels["__name__"] = alertForStateMetricName - labels[alertNameLabel] = name - return newTimeSeries(float64(a.Start.Unix()), labels, timestamp) -} - -func newTimeSeries(value float64, labels map[string]string, timestamp time.Time) prompbmarshal.TimeSeries { - ts := prompbmarshal.TimeSeries{} - ts.Samples = append(ts.Samples, prompbmarshal.Sample{ - Value: value, - Timestamp: timestamp.UnixNano() / 1e6, - }) - keys := make([]string, 0, len(labels)) - for k := range labels { - keys = append(keys, k) - } - sort.Strings(keys) - for _, key := range keys { - ts.Labels = append(ts.Labels, prompbmarshal.Label{ - Name: key, - Value: labels[key], - }) - } - return ts -} - -// Restore restores the state of active alerts basing on previously written timeseries. -// Restore restores only Start field. Field State will be always Pending and supposed -// to be updated on next Exec, as well as Value field. 
-func (r *Rule) Restore(ctx context.Context, q datasource.Querier, lookback time.Duration) error { - if q == nil { - return fmt.Errorf("querier is nil") - } - - // Get the last datapoint in range via MetricsQL `last_over_time`. - // We don't use plain PromQL since Prometheus doesn't support - // remote write protocol which is used for state persistence in vmalert. - expr := fmt.Sprintf("last_over_time(%s{alertname=%q}[%ds])", - alertForStateMetricName, r.Name, int(lookback.Seconds())) - qMetrics, err := q.Query(ctx, expr) - if err != nil { - return err - } - - for _, m := range qMetrics { - labels := m.Labels - m.Labels = make([]datasource.Label, 0) - // drop all extra labels, so hash key will - // be identical to timeseries received in Exec - for _, l := range labels { - if l.Name == alertNameLabel { - continue - } - // drop all overridden labels - if _, ok := r.Labels[l.Name]; ok { - continue - } - m.Labels = append(m.Labels, l) - } - - a, err := r.newAlert(m) - if err != nil { - return fmt.Errorf("failed to create alert: %s", err) - } - a.ID = hash(m) - a.State = notifier.StatePending - a.Start = time.Unix(int64(m.Value), 0) - r.alerts[a.ID] = a - logger.Infof("alert %q(%d) restored to state at %v", a.Name, a.ID, a.Start) - } - return nil +// Rule represents alerting or recording rule +// that has unique ID, can be Executed and +// updated with other Rule. +type Rule interface { + // Returns unique ID that may be used for + // identifying this Rule among others. + ID() uint64 + // Exec executes the rule with given context + // and Querier. If returnSeries is true, Exec + // may return TimeSeries as result of execution + Exec(ctx context.Context, q datasource.Querier, returnSeries bool) ([]prompbmarshal.TimeSeries, error) + // UpdateWith performs modification of current Rule + // with fields of the given Rule. 
+ UpdateWith(Rule) error } diff --git a/app/vmalert/utils.go b/app/vmalert/utils.go new file mode 100644 index 000000000..dcf46f8ec --- /dev/null +++ b/app/vmalert/utils.go @@ -0,0 +1,27 @@ +package main + +import ( + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + "sort" + "time" +) + +func newTimeSeries(value float64, labels map[string]string, timestamp time.Time) prompbmarshal.TimeSeries { + ts := prompbmarshal.TimeSeries{} + ts.Samples = append(ts.Samples, prompbmarshal.Sample{ + Value: value, + Timestamp: timestamp.UnixNano() / 1e6, + }) + keys := make([]string, 0, len(labels)) + for k := range labels { + keys = append(keys, k) + } + sort.Strings(keys) + for _, key := range keys { + ts.Labels = append(ts.Labels, prompbmarshal.Label{ + Name: key, + Value: labels[key], + }) + } + return ts +} diff --git a/app/vmalert/web.go b/app/vmalert/web.go index 4146f3d48..d271fd555 100644 --- a/app/vmalert/web.go +++ b/app/vmalert/web.go @@ -7,32 +7,18 @@ import ( "sort" "strconv" "strings" - "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil" ) -// APIAlert represents an notifier.Alert state -// for WEB view -type APIAlert struct { - ID string `json:"id"` - Name string `json:"name"` - GroupID string `json:"group_id"` - Expression string `json:"expression"` - State string `json:"state"` - Value string `json:"value"` - Labels map[string]string `json:"labels"` - Annotations map[string]string `json:"annotations"` - ActiveAt time.Time `json:"activeAt"` -} - type requestHandler struct { m *manager } var pathList = [][]string{ + {"/api/v1/groups", "list all loaded groups and rules"}, {"/api/v1/alerts", "list all active alerts"}, {"/api/v1/groupID/alertID/status", "get alert status by ID"}, // /metrics is served by httpserver by default @@ -49,8 +35,11 @@ func (rh *requestHandler) handler(w http.ResponseWriter, r *http.Request) bool { fmt.Fprintf(w, "%q - %s
", p, p, doc) } return true + case "/api/v1/groups": + resph.handle(rh.listGroups()) + return true case "/api/v1/alerts": - resph.handle(rh.list()) + resph.handle(rh.listAlerts()) return true case "/-/reload": logger.Infof("api config reload was called, sending sighup") @@ -67,6 +56,37 @@ func (rh *requestHandler) handler(w http.ResponseWriter, r *http.Request) bool { } } +type listGroupsResponse struct { + Data struct { + Groups []APIGroup `json:"groups"` + } `json:"data"` + Status string `json:"status"` +} + +func (rh *requestHandler) listGroups() ([]byte, error) { + rh.m.groupsMu.RLock() + defer rh.m.groupsMu.RUnlock() + + lr := listGroupsResponse{Status: "success"} + for _, g := range rh.m.groups { + lr.Data.Groups = append(lr.Data.Groups, g.toAPI()) + } + + // sort list of alerts for deterministic output + sort.Slice(lr.Data.Groups, func(i, j int) bool { + return lr.Data.Groups[i].Name < lr.Data.Groups[j].Name + }) + + b, err := json.Marshal(lr) + if err != nil { + return nil, &httpserver.ErrorWithStatusCode{ + Err: fmt.Errorf(`error encoding list of active alerts: %s`, err), + StatusCode: http.StatusInternalServerError, + } + } + return b, nil +} + type listAlertsResponse struct { Data struct { Alerts []*APIAlert `json:"alerts"` @@ -74,13 +94,18 @@ type listAlertsResponse struct { Status string `json:"status"` } -func (rh *requestHandler) list() ([]byte, error) { +func (rh *requestHandler) listAlerts() ([]byte, error) { rh.m.groupsMu.RLock() defer rh.m.groupsMu.RUnlock() + lr := listAlertsResponse{Status: "success"} for _, g := range rh.m.groups { for _, r := range g.Rules { - lr.Data.Alerts = append(lr.Data.Alerts, r.AlertsAPI()...) + a, ok := r.(*AlertingRule) + if !ok { + continue + } + lr.Data.Alerts = append(lr.Data.Alerts, a.AlertsAPI()...) 
} } diff --git a/app/vmalert/web_test.go b/app/vmalert/web_test.go index 012a334c5..e8e820a54 100644 --- a/app/vmalert/web_test.go +++ b/app/vmalert/web_test.go @@ -11,7 +11,7 @@ import ( ) func TestHandler(t *testing.T) { - rule := &Rule{ + ar := &AlertingRule{ Name: "alert", alerts: map[uint64]*notifier.Alert{ 0: {}, @@ -19,7 +19,7 @@ func TestHandler(t *testing.T) { } g := &Group{ Name: "group", - Rules: []*Rule{rule}, + Rules: []Rule{ar}, } m := &manager{groups: make(map[uint64]*Group)} m.groups[0] = g @@ -54,10 +54,17 @@ func TestHandler(t *testing.T) { t.Errorf("expected 1 alert got %d", length) } }) + t.Run("/api/v1/groups", func(t *testing.T) { + lr := listGroupsResponse{} + getResp(ts.URL+"/api/v1/groups", &lr, 200) + if length := len(lr.Data.Groups); length != 1 { + t.Errorf("expected 1 group got %d", length) + } + }) t.Run("/api/v1/0/0/status", func(t *testing.T) { alert := &APIAlert{} getResp(ts.URL+"/api/v1/0/0/status", alert, 200) - expAlert := rule.newAlertAPI(*rule.alerts[0]) + expAlert := ar.newAlertAPI(*ar.alerts[0]) if !reflect.DeepEqual(alert, expAlert) { t.Errorf("expected %v is equal to %v", alert, expAlert) } diff --git a/app/vmalert/web_types.go b/app/vmalert/web_types.go new file mode 100644 index 000000000..ebc395ab1 --- /dev/null +++ b/app/vmalert/web_types.go @@ -0,0 +1,53 @@ +package main + +import ( + "time" +) + +// APIAlert represents an notifier.AlertingRule state +// for WEB view +type APIAlert struct { + ID string `json:"id"` + Name string `json:"name"` + GroupID string `json:"group_id"` + Expression string `json:"expression"` + State string `json:"state"` + Value string `json:"value"` + Labels map[string]string `json:"labels"` + Annotations map[string]string `json:"annotations"` + ActiveAt time.Time `json:"activeAt"` +} + +// APIGroup represents Group for WEB view +type APIGroup struct { + Name string `json:"name"` + ID string `json:"id"` + File string `json:"file"` + Interval string `json:"interval"` + AlertingRules []APIAlertingRule `json:"alerting_rules"` + RecordingRules []APIRecordingRule `json:"recording_rules"` +} + +// APIAlertingRule represents AlertingRule for WEB view +type APIAlertingRule struct { + ID string `json:"id"` + Name string `json:"name"` + GroupID string `json:"group_id"` + Expression string `json:"expression"` + For string `json:"for"` + LastError string `json:"last_error"` + LastExec time.Time `json:"last_exec"` + Labels map[string]string `json:"labels"` + Annotations map[string]string `json:"annotations"` +} + +// APIRecordingRule represents RecordingRule for WEB view +type APIRecordingRule struct { + ID string `json:"id"` + Name string `json:"name"` + GroupID string `json:"group_id"` + Expression string `json:"expression"` + LastError string `json:"last_error"` + LastExec time.Time `json:"last_exec"` + Labels map[string]string `json:"labels"` +}
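
The `Rule` interface introduced by this patch is what lets a `Group` hold alerting and recording rules side by side: `Group.Rules` becomes `[]Rule`, and callers such as `Group.toAPI` and `listAlerts` use a type switch to tell the two kinds apart. The self-contained sketch below illustrates that pattern with simplified stand-in types; the stand-ins and the `hashName` helper are invented for illustration and are not vmalert's actual implementations.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// Rule mirrors the idea of the interface added in rule.go: every rule
// exposes a stable ID so groups can track it across config reloads.
type Rule interface {
	ID() uint64
}

// Simplified stand-ins for vmalert's AlertingRule and RecordingRule.
type AlertingRule struct{ Name string }
type RecordingRule struct{ Name string }

// hashName derives an ID from the rule kind and name, so that an alerting
// rule and a recording rule with the same name get distinct IDs.
func hashName(kind, name string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(kind))
	h.Write([]byte("\xff"))
	h.Write([]byte(name))
	return h.Sum64()
}

func (r *AlertingRule) ID() uint64  { return hashName("alerting", r.Name) }
func (r *RecordingRule) ID() uint64 { return hashName("recording", r.Name) }

func main() {
	rules := []Rule{
		&AlertingRule{Name: "TooManyConns"},
		&RecordingRule{Name: "job:up:sum"},
	}
	// The same type-switch pattern is used by Group.toAPI and listAlerts
	// in the patch to separate the two rule kinds.
	for _, r := range rules {
		switch v := r.(type) {
		case *AlertingRule:
			fmt.Printf("alerting rule %q id=%d\n", v.Name, v.ID())
		case *RecordingRule:
			fmt.Printf("recording rule %q id=%d\n", v.Name, v.ID())
		}
	}
}
```

Mixing the rule kind into the hash is only the sketch's design choice for keeping IDs unique within one group; the patch itself computes rule IDs inside each concrete type.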
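The new `remoteWrite.concurrency` and `remoteWrite.maxBatchSize` flags map onto `remotewrite.Config.Concurrency` and `MaxBatchSize`: several workers drain one bounded input queue and flush a batch either when it reaches the batch size or on every flush interval. Below is a self-contained toy of that queue/flush scheme, not vmalert's actual `remotewrite` client; all names (`newClient`, `push`, `flush`) are invented for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// sample is a stand-in for prompbmarshal.TimeSeries.
type sample struct{ value float64 }

// client sketches the scheme described by remotewrite.Config: push fills a
// bounded input channel (MaxQueueSize) and `concurrency` workers drain it,
// flushing when MaxBatchSize is reached or on every flushInterval tick.
type client struct {
	input chan sample
	wg    sync.WaitGroup
}

func newClient(queueSize, batchSize, concurrency int, flushInterval time.Duration) *client {
	c := &client{input: make(chan sample, queueSize)}
	for i := 0; i < concurrency; i++ {
		c.wg.Add(1)
		go func() {
			defer c.wg.Done()
			ticker := time.NewTicker(flushInterval)
			defer ticker.Stop()
			batch := make([]sample, 0, batchSize)
			flush := func() {
				if len(batch) == 0 {
					return
				}
				// a real client would send the batch via remote write here
				fmt.Printf("flushing %d samples\n", len(batch))
				batch = batch[:0]
			}
			for {
				select {
				case s, ok := <-c.input:
					if !ok {
						flush()
						return
					}
					batch = append(batch, s)
					if len(batch) >= batchSize {
						flush()
					}
				case <-ticker.C:
					flush()
				}
			}
		}()
	}
	return c
}

func (c *client) push(s sample) { c.input <- s }

func (c *client) close() {
	close(c.input)
	c.wg.Wait()
}

func main() {
	c := newClient(100, 10, 2, 50*time.Millisecond)
	for i := 0; i < 25; i++ {
		c.push(sample{value: float64(i)})
	}
	c.close()
}
```

As the updated comment on `remotewrite.Config.MaxQueueSize` notes, once the input queue is full further `Push` calls are rejected, so the queue size bounds memory usage rather than blocking rule evaluation indefinitely.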
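The new `/api/v1/groups` endpoint serializes `listGroupsResponse` from `web.go`, which wraps the `APIGroup`, `APIAlertingRule` and `APIRecordingRule` structs defined in `web_types.go`. The sketch below reuses those JSON tags on trimmed-down local copies to show the approximate response shape; the sample values are made up.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// Trimmed local copies of the structs from web_types.go / web.go,
// keeping only a few fields but reusing the same JSON tags.
type apiRecordingRule struct {
	ID         string            `json:"id"`
	Name       string            `json:"name"`
	GroupID    string            `json:"group_id"`
	Expression string            `json:"expression"`
	Labels     map[string]string `json:"labels"`
}

type apiGroup struct {
	Name           string             `json:"name"`
	ID             string             `json:"id"`
	File           string             `json:"file"`
	Interval       string             `json:"interval"`
	RecordingRules []apiRecordingRule `json:"recording_rules"`
}

type groupsResponse struct {
	Data struct {
		Groups []apiGroup `json:"groups"`
	} `json:"data"`
	Status string `json:"status"`
}

func main() {
	// Sample values are invented; IDs are encoded as strings so that
	// large uint64 values are not rounded by JSON clients.
	g := apiGroup{
		Name:     "TestGroup",
		ID:       "16643507699545532239",
		File:     "config/testdata/rules2-good.rules",
		Interval: "30s",
		RecordingRules: []apiRecordingRule{{
			ID:         "8808488440351867352",
			GroupID:    "16643507699545532239",
			Name:       "job:up:sum",
			Expression: "sum(up) by (job)",
			Labels:     map[string]string{"source": "vmalert"},
		}},
	}
	resp := groupsResponse{Status: "success"}
	resp.Data.Groups = append(resp.Data.Groups, g)

	b, err := json.MarshalIndent(resp, "", "  ")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(b))
}
```

Encoding group and rule IDs as strings mirrors the "encode as strings to avoid rounding" comments in `Group.toAPI` and `RecordingRule.RuleAPI`.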