From 3e277020a59e042d1760c25532be88c3758359ed Mon Sep 17 00:00:00 2001
From: Roman Khavronenko
Date: Tue, 9 Jun 2020 13:21:20 +0100
Subject: [PATCH] vmalert-491: allow to configure concurrent rules execution
 per group. (#542)

The feature allows speeding up group rules execution by executing them
concurrently. The change also updates the README to reflect the
configuration details.
---
 app/vmalert/README.md                         | 109 +++++++++-
 app/vmalert/alerting_test.go                  |  15 +-
 app/vmalert/config/config.go                  |   9 +-
 app/vmalert/config/testdata/rules2-good.rules |   1 +
 app/vmalert/group.go                          | 188 +++++++++++-------
 app/vmalert/group_test.go                     |  33 +--
 app/vmalert/helpers_test.go                   |  32 +++
 app/vmalert/main.go                           |   4 +-
 app/vmalert/manager.go                        |   9 +-
 app/vmalert/web_types.go                      |   1 +
 10 files changed, 272 insertions(+), 129 deletions(-)

diff --git a/app/vmalert/README.md b/app/vmalert/README.md
index 0f054b834..854bccbee 100644
--- a/app/vmalert/README.md
+++ b/app/vmalert/README.md
@@ -13,6 +13,15 @@ rules against configured address.
 * Integration with [Alertmanager](https://github.com/prometheus/alertmanager);
 * Lightweight without extra dependencies.
 
+### Limitations
+* `vmalert` executes queries against a remote datasource, which carries reliability risks because of the network.
+It is recommended to configure alert thresholds and rule expressions with the understanding that a network request
+may fail;
+* by default, rules execution is sequential within one group, but persisting of execution results to remote
+storage is asynchronous. Hence, users shouldn't rely on recording rules chaining, i.e. when the result of a previous
+recording rule is reused in the next one;
+* `vmalert` has no UI, just an API for getting groups and rules statuses.
+
 ### QuickStart
 
 To build `vmalert` from sources:
@@ -28,6 +37,8 @@ To start using `vmalert` you will need the following things:
 * datasource address - reachable VictoriaMetrics instance for rules execution;
 * notifier address - reachable [Alert Manager](https://github.com/prometheus/alertmanager) instance for processing,
 aggregating alerts and sending notifications.
+* remote write address - [remote write](https://prometheus.io/docs/prometheus/latest/storage/#remote-storage-integrations)
+compatible storage address for storing recording rules results and alerts state in form of timeseries. This is optional.
 
 Then configure `vmalert` accordingly:
 ```
@@ -36,21 +47,96 @@ Then configure `vmalert` accordingly:
     -notifier.url=http://localhost:9093
 ```
 
-Example for `.rules` file may be found [here](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/app/vmalert/testdata).
+Configuration for [recording](https://prometheus.io/docs/prometheus/latest/configuration/recording_rules/)
+and [alerting](https://prometheus.io/docs/prometheus/latest/configuration/alerting_rules/) rules is very
+similar to Prometheus rules and is configured using YAML. Configuration examples may be found
+in the [testdata](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/app/vmalert/testdata) folder.
+Every `rule` belongs to a `group`, and every configuration file may contain an arbitrary number of groups:
+```yaml
+groups:
+  [ - <rule_group> ]
+```
 
-`vmalert` may be configured with `-remoteWrite` flag to write recording rules and
-alerts state in form of timeseries via remote write protocol. Alerts state will be written
-as `ALERTS` timeseries. These timeseries may be used to recover alerts state on `vmalert`
-restarts if `-remoteRead` is configured.
+#### Groups
 
-`vmalert` runs evaluation for every group in a separate goroutine. 
-Rules in group evaluated one-by-one sequentially.
+Each group has the following attributes:
+```yaml
+# The name of the group. Must be unique within a file.
+name: <string>
 
-**Important:** while recording rules execution is sequential, writing of timeseries results to remote
-storage is asynchronous. Hence, user shouldn't rely on recording rules chaining when result of previous
-recording rule is reused in next one.
+# How often rules in the group are evaluated.
+[ interval: <duration> | default = global.evaluation_interval ]
 
-`vmalert` also runs a web-server (`-httpListenAddr`) for serving metrics and alerts endpoints:
+# How many rules execute at once. Increasing concurrency may speed
+# up the group's evaluation round.
+[ concurrency: <integer> | default = 1 ]
+
+rules:
+  [ - <rule> ... ]
+```
+
+#### Rules
+
+There are two types of rules:
+* [alerting](https://prometheus.io/docs/prometheus/latest/configuration/alerting_rules/) -
+Alerting rules allow defining alert conditions via [MetricsQL](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/MetricsQL)
+and sending notifications about firing alerts to [Alertmanager](https://github.com/prometheus/alertmanager).
+* [recording](https://prometheus.io/docs/prometheus/latest/configuration/recording_rules/) -
+Recording rules allow you to precompute frequently needed or computationally expensive expressions
+and save their result as a new set of time series.
+
+##### Alerting rules
+
+The syntax for an alerting rule is the following:
+```yaml
+# The name of the alert. Must be a valid metric name.
+alert: <string>
+
+# The MetricsQL expression to evaluate.
+expr: <string>
+
+# Alerts are considered firing once they have been returned for this long.
+# Alerts which have not yet fired for long enough are considered pending.
+[ for: <duration> | default = 0s ]
+
+# Labels to add or overwrite for each alert.
+labels:
+  [ <labelname>: <tmpl_string> ]
+
+# Annotations to add to each alert.
+annotations:
+  [ <labelname>: <tmpl_string> ]
+```
+
+`vmalert` has no local storage, so alerts state is kept in process memory and is lost when the `vmalert` process
+restarts. To avoid this, `vmalert` may be configured with the following flags:
+* `-remoteWrite.url` - URL to Victoria Metrics or VMInsert. `vmalert` will persist alerts state to the configured
+address in form of timeseries with the name `ALERTS` via the remote-write protocol.
+* `-remoteRead.url` - URL to Victoria Metrics or VMSelect. `vmalert` will try to restore alerts state from the configured
+address by querying the `ALERTS` timeseries.
+
+
+##### Recording rules
+
+The syntax for a recording rule is the following:
+```yaml
+# The name of the time series to output to. Must be a valid metric name.
+record: <string>
+
+# The MetricsQL expression to evaluate.
+expr: <string>
+
+# Labels to add or overwrite before storing the result.
+labels:
+  [ <labelname>: <tmpl_string> ]
+```
+
+For recording rules to work, `-remoteWrite.url` must be specified.
+
+
+#### WEB
+
+`vmalert` runs a web-server (`-httpListenAddr`) for serving metrics and alerts endpoints:
 * `http://<vmalert-addr>/api/v1/groups` - list of all loaded groups and rules;
 * `http://<vmalert-addr>/api/v1/alerts` - list of all active alerts;
 * `http://<vmalert-addr>/api/v1/<group-id>/<alert-id>/status` - get alert status by ID.
@@ -58,6 +144,7 @@ Used as alert source in AlertManager.
 * `http://<vmalert-addr>/metrics` - application metrics.
 * `http://<vmalert-addr>/-/reload` - hot configuration reload. 
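+
+#### Example
+
+For reference, here is a complete rules file that combines the group and rule attributes described above.
+The group name, metric names, thresholds and labels below are purely illustrative:
+```yaml
+groups:
+  - name: ExampleGroup
+    interval: 30s
+    # evaluate up to two rules of this group at once
+    concurrency: 2
+    rules:
+      - record: instance:http_requests:rate5m
+        expr: sum(rate(http_requests_total[5m])) by (instance)
+        labels:
+          source: vmalert
+      - alert: HighErrorRate
+        expr: sum(rate(http_requests_errors_total[5m])) > 10
+        for: 5m
+        labels:
+          severity: warning
+        annotations:
+          summary: "error rate is above the configured threshold"
+```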
+ ### Configuration The shortlist of configuration flags is the following: diff --git a/app/vmalert/alerting_test.go b/app/vmalert/alerting_test.go index a0f661f1f..ec9e56f86 100644 --- a/app/vmalert/alerting_test.go +++ b/app/vmalert/alerting_test.go @@ -101,6 +101,7 @@ func TestAlertingRule_ToTimeSeries(t *testing.T) { } func TestAlertingRule_Exec(t *testing.T) { + const defaultStep = 5 * time.Millisecond testCases := []struct { rule *AlertingRule steps [][]datasource.Metric @@ -240,7 +241,7 @@ func TestAlertingRule_Exec(t *testing.T) { }, }, { - newTestAlertingRule("for-fired", time.Millisecond), + newTestAlertingRule("for-fired", defaultStep), [][]datasource.Metric{ {metricWithLabels(t, "name", "foo")}, {metricWithLabels(t, "name", "foo")}, @@ -260,7 +261,7 @@ func TestAlertingRule_Exec(t *testing.T) { map[uint64]*notifier.Alert{}, }, { - newTestAlertingRule("for-pending=>firing=>inactive", time.Millisecond), + newTestAlertingRule("for-pending=>firing=>inactive", defaultStep), [][]datasource.Metric{ {metricWithLabels(t, "name", "foo")}, {metricWithLabels(t, "name", "foo")}, @@ -272,10 +273,10 @@ func TestAlertingRule_Exec(t *testing.T) { }, }, { - newTestAlertingRule("for-pending=>firing=>inactive=>pending", time.Millisecond), + newTestAlertingRule("for-pending=>firing=>inactive=>pending", defaultStep), [][]datasource.Metric{ - //{metricWithLabels(t, "name", "foo")}, - //{metricWithLabels(t, "name", "foo")}, + {metricWithLabels(t, "name", "foo")}, + {metricWithLabels(t, "name", "foo")}, // empty step to reset pending alerts {}, {metricWithLabels(t, "name", "foo")}, @@ -285,7 +286,7 @@ func TestAlertingRule_Exec(t *testing.T) { }, }, { - newTestAlertingRule("for-pending=>firing=>inactive=>pending=>firing", time.Millisecond), + newTestAlertingRule("for-pending=>firing=>inactive=>pending=>firing", defaultStep), [][]datasource.Metric{ {metricWithLabels(t, "name", "foo")}, {metricWithLabels(t, "name", "foo")}, @@ -311,7 +312,7 @@ func TestAlertingRule_Exec(t *testing.T) { t.Fatalf("unexpected err: %s", err) } // artificial delay between applying steps - time.Sleep(time.Millisecond) + time.Sleep(defaultStep) } if len(tc.rule.alerts) != len(tc.expAlerts) { t.Fatalf("expected %d alerts; got %d", len(tc.expAlerts), len(tc.rule.alerts)) diff --git a/app/vmalert/config/config.go b/app/vmalert/config/config.go index 9002ed270..56f5f7ee9 100644 --- a/app/vmalert/config/config.go +++ b/app/vmalert/config/config.go @@ -15,10 +15,11 @@ import ( // Group contains list of Rules grouped into // entity with one name and evaluation interval type Group struct { - File string - Name string `yaml:"name"` - Interval time.Duration `yaml:"interval,omitempty"` - Rules []Rule `yaml:"rules"` + File string + Name string `yaml:"name"` + Interval time.Duration `yaml:"interval,omitempty"` + Rules []Rule `yaml:"rules"` + Concurrency int `yaml:"concurrency"` // Catches all undefined fields and must be empty after parsing. 
XXX map[string]interface{} `yaml:",inline"` diff --git a/app/vmalert/config/testdata/rules2-good.rules b/app/vmalert/config/testdata/rules2-good.rules index ec82c43c6..a3fcea24d 100644 --- a/app/vmalert/config/testdata/rules2-good.rules +++ b/app/vmalert/config/testdata/rules2-good.rules @@ -1,6 +1,7 @@ groups: - name: TestGroup interval: 2s + concurrency: 2 rules: - alert: Conns expr: sum(vm_tcplistener_conns) by(instance) > 1 diff --git a/app/vmalert/group.go b/app/vmalert/group.go index 0c1b97a08..0de9b10ce 100644 --- a/app/vmalert/group.go +++ b/app/vmalert/group.go @@ -17,31 +17,36 @@ import ( // Group is an entity for grouping rules type Group struct { - Name string - File string - Rules []Rule - Interval time.Duration + mu sync.RWMutex + Name string + File string + Rules []Rule + Interval time.Duration + Concurrency int doneCh chan struct{} finishedCh chan struct{} // channel accepts new Group obj // which supposed to update current group updateCh chan *Group - mu sync.RWMutex } func newGroup(cfg config.Group, defaultInterval time.Duration) *Group { g := &Group{ - Name: cfg.Name, - File: cfg.File, - Interval: cfg.Interval, - doneCh: make(chan struct{}), - finishedCh: make(chan struct{}), - updateCh: make(chan *Group), + Name: cfg.Name, + File: cfg.File, + Interval: cfg.Interval, + Concurrency: cfg.Concurrency, + doneCh: make(chan struct{}), + finishedCh: make(chan struct{}), + updateCh: make(chan *Group), } if g.Interval == 0 { g.Interval = defaultInterval } + if g.Concurrency < 1 { + g.Concurrency = 1 + } rules := make([]Rule, len(cfg.Rules)) for i, r := range cfg.Rules { rules[i] = g.newRule(r) @@ -121,6 +126,7 @@ func (g *Group) updateWith(newGroup *Group) error { for _, nr := range rulesRegistry { newRules = append(newRules, nr) } + g.Concurrency = newGroup.Concurrency g.Rules = newRules return nil } @@ -150,24 +156,18 @@ func (g *Group) close() { } func (g *Group) start(ctx context.Context, querier datasource.Querier, nr notifier.Notifier, rw *remotewrite.Client) { - logger.Infof("group %q started with interval %v", g.Name, g.Interval) - - var returnSeries bool - if rw != nil { - returnSeries = true - } - + defer func() { close(g.finishedCh) }() + logger.Infof("group %q started; interval=%v; concurrency=%d", g.Name, g.Interval, g.Concurrency) + e := &executor{querier, nr, rw} t := time.NewTicker(g.Interval) defer t.Stop() for { select { case <-ctx.Done(): logger.Infof("group %q: context cancelled", g.Name) - close(g.finishedCh) return case <-g.doneCh: logger.Infof("group %q: received stop signal", g.Name) - close(g.finishedCh) return case ng := <-g.updateCh: g.mu.Lock() @@ -181,65 +181,115 @@ func (g *Group) start(ctx context.Context, querier datasource.Querier, nr notifi g.Interval = ng.Interval t.Stop() t = time.NewTicker(g.Interval) - logger.Infof("group %q: changed evaluation interval to %v", g.Name, g.Interval) } g.mu.Unlock() + logger.Infof("group %q re-started; interval=%v; concurrency=%d", g.Name, g.Interval, g.Concurrency) case <-t.C: iterationTotal.Inc() iterationStart := time.Now() - for _, rule := range g.Rules { - execTotal.Inc() - - execStart := time.Now() - tss, err := rule.Exec(ctx, querier, returnSeries) - execDuration.UpdateDuration(execStart) + errs := e.execConcurrently(ctx, g.Rules, g.Concurrency, g.Interval) + for err := range errs { if err != nil { - execErrors.Inc() - logger.Errorf("failed to execute rule %q.%q: %s", g.Name, rule, err) - continue - } - - if len(tss) > 0 { - remoteWriteSent.Add(len(tss)) - for _, ts := range tss { - if err := rw.Push(ts); 
err != nil { - remoteWriteErrors.Inc() - logger.Errorf("failed to remote write for rule %q.%q: %s", g.Name, rule, err) - } - } - } - - ar, ok := rule.(*AlertingRule) - if !ok { - continue - } - var alerts []notifier.Alert - for _, a := range ar.alerts { - switch a.State { - case notifier.StateFiring: - // set End to execStart + 3 intervals - // so notifier can resolve it automatically if `vmalert` - // won't be able to send resolve for some reason - a.End = execStart.Add(3 * g.Interval) - alerts = append(alerts, *a) - case notifier.StateInactive: - // set End to execStart to notify - // that it was just resolved - a.End = execStart - alerts = append(alerts, *a) - } - } - if len(alerts) < 1 { - continue - } - alertsSent.Add(len(alerts)) - if err := nr.Send(ctx, alerts); err != nil { - alertsSendErrors.Inc() - logger.Errorf("failed to send alert for rule %q.%q: %s", g.Name, rule, err) + logger.Errorf("group %q: %s", g.Name, err) } } + iterationDuration.UpdateDuration(iterationStart) } } } + +type executor struct { + querier datasource.Querier + notifier notifier.Notifier + rw *remotewrite.Client +} + +func (e *executor) execConcurrently(ctx context.Context, rules []Rule, concurrency int, interval time.Duration) chan error { + res := make(chan error, len(rules)) + var returnSeries bool + if e.rw != nil { + returnSeries = true + } + + if concurrency == 1 { + // fast path + for _, rule := range rules { + res <- e.exec(ctx, rule, returnSeries, interval) + } + close(res) + return res + } + + sem := make(chan struct{}, concurrency) + go func() { + wg := sync.WaitGroup{} + for _, rule := range rules { + sem <- struct{}{} + wg.Add(1) + go func(r Rule) { + res <- e.exec(ctx, r, returnSeries, interval) + <-sem + wg.Done() + }(rule) + } + wg.Wait() + close(res) + }() + return res +} + +func (e *executor) exec(ctx context.Context, rule Rule, returnSeries bool, interval time.Duration) error { + execTotal.Inc() + execStart := time.Now() + defer func() { + execDuration.UpdateDuration(execStart) + }() + + tss, err := rule.Exec(ctx, e.querier, returnSeries) + if err != nil { + execErrors.Inc() + return fmt.Errorf("rule %q: failed to execute: %s", rule, err) + } + + if len(tss) > 0 && e.rw != nil { + remoteWriteSent.Add(len(tss)) + for _, ts := range tss { + if err := e.rw.Push(ts); err != nil { + remoteWriteErrors.Inc() + return fmt.Errorf("rule %q: remote write failure: %s", rule, err) + } + } + } + + ar, ok := rule.(*AlertingRule) + if !ok { + return nil + } + var alerts []notifier.Alert + for _, a := range ar.alerts { + switch a.State { + case notifier.StateFiring: + // set End to execStart + 3 intervals + // so notifier can resolve it automatically if `vmalert` + // won't be able to send resolve for some reason + a.End = time.Now().Add(3 * interval) + alerts = append(alerts, *a) + case notifier.StateInactive: + // set End to execStart to notify + // that it was just resolved + a.End = time.Now() + alerts = append(alerts, *a) + } + } + if len(alerts) < 1 { + return nil + } + alertsSent.Add(len(alerts)) + if err := e.notifier.Send(ctx, alerts); err != nil { + alertsSendErrors.Inc() + return fmt.Errorf("rule %q: failed to send alerts: %s", rule, err) + } + return nil +} diff --git a/app/vmalert/group_test.go b/app/vmalert/group_test.go index dc2b53039..410be42d7 100644 --- a/app/vmalert/group_test.go +++ b/app/vmalert/group_test.go @@ -2,7 +2,6 @@ package main import ( "context" - "reflect" "sort" "testing" "time" @@ -139,6 +138,7 @@ func TestGroupStart(t *testing.T) { } const evalInterval = 
time.Millisecond g := newGroup(groups[0], evalInterval) + g.Concurrency = 2 fn := &fakeNotifier{} fs := &fakeQuerier{} @@ -192,34 +192,3 @@ func TestGroupStart(t *testing.T) { g.close() <-finished } - -func compareAlerts(t *testing.T, as, bs []notifier.Alert) { - t.Helper() - if len(as) != len(bs) { - t.Fatalf("expected to have length %d; got %d", len(as), len(bs)) - } - sort.Slice(as, func(i, j int) bool { - return as[i].ID < as[j].ID - }) - sort.Slice(bs, func(i, j int) bool { - return bs[i].ID < bs[j].ID - }) - for i := range as { - a, b := as[i], bs[i] - if a.Name != b.Name { - t.Fatalf("expected t have Name %q; got %q", a.Name, b.Name) - } - if a.State != b.State { - t.Fatalf("expected t have State %q; got %q", a.State, b.State) - } - if a.Value != b.Value { - t.Fatalf("expected t have Value %f; got %f", a.Value, b.Value) - } - if !reflect.DeepEqual(a.Annotations, b.Annotations) { - t.Fatalf("expected to have annotations %#v; got %#v", a.Annotations, b.Annotations) - } - if !reflect.DeepEqual(a.Labels, b.Labels) { - t.Fatalf("expected to have labels %#v; got %#v", a.Labels, b.Labels) - } - } -} diff --git a/app/vmalert/helpers_test.go b/app/vmalert/helpers_test.go index 2c2249bc2..e4fcc1d59 100644 --- a/app/vmalert/helpers_test.go +++ b/app/vmalert/helpers_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "reflect" + "sort" "sync" "testing" @@ -198,3 +199,34 @@ func compareTimeSeries(t *testing.T, a, b []prompbmarshal.TimeSeries) error { } return nil } + +func compareAlerts(t *testing.T, as, bs []notifier.Alert) { + t.Helper() + if len(as) != len(bs) { + t.Fatalf("expected to have length %d; got %d", len(as), len(bs)) + } + sort.Slice(as, func(i, j int) bool { + return as[i].ID < as[j].ID + }) + sort.Slice(bs, func(i, j int) bool { + return bs[i].ID < bs[j].ID + }) + for i := range as { + a, b := as[i], bs[i] + if a.Name != b.Name { + t.Fatalf("expected t have Name %q; got %q", a.Name, b.Name) + } + if a.State != b.State { + t.Fatalf("expected t have State %q; got %q", a.State, b.State) + } + if a.Value != b.Value { + t.Fatalf("expected t have Value %f; got %f", a.Value, b.Value) + } + if !reflect.DeepEqual(a.Annotations, b.Annotations) { + t.Fatalf("expected to have annotations %#v; got %#v", a.Annotations, b.Annotations) + } + if !reflect.DeepEqual(a.Labels, b.Labels) { + t.Fatalf("expected to have labels %#v; got %#v", a.Labels, b.Labels) + } + } +} diff --git a/app/vmalert/main.go b/app/vmalert/main.go index 1686e6884..1d9c9db4b 100644 --- a/app/vmalert/main.go +++ b/app/vmalert/main.go @@ -42,12 +42,12 @@ absolute path to all .yaml files in root.`) basicAuthPassword = flag.String("datasource.basicAuth.password", "", "Optional basic auth password for -datasource.url") remoteWriteURL = flag.String("remoteWrite.url", "", "Optional URL to Victoria Metrics or VMInsert where to persist alerts state"+ - " in form of timeseries. E.g. http://127.0.0.1:8428") + " and recording rules results in form of timeseries. E.g. 
http://127.0.0.1:8428") remoteWriteUsername = flag.String("remoteWrite.basicAuth.username", "", "Optional basic auth username for -remoteWrite.url") remoteWritePassword = flag.String("remoteWrite.basicAuth.password", "", "Optional basic auth password for -remoteWrite.url") remoteWriteMaxQueueSize = flag.Int("remoteWrite.maxQueueSize", 1e5, "Defines the max number of pending datapoints to remote write endpoint") remoteWriteMaxBatchSize = flag.Int("remoteWrite.maxBatchSize", 1e3, "Defines defines max number of timeseries to be flushed at once") - remoteWriteConcurrency = flag.Int("remoteWrite.concurrency", 1, "Defines number of readers that concurrently write into remote storage") + remoteWriteConcurrency = flag.Int("remoteWrite.concurrency", 1, "Defines number of writers for concurrent writing into remote storage") remoteReadURL = flag.String("remoteRead.url", "", "Optional URL to Victoria Metrics or VMSelect that will be used to restore alerts"+ " state. This configuration makes sense only if `vmalert` was configured with `remoteWrite.url` before and has been successfully persisted its state."+ diff --git a/app/vmalert/manager.go b/app/vmalert/manager.go index 30928f0cb..ab8975189 100644 --- a/app/vmalert/manager.go +++ b/app/vmalert/manager.go @@ -117,10 +117,11 @@ func (m *manager) update(ctx context.Context, path []string, validateTpl, valida func (g *Group) toAPI() APIGroup { ag := APIGroup{ // encode as strings to avoid rounding - ID: fmt.Sprintf("%d", g.ID()), - Name: g.Name, - File: g.File, - Interval: g.Interval.String(), + ID: fmt.Sprintf("%d", g.ID()), + Name: g.Name, + File: g.File, + Interval: g.Interval.String(), + Concurrency: g.Concurrency, } for _, r := range g.Rules { switch v := r.(type) { diff --git a/app/vmalert/web_types.go b/app/vmalert/web_types.go index ebc395ab1..b26a20d2c 100644 --- a/app/vmalert/web_types.go +++ b/app/vmalert/web_types.go @@ -24,6 +24,7 @@ type APIGroup struct { ID string `json:"id"` File string `json:"file"` Interval string `json:"interval"` + Concurrency int `json:"concurrency"` AlertingRules []APIAlertingRule `json:"alerting_rules"` RecordingRules []APIRecordingRule `json:"recording_rules"` }
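
The chaining caveat from the README above becomes more visible once `concurrency` is raised: rules within
a group may run in parallel and remote-write is asynchronous, so a rule should not assume that the result of
another rule from the same evaluation round is already queryable. A hypothetical group illustrating the
pattern to avoid (rule names and expressions are made up):
```yaml
groups:
  - name: ChainedRules
    interval: 30s
    concurrency: 2
    rules:
      # the recorded series is pushed to remote storage asynchronously
      - record: instance:up:avg
        expr: avg(up) by (instance)
      # may be evaluated in parallel with the rule above and is likely
      # to see a stale or missing instance:up:avg series
      - alert: AvgUpTooLow
        expr: instance:up:avg < 0.5
        for: 5m
```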