vmalert: allow configuring custom notifier headers per group (#4088)

vmalert: allow configuring custom notifier headers per group
This commit is contained in:
Haleygo 2023-04-27 18:17:26 +08:00 committed by GitHub
parent 9e99f2f5b3
commit 6c322b4a00
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 55 additions and 35 deletions

View file

@ -36,7 +36,8 @@ type Group struct {
Params url.Values `yaml:"params"` Params url.Values `yaml:"params"`
// Headers contains optional HTTP headers added to each rule request // Headers contains optional HTTP headers added to each rule request
Headers []Header `yaml:"headers,omitempty"` Headers []Header `yaml:"headers,omitempty"`
// NotifierHeaders contains optional HTTP headers added to each alert request which will send to notifier
NotifierHeaders []Header `yaml:"notifier_headers,omitempty"`
// Catches all undefined fields and must be empty after parsing. // Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline"` XXX map[string]interface{} `yaml:",inline"`
} }

View file

@ -36,9 +36,10 @@ type Group struct {
Checksum string Checksum string
LastEvaluation time.Time LastEvaluation time.Time
Labels map[string]string Labels map[string]string
Params url.Values Params url.Values
Headers map[string]string Headers map[string]string
NotifierHeaders map[string]string
doneCh chan struct{} doneCh chan struct{}
finishedCh chan struct{} finishedCh chan struct{}
@ -93,16 +94,17 @@ func mergeLabels(groupName, ruleName string, set1, set2 map[string]string) map[s
func newGroup(cfg config.Group, qb datasource.QuerierBuilder, defaultInterval time.Duration, labels map[string]string) *Group { func newGroup(cfg config.Group, qb datasource.QuerierBuilder, defaultInterval time.Duration, labels map[string]string) *Group {
g := &Group{ g := &Group{
Type: cfg.Type, Type: cfg.Type,
Name: cfg.Name, Name: cfg.Name,
File: cfg.File, File: cfg.File,
Interval: cfg.Interval.Duration(), Interval: cfg.Interval.Duration(),
Limit: cfg.Limit, Limit: cfg.Limit,
Concurrency: cfg.Concurrency, Concurrency: cfg.Concurrency,
Checksum: cfg.Checksum, Checksum: cfg.Checksum,
Params: cfg.Params, Params: cfg.Params,
Headers: make(map[string]string), Headers: make(map[string]string),
Labels: cfg.Labels, NotifierHeaders: make(map[string]string),
Labels: cfg.Labels,
doneCh: make(chan struct{}), doneCh: make(chan struct{}),
finishedCh: make(chan struct{}), finishedCh: make(chan struct{}),
@ -117,6 +119,9 @@ func newGroup(cfg config.Group, qb datasource.QuerierBuilder, defaultInterval ti
for _, h := range cfg.Headers { for _, h := range cfg.Headers {
g.Headers[h.Key] = h.Value g.Headers[h.Key] = h.Value
} }
for _, h := range cfg.NotifierHeaders {
g.NotifierHeaders[h.Key] = h.Value
}
g.metrics = newGroupMetrics(g) g.metrics = newGroupMetrics(g)
rules := make([]Rule, len(cfg.Rules)) rules := make([]Rule, len(cfg.Rules))
for i, r := range cfg.Rules { for i, r := range cfg.Rules {
@ -230,6 +235,7 @@ func (g *Group) updateWith(newGroup *Group) error {
g.Concurrency = newGroup.Concurrency g.Concurrency = newGroup.Concurrency
g.Params = newGroup.Params g.Params = newGroup.Params
g.Headers = newGroup.Headers g.Headers = newGroup.Headers
g.NotifierHeaders = newGroup.NotifierHeaders
g.Labels = newGroup.Labels g.Labels = newGroup.Labels
g.Limit = newGroup.Limit g.Limit = newGroup.Limit
g.Checksum = newGroup.Checksum g.Checksum = newGroup.Checksum
@ -294,7 +300,10 @@ func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *r
e := &executor{ e := &executor{
rw: rw, rw: rw,
notifiers: nts, notifiers: nts,
previouslySentSeriesToRW: make(map[uint64]map[string][]prompbmarshal.Label)} previouslySentSeriesToRW: make(map[uint64]map[string][]prompbmarshal.Label),
notifierHeaders: g.NotifierHeaders,
}
evalTS := time.Now() evalTS := time.Now()
@ -412,6 +421,8 @@ type executor struct {
// where `ruleID` is ID of the Rule within a Group // where `ruleID` is ID of the Rule within a Group
// and `ruleLabels` is []prompb.Label marshalled to a string // and `ruleLabels` is []prompb.Label marshalled to a string
previouslySentSeriesToRW map[uint64]map[string][]prompbmarshal.Label previouslySentSeriesToRW map[uint64]map[string][]prompbmarshal.Label
notifierHeaders map[string]string
} }
func (e *executor) execConcurrently(ctx context.Context, rules []Rule, ts time.Time, concurrency int, resolveDuration time.Duration, limit int) chan error { func (e *executor) execConcurrently(ctx context.Context, rules []Rule, ts time.Time, concurrency int, resolveDuration time.Duration, limit int) chan error {
@ -504,7 +515,7 @@ func (e *executor) exec(ctx context.Context, rule Rule, ts time.Time, resolveDur
for _, nt := range e.notifiers() { for _, nt := range e.notifiers() {
wg.Add(1) wg.Add(1)
go func(nt notifier.Notifier) { go func(nt notifier.Notifier) {
if err := nt.Send(ctx, alerts); err != nil { if err := nt.Send(ctx, alerts, e.notifierHeaders); err != nil {
errGr.Add(fmt.Errorf("rule %q: failed to send alerts to addr %q: %w", rule, nt.Addr(), err)) errGr.Add(fmt.Errorf("rule %q: failed to send alerts to addr %q: %w", rule, nt.Addr(), err))
} }
wg.Done() wg.Done()

View file

@ -131,7 +131,7 @@ type fakeNotifier struct {
func (*fakeNotifier) Close() {} func (*fakeNotifier) Close() {}
func (*fakeNotifier) Addr() string { return "" } func (*fakeNotifier) Addr() string { return "" }
func (fn *fakeNotifier) Send(_ context.Context, alerts []notifier.Alert) error { func (fn *fakeNotifier) Send(_ context.Context, alerts []notifier.Alert, _ map[string]string) error {
fn.Lock() fn.Lock()
defer fn.Unlock() defer fn.Unlock()
fn.counter += len(alerts) fn.counter += len(alerts)
@ -155,7 +155,7 @@ type faultyNotifier struct {
fakeNotifier fakeNotifier
} }
func (fn *faultyNotifier) Send(ctx context.Context, _ []notifier.Alert) error { func (fn *faultyNotifier) Send(ctx context.Context, _ []notifier.Alert, _ map[string]string) error {
d, ok := ctx.Deadline() d, ok := ctx.Deadline()
if ok { if ok {
time.Sleep(time.Until(d)) time.Sleep(time.Until(d))

View file

@ -176,15 +176,17 @@ func (g *Group) toAPI() APIGroup {
// encode as string to avoid rounding // encode as string to avoid rounding
ID: fmt.Sprintf("%d", g.ID()), ID: fmt.Sprintf("%d", g.ID()),
Name: g.Name, Name: g.Name,
Type: g.Type.String(), Type: g.Type.String(),
File: g.File, File: g.File,
Interval: g.Interval.Seconds(), Interval: g.Interval.Seconds(),
LastEvaluation: g.LastEvaluation, LastEvaluation: g.LastEvaluation,
Concurrency: g.Concurrency, Concurrency: g.Concurrency,
Params: urlValuesToStrings(g.Params), Params: urlValuesToStrings(g.Params),
Headers: headersToStrings(g.Headers), Headers: headersToStrings(g.Headers),
Labels: g.Labels, NotifierHeaders: headersToStrings(g.NotifierHeaders),
Labels: g.Labels,
} }
for _, r := range g.Rules { for _, r := range g.Rules {
ag.Rules = append(ag.Rules, r.ToAPI()) ag.Rules = append(ag.Rules, r.ToAPI())

View file

@ -51,16 +51,16 @@ func (am *AlertManager) Close() {
func (am AlertManager) Addr() string { return am.addr } func (am AlertManager) Addr() string { return am.addr }
// Send an alert or resolve message // Send an alert or resolve message
func (am *AlertManager) Send(ctx context.Context, alerts []Alert) error { func (am *AlertManager) Send(ctx context.Context, alerts []Alert, notifierHeaders map[string]string) error {
am.metrics.alertsSent.Add(len(alerts)) am.metrics.alertsSent.Add(len(alerts))
err := am.send(ctx, alerts) err := am.send(ctx, alerts, notifierHeaders)
if err != nil { if err != nil {
am.metrics.alertsSendErrors.Add(len(alerts)) am.metrics.alertsSendErrors.Add(len(alerts))
} }
return err return err
} }
func (am *AlertManager) send(ctx context.Context, alerts []Alert) error { func (am *AlertManager) send(ctx context.Context, alerts []Alert, notifierHeaders map[string]string) error {
b := &bytes.Buffer{} b := &bytes.Buffer{}
writeamRequest(b, alerts, am.argFunc, am.relabelConfigs) writeamRequest(b, alerts, am.argFunc, am.relabelConfigs)
@ -69,6 +69,9 @@ func (am *AlertManager) send(ctx context.Context, alerts []Alert) error {
return err return err
} }
req.Header.Set("Content-Type", "application/json") req.Header.Set("Content-Type", "application/json")
for key, value := range notifierHeaders {
req.Header.Set(key, value)
}
if am.timeout > 0 { if am.timeout > 0 {
var cancel context.CancelFunc var cancel context.CancelFunc
@ -105,7 +108,8 @@ const alertManagerPath = "/api/v2/alerts"
// NewAlertManager is a constructor for AlertManager // NewAlertManager is a constructor for AlertManager
func NewAlertManager(alertManagerURL string, fn AlertURLGenerator, authCfg promauth.HTTPClientConfig, func NewAlertManager(alertManagerURL string, fn AlertURLGenerator, authCfg promauth.HTTPClientConfig,
relabelCfg *promrelabel.ParsedConfigs, timeout time.Duration) (*AlertManager, error) { relabelCfg *promrelabel.ParsedConfigs, timeout time.Duration,
) (*AlertManager, error) {
tls := &promauth.TLSConfig{} tls := &promauth.TLSConfig{}
if authCfg.TLSConfig != nil { if authCfg.TLSConfig != nil {
tls = authCfg.TLSConfig tls = authCfg.TLSConfig

View file

@ -90,10 +90,10 @@ func TestAlertManager_Send(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("unexpected error: %s", err) t.Errorf("unexpected error: %s", err)
} }
if err := am.Send(context.Background(), []Alert{{}, {}}); err == nil { if err := am.Send(context.Background(), []Alert{{}, {}}, nil); err == nil {
t.Error("expected connection error got nil") t.Error("expected connection error got nil")
} }
if err := am.Send(context.Background(), []Alert{}); err == nil { if err := am.Send(context.Background(), []Alert{}, nil); err == nil {
t.Error("expected wrong http code error got nil") t.Error("expected wrong http code error got nil")
} }
if err := am.Send(context.Background(), []Alert{{ if err := am.Send(context.Background(), []Alert{{
@ -102,7 +102,7 @@ func TestAlertManager_Send(t *testing.T) {
Start: time.Now().UTC(), Start: time.Now().UTC(),
End: time.Now().UTC(), End: time.Now().UTC(),
Annotations: map[string]string{"a": "b", "c": "d", "e": "f"}, Annotations: map[string]string{"a": "b", "c": "d", "e": "f"},
}}); err != nil { }}, nil); err != nil {
t.Errorf("unexpected error %s", err) t.Errorf("unexpected error %s", err)
} }
if c != 2 { if c != 2 {

View file

@ -7,7 +7,7 @@ type Notifier interface {
// Send sends the given list of alerts. // Send sends the given list of alerts.
// Returns an error if fails to send the alerts. // Returns an error if fails to send the alerts.
// Must unblock if the given ctx is cancelled. // Must unblock if the given ctx is cancelled.
Send(ctx context.Context, alerts []Alert) error Send(ctx context.Context, alerts []Alert, notifierHeaders map[string]string) error
// Addr returns address where alerts are sent. // Addr returns address where alerts are sent.
Addr() string Addr() string
// Close is a destructor for the Notifier // Close is a destructor for the Notifier

View file

@ -72,6 +72,8 @@ type APIGroup struct {
Params []string `json:"params,omitempty"` Params []string `json:"params,omitempty"`
// Headers contains HTTP headers added to each Rule's request // Headers contains HTTP headers added to each Rule's request
Headers []string `json:"headers,omitempty"` Headers []string `json:"headers,omitempty"`
// NotifierHeaders contains HTTP headers added to each alert request which will send to notifier
NotifierHeaders []string `json:"notifier_headers,omitempty"`
// Labels is a set of label value pairs, that will be added to every rule. // Labels is a set of label value pairs, that will be added to every rule.
Labels map[string]string `json:"labels,omitempty"` Labels map[string]string `json:"labels,omitempty"`
} }