From ef7f52e0e6eb864a2dd6a635ac470b54a5266f31 Mon Sep 17 00:00:00 2001
From: Roman Khavronenko <roman@victoriametrics.com>
Date: Sat, 18 Jun 2022 09:11:37 +0200
Subject: [PATCH] Vmalert notifiers (#2744)

* vmalert: remove head of line blocking for sending alerts

This change makes sending alerts to notifiers concurrent instead
of sequential. This eliminates head of line blocking, where first
faulty notifier address prevents the rest of notifiers from
receiving notifications.

Signed-off-by: hagen1778 <roman@victoriametrics.com>

* vmalert: make default timeout for sending alerts 10s

Previous value of 1m was too high and was inconsistent
with default timeout defined for notifiers via
configuration file.

Signed-off-by: hagen1778 <roman@victoriametrics.com>

* vmalert: linter checks fix

Signed-off-by: hagen1778 <roman@victoriametrics.com>
---
 app/vmalert/group.go                | 12 ++++++---
 app/vmalert/group_test.go           | 39 +++++++++++++++++++++++++++++
 app/vmalert/helpers_test.go         | 12 +++++++++
 app/vmalert/notifier/init.go        |  2 +-
 app/vmalert/utils/err_group.go      | 17 +++++++++++--
 app/vmalert/utils/err_group_test.go | 27 ++++++++++++++++++++
 6 files changed, 103 insertions(+), 6 deletions(-)

diff --git a/app/vmalert/group.go b/app/vmalert/group.go
index 747a4ca540..ac8e712bd1 100644
--- a/app/vmalert/group.go
+++ b/app/vmalert/group.go
@@ -438,11 +438,17 @@ func (e *executor) exec(ctx context.Context, rule Rule, ts time.Time, resolveDur
 		return nil
 	}
 
+	wg := sync.WaitGroup{}
 	for _, nt := range e.notifiers() {
-		if err := nt.Send(ctx, alerts); err != nil {
-			errGr.Add(fmt.Errorf("rule %q: failed to send alerts to addr %q: %w", rule, nt.Addr(), err))
-		}
+		wg.Add(1)
+		go func(nt notifier.Notifier) {
+			if err := nt.Send(ctx, alerts); err != nil {
+				errGr.Add(fmt.Errorf("rule %q: failed to send alerts to addr %q: %w", rule, nt.Addr(), err))
+			}
+			wg.Done()
+		}(nt)
 	}
+	wg.Wait()
 	return errGr.Err()
 }
 
diff --git a/app/vmalert/group_test.go b/app/vmalert/group_test.go
index 95e50c43e9..046acd2b09 100644
--- a/app/vmalert/group_test.go
+++ b/app/vmalert/group_test.go
@@ -413,3 +413,42 @@ func TestPurgeStaleSeries(t *testing.T) {
 		[]Rule{&AlertingRule{RuleID: 1}, &AlertingRule{RuleID: 2}},
 	)
 }
+
+func TestFaultyNotifier(t *testing.T) {
+	fq := &fakeQuerier{}
+	fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "bar"))
+
+	r := newTestAlertingRule("instant", 0)
+	r.q = fq
+
+	fn := &fakeNotifier{}
+	e := &executor{
+		notifiers: func() []notifier.Notifier {
+			return []notifier.Notifier{
+				&faultyNotifier{},
+				fn,
+			}
+		},
+	}
+	delay := 5 * time.Second
+	ctx, cancel := context.WithTimeout(context.Background(), delay)
+	defer cancel()
+
+	go func() {
+		_ = e.exec(ctx, r, time.Now(), 0, 10)
+	}()
+
+	tn := time.Now()
+	deadline := tn.Add(delay / 2)
+	for {
+		if fn.getCounter() > 0 {
+			return
+		}
+		if tn.After(deadline) {
+			break
+		}
+		tn = time.Now()
+		time.Sleep(time.Millisecond * 100)
+	}
+	t.Fatalf("alive notifier didn't receive notification by %v", deadline)
+}
diff --git a/app/vmalert/helpers_test.go b/app/vmalert/helpers_test.go
index 4524ba4496..96bd2b1e21 100644
--- a/app/vmalert/helpers_test.go
+++ b/app/vmalert/helpers_test.go
@@ -87,6 +87,18 @@ func (fn *fakeNotifier) getAlerts() []notifier.Alert {
 	return fn.alerts
 }
 
+type faultyNotifier struct {
+	fakeNotifier
+}
+
+func (fn *faultyNotifier) Send(ctx context.Context, _ []notifier.Alert) error {
+	d, ok := ctx.Deadline()
+	if ok {
+		time.Sleep(time.Until(d))
+	}
+	return fmt.Errorf("send failed")
+}
+
 func metricWithValueAndLabels(t *testing.T, value float64, labels ...string) datasource.Metric {
 	return metricWithValuesAndLabels(t, []float64{value}, labels...)
 }
diff --git a/app/vmalert/notifier/init.go b/app/vmalert/notifier/init.go
index fdc49af1a7..a1b8bcac6a 100644
--- a/app/vmalert/notifier/init.go
+++ b/app/vmalert/notifier/init.go
@@ -145,7 +145,7 @@ func notifiersFromFlags(gen AlertURLGenerator) ([]Notifier, error) {
 		}
 
 		addr = strings.TrimSuffix(addr, "/")
-		am, err := NewAlertManager(addr+alertManagerPath, gen, authCfg, nil, time.Minute)
+		am, err := NewAlertManager(addr+alertManagerPath, gen, authCfg, nil, time.Second*10)
 		if err != nil {
 			return nil, err
 		}
diff --git a/app/vmalert/utils/err_group.go b/app/vmalert/utils/err_group.go
index a2f5750e84..c8ce3a0d84 100644
--- a/app/vmalert/utils/err_group.go
+++ b/app/vmalert/utils/err_group.go
@@ -3,24 +3,34 @@ package utils
 import (
 	"fmt"
 	"strings"
+	"sync"
 )
 
 // ErrGroup accumulates multiple errors
 // and produces single error message.
 type ErrGroup struct {
+	mu   sync.Mutex
 	errs []error
 }
 
 // Add adds a new error to group.
-// Isn't thread-safe.
+// Is thread-safe.
 func (eg *ErrGroup) Add(err error) {
+	eg.mu.Lock()
 	eg.errs = append(eg.errs, err)
+	eg.mu.Unlock()
 }
 
 // Err checks if group contains at least
 // one error.
 func (eg *ErrGroup) Err() error {
-	if eg == nil || len(eg.errs) == 0 {
+	if eg == nil {
+		return nil
+	}
+
+	eg.mu.Lock()
+	defer eg.mu.Unlock()
+	if len(eg.errs) == 0 {
 		return nil
 	}
 	return eg
@@ -28,6 +38,9 @@ func (eg *ErrGroup) Err() error {
 
 // Error satisfies Error interface
 func (eg *ErrGroup) Error() string {
+	eg.mu.Lock()
+	defer eg.mu.Unlock()
+
 	if len(eg.errs) == 0 {
 		return ""
 	}
diff --git a/app/vmalert/utils/err_group_test.go b/app/vmalert/utils/err_group_test.go
index ae409bcc4a..de57ee32dd 100644
--- a/app/vmalert/utils/err_group_test.go
+++ b/app/vmalert/utils/err_group_test.go
@@ -2,6 +2,7 @@ package utils
 
 import (
 	"errors"
+	"fmt"
 	"testing"
 )
 
@@ -36,3 +37,29 @@ func TestErrGroup(t *testing.T) {
 		}
 	}
 }
+
+// TestErrGroupConcurrent supposed to test concurrent
+// use of error group.
+// Should be executed with -race flag
+func TestErrGroupConcurrent(t *testing.T) {
+	eg := new(ErrGroup)
+
+	const writersN = 4
+	payload := make(chan error, writersN)
+	for i := 0; i < writersN; i++ {
+		go func() {
+			for err := range payload {
+				eg.Add(err)
+			}
+		}()
+	}
+
+	const iterations = 500
+	for i := 0; i < iterations; i++ {
+		payload <- fmt.Errorf("error %d", i)
+		if i%10 == 0 {
+			_ = eg.Err()
+		}
+	}
+	close(payload)
+}