From 156c83d1126f588c2cb08ad8da67aa7c63baa6c2 Mon Sep 17 00:00:00 2001 From: Roman Khavronenko Date: Mon, 29 Jun 2020 20:21:03 +0100 Subject: [PATCH] app/vmalert: support multiple notifier urls (#584) (#590) * app/vmalert: support multiple notifier urls (#584) Users can now set multiple notifier URLs in the same fashion as for other vmutils (e.g. vmagent). The same applies to the TLS settings for every configured URL. Alerts are sent sequentially in order to respect the order of the specified URLs. * app/vmalert: add basicAuth support for notifier client (#585) The change adds the ability to set basicAuth creds for the notifier client in the same fashion as for remote write/read and datasource. --- app/vmalert/Makefile | 1 + app/vmalert/datasource/vm.go | 7 ++-- app/vmalert/group.go | 23 +++++++----- app/vmalert/group_test.go | 2 +- app/vmalert/main.go | 8 ++--- app/vmalert/manager.go | 6 ++-- app/vmalert/manager_test.go | 6 ++-- app/vmalert/notifier/alert.go | 8 +++-- app/vmalert/notifier/alertmanager.go | 22 ++++++++---- app/vmalert/notifier/alertmanager_test.go | 11 +++++- app/vmalert/notifier/init.go | 38 +++++++++++++------- app/vmalert/notifier/utils.go | 21 ----------- app/vmalert/utils.go | 3 +- app/vmalert/utils/err_group.go | 43 +++++++++++++++++++++++ app/vmalert/utils/err_group_test.go | 38 ++++++++++++++++++++ 15 files changed, 169 insertions(+), 68 deletions(-) delete mode 100644 app/vmalert/notifier/utils.go create mode 100644 app/vmalert/utils/err_group.go create mode 100644 app/vmalert/utils/err_group_test.go diff --git a/app/vmalert/Makefile b/app/vmalert/Makefile index 8f378e67a0..3f8ad175bf 100644 --- a/app/vmalert/Makefile +++ b/app/vmalert/Makefile @@ -61,6 +61,7 @@ run-vmalert: vmalert ./bin/vmalert -rule=app/vmalert/config/testdata/rules2-good.rules \ -datasource.url=http://localhost:8428 \ -notifier.url=http://localhost:9093 \ + -notifier.url=http://127.0.0.1:9093 \ -remoteWrite.url=http://localhost:8428 \ -remoteRead.url=http://localhost:8428 \
-evaluationInterval=3s diff --git a/app/vmalert/datasource/vm.go b/app/vmalert/datasource/vm.go index 8ec74193d4..295cf6ac9e 100644 --- a/app/vmalert/datasource/vm.go +++ b/app/vmalert/datasource/vm.go @@ -49,9 +49,10 @@ const queryPath = "/api/v1/query?query=" // VMStorage represents vmstorage entity with ability to read and write metrics type VMStorage struct { - c *http.Client - queryURL string - basicAuthUser, basicAuthPass string + c *http.Client + queryURL string + basicAuthUser string + basicAuthPass string } // NewVMStorage is a constructor for VMStorage diff --git a/app/vmalert/group.go b/app/vmalert/group.go index 0de9b10cef..2ae35c0871 100644 --- a/app/vmalert/group.go +++ b/app/vmalert/group.go @@ -11,6 +11,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/metrics" ) @@ -155,10 +156,10 @@ func (g *Group) close() { <-g.finishedCh } -func (g *Group) start(ctx context.Context, querier datasource.Querier, nr notifier.Notifier, rw *remotewrite.Client) { +func (g *Group) start(ctx context.Context, querier datasource.Querier, nts []notifier.Notifier, rw *remotewrite.Client) { defer func() { close(g.finishedCh) }() logger.Infof("group %q started; interval=%v; concurrency=%d", g.Name, g.Interval, g.Concurrency) - e := &executor{querier, nr, rw} + e := &executor{querier, nts, rw} t := time.NewTicker(g.Interval) defer t.Stop() for { @@ -201,9 +202,9 @@ func (g *Group) start(ctx context.Context, querier datasource.Querier, nr notifi } type executor struct { - querier datasource.Querier - notifier notifier.Notifier - rw *remotewrite.Client + querier datasource.Querier + notifiers []notifier.Notifier + rw *remotewrite.Client } func (e *executor) 
execConcurrently(ctx context.Context, rules []Rule, concurrency int, interval time.Duration) chan error { @@ -286,10 +287,14 @@ func (e *executor) exec(ctx context.Context, rule Rule, returnSeries bool, inter if len(alerts) < 1 { return nil } + alertsSent.Add(len(alerts)) - if err := e.notifier.Send(ctx, alerts); err != nil { - alertsSendErrors.Inc() - return fmt.Errorf("rule %q: failed to send alerts: %s", rule, err) + errGr := new(utils.ErrGroup) + for _, nt := range e.notifiers { + if err := nt.Send(ctx, alerts); err != nil { + alertsSendErrors.Inc() + errGr.Add(fmt.Errorf("rule %q: failed to send alerts: %s", rule, err)) + } } - return nil + return errGr.Err() } diff --git a/app/vmalert/group_test.go b/app/vmalert/group_test.go index ee69c9fdc8..5439ebae27 100644 --- a/app/vmalert/group_test.go +++ b/app/vmalert/group_test.go @@ -179,7 +179,7 @@ func TestGroupStart(t *testing.T) { fs.add(m1) fs.add(m2) go func() { - g.start(context.Background(), fs, fn, nil) + g.start(context.Background(), fs, []notifier.Notifier{fn}, nil) close(finished) }() diff --git a/app/vmalert/main.go b/app/vmalert/main.go index 22ef3fa1c4..3737c72807 100644 --- a/app/vmalert/main.go +++ b/app/vmalert/main.go @@ -116,15 +116,15 @@ func newManager(ctx context.Context) (*manager, error) { if err != nil { return nil, fmt.Errorf("failed to init `external.alert.source`: %s", err) } - nt, err := notifier.Init(aug) + nts, err := notifier.Init(aug) if err != nil { return nil, fmt.Errorf("failed to init notifier: %s", err) } manager := &manager{ - groups: make(map[uint64]*Group), - querier: q, - notifier: nt, + groups: make(map[uint64]*Group), + querier: q, + notifiers: nts, } rw, err := remotewrite.Init(ctx) if err != nil { diff --git a/app/vmalert/manager.go b/app/vmalert/manager.go index b312cddc84..bce4026bf5 100644 --- a/app/vmalert/manager.go +++ b/app/vmalert/manager.go @@ -15,8 +15,8 @@ import ( // manager controls group states type manager struct { - querier datasource.Querier - notifier 
notifier.Notifier + querier datasource.Querier + notifiers []notifier.Notifier rw *remotewrite.Client rr datasource.Querier @@ -73,7 +73,7 @@ func (m *manager) startGroup(ctx context.Context, group *Group, restore bool) { m.wg.Add(1) id := group.ID() go func() { - group.start(ctx, m.querier, m.notifier, m.rw) + group.start(ctx, m.querier, m.notifiers, m.rw) m.wg.Done() }() m.groups[id] = group diff --git a/app/vmalert/manager_test.go b/app/vmalert/manager_test.go index 11fcc6d232..bc6f308560 100644 --- a/app/vmalert/manager_test.go +++ b/app/vmalert/manager_test.go @@ -37,9 +37,9 @@ func TestManagerUpdateError(t *testing.T) { // Should be executed with -race flag func TestManagerUpdateConcurrent(t *testing.T) { m := &manager{ - groups: make(map[uint64]*Group), - querier: &fakeQuerier{}, - notifier: &fakeNotifier{}, + groups: make(map[uint64]*Group), + querier: &fakeQuerier{}, + notifiers: []notifier.Notifier{&fakeNotifier{}}, } paths := []string{ "config/testdata/dir/rules0-good.rules", diff --git a/app/vmalert/notifier/alert.go b/app/vmalert/notifier/alert.go index 985bd47d59..67302ca175 100644 --- a/app/vmalert/notifier/alert.go +++ b/app/vmalert/notifier/alert.go @@ -7,6 +7,8 @@ import ( "strings" "text/template" "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils" ) // Alert the triggered alert @@ -77,7 +79,7 @@ func ValidateTemplates(annotations map[string]string) error { func templateAnnotations(annotations map[string]string, header string, data alertTplData) (map[string]string, error) { var builder strings.Builder var buf bytes.Buffer - eg := errGroup{} + eg := new(utils.ErrGroup) r := make(map[string]string, len(annotations)) for key, text := range annotations { r[key] = text @@ -87,12 +89,12 @@ func templateAnnotations(annotations map[string]string, header string, data aler builder.WriteString(header) builder.WriteString(text) if err := templateAnnotation(&buf, builder.String(), data); err != nil { - eg.errs = append(eg.errs, 
fmt.Sprintf("key %q, template %q: %s", key, text, err)) + eg.Add(fmt.Errorf("key %q, template %q: %s", key, text, err)) continue } r[key] = buf.String() } - return r, eg.err() + return r, eg.Err() } func templateAnnotation(dst io.Writer, text string, data alertTplData) error { diff --git a/app/vmalert/notifier/alertmanager.go b/app/vmalert/notifier/alertmanager.go index 6e06949ce2..78fb453bcd 100644 --- a/app/vmalert/notifier/alertmanager.go +++ b/app/vmalert/notifier/alertmanager.go @@ -12,9 +12,11 @@ import ( // AlertManager represents integration provider with Prometheus alert manager // https://github.com/prometheus/alertmanager type AlertManager struct { - alertURL string - argFunc AlertURLGenerator - client *http.Client + alertURL string + basicAuthUser string + basicAuthPass string + argFunc AlertURLGenerator + client *http.Client } // Send an alert or resolve message @@ -28,6 +30,9 @@ func (am *AlertManager) Send(ctx context.Context, alerts []Alert) error { } req.Header.Set("Content-Type", "application/json") req = req.WithContext(ctx) + if am.basicAuthPass != "" { + req.SetBasicAuth(am.basicAuthUser, am.basicAuthPass) + } resp, err := am.client.Do(req) if err != nil { return err @@ -51,10 +56,13 @@ type AlertURLGenerator func(Alert) string const alertManagerPath = "/api/v2/alerts" // NewAlertManager is a constructor for AlertManager -func NewAlertManager(alertManagerURL string, fn AlertURLGenerator, c *http.Client) *AlertManager { +func NewAlertManager(alertManagerURL, user, pass string, fn AlertURLGenerator, c *http.Client) *AlertManager { + addr := strings.TrimSuffix(alertManagerURL, "/") + alertManagerPath return &AlertManager{ - alertURL: strings.TrimSuffix(alertManagerURL, "/") + alertManagerPath, - argFunc: fn, - client: c, + alertURL: addr, + argFunc: fn, + client: c, + basicAuthUser: user, + basicAuthPass: pass, } } diff --git a/app/vmalert/notifier/alertmanager_test.go b/app/vmalert/notifier/alertmanager_test.go index cfdcc334c3..18f2efcf4b 100644 
--- a/app/vmalert/notifier/alertmanager_test.go +++ b/app/vmalert/notifier/alertmanager_test.go @@ -11,12 +11,21 @@ import ( ) func TestAlertManager_Send(t *testing.T) { + const baUser, baPass = "foo", "bar" mux := http.NewServeMux() mux.HandleFunc("/", func(_ http.ResponseWriter, _ *http.Request) { t.Errorf("should not be called") }) c := -1 mux.HandleFunc(alertManagerPath, func(w http.ResponseWriter, r *http.Request) { + user, pass, ok := r.BasicAuth() + if !ok { + t.Errorf("unauthorized request") + } + if user != baUser || pass != baPass { + t.Errorf("wrong creds %q:%q; expected %q:%q", + user, pass, baUser, baPass) + } c++ if r.Method != http.MethodPost { t.Errorf("expected POST method got %s", r.Method) @@ -58,7 +67,7 @@ func TestAlertManager_Send(t *testing.T) { }) srv := httptest.NewServer(mux) defer srv.Close() - am := NewAlertManager(srv.URL, func(alert Alert) string { + am := NewAlertManager(srv.URL, baUser, baPass, func(alert Alert) string { return strconv.FormatUint(alert.GroupID, 10) + "/" + strconv.FormatUint(alert.ID, 10) }, srv.Client()) if err := am.Send(context.Background(), []Alert{{}, {}}); err == nil { diff --git a/app/vmalert/notifier/init.go b/app/vmalert/notifier/init.go index 6871cb010c..b84c42c26a 100644 --- a/app/vmalert/notifier/init.go +++ b/app/vmalert/notifier/init.go @@ -6,28 +6,42 @@ import ( "net/http" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" ) var ( - addr = flag.String("notifier.url", "", "Prometheus alertmanager URL. Required parameter. e.g. http://127.0.0.1:9093") + addrs = flagutil.NewArray("notifier.url", "Prometheus alertmanager URL. Required parameter. e.g. 
http://127.0.0.1:9093") + basicAuthUsername = flagutil.NewArray("notifier.basicAuth.username", "Optional basic auth username for -datasource.url") + basicAuthPassword = flagutil.NewArray("notifier.basicAuth.password", "Optional basic auth password for -datasource.url") + tlsInsecureSkipVerify = flag.Bool("notifier.tlsInsecureSkipVerify", false, "Whether to skip tls verification when connecting to -notifier.url") - tlsCertFile = flag.String("notifier.tlsCertFile", "", "Optional path to client-side TLS certificate file to use when connecting to -notifier.url") - tlsKeyFile = flag.String("notifier.tlsKeyFile", "", "Optional path to client-side TLS certificate key to use when connecting to -notifier.url") - tlsCAFile = flag.String("notifier.tlsCAFile", "", "Optional path to TLS CA file to use for verifying connections to -notifier.url. "+ + tlsCertFile = flagutil.NewArray("notifier.tlsCertFile", "Optional path to client-side TLS certificate file to use when connecting to -notifier.url") + tlsKeyFile = flagutil.NewArray("notifier.tlsKeyFile", "Optional path to client-side TLS certificate key to use when connecting to -notifier.url") + tlsCAFile = flagutil.NewArray("notifier.tlsCAFile", "Optional path to TLS CA file to use for verifying connections to -notifier.url. "+ "By default system CA is used") - tlsServerName = flag.String("notifier.tlsServerName", "", "Optional TLS server name to use for connections to -notifier.url. "+ + tlsServerName = flagutil.NewArray("notifier.tlsServerName", "Optional TLS server name to use for connections to -notifier.url. "+ "By default the server name from -notifier.url is used") ) // Init creates a Notifier object based on provided flags. 
-func Init(gen AlertURLGenerator) (Notifier, error) { - if *addr == "" { +func Init(gen AlertURLGenerator) ([]Notifier, error) { + if len(*addrs) == 0 { flag.PrintDefaults() - return nil, fmt.Errorf("notifier.url is empty") + return nil, fmt.Errorf("at least one `-notifier.url` must be set") } - tr, err := utils.Transport(*addr, *tlsCertFile, *tlsKeyFile, *tlsCAFile, *tlsServerName, *tlsInsecureSkipVerify) - if err != nil { - return nil, fmt.Errorf("failed to create transport: %s", err) + + var notifiers []Notifier + for i, addr := range *addrs { + cert, key := tlsCertFile.GetOptionalArg(i), tlsKeyFile.GetOptionalArg(i) + ca, serverName := tlsCAFile.GetOptionalArg(i), tlsServerName.GetOptionalArg(i) + tr, err := utils.Transport(addr, cert, key, ca, serverName, *tlsInsecureSkipVerify) + if err != nil { + return nil, fmt.Errorf("failed to create transport: %s", err) + } + user, pass := basicAuthUsername.GetOptionalArg(i), basicAuthPassword.GetOptionalArg(i) + am := NewAlertManager(addr, user, pass, gen, &http.Client{Transport: tr}) + notifiers = append(notifiers, am) } - return NewAlertManager(*addr, gen, &http.Client{Transport: tr}), nil + + return notifiers, nil } diff --git a/app/vmalert/notifier/utils.go b/app/vmalert/notifier/utils.go deleted file mode 100644 index 4bce9485a7..0000000000 --- a/app/vmalert/notifier/utils.go +++ /dev/null @@ -1,21 +0,0 @@ -package notifier - -import ( - "fmt" - "strings" -) - -type errGroup struct { - errs []string -} - -func (eg *errGroup) err() error { - if eg == nil || len(eg.errs) == 0 { - return nil - } - return eg -} - -func (eg *errGroup) Error() string { - return fmt.Sprintf("errors: %s", strings.Join(eg.errs, "\n")) -} diff --git a/app/vmalert/utils.go b/app/vmalert/utils.go index dcf46f8eca..7a824a096e 100644 --- a/app/vmalert/utils.go +++ b/app/vmalert/utils.go @@ -1,9 +1,10 @@ package main import ( - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "sort" "time" + + 
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" ) func newTimeSeries(value float64, labels map[string]string, timestamp time.Time) prompbmarshal.TimeSeries { diff --git a/app/vmalert/utils/err_group.go b/app/vmalert/utils/err_group.go new file mode 100644 index 0000000000..a2f5750e84 --- /dev/null +++ b/app/vmalert/utils/err_group.go @@ -0,0 +1,43 @@ +package utils + +import ( + "fmt" + "strings" +) + +// ErrGroup accumulates multiple errors +// and produces single error message. +type ErrGroup struct { + errs []error +} + +// Add adds a new error to group. +// Isn't thread-safe. +func (eg *ErrGroup) Add(err error) { + eg.errs = append(eg.errs, err) +} + +// Err checks if group contains at least +// one error. +func (eg *ErrGroup) Err() error { + if eg == nil || len(eg.errs) == 0 { + return nil + } + return eg +} + +// Error satisfies Error interface +func (eg *ErrGroup) Error() string { + if len(eg.errs) == 0 { + return "" + } + var b strings.Builder + fmt.Fprintf(&b, "errors(%d): ", len(eg.errs)) + for i, err := range eg.errs { + b.WriteString(err.Error()) + if i != len(eg.errs)-1 { + b.WriteString("\n") + } + } + return b.String() +} diff --git a/app/vmalert/utils/err_group_test.go b/app/vmalert/utils/err_group_test.go new file mode 100644 index 0000000000..ae409bcc4a --- /dev/null +++ b/app/vmalert/utils/err_group_test.go @@ -0,0 +1,38 @@ +package utils + +import ( + "errors" + "testing" +) + +func TestErrGroup(t *testing.T) { + testCases := []struct { + errs []error + exp string + }{ + {nil, ""}, + {[]error{errors.New("timeout")}, "errors(1): timeout"}, + { + []error{errors.New("timeout"), errors.New("deadline")}, + "errors(2): timeout\ndeadline", + }, + } + for _, tc := range testCases { + eg := new(ErrGroup) + for _, err := range tc.errs { + eg.Add(err) + } + if len(tc.errs) == 0 { + if eg.Err() != nil { + t.Fatalf("expected to get nil error") + } + continue + } + if eg.Err() == nil { + t.Fatalf("expected to get non-nil error") + } + if 
eg.Error() != tc.exp { + t.Fatalf("expected to have: \n%q\ngot:\n%q", tc.exp, eg.Error()) + } + } +}