Vmalert notifiers (#2744)

* vmalert: remove head of line blocking for sending alerts

This change makes sending alerts to notifiers concurrent instead
of sequential. This eliminates head of line blocking, where first
faulty notifier address prevents the rest of notifiers from
receiving notifications.

Signed-off-by: hagen1778 <roman@victoriametrics.com>

* vmalert: make default timeout for sending alerts 10s

Previous value of 1m was too high and was inconsistent
with default timeout defined for notifiers via
configuration file.

Signed-off-by: hagen1778 <roman@victoriametrics.com>

* vmalert: linter checks fix

Signed-off-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
Roman Khavronenko 2022-06-18 09:11:37 +02:00 committed by Aliaksandr Valialkin
parent 7a79e7c0ef
commit 3e45e1ff63
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
6 changed files with 103 additions and 6 deletions

View file

@ -438,11 +438,17 @@ func (e *executor) exec(ctx context.Context, rule Rule, ts time.Time, resolveDur
return nil
}
wg := sync.WaitGroup{}
for _, nt := range e.notifiers() {
if err := nt.Send(ctx, alerts); err != nil {
errGr.Add(fmt.Errorf("rule %q: failed to send alerts to addr %q: %w", rule, nt.Addr(), err))
}
wg.Add(1)
go func(nt notifier.Notifier) {
if err := nt.Send(ctx, alerts); err != nil {
errGr.Add(fmt.Errorf("rule %q: failed to send alerts to addr %q: %w", rule, nt.Addr(), err))
}
wg.Done()
}(nt)
}
wg.Wait()
return errGr.Err()
}

View file

@ -413,3 +413,42 @@ func TestPurgeStaleSeries(t *testing.T) {
[]Rule{&AlertingRule{RuleID: 1}, &AlertingRule{RuleID: 2}},
)
}
func TestFaultyNotifier(t *testing.T) {
fq := &fakeQuerier{}
fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "bar"))
r := newTestAlertingRule("instant", 0)
r.q = fq
fn := &fakeNotifier{}
e := &executor{
notifiers: func() []notifier.Notifier {
return []notifier.Notifier{
&faultyNotifier{},
fn,
}
},
}
delay := 5 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), delay)
defer cancel()
go func() {
_ = e.exec(ctx, r, time.Now(), 0, 10)
}()
tn := time.Now()
deadline := tn.Add(delay / 2)
for {
if fn.getCounter() > 0 {
return
}
if tn.After(deadline) {
break
}
tn = time.Now()
time.Sleep(time.Millisecond * 100)
}
t.Fatalf("alive notifier didn't receive notification by %v", deadline)
}

View file

@ -87,6 +87,18 @@ func (fn *fakeNotifier) getAlerts() []notifier.Alert {
return fn.alerts
}
type faultyNotifier struct {
fakeNotifier
}
func (fn *faultyNotifier) Send(ctx context.Context, _ []notifier.Alert) error {
d, ok := ctx.Deadline()
if ok {
time.Sleep(time.Until(d))
}
return fmt.Errorf("send failed")
}
func metricWithValueAndLabels(t *testing.T, value float64, labels ...string) datasource.Metric {
return metricWithValuesAndLabels(t, []float64{value}, labels...)
}

View file

@ -145,7 +145,7 @@ func notifiersFromFlags(gen AlertURLGenerator) ([]Notifier, error) {
}
addr = strings.TrimSuffix(addr, "/")
am, err := NewAlertManager(addr+alertManagerPath, gen, authCfg, nil, time.Minute)
am, err := NewAlertManager(addr+alertManagerPath, gen, authCfg, nil, time.Second*10)
if err != nil {
return nil, err
}

View file

@ -3,24 +3,34 @@ package utils
import (
"fmt"
"strings"
"sync"
)
// ErrGroup accumulates multiple errors
// and produces single error message.
type ErrGroup struct {
mu sync.Mutex
errs []error
}
// Add adds a new error to group.
// Isn't thread-safe.
// Is thread-safe.
func (eg *ErrGroup) Add(err error) {
eg.mu.Lock()
eg.errs = append(eg.errs, err)
eg.mu.Unlock()
}
// Err checks if group contains at least
// one error.
func (eg *ErrGroup) Err() error {
if eg == nil || len(eg.errs) == 0 {
if eg == nil {
return nil
}
eg.mu.Lock()
defer eg.mu.Unlock()
if len(eg.errs) == 0 {
return nil
}
return eg
@ -28,6 +38,9 @@ func (eg *ErrGroup) Err() error {
// Error satisfies Error interface
func (eg *ErrGroup) Error() string {
eg.mu.Lock()
defer eg.mu.Unlock()
if len(eg.errs) == 0 {
return ""
}

View file

@ -2,6 +2,7 @@ package utils
import (
"errors"
"fmt"
"testing"
)
@ -36,3 +37,29 @@ func TestErrGroup(t *testing.T) {
}
}
}
// TestErrGroupConcurrent supposed to test concurrent
// use of error group.
// Should be executed with -race flag
func TestErrGroupConcurrent(t *testing.T) {
eg := new(ErrGroup)
const writersN = 4
payload := make(chan error, writersN)
for i := 0; i < writersN; i++ {
go func() {
for err := range payload {
eg.Add(err)
}
}()
}
const iterations = 500
for i := 0; i < iterations; i++ {
payload <- fmt.Errorf("error %d", i)
if i%10 == 0 {
_ = eg.Err()
}
}
close(payload)
}