vmalert: automatically reload configuration on file change (#1326)

The new flag `-rule.configCheckInterval` defines how often `vmalert` re-reads
the config file. If any changes are detected, the config is reloaded.
This behaviour is turned off by default.

https://github.com/VictoriaMetrics/VictoriaMetrics/issues/512
This commit is contained in:
Roman Khavronenko 2021-05-25 14:27:22 +01:00 committed by Aliaksandr Valialkin
parent 6b90570ed3
commit e183a5c532
6 changed files with 300 additions and 145 deletions

View file

@ -66,7 +66,8 @@ run-vmalert: vmalert
-remoteRead.url=http://localhost:8428 \ -remoteRead.url=http://localhost:8428 \
-external.label=cluster=east-1 \ -external.label=cluster=east-1 \
-external.label=replica=a \ -external.label=replica=a \
-evaluationInterval=3s -evaluationInterval=3s \
-rule.configCheckInterval=10s
vmalert-amd64: vmalert-amd64:
CGO_ENABLED=1 GOARCH=amd64 $(MAKE) vmalert-local-with-goarch CGO_ENABLED=1 GOARCH=amd64 $(MAKE) vmalert-local-with-goarch

View file

@ -396,6 +396,8 @@ The shortlist of configuration flags is the following:
absolute path to all .yaml files in root. absolute path to all .yaml files in root.
Rule files may contain %{ENV_VAR} placeholders, which are substituted by the corresponding env vars. Rule files may contain %{ENV_VAR} placeholders, which are substituted by the corresponding env vars.
Supports an array of values separated by comma or specified via multiple flags. Supports an array of values separated by comma or specified via multiple flags.
-rule.configCheckInterval duration
Interval for checking for changes in '-rule' files. By default the checking is disabled. Send SIGHUP signal in order to force config check for changes
-rule.validateExpressions -rule.validateExpressions
Whether to validate rules expressions via MetricsQL engine (default true) Whether to validate rules expressions via MetricsQL engine (default true)
-rule.validateTemplates -rule.validateTemplates
@ -413,8 +415,11 @@ The shortlist of configuration flags is the following:
Pass `-help` to `vmalert` in order to see the full list of supported Pass `-help` to `vmalert` in order to see the full list of supported
command-line flags with their descriptions. command-line flags with their descriptions.
To reload configuration without `vmalert` restart send SIGHUP signal `vmalert` supports "hot" config reload via the following methods:
or send GET request to `/-/reload` endpoint. * send SIGHUP signal to `vmalert` process;
* send GET request to `/-/reload` endpoint;
* configure `-rule.configCheckInterval` flag for periodic reload
on config change.
## Contributing ## Contributing

View file

@ -34,6 +34,9 @@ Examples:
absolute path to all .yaml files in root. absolute path to all .yaml files in root.
Rule files may contain %{ENV_VAR} placeholders, which are substituted by the corresponding env vars.`) Rule files may contain %{ENV_VAR} placeholders, which are substituted by the corresponding env vars.`)
rulesCheckInterval = flag.Duration("rule.configCheckInterval", 0, "Interval for checking for changes in '-rule' files. "+
"By default the checking is disabled. Send SIGHUP signal in order to force config check for changes")
httpListenAddr = flag.String("httpListenAddr", ":8880", "Address to listen for http connections") httpListenAddr = flag.String("httpListenAddr", ":8880", "Address to listen for http connections")
evaluationInterval = flag.Duration("evaluationInterval", time.Minute, "How often to evaluate the rules") evaluationInterval = flag.Duration("evaluationInterval", time.Minute, "How often to evaluate the rules")
@ -78,34 +81,17 @@ func main() {
logger.Fatalf("failed to init: %s", err) logger.Fatalf("failed to init: %s", err)
} }
// Register SIGHUP handler for config re-read just before manager.start call. logger.Infof("reading rules configuration file from %q", strings.Join(*rulePath, ";"))
// This guarantees that the config will be re-read if the signal arrives during manager.start call. groupsCfg, err := config.Parse(*rulePath, *validateTemplates, *validateExpressions)
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240 if err != nil {
sighupCh := procutil.NewSighupChan() logger.Fatalf("cannot parse configuration file: %s", err)
}
if err := manager.start(ctx, *rulePath, *validateTemplates, *validateExpressions); err != nil { if err := manager.start(ctx, groupsCfg); err != nil {
logger.Fatalf("failed to start: %s", err) logger.Fatalf("failed to start: %s", err)
} }
go func() { go configReload(ctx, manager, groupsCfg)
// init reload metrics with positive values to improve alerting conditions
configSuccess.Set(1)
configTimestamp.Set(fasttime.UnixTimestamp())
for {
<-sighupCh
configReloads.Inc()
logger.Infof("SIGHUP received. Going to reload rules %q ...", *rulePath)
if err := manager.update(ctx, *rulePath, *validateTemplates, *validateExpressions, false); err != nil {
configReloadErrors.Inc()
configSuccess.Set(0)
logger.Errorf("error while reloading rules: %s", err)
continue
}
configSuccess.Set(1)
configTimestamp.Set(fasttime.UnixTimestamp())
logger.Infof("Rules reloaded successfully from %q", *rulePath)
}
}()
rh := &requestHandler{m: manager} rh := &requestHandler{m: manager}
go httpserver.Serve(*httpListenAddr, rh.handler) go httpserver.Serve(*httpListenAddr, rh.handler)
@ -228,3 +214,62 @@ See the docs at https://docs.victoriametrics.com/vmalert.html .
` `
flagutil.Usage(s) flagutil.Usage(s)
} }
// configReload waits for reload triggers and applies changed rule groups to m
// until ctx is cancelled.
//
// A reload is attempted on every SIGHUP and, when -rule.configCheckInterval is
// positive, on every tick of that interval. groupsCfg is the configuration
// currently considered applied: a freshly parsed configuration is pushed to m
// only when configsEqual reports a difference, and groupsCfg is advanced only
// after m.update succeeds so that a failed update is retried on the next trigger.
func configReload(ctx context.Context, m *manager, groupsCfg []config.Group) {
	// Subscribe to SIGHUP before entering the reload loop.
	// NOTE(review): main starts this function after manager.start, so a SIGHUP
	// delivered while manager.start is running may not be observed here - confirm
	// against https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
	sighupCh := procutil.NewSighupChan()

	// A nil channel never becomes ready in select, which keeps periodic
	// checks disabled unless the interval is positive.
	var configCheckCh <-chan time.Time
	if *rulesCheckInterval > 0 {
		ticker := time.NewTicker(*rulesCheckInterval)
		configCheckCh = ticker.C
		defer ticker.Stop()
	}

	// init reload metrics with positive values to improve alerting conditions
	configSuccess.Set(1)
	configTimestamp.Set(fasttime.UnixTimestamp())
	for {
		select {
		case <-ctx.Done():
			return
		case <-sighupCh:
			logger.Infof("SIGHUP received. Going to reload rules %q ...", *rulePath)
			configReloads.Inc()
		case <-configCheckCh:
		}

		newGroupsCfg, err := config.Parse(*rulePath, *validateTemplates, *validateExpressions)
		if err != nil {
			// An unparsable config is a failed reload and must be visible in metrics.
			configReloadErrors.Inc()
			configSuccess.Set(0)
			logger.Errorf("cannot parse configuration file: %s", err)
			continue
		}
		if configsEqual(newGroupsCfg, groupsCfg) {
			// config didn't change - skip it.
			// Reset the success flag since a previous attempt could have failed.
			configSuccess.Set(1)
			continue
		}
		if err := m.update(ctx, newGroupsCfg, false); err != nil {
			configReloadErrors.Inc()
			configSuccess.Set(0)
			logger.Errorf("error while reloading rules: %s", err)
			continue
		}
		// Remember the new config only once it has been applied; otherwise
		// the next check would treat the failed config as already loaded.
		groupsCfg = newGroupsCfg
		configSuccess.Set(1)
		configTimestamp.Set(fasttime.UnixTimestamp())
		logger.Infof("Rules reloaded successfully from %q", *rulePath)
	}
}
// configsEqual reports whether a and b hold the same number of groups and
// the groups at each position have identical Checksum values.
func configsEqual(a, b []config.Group) bool {
	differ := len(a) != len(b)
	for idx := 0; !differ && idx < len(a); idx++ {
		differ = a[idx].Checksum != b[idx].Checksum
	}
	return !differ
}

View file

@ -1,12 +1,16 @@
package main package main
import ( import (
"context"
"fmt" "fmt"
"io/ioutil"
"net/url" "net/url"
"os" "os"
"testing" "testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
) )
func TestGetExternalURL(t *testing.T) { func TestGetExternalURL(t *testing.T) {
@ -51,3 +55,95 @@ func TestGetAlertURLGenerator(t *testing.T) {
t.Errorf("unexpected url want %s, got %s", exp, fn(testAlert)) t.Errorf("unexpected url want %s, got %s", exp, fn(testAlert))
} }
} }
// TestConfigReload checks that configReload applies rule file changes picked up
// by the periodic -rule.configCheckInterval check and by SIGHUP, and that a
// corrupted rule file leaves the previously loaded groups in place.
func TestConfigReload(t *testing.T) {
	// Restore the package-level flags mutated below so other tests are unaffected.
	originalRulePath := *rulePath
	originalCheckInterval := *rulesCheckInterval
	defer func() {
		*rulePath = originalRulePath
		*rulesCheckInterval = originalCheckInterval
	}()

	const (
		rules1 = `
groups:
  - name: group-1
    rules:
      - alert: ExampleAlertAlwaysFiring
        expr: sum by(job) (up == 1)
      - record: handler:requests:rate5m
        expr: sum(rate(prometheus_http_requests_total[5m])) by (handler)
`
		rules2 = `
groups:
  - name: group-1
    rules:
      - alert: ExampleAlertAlwaysFiring
        expr: sum by(job) (up == 1)
  - name: group-2
    rules:
      - record: handler:requests:rate5m
        expr: sum(rate(prometheus_http_requests_total[5m])) by (handler)
`
	)

	f, err := ioutil.TempFile("", "")
	if err != nil {
		t.Fatal(err)
	}
	// Only the path is needed: release the handle now and delete the file at the end.
	if err := f.Close(); err != nil {
		t.Fatal(err)
	}
	defer func() { _ = os.Remove(f.Name()) }()
	writeToFile(t, f.Name(), rules1)

	*rulesCheckInterval = 200 * time.Millisecond
	*rulePath = []string{f.Name()}
	ctx, cancel := context.WithCancel(context.Background())

	m := &manager{
		querierBuilder: &fakeQuerier{},
		groups:         make(map[uint64]*Group),
		labels:         map[string]string{},
	}

	// Wait for the reload loop to exit before the deferred cleanups above run,
	// so it cannot read the flags or the temp file while they are being reset.
	done := make(chan struct{})
	go func() {
		defer close(done)
		configReload(ctx, m, nil)
	}()
	defer func() {
		cancel()
		<-done
	}()

	lenLocked := func(m *manager) int {
		m.groupsMu.RLock()
		defer m.groupsMu.RUnlock()
		return len(m.groups)
	}

	// Initial load is performed by the periodic check.
	time.Sleep(*rulesCheckInterval * 2)
	groupsLen := lenLocked(m)
	if groupsLen != 1 {
		t.Fatalf("expected to have exactly 1 group loaded; got %d", groupsLen)
	}

	// A changed file is picked up by the periodic check.
	writeToFile(t, f.Name(), rules2)
	time.Sleep(*rulesCheckInterval * 2)
	groupsLen = lenLocked(m)
	if groupsLen != 2 {
		t.Fatalf("expected to have exactly 2 groups loaded; got %d", groupsLen)
	}

	// SIGHUP forces a reload without waiting for the next tick.
	writeToFile(t, f.Name(), rules1)
	procutil.SelfSIGHUP()
	time.Sleep(*rulesCheckInterval / 2)
	groupsLen = lenLocked(m)
	if groupsLen != 1 {
		t.Fatalf("expected to have exactly 1 group loaded; got %d", groupsLen)
	}

	writeToFile(t, f.Name(), `corrupted`)
	procutil.SelfSIGHUP()
	time.Sleep(*rulesCheckInterval / 2)
	groupsLen = lenLocked(m)
	if groupsLen != 1 { // should remain unchanged
		t.Fatalf("expected to have exactly 1 group loaded; got %d", groupsLen)
	}
}
// writeToFile replaces the contents of file with b (creating it with mode 0644
// if needed) and fails the calling test immediately on a write error.
func writeToFile(t *testing.T, file, b string) {
	t.Helper()
	payload := []byte(b)
	if writeErr := ioutil.WriteFile(file, payload, 0644); writeErr != nil {
		t.Fatal(writeErr)
	}
}

View file

@ -3,7 +3,6 @@ package main
import ( import (
"context" "context"
"fmt" "fmt"
"strings"
"sync" "sync"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
@ -50,8 +49,8 @@ func (m *manager) AlertAPI(gID, aID uint64) (*APIAlert, error) {
return nil, fmt.Errorf("can't find alert with id %q in group %q", aID, g.Name) return nil, fmt.Errorf("can't find alert with id %q in group %q", aID, g.Name)
} }
func (m *manager) start(ctx context.Context, path []string, validateTpl, validateExpr bool) error { func (m *manager) start(ctx context.Context, groupsCfg []config.Group) error {
return m.update(ctx, path, validateTpl, validateExpr, true) return m.update(ctx, groupsCfg, true)
} }
func (m *manager) close() { func (m *manager) close() {
@ -85,13 +84,7 @@ func (m *manager) startGroup(ctx context.Context, group *Group, restore bool) er
return nil return nil
} }
func (m *manager) update(ctx context.Context, path []string, validateTpl, validateExpr, restore bool) error { func (m *manager) update(ctx context.Context, groupsCfg []config.Group, restore bool) error {
logger.Infof("reading rules configuration file from %q", strings.Join(path, ";"))
groupsCfg, err := config.Parse(path, validateTpl, validateExpr)
if err != nil {
return fmt.Errorf("cannot parse configuration file: %w", err)
}
groupsRegistry := make(map[uint64]*Group) groupsRegistry := make(map[uint64]*Group)
for _, cfg := range groupsCfg { for _, cfg := range groupsCfg {
ng := newGroup(cfg, m.querierBuilder, *evaluationInterval, m.labels) ng := newGroup(cfg, m.querierBuilder, *evaluationInterval, m.labels)

View file

@ -9,8 +9,8 @@ import (
"testing" "testing"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
) )
@ -25,9 +25,8 @@ func TestMain(m *testing.M) {
// starting with empty rules folder // starting with empty rules folder
func TestManagerEmptyRulesDir(t *testing.T) { func TestManagerEmptyRulesDir(t *testing.T) {
m := &manager{groups: make(map[uint64]*Group)} m := &manager{groups: make(map[uint64]*Group)}
path := []string{"foo/bar"} cfg := loadCfg(t, []string{"foo/bar"}, true, true)
err := m.update(context.Background(), path, true, true, false) if err := m.update(context.Background(), cfg, false); err != nil {
if err != nil {
t.Fatalf("expected to load succesfully with empty rules dir; got err instead: %v", err) t.Fatalf("expected to load succesfully with empty rules dir; got err instead: %v", err)
} }
} }
@ -51,7 +50,8 @@ func TestManagerUpdateConcurrent(t *testing.T) {
"config/testdata/rules2-good.rules", "config/testdata/rules2-good.rules",
} }
*evaluationInterval = time.Millisecond *evaluationInterval = time.Millisecond
if err := m.start(context.Background(), []string{paths[0]}, true, true); err != nil { cfg := loadCfg(t, []string{paths[0]}, true, true)
if err := m.start(context.Background(), cfg); err != nil {
t.Fatalf("failed to start: %s", err) t.Fatalf("failed to start: %s", err)
} }
@ -64,8 +64,11 @@ func TestManagerUpdateConcurrent(t *testing.T) {
defer wg.Done() defer wg.Done()
for i := 0; i < iterations; i++ { for i := 0; i < iterations; i++ {
rnd := rand.Intn(len(paths)) rnd := rand.Intn(len(paths))
path := []string{paths[rnd]} cfg, err := config.Parse([]string{paths[rnd]}, true, true)
_ = m.update(context.Background(), path, true, true, false) if err != nil { // update can fail and this is expected
continue
}
_ = m.update(context.Background(), cfg, false)
} }
}() }()
} }
@ -243,13 +246,16 @@ func TestManagerUpdate(t *testing.T) {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO()) ctx, cancel := context.WithCancel(context.TODO())
m := &manager{groups: make(map[uint64]*Group), querierBuilder: &fakeQuerier{}} m := &manager{groups: make(map[uint64]*Group), querierBuilder: &fakeQuerier{}}
path := []string{tc.initPath}
if err := m.update(ctx, path, true, true, false); err != nil { cfgInit := loadCfg(t, []string{tc.initPath}, true, true)
if err := m.update(ctx, cfgInit, false); err != nil {
t.Fatalf("failed to complete initial rules update: %s", err) t.Fatalf("failed to complete initial rules update: %s", err)
} }
path = []string{tc.updatePath} cfgUpdate, err := config.Parse([]string{tc.updatePath}, true, true)
_ = m.update(ctx, path, true, true, false) if err == nil { // update can fail and that's expected
_ = m.update(ctx, cfgUpdate, false)
}
if len(tc.want) != len(m.groups) { if len(tc.want) != len(m.groups) {
t.Fatalf("\nwant number of groups: %d;\ngot: %d ", len(tc.want), len(m.groups)) t.Fatalf("\nwant number of groups: %d;\ngot: %d ", len(tc.want), len(m.groups))
} }
@ -267,3 +273,12 @@ func TestManagerUpdate(t *testing.T) {
}) })
} }
} }
// loadCfg parses the rule files at path via config.Parse with the given
// validation switches and returns the resulting groups. Any parse error
// fails the calling test immediately.
func loadCfg(t *testing.T, path []string, validateAnnotations, validateExpressions bool) []config.Group {
	t.Helper()
	groups, parseErr := config.Parse(path, validateAnnotations, validateExpressions)
	if parseErr == nil {
		return groups
	}
	t.Fatal(parseErr)
	return nil
}