diff --git a/app/vmalert/config/config.go b/app/vmalert/config/config.go index 35e00978f2..16fb5bab13 100644 --- a/app/vmalert/config/config.go +++ b/app/vmalert/config/config.go @@ -36,7 +36,8 @@ type Group struct { Params url.Values `yaml:"params"` // Headers contains optional HTTP headers added to each rule request Headers []Header `yaml:"headers,omitempty"` - + // NotifierHeaders contains optional HTTP headers added to each alert request which will send to notifier + NotifierHeaders []Header `yaml:"notifier_headers,omitempty"` // Catches all undefined fields and must be empty after parsing. XXX map[string]interface{} `yaml:",inline"` } diff --git a/app/vmalert/group.go b/app/vmalert/group.go index 94b74b27ee..acab27d644 100644 --- a/app/vmalert/group.go +++ b/app/vmalert/group.go @@ -36,9 +36,10 @@ type Group struct { Checksum string LastEvaluation time.Time - Labels map[string]string - Params url.Values - Headers map[string]string + Labels map[string]string + Params url.Values + Headers map[string]string + NotifierHeaders map[string]string doneCh chan struct{} finishedCh chan struct{} @@ -93,16 +94,17 @@ func mergeLabels(groupName, ruleName string, set1, set2 map[string]string) map[s func newGroup(cfg config.Group, qb datasource.QuerierBuilder, defaultInterval time.Duration, labels map[string]string) *Group { g := &Group{ - Type: cfg.Type, - Name: cfg.Name, - File: cfg.File, - Interval: cfg.Interval.Duration(), - Limit: cfg.Limit, - Concurrency: cfg.Concurrency, - Checksum: cfg.Checksum, - Params: cfg.Params, - Headers: make(map[string]string), - Labels: cfg.Labels, + Type: cfg.Type, + Name: cfg.Name, + File: cfg.File, + Interval: cfg.Interval.Duration(), + Limit: cfg.Limit, + Concurrency: cfg.Concurrency, + Checksum: cfg.Checksum, + Params: cfg.Params, + Headers: make(map[string]string), + NotifierHeaders: make(map[string]string), + Labels: cfg.Labels, doneCh: make(chan struct{}), finishedCh: make(chan struct{}), @@ -117,6 +119,9 @@ func newGroup(cfg config.Group, qb datasource.QuerierBuilder, defaultInterval ti for _, h := range cfg.Headers { g.Headers[h.Key] = h.Value } + for _, h := range cfg.NotifierHeaders { + g.NotifierHeaders[h.Key] = h.Value + } g.metrics = newGroupMetrics(g) rules := make([]Rule, len(cfg.Rules)) for i, r := range cfg.Rules { @@ -230,6 +235,7 @@ func (g *Group) updateWith(newGroup *Group) error { g.Concurrency = newGroup.Concurrency g.Params = newGroup.Params g.Headers = newGroup.Headers + g.NotifierHeaders = newGroup.NotifierHeaders g.Labels = newGroup.Labels g.Limit = newGroup.Limit g.Checksum = newGroup.Checksum @@ -294,7 +300,10 @@ func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *r e := &executor{ rw: rw, notifiers: nts, - previouslySentSeriesToRW: make(map[uint64]map[string][]prompbmarshal.Label)} + previouslySentSeriesToRW: make(map[uint64]map[string][]prompbmarshal.Label), + + notifierHeaders: g.NotifierHeaders, + } evalTS := time.Now() @@ -412,6 +421,8 @@ type executor struct { // where `ruleID` is ID of the Rule within a Group // and `ruleLabels` is []prompb.Label marshalled to a string previouslySentSeriesToRW map[uint64]map[string][]prompbmarshal.Label + + notifierHeaders map[string]string } func (e *executor) execConcurrently(ctx context.Context, rules []Rule, ts time.Time, concurrency int, resolveDuration time.Duration, limit int) chan error { @@ -504,7 +515,7 @@ func (e *executor) exec(ctx context.Context, rule Rule, ts time.Time, resolveDur for _, nt := range e.notifiers() { wg.Add(1) go func(nt notifier.Notifier) { - if err := nt.Send(ctx, alerts); err != nil { + if err := nt.Send(ctx, alerts, e.notifierHeaders); err != nil { errGr.Add(fmt.Errorf("rule %q: failed to send alerts to addr %q: %w", rule, nt.Addr(), err)) } wg.Done() diff --git a/app/vmalert/helpers_test.go b/app/vmalert/helpers_test.go index 60183aad86..19684e52b4 100644 --- a/app/vmalert/helpers_test.go +++ b/app/vmalert/helpers_test.go @@ -131,7 +131,7 @@ type fakeNotifier struct { func (*fakeNotifier) Close() {} func (*fakeNotifier) Addr() string { return "" } -func (fn *fakeNotifier) Send(_ context.Context, alerts []notifier.Alert) error { +func (fn *fakeNotifier) Send(_ context.Context, alerts []notifier.Alert, _ map[string]string) error { fn.Lock() defer fn.Unlock() fn.counter += len(alerts) @@ -155,7 +155,7 @@ type faultyNotifier struct { fakeNotifier } -func (fn *faultyNotifier) Send(ctx context.Context, _ []notifier.Alert) error { +func (fn *faultyNotifier) Send(ctx context.Context, _ []notifier.Alert, _ map[string]string) error { d, ok := ctx.Deadline() if ok { time.Sleep(time.Until(d)) diff --git a/app/vmalert/manager.go b/app/vmalert/manager.go index ac1a516f3a..fa04f778a3 100644 --- a/app/vmalert/manager.go +++ b/app/vmalert/manager.go @@ -176,15 +176,17 @@ func (g *Group) toAPI() APIGroup { // encode as string to avoid rounding ID: fmt.Sprintf("%d", g.ID()), - Name: g.Name, - Type: g.Type.String(), - File: g.File, - Interval: g.Interval.Seconds(), - LastEvaluation: g.LastEvaluation, - Concurrency: g.Concurrency, - Params: urlValuesToStrings(g.Params), - Headers: headersToStrings(g.Headers), - Labels: g.Labels, + Name: g.Name, + Type: g.Type.String(), + File: g.File, + Interval: g.Interval.Seconds(), + LastEvaluation: g.LastEvaluation, + Concurrency: g.Concurrency, + Params: urlValuesToStrings(g.Params), + Headers: headersToStrings(g.Headers), + NotifierHeaders: headersToStrings(g.NotifierHeaders), + + Labels: g.Labels, } for _, r := range g.Rules { ag.Rules = append(ag.Rules, r.ToAPI()) diff --git a/app/vmalert/notifier/alertmanager.go b/app/vmalert/notifier/alertmanager.go index 22d09784c3..d30d34d7c3 100644 --- a/app/vmalert/notifier/alertmanager.go +++ b/app/vmalert/notifier/alertmanager.go @@ -51,16 +51,16 @@ func (am *AlertManager) Close() { func (am AlertManager) Addr() string { return am.addr } // Send an alert or resolve message -func (am *AlertManager) Send(ctx context.Context, alerts []Alert) error { +func (am *AlertManager) Send(ctx context.Context, alerts []Alert, notifierHeaders map[string]string) error { am.metrics.alertsSent.Add(len(alerts)) - err := am.send(ctx, alerts) + err := am.send(ctx, alerts, notifierHeaders) if err != nil { am.metrics.alertsSendErrors.Add(len(alerts)) } return err } -func (am *AlertManager) send(ctx context.Context, alerts []Alert) error { +func (am *AlertManager) send(ctx context.Context, alerts []Alert, notifierHeaders map[string]string) error { b := &bytes.Buffer{} writeamRequest(b, alerts, am.argFunc, am.relabelConfigs) @@ -69,6 +69,9 @@ func (am *AlertManager) send(ctx context.Context, alerts []Alert) error { return err } req.Header.Set("Content-Type", "application/json") + for key, value := range notifierHeaders { + req.Header.Set(key, value) + } if am.timeout > 0 { var cancel context.CancelFunc @@ -105,7 +108,8 @@ const alertManagerPath = "/api/v2/alerts" // NewAlertManager is a constructor for AlertManager func NewAlertManager(alertManagerURL string, fn AlertURLGenerator, authCfg promauth.HTTPClientConfig, - relabelCfg *promrelabel.ParsedConfigs, timeout time.Duration) (*AlertManager, error) { + relabelCfg *promrelabel.ParsedConfigs, timeout time.Duration, +) (*AlertManager, error) { tls := &promauth.TLSConfig{} if authCfg.TLSConfig != nil { tls = authCfg.TLSConfig diff --git a/app/vmalert/notifier/alertmanager_test.go b/app/vmalert/notifier/alertmanager_test.go index 9680edcd70..9bc79009f9 100644 --- a/app/vmalert/notifier/alertmanager_test.go +++ b/app/vmalert/notifier/alertmanager_test.go @@ -90,10 +90,10 @@ func TestAlertManager_Send(t *testing.T) { if err != nil { t.Errorf("unexpected error: %s", err) } - if err := am.Send(context.Background(), []Alert{{}, {}}); err == nil { + if err := am.Send(context.Background(), []Alert{{}, {}}, nil); err == nil { t.Error("expected connection error got nil") } - if err := am.Send(context.Background(), []Alert{}); err == nil { + if err := am.Send(context.Background(), []Alert{}, nil); err == nil { t.Error("expected wrong http code error got nil") } if err := am.Send(context.Background(), []Alert{{ @@ -102,7 +102,7 @@ func TestAlertManager_Send(t *testing.T) { Start: time.Now().UTC(), End: time.Now().UTC(), Annotations: map[string]string{"a": "b", "c": "d", "e": "f"}, - }}); err != nil { + }}, nil); err != nil { t.Errorf("unexpected error %s", err) } if c != 2 { diff --git a/app/vmalert/notifier/notifier.go b/app/vmalert/notifier/notifier.go index 996805a71a..378cd81240 100644 --- a/app/vmalert/notifier/notifier.go +++ b/app/vmalert/notifier/notifier.go @@ -7,7 +7,7 @@ type Notifier interface { // Send sends the given list of alerts. // Returns an error if fails to send the alerts. // Must unblock if the given ctx is cancelled. - Send(ctx context.Context, alerts []Alert) error + Send(ctx context.Context, alerts []Alert, notifierHeaders map[string]string) error // Addr returns address where alerts are sent. Addr() string // Close is a destructor for the Notifier diff --git a/app/vmalert/web_types.go b/app/vmalert/web_types.go index 48b5a298a1..a231632160 100644 --- a/app/vmalert/web_types.go +++ b/app/vmalert/web_types.go @@ -72,6 +72,8 @@ type APIGroup struct { Params []string `json:"params,omitempty"` // Headers contains HTTP headers added to each Rule's request Headers []string `json:"headers,omitempty"` + // NotifierHeaders contains HTTP headers added to each alert request which will send to notifier + NotifierHeaders []string `json:"notifier_headers,omitempty"` // Labels is a set of label value pairs, that will be added to every rule. Labels map[string]string `json:"labels,omitempty"` }