From 8c8ff5d0cb181eea8b0051b71b411dc72fb93a3f Mon Sep 17 00:00:00 2001 From: Roman Khavronenko Date: Sun, 10 May 2020 17:58:17 +0100 Subject: [PATCH] vmalert: cleanup and restructure of code to improve maintainability (#471) The change introduces a new entity `manager`, which replaces `watchdog` and decouples requestHandler and groups. The manager is supposed to control the life cycle of groups, rules and config reloads. Groups export an ID method, which returns a hash computed from the file name and the group name. The ID is supposed to be a unique identifier across all loaded groups. Some tests were added to improve coverage. Fixed a bug with a wrong annotation value when $value is used in templates after alerts state has been restored. The Notifier interface was extended to accept a context. A new set of metrics was introduced for config reloads. --- app/vmalert/README.md | 4 + app/vmalert/config.go | 27 +-- app/vmalert/group.go | 165 ++++++++++++++ app/vmalert/group_test.go | 203 ++++++++++++++++++ app/vmalert/main.go | 185 ++++------------ app/vmalert/manager.go | 109 ++++++++++ app/vmalert/manager_test.go | 152 +++++++++++++ app/vmalert/notifier/alert.go | 2 +- app/vmalert/notifier/alertmanager.go | 15 +- .../notifier/alertmanager_request.qtpl | 2 +- .../notifier/alertmanager_request.qtpl.go | 112 +++++----- app/vmalert/notifier/alertmanager_test.go | 15 +- app/vmalert/notifier/notifier.go | 4 +- app/vmalert/rule.go | 97 +++------ app/vmalert/rule_test.go | 98 ++------- app/vmalert/testdata/rules1-good.rules | 11 + app/vmalert/web.go | 142 ++++-------- app/vmalert/web_test.go | 126 ++--------- 18 files changed, 876 insertions(+), 593 deletions(-) create mode 100644 app/vmalert/group.go create mode 100644 app/vmalert/group_test.go create mode 100644 app/vmalert/manager.go create mode 100644 app/vmalert/manager_test.go create mode 100644 app/vmalert/testdata/rules1-good.rules diff --git a/app/vmalert/README.md b/app/vmalert/README.md index d949d5497..f4dc3bc1c 100644 --- a/app/vmalert/README.md +++ b/app/vmalert/README.md @@ -48,6 +48,7 @@ Rules in group evaluated one-by-one sequentially. * `http://<vmalert-addr>/api/v1/<groupName>/<alertID>/status` - get alert status by ID. Used as alert source in AlertManager. * `http://<vmalert-addr>/metrics` - application metrics. +* `http://<vmalert-addr>/-/reload` - hot configuration reload. `vmalert` may be configured with `-remotewrite` flag to write alerts state in form of timeseries via remote write protocol. Alerts state will be written as `ALERTS` timeseries. These timeseries @@ -109,6 +110,9 @@ Usage of vmalert: Pass `-help` to `vmalert` in order to see the full list of supported command-line flags with their descriptions. +To reload configuration without restarting `vmalert`, send a SIGHUP signal +or send a GET request to the `/-/reload` endpoint. + ### Contributing `vmalert` is mostly designed and built by VictoriaMetrics community.
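The group identity scheme the commit message describes is small enough to show in isolation. Below is a minimal, runnable Go sketch of it; the `groupID` helper here is a hypothetical stand-in written for illustration, while the actual implementation is `Group.ID()` in app/vmalert/group.go further down in this patch.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// groupID mirrors the ID scheme introduced by this patch: an FNV-1a
// hash over the rules file name and the group name. The 0xff byte
// separates the two parts so that ("ab", "c") and ("a", "bc") hash
// differently.
func groupID(file, name string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(file))
	h.Write([]byte("\xff"))
	h.Write([]byte(name))
	return h.Sum64()
}

func main() {
	// The same (file, name) pair always produces the same ID, which is
	// what lets the manager match reloaded groups against running ones.
	fmt.Println(groupID("testdata/rules1-good.rules", "groupTest"))
	fmt.Println(groupID("testdata/rules0-good.rules", "groupTest"))
}
```

Because the file name participates in the hash, two groups with the same name in different files get distinct IDs, which is exactly the uniqueness property the commit message claims.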
diff --git a/app/vmalert/config.go b/app/vmalert/config.go index 59a10c187..e46c0c497 100644 --- a/app/vmalert/config.go +++ b/app/vmalert/config.go @@ -27,29 +27,32 @@ func Parse(pathPatterns []string, validateAnnotations bool) ([]Group, error) { if err != nil { return nil, fmt.Errorf("file %s: %w", file, err) } - for _, group := range gr { - if _, ok := groupsNames[group.Name]; ok { - return nil, fmt.Errorf("one file can not contain groups with the same name %s, filepath:%s", file, group.Name) + for _, g := range gr { + if _, ok := groupsNames[g.Name]; ok { + return nil, fmt.Errorf("one file cannot contain groups with the same name %s, filepath: %s", g.Name, file) } - groupsNames[group.Name] = struct{}{} - for _, rule := range group.Rules { + g.File = file + g.done = make(chan struct{}) + g.finished = make(chan struct{}) + + groupsNames[g.Name] = struct{}{} + for _, rule := range g.Rules { if err = rule.Validate(); err != nil { - return nil, fmt.Errorf("invalid rule filepath:%s, group %s:%w", file, group.Name, err) + return nil, fmt.Errorf("invalid rule filepath: %s, group %s: %w", file, g.Name, err) } - // TODO: this init looks weird here - rule.alerts = make(map[uint64]*notifier.Alert) if validateAnnotations { if err = notifier.ValidateTemplates(rule.Annotations); err != nil { - return nil, fmt.Errorf("invalid annotations filepath:%s, group %s:%w", file, group.Name, err) + return nil, fmt.Errorf("invalid annotations filepath: %s, group %s: %w", file, g.Name, err) } if err = notifier.ValidateTemplates(rule.Labels); err != nil { - return nil, fmt.Errorf("invalid labels filepath:%s, group %s:%w", file, group.Name, err) + return nil, fmt.Errorf("invalid labels filepath: %s, group %s: %w", file, g.Name, err) } } - rule.group = group + rule.group = g + rule.alerts = make(map[uint64]*notifier.Alert) } + groups = append(groups, g) } - groups = append(groups, gr...) } if len(groups) < 1 { return nil, fmt.Errorf("no groups found in %s", strings.Join(pathPatterns, ";")) diff --git a/app/vmalert/group.go b/app/vmalert/group.go new file mode 100644 index 000000000..f5e0f0f0e --- /dev/null +++ b/app/vmalert/group.go @@ -0,0 +1,165 @@ +package main + +import ( + "context" + "fmt" + "hash/fnv" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/metrics" +) + +// Group is an entity for grouping rules +type Group struct { + Name string + File string + Rules []*Rule + + done chan struct{} + finished chan struct{} +} + +// ID returns a unique group ID computed from +// the rules file name and the group name +func (g Group) ID() uint64 { + hash := fnv.New64a() + hash.Write([]byte(g.File)) + hash.Write([]byte("\xff")) + hash.Write([]byte(g.Name)) + return hash.Sum64() +} + +// Restore restores alerts state for all group rules with For > 0 +func (g *Group) Restore(ctx context.Context, q datasource.Querier, lookback time.Duration) error { + for _, rule := range g.Rules { + if rule.For == 0 { + // nothing to restore for rules without For + continue + } + if err := rule.Restore(ctx, q, lookback); err != nil { + return fmt.Errorf("error while restoring rule %q: %s", rule.Name, err) + } + } + return nil +} + +// updateWith updates the existing group with the +// passed group object. +func (g *Group) updateWith(newGroup Group) { + rulesRegistry := make(map[string]*Rule) + for _, nr := range newGroup.Rules { + rulesRegistry[nr.id()] = nr + } + + var newRules []*Rule + for _, or := range g.Rules { + nr, ok := rulesRegistry[or.id()] + if !ok { + // old rule is not present in the new list + // and must be dropped + continue + } + + // copy all significant fields. + // alerts state isn't copied since + // it should be updated in the next 2 evaluations + or.For = nr.For + or.Expr = nr.Expr + or.Labels = nr.Labels + or.Annotations = nr.Annotations + newRules = append(newRules, or) + delete(rulesRegistry, nr.id()) + } + + for _, nr := range rulesRegistry { + newRules = append(newRules, nr) + } + g.Rules = newRules +} + +var ( + iterationTotal = metrics.NewCounter(`vmalert_iteration_total`) + iterationDuration = metrics.NewSummary(`vmalert_iteration_duration_seconds`) + + execTotal = metrics.NewCounter(`vmalert_execution_total`) + execErrors = metrics.NewCounter(`vmalert_execution_errors_total`) + execDuration = metrics.NewSummary(`vmalert_execution_duration_seconds`) + + alertsFired = metrics.NewCounter(`vmalert_alerts_fired_total`) + alertsSent = metrics.NewCounter(`vmalert_alerts_sent_total`) + alertsSendErrors = metrics.NewCounter(`vmalert_alerts_send_errors_total`) + + remoteWriteSent = metrics.NewCounter(`vmalert_remotewrite_sent_total`) + remoteWriteErrors = metrics.NewCounter(`vmalert_remotewrite_errors_total`) +) + +func (g *Group) close() { + if g.done == nil { + return + } + close(g.done) + <-g.finished +} + +func (g *Group) start(ctx context.Context, interval time.Duration, + querier datasource.Querier, nr notifier.Notifier, rw *remotewrite.Client) { + logger.Infof("group %q started", g.Name) + t := time.NewTicker(interval) + defer t.Stop() + for { + select { + case <-ctx.Done(): + logger.Infof("group %q: context cancelled", g.Name) + close(g.finished) + return + case <-g.done: + logger.Infof("group %q: received stop signal", g.Name) + close(g.finished) + return + case <-t.C: + iterationTotal.Inc() + iterationStart := time.Now() + for _, rule := range g.Rules { + execTotal.Inc() + + execStart := time.Now() + err := rule.Exec(ctx, querier) + execDuration.UpdateDuration(execStart) + + if err != nil { + execErrors.Inc() + logger.Errorf("failed to execute rule %q.%q: %s", g.Name, rule.Name, err) + continue + } + + var alertsToSend []notifier.Alert + for _, a := range rule.alerts { + if a.State != notifier.StatePending { + alertsToSend = append(alertsToSend, *a) + } + if a.State == notifier.StateInactive || rw == nil { + continue + } + tss := rule.AlertToTimeSeries(a, execStart) + for _, ts := range tss { + remoteWriteSent.Inc() + if err := rw.Push(ts); err != nil { + remoteWriteErrors.Inc() + logger.Errorf("failed to push timeseries to remotewrite: %s", err) + } + } + } + if len(alertsToSend) > 0 { + alertsSent.Add(len(alertsToSend)) + if err := nr.Send(ctx, alertsToSend); err != nil { + alertsSendErrors.Inc() + logger.Errorf("failed to send alert for rule %q.%q: %s", g.Name, rule.Name, err) + } + } + } + iterationDuration.UpdateDuration(iterationStart) + } + } +} diff --git a/app/vmalert/group_test.go b/app/vmalert/group_test.go new file mode 100644 index 000000000..bfab6aa92 --- /dev/null +++ b/app/vmalert/group_test.go @@ -0,0 +1,203 @@ +package main + +import ( + "context" + "reflect" + "sort" + "sync" + "testing" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" +) + +func TestUpdateWith(t *testing.T) { + testCases := []struct { + name string + currentRules []*Rule
+ newRules []*Rule + }{ + { + "new rule", + []*Rule{}, + []*Rule{{Name: "bar"}}, + }, + { + "update rule", + []*Rule{{ + Name: "foo", + Expr: "up > 0", + For: time.Second, + Labels: map[string]string{ + "bar": "baz", + }, + Annotations: map[string]string{ + "summary": "{{ $value|humanize }}", + "description": "{{$labels}}", + }, + }}, + []*Rule{{ + Name: "bar", + Expr: "up > 10", + For: time.Second, + Labels: map[string]string{ + "baz": "bar", + }, + Annotations: map[string]string{ + "summary": "none", + }, + }}, + }, + { + "empty rule", + []*Rule{{Name: "foo"}}, + []*Rule{}, + }, + { + "multiple rules", + []*Rule{{Name: "foo"}, {Name: "bar"}, {Name: "baz"}}, + []*Rule{{Name: "foo"}, {Name: "baz"}}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + g := &Group{Rules: tc.currentRules} + g.updateWith(Group{Rules: tc.newRules}) + + if len(g.Rules) != len(tc.newRules) { + t.Fatalf("expected to have %d rules; got: %d", + len(tc.newRules), len(g.Rules)) + } + for i, r := range g.Rules { + got, want := r, tc.newRules[i] + if got.Name != want.Name { + t.Fatalf("expected to have rule %q; got %q", want.Name, got.Name) + } + if got.Expr != want.Expr { + t.Fatalf("expected to have expression %q; got %q", want.Expr, got.Expr) + } + if got.For != want.For { + t.Fatalf("expected to have for %q; got %q", want.For, got.For) + } + if !reflect.DeepEqual(got.Annotations, want.Annotations) { + t.Fatalf("expected to have annotations %#v; got %#v", want.Annotations, got.Annotations) + } + if !reflect.DeepEqual(got.Labels, want.Labels) { + t.Fatalf("expected to have labels %#v; got %#v", want.Labels, got.Labels) + } + } + }) + } +} + +func TestGroupStart(t *testing.T) { + // TODO: make parsing from string instead of file + groups, err := Parse([]string{"testdata/rules1-good.rules"}, true) + if err != nil { + t.Fatalf("failed to parse rules: %s", err) + } + g := groups[0] + + fn := &fakeNotifier{} + fs := &fakeQuerier{} + + const inst1, inst2, job = "foo", "bar", "baz" + m1 := metricWithLabels(t, "instance", inst1, "job", job) + m2 := metricWithLabels(t, "instance", inst2, "job", job) + + r := g.Rules[0] + alert1, err := r.newAlert(m1) + if err != nil { + t.Fatalf("failed to create alert: %s", err) + } + alert1.State = notifier.StateFiring + alert1.ID = hash(m1) + + alert2, err := r.newAlert(m2) + if err != nil { + t.Fatalf("failed to create alert: %s", err) + } + alert2.State = notifier.StateFiring + alert2.ID = hash(m2) + + const evalInterval = time.Millisecond + finished := make(chan struct{}) + fs.add(m1) + fs.add(m2) + go func() { + g.start(context.Background(), evalInterval, fs, fn, nil) + close(finished) + }() + + // wait for multiple evals + time.Sleep(20 * evalInterval) + + gotAlerts := fn.getAlerts() + expectedAlerts := []notifier.Alert{*alert1, *alert2} + compareAlerts(t, expectedAlerts, gotAlerts) + + // reset previous data + fs.reset() + // and set only one datapoint for response + fs.add(m1) + + // wait for multiple evals + time.Sleep(20 * evalInterval) + + gotAlerts = fn.getAlerts() + expectedAlerts = []notifier.Alert{*alert1} + compareAlerts(t, expectedAlerts, gotAlerts) + + g.close() + <-finished +} + +func compareAlerts(t *testing.T, as, bs []notifier.Alert) { + t.Helper() + if len(as) != len(bs) { + t.Fatalf("expected to have length %d; got %d", len(as), len(bs)) + } + sort.Slice(as, func(i, j int) bool { + return as[i].ID < as[j].ID + }) + sort.Slice(bs, func(i, j int) bool { + return bs[i].ID < bs[j].ID + }) + for i := range as { + a, b := as[i], bs[i] + if a.Name != b.Name { + t.Fatalf("expected to have Name %q; got %q", a.Name, b.Name) + } + if a.State != b.State { + t.Fatalf("expected to have State %q; got %q", a.State, b.State) + } + if a.Value != b.Value { + t.Fatalf("expected to have Value %f; got %f", a.Value, b.Value) + } + if !reflect.DeepEqual(a.Annotations, b.Annotations) { + t.Fatalf("expected to have annotations %#v; got %#v", a.Annotations, b.Annotations) + } + if !reflect.DeepEqual(a.Labels, b.Labels) { + t.Fatalf("expected to have labels %#v; got %#v", a.Labels, b.Labels) + } + } +} + +type fakeNotifier struct { + sync.Mutex + alerts []notifier.Alert +} + +func (fn *fakeNotifier) Send(_ context.Context, alerts []notifier.Alert) error { + fn.Lock() + defer fn.Unlock() + fn.alerts = alerts + return nil +} + +func (fn *fakeNotifier) getAlerts() []notifier.Alert { + fn.Lock() + defer fn.Unlock() + return fn.alerts +} diff --git a/app/vmalert/main.go b/app/vmalert/main.go index 853bfb6ce..7960bf04e 100644 --- a/app/vmalert/main.go +++ b/app/vmalert/main.go @@ -8,7 +8,6 @@ import ( "net/url" "os" "strings" - "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" @@ -64,23 +63,17 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) eu, err := getExternalURL(*externalURL, *httpListenAddr, httpserver.IsTLS()) if err != nil { - logger.Fatalf("can not get external url:%s ", err) + logger.Fatalf("cannot get external url: %s", err) } notifier.InitTemplateFunc(eu) - logger.Infof("reading alert rules configuration file from %s", strings.Join(*rulePath, ";")) - groups, err := readRules() - if err != nil { - logger.Fatalf("cannot parse configuration file: %s", err) - } - - w := &watchdog{ + manager := &manager{ + groups: make(map[uint64]*Group), storage: datasource.NewVMStorage(*datasourceURL, *basicAuthUsername, *basicAuthPassword, &http.Client{}), - alertProvider: notifier.NewAlertManager(*notifierURL, func(group, name string) string { - return fmt.Sprintf("%s/api/v1/%s/%s/status", eu, group, name) + notifier: notifier.NewAlertManager(*notifierURL, func(group, alert string) string { + return fmt.Sprintf("%s/api/v1/%s/%s/status", eu, group, alert) }, &http.Client{}), } - if *remoteWriteURL != "" { c, err := remotewrite.NewClient(ctx, remotewrite.Config{ Addr: *remoteWriteURL, @@ -91,26 +84,38 @@ func main() { if err != nil { logger.Fatalf("failed to init remotewrite client: %s", err) } - w.rw = c + manager.rw = c } - - var restoreDS *datasource.VMStorage if *remoteReadURL != "" { - restoreDS = datasource.NewVMStorage(*remoteReadURL, *remoteReadUsername, *remoteReadPassword, &http.Client{}) + manager.rr = datasource.NewVMStorage(*remoteReadURL, *remoteReadUsername, *remoteReadPassword, &http.Client{}) } - wg := sync.WaitGroup{} + if err := manager.start(ctx, *rulePath, *validateTemplates); err != nil { + logger.Fatalf("failed to start: %s", err) + } - groupUpdateStorage := startInitGroups(ctx, w, restoreDS, groups, &wg) - - rh := &requestHandler{groups: groups, mu: sync.RWMutex{}} - - //run config updater - wg.Add(1) - sigHup := procutil.NewSighupChan() - - go rh.runConfigUpdater(ctx, sigHup, groupUpdateStorage, w, &wg) + go func() { + // init reload metrics with positive values to improve alerting conditions + configSuccess.Set(1) + configTimestamp.Set(uint64(time.Now().UnixNano()) / 1e9) + sigHup := procutil.NewSighupChan() + for { + <-sigHup + configReloads.Inc() + logger.Infof("SIGHUP received.
Going to reload rules %q ...", *rulePath) + if err := manager.update(ctx, *rulePath, *validateTemplates, false); err != nil { + configReloadErrors.Inc() + configSuccess.Set(0) + logger.Errorf("error while reloading rules: %s", err) + continue + } + configSuccess.Set(1) + configTimestamp.Set(uint64(time.Now().UnixNano()) / 1e9) + logger.Infof("Rules reloaded successfully from %q", *rulePath) + } + }() + rh := &requestHandler{m: manager} go httpserver.Serve(*httpListenAddr, (rh).handler) sig := procutil.WaitForSigterm() @@ -119,106 +124,16 @@ func main() { logger.Fatalf("cannot stop the webservice: %s", err) } cancel() - if w.rw != nil { - err := w.rw.Close() - if err != nil { - logger.Fatalf("cannot stop the remotewrite: %s", err) - } - } - wg.Wait() -} - -type watchdog struct { - storage *datasource.VMStorage - alertProvider notifier.Notifier - rw *remotewrite.Client + manager.close() } var ( - iterationTotal = metrics.NewCounter(`vmalert_iteration_total`) - iterationDuration = metrics.NewSummary(`vmalert_iteration_duration_seconds`) - - execTotal = metrics.NewCounter(`vmalert_execution_total`) - execErrors = metrics.NewCounter(`vmalert_execution_errors_total`) - execDuration = metrics.NewSummary(`vmalert_execution_duration_seconds`) - - alertsFired = metrics.NewCounter(`vmalert_alerts_fired_total`) - alertsSent = metrics.NewCounter(`vmalert_alerts_sent_total`) - alertsSendErrors = metrics.NewCounter(`vmalert_alerts_send_errors_total`) - - remoteWriteSent = metrics.NewCounter(`vmalert_remotewrite_sent_total`) - remoteWriteErrors = metrics.NewCounter(`vmalert_remotewrite_errors_total`) - - configReloadTotal = metrics.NewCounter(`vmalert_config_reload_total`) - configReloadOkTotal = metrics.NewCounter(`vmalert_config_reload_ok_total`) - configReloadErrorTotal = metrics.NewCounter(`vmalert_config_reload_error_total`) + configReloads = metrics.NewCounter(`vmalert_config_last_reload_total`) + configReloadErrors = metrics.NewCounter(`vmalert_config_last_reload_errors_total`) + configSuccess = metrics.NewCounter(`vmalert_config_last_reload_successful`) + configTimestamp = metrics.NewCounter(`vmalert_config_last_reload_success_timestamp_seconds`) ) -func (w *watchdog) run(ctx context.Context, group Group, evaluationInterval time.Duration, groupUpdate chan Group) { - logger.Infof("watchdog for %s has been started", group.Name) - t := time.NewTicker(evaluationInterval) - defer t.Stop() - for { - - select { - case newGroup := <-groupUpdate: - if newGroup.Rules == nil || len(newGroup.Rules) == 0 { - //empty rules for group - //need to exit - logger.Infof("stopping group: %s, it contains 0 rules now", group.Name) - return - } - logger.Infof("new group update received, group: %s", group.Name) - group.Update(newGroup) - logger.Infof("group was reconciled, group: %s", group.Name) - - case <-t.C: - iterationTotal.Inc() - iterationStart := time.Now() - for _, rule := range group.Rules { - execTotal.Inc() - - execStart := time.Now() - err := rule.Exec(ctx, w.storage) - execDuration.UpdateDuration(execStart) - - if err != nil { - execErrors.Inc() - logger.Errorf("failed to execute rule %q.%q: %s", group.Name, rule.Name, err) - continue - } - - var alertsToSend []notifier.Alert - for _, a := range rule.alerts { - if a.State != notifier.StatePending { - alertsToSend = append(alertsToSend, *a) - } - if a.State == notifier.StateInactive || w.rw == nil { - continue - } - tss := rule.AlertToTimeSeries(a, execStart) - for _, ts := range tss { - remoteWriteSent.Inc() - if err := w.rw.Push(ts); err != nil { - 
remoteWriteErrors.Inc() - logger.Errorf("failed to push timeseries to remotewrite: %s", err) - } - } - } - alertsSent.Add(len(alertsToSend)) - if err := w.alertProvider.Send(alertsToSend); err != nil { - alertsSendErrors.Inc() - logger.Errorf("failed to send alert for rule %q.%q: %s", group.Name, rule.Name, err) - } - } - iterationDuration.UpdateDuration(iterationStart) - case <-ctx.Done(): - logger.Infof("%s received stop signal", group.Name) - return - } - } -} - func getExternalURL(externalURL, httpListenAddr string, isSecure bool) (*url.URL, error) { if externalURL != "" { return url.Parse(externalURL) @@ -248,31 +163,3 @@ func checkFlags() { logger.Fatalf("datasource.url is empty") } } - -func startInitGroups(ctx context.Context, w *watchdog, restoreDS *datasource.VMStorage, groups []Group, wg *sync.WaitGroup) map[string]chan Group { - groupUpdateStorage := map[string]chan Group{} - for _, g := range groups { - if restoreDS != nil { - err := g.Restore(ctx, restoreDS, *remoteReadLookBack) - if err != nil { - logger.Errorf("error while restoring state for group %q: %s", g.Name, err) - } - } - - groupUpdateChan := make(chan Group, 1) - groupUpdateStorage[g.Name] = groupUpdateChan - - wg.Add(1) - go func(group Group) { - w.run(ctx, group, *evaluationInterval, groupUpdateChan) - wg.Done() - }(g) - } - return groupUpdateStorage -} - -//wrapper -func readRules() ([]Group, error) { - return Parse(*rulePath, *validateTemplates) - -} diff --git a/app/vmalert/manager.go b/app/vmalert/manager.go new file mode 100644 index 000000000..92e093a2c --- /dev/null +++ b/app/vmalert/manager.go @@ -0,0 +1,109 @@ +package main + +import ( + "context" + "fmt" + "strings" + "sync" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" +) + +type manager struct { + storage datasource.Querier + notifier notifier.Notifier + + rw *remotewrite.Client + rr datasource.Querier + + wg sync.WaitGroup + + groupsMu sync.RWMutex + groups map[uint64]*Group +} + +// AlertAPI generates APIAlert object from alert by its id(hash) +func (m *manager) AlertAPI(gID, aID uint64) (*APIAlert, error) { + m.groupsMu.RLock() + defer m.groupsMu.RUnlock() + + g, ok := m.groups[gID] + if !ok { + return nil, fmt.Errorf("can't find group with id %d", gID) + } + for _, rule := range g.Rules { + if apiAlert := rule.AlertAPI(aID); apiAlert != nil { + return apiAlert, nil + } + } + return nil, fmt.Errorf("can't find alert with id %d in group %q", aID, g.Name) +} + +func (m *manager) start(ctx context.Context, path []string, validate bool) error { + return m.update(ctx, path, validate, true) +} + +func (m *manager) close() { + if m.rw != nil { + err := m.rw.Close() + if err != nil { + logger.Fatalf("cannot stop the remotewrite: %s", err) + } + } + m.wg.Wait() +} + +func (m *manager) startGroup(ctx context.Context, group Group, restore bool) { + if restore { + err := group.Restore(ctx, m.rr, *remoteReadLookBack) + if err != nil { + logger.Errorf("error while restoring state for group %q: %s", group.Name, err) + } + } + + m.wg.Add(1) + id := group.ID() + go func() { + group.start(ctx, *evaluationInterval, m.storage, m.notifier, m.rw) + m.wg.Done() + }() + m.groups[id] = &group +} + +func (m *manager) update(ctx context.Context, path []string, validate, restore bool) error { + logger.Infof("reading alert rules configuration file from %q", strings.Join(path, ";")) + newGroups, err := Parse(path, validate) + if err != nil { + return fmt.Errorf("cannot parse configuration file: %s", err) + } + + groupsRegistry := make(map[uint64]Group) + for _, ng := range newGroups { + groupsRegistry[ng.ID()] = ng + } + + m.groupsMu.Lock() + for _, og := range m.groups { + id := og.ID() + ng, ok := groupsRegistry[id] + if !ok { + // old group is not present in the new list + // and must be stopped and deleted + og.close() + delete(m.groups, og.ID()) + continue + } + og.updateWith(ng) + delete(groupsRegistry, ng.ID()) + } + + for _, ng := range groupsRegistry { + m.startGroup(ctx, ng, restore) + } + m.groupsMu.Unlock() + return nil +} diff --git a/app/vmalert/manager_test.go b/app/vmalert/manager_test.go new file mode 100644 index 000000000..afe30d5b7 --- /dev/null +++ b/app/vmalert/manager_test.go @@ -0,0 +1,152 @@ +package main + +import ( + "context" + "math/rand" + "strings" + "sync" + "testing" + "time" +) + +func TestManagerUpdateError(t *testing.T) { + m := &manager{groups: make(map[uint64]*Group)} + path := []string{"foo/bar"} + err := m.update(context.Background(), path, true, false) + if err == nil { + t.Fatalf("expected to have err; got nil instead") + } + expErr := "no groups found" + if !strings.Contains(err.Error(), expErr) { + t.Fatalf("expected to get err %q; got %q", expErr, err) + } +} + +// TestManagerUpdateConcurrent is supposed to test concurrent +// execution of configuration updates. +// Should be executed with the -race flag +func TestManagerUpdateConcurrent(t *testing.T) { + m := &manager{groups: make(map[uint64]*Group)} + paths := []string{ + "testdata/dir/rules0-good.rules", + "testdata/dir/rules1-good.rules", + "testdata/rules0-good.rules", + } + + const n = 500 + wg := sync.WaitGroup{} + wg.Add(n) + for i := 0; i < n; i++ { + go func() { + defer wg.Done() + rnd := rand.Intn(len(paths)) + path := []string{paths[rnd]} + err := m.update(context.Background(), path, true, false) + if err != nil { + t.Errorf("update error: %s", err) + } + }() + } + wg.Wait() +} + +// TestManagerUpdate tests sequential configuration +// updates.
+func TestManagerUpdate(t *testing.T) { + testCases := []struct { + name string + initPath string + updatePath string + want []*Group + }{ + { + name: "update good rules", + initPath: "testdata/rules0-good.rules", + updatePath: "testdata/dir/rules1-good.rules", + want: []*Group{ + { + File: "testdata/dir/rules1-good.rules", + Name: "duplicatedGroupDiffFiles", + Rules: []*Rule{newTestRule("VMRows", time.Second*10)}, + }, + }, + }, + { + name: "update good rules from 1 to 2 groups", + initPath: "testdata/dir/rules1-good.rules", + updatePath: "testdata/rules0-good.rules", + want: []*Group{ + { + File: "testdata/rules0-good.rules", + Name: "groupGorSingleAlert", Rules: []*Rule{ + newTestRule("VMRows", time.Second*10), + }}, + { + File: "testdata/rules0-good.rules", + Name: "TestGroup", Rules: []*Rule{ + newTestRule("Conns", time.Duration(0)), + newTestRule("ExampleAlertAlwaysFiring", time.Duration(0)), + }}, + }, + }, + { + name: "update with one bad rule file", + initPath: "testdata/rules0-good.rules", + updatePath: "testdata/dir/rules2-bad.rules", + want: []*Group{ + { + File: "testdata/rules0-good.rules", + Name: "groupGorSingleAlert", Rules: []*Rule{ + newTestRule("VMRows", time.Second*10), + }}, + { + File: "testdata/rules0-good.rules", + Name: "TestGroup", Rules: []*Rule{ + newTestRule("Conns", time.Duration(0)), + newTestRule("ExampleAlertAlwaysFiring", time.Duration(0)), + }}, + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.TODO()) + m := &manager{groups: make(map[uint64]*Group)} + path := []string{tc.initPath} + if err := m.update(ctx, path, true, false); err != nil { + t.Fatalf("failed to complete initial rules update: %s", err) + } + + path = []string{tc.updatePath} + _ = m.update(ctx, path, true, false) + if len(tc.want) != len(m.groups) { + t.Fatalf("\nwant number of groups: %d;\ngot: %d", len(tc.want), len(m.groups)) + } + + for _, wantG := range tc.want { + gotG, ok := m.groups[wantG.ID()] + if !ok { + t.Fatalf("expected to have group %q", wantG.Name) + } + compareGroups(t, gotG, wantG) + } + + cancel() + m.close() + }) + } +} + +func compareGroups(t *testing.T, a, b *Group) { + t.Helper() + if len(a.Rules) != len(b.Rules) { + t.Fatalf("expected group %q to have %d rules; got %d", + a.Name, len(b.Rules), len(a.Rules)) + } + for i, r := range a.Rules { + got, want := r, b.Rules[i] + if got.Name != want.Name { + t.Fatalf("expected to have rule %q; got %q", want.Name, got.Name) + } + } +} diff --git a/app/vmalert/notifier/alert.go b/app/vmalert/notifier/alert.go index 3a2b5de62..b883aa193 100644 --- a/app/vmalert/notifier/alert.go +++ b/app/vmalert/notifier/alert.go @@ -12,7 +12,7 @@ import ( // Alert the triggered alert // TODO: Looks like alert name isn't unique type Alert struct { - Group string + GroupID uint64 Name string Labels map[string]string Annotations map[string]string diff --git a/app/vmalert/notifier/alertmanager.go b/app/vmalert/notifier/alertmanager.go index ab1ba6cc6..9a970454c 100644 --- a/app/vmalert/notifier/alertmanager.go +++ b/app/vmalert/notifier/alertmanager.go @@ -2,6 +2,7 @@ package notifier import ( "bytes" + "context" "fmt" "io/ioutil" "net/http" @@ -17,13 +18,21 @@ type AlertManager struct { } // Send an alert or resolve message -func (am *AlertManager) Send(alerts []Alert) error { +func (am *AlertManager) Send(ctx context.Context, alerts []Alert) error { b := &bytes.Buffer{} writeamRequest(b, alerts, am.argFunc) - resp, err := am.client.Post(am.alertURL,
"application/json", b) + + req, err := http.NewRequest("POST", am.alertURL, b) if err != nil { return err } + req.Header.Set("Content-Type", "application/json") + req = req.WithContext(ctx) + resp, err := am.client.Do(req) + if err != nil { + return err + } + defer func() { _ = resp.Body.Close() }() if resp.StatusCode != http.StatusOK { @@ -37,7 +46,7 @@ func (am *AlertManager) Send(alerts []Alert) error { } // AlertURLGenerator returns URL to single alert by given name -type AlertURLGenerator func(group, id string) string +type AlertURLGenerator func(group, alert string) string const alertManagerPath = "/api/v2/alerts" diff --git a/app/vmalert/notifier/alertmanager_request.qtpl b/app/vmalert/notifier/alertmanager_request.qtpl index 1c64f1f37..249523948 100644 --- a/app/vmalert/notifier/alertmanager_request.qtpl +++ b/app/vmalert/notifier/alertmanager_request.qtpl @@ -9,7 +9,7 @@ {% for i, alert := range alerts %} { "startsAt":{%q= alert.Start.Format(time.RFC3339Nano) %}, - "generatorURL": {%q= generatorURL(alert.Group, strconv.FormatUint(alert.ID, 10)) %}, + "generatorURL": {%q= generatorURL(strconv.FormatUint(alert.GroupID, 10), strconv.FormatUint(alert.ID, 10)) %}, {% if !alert.End.IsZero() %} "endsAt":{%q= alert.End.Format(time.RFC3339Nano) %}, {% endif %} diff --git a/app/vmalert/notifier/alertmanager_request.qtpl.go b/app/vmalert/notifier/alertmanager_request.qtpl.go index 26aebf584..188fc3f37 100644 --- a/app/vmalert/notifier/alertmanager_request.qtpl.go +++ b/app/vmalert/notifier/alertmanager_request.qtpl.go @@ -1,131 +1,131 @@ // Code generated by qtc from "alertmanager_request.qtpl". DO NOT EDIT. // See https://github.com/valyala/quicktemplate for details. -//line app/vmalert/notifier/alertmanager_request.qtpl:1 +//line notifier/alertmanager_request.qtpl:1 package notifier -//line app/vmalert/notifier/alertmanager_request.qtpl:1 +//line notifier/alertmanager_request.qtpl:1 import ( "strconv" "time" ) -//line app/vmalert/notifier/alertmanager_request.qtpl:7 +//line notifier/alertmanager_request.qtpl:7 import ( qtio422016 "io" qt422016 "github.com/valyala/quicktemplate" ) -//line app/vmalert/notifier/alertmanager_request.qtpl:7 +//line notifier/alertmanager_request.qtpl:7 var ( _ = qtio422016.Copy _ = qt422016.AcquireByteBuffer ) -//line app/vmalert/notifier/alertmanager_request.qtpl:7 +//line notifier/alertmanager_request.qtpl:7 func streamamRequest(qw422016 *qt422016.Writer, alerts []Alert, generatorURL func(string, string) string) { -//line app/vmalert/notifier/alertmanager_request.qtpl:7 +//line notifier/alertmanager_request.qtpl:7 qw422016.N().S(`[`) -//line app/vmalert/notifier/alertmanager_request.qtpl:9 +//line notifier/alertmanager_request.qtpl:9 for i, alert := range alerts { -//line app/vmalert/notifier/alertmanager_request.qtpl:9 +//line notifier/alertmanager_request.qtpl:9 qw422016.N().S(`{"startsAt":`) -//line app/vmalert/notifier/alertmanager_request.qtpl:11 +//line notifier/alertmanager_request.qtpl:11 qw422016.N().Q(alert.Start.Format(time.RFC3339Nano)) -//line app/vmalert/notifier/alertmanager_request.qtpl:11 +//line notifier/alertmanager_request.qtpl:11 qw422016.N().S(`,"generatorURL":`) -//line app/vmalert/notifier/alertmanager_request.qtpl:12 - qw422016.N().Q(generatorURL(alert.Group, strconv.FormatUint(alert.ID, 10))) -//line app/vmalert/notifier/alertmanager_request.qtpl:12 +//line notifier/alertmanager_request.qtpl:12 + qw422016.N().Q(generatorURL(strconv.FormatUint(alert.GroupID, 10), strconv.FormatUint(alert.ID, 10))) +//line 
notifier/alertmanager_request.qtpl:12 qw422016.N().S(`,`) -//line app/vmalert/notifier/alertmanager_request.qtpl:13 +//line notifier/alertmanager_request.qtpl:13 if !alert.End.IsZero() { -//line app/vmalert/notifier/alertmanager_request.qtpl:13 +//line notifier/alertmanager_request.qtpl:13 qw422016.N().S(`"endsAt":`) -//line app/vmalert/notifier/alertmanager_request.qtpl:14 +//line notifier/alertmanager_request.qtpl:14 qw422016.N().Q(alert.End.Format(time.RFC3339Nano)) -//line app/vmalert/notifier/alertmanager_request.qtpl:14 +//line notifier/alertmanager_request.qtpl:14 qw422016.N().S(`,`) -//line app/vmalert/notifier/alertmanager_request.qtpl:15 +//line notifier/alertmanager_request.qtpl:15 } -//line app/vmalert/notifier/alertmanager_request.qtpl:15 +//line notifier/alertmanager_request.qtpl:15 qw422016.N().S(`"labels": {"alertname":`) -//line app/vmalert/notifier/alertmanager_request.qtpl:17 +//line notifier/alertmanager_request.qtpl:17 qw422016.N().Q(alert.Name) -//line app/vmalert/notifier/alertmanager_request.qtpl:18 +//line notifier/alertmanager_request.qtpl:18 for k, v := range alert.Labels { -//line app/vmalert/notifier/alertmanager_request.qtpl:18 +//line notifier/alertmanager_request.qtpl:18 qw422016.N().S(`,`) -//line app/vmalert/notifier/alertmanager_request.qtpl:19 +//line notifier/alertmanager_request.qtpl:19 qw422016.N().Q(k) -//line app/vmalert/notifier/alertmanager_request.qtpl:19 +//line notifier/alertmanager_request.qtpl:19 qw422016.N().S(`:`) -//line app/vmalert/notifier/alertmanager_request.qtpl:19 +//line notifier/alertmanager_request.qtpl:19 qw422016.N().Q(v) -//line app/vmalert/notifier/alertmanager_request.qtpl:20 +//line notifier/alertmanager_request.qtpl:20 } -//line app/vmalert/notifier/alertmanager_request.qtpl:20 +//line notifier/alertmanager_request.qtpl:20 qw422016.N().S(`},"annotations": {`) -//line app/vmalert/notifier/alertmanager_request.qtpl:23 +//line notifier/alertmanager_request.qtpl:23 c := len(alert.Annotations) -//line app/vmalert/notifier/alertmanager_request.qtpl:24 +//line notifier/alertmanager_request.qtpl:24 for k, v := range alert.Annotations { -//line app/vmalert/notifier/alertmanager_request.qtpl:25 +//line notifier/alertmanager_request.qtpl:25 c = c - 1 -//line app/vmalert/notifier/alertmanager_request.qtpl:26 +//line notifier/alertmanager_request.qtpl:26 qw422016.N().Q(k) -//line app/vmalert/notifier/alertmanager_request.qtpl:26 +//line notifier/alertmanager_request.qtpl:26 qw422016.N().S(`:`) -//line app/vmalert/notifier/alertmanager_request.qtpl:26 +//line notifier/alertmanager_request.qtpl:26 qw422016.N().Q(v) -//line app/vmalert/notifier/alertmanager_request.qtpl:26 +//line notifier/alertmanager_request.qtpl:26 if c > 0 { -//line app/vmalert/notifier/alertmanager_request.qtpl:26 +//line notifier/alertmanager_request.qtpl:26 qw422016.N().S(`,`) -//line app/vmalert/notifier/alertmanager_request.qtpl:26 +//line notifier/alertmanager_request.qtpl:26 } -//line app/vmalert/notifier/alertmanager_request.qtpl:27 +//line notifier/alertmanager_request.qtpl:27 } -//line app/vmalert/notifier/alertmanager_request.qtpl:27 +//line notifier/alertmanager_request.qtpl:27 qw422016.N().S(`}}`) -//line app/vmalert/notifier/alertmanager_request.qtpl:30 +//line notifier/alertmanager_request.qtpl:30 if i != len(alerts)-1 { -//line app/vmalert/notifier/alertmanager_request.qtpl:30 +//line notifier/alertmanager_request.qtpl:30 qw422016.N().S(`,`) -//line app/vmalert/notifier/alertmanager_request.qtpl:30 +//line notifier/alertmanager_request.qtpl:30 } -//line 
app/vmalert/notifier/alertmanager_request.qtpl:31 +//line notifier/alertmanager_request.qtpl:31 } -//line app/vmalert/notifier/alertmanager_request.qtpl:31 +//line notifier/alertmanager_request.qtpl:31 qw422016.N().S(`]`) -//line app/vmalert/notifier/alertmanager_request.qtpl:33 +//line notifier/alertmanager_request.qtpl:33 } -//line app/vmalert/notifier/alertmanager_request.qtpl:33 +//line notifier/alertmanager_request.qtpl:33 func writeamRequest(qq422016 qtio422016.Writer, alerts []Alert, generatorURL func(string, string) string) { -//line app/vmalert/notifier/alertmanager_request.qtpl:33 +//line notifier/alertmanager_request.qtpl:33 qw422016 := qt422016.AcquireWriter(qq422016) -//line app/vmalert/notifier/alertmanager_request.qtpl:33 +//line notifier/alertmanager_request.qtpl:33 streamamRequest(qw422016, alerts, generatorURL) -//line app/vmalert/notifier/alertmanager_request.qtpl:33 +//line notifier/alertmanager_request.qtpl:33 qt422016.ReleaseWriter(qw422016) -//line app/vmalert/notifier/alertmanager_request.qtpl:33 +//line notifier/alertmanager_request.qtpl:33 } -//line app/vmalert/notifier/alertmanager_request.qtpl:33 +//line notifier/alertmanager_request.qtpl:33 func amRequest(alerts []Alert, generatorURL func(string, string) string) string { -//line app/vmalert/notifier/alertmanager_request.qtpl:33 +//line notifier/alertmanager_request.qtpl:33 qb422016 := qt422016.AcquireByteBuffer() -//line app/vmalert/notifier/alertmanager_request.qtpl:33 +//line notifier/alertmanager_request.qtpl:33 writeamRequest(qb422016, alerts, generatorURL) -//line app/vmalert/notifier/alertmanager_request.qtpl:33 +//line notifier/alertmanager_request.qtpl:33 qs422016 := string(qb422016.B) -//line app/vmalert/notifier/alertmanager_request.qtpl:33 +//line notifier/alertmanager_request.qtpl:33 qt422016.ReleaseByteBuffer(qb422016) -//line app/vmalert/notifier/alertmanager_request.qtpl:33 +//line notifier/alertmanager_request.qtpl:33 return qs422016 -//line app/vmalert/notifier/alertmanager_request.qtpl:33 +//line notifier/alertmanager_request.qtpl:33 } diff --git a/app/vmalert/notifier/alertmanager_test.go b/app/vmalert/notifier/alertmanager_test.go index 710d147bc..0aa8469ef 100644 --- a/app/vmalert/notifier/alertmanager_test.go +++ b/app/vmalert/notifier/alertmanager_test.go @@ -1,6 +1,7 @@ package notifier import ( + "context" "encoding/json" "net/http" "net/http/httptest" @@ -40,8 +41,8 @@ func TestAlertManager_Send(t *testing.T) { if len(a) != 1 { t.Errorf("expected 1 alert in array got %d", len(a)) } - if a[0].GeneratorURL != "group0" { - t.Errorf("exptected alert0 as generatorURL got %s", a[0].GeneratorURL) + if a[0].GeneratorURL != "0/0" { + t.Errorf("exptected 0/0 as generatorURL got %s", a[0].GeneratorURL) } if a[0].Labels["alertname"] != "alert0" { t.Errorf("exptected alert0 as alert name got %s", a[0].Labels["alertname"]) @@ -57,16 +58,16 @@ func TestAlertManager_Send(t *testing.T) { srv := httptest.NewServer(mux) defer srv.Close() am := NewAlertManager(srv.URL, func(group, name string) string { - return group + name + return group + "/" + name }, srv.Client()) - if err := am.Send([]Alert{{}, {}}); err == nil { + if err := am.Send(context.Background(), []Alert{{}, {}}); err == nil { t.Error("expected connection error got nil") } - if err := am.Send([]Alert{}); err == nil { + if err := am.Send(context.Background(), []Alert{}); err == nil { t.Error("expected wrong http code error got nil") } - if err := am.Send([]Alert{{ - Group: "group", + if err := am.Send(context.Background(), []Alert{{ + GroupID: 
0, Name: "alert0", Start: time.Now().UTC(), End: time.Now().UTC(), diff --git a/app/vmalert/notifier/notifier.go b/app/vmalert/notifier/notifier.go index 598be77d0..5564e23cb 100644 --- a/app/vmalert/notifier/notifier.go +++ b/app/vmalert/notifier/notifier.go @@ -1,6 +1,8 @@ package notifier +import "context" + // Notifier is common interface for alert manager provider type Notifier interface { - Send(alerts []Alert) error + Send(ctx context.Context, alerts []Alert) error } diff --git a/app/vmalert/rule.go b/app/vmalert/rule.go index 666b6cf0f..5f4b42cf0 100644 --- a/app/vmalert/rule.go +++ b/app/vmalert/rule.go @@ -17,51 +17,6 @@ import ( "github.com/VictoriaMetrics/metricsql" ) -// Group grouping array of alert -type Group struct { - Name string - Rules []*Rule -} - -// Restore restores alerts state for all group rules with For > 0 -func (g *Group) Restore(ctx context.Context, q datasource.Querier, lookback time.Duration) error { - for _, rule := range g.Rules { - if rule.For == 0 { - return nil - } - if err := rule.Restore(ctx, q, lookback); err != nil { - return fmt.Errorf("error while restoring rule %q: %s", rule.Name, err) - } - } - return nil -} - -// Update group -func (g *Group) Update(newGroup Group) *Group { - //check if old rule exists at new rules - for _, newRule := range newGroup.Rules { - for _, oldRule := range g.Rules { - if newRule.Name == oldRule.Name { - //is lock nessesary? - oldRule.mu.Lock() - //we copy only rules related values - //it`s safe to add additional fields to rule - //struct - oldRule.Annotations = newRule.Annotations - oldRule.Labels = newRule.Labels - oldRule.For = newRule.For - oldRule.Expr = newRule.Expr - oldRule.group = newRule.group - newRule = oldRule - oldRule.mu.Unlock() - } - } - } - //swap rules - g.Rules = newGroup.Rules - return g -} - // Rule is basic alert entity type Rule struct { Name string `yaml:"alert"` @@ -84,6 +39,10 @@ type Rule struct { lastExecError error } +func (r *Rule) id() string { + return r.Name +} + // Validate validates rule func (r *Rule) Validate() error { if r.Name == "" { @@ -124,8 +83,16 @@ func (r *Rule) Exec(ctx context.Context, q datasource.Querier) error { h := hash(m) updated[h] = struct{}{} if a, ok := r.alerts[h]; ok { - // update Value field with latest value - a.Value = m.Value + if a.Value != m.Value { + // update Value field with latest value + a.Value = m.Value + // and re-exec template since Value can be used + // in templates + err = r.template(a) + if err != nil { + return err + } + } continue } a, err := r.newAlert(m) @@ -180,15 +147,13 @@ func hash(m datasource.Metric) uint64 { func (r *Rule) newAlert(m datasource.Metric) (*notifier.Alert, error) { a := ¬ifier.Alert{ - Group: r.group.Name, - Name: r.Name, - Labels: map[string]string{}, - Value: m.Value, - Start: time.Now(), + GroupID: r.group.ID(), + Name: r.Name, + Labels: map[string]string{}, + Value: m.Value, + Start: time.Now(), // TODO: support End time } - - // 1. use data labels for _, l := range m.Labels { // drop __name__ to be consistent with Prometheus alerting if l.Name == "__name__" { @@ -196,28 +161,31 @@ func (r *Rule) newAlert(m datasource.Metric) (*notifier.Alert, error) { } a.Labels[l.Name] = l.Value } + return a, r.template(a) +} - // 2. template rule labels with data labels +func (r *Rule) template(a *notifier.Alert) error { + // 1. template rule labels with data labels rLabels, err := a.ExecTemplate(r.Labels) if err != nil { - return a, err + return err } - // 3. merge data labels and rule labels + // 2. 
merge data labels and rule labels // metric labels may be overridden by // rule labels for k, v := range rLabels { a.Labels[k] = v } - // 4. template merged labels + // 3. template merged labels a.Labels, err = a.ExecTemplate(a.Labels) if err != nil { - return a, err + return err } a.Annotations, err = a.ExecTemplate(r.Annotations) - return a, err + return err } // AlertAPI generates APIAlert object from alert by its id(hash) @@ -244,10 +212,11 @@ func (r *Rule) AlertsAPI() []*APIAlert { func (r *Rule) newAlertAPI(a notifier.Alert) *APIAlert { return &APIAlert{ - // encode as string to avoid rounding - ID: fmt.Sprintf("%d", a.ID), + // encode as strings to avoid rounding + ID: fmt.Sprintf("%d", a.ID), + GroupID: fmt.Sprintf("%d", a.GroupID), + Name: a.Name, - Group: a.Group, Expression: r.Expr, Labels: a.Labels, Annotations: a.Annotations, @@ -360,7 +329,7 @@ func (r *Rule) Restore(ctx context.Context, q datasource.Querier, lookback time. a.State = notifier.StatePending a.Start = time.Unix(int64(m.Value), 0) r.alerts[a.ID] = a - logger.Infof("alert %q.%q restored to state at %v", a.Group, a.Name, a.Start) + logger.Infof("alert %q(%d) restored to state at %v", a.Name, a.ID, a.Start) } return nil } diff --git a/app/vmalert/rule_test.go b/app/vmalert/rule_test.go index 476c78378..ae11105ba 100644 --- a/app/vmalert/rule_test.go +++ b/app/vmalert/rule_test.go @@ -2,7 +2,7 @@ package main import ( "context" - "reflect" + "sync" "testing" "time" @@ -393,19 +393,28 @@ func metricWithLabels(t *testing.T, labels ...string) datasource.Metric { } type fakeQuerier struct { + sync.Mutex metrics []datasource.Metric } func (fq *fakeQuerier) reset() { + fq.Lock() fq.metrics = fq.metrics[:0] + fq.Unlock() } func (fq *fakeQuerier) add(metrics ...datasource.Metric) { + fq.Lock() fq.metrics = append(fq.metrics, metrics...) 
+ fq.Unlock() } -func (fq fakeQuerier) Query(ctx context.Context, query string) ([]datasource.Metric, error) { - return fq.metrics, nil +func (fq *fakeQuerier) Query(_ context.Context, _ string) ([]datasource.Metric, error) { + fq.Lock() + cpy := make([]datasource.Metric, len(fq.metrics)) + copy(cpy, fq.metrics) + fq.Unlock() + return cpy, nil } func TestRule_Restore(t *testing.T) { @@ -534,86 +543,3 @@ func metricWithValueAndLabels(t *testing.T, value float64, labels ...string) dat m.Value = value return m } - -func TestGroup_Update(t *testing.T) { - type fields struct { - Name string - Rules []*Rule - } - type args struct { - newGroup Group - } - tests := []struct { - name string - fields fields - args args - want *Group - }{ - { - name: "update group with replace one value", - args: args{newGroup: Group{Name: "base-group", Rules: []*Rule{ - { - Annotations: map[string]string{"different": "annotation"}, - For: time.Second * 30, - }, - }}}, - fields: fields{ - Name: "base-group", - Rules: []*Rule{ - { - Annotations: map[string]string{"one": "annotations"}, - }, - }, - }, - want: &Group{ - Name: "base-group", - Rules: []*Rule{ - {Annotations: map[string]string{"different": "annotation"}, For: time.Second * 30}, - }, - }, - }, - { - name: "update group with change one value for rule", - args: args{newGroup: Group{Name: "base-group-2", Rules: []*Rule{ - { - Annotations: map[string]string{"different": "annotation", "replace-value": "new-one"}, - For: time.Second * 30, - Labels: map[string]string{"label-1": "value-1"}, - Expr: "rate(vm) > 1", - }, - }}}, - fields: fields{ - Name: "base-group-2", - Rules: []*Rule{ - { - Annotations: map[string]string{"different": "annotation", "replace-value": "old-one"}, - For: time.Second * 50, - Expr: "rate(vm) > 5", - }, - }, - }, - want: &Group{ - Name: "base-group-2", - Rules: []*Rule{ - { - Annotations: map[string]string{"different": "annotation", "replace-value": "new-one"}, - For: time.Second * 30, - Labels: map[string]string{"label-1": "value-1"}, - Expr: "rate(vm) > 1", - }, - }, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - g := &Group{ - Name: tt.fields.Name, - Rules: tt.fields.Rules, - } - if got := g.Update(tt.args.newGroup); !reflect.DeepEqual(got, tt.want) { - t.Errorf("Update() = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/app/vmalert/testdata/rules1-good.rules b/app/vmalert/testdata/rules1-good.rules new file mode 100644 index 000000000..8f7494731 --- /dev/null +++ b/app/vmalert/testdata/rules1-good.rules @@ -0,0 +1,11 @@ +groups: + - name: groupTest + rules: + - alert: VMRows + for: 1ms + expr: vm_rows > 0 + labels: + label: bar + host: "{{ $labels.instance }}" + annotations: + summary: "{{ $value }}" diff --git a/app/vmalert/web.go b/app/vmalert/web.go index b89b3b531..4146f3d48 100644 --- a/app/vmalert/web.go +++ b/app/vmalert/web.go @@ -1,27 +1,25 @@ package main import ( - "context" "encoding/json" "fmt" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil" "net/http" - "os" "sort" "strconv" "strings" - "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil" ) -// APIAlert has info for an alert. 
+// APIAlert represents a notifier.Alert state +// for web view type APIAlert struct { ID string `json:"id"` Name string `json:"name"` - Group string `json:"group"` + GroupID string `json:"group_id"` Expression string `json:"expression"` State string `json:"state"` Value string `json:"value"` @@ -31,82 +29,15 @@ type APIAlert struct { } type requestHandler struct { - groups []Group - mu sync.RWMutex -} - -func (rh *requestHandler) runConfigUpdater(ctx context.Context, reloadChan <-chan os.Signal, groupUpdateStorage map[string]chan Group, w *watchdog, wg *sync.WaitGroup) { - logger.Infof("starting config updater") - defer wg.Done() - for { - select { - case <-reloadChan: - logger.Infof("get sighup signal, updating config") - configReloadTotal.Inc() - newRules, err := readRules() - if err != nil { - logger.Errorf("sighup, cannot read new rules: %v", err) - configReloadErrorTotal.Inc() - continue - } - - rh.mu.Lock() - configReloadOkTotal.Inc() - //send new group to running watchers - for _, group := range newRules { - //update or start new group - if updateChan, ok := groupUpdateStorage[group.Name]; ok { - updateChan <- group - } else { - //its new group, we need to start it - updateChan := make(chan Group, 1) - groupUpdateStorage[group.Name] = updateChan - wg.Add(1) - go func(grp Group) { - w.run(ctx, grp, *evaluationInterval, updateChan) - wg.Done() - }(group) - //add new group to route handler - rh.groups = append(rh.groups, group) - } - } - //we have to check, if group is missing and remove it - for groupName, updateChan := range groupUpdateStorage { - var exist bool - for _, newGroup := range newRules { - if groupName == newGroup.Name { - exist = true - } - } - if !exist { - logger.Infof("group not exists in new rules, remove it, group: %s", groupName) - delete(groupUpdateStorage, groupName) - updateChan <- Group{Rules: []*Rule{}} - for i, group := range rh.groups { - if group.Name == groupName { - rh.groups[i] = rh.groups[len(rh.groups)-1] - rh.groups[len(rh.groups)-1] = Group{} - rh.groups = rh.groups[:len(rh.groups)-1] - } - } - } - } - rh.mu.Unlock() - logger.Infof("finished sync") - - case <-ctx.Done(): - logger.Infof("exiting config updater") - return - - } - } + m *manager } var pathList = [][]string{ {"/api/v1/alerts", "list all active alerts"}, - {"/api/v1/groupName/alertID/status", "get alert status by ID"}, + {"/api/v1/groupID/alertID/status", "get alert status by ID"}, // /metrics is served by httpserver by default {"/metrics", "list of application metrics"}, + {"/-/reload", "reload configuration"}, } func (rh *requestHandler) handler(w http.ResponseWriter, r *http.Request) bool { @@ -126,7 +57,6 @@ func (rh *requestHandler) handler(w http.ResponseWriter, r *http.Request) bool { procutil.SelfSIGHUP() w.WriteHeader(http.StatusOK) return true - default: // /api/v1/<groupID>/<alertID>/status if strings.HasSuffix(r.URL.Path, "/status") { @@ -145,10 +75,10 @@ type listAlertsResponse struct { } func (rh *requestHandler) list() ([]byte, error) { - rh.mu.RLock() - defer rh.mu.RUnlock() + rh.m.groupsMu.RLock() + defer rh.m.groupsMu.RUnlock() lr := listAlertsResponse{Status: "success"} - for _, g := range rh.groups { + for _, g := range rh.m.groups { for _, r := range g.Rules { lr.Data.Alerts = append(lr.Data.Alerts, r.AlertsAPI()...)
} @@ -170,8 +100,9 @@ func (rh *requestHandler) list() ([]byte, error) { } func (rh *requestHandler) alert(path string) ([]byte, error) { - rh.mu.RLock() - defer rh.mu.RUnlock() + rh.m.groupsMu.RLock() + defer rh.m.groupsMu.RUnlock() + parts := strings.SplitN(strings.TrimPrefix(path, "/api/v1/"), "/", 3) if len(parts) != 3 { return nil, &httpserver.ErrorWithStatusCode{ @@ -179,29 +110,20 @@ func (rh *requestHandler) alert(path string) ([]byte, error) { StatusCode: http.StatusBadRequest, } } - group := strings.TrimRight(parts[0], "/") - idStr := strings.TrimRight(parts[1], "/") - id, err := strconv.ParseUint(idStr, 10, 0) + + groupID, err := uint64FromPath(parts[0]) if err != nil { - return nil, &httpserver.ErrorWithStatusCode{ - Err: fmt.Errorf(`cannot parse int from %q`, idStr), - StatusCode: http.StatusBadRequest, - } + return nil, badRequest(fmt.Errorf(`cannot parse groupID: %s`, err)) } - for _, g := range rh.groups { - if g.Name != group { - continue - } - for _, rule := range g.Rules { - if apiAlert := rule.AlertAPI(id); apiAlert != nil { - return json.Marshal(apiAlert) - } - } + alertID, err := uint64FromPath(parts[1]) + if err != nil { + return nil, badRequest(fmt.Errorf(`cannot parse alertID: %s`, err)) } - return nil, &httpserver.ErrorWithStatusCode{ - Err: fmt.Errorf(`cannot find alert %s in %q`, idStr, group), - StatusCode: http.StatusNotFound, + resp, err := rh.m.AlertAPI(groupID, alertID) + if err != nil { + return nil, errResponse(err, http.StatusNotFound) } + return json.Marshal(resp) } // responseHandler wrapper on http.ResponseWriter with sugar @@ -215,3 +137,19 @@ func (w responseHandler) handle(b []byte, err error) { w.Header().Set("Content-Type", "application/json") w.Write(b) } + +func uint64FromPath(path string) (uint64, error) { + s := strings.TrimRight(path, "/") + return strconv.ParseUint(s, 10, 0) +} + +func badRequest(err error) *httpserver.ErrorWithStatusCode { + return errResponse(err, http.StatusBadRequest) +} + +func errResponse(err error, sc int) *httpserver.ErrorWithStatusCode { + return &httpserver.ErrorWithStatusCode{ + Err: err, + StatusCode: sc, + } +} diff --git a/app/vmalert/web_test.go b/app/vmalert/web_test.go index 6b88a54c5..012a334c5 100644 --- a/app/vmalert/web_test.go +++ b/app/vmalert/web_test.go @@ -1,17 +1,13 @@ package main import ( - "context" "encoding/json" - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" "net/http" "net/http/httptest" - "os" "reflect" - "sync" - "syscall" "testing" - "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" ) func TestHandler(t *testing.T) { @@ -21,13 +17,14 @@ func TestHandler(t *testing.T) { 0: {}, }, } - rh := &requestHandler{ - groups: []Group{{ - Name: "group", - Rules: []*Rule{rule}, - }}, - mu: sync.RWMutex{}, + g := &Group{ + Name: "group", + Rules: []*Rule{rule}, } + m := &manager{groups: make(map[uint64]*Group)} + m.groups[0] = g + rh := &requestHandler{m: m} + getResp := func(url string, to interface{}, code int) { t.Helper() resp, err := http.Get(url) @@ -57,114 +54,21 @@ func TestHandler(t *testing.T) { t.Errorf("expected 1 alert got %d", length) } }) - t.Run("/api/v1/group/0/status", func(t *testing.T) { + t.Run("/api/v1/0/0/status", func(t *testing.T) { alert := &APIAlert{} - getResp(ts.URL+"/api/v1/group/0/status", alert, 200) + getResp(ts.URL+"/api/v1/0/0/status", alert, 200) expAlert := rule.newAlertAPI(*rule.alerts[0]) if !reflect.DeepEqual(alert, expAlert) { t.Errorf("expected %v is equal to %v", alert, expAlert) } }) - 
t.Run("/api/v1/group/1/status", func(t *testing.T) { - getResp(ts.URL+"/api/v1/group/1/status", nil, 404) + t.Run("/api/v1/0/1/status", func(t *testing.T) { + getResp(ts.URL+"/api/v1/0/1/status", nil, 404) }) - t.Run("/api/v1/unknown-group/0/status", func(t *testing.T) { - getResp(ts.URL+"/api/v1/unknown-group/0/status", nil, 404) + t.Run("/api/v1/1/0/status", func(t *testing.T) { + getResp(ts.URL+"/api/v1/1/0/status", nil, 404) }) t.Run("/", func(t *testing.T) { getResp(ts.URL, nil, 200) }) } - -func Test_requestHandler_runConfigUpdater(t *testing.T) { - type fields struct { - groups []Group - } - type args struct { - updateChan chan os.Signal - w *watchdog - wg *sync.WaitGroup - initRulePath []string - updateRulePath string - } - tests := []struct { - name string - fields fields - args args - want []Group - }{ - { - name: "update good rules", - args: args{ - w: &watchdog{}, - wg: &sync.WaitGroup{}, - updateChan: make(chan os.Signal), - initRulePath: []string{"testdata/rules0-good.rules"}, - updateRulePath: "testdata/dir/rules1-good.rules", - }, - fields: fields{ - groups: []Group{}, - }, - want: []Group{{Name: "duplicatedGroupDiffFiles", Rules: []*Rule{newTestRule("VMRows", time.Second*10)}}}, - }, - { - name: "update with one bad rule file", - args: args{ - w: &watchdog{}, - wg: &sync.WaitGroup{}, - updateChan: make(chan os.Signal), - initRulePath: []string{"testdata/rules0-good.rules"}, - updateRulePath: "testdata/dir/rules2-bad.rules", - }, - fields: fields{ - groups: []Group{}, - }, - want: []Group{ - { - Name: "duplicatedGroupDiffFiles", Rules: []*Rule{ - newTestRule("VMRows", time.Second*10), - }}, - { - Name: "TestGroup", Rules: []*Rule{ - newTestRule("Conns", time.Duration(0)), - newTestRule("ExampleAlertAlwaysFiring", time.Duration(0)), - }}, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.TODO()) - grp, err := Parse(tt.args.initRulePath, *validateTemplates) - if err != nil { - t.Errorf("cannot setup test: %v", err) - cancel() - return - } - groupUpdateStorage := startInitGroups(ctx, tt.args.w, nil, grp, tt.args.wg) - rh := &requestHandler{ - groups: grp, - mu: sync.RWMutex{}, - } - tt.args.wg.Add(1) - go func() { - //possible side effect with global var modification - err = rulePath.Set(tt.args.updateRulePath) - if err != nil { - t.Errorf("cannot update rule") - panic(err) - } - //need some delay - time.Sleep(time.Millisecond * 300) - tt.args.updateChan <- syscall.SIGHUP - cancel() - - }() - rh.runConfigUpdater(ctx, tt.args.updateChan, groupUpdateStorage, tt.args.w, tt.args.wg) - tt.args.wg.Wait() - if len(tt.want) != len(rh.groups) { - t.Errorf("want: %v,\ngot :%v ", tt.want, rh.groups) - } - }) - } -}