vmalert: support configuration file for notifiers (#2127)

* vmalert notifiers can now be configured via a file
see https://docs.victoriametrics.com/vmalert.html#notifier-configuration-file
* add support for Consul service discovery in the notifiers config
see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1947
* add UI section for currently loaded/discovered notifiers
* deprecate `-rule.configCheckInterval` in favour of `-configCheckInterval`
* add the ability to suppress logs for duplicate targets during notifiers discovery
* change behaviour of `vmalert_alerts_send_errors_total` - it now accounts
for failed alerts, not HTTP calls.
Roman Khavronenko 2022-02-02 14:11:41 +02:00 committed by GitHub
parent 2016a2c899
commit 5da71eb685
28 changed files with 1827 additions and 495 deletions


@ -70,6 +70,13 @@ run-vmalert: vmalert
-evaluationInterval=3s \
-rule.configCheckInterval=10s
run-vmalert-sd: vmalert
./bin/vmalert -rule=app/vmalert/config/testdata/rules2-good.rules \
-datasource.url=http://localhost:8428 \
-remoteWrite.url=http://localhost:8428 \
-notifier.config=app/vmalert/notifier/testdata/consul.good.yaml \
-configCheckInterval=10s
replay-vmalert: vmalert
./bin/vmalert -rule=app/vmalert/config/testdata/rules-replay-good.rules \
-datasource.url=http://localhost:8428 \


@ -43,7 +43,8 @@ To start using `vmalert` you will need the following things:
* list of rules - PromQL/MetricsQL expressions to execute;
* datasource address - reachable MetricsQL endpoint to run queries against;
* notifier address [optional] - reachable [Alert Manager](https://github.com/prometheus/alertmanager) instance for processing,
aggregating alerts, and sending notifications.
aggregating alerts, and sending notifications. Please note that the notifier address also supports Consul service discovery via a
[config file](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/app/vmalert/notifier/config.go).
* remote write address [optional] - [remote write](https://prometheus.io/docs/prometheus/latest/storage/#remote-storage-integrations)
compatible storage to persist rules and alerts state info;
* remote read address [optional] - MetricsQL compatible datasource to restore alerts state from.
@ -689,8 +690,8 @@ The shortlist of configuration flags is the following:
absolute path to all .yaml files in root.
Rule files may contain %{ENV_VAR} placeholders, which are substituted by the corresponding env vars.
Supports an array of values separated by comma or specified via multiple flags.
-rule.configCheckInterval duration
Interval for checking for changes in '-rule' files. By default the checking is disabled. Send SIGHUP signal in order to force config check for changes
-configCheckInterval duration
Interval for checking for changes in '-rule' or '-notifier.config' files. By default the checking is disabled. Send SIGHUP signal in order to force config check for changes
-rule.maxResolveDuration duration
Limits the maximum duration for automatic alert expiration, which is by default equal to 3 evaluation intervals of the parent group.
-rule.validateExpressions
@ -703,6 +704,14 @@ The shortlist of configuration flags is the following:
Path to file with TLS certificate. Used only if -tls is set. Prefer ECDSA certs instead of RSA certs as RSA certs are slower
-tlsKeyFile string
Path to file with TLS key. Used only if -tls is set
-promscrape.consul.waitTime duration
Wait time used by Consul service discovery. Default value is used if not set
-promscrape.consulSDCheckInterval duration
Interval for checking for changes in Consul. This works only if consul_sd_configs is configured in '-promscrape.config' file. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config for details (default 30s)
-promscrape.discovery.concurrency int
The maximum number of concurrent requests to Prometheus autodiscovery API (Consul, Kubernetes, etc.) (default 100)
-promscrape.discovery.concurrentWaitTime duration
The maximum duration for waiting to perform API requests if more than -promscrape.discovery.concurrency requests are simultaneously performed (default 1m0s)
-version
Show VictoriaMetrics version
```
@ -711,7 +720,7 @@ The shortlist of configuration flags is the following:
`vmalert` supports "hot" config reload via the following methods:
* send SIGHUP signal to `vmalert` process;
* send GET request to `/-/reload` endpoint;
* configure `-rule.configCheckInterval` flag for periodic reload
* configure `-configCheckInterval` flag for periodic reload
on config change.
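For example, the reload endpoint can be called programmatically. A minimal Go sketch, assuming vmalert listens on the default `-httpListenAddr` (`:8880`); adjust the address for your setup:
```
package main

import (
	"fmt"
	"log"
	"net/http"
)

func main() {
	// GET /-/reload asks vmalert to re-read its config files.
	resp, err := http.Get("http://localhost:8880/-/reload")
	if err != nil {
		log.Fatalf("reload request failed: %s", err)
	}
	defer resp.Body.Close()
	fmt.Println("reload responded with:", resp.Status)
}
```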
### URL params
@ -732,6 +741,88 @@ Please note, `params` are used only for executing rules expressions (requests to
If there is a conflict between URL params set in the `datasource.url` flag and params in the group definition,
the latter have higher priority.
### Notifier configuration file
Notifiers can also be configured via a file specified with the `-notifier.config` flag:
```
./bin/vmalert -rule=app/vmalert/config/testdata/rules.good.rules \
-datasource.url=http://localhost:8428 \
-notifier.config=app/vmalert/notifier/testdata/consul.good.yaml
```
The configuration file allows configuring static notifiers or discovering notifiers via
[Consul](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config).
For example:
```
static_configs:
  - targets:
      - localhost:9093
      - localhost:9095
consul_sd_configs:
  - server: localhost:8500
    services:
      - alertmanager
```
The list of configured or discovered Notifiers can be explored via [UI](#Web).
The configuration file [specification](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/app/vmalert/notifier/config.go)
is the following:
```
# Per-target Notifier timeout when pushing alerts.
[ timeout: <duration> | default = 10s ]

# Prefix for the HTTP path alerts are pushed to.
[ path_prefix: <path> | default = / ]

# Configures the protocol scheme used for requests.
[ scheme: <scheme> | default = http ]

# Sets the `Authorization` header on every request with the
# configured username and password.
# password and password_file are mutually exclusive.
basic_auth:
  [ username: <string> ]
  [ password: <secret> ]
  [ password_file: <string> ]

# Optional `Authorization` header configuration.
authorization:
  # Sets the authentication type.
  [ type: <string> | default: Bearer ]
  # Sets the credentials. It is mutually exclusive with
  # `credentials_file`.
  [ credentials: <secret> ]
  # Sets the credentials to the credentials read from the configured file.
  # It is mutually exclusive with `credentials`.
  [ credentials_file: <filename> ]

# Configures the scrape request's TLS settings.
# see https://prometheus.io/docs/prometheus/latest/configuration/configuration/#tls_config
tls_config:
  [ <tls_config> ]

# List of labeled statically configured Notifiers.
static_configs:
  targets:
    [ - '<host>' ]

# List of Consul service discovery configurations.
# See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config
consul_sd_configs:
  [ - <consul_sd_config> ... ]

# List of relabel configurations.
# Supports the same relabeling features as the rest of VictoriaMetrics components.
# See https://docs.victoriametrics.com/vmagent.html#relabeling
relabel_configs:
  [ - <relabel_config> ... ]
```
The configuration file can be [hot-reloaded](#hot-config-reload).
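As an illustration of the expected file shape, below is a minimal, self-contained Go sketch which parses such a file with `gopkg.in/yaml.v2`. The `notifierConfig` and `staticConfig` structs here are illustrative only and mirror a small subset of the specification above; vmalert's actual parser lives in [config.go](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/app/vmalert/notifier/config.go).
```
package main

import (
	"fmt"
	"log"

	"gopkg.in/yaml.v2"
)

// Illustrative structs covering a subset of the notifier config spec.
type staticConfig struct {
	Targets []string `yaml:"targets"`
}

type notifierConfig struct {
	Scheme        string         `yaml:"scheme,omitempty"`
	PathPrefix    string         `yaml:"path_prefix,omitempty"`
	Timeout       string         `yaml:"timeout,omitempty"`
	StaticConfigs []staticConfig `yaml:"static_configs,omitempty"`
}

func main() {
	data := []byte(`
scheme: http
timeout: 10s
static_configs:
  - targets:
      - localhost:9093
      - localhost:9095
`)
	var cfg notifierConfig
	if err := yaml.Unmarshal(data, &cfg); err != nil {
		log.Fatalf("failed to parse notifier config: %s", err)
	}
	fmt.Printf("scheme=%s timeout=%s targets=%v\n",
		cfg.Scheme, cfg.Timeout, cfg.StaticConfigs[0].Targets)
}
```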
## Contributing


@ -12,9 +12,9 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/metrics"
)
// AlertingRule is basic alert entity
@ -50,10 +50,10 @@ type AlertingRule struct {
}
type alertingRuleMetrics struct {
errors *gauge
pending *gauge
active *gauge
samples *gauge
errors *utils.Gauge
pending *utils.Gauge
active *utils.Gauge
samples *utils.Gauge
}
func newAlertingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rule) *AlertingRule {
@ -78,7 +78,7 @@ func newAlertingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rule
}
labels := fmt.Sprintf(`alertname=%q, group=%q, id="%d"`, ar.Name, group.Name, ar.ID())
ar.metrics.pending = getOrCreateGauge(fmt.Sprintf(`vmalert_alerts_pending{%s}`, labels),
ar.metrics.pending = utils.GetOrCreateGauge(fmt.Sprintf(`vmalert_alerts_pending{%s}`, labels),
func() float64 {
ar.mu.RLock()
defer ar.mu.RUnlock()
@ -90,7 +90,7 @@ func newAlertingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rule
}
return float64(num)
})
ar.metrics.active = getOrCreateGauge(fmt.Sprintf(`vmalert_alerts_firing{%s}`, labels),
ar.metrics.active = utils.GetOrCreateGauge(fmt.Sprintf(`vmalert_alerts_firing{%s}`, labels),
func() float64 {
ar.mu.RLock()
defer ar.mu.RUnlock()
@ -102,7 +102,7 @@ func newAlertingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rule
}
return float64(num)
})
ar.metrics.errors = getOrCreateGauge(fmt.Sprintf(`vmalert_alerting_rules_error{%s}`, labels),
ar.metrics.errors = utils.GetOrCreateGauge(fmt.Sprintf(`vmalert_alerting_rules_error{%s}`, labels),
func() float64 {
ar.mu.RLock()
defer ar.mu.RUnlock()
@ -111,7 +111,7 @@ func newAlertingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rule
}
return 1
})
ar.metrics.samples = getOrCreateGauge(fmt.Sprintf(`vmalert_alerting_rules_last_evaluation_samples{%s}`, labels),
ar.metrics.samples = utils.GetOrCreateGauge(fmt.Sprintf(`vmalert_alerting_rules_last_evaluation_samples{%s}`, labels),
func() float64 {
ar.mu.RLock()
defer ar.mu.RUnlock()
@ -122,10 +122,10 @@ func newAlertingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rule
// Close unregisters rule metrics
func (ar *AlertingRule) Close() {
metrics.UnregisterMetric(ar.metrics.active.name)
metrics.UnregisterMetric(ar.metrics.pending.name)
metrics.UnregisterMetric(ar.metrics.errors.name)
metrics.UnregisterMetric(ar.metrics.samples.name)
ar.metrics.active.Unregister()
ar.metrics.pending.Unregister()
ar.metrics.errors.Unregister()
ar.metrics.samples.Unregister()
}
// String implements Stringer interface
@ -153,7 +153,7 @@ func (ar *AlertingRule) ExecRange(ctx context.Context, start, end time.Time) ([]
return nil, fmt.Errorf("`query` template isn't supported in replay mode")
}
for _, s := range series {
// set additional labels to identify group and rule name
// set additional labels to identify group and rule Name
if ar.Name != "" {
s.SetLabel(alertNameLabel, ar.Name)
}


@ -41,15 +41,15 @@ type Group struct {
}
type groupMetrics struct {
iterationTotal *counter
iterationDuration *summary
iterationTotal *utils.Counter
iterationDuration *utils.Summary
}
func newGroupMetrics(name, file string) *groupMetrics {
m := &groupMetrics{}
labels := fmt.Sprintf(`group=%q, file=%q`, name, file)
m.iterationTotal = getOrCreateCounter(fmt.Sprintf(`vmalert_iteration_total{%s}`, labels))
m.iterationDuration = getOrCreateSummary(fmt.Sprintf(`vmalert_iteration_duration_seconds{%s}`, labels))
m.iterationTotal = utils.GetOrCreateCounter(fmt.Sprintf(`vmalert_iteration_total{%s}`, labels))
m.iterationDuration = utils.GetOrCreateSummary(fmt.Sprintf(`vmalert_iteration_duration_seconds{%s}`, labels))
return m
}
@ -122,7 +122,7 @@ func (g *Group) newRule(qb datasource.QuerierBuilder, rule config.Rule) Rule {
}
// ID return unique group ID that consists of
// rules file and group name
// rules file and group Name
func (g *Group) ID() uint64 {
g.mu.RLock()
defer g.mu.RUnlock()
@ -213,8 +213,8 @@ func (g *Group) close() {
close(g.doneCh)
<-g.finishedCh
metrics.UnregisterMetric(g.metrics.iterationDuration.name)
metrics.UnregisterMetric(g.metrics.iterationTotal.name)
g.metrics.iterationDuration.Unregister()
g.metrics.iterationTotal.Unregister()
for _, rule := range g.Rules {
rule.Close()
}
@ -222,7 +222,7 @@ func (g *Group) close() {
var skipRandSleepOnGroupStart bool
func (g *Group) start(ctx context.Context, nts []notifier.Notifier, rw *remotewrite.Client) {
func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *remotewrite.Client) {
defer func() { close(g.finishedCh) }()
// Spread group rules evaluation over time in order to reduce load on VictoriaMetrics.
@ -246,16 +246,7 @@ func (g *Group) start(ctx context.Context, nts []notifier.Notifier, rw *remotewr
}
logger.Infof("group %q started; interval=%v; concurrency=%d", g.Name, g.Interval, g.Concurrency)
e := &executor{rw: rw}
for _, nt := range nts {
ent := eNotifier{
Notifier: nt,
alertsSent: getOrCreateCounter(fmt.Sprintf("vmalert_alerts_sent_total{addr=%q}", nt.Addr())),
alertsSendErrors: getOrCreateCounter(fmt.Sprintf("vmalert_alerts_send_errors_total{addr=%q}", nt.Addr())),
}
e.notifiers = append(e.notifiers, ent)
}
e := &executor{rw: rw, notifiers: nts}
t := time.NewTicker(g.Interval)
defer t.Stop()
for {
@ -310,16 +301,10 @@ func getResolveDuration(groupInterval time.Duration) time.Duration {
}
type executor struct {
notifiers []eNotifier
notifiers func() []notifier.Notifier
rw *remotewrite.Client
}
type eNotifier struct {
notifier.Notifier
alertsSent *counter
alertsSendErrors *counter
}
func (e *executor) execConcurrently(ctx context.Context, rules []Rule, concurrency int, resolveDuration time.Duration) chan error {
res := make(chan error, len(rules))
if concurrency == 1 {
@ -400,11 +385,9 @@ func (e *executor) exec(ctx context.Context, rule Rule, resolveDuration time.Dur
}
errGr := new(utils.ErrGroup)
for _, nt := range e.notifiers {
nt.alertsSent.Add(len(alerts))
for _, nt := range e.notifiers() {
if err := nt.Send(ctx, alerts); err != nil {
nt.alertsSendErrors.Inc()
errGr.Add(fmt.Errorf("rule %q: failed to send alerts: %w", rule, err))
errGr.Add(fmt.Errorf("rule %q: failed to send alerts to addr %q: %w", rule, nt.Addr(), err))
}
}
return errGr.Err()


@ -212,7 +212,7 @@ func TestGroupStart(t *testing.T) {
fs.add(m1)
fs.add(m2)
go func() {
g.start(context.Background(), []notifier.Notifier{fn}, nil)
g.start(context.Background(), func() []notifier.Notifier { return []notifier.Notifier{fn} }, nil)
close(finished)
}()


@ -63,6 +63,7 @@ type fakeNotifier struct {
alerts []notifier.Alert
}
func (*fakeNotifier) Close() {}
func (*fakeNotifier) Addr() string { return "" }
func (fn *fakeNotifier) Send(_ context.Context, alerts []notifier.Alert) error {
fn.Lock()


@ -35,7 +35,10 @@ absolute path to all .yaml files in root.
Rule files may contain %{ENV_VAR} placeholders, which are substituted by the corresponding env vars.`)
rulesCheckInterval = flag.Duration("rule.configCheckInterval", 0, "Interval for checking for changes in '-rule' files. "+
"By default the checking is disabled. Send SIGHUP signal in order to force config check for changes")
"By default the checking is disabled. Send SIGHUP signal in order to force config check for changes. DEPRECATED - see '-configCheckInterval' instead")
configCheckInterval = flag.Duration("configCheckInterval", 0, "Interval for checking for changes in '-rule' or '-notifier.config' files. "+
"By default the checking is disabled. Send SIGHUP signal in order to force config check for changes.")
httpListenAddr = flag.String("httpListenAddr", ":8880", "Address to listen for http connections")
evaluationInterval = flag.Duration("evaluationInterval", time.Minute, "How often to evaluate the rules")
@ -47,14 +50,14 @@ Rule files may contain %{ENV_VAR} placeholders, which are substituted by the cor
externalURL = flag.String("external.url", "", "External URL is used as alert's source for sent alerts to the notifier")
externalAlertSource = flag.String("external.alert.source", "", `External Alert Source allows to override the Source link for alerts sent to AlertManager for cases where you want to build a custom link to Grafana, Prometheus or any other service.
eg. 'explore?orgId=1&left=[\"now-1h\",\"now\",\"VictoriaMetrics\",{\"expr\": \"{{$expr|quotesEscape|crlfEscape|queryEscape}}\"},{\"mode\":\"Metrics\"},{\"ui\":[true,true,true,\"none\"]}]'.If empty '/api/v1/:groupID/alertID/status' is used`)
externalLabels = flagutil.NewArray("external.label", "Optional label in the form 'name=value' to add to all generated recording rules and alerts. "+
externalLabels = flagutil.NewArray("external.label", "Optional label in the form 'Name=value' to add to all generated recording rules and alerts. "+
"Pass multiple -label flags in order to add multiple label sets.")
remoteReadLookBack = flag.Duration("remoteRead.lookback", time.Hour, "Lookback defines how far to look into past for alerts timeseries."+
" For example, if lookback=1h then range from now() to now()-1h will be scanned.")
remoteReadIgnoreRestoreErrors = flag.Bool("remoteRead.ignoreRestoreErrors", true, "Whether to ignore errors from remote storage when restoring alerts state on startup.")
disableAlertGroupLabel = flag.Bool("disableAlertgroupLabel", false, "Whether to disable adding group's name as label to generated alerts and time series.")
disableAlertGroupLabel = flag.Bool("disableAlertgroupLabel", false, "Whether to disable adding group's Name as label to generated alerts and time series.")
dryRun = flag.Bool("dryRun", false, "Whether to check only config files without running vmalert. The rules file are validated. The `-rule` flag must be specified.")
)
@ -192,7 +195,7 @@ func newManager(ctx context.Context) (*manager, error) {
}
n := strings.IndexByte(s, '=')
if n < 0 {
return nil, fmt.Errorf("missing '=' in `-label`. It must contain label in the form `name=value`; got %q", s)
return nil, fmt.Errorf("missing '=' in `-label`. It must contain label in the form `Name=value`; got %q", s)
}
manager.labels[s[:n]] = s[n+1:]
}
@ -254,8 +257,13 @@ See the docs at https://docs.victoriametrics.com/vmalert.html .
func configReload(ctx context.Context, m *manager, groupsCfg []config.Group, sighupCh <-chan os.Signal) {
var configCheckCh <-chan time.Time
if *rulesCheckInterval > 0 {
ticker := time.NewTicker(*rulesCheckInterval)
checkInterval := *configCheckInterval
if checkInterval == 0 && *rulesCheckInterval > 0 {
logger.Warnf("flag `rule.configCheckInterval` is deprecated - use `configCheckInterval` instead")
checkInterval = *rulesCheckInterval
}
if checkInterval > 0 {
ticker := time.NewTicker(checkInterval)
configCheckCh = ticker.C
defer ticker.Stop()
}
@ -272,6 +280,12 @@ func configReload(ctx context.Context, m *manager, groupsCfg []config.Group, sig
configReloads.Inc()
case <-configCheckCh:
}
if err := notifier.Reload(); err != nil {
configReloadErrors.Inc()
configSuccess.Set(0)
logger.Errorf("failed to reload notifier config: %s", err)
continue
}
newGroupsCfg, err := config.Parse(*rulePath, *validateTemplates, *validateExpressions)
if err != nil {
configReloadErrors.Inc()


@ -100,7 +100,7 @@ groups:
querierBuilder: &fakeQuerier{},
groups: make(map[uint64]*Group),
labels: map[string]string{},
notifiers: []notifier.Notifier{&fakeNotifier{}},
notifiers: func() []notifier.Notifier { return []notifier.Notifier{&fakeNotifier{}} },
rw: &remotewrite.Client{},
}


@ -17,7 +17,7 @@ import (
// manager controls group states
type manager struct {
querierBuilder datasource.QuerierBuilder
notifiers []notifier.Notifier
notifiers func() []notifier.Notifier
rw *remotewrite.Client
// remote read builder.
@ -109,7 +109,7 @@ func (m *manager) update(ctx context.Context, groupsCfg []config.Group, restore
return fmt.Errorf("config contains recording rules but `-remoteWrite.url` isn't set")
}
if arPresent && m.notifiers == nil {
return fmt.Errorf("config contains alerting rules but `-notifier.url` isn't set")
return fmt.Errorf("config contains alerting rules but neither `-notifier.url` nor `-notifier.config` is set")
}
type updateItem struct {


@ -40,7 +40,7 @@ func TestManagerUpdateConcurrent(t *testing.T) {
m := &manager{
groups: make(map[uint64]*Group),
querierBuilder: &fakeQuerier{},
notifiers: []notifier.Notifier{&fakeNotifier{}},
notifiers: func() []notifier.Notifier { return []notifier.Notifier{&fakeNotifier{}} },
}
paths := []string{
"config/testdata/dir/rules0-good.rules",
@ -223,7 +223,7 @@ func TestManagerUpdate(t *testing.T) {
m := &manager{
groups: make(map[uint64]*Group),
querierBuilder: &fakeQuerier{},
notifiers: []notifier.Notifier{&fakeNotifier{}},
notifiers: func() []notifier.Notifier { return []notifier.Notifier{&fakeNotifier{}} },
}
cfgInit := loadCfg(t, []string{tc.initPath}, true, true)
@ -311,9 +311,11 @@ func TestManagerUpdateNegative(t *testing.T) {
m := &manager{
groups: make(map[uint64]*Group),
querierBuilder: &fakeQuerier{},
notifiers: tc.notifiers,
rw: tc.rw,
}
if tc.notifiers != nil {
m.notifiers = func() []notifier.Notifier { return tc.notifiers }
}
err := m.update(context.Background(), []config.Group{tc.cfg}, false)
if err == nil {
t.Fatalf("expected to get error; got nil")


@ -1,39 +0,0 @@
package main
import "github.com/VictoriaMetrics/metrics"
type gauge struct {
name string
*metrics.Gauge
}
func getOrCreateGauge(name string, f func() float64) *gauge {
return &gauge{
name: name,
Gauge: metrics.GetOrCreateGauge(name, f),
}
}
type counter struct {
name string
*metrics.Counter
}
func getOrCreateCounter(name string) *counter {
return &counter{
name: name,
Counter: metrics.GetOrCreateCounter(name),
}
}
type summary struct {
name string
*metrics.Summary
}
func getOrCreateSummary(name string) *summary {
return &summary{
name: name,
Summary: metrics.GetOrCreateSummary(name),
}
}


@ -6,18 +6,41 @@ import (
"fmt"
"io/ioutil"
"net/http"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
)
// AlertManager represents integration provider with Prometheus alert manager
// https://github.com/prometheus/alertmanager
type AlertManager struct {
addr string
alertURL string
basicAuthUser string
basicAuthPass string
argFunc AlertURLGenerator
client *http.Client
addr string
argFunc AlertURLGenerator
client *http.Client
timeout time.Duration
authCfg *promauth.Config
metrics *metrics
}
type metrics struct {
alertsSent *utils.Counter
alertsSendErrors *utils.Counter
}
func newMetrics(addr string) *metrics {
return &metrics{
alertsSent: utils.GetOrCreateCounter(fmt.Sprintf("vmalert_alerts_sent_total{addr=%q}", addr)),
alertsSendErrors: utils.GetOrCreateCounter(fmt.Sprintf("vmalert_alerts_send_errors_total{addr=%q}", addr)),
}
}
// Close is a destructor method for AlertManager
func (am *AlertManager) Close() {
am.metrics.alertsSent.Unregister()
am.metrics.alertsSendErrors.Unregister()
}
// Addr returns address where alerts are sent.
@ -25,17 +48,36 @@ func (am AlertManager) Addr() string { return am.addr }
// Send an alert or resolve message
func (am *AlertManager) Send(ctx context.Context, alerts []Alert) error {
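// per the new accounting, these metrics count alerts rather than HTTP calls:
// every alert is counted as sent, and counted as failed if delivery fails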
am.metrics.alertsSent.Add(len(alerts))
err := am.send(ctx, alerts)
if err != nil {
am.metrics.alertsSendErrors.Add(len(alerts))
}
return err
}
func (am *AlertManager) send(ctx context.Context, alerts []Alert) error {
b := &bytes.Buffer{}
writeamRequest(b, alerts, am.argFunc)
req, err := http.NewRequest("POST", am.alertURL, b)
req, err := http.NewRequest("POST", am.addr, b)
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
if am.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, am.timeout)
defer cancel()
}
req = req.WithContext(ctx)
if am.basicAuthPass != "" {
req.SetBasicAuth(am.basicAuthUser, am.basicAuthPass)
if am.authCfg != nil {
if auth := am.authCfg.GetAuthHeader(); auth != "" {
req.Header.Set("Authorization", auth)
}
}
resp, err := am.client.Do(req)
if err != nil {
@ -47,9 +89,9 @@ func (am *AlertManager) Send(ctx context.Context, alerts []Alert) error {
if resp.StatusCode != http.StatusOK {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read response from %q: %w", am.alertURL, err)
return fmt.Errorf("failed to read response from %q: %w", am.addr, err)
}
return fmt.Errorf("invalid SC %d from %q; response body: %s", resp.StatusCode, am.alertURL, string(body))
return fmt.Errorf("invalid SC %d from %q; response body: %s", resp.StatusCode, am.addr, string(body))
}
return nil
}
@ -60,14 +102,31 @@ type AlertURLGenerator func(Alert) string
const alertManagerPath = "/api/v2/alerts"
// NewAlertManager is a constructor for AlertManager
func NewAlertManager(alertManagerURL, user, pass string, fn AlertURLGenerator, c *http.Client) *AlertManager {
url := strings.TrimSuffix(alertManagerURL, "/") + alertManagerPath
return &AlertManager{
addr: alertManagerURL,
alertURL: url,
argFunc: fn,
client: c,
basicAuthUser: user,
basicAuthPass: pass,
func NewAlertManager(alertManagerURL string, fn AlertURLGenerator, authCfg promauth.HTTPClientConfig, timeout time.Duration) (*AlertManager, error) {
tls := &promauth.TLSConfig{}
if authCfg.TLSConfig != nil {
tls = authCfg.TLSConfig
}
tr, err := utils.Transport(alertManagerURL, tls.CertFile, tls.KeyFile, tls.CAFile, tls.ServerName, tls.InsecureSkipVerify)
if err != nil {
return nil, fmt.Errorf("failed to create transport: %w", err)
}
ba := &promauth.BasicAuthConfig{}
if authCfg.BasicAuth != nil {
ba = authCfg.BasicAuth
}
aCfg, err := utils.AuthConfig(ba.Username, ba.Password.String(), ba.PasswordFile, authCfg.BearerToken.String(), authCfg.BearerTokenFile)
if err != nil {
return nil, fmt.Errorf("failed to configure auth: %w", err)
}
return &AlertManager{
addr: alertManagerURL,
argFunc: fn,
authCfg: aCfg,
client: &http.Client{Transport: tr},
timeout: timeout,
metrics: newMetrics(alertManagerURL),
}, nil
}


@ -8,11 +8,16 @@ import (
"strconv"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
)
func TestAlertManager_Addr(t *testing.T) {
const addr = "http://localhost"
am := NewAlertManager(addr, "", "", nil, nil)
am, err := NewAlertManager(addr, nil, promauth.HTTPClientConfig{}, 0)
if err != nil {
t.Errorf("unexpected error: %s", err)
}
if am.Addr() != addr {
t.Errorf("expected to have %q; got %q", addr, am.Addr())
}
@ -75,9 +80,19 @@ func TestAlertManager_Send(t *testing.T) {
})
srv := httptest.NewServer(mux)
defer srv.Close()
am := NewAlertManager(srv.URL, baUser, baPass, func(alert Alert) string {
aCfg := promauth.HTTPClientConfig{
BasicAuth: &promauth.BasicAuthConfig{
Username: baUser,
Password: promauth.NewSecret(baPass),
},
}
am, err := NewAlertManager(srv.URL+alertManagerPath, func(alert Alert) string {
return strconv.FormatUint(alert.GroupID, 10) + "/" + strconv.FormatUint(alert.ID, 10)
}, srv.Client())
}, aCfg, 0)
if err != nil {
t.Errorf("unexpected error: %s", err)
}
if err := am.Send(context.Background(), []Alert{{}, {}}); err == nil {
t.Error("expected connection error got nil")
}


@ -0,0 +1,182 @@
package notifier
import (
"crypto/md5"
"fmt"
"gopkg.in/yaml.v2"
"io/ioutil"
"net/url"
"path"
"path/filepath"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul"
)
// Config contains list of supported configuration settings
// for Notifier
type Config struct {
// Scheme defines the HTTP scheme for Notifier address
Scheme string `yaml:"scheme,omitempty"`
// PathPrefix is added to URL path before adding alertManagerPath value
PathPrefix string `yaml:"path_prefix,omitempty"`
// ConsulSDConfigs contains list of settings for service discovery via Consul
// see https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config
ConsulSDConfigs []consul.SDConfig `yaml:"consul_sd_configs,omitempty"`
// StaticConfigs contains list of static targets
StaticConfigs []StaticConfig `yaml:"static_configs,omitempty"`
// HTTPClientConfig contains HTTP configuration for Notifier clients
HTTPClientConfig promauth.HTTPClientConfig `yaml:",inline"`
// RelabelConfigs contains list of relabeling rules
RelabelConfigs []promrelabel.RelabelConfig `yaml:"relabel_configs,omitempty"`
// The timeout used when sending alerts.
Timeout utils.PromDuration `yaml:"timeout,omitempty"`
// Checksum stores the hash of yaml definition for the config.
// May be used to detect any changes to the config file.
Checksum string
// Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline"`
// This is set to the directory from where the config has been loaded.
baseDir string
// stores already parsed RelabelConfigs object
parsedRelabelConfigs *promrelabel.ParsedConfigs
}
// StaticConfig contains list of static targets in the following form:
// targets:
// [ - '<host>' ]
type StaticConfig struct {
Targets []string `yaml:"targets"`
}
// UnmarshalYAML implements the yaml.Unmarshaler interface.
func (cfg *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
type config Config
if err := unmarshal((*config)(cfg)); err != nil {
return err
}
if cfg.Scheme == "" {
cfg.Scheme = "http"
}
if cfg.Timeout.Duration() == 0 {
cfg.Timeout = utils.NewPromDuration(time.Second * 10)
}
rCfg, err := promrelabel.ParseRelabelConfigs(cfg.RelabelConfigs, false)
if err != nil {
return fmt.Errorf("failed to parse relabeling config: %w", err)
}
cfg.parsedRelabelConfigs = rCfg
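// compute the md5 checksum of the marshaled config, so subsequent reloads
// can cheaply detect whether the configuration has actually changed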
b, err := yaml.Marshal(cfg)
if err != nil {
return fmt.Errorf("failed to marshal configuration for checksum: %w", err)
}
h := md5.New()
h.Write(b)
cfg.Checksum = fmt.Sprintf("%x", h.Sum(nil))
return nil
}
func parseConfig(path string) (*Config, error) {
data, err := ioutil.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("error reading config file: %w", err)
}
var cfg *Config
err = yaml.Unmarshal(data, &cfg)
if err != nil {
return nil, err
}
if len(cfg.XXX) > 0 {
var keys []string
for k := range cfg.XXX {
keys = append(keys, k)
}
return nil, fmt.Errorf("unknown fields in %s", strings.Join(keys, ", "))
}
absPath, err := filepath.Abs(path)
if err != nil {
return nil, fmt.Errorf("cannot obtain abs path for %q: %w", path, err)
}
cfg.baseDir = filepath.Dir(absPath)
return cfg, nil
}
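// parseLabels merges the target address with configured defaults and discovered
// meta labels, applies relabeling and builds the resulting notifier URL.
// An empty URL means the target should be skipped.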
func parseLabels(target string, metaLabels map[string]string, cfg *Config) (string, []prompbmarshal.Label, error) {
labels := mergeLabels(target, metaLabels, cfg)
labels = cfg.parsedRelabelConfigs.Apply(labels, 0, false)
labels = promrelabel.RemoveMetaLabels(labels[:0], labels)
// Remove references to already deleted labels, so GC could clean strings for label name and label value past len(labels).
// This should reduce memory usage when relabeling creates big number of temporary labels with long names and/or values.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825 for details.
labels = append([]prompbmarshal.Label{}, labels...)
if len(labels) == 0 {
return "", nil, nil
}
schemeRelabeled := promrelabel.GetLabelValueByName(labels, "__scheme__")
if len(schemeRelabeled) == 0 {
schemeRelabeled = "http"
}
addressRelabeled := promrelabel.GetLabelValueByName(labels, "__address__")
if len(addressRelabeled) == 0 {
return "", nil, nil
}
if strings.Contains(addressRelabeled, "/") {
return "", nil, nil
}
addressRelabeled = addMissingPort(schemeRelabeled, addressRelabeled)
alertsPathRelabeled := promrelabel.GetLabelValueByName(labels, "__alerts_path__")
if !strings.HasPrefix(alertsPathRelabeled, "/") {
alertsPathRelabeled = "/" + alertsPathRelabeled
}
u := fmt.Sprintf("%s://%s%s", schemeRelabeled, addressRelabeled, alertsPathRelabeled)
if _, err := url.Parse(u); err != nil {
return "", nil, fmt.Errorf("invalid url %q for scheme=%q (%q), target=%q, metrics_path=%q (%q): %w",
u, cfg.Scheme, schemeRelabeled, target, addressRelabeled, alertsPathRelabeled, err)
}
return u, labels, nil
}
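// addMissingPort appends the default port to the target address if it has none:
// :443 for https targets and :80 for everything else.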
func addMissingPort(scheme, target string) string {
if strings.Contains(target, ":") {
return target
}
if scheme == "https" {
target += ":443"
} else {
target += ":80"
}
return target
}
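// mergeLabels populates the special __address__, __scheme__ and __alerts_path__
// labels and merges them with the discovered meta labels.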
func mergeLabels(target string, metaLabels map[string]string, cfg *Config) []prompbmarshal.Label {
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config
m := make(map[string]string)
m["__address__"] = target
m["__scheme__"] = cfg.Scheme
m["__alerts_path__"] = path.Join("/", cfg.PathPrefix, alertManagerPath)
for k, v := range metaLabels {
m[k] = v
}
result := make([]prompbmarshal.Label, 0, len(m))
for k, v := range m {
result = append(result, prompbmarshal.Label{
Name: k,
Value: v,
})
}
return result
}


@ -0,0 +1,31 @@
package notifier
import (
"strings"
"testing"
)
func TestConfigParseGood(t *testing.T) {
f := func(path string) {
_, err := parseConfig(path)
checkErr(t, err)
}
f("testdata/mixed.good.yaml")
f("testdata/consul.good.yaml")
f("testdata/static.good.yaml")
}
func TestConfigParseBad(t *testing.T) {
f := func(path, expErr string) {
_, err := parseConfig(path)
if err == nil {
t.Fatalf("expected to get non-nil err for config %q", path)
}
if !strings.Contains(err.Error(), expErr) {
t.Errorf("expected err to contain %q; got %q instead", expErr, err)
}
}
f("testdata/unknownFields.bad.yaml", "unknown field")
f("non-existing-file", "error reading")
}


@ -0,0 +1,244 @@
package notifier
import (
"fmt"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul"
)
// configWatcher supports dynamic reload of Notifier objects
// from static configuration and service discovery.
// Use newWatcher to create a new object.
type configWatcher struct {
cfg *Config
genFn AlertURLGenerator
wg sync.WaitGroup
reloadCh chan struct{}
syncCh chan struct{}
targetsMu sync.RWMutex
targets map[TargetType][]Target
}
func newWatcher(path string, gen AlertURLGenerator) (*configWatcher, error) {
cfg, err := parseConfig(path)
if err != nil {
return nil, err
}
cw := &configWatcher{
cfg: cfg,
wg: sync.WaitGroup{},
reloadCh: make(chan struct{}, 1),
syncCh: make(chan struct{}),
genFn: gen,
targetsMu: sync.RWMutex{},
targets: make(map[TargetType][]Target),
}
return cw, cw.start()
}
func (cw *configWatcher) notifiers() []Notifier {
cw.targetsMu.RLock()
defer cw.targetsMu.RUnlock()
var notifiers []Notifier
for _, ns := range cw.targets {
for _, n := range ns {
notifiers = append(notifiers, n.Notifier)
}
}
return notifiers
}
func (cw *configWatcher) reload(path string) error {
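// the non-blocking send on the buffered reloadCh serializes reloads:
// if another reload is already in flight, this call returns immediately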
select {
case cw.reloadCh <- struct{}{}:
default:
return nil
}
defer func() { <-cw.reloadCh }()
cfg, err := parseConfig(path)
if err != nil {
return err
}
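// skip the reload if the config file content hasn't changed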
if cfg.Checksum == cw.cfg.Checksum {
return nil
}
// stop existing discovery
close(cw.syncCh)
cw.wg.Wait()
// re-start cw with new config
cw.syncCh = make(chan struct{})
cw.cfg = cfg
cw.resetTargets()
return cw.start()
}
const (
addRetryBackoff = time.Millisecond * 100
addRetryCount = 2
)
func (cw *configWatcher) add(typeK TargetType, interval time.Duration, labelsFn getLabels) error {
var targets []Target
var errors []error
var count int
for { // retry addRetryCount times if first discovery attempts gave no results
targets, errors = targetsFromLabels(labelsFn, cw.cfg, cw.genFn)
for _, err := range errors {
return fmt.Errorf("failed to init notifier for %q: %s", typeK, err)
}
if len(targets) > 0 || count >= addRetryCount {
break
}
time.Sleep(addRetryBackoff)
}
cw.setTargets(typeK, targets)
cw.wg.Add(1)
go func() {
defer cw.wg.Done()
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-cw.syncCh:
return
case <-ticker.C:
}
updateTargets, errors := targetsFromLabels(labelsFn, cw.cfg, cw.genFn)
for _, err := range errors {
logger.Errorf("failed to init notifier for %q: %s", typeK, err)
}
cw.setTargets(typeK, updateTargets)
}
}()
return nil
}
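// targetsFromLabels resolves fresh target labels via labelsFn, drops duplicates
// and converts the remaining targets into AlertManager notifiers.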
func targetsFromLabels(labelsFn getLabels, cfg *Config, genFn AlertURLGenerator) ([]Target, []error) {
metaLabels, err := labelsFn()
if err != nil {
return nil, []error{fmt.Errorf("failed to get labels: %s", err)}
}
var targets []Target
var errors []error
duplicates := make(map[string]struct{})
for _, labels := range metaLabels {
target := labels["__address__"]
u, processedLabels, err := parseLabels(target, labels, cfg)
if err != nil {
errors = append(errors, err)
continue
}
if len(u) == 0 {
continue
}
if _, ok := duplicates[u]; ok { // check for duplicates
if !*suppressDuplicateTargetErrors {
logger.Errorf("skipping duplicate target with identical address %q; "+
"make sure service discovery and relabeling is set up properly; "+
"original labels: %s; resulting labels: %s",
u, labels, processedLabels)
}
continue
}
duplicates[u] = struct{}{}
am, err := NewAlertManager(u, genFn, cfg.HTTPClientConfig, cfg.Timeout.Duration())
if err != nil {
errors = append(errors, err)
continue
}
targets = append(targets, Target{
Notifier: am,
Labels: processedLabels,
})
}
return targets, errors
}
type getLabels func() ([]map[string]string, error)
func (cw *configWatcher) start() error {
if len(cw.cfg.StaticConfigs) > 0 {
var targets []Target
for _, cfg := range cw.cfg.StaticConfigs {
for _, target := range cfg.Targets {
address, labels, err := parseLabels(target, nil, cw.cfg)
if err != nil {
return fmt.Errorf("failed to parse labels for target %q: %s", target, err)
}
notifier, err := NewAlertManager(address, cw.genFn, cw.cfg.HTTPClientConfig, cw.cfg.Timeout.Duration())
if err != nil {
return fmt.Errorf("failed to init alertmanager for addr %q: %s", address, err)
}
targets = append(targets, Target{
Notifier: notifier,
Labels: labels,
})
}
}
cw.setTargets(TargetStatic, targets)
}
if len(cw.cfg.ConsulSDConfigs) > 0 {
err := cw.add(TargetConsul, *consul.SDCheckInterval, func() ([]map[string]string, error) {
var labels []map[string]string
for i := range cw.cfg.ConsulSDConfigs {
sdc := &cw.cfg.ConsulSDConfigs[i]
targetLabels, err := sdc.GetLabels(cw.cfg.baseDir)
if err != nil {
return nil, fmt.Errorf("got labels err: %s", err)
}
labels = append(labels, targetLabels...)
}
return labels, nil
})
if err != nil {
return fmt.Errorf("failed to start consulSD discovery: %s", err)
}
}
return nil
}
func (cw *configWatcher) resetTargets() {
cw.targetsMu.Lock()
for _, targets := range cw.targets {
for _, t := range targets {
t.Close()
}
}
cw.targets = make(map[TargetType][]Target)
cw.targetsMu.Unlock()
}
func (cw *configWatcher) setTargets(key TargetType, targets []Target) {
cw.targetsMu.Lock()
newT := make(map[string]Target)
for _, t := range targets {
newT[t.Addr()] = t
}
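// close notifiers which are missing in the new targets list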
oldT := cw.targets[key]
for _, ot := range oldT {
if _, ok := newT[ot.Addr()]; !ok {
ot.Notifier.Close()
}
}
cw.targets[key] = targets
cw.targetsMu.Unlock()
}


@ -0,0 +1,307 @@
package notifier
import (
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"net/http/httptest"
"os"
"sync"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul"
)
func TestConfigWatcherReload(t *testing.T) {
f, err := ioutil.TempFile("", "")
if err != nil {
t.Fatal(err)
}
defer func() { _ = os.Remove(f.Name()) }()
writeToFile(t, f.Name(), `
static_configs:
  - targets:
      - localhost:9093
      - localhost:9094
`)
cw, err := newWatcher(f.Name(), nil)
if err != nil {
t.Fatalf("failed to start config watcher: %s", err)
}
ns := cw.notifiers()
if len(ns) != 2 {
t.Fatalf("expected to have 2 notifiers; got %d %#v", len(ns), ns)
}
f2, err := ioutil.TempFile("", "")
if err != nil {
t.Fatal(err)
}
defer func() { _ = os.Remove(f2.Name()) }()
writeToFile(t, f2.Name(), `
static_configs:
  - targets:
      - 127.0.0.1:9093
`)
checkErr(t, cw.reload(f2.Name()))
ns = cw.notifiers()
if len(ns) != 1 {
t.Fatalf("expected to have 1 notifier; got %d", len(ns))
}
expAddr := "http://127.0.0.1:9093/api/v2/alerts"
if ns[0].Addr() != expAddr {
t.Fatalf("expected to get %q; got %q instead", expAddr, ns[0].Addr())
}
}
func TestConfigWatcherStart(t *testing.T) {
consulSDServer := newFakeConsulServer()
defer consulSDServer.Close()
consulSDFile, err := ioutil.TempFile("", "")
if err != nil {
t.Fatal(err)
}
defer func() { _ = os.Remove(consulSDFile.Name()) }()
writeToFile(t, consulSDFile.Name(), fmt.Sprintf(`
scheme: https
path_prefix: proxy
consul_sd_configs:
  - server: %s
    services:
      - alertmanager
`, consulSDServer.URL))
prevCheckInterval := *consul.SDCheckInterval
defer func() { *consul.SDCheckInterval = prevCheckInterval }()
*consul.SDCheckInterval = time.Millisecond * 100
cw, err := newWatcher(consulSDFile.Name(), nil)
if err != nil {
t.Fatalf("failed to start config watcher: %s", err)
}
time.Sleep(*consul.SDCheckInterval * 2)
if len(cw.notifiers()) != 2 {
t.Fatalf("expected to get 2 notifiers; got %d", len(cw.notifiers()))
}
expAddr1 := fmt.Sprintf("https://%s/proxy/api/v2/alerts", fakeConsulService1)
expAddr2 := fmt.Sprintf("https://%s/proxy/api/v2/alerts", fakeConsulService2)
n1, n2 := cw.notifiers()[0], cw.notifiers()[1]
if n1.Addr() != expAddr1 {
t.Fatalf("exp address %q; got %q", expAddr1, n1.Addr())
}
if n2.Addr() != expAddr2 {
t.Fatalf("exp address %q; got %q", expAddr2, n2.Addr())
}
}
// TestConfigWatcherReloadConcurrent is supposed to test concurrent
// execution of configuration updates.
// Should be executed with the -race flag.
func TestConfigWatcherReloadConcurrent(t *testing.T) {
consulSDServer1 := newFakeConsulServer()
defer consulSDServer1.Close()
consulSDServer2 := newFakeConsulServer()
defer consulSDServer2.Close()
consulSDFile, err := ioutil.TempFile("", "")
if err != nil {
t.Fatal(err)
}
defer func() { _ = os.Remove(consulSDFile.Name()) }()
writeToFile(t, consulSDFile.Name(), fmt.Sprintf(`
consul_sd_configs:
  - server: %s
    services:
      - alertmanager
  - server: %s
    services:
      - consul
`, consulSDServer1.URL, consulSDServer2.URL))
staticAndConsulSDFile, err := ioutil.TempFile("", "")
if err != nil {
t.Fatal(err)
}
defer func() { _ = os.Remove(staticAndConsulSDFile.Name()) }()
writeToFile(t, staticAndConsulSDFile.Name(), fmt.Sprintf(`
static_configs:
  - targets:
      - localhost:9093
      - localhost:9095
consul_sd_configs:
  - server: %s
    services:
      - alertmanager
  - server: %s
    services:
      - consul
`, consulSDServer1.URL, consulSDServer2.URL))
paths := []string{
staticAndConsulSDFile.Name(),
consulSDFile.Name(),
"testdata/static.good.yaml",
"unknownFields.bad.yaml",
}
cw, err := newWatcher(paths[0], nil)
if err != nil {
t.Fatalf("failed to start config watcher: %s", err)
}
const workers = 500
const iterations = 10
wg := sync.WaitGroup{}
wg.Add(workers)
for i := 0; i < workers; i++ {
go func() {
defer wg.Done()
for i := 0; i < iterations; i++ {
rnd := rand.Intn(len(paths))
_ = cw.reload(paths[rnd]) // update can fail and this is expected
_ = cw.notifiers()
}
}()
}
wg.Wait()
}
func writeToFile(t *testing.T, file, b string) {
t.Helper()
checkErr(t, ioutil.WriteFile(file, []byte(b), 0644))
}
func checkErr(t *testing.T, err error) {
t.Helper()
if err != nil {
t.Fatalf("unexpected err: %s", err)
}
}
const (
fakeConsulService1 = "127.0.0.1:9093"
fakeConsulService2 = "127.0.0.1:9095"
)
func newFakeConsulServer() *httptest.Server {
mux := http.NewServeMux()
mux.HandleFunc("/v1/agent/self", func(rw http.ResponseWriter, _ *http.Request) {
rw.Write([]byte(`{"Config": {"Datacenter": "dc1"}}`))
})
mux.HandleFunc("/v1/catalog/services", func(rw http.ResponseWriter, _ *http.Request) {
rw.Header().Set("X-Consul-Index", "1")
rw.Write([]byte(`{
"alertmanager": [
"alertmanager",
"__scheme__=http"
]
}`))
})
mux.HandleFunc("/v1/health/service/alertmanager", func(rw http.ResponseWriter, _ *http.Request) {
rw.Header().Set("X-Consul-Index", "1")
rw.Write([]byte(`
[
{
"Node": {
"ID": "e8e3629a-3f50-9d6e-aaf8-f173b5b05c72",
"Node": "machine",
"Address": "127.0.0.1",
"Datacenter": "dc1",
"TaggedAddresses": {
"lan": "127.0.0.1",
"lan_ipv4": "127.0.0.1",
"wan": "127.0.0.1",
"wan_ipv4": "127.0.0.1"
},
"Meta": {
"consul-network-segment": ""
},
"CreateIndex": 13,
"ModifyIndex": 14
},
"Service": {
"ID": "am1",
"Service": "alertmanager",
"Tags": [
"alertmanager",
"__scheme__=http"
],
"Address": "",
"Meta": null,
"Port": 9093,
"Weights": {
"Passing": 1,
"Warning": 1
},
"EnableTagOverride": false,
"Proxy": {
"Mode": "",
"MeshGateway": {},
"Expose": {}
},
"Connect": {},
"CreateIndex": 16,
"ModifyIndex": 16
}
},
{
"Node": {
"ID": "e8e3629a-3f50-9d6e-aaf8-f173b5b05c72",
"Node": "machine",
"Address": "127.0.0.1",
"Datacenter": "dc1",
"TaggedAddresses": {
"lan": "127.0.0.1",
"lan_ipv4": "127.0.0.1",
"wan": "127.0.0.1",
"wan_ipv4": "127.0.0.1"
},
"Meta": {
"consul-network-segment": ""
},
"CreateIndex": 13,
"ModifyIndex": 14
},
"Service": {
"ID": "am2",
"Service": "alertmanager",
"Tags": [
"alertmanager",
"bad-node"
],
"Address": "",
"Meta": null,
"Port": 9095,
"Weights": {
"Passing": 1,
"Warning": 1
},
"EnableTagOverride": false,
"Proxy": {
"Mode": "",
"MeshGateway": {},
"Expose": {}
},
"Connect": {},
"CreateIndex": 15,
"ModifyIndex": 15
}
}
]`))
})
return httptest.NewServer(mux)
}


@ -1,14 +1,19 @@
package notifier
import (
"flag"
"fmt"
"net/http"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
var (
configPath = flag.String("notifier.config", "", "Path to configuration file for notifiers")
suppressDuplicateTargetErrors = flag.Bool("notifier.suppressDuplicateTargetErrors", false, "Whether to suppress 'duplicate target' errors during discovery")
addrs = flagutil.NewArray("notifier.url", "Prometheus alertmanager URL, e.g. http://127.0.0.1:9093")
basicAuthUsername = flagutil.NewArray("notifier.basicAuth.username", "Optional basic auth username for -notifier.url")
basicAuthPassword = flagutil.NewArray("notifier.basicAuth.password", "Optional basic auth password for -notifier.url")
@ -22,20 +27,117 @@ var (
"By default the server name from -notifier.url is used")
)
// Init creates a Notifier object based on provided flags.
func Init(gen AlertURLGenerator) ([]Notifier, error) {
var notifiers []Notifier
for i, addr := range *addrs {
cert, key := tlsCertFile.GetOptionalArg(i), tlsKeyFile.GetOptionalArg(i)
ca, serverName := tlsCAFile.GetOptionalArg(i), tlsServerName.GetOptionalArg(i)
tr, err := utils.Transport(addr, cert, key, ca, serverName, tlsInsecureSkipVerify.GetOptionalArg(i))
if err != nil {
return nil, fmt.Errorf("failed to create transport: %w", err)
}
user, pass := basicAuthUsername.GetOptionalArg(i), basicAuthPassword.GetOptionalArg(i)
am := NewAlertManager(addr, user, pass, gen, &http.Client{Transport: tr})
notifiers = append(notifiers, am)
// cw holds a configWatcher for configPath configuration file
// configWatcher provides a list of Notifier objects discovered
// from static config or via service discovery.
// cw is not nil only if configPath is provided.
var cw *configWatcher
// Reload checks for changes in the configPath configuration file
// and applies them if any.
func Reload() error {
if cw == nil {
return nil
}
return cw.reload(*configPath)
}
var staticNotifiersFn func() []Notifier
// Init returns a function for retrieving the actual list of Notifier objects.
// Init works in two modes:
// * configuration via flags (for backward compatibility). The list is always
// static and doesn't support live reloads.
// * configuration via file. Supports live reloads and service discovery.
// Init returns an error if both modes are used.
func Init(gen AlertURLGenerator) (func() []Notifier, error) {
if *configPath == "" && len(*addrs) == 0 {
return nil, nil
}
if *configPath != "" && len(*addrs) > 0 {
return nil, fmt.Errorf("only one of -notifier.config or -notifier.url flags must be specified")
}
if len(*addrs) > 0 {
notifiers, err := notifiersFromFlags(gen)
if err != nil {
return nil, fmt.Errorf("failed to create notifier from flag values: %s", err)
}
staticNotifiersFn = func() []Notifier {
return notifiers
}
return staticNotifiersFn, nil
}
var err error
cw, err = newWatcher(*configPath, gen)
if err != nil {
return nil, fmt.Errorf("failed to init config watcher: %s", err)
}
return cw.notifiers, nil
}
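// notifiersFromFlags creates a static list of AlertManager notifiers
// from the -notifier.* command-line flags.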
func notifiersFromFlags(gen AlertURLGenerator) ([]Notifier, error) {
var notifiers []Notifier
for i, addr := range *addrs {
authCfg := promauth.HTTPClientConfig{
TLSConfig: &promauth.TLSConfig{
CAFile: tlsCAFile.GetOptionalArg(i),
CertFile: tlsCertFile.GetOptionalArg(i),
KeyFile: tlsKeyFile.GetOptionalArg(i),
ServerName: tlsServerName.GetOptionalArg(i),
InsecureSkipVerify: tlsInsecureSkipVerify.GetOptionalArg(i),
},
BasicAuth: &promauth.BasicAuthConfig{
Username: basicAuthUsername.GetOptionalArg(i),
Password: promauth.NewSecret(basicAuthPassword.GetOptionalArg(i)),
},
}
am, err := NewAlertManager(addr+alertManagerPath, gen, authCfg, time.Minute)
if err != nil {
return nil, err
}
notifiers = append(notifiers, am)
}
return notifiers, nil
}
// Target represents a Notifier and optional
// list of labels added during discovery.
type Target struct {
Notifier
Labels []prompbmarshal.Label
}
// TargetType defines how the Target was discovered
type TargetType string
const (
// TargetStatic is for targets configured statically
TargetStatic TargetType = "static"
// TargetConsul is for targets discovered via Consul
TargetConsul TargetType = "consulSD"
)
// GetTargets returns list of static or discovered targets
// via notifier configuration.
func GetTargets() map[TargetType][]Target {
var targets = make(map[TargetType][]Target)
if staticNotifiersFn != nil {
for _, ns := range staticNotifiersFn() {
targets[TargetStatic] = append(targets[TargetStatic], Target{
Notifier: ns,
})
}
}
if cw != nil {
cw.targetsMu.RLock()
for key, ns := range cw.targets {
targets[key] = append(targets[key], ns...)
}
cw.targetsMu.RUnlock()
}
return targets
}


@ -10,4 +10,6 @@ type Notifier interface {
Send(ctx context.Context, alerts []Alert) error
// Addr returns address where alerts are sent.
Addr() string
// Close is a destructor for the Notifier
Close()
}


@ -0,0 +1,13 @@
consul_sd_configs:
  - server: localhost:8500
    scheme: http
    services:
      - alertmanager
  - server: localhost:8500
    services:
      - consul
relabel_configs:
  - source_labels: [__meta_consul_tags]
    regex: .*,__scheme__=([^,]+),.*
    replacement: '${1}'
    target_label: __scheme__


@ -0,0 +1,18 @@
static_configs:
  - targets:
      - localhost:9093
      - localhost:9095
consul_sd_configs:
  - server: localhost:8500
    scheme: http
    services:
      - alertmanager
  - server: localhost:8500
    services:
      - consul
relabel_configs:
  - source_labels: [__meta_consul_tags]
    regex: .*,__scheme__=([^,]+),.*
    replacement: '${1}'
    target_label: __scheme__


@ -0,0 +1,4 @@
static_configs:
  - targets:
      - localhost:9093
      - localhost:9095


@ -0,0 +1,5 @@
scheme: https
unknown: field
static_configs:
  - targets:
      - localhost:9093


@ -10,8 +10,8 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/metrics"
)
// RecordingRule is a Rule that supposed
@ -43,8 +43,8 @@ type RecordingRule struct {
}
type recordingRuleMetrics struct {
errors *gauge
samples *gauge
errors *utils.Gauge
samples *utils.Gauge
}
// String implements Stringer interface
@ -75,7 +75,7 @@ func newRecordingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rul
}
labels := fmt.Sprintf(`recording=%q, group=%q, id="%d"`, rr.Name, group.Name, rr.ID())
rr.metrics.errors = getOrCreateGauge(fmt.Sprintf(`vmalert_recording_rules_error{%s}`, labels),
rr.metrics.errors = utils.GetOrCreateGauge(fmt.Sprintf(`vmalert_recording_rules_error{%s}`, labels),
func() float64 {
rr.mu.RLock()
defer rr.mu.RUnlock()
@ -84,7 +84,7 @@ func newRecordingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rul
}
return 1
})
rr.metrics.samples = getOrCreateGauge(fmt.Sprintf(`vmalert_recording_rules_last_evaluation_samples{%s}`, labels),
rr.metrics.samples = utils.GetOrCreateGauge(fmt.Sprintf(`vmalert_recording_rules_last_evaluation_samples{%s}`, labels),
func() float64 {
rr.mu.RLock()
defer rr.mu.RUnlock()
@ -95,8 +95,8 @@ func newRecordingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rul
// Close unregisters rule metrics
func (rr *RecordingRule) Close() {
metrics.UnregisterMetric(rr.metrics.errors.name)
metrics.UnregisterMetric(rr.metrics.samples.name)
rr.metrics.errors.Unregister()
rr.metrics.samples.Unregister()
}
// ExecRange executes recording rule on the given time range similarly to Exec.


@ -0,0 +1,54 @@
package utils
import "github.com/VictoriaMetrics/metrics"
type namedMetric struct {
Name string
}
// Unregister removes the metric by name from default registry
func (nm namedMetric) Unregister() {
metrics.UnregisterMetric(nm.Name)
}
// Gauge is a metrics.Gauge with Name
type Gauge struct {
namedMetric
*metrics.Gauge
}
// GetOrCreateGauge creates a new Gauge with the given name
func GetOrCreateGauge(name string, f func() float64) *Gauge {
return &Gauge{
namedMetric: namedMetric{Name: name},
Gauge: metrics.GetOrCreateGauge(name, f),
}
}
// Counter is a metrics.Counter with Name
type Counter struct {
namedMetric
*metrics.Counter
}
// GetOrCreateCounter creates a new Counter with the given name
func GetOrCreateCounter(name string) *Counter {
return &Counter{
namedMetric: namedMetric{Name: name},
Counter: metrics.GetOrCreateCounter(name),
}
}
// Summary is a metrics.Summary with Name
type Summary struct {
namedMetric
*metrics.Summary
}
// GetOrCreateSummary creates a new Summary with the given name
func GetOrCreateSummary(name string) *Summary {
return &Summary{
namedMetric: namedMetric{Name: name},
Summary: metrics.GetOrCreateSummary(name),
}
}


@ -10,6 +10,7 @@ import (
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/tpl"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -33,9 +34,10 @@ func initLinks() {
{path.Join(pathPrefix, "-/reload"), "reload configuration"},
}
navItems = []tpl.NavItem{
{Name: "vmalert", Url: pathPrefix},
{Name: "vmalert", Url: path.Join(pathPrefix, "/")},
{Name: "Groups", Url: path.Join(pathPrefix, "groups")},
{Name: "Alerts", Url: path.Join(pathPrefix, "alerts")},
{Name: "Notifiers", Url: path.Join(pathPrefix, "notifiers")},
{Name: "Docs", Url: "https://docs.victoriametrics.com/vmalert.html"},
}
}
@ -62,6 +64,9 @@ func (rh *requestHandler) handler(w http.ResponseWriter, r *http.Request) bool {
case "/groups":
WriteListGroups(w, rh.groups())
return true
case "/notifiers":
WriteListTargets(w, notifier.GetTargets())
return true
case "/api/v1/groups":
data, err := rh.listGroups()
if err != nil {


@ -5,6 +5,7 @@
"sort"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/tpl"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
) %}
@ -205,6 +206,62 @@
{% endfunc %}
{% func ListTargets(targets map[notifier.TargetType][]notifier.Target) %}
{%= tpl.Header("Notifiers", navItems) %}
{% if len(targets) > 0 %}
<a class="btn btn-primary" role="button" onclick="collapseAll()">Collapse All</a>
<a class="btn btn-primary" role="button" onclick="expandAll()">Expand All</a>
{%code
var keys []string
for key := range targets {
keys = append(keys, string(key))
}
sort.Strings(keys)
%}
{% for i := range keys %}
{%code typeK, ns := keys[i], targets[notifier.TargetType(keys[i])]
count := len(ns)
%}
<div class="group-heading data-bs-target="rules-{%s typeK %}">
<span class="anchor" id="notifiers-{%s typeK %}"></span>
<a href="#notifiers-{%s typeK %}">{%s typeK %} ({%d count %})</a>
</div>
<div class="collapse show" id="notifiers-{%s typeK %}">
<table class="table table-striped table-hover table-sm">
<thead>
<tr>
<th scope="col">Labels</th>
<th scope="col">Address</th>
</tr>
</thead>
<tbody>
{% for _, n := range ns %}
<tr>
<td>
{% for _, l := range n.Labels %}
<span class="ms-1 badge bg-primary">{%s l.Name %}={%s l.Value %}</span>
{% endfor %}
</td>
<td>{%s n.Notifier.Addr() %}</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% endfor %}
{% else %}
<div>
<p>No items...</p>
</div>
{% endif %}
{%= tpl.Footer() %}
{% endfunc %}
{% func Alert(alert *APIAlert) %}
{%= tpl.Header("", navItems) %}
{%code

File diff suppressed because it is too large