app/vmalert: support multiple notifier urls (#584) (#590)

* app/vmalert: support multiple notifier urls (#584)

User now can set multiple notifier URLs in the same fashion
as for other vmutils (e.g. vmagent). The same is correct for
TLS setting for every configured URL. Alerts sending is done
in sequential way for respecting the specified URLs order.

* app/vmalert: add basicAuth support for notifier client (#585)

The change adds possibility to set basicAuth creds for notifier
client in the same fasion as for remote write/read and datasource.
This commit is contained in:
Roman Khavronenko 2020-06-29 20:21:03 +01:00 committed by Aliaksandr Valialkin
parent 5341596f96
commit 156c83d112
15 changed files with 169 additions and 68 deletions

View file

@ -61,6 +61,7 @@ run-vmalert: vmalert
./bin/vmalert -rule=app/vmalert/config/testdata/rules2-good.rules \
-datasource.url=http://localhost:8428 \
-notifier.url=http://localhost:9093 \
-notifier.url=http://127.0.0.1:9093 \
-remoteWrite.url=http://localhost:8428 \
-remoteRead.url=http://localhost:8428 \
-evaluationInterval=3s

View file

@ -49,9 +49,10 @@ const queryPath = "/api/v1/query?query="
// VMStorage represents vmstorage entity with ability to read and write metrics
type VMStorage struct {
c *http.Client
queryURL string
basicAuthUser, basicAuthPass string
c *http.Client
queryURL string
basicAuthUser string
basicAuthPass string
}
// NewVMStorage is a constructor for VMStorage

View file

@ -11,6 +11,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/metrics"
)
@ -155,10 +156,10 @@ func (g *Group) close() {
<-g.finishedCh
}
func (g *Group) start(ctx context.Context, querier datasource.Querier, nr notifier.Notifier, rw *remotewrite.Client) {
func (g *Group) start(ctx context.Context, querier datasource.Querier, nts []notifier.Notifier, rw *remotewrite.Client) {
defer func() { close(g.finishedCh) }()
logger.Infof("group %q started; interval=%v; concurrency=%d", g.Name, g.Interval, g.Concurrency)
e := &executor{querier, nr, rw}
e := &executor{querier, nts, rw}
t := time.NewTicker(g.Interval)
defer t.Stop()
for {
@ -201,9 +202,9 @@ func (g *Group) start(ctx context.Context, querier datasource.Querier, nr notifi
}
type executor struct {
querier datasource.Querier
notifier notifier.Notifier
rw *remotewrite.Client
querier datasource.Querier
notifiers []notifier.Notifier
rw *remotewrite.Client
}
func (e *executor) execConcurrently(ctx context.Context, rules []Rule, concurrency int, interval time.Duration) chan error {
@ -286,10 +287,14 @@ func (e *executor) exec(ctx context.Context, rule Rule, returnSeries bool, inter
if len(alerts) < 1 {
return nil
}
alertsSent.Add(len(alerts))
if err := e.notifier.Send(ctx, alerts); err != nil {
alertsSendErrors.Inc()
return fmt.Errorf("rule %q: failed to send alerts: %s", rule, err)
errGr := new(utils.ErrGroup)
for _, nt := range e.notifiers {
if err := nt.Send(ctx, alerts); err != nil {
alertsSendErrors.Inc()
errGr.Add(fmt.Errorf("rule %q: failed to send alerts: %s", rule, err))
}
}
return nil
return errGr.Err()
}

View file

@ -179,7 +179,7 @@ func TestGroupStart(t *testing.T) {
fs.add(m1)
fs.add(m2)
go func() {
g.start(context.Background(), fs, fn, nil)
g.start(context.Background(), fs, []notifier.Notifier{fn}, nil)
close(finished)
}()

View file

@ -116,15 +116,15 @@ func newManager(ctx context.Context) (*manager, error) {
if err != nil {
return nil, fmt.Errorf("failed to init `external.alert.source`: %s", err)
}
nt, err := notifier.Init(aug)
nts, err := notifier.Init(aug)
if err != nil {
return nil, fmt.Errorf("failed to init notifier: %s", err)
}
manager := &manager{
groups: make(map[uint64]*Group),
querier: q,
notifier: nt,
groups: make(map[uint64]*Group),
querier: q,
notifiers: nts,
}
rw, err := remotewrite.Init(ctx)
if err != nil {

View file

@ -15,8 +15,8 @@ import (
// manager controls group states
type manager struct {
querier datasource.Querier
notifier notifier.Notifier
querier datasource.Querier
notifiers []notifier.Notifier
rw *remotewrite.Client
rr datasource.Querier
@ -73,7 +73,7 @@ func (m *manager) startGroup(ctx context.Context, group *Group, restore bool) {
m.wg.Add(1)
id := group.ID()
go func() {
group.start(ctx, m.querier, m.notifier, m.rw)
group.start(ctx, m.querier, m.notifiers, m.rw)
m.wg.Done()
}()
m.groups[id] = group

View file

@ -37,9 +37,9 @@ func TestManagerUpdateError(t *testing.T) {
// Should be executed with -race flag
func TestManagerUpdateConcurrent(t *testing.T) {
m := &manager{
groups: make(map[uint64]*Group),
querier: &fakeQuerier{},
notifier: &fakeNotifier{},
groups: make(map[uint64]*Group),
querier: &fakeQuerier{},
notifiers: []notifier.Notifier{&fakeNotifier{}},
}
paths := []string{
"config/testdata/dir/rules0-good.rules",

View file

@ -7,6 +7,8 @@ import (
"strings"
"text/template"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
)
// Alert the triggered alert
@ -77,7 +79,7 @@ func ValidateTemplates(annotations map[string]string) error {
func templateAnnotations(annotations map[string]string, header string, data alertTplData) (map[string]string, error) {
var builder strings.Builder
var buf bytes.Buffer
eg := errGroup{}
eg := new(utils.ErrGroup)
r := make(map[string]string, len(annotations))
for key, text := range annotations {
r[key] = text
@ -87,12 +89,12 @@ func templateAnnotations(annotations map[string]string, header string, data aler
builder.WriteString(header)
builder.WriteString(text)
if err := templateAnnotation(&buf, builder.String(), data); err != nil {
eg.errs = append(eg.errs, fmt.Sprintf("key %q, template %q: %s", key, text, err))
eg.Add(fmt.Errorf("key %q, template %q: %s", key, text, err))
continue
}
r[key] = buf.String()
}
return r, eg.err()
return r, eg.Err()
}
func templateAnnotation(dst io.Writer, text string, data alertTplData) error {

View file

@ -12,9 +12,11 @@ import (
// AlertManager represents integration provider with Prometheus alert manager
// https://github.com/prometheus/alertmanager
type AlertManager struct {
alertURL string
argFunc AlertURLGenerator
client *http.Client
alertURL string
basicAuthUser string
basicAuthPass string
argFunc AlertURLGenerator
client *http.Client
}
// Send an alert or resolve message
@ -28,6 +30,9 @@ func (am *AlertManager) Send(ctx context.Context, alerts []Alert) error {
}
req.Header.Set("Content-Type", "application/json")
req = req.WithContext(ctx)
if am.basicAuthPass != "" {
req.SetBasicAuth(am.basicAuthUser, am.basicAuthPass)
}
resp, err := am.client.Do(req)
if err != nil {
return err
@ -51,10 +56,13 @@ type AlertURLGenerator func(Alert) string
const alertManagerPath = "/api/v2/alerts"
// NewAlertManager is a constructor for AlertManager
func NewAlertManager(alertManagerURL string, fn AlertURLGenerator, c *http.Client) *AlertManager {
func NewAlertManager(alertManagerURL, user, pass string, fn AlertURLGenerator, c *http.Client) *AlertManager {
addr := strings.TrimSuffix(alertManagerURL, "/") + alertManagerPath
return &AlertManager{
alertURL: strings.TrimSuffix(alertManagerURL, "/") + alertManagerPath,
argFunc: fn,
client: c,
alertURL: addr,
argFunc: fn,
client: c,
basicAuthUser: user,
basicAuthPass: pass,
}
}

View file

@ -11,12 +11,21 @@ import (
)
func TestAlertManager_Send(t *testing.T) {
const baUser, baPass = "foo", "bar"
mux := http.NewServeMux()
mux.HandleFunc("/", func(_ http.ResponseWriter, _ *http.Request) {
t.Errorf("should not be called")
})
c := -1
mux.HandleFunc(alertManagerPath, func(w http.ResponseWriter, r *http.Request) {
user, pass, ok := r.BasicAuth()
if !ok {
t.Errorf("unauthorized request")
}
if user != baUser || pass != baPass {
t.Errorf("wrong creds %q:%q; expected %q:%q",
user, pass, baUser, baPass)
}
c++
if r.Method != http.MethodPost {
t.Errorf("expected POST method got %s", r.Method)
@ -58,7 +67,7 @@ func TestAlertManager_Send(t *testing.T) {
})
srv := httptest.NewServer(mux)
defer srv.Close()
am := NewAlertManager(srv.URL, func(alert Alert) string {
am := NewAlertManager(srv.URL, baUser, baPass, func(alert Alert) string {
return strconv.FormatUint(alert.GroupID, 10) + "/" + strconv.FormatUint(alert.ID, 10)
}, srv.Client())
if err := am.Send(context.Background(), []Alert{{}, {}}); err == nil {

View file

@ -6,28 +6,42 @@ import (
"net/http"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
)
var (
addr = flag.String("notifier.url", "", "Prometheus alertmanager URL. Required parameter. e.g. http://127.0.0.1:9093")
addrs = flagutil.NewArray("notifier.url", "Prometheus alertmanager URL. Required parameter. e.g. http://127.0.0.1:9093")
basicAuthUsername = flagutil.NewArray("notifier.basicAuth.username", "Optional basic auth username for -datasource.url")
basicAuthPassword = flagutil.NewArray("notifier.basicAuth.password", "Optional basic auth password for -datasource.url")
tlsInsecureSkipVerify = flag.Bool("notifier.tlsInsecureSkipVerify", false, "Whether to skip tls verification when connecting to -notifier.url")
tlsCertFile = flag.String("notifier.tlsCertFile", "", "Optional path to client-side TLS certificate file to use when connecting to -notifier.url")
tlsKeyFile = flag.String("notifier.tlsKeyFile", "", "Optional path to client-side TLS certificate key to use when connecting to -notifier.url")
tlsCAFile = flag.String("notifier.tlsCAFile", "", "Optional path to TLS CA file to use for verifying connections to -notifier.url. "+
tlsCertFile = flagutil.NewArray("notifier.tlsCertFile", "Optional path to client-side TLS certificate file to use when connecting to -notifier.url")
tlsKeyFile = flagutil.NewArray("notifier.tlsKeyFile", "Optional path to client-side TLS certificate key to use when connecting to -notifier.url")
tlsCAFile = flagutil.NewArray("notifier.tlsCAFile", "Optional path to TLS CA file to use for verifying connections to -notifier.url. "+
"By default system CA is used")
tlsServerName = flag.String("notifier.tlsServerName", "", "Optional TLS server name to use for connections to -notifier.url. "+
tlsServerName = flagutil.NewArray("notifier.tlsServerName", "Optional TLS server name to use for connections to -notifier.url. "+
"By default the server name from -notifier.url is used")
)
// Init creates a Notifier object based on provided flags.
func Init(gen AlertURLGenerator) (Notifier, error) {
if *addr == "" {
func Init(gen AlertURLGenerator) ([]Notifier, error) {
if len(*addrs) == 0 {
flag.PrintDefaults()
return nil, fmt.Errorf("notifier.url is empty")
return nil, fmt.Errorf("at least one `-notifier.url` must be set")
}
tr, err := utils.Transport(*addr, *tlsCertFile, *tlsKeyFile, *tlsCAFile, *tlsServerName, *tlsInsecureSkipVerify)
if err != nil {
return nil, fmt.Errorf("failed to create transport: %s", err)
var notifiers []Notifier
for i, addr := range *addrs {
cert, key := tlsCertFile.GetOptionalArg(i), tlsKeyFile.GetOptionalArg(i)
ca, serverName := tlsCAFile.GetOptionalArg(i), tlsServerName.GetOptionalArg(i)
tr, err := utils.Transport(addr, cert, key, ca, serverName, *tlsInsecureSkipVerify)
if err != nil {
return nil, fmt.Errorf("failed to create transport: %s", err)
}
user, pass := basicAuthUsername.GetOptionalArg(i), basicAuthPassword.GetOptionalArg(i)
am := NewAlertManager(addr, user, pass, gen, &http.Client{Transport: tr})
notifiers = append(notifiers, am)
}
return NewAlertManager(*addr, gen, &http.Client{Transport: tr}), nil
return notifiers, nil
}

View file

@ -1,21 +0,0 @@
package notifier
import (
"fmt"
"strings"
)
type errGroup struct {
errs []string
}
func (eg *errGroup) err() error {
if eg == nil || len(eg.errs) == 0 {
return nil
}
return eg
}
func (eg *errGroup) Error() string {
return fmt.Sprintf("errors: %s", strings.Join(eg.errs, "\n"))
}

View file

@ -1,9 +1,10 @@
package main
import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"sort"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
func newTimeSeries(value float64, labels map[string]string, timestamp time.Time) prompbmarshal.TimeSeries {

View file

@ -0,0 +1,43 @@
package utils
import (
"fmt"
"strings"
)
// ErrGroup accumulates multiple errors
// and produces single error message.
type ErrGroup struct {
errs []error
}
// Add adds a new error to group.
// Isn't thread-safe.
func (eg *ErrGroup) Add(err error) {
eg.errs = append(eg.errs, err)
}
// Err checks if group contains at least
// one error.
func (eg *ErrGroup) Err() error {
if eg == nil || len(eg.errs) == 0 {
return nil
}
return eg
}
// Error satisfies Error interface
func (eg *ErrGroup) Error() string {
if len(eg.errs) == 0 {
return ""
}
var b strings.Builder
fmt.Fprintf(&b, "errors(%d): ", len(eg.errs))
for i, err := range eg.errs {
b.WriteString(err.Error())
if i != len(eg.errs)-1 {
b.WriteString("\n")
}
}
return b.String()
}

View file

@ -0,0 +1,38 @@
package utils
import (
"errors"
"testing"
)
func TestErrGroup(t *testing.T) {
testCases := []struct {
errs []error
exp string
}{
{nil, ""},
{[]error{errors.New("timeout")}, "errors(1): timeout"},
{
[]error{errors.New("timeout"), errors.New("deadline")},
"errors(2): timeout\ndeadline",
},
}
for _, tc := range testCases {
eg := new(ErrGroup)
for _, err := range tc.errs {
eg.Add(err)
}
if len(tc.errs) == 0 {
if eg.Err() != nil {
t.Fatalf("expected to get nil error")
}
continue
}
if eg.Err() == nil {
t.Fatalf("expected to get non-nil error")
}
if eg.Error() != tc.exp {
t.Fatalf("expected to have: \n%q\ngot:\n%q", tc.exp, eg.Error())
}
}
}