Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git, synced 2024-11-21 14:44:00 +00:00
vmalert-491: allow configuring concurrent rules execution per group (#542)
The feature speeds up group rules execution by evaluating the rules concurrently. The change also updates the README to reflect the new configuration details.
This commit is contained in:
parent ffa75c423d
commit 3e277020a5
10 changed files with 272 additions and 129 deletions
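To illustrate the feature this commit introduces, here is a minimal sketch of a group that opts into concurrent execution via the new `concurrency` attribute. The group name, `interval`, `concurrency` value and `for` duration below are hypothetical; the alerting expression is borrowed from the testdata change further down, and `concurrency` falls back to `1` (sequential execution) when omitted:

```yaml
groups:
  # Hypothetical group: up to 4 of its rules are evaluated at once.
  - name: ExampleGroup
    interval: 30s
    concurrency: 4
    rules:
      # Alerting rule reusing the expression from the testdata file in this commit.
      - alert: Conns
        expr: sum(vm_tcplistener_conns) by(instance) > 1
        for: 1m
```

Note that only rule execution becomes concurrent; as the README changes below point out, persisting results to remote storage stays asynchronous, so rules within one group still should not rely on each other's output.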
@@ -13,6 +13,15 @@ rules against configured address.
 * Integration with [Alertmanager](https://github.com/prometheus/alertmanager);
 * Lightweight without extra dependencies.
 
+### Limitations:
+* `vmalert` executes queries against a remote datasource, which has reliability risks because of the network.
+It is recommended to configure alerts thresholds and rules expressions with the understanding that the network request
+may fail;
+* by default, rules execution is sequential within one group, but persisting of execution results to remote
+storage is asynchronous. Hence, users shouldn't rely on recording rules chaining when the result of the previous
+recording rule is reused in the next one;
+* `vmalert` has no UI, just an API for getting groups and rules statuses.
+
 ### QuickStart
 
 To build `vmalert` from sources:
@@ -28,6 +37,8 @@ To start using `vmalert` you will need the following things:
 * datasource address - reachable VictoriaMetrics instance for rules execution;
 * notifier address - reachable [Alert Manager](https://github.com/prometheus/alertmanager) instance for processing,
 aggregating alerts and sending notifications.
+* remote write address - [remote write](https://prometheus.io/docs/prometheus/latest/storage/#remote-storage-integrations)
+compatible storage address for storing recording rules results and alerts state in form of timeseries. This is optional.
 
 Then configure `vmalert` accordingly:
 ```
@ -36,21 +47,96 @@ Then configure `vmalert` accordingly:
|
||||||
-notifier.url=http://localhost:9093
|
-notifier.url=http://localhost:9093
|
||||||
```
|
```
|
||||||
|
|
||||||
Example for `.rules` file may be found [here](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/app/vmalert/testdata).
|
Configuration for [recording](https://prometheus.io/docs/prometheus/latest/configuration/recording_rules/)
|
||||||
|
and [alerting](https://prometheus.io/docs/prometheus/latest/configuration/alerting_rules/) rules is very
|
||||||
|
similar to Prometheus rules and configured using YAML. Configuration examples may be found
|
||||||
|
in [testdata](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/app/vmalert/testdata) folder.
|
||||||
|
Every `rule` belongs to `group` and every configuration file may contain arbitrary number of groups:
|
||||||
|
```yaml
|
||||||
|
groups:
|
||||||
|
[ - <rule_group> ]
|
||||||
|
```
|
||||||
|
|
||||||
`vmalert` may be configured with `-remoteWrite` flag to write recording rules and
|
#### Groups
|
||||||
alerts state in form of timeseries via remote write protocol. Alerts state will be written
|
|
||||||
as `ALERTS` timeseries. These timeseries may be used to recover alerts state on `vmalert`
|
|
||||||
restarts if `-remoteRead` is configured.
|
|
||||||
|
|
||||||
`vmalert` runs evaluation for every group in a separate goroutine.
|
Each group has following attributes:
|
||||||
Rules in group evaluated one-by-one sequentially.
|
```yaml
|
||||||
|
# The name of the group. Must be unique within a file.
|
||||||
|
name: <string>
|
||||||
|
|
||||||
**Important:** while recording rules execution is sequential, writing of timeseries results to remote
|
# How often rules in the group are evaluated.
|
||||||
storage is asynchronous. Hence, user shouldn't rely on recording rules chaining when result of previous
|
[ interval: <duration> | default = global.evaluation_interval ]
|
||||||
recording rule is reused in next one.
|
|
||||||
|
|
||||||
`vmalert` also runs a web-server (`-httpListenAddr`) for serving metrics and alerts endpoints:
|
# How many rules execute at once. Increasing concurrency may speed
|
||||||
|
# up round execution speed.
|
||||||
|
[ concurrency: <integer> | default = 1 ]
|
||||||
|
|
||||||
|
rules:
|
||||||
|
[ - <rule> ... ]
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Rules
|
||||||
|
|
||||||
|
There are two types of Rules:
|
||||||
|
* [alerting](https://prometheus.io/docs/prometheus/latest/configuration/alerting_rules/) -
|
||||||
|
Alerting rules allows to define alert conditions via [MetricsQL](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/MetricsQL)
|
||||||
|
and to send notifications about firing alerts to [Alertmanager](https://github.com/prometheus/alertmanager).
|
||||||
|
* [recording](https://prometheus.io/docs/prometheus/latest/configuration/recording_rules/) -
|
||||||
|
Recording rules allow you to precompute frequently needed or computationally expensive expressions
|
||||||
|
and save their result as a new set of time series.
|
||||||
|
|
||||||
|
##### Alerting rules
|
||||||
|
|
||||||
|
The syntax for alerting rule is following:
|
||||||
|
```yaml
|
||||||
|
# The name of the alert. Must be a valid metric name.
|
||||||
|
alert: <string>
|
||||||
|
|
||||||
|
# The MetricsQL expression to evaluate.
|
||||||
|
expr: <string>
|
||||||
|
|
||||||
|
# Alerts are considered firing once they have been returned for this long.
|
||||||
|
# Alerts which have not yet fired for long enough are considered pending.
|
||||||
|
[ for: <duration> | default = 0s ]
|
||||||
|
|
||||||
|
# Labels to add or overwrite for each alert.
|
||||||
|
labels:
|
||||||
|
[ <labelname>: <tmpl_string> ]
|
||||||
|
|
||||||
|
# Annotations to add to each alert.
|
||||||
|
annotations:
|
||||||
|
[ <labelname>: <tmpl_string> ]
|
||||||
|
```
|
||||||
|
|
||||||
|
`vmalert` has no local storage and alerts state is stored in process memory. Hence, after reloading of `vmalert` process
|
||||||
|
alerts state will be lost. To avoid this situation, `vmalert` may be configured via following flags:
|
||||||
|
* `-remoteWrite.url` - URL to Victoria Metrics or VMInsert. `vmalert` will persist alerts state into the configured
|
||||||
|
address in form of timeseries with name `ALERTS` via remote-write protocol.
|
||||||
|
* `-remoteRead.url` - URL to Victoria Metrics or VMSelect. `vmalert` will try to restore alerts state from configured
|
||||||
|
address by querying `ALERTS` timeseries.
|
||||||
|
|
||||||
|
|
||||||
|
##### Recording rules
|
||||||
|
|
||||||
|
The syntax for recording rules is following:
|
||||||
|
```yaml
|
||||||
|
# The name of the time series to output to. Must be a valid metric name.
|
||||||
|
record: <string>
|
||||||
|
|
||||||
|
# The MetricsQL expression to evaluate.
|
||||||
|
expr: <string>
|
||||||
|
|
||||||
|
# Labels to add or overwrite before storing the result.
|
||||||
|
labels:
|
||||||
|
[ <labelname>: <labelvalue> ]
|
||||||
|
```
|
||||||
|
|
||||||
|
For recording rules to work `-remoteWrite.url` must specified.
|
||||||
|
|
||||||
|
|
||||||
|
#### WEB
|
||||||
|
|
||||||
|
`vmalert` runs a web-server (`-httpListenAddr`) for serving metrics and alerts endpoints:
|
||||||
* `http://<vmalert-addr>/api/v1/groups` - list of all loaded groups and rules;
|
* `http://<vmalert-addr>/api/v1/groups` - list of all loaded groups and rules;
|
||||||
* `http://<vmalert-addr>/api/v1/alerts` - list of all active alerts;
|
* `http://<vmalert-addr>/api/v1/alerts` - list of all active alerts;
|
||||||
* `http://<vmalert-addr>/api/v1/<groupName>/<alertID>/status" ` - get alert status by ID.
|
* `http://<vmalert-addr>/api/v1/<groupName>/<alertID>/status" ` - get alert status by ID.
|
||||||
|
@ -58,6 +144,7 @@ Used as alert source in AlertManager.
|
||||||
* `http://<vmalert-addr>/metrics` - application metrics.
|
* `http://<vmalert-addr>/metrics` - application metrics.
|
||||||
* `http://<vmalert-addr>/-/reload` - hot configuration reload.
|
* `http://<vmalert-addr>/-/reload` - hot configuration reload.
|
||||||
|
|
||||||
|
|
||||||
### Configuration
|
### Configuration
|
||||||
|
|
||||||
The shortlist of configuration flags is the following:
|
The shortlist of configuration flags is the following:
|
||||||
|
|
|
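The README hunks above document the group, alerting-rule and recording-rule syntax. As a combined illustration (not part of the diff; the group name, metric names, expressions and labels are all hypothetical), a rules file mixing both rule types in one concurrent group could look like the sketch below, with `-remoteWrite.url` configured so the recording rule results have somewhere to go:

```yaml
groups:
  - name: ExampleMixedGroup   # hypothetical group name
    interval: 1m
    concurrency: 2
    rules:
      # Recording rule: precomputes an expression and stores it as a new series.
      - record: instance:requests:rate5m
        expr: rate(requests_total[5m])
        labels:
          source: vmalert
      # Alerting rule: becomes firing after the expression returns results for 10 minutes.
      - alert: HighRequestRate
        expr: rate(requests_total[5m]) > 100
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "high request rate on {{ $labels.instance }}"
```

Per the limitations section added above, the alerting rule should not assume the recording rule's output has already been written to remote storage when it runs, even within the same group.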
@@ -101,6 +101,7 @@ func TestAlertingRule_ToTimeSeries(t *testing.T) {
 }
 
 func TestAlertingRule_Exec(t *testing.T) {
+    const defaultStep = 5 * time.Millisecond
     testCases := []struct {
         rule  *AlertingRule
         steps [][]datasource.Metric

@@ -240,7 +241,7 @@ func TestAlertingRule_Exec(t *testing.T) {
             },
         },
         {
-            newTestAlertingRule("for-fired", time.Millisecond),
+            newTestAlertingRule("for-fired", defaultStep),
             [][]datasource.Metric{
                 {metricWithLabels(t, "name", "foo")},
                 {metricWithLabels(t, "name", "foo")},

@@ -260,7 +261,7 @@ func TestAlertingRule_Exec(t *testing.T) {
             map[uint64]*notifier.Alert{},
         },
         {
-            newTestAlertingRule("for-pending=>firing=>inactive", time.Millisecond),
+            newTestAlertingRule("for-pending=>firing=>inactive", defaultStep),
             [][]datasource.Metric{
                 {metricWithLabels(t, "name", "foo")},
                 {metricWithLabels(t, "name", "foo")},

@@ -272,10 +273,10 @@ func TestAlertingRule_Exec(t *testing.T) {
             },
         },
         {
-            newTestAlertingRule("for-pending=>firing=>inactive=>pending", time.Millisecond),
+            newTestAlertingRule("for-pending=>firing=>inactive=>pending", defaultStep),
             [][]datasource.Metric{
-                //{metricWithLabels(t, "name", "foo")},
-                //{metricWithLabels(t, "name", "foo")},
+                {metricWithLabels(t, "name", "foo")},
+                {metricWithLabels(t, "name", "foo")},
                 // empty step to reset pending alerts
                 {},
                 {metricWithLabels(t, "name", "foo")},

@@ -285,7 +286,7 @@ func TestAlertingRule_Exec(t *testing.T) {
             },
         },
         {
-            newTestAlertingRule("for-pending=>firing=>inactive=>pending=>firing", time.Millisecond),
+            newTestAlertingRule("for-pending=>firing=>inactive=>pending=>firing", defaultStep),
             [][]datasource.Metric{
                 {metricWithLabels(t, "name", "foo")},
                 {metricWithLabels(t, "name", "foo")},

@@ -311,7 +312,7 @@ func TestAlertingRule_Exec(t *testing.T) {
                 t.Fatalf("unexpected err: %s", err)
             }
             // artificial delay between applying steps
-            time.Sleep(time.Millisecond)
+            time.Sleep(defaultStep)
         }
         if len(tc.rule.alerts) != len(tc.expAlerts) {
             t.Fatalf("expected %d alerts; got %d", len(tc.expAlerts), len(tc.rule.alerts))
@@ -15,10 +15,11 @@ import (
 // Group contains list of Rules grouped into
 // entity with one name and evaluation interval
 type Group struct {
     File        string
     Name        string        `yaml:"name"`
     Interval    time.Duration `yaml:"interval,omitempty"`
     Rules       []Rule        `yaml:"rules"`
+    Concurrency int           `yaml:"concurrency"`
 
     // Catches all undefined fields and must be empty after parsing.
     XXX map[string]interface{} `yaml:",inline"`
@@ -1,6 +1,7 @@
 groups:
   - name: TestGroup
     interval: 2s
+    concurrency: 2
     rules:
       - alert: Conns
         expr: sum(vm_tcplistener_conns) by(instance) > 1
@@ -17,31 +17,36 @@ import (
 
 // Group is an entity for grouping rules
 type Group struct {
+    mu          sync.RWMutex
     Name        string
     File        string
     Rules       []Rule
     Interval    time.Duration
+    Concurrency int
 
     doneCh     chan struct{}
     finishedCh chan struct{}
     // channel accepts new Group obj
     // which supposed to update current group
     updateCh chan *Group
-    mu       sync.RWMutex
 }
 
 func newGroup(cfg config.Group, defaultInterval time.Duration) *Group {
     g := &Group{
         Name:        cfg.Name,
         File:        cfg.File,
         Interval:    cfg.Interval,
+        Concurrency: cfg.Concurrency,
         doneCh:      make(chan struct{}),
         finishedCh:  make(chan struct{}),
         updateCh:    make(chan *Group),
     }
     if g.Interval == 0 {
         g.Interval = defaultInterval
     }
+    if g.Concurrency < 1 {
+        g.Concurrency = 1
+    }
     rules := make([]Rule, len(cfg.Rules))
     for i, r := range cfg.Rules {
         rules[i] = g.newRule(r)

@@ -121,6 +126,7 @@ func (g *Group) updateWith(newGroup *Group) error {
     for _, nr := range rulesRegistry {
         newRules = append(newRules, nr)
     }
+    g.Concurrency = newGroup.Concurrency
     g.Rules = newRules
     return nil
 }

@@ -150,24 +156,18 @@ func (g *Group) close() {
 }
 
 func (g *Group) start(ctx context.Context, querier datasource.Querier, nr notifier.Notifier, rw *remotewrite.Client) {
-    logger.Infof("group %q started with interval %v", g.Name, g.Interval)
-    var returnSeries bool
-    if rw != nil {
-        returnSeries = true
-    }
+    defer func() { close(g.finishedCh) }()
+    logger.Infof("group %q started; interval=%v; concurrency=%d", g.Name, g.Interval, g.Concurrency)
+    e := &executor{querier, nr, rw}
 
     t := time.NewTicker(g.Interval)
     defer t.Stop()
     for {
         select {
         case <-ctx.Done():
             logger.Infof("group %q: context cancelled", g.Name)
-            close(g.finishedCh)
             return
         case <-g.doneCh:
             logger.Infof("group %q: received stop signal", g.Name)
-            close(g.finishedCh)
             return
         case ng := <-g.updateCh:
             g.mu.Lock()

@@ -181,65 +181,115 @@ func (g *Group) start(ctx context.Context, querier datasource.Querier, nr notifier.Notifier, rw *remotewrite.Client) {
                 g.Interval = ng.Interval
                 t.Stop()
                 t = time.NewTicker(g.Interval)
-                logger.Infof("group %q: changed evaluation interval to %v", g.Name, g.Interval)
             }
             g.mu.Unlock()
+            logger.Infof("group %q re-started; interval=%v; concurrency=%d", g.Name, g.Interval, g.Concurrency)
         case <-t.C:
             iterationTotal.Inc()
             iterationStart := time.Now()
-            for _, rule := range g.Rules {
-                execTotal.Inc()
-
-                execStart := time.Now()
-                tss, err := rule.Exec(ctx, querier, returnSeries)
-                execDuration.UpdateDuration(execStart)
-
+            errs := e.execConcurrently(ctx, g.Rules, g.Concurrency, g.Interval)
+            for err := range errs {
                 if err != nil {
-                    execErrors.Inc()
-                    logger.Errorf("failed to execute rule %q.%q: %s", g.Name, rule, err)
-                    continue
+                    logger.Errorf("group %q: %s", g.Name, err)
                 }
-
-                if len(tss) > 0 {
-                    remoteWriteSent.Add(len(tss))
-                    for _, ts := range tss {
-                        if err := rw.Push(ts); err != nil {
-                            remoteWriteErrors.Inc()
-                            logger.Errorf("failed to remote write for rule %q.%q: %s", g.Name, rule, err)
-                        }
-                    }
-                }
-
-                ar, ok := rule.(*AlertingRule)
-                if !ok {
-                    continue
-                }
-                var alerts []notifier.Alert
-                for _, a := range ar.alerts {
-                    switch a.State {
-                    case notifier.StateFiring:
-                        // set End to execStart + 3 intervals
-                        // so notifier can resolve it automatically if `vmalert`
-                        // won't be able to send resolve for some reason
-                        a.End = execStart.Add(3 * g.Interval)
-                        alerts = append(alerts, *a)
-                    case notifier.StateInactive:
-                        // set End to execStart to notify
-                        // that it was just resolved
-                        a.End = execStart
-                        alerts = append(alerts, *a)
-                    }
-                }
-                if len(alerts) < 1 {
-                    continue
-                }
-                alertsSent.Add(len(alerts))
-                if err := nr.Send(ctx, alerts); err != nil {
-                    alertsSendErrors.Inc()
-                    logger.Errorf("failed to send alert for rule %q.%q: %s", g.Name, rule, err)
-                }
             }
 
             iterationDuration.UpdateDuration(iterationStart)
         }
     }
 }
+
+type executor struct {
+    querier  datasource.Querier
+    notifier notifier.Notifier
+    rw       *remotewrite.Client
+}
+
+func (e *executor) execConcurrently(ctx context.Context, rules []Rule, concurrency int, interval time.Duration) chan error {
+    res := make(chan error, len(rules))
+    var returnSeries bool
+    if e.rw != nil {
+        returnSeries = true
+    }
+
+    if concurrency == 1 {
+        // fast path
+        for _, rule := range rules {
+            res <- e.exec(ctx, rule, returnSeries, interval)
+        }
+        close(res)
+        return res
+    }
+
+    sem := make(chan struct{}, concurrency)
+    go func() {
+        wg := sync.WaitGroup{}
+        for _, rule := range rules {
+            sem <- struct{}{}
+            wg.Add(1)
+            go func(r Rule) {
+                res <- e.exec(ctx, r, returnSeries, interval)
+                <-sem
+                wg.Done()
+            }(rule)
+        }
+        wg.Wait()
+        close(res)
+    }()
+    return res
+}
+
+func (e *executor) exec(ctx context.Context, rule Rule, returnSeries bool, interval time.Duration) error {
+    execTotal.Inc()
+    execStart := time.Now()
+    defer func() {
+        execDuration.UpdateDuration(execStart)
+    }()
+
+    tss, err := rule.Exec(ctx, e.querier, returnSeries)
+    if err != nil {
+        execErrors.Inc()
+        return fmt.Errorf("rule %q: failed to execute: %s", rule, err)
+    }
+
+    if len(tss) > 0 && e.rw != nil {
+        remoteWriteSent.Add(len(tss))
+        for _, ts := range tss {
+            if err := e.rw.Push(ts); err != nil {
+                remoteWriteErrors.Inc()
+                return fmt.Errorf("rule %q: remote write failure: %s", rule, err)
+            }
+        }
+    }
+
+    ar, ok := rule.(*AlertingRule)
+    if !ok {
+        return nil
+    }
+    var alerts []notifier.Alert
+    for _, a := range ar.alerts {
+        switch a.State {
+        case notifier.StateFiring:
+            // set End to execStart + 3 intervals
+            // so notifier can resolve it automatically if `vmalert`
+            // won't be able to send resolve for some reason
+            a.End = time.Now().Add(3 * interval)
+            alerts = append(alerts, *a)
+        case notifier.StateInactive:
+            // set End to execStart to notify
+            // that it was just resolved
+            a.End = time.Now()
+            alerts = append(alerts, *a)
+        }
+    }
+    if len(alerts) < 1 {
+        return nil
+    }
+    alertsSent.Add(len(alerts))
+    if err := e.notifier.Send(ctx, alerts); err != nil {
+        alertsSendErrors.Inc()
+        return fmt.Errorf("rule %q: failed to send alerts: %s", rule, err)
    }
+    return nil
+}
@@ -2,7 +2,6 @@ package main
 
 import (
     "context"
-    "reflect"
     "sort"
     "testing"
     "time"

@@ -139,6 +138,7 @@ func TestGroupStart(t *testing.T) {
     }
     const evalInterval = time.Millisecond
     g := newGroup(groups[0], evalInterval)
+    g.Concurrency = 2
 
     fn := &fakeNotifier{}
     fs := &fakeQuerier{}

@@ -192,34 +192,3 @@ func TestGroupStart(t *testing.T) {
     g.close()
     <-finished
 }
-
-func compareAlerts(t *testing.T, as, bs []notifier.Alert) {
-    t.Helper()
-    if len(as) != len(bs) {
-        t.Fatalf("expected to have length %d; got %d", len(as), len(bs))
-    }
-    sort.Slice(as, func(i, j int) bool {
-        return as[i].ID < as[j].ID
-    })
-    sort.Slice(bs, func(i, j int) bool {
-        return bs[i].ID < bs[j].ID
-    })
-    for i := range as {
-        a, b := as[i], bs[i]
-        if a.Name != b.Name {
-            t.Fatalf("expected t have Name %q; got %q", a.Name, b.Name)
-        }
-        if a.State != b.State {
-            t.Fatalf("expected t have State %q; got %q", a.State, b.State)
-        }
-        if a.Value != b.Value {
-            t.Fatalf("expected t have Value %f; got %f", a.Value, b.Value)
-        }
-        if !reflect.DeepEqual(a.Annotations, b.Annotations) {
-            t.Fatalf("expected to have annotations %#v; got %#v", a.Annotations, b.Annotations)
-        }
-        if !reflect.DeepEqual(a.Labels, b.Labels) {
-            t.Fatalf("expected to have labels %#v; got %#v", a.Labels, b.Labels)
-        }
-    }
-}
@@ -4,6 +4,7 @@ import (
     "context"
     "fmt"
     "reflect"
+    "sort"
     "sync"
     "testing"
 

@@ -198,3 +199,34 @@ func compareTimeSeries(t *testing.T, a, b []prompbmarshal.TimeSeries) error {
     }
     return nil
 }
+
+func compareAlerts(t *testing.T, as, bs []notifier.Alert) {
+    t.Helper()
+    if len(as) != len(bs) {
+        t.Fatalf("expected to have length %d; got %d", len(as), len(bs))
+    }
+    sort.Slice(as, func(i, j int) bool {
+        return as[i].ID < as[j].ID
+    })
+    sort.Slice(bs, func(i, j int) bool {
+        return bs[i].ID < bs[j].ID
+    })
+    for i := range as {
+        a, b := as[i], bs[i]
+        if a.Name != b.Name {
+            t.Fatalf("expected t have Name %q; got %q", a.Name, b.Name)
+        }
+        if a.State != b.State {
+            t.Fatalf("expected t have State %q; got %q", a.State, b.State)
+        }
+        if a.Value != b.Value {
+            t.Fatalf("expected t have Value %f; got %f", a.Value, b.Value)
+        }
+        if !reflect.DeepEqual(a.Annotations, b.Annotations) {
+            t.Fatalf("expected to have annotations %#v; got %#v", a.Annotations, b.Annotations)
+        }
+        if !reflect.DeepEqual(a.Labels, b.Labels) {
+            t.Fatalf("expected to have labels %#v; got %#v", a.Labels, b.Labels)
+        }
+    }
+}
@@ -42,12 +42,12 @@ absolute path to all .yaml files in root.`)
     basicAuthPassword = flag.String("datasource.basicAuth.password", "", "Optional basic auth password for -datasource.url")
 
     remoteWriteURL = flag.String("remoteWrite.url", "", "Optional URL to Victoria Metrics or VMInsert where to persist alerts state"+
-        " in form of timeseries. E.g. http://127.0.0.1:8428")
+        " and recording rules results in form of timeseries. E.g. http://127.0.0.1:8428")
     remoteWriteUsername = flag.String("remoteWrite.basicAuth.username", "", "Optional basic auth username for -remoteWrite.url")
     remoteWritePassword = flag.String("remoteWrite.basicAuth.password", "", "Optional basic auth password for -remoteWrite.url")
     remoteWriteMaxQueueSize = flag.Int("remoteWrite.maxQueueSize", 1e5, "Defines the max number of pending datapoints to remote write endpoint")
     remoteWriteMaxBatchSize = flag.Int("remoteWrite.maxBatchSize", 1e3, "Defines defines max number of timeseries to be flushed at once")
-    remoteWriteConcurrency = flag.Int("remoteWrite.concurrency", 1, "Defines number of readers that concurrently write into remote storage")
+    remoteWriteConcurrency = flag.Int("remoteWrite.concurrency", 1, "Defines number of writers for concurrent writing into remote storage")
 
     remoteReadURL = flag.String("remoteRead.url", "", "Optional URL to Victoria Metrics or VMSelect that will be used to restore alerts"+
         " state. This configuration makes sense only if `vmalert` was configured with `remoteWrite.url` before and has been successfully persisted its state."+
@@ -117,10 +117,11 @@ func (m *manager) update(ctx context.Context, path []string, validateTpl, valida
 func (g *Group) toAPI() APIGroup {
     ag := APIGroup{
         // encode as strings to avoid rounding
         ID:       fmt.Sprintf("%d", g.ID()),
         Name:     g.Name,
         File:     g.File,
         Interval: g.Interval.String(),
+        Concurrency: g.Concurrency,
     }
     for _, r := range g.Rules {
         switch v := r.(type) {
@@ -24,6 +24,7 @@ type APIGroup struct {
     ID       string `json:"id"`
     File     string `json:"file"`
     Interval string `json:"interval"`
+    Concurrency int `json:"concurrency"`
     AlertingRules  []APIAlertingRule  `json:"alerting_rules"`
     RecordingRules []APIRecordingRule `json:"recording_rules"`
 }