From d71b6e658409c4efde9d25b6c200263c23c90af5 Mon Sep 17 00:00:00 2001
From: Roman Khavronenko <hagen1778@gmail.com>
Date: Tue, 9 Jun 2020 13:21:20 +0100
Subject: [PATCH] vmalert-491: allow to configure concurrent rules execution
 per group. (#542)

The feature allows speeding up group rules execution by
executing the rules concurrently.

The change also updates the README to reflect the configuration
details.
---
 app/vmalert/README.md                         | 109 +++++++++-
 app/vmalert/alerting_test.go                  |  15 +-
 app/vmalert/config/config.go                  |   9 +-
 app/vmalert/config/testdata/rules2-good.rules |   1 +
 app/vmalert/group.go                          | 188 +++++++++++-------
 app/vmalert/group_test.go                     |  33 +--
 app/vmalert/helpers_test.go                   |  32 +++
 app/vmalert/main.go                           |   4 +-
 app/vmalert/manager.go                        |   9 +-
 app/vmalert/web_types.go                      |   1 +
 10 files changed, 272 insertions(+), 129 deletions(-)

diff --git a/app/vmalert/README.md b/app/vmalert/README.md
index 0f054b8340..854bccbee5 100644
--- a/app/vmalert/README.md
+++ b/app/vmalert/README.md
@@ -13,6 +13,15 @@ rules against configured address.
 * Integration with [Alertmanager](https://github.com/prometheus/alertmanager);
 * Lightweight without extra dependencies.
 
+### Limitations
+* `vmalert` executes queries against a remote datasource, which carries reliability risks because of the network.
+It is recommended to configure alert thresholds and rule expressions with the understanding that a network request
+may fail;
+* by default, rules within one group are executed sequentially, but the persisting of execution results to remote
+storage is asynchronous. Hence, users shouldn't rely on recording rules chaining, when the result of a previous
+recording rule is reused by the next one;
+* `vmalert` has no UI, just an API for getting group and rule statuses.
+
 ### QuickStart
 
 To build `vmalert` from sources:
@@ -28,6 +37,8 @@ To start using `vmalert` you will need the following things:
 * datasource address - reachable VictoriaMetrics instance for rules execution;
 * notifier address - reachable [Alert Manager](https://github.com/prometheus/alertmanager) instance for processing, 
 aggregating alerts and sending notifications.
+* remote write address - [remote write](https://prometheus.io/docs/prometheus/latest/storage/#remote-storage-integrations)
+compatible storage address for storing recording rule results and alert state in the form of timeseries. This is optional.
 
 Then configure `vmalert` accordingly:
 ```
@@ -36,21 +47,96 @@ Then configure `vmalert` accordingly:
         -notifier.url=http://localhost:9093
 ```
 
-Example for `.rules` file may be found [here](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/app/vmalert/testdata).
+Configuration for [recording](https://prometheus.io/docs/prometheus/latest/configuration/recording_rules/)
+and [alerting](https://prometheus.io/docs/prometheus/latest/configuration/alerting_rules/) rules is very
+similar to Prometheus rules and is configured in YAML. Configuration examples may be found
+in the [testdata](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/app/vmalert/testdata) folder.
+Every `rule` belongs to a `group`, and every configuration file may contain an arbitrary number of groups:
+```yaml
+groups:
+  [ - <rule_group> ]
+```
 
-`vmalert` may be configured with `-remoteWrite` flag to write recording rules and 
-alerts state in form of timeseries via remote write protocol. Alerts state will be written 
-as `ALERTS` timeseries. These timeseries may be used to recover alerts state on `vmalert` 
-restarts if `-remoteRead` is configured.
+#### Groups
 
-`vmalert` runs evaluation for every group in a separate goroutine.
-Rules in group evaluated one-by-one sequentially. 
+Each group has the following attributes:
+```yaml
+# The name of the group. Must be unique within a file.
+name: <string>
 
-**Important:** while recording rules execution is sequential, writing of timeseries results to remote
-storage is asynchronous. Hence, user shouldn't rely on recording rules chaining when result of previous
-recording rule is reused in next one.
+# How often rules in the group are evaluated.
+[ interval: <duration> | default = global.evaluation_interval ]
 
-`vmalert` also runs a web-server (`-httpListenAddr`) for serving metrics and alerts endpoints:
+# How many rules to execute concurrently. Increasing concurrency may
+# speed up the group's evaluation round.
+[ concurrency: <integer> | default = 1 ]
+
+rules:
+  [ - <rule> ... ]
+```
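+
+For example, a hypothetical group which evaluates two independent rules concurrently
+(the names and expressions below are illustrative only):
+```yaml
+groups:
+  - name: ExampleGroup
+    interval: 10s
+    concurrency: 2
+    rules:
+      - record: instance:requests:rate5m
+        expr: rate(requests_total[5m])
+      - record: instance:errors:rate5m
+        expr: rate(errors_total[5m])
+```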
+
+#### Rules
+
+There are two types of rules:
+* [alerting](https://prometheus.io/docs/prometheus/latest/configuration/alerting_rules/) -
+Alerting rules allow defining alert conditions via [MetricsQL](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/MetricsQL)
+and sending notifications about firing alerts to [Alertmanager](https://github.com/prometheus/alertmanager).
+* [recording](https://prometheus.io/docs/prometheus/latest/configuration/recording_rules/) -
+Recording rules allow you to precompute frequently needed or computationally expensive expressions
+and save their results as a new set of time series.
+
+##### Alerting rules
+
+The syntax for an alerting rule is the following:
+```yaml
+# The name of the alert. Must be a valid metric name.
+alert: <string>
+
+# The MetricsQL expression to evaluate.
+expr: <string>
+
+# Alerts are considered firing once they have been returned for this long.
+# Alerts which have not yet fired for long enough are considered pending.
+[ for: <duration> | default = 0s ]
+
+# Labels to add or overwrite for each alert.
+labels:
+  [ <labelname>: <tmpl_string> ]
+
+# Annotations to add to each alert.
+annotations:
+  [ <labelname>: <tmpl_string> ]
+```
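+
+For example, a minimal hypothetical alerting rule (the name, expression and threshold are illustrative only):
+```yaml
+alert: ServiceDown
+expr: up == 0
+for: 5m
+labels:
+  severity: critical
+annotations:
+  summary: "instance {{ $labels.instance }} is down"
+```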
+
+`vmalert` has no local storage, and alert state is stored in the process memory. Hence, alert state will be lost
+after the `vmalert` process restarts. To avoid this, `vmalert` may be configured with the following flags:
+* `-remoteWrite.url` - URL to Victoria Metrics or VMInsert. `vmalert` will persist alert state to the configured
+address in the form of a timeseries named `ALERTS` via the remote-write protocol.
+* `-remoteRead.url` - URL to Victoria Metrics or VMSelect. `vmalert` will try to restore alert state from the configured
+address by querying the `ALERTS` timeseries.
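+
+A minimal sketch of such a configuration (the addresses below are examples):
+```
+./bin/vmalert -rule=alert.rules \
+        -datasource.url=http://localhost:8428 \
+        -notifier.url=http://localhost:9093 \
+        -remoteWrite.url=http://localhost:8428 \
+        -remoteRead.url=http://localhost:8428
+```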
+
+
+##### Recording rules
+
+The syntax for a recording rule is the following:
+```yaml
+# The name of the time series to output to. Must be a valid metric name.
+record: <string>
+
+# The MetricsQL expression to evaluate.
+expr: <string>
+
+# Labels to add or overwrite before storing the result.
+labels:
+  [ <labelname>: <labelvalue> ]
+```
+
+For recording rules to work, `-remoteWrite.url` must be specified.
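+
+For example, a hypothetical recording rule (the name and expression are illustrative only):
+```yaml
+record: job:up:avg
+expr: avg(up) by (job)
+labels:
+  team: infra
+```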
+
+
+#### WEB
+
+`vmalert` runs a web-server (`-httpListenAddr`) for serving metrics and alerts endpoints:
 * `http://<vmalert-addr>/api/v1/groups` - list of all loaded groups and rules;
 * `http://<vmalert-addr>/api/v1/alerts` - list of all active alerts;
 * `http://<vmalert-addr>/api/v1/<groupName>/<alertID>/status` - get alert status by ID.
@@ -58,6 +144,7 @@ Used as alert source in AlertManager.
 * `http://<vmalert-addr>/metrics` - application metrics.
 * `http://<vmalert-addr>/-/reload` - hot configuration reload.
 
+
 ### Configuration
 
 The shortlist of configuration flags is the following:
diff --git a/app/vmalert/alerting_test.go b/app/vmalert/alerting_test.go
index a0f661f1f2..ec9e56f862 100644
--- a/app/vmalert/alerting_test.go
+++ b/app/vmalert/alerting_test.go
@@ -101,6 +101,7 @@ func TestAlertingRule_ToTimeSeries(t *testing.T) {
 }
 
 func TestAlertingRule_Exec(t *testing.T) {
+	const defaultStep = 5 * time.Millisecond
 	testCases := []struct {
 		rule      *AlertingRule
 		steps     [][]datasource.Metric
@@ -240,7 +241,7 @@ func TestAlertingRule_Exec(t *testing.T) {
 			},
 		},
 		{
-			newTestAlertingRule("for-fired", time.Millisecond),
+			newTestAlertingRule("for-fired", defaultStep),
 			[][]datasource.Metric{
 				{metricWithLabels(t, "name", "foo")},
 				{metricWithLabels(t, "name", "foo")},
@@ -260,7 +261,7 @@ func TestAlertingRule_Exec(t *testing.T) {
 			map[uint64]*notifier.Alert{},
 		},
 		{
-			newTestAlertingRule("for-pending=>firing=>inactive", time.Millisecond),
+			newTestAlertingRule("for-pending=>firing=>inactive", defaultStep),
 			[][]datasource.Metric{
 				{metricWithLabels(t, "name", "foo")},
 				{metricWithLabels(t, "name", "foo")},
@@ -272,10 +273,10 @@ func TestAlertingRule_Exec(t *testing.T) {
 			},
 		},
 		{
-			newTestAlertingRule("for-pending=>firing=>inactive=>pending", time.Millisecond),
+			newTestAlertingRule("for-pending=>firing=>inactive=>pending", defaultStep),
 			[][]datasource.Metric{
-				//{metricWithLabels(t, "name", "foo")},
-				//{metricWithLabels(t, "name", "foo")},
+				{metricWithLabels(t, "name", "foo")},
+				{metricWithLabels(t, "name", "foo")},
 				// empty step to reset pending alerts
 				{},
 				{metricWithLabels(t, "name", "foo")},
@@ -285,7 +286,7 @@ func TestAlertingRule_Exec(t *testing.T) {
 			},
 		},
 		{
-			newTestAlertingRule("for-pending=>firing=>inactive=>pending=>firing", time.Millisecond),
+			newTestAlertingRule("for-pending=>firing=>inactive=>pending=>firing", defaultStep),
 			[][]datasource.Metric{
 				{metricWithLabels(t, "name", "foo")},
 				{metricWithLabels(t, "name", "foo")},
@@ -311,7 +312,7 @@ func TestAlertingRule_Exec(t *testing.T) {
 					t.Fatalf("unexpected err: %s", err)
 				}
 				// artificial delay between applying steps
-				time.Sleep(time.Millisecond)
+				time.Sleep(defaultStep)
 			}
 			if len(tc.rule.alerts) != len(tc.expAlerts) {
 				t.Fatalf("expected %d alerts; got %d", len(tc.expAlerts), len(tc.rule.alerts))
diff --git a/app/vmalert/config/config.go b/app/vmalert/config/config.go
index 9002ed2706..56f5f7ee94 100644
--- a/app/vmalert/config/config.go
+++ b/app/vmalert/config/config.go
@@ -15,10 +15,11 @@ import (
 // Group contains list of Rules grouped into
 // entity with one name and evaluation interval
 type Group struct {
-	File     string
-	Name     string        `yaml:"name"`
-	Interval time.Duration `yaml:"interval,omitempty"`
-	Rules    []Rule        `yaml:"rules"`
+	File        string
+	Name        string        `yaml:"name"`
+	Interval    time.Duration `yaml:"interval,omitempty"`
+	Rules       []Rule        `yaml:"rules"`
+	Concurrency int           `yaml:"concurrency"`
 
 	// Catches all undefined fields and must be empty after parsing.
 	XXX map[string]interface{} `yaml:",inline"`
diff --git a/app/vmalert/config/testdata/rules2-good.rules b/app/vmalert/config/testdata/rules2-good.rules
index ec82c43c69..a3fcea24dd 100644
--- a/app/vmalert/config/testdata/rules2-good.rules
+++ b/app/vmalert/config/testdata/rules2-good.rules
@@ -1,6 +1,7 @@
 groups:
   - name: TestGroup
     interval: 2s
+    concurrency: 2
     rules:
       - alert: Conns
         expr: sum(vm_tcplistener_conns) by(instance) > 1
diff --git a/app/vmalert/group.go b/app/vmalert/group.go
index 0c1b97a082..0de9b10cef 100644
--- a/app/vmalert/group.go
+++ b/app/vmalert/group.go
@@ -17,31 +17,36 @@ import (
 
 // Group is an entity for grouping rules
 type Group struct {
-	Name     string
-	File     string
-	Rules    []Rule
-	Interval time.Duration
+	mu          sync.RWMutex
+	Name        string
+	File        string
+	Rules       []Rule
+	Interval    time.Duration
+	Concurrency int
 
 	doneCh     chan struct{}
 	finishedCh chan struct{}
 	// channel accepts new Group obj
 	// which supposed to update current group
 	updateCh chan *Group
-	mu       sync.RWMutex
 }
 
 func newGroup(cfg config.Group, defaultInterval time.Duration) *Group {
 	g := &Group{
-		Name:       cfg.Name,
-		File:       cfg.File,
-		Interval:   cfg.Interval,
-		doneCh:     make(chan struct{}),
-		finishedCh: make(chan struct{}),
-		updateCh:   make(chan *Group),
+		Name:        cfg.Name,
+		File:        cfg.File,
+		Interval:    cfg.Interval,
+		Concurrency: cfg.Concurrency,
+		doneCh:      make(chan struct{}),
+		finishedCh:  make(chan struct{}),
+		updateCh:    make(chan *Group),
 	}
 	if g.Interval == 0 {
 		g.Interval = defaultInterval
 	}
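+	// Concurrency below 1 means sequential rules evaluation (the default)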
+	if g.Concurrency < 1 {
+		g.Concurrency = 1
+	}
 	rules := make([]Rule, len(cfg.Rules))
 	for i, r := range cfg.Rules {
 		rules[i] = g.newRule(r)
@@ -121,6 +126,7 @@ func (g *Group) updateWith(newGroup *Group) error {
 	for _, nr := range rulesRegistry {
 		newRules = append(newRules, nr)
 	}
+	g.Concurrency = newGroup.Concurrency
 	g.Rules = newRules
 	return nil
 }
@@ -150,24 +156,18 @@ func (g *Group) close() {
 }
 
 func (g *Group) start(ctx context.Context, querier datasource.Querier, nr notifier.Notifier, rw *remotewrite.Client) {
-	logger.Infof("group %q started with interval %v", g.Name, g.Interval)
-
-	var returnSeries bool
-	if rw != nil {
-		returnSeries = true
-	}
-
+	defer func() { close(g.finishedCh) }()
+	logger.Infof("group %q started; interval=%v; concurrency=%d", g.Name, g.Interval, g.Concurrency)
+	e := &executor{querier, nr, rw}
 	t := time.NewTicker(g.Interval)
 	defer t.Stop()
 	for {
 		select {
 		case <-ctx.Done():
 			logger.Infof("group %q: context cancelled", g.Name)
-			close(g.finishedCh)
 			return
 		case <-g.doneCh:
 			logger.Infof("group %q: received stop signal", g.Name)
-			close(g.finishedCh)
 			return
 		case ng := <-g.updateCh:
 			g.mu.Lock()
@@ -181,65 +181,115 @@ func (g *Group) start(ctx context.Context, querier datasource.Querier, nr notifi
 				g.Interval = ng.Interval
 				t.Stop()
 				t = time.NewTicker(g.Interval)
-				logger.Infof("group %q: changed evaluation interval to %v", g.Name, g.Interval)
 			}
 			g.mu.Unlock()
+			logger.Infof("group %q restarted; interval=%v; concurrency=%d", g.Name, g.Interval, g.Concurrency)
 		case <-t.C:
 			iterationTotal.Inc()
 			iterationStart := time.Now()
-			for _, rule := range g.Rules {
-				execTotal.Inc()
-
-				execStart := time.Now()
-				tss, err := rule.Exec(ctx, querier, returnSeries)
-				execDuration.UpdateDuration(execStart)
 
+			errs := e.execConcurrently(ctx, g.Rules, g.Concurrency, g.Interval)
+			for err := range errs {
 				if err != nil {
-					execErrors.Inc()
-					logger.Errorf("failed to execute rule %q.%q: %s", g.Name, rule, err)
-					continue
-				}
-
-				if len(tss) > 0 {
-					remoteWriteSent.Add(len(tss))
-					for _, ts := range tss {
-						if err := rw.Push(ts); err != nil {
-							remoteWriteErrors.Inc()
-							logger.Errorf("failed to remote write for rule %q.%q: %s", g.Name, rule, err)
-						}
-					}
-				}
-
-				ar, ok := rule.(*AlertingRule)
-				if !ok {
-					continue
-				}
-				var alerts []notifier.Alert
-				for _, a := range ar.alerts {
-					switch a.State {
-					case notifier.StateFiring:
-						// set End to execStart + 3 intervals
-						// so notifier can resolve it automatically if `vmalert`
-						// won't be able to send resolve for some reason
-						a.End = execStart.Add(3 * g.Interval)
-						alerts = append(alerts, *a)
-					case notifier.StateInactive:
-						// set End to execStart to notify
-						// that it was just resolved
-						a.End = execStart
-						alerts = append(alerts, *a)
-					}
-				}
-				if len(alerts) < 1 {
-					continue
-				}
-				alertsSent.Add(len(alerts))
-				if err := nr.Send(ctx, alerts); err != nil {
-					alertsSendErrors.Inc()
-					logger.Errorf("failed to send alert for rule %q.%q: %s", g.Name, rule, err)
+					logger.Errorf("group %q: %s", g.Name, err)
 				}
 			}
+
 			iterationDuration.UpdateDuration(iterationStart)
 		}
 	}
 }
+
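+// executor executes group rules and processes the execution results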
+type executor struct {
+	querier  datasource.Querier
+	notifier notifier.Notifier
+	rw       *remotewrite.Client
+}
+
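+// execConcurrently executes the given rules with the given concurrency level
+// and returns a channel for reading the execution errors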
+func (e *executor) execConcurrently(ctx context.Context, rules []Rule, concurrency int, interval time.Duration) chan error {
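+	// the channel is buffered for len(rules) so that senders never block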
+	res := make(chan error, len(rules))
+	var returnSeries bool
+	if e.rw != nil {
+		returnSeries = true
+	}
+
+	if concurrency == 1 {
+		// fast path
+		for _, rule := range rules {
+			res <- e.exec(ctx, rule, returnSeries, interval)
+		}
+		close(res)
+		return res
+	}
+
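+	// sem limits the number of rules evaluated at the same time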
+	sem := make(chan struct{}, concurrency)
+	go func() {
+		wg := sync.WaitGroup{}
+		for _, rule := range rules {
+			sem <- struct{}{}
+			wg.Add(1)
+			go func(r Rule) {
+				res <- e.exec(ctx, r, returnSeries, interval)
+				<-sem
+				wg.Done()
+			}(rule)
+		}
+		wg.Wait()
+		close(res)
+	}()
+	return res
+}
+
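+// exec executes the given rule, pushes the resulting timeseries to remote storage
+// (if configured) and sends notifications for alerting rules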
+func (e *executor) exec(ctx context.Context, rule Rule, returnSeries bool, interval time.Duration) error {
+	execTotal.Inc()
+	execStart := time.Now()
+	defer func() {
+		execDuration.UpdateDuration(execStart)
+	}()
+
+	tss, err := rule.Exec(ctx, e.querier, returnSeries)
+	if err != nil {
+		execErrors.Inc()
+		return fmt.Errorf("rule %q: failed to execute: %s", rule, err)
+	}
+
+	if len(tss) > 0 && e.rw != nil {
+		remoteWriteSent.Add(len(tss))
+		for _, ts := range tss {
+			if err := e.rw.Push(ts); err != nil {
+				remoteWriteErrors.Inc()
+				return fmt.Errorf("rule %q: remote write failure: %s", rule, err)
+			}
+		}
+	}
+
+	ar, ok := rule.(*AlertingRule)
+	if !ok {
+		return nil
+	}
+	var alerts []notifier.Alert
+	for _, a := range ar.alerts {
+		switch a.State {
+		case notifier.StateFiring:
+			// set End to the current time + 3 intervals
+			// so the notifier can resolve the alert automatically
+			// if `vmalert` fails to send the resolve for some reason
+			a.End = time.Now().Add(3 * interval)
+			alerts = append(alerts, *a)
+		case notifier.StateInactive:
+			// set End to the current time to notify
+			// that the alert was just resolved
+			a.End = time.Now()
+			alerts = append(alerts, *a)
+		}
+	}
+	if len(alerts) < 1 {
+		return nil
+	}
+	alertsSent.Add(len(alerts))
+	if err := e.notifier.Send(ctx, alerts); err != nil {
+		alertsSendErrors.Inc()
+		return fmt.Errorf("rule %q: failed to send alerts: %s", rule, err)
+	}
+	return nil
+}
diff --git a/app/vmalert/group_test.go b/app/vmalert/group_test.go
index dc2b530390..410be42d7e 100644
--- a/app/vmalert/group_test.go
+++ b/app/vmalert/group_test.go
@@ -2,7 +2,6 @@ package main
 
 import (
 	"context"
-	"reflect"
 	"sort"
 	"testing"
 	"time"
@@ -139,6 +138,7 @@ func TestGroupStart(t *testing.T) {
 	}
 	const evalInterval = time.Millisecond
 	g := newGroup(groups[0], evalInterval)
+	g.Concurrency = 2
 
 	fn := &fakeNotifier{}
 	fs := &fakeQuerier{}
@@ -192,34 +192,3 @@ func TestGroupStart(t *testing.T) {
 	g.close()
 	<-finished
 }
-
-func compareAlerts(t *testing.T, as, bs []notifier.Alert) {
-	t.Helper()
-	if len(as) != len(bs) {
-		t.Fatalf("expected to have length %d; got %d", len(as), len(bs))
-	}
-	sort.Slice(as, func(i, j int) bool {
-		return as[i].ID < as[j].ID
-	})
-	sort.Slice(bs, func(i, j int) bool {
-		return bs[i].ID < bs[j].ID
-	})
-	for i := range as {
-		a, b := as[i], bs[i]
-		if a.Name != b.Name {
-			t.Fatalf("expected t have Name %q; got %q", a.Name, b.Name)
-		}
-		if a.State != b.State {
-			t.Fatalf("expected t have State %q; got %q", a.State, b.State)
-		}
-		if a.Value != b.Value {
-			t.Fatalf("expected t have Value %f; got %f", a.Value, b.Value)
-		}
-		if !reflect.DeepEqual(a.Annotations, b.Annotations) {
-			t.Fatalf("expected to have annotations %#v; got %#v", a.Annotations, b.Annotations)
-		}
-		if !reflect.DeepEqual(a.Labels, b.Labels) {
-			t.Fatalf("expected to have labels %#v; got %#v", a.Labels, b.Labels)
-		}
-	}
-}
diff --git a/app/vmalert/helpers_test.go b/app/vmalert/helpers_test.go
index 2c2249bc2c..e4fcc1d597 100644
--- a/app/vmalert/helpers_test.go
+++ b/app/vmalert/helpers_test.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"reflect"
+	"sort"
 	"sync"
 	"testing"
 
@@ -198,3 +199,34 @@ func compareTimeSeries(t *testing.T, a, b []prompbmarshal.TimeSeries) error {
 	}
 	return nil
 }
+
+func compareAlerts(t *testing.T, as, bs []notifier.Alert) {
+	t.Helper()
+	if len(as) != len(bs) {
+		t.Fatalf("expected to have length %d; got %d", len(as), len(bs))
+	}
+	sort.Slice(as, func(i, j int) bool {
+		return as[i].ID < as[j].ID
+	})
+	sort.Slice(bs, func(i, j int) bool {
+		return bs[i].ID < bs[j].ID
+	})
+	for i := range as {
+		a, b := as[i], bs[i]
+		if a.Name != b.Name {
+			t.Fatalf("expected t have Name %q; got %q", a.Name, b.Name)
+		}
+		if a.State != b.State {
+			t.Fatalf("expected t have State %q; got %q", a.State, b.State)
+		}
+		if a.Value != b.Value {
+			t.Fatalf("expected t have Value %f; got %f", a.Value, b.Value)
+		}
+		if !reflect.DeepEqual(a.Annotations, b.Annotations) {
+			t.Fatalf("expected to have annotations %#v; got %#v", a.Annotations, b.Annotations)
+		}
+		if !reflect.DeepEqual(a.Labels, b.Labels) {
+			t.Fatalf("expected to have labels %#v; got %#v", a.Labels, b.Labels)
+		}
+	}
+}
diff --git a/app/vmalert/main.go b/app/vmalert/main.go
index 13ab4c0010..9cfb902837 100644
--- a/app/vmalert/main.go
+++ b/app/vmalert/main.go
@@ -42,12 +42,12 @@ absolute path to all .yaml files in root.`)
 	basicAuthPassword = flag.String("datasource.basicAuth.password", "", "Optional basic auth password for -datasource.url")
 
 	remoteWriteURL = flag.String("remoteWrite.url", "", "Optional URL to Victoria Metrics or VMInsert where to persist alerts state"+
-		" in form of timeseries. E.g. http://127.0.0.1:8428")
+		" and recording rules results in form of timeseries. E.g. http://127.0.0.1:8428")
 	remoteWriteUsername     = flag.String("remoteWrite.basicAuth.username", "", "Optional basic auth username for -remoteWrite.url")
 	remoteWritePassword     = flag.String("remoteWrite.basicAuth.password", "", "Optional basic auth password for -remoteWrite.url")
 	remoteWriteMaxQueueSize = flag.Int("remoteWrite.maxQueueSize", 1e5, "Defines the max number of pending datapoints to remote write endpoint")
 	remoteWriteMaxBatchSize = flag.Int("remoteWrite.maxBatchSize", 1e3, "Defines max number of timeseries to be flushed at once")
-	remoteWriteConcurrency  = flag.Int("remoteWrite.concurrency", 1, "Defines number of readers that concurrently write into remote storage")
+	remoteWriteConcurrency  = flag.Int("remoteWrite.concurrency", 1, "Defines number of writers for concurrent writing into remote storage")
 
 	remoteReadURL = flag.String("remoteRead.url", "", "Optional URL to Victoria Metrics or VMSelect that will be used to restore alerts"+
 		" state. This configuration makes sense only if `vmalert` was configured with `remoteWrite.url` before and has been successfully persisted its state."+
diff --git a/app/vmalert/manager.go b/app/vmalert/manager.go
index 30928f0cbd..ab89751897 100644
--- a/app/vmalert/manager.go
+++ b/app/vmalert/manager.go
@@ -117,10 +117,11 @@ func (m *manager) update(ctx context.Context, path []string, validateTpl, valida
 func (g *Group) toAPI() APIGroup {
 	ag := APIGroup{
 		// encode as strings to avoid rounding
-		ID:       fmt.Sprintf("%d", g.ID()),
-		Name:     g.Name,
-		File:     g.File,
-		Interval: g.Interval.String(),
+		ID:          fmt.Sprintf("%d", g.ID()),
+		Name:        g.Name,
+		File:        g.File,
+		Interval:    g.Interval.String(),
+		Concurrency: g.Concurrency,
 	}
 	for _, r := range g.Rules {
 		switch v := r.(type) {
diff --git a/app/vmalert/web_types.go b/app/vmalert/web_types.go
index ebc395ab14..b26a20d2c4 100644
--- a/app/vmalert/web_types.go
+++ b/app/vmalert/web_types.go
@@ -24,6 +24,7 @@ type APIGroup struct {
 	ID             string             `json:"id"`
 	File           string             `json:"file"`
 	Interval       string             `json:"interval"`
+	Concurrency    int                `json:"concurrency"`
 	AlertingRules  []APIAlertingRule  `json:"alerting_rules"`
 	RecordingRules []APIRecordingRule `json:"recording_rules"`
 }