diff --git a/app/vmalert/config/config.go b/app/vmalert/config/config.go index 6c38d9e67..f027dad59 100644 --- a/app/vmalert/config/config.go +++ b/app/vmalert/config/config.go @@ -1,6 +1,7 @@ package config import ( + "crypto/md5" "fmt" "hash/fnv" "io/ioutil" @@ -24,11 +25,30 @@ type Group struct { Interval time.Duration `yaml:"interval,omitempty"` Rules []Rule `yaml:"rules"` Concurrency int `yaml:"concurrency"` + // Checksum stores the hash of yaml definition for this group. + // May be used to detect any changes like rules re-ordering etc. + Checksum string // Catches all undefined fields and must be empty after parsing. XXX map[string]interface{} `yaml:",inline"` } +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (g *Group) UnmarshalYAML(unmarshal func(interface{}) error) error { + type group Group + if err := unmarshal((*group)(g)); err != nil { + return err + } + b, err := yaml.Marshal(g) + if err != nil { + return fmt.Errorf("failed to marshal group configuration for checksum: %s", err) + } + h := md5.New() + h.Write(b) + g.Checksum = fmt.Sprintf("%x", h.Sum(nil)) + return nil +} + // Validate check for internal Group or Rule configuration errors func (g *Group) Validate(validateAnnotations, validateExpressions bool) error { if g.Name == "" { @@ -101,7 +121,7 @@ func (r *Rule) Name() string { } // HashRule hashes significant Rule fields into -// unique hash value +// unique hash that supposed to define Rule uniqueness func HashRule(r Rule) uint64 { h := fnv.New64a() h.Write([]byte(r.Expr)) @@ -112,16 +132,7 @@ func HashRule(r Rule) uint64 { h.Write([]byte("alerting")) h.Write([]byte(r.Alert)) } - type item struct { - key, value string - } - var kv []item - for k, v := range r.Labels { - kv = append(kv, item{key: k, value: v}) - } - sort.Slice(kv, func(i, j int) bool { - return kv[i].key < kv[j].key - }) + kv := sortMap(r.Labels) for _, i := range kv { h.Write([]byte(i.key)) h.Write([]byte(i.value)) @@ -204,3 +215,18 @@ func checkOverflow(m map[string]interface{}, ctx string) error { } return nil } + +type item struct { + key, value string +} + +func sortMap(m map[string]string) []item { + var kv []item + for k, v := range m { + kv = append(kv, item{key: k, value: v}) + } + sort.Slice(kv, func(i, j int) bool { + return kv[i].key < kv[j].key + }) + return kv +} diff --git a/app/vmalert/config/config_test.go b/app/vmalert/config/config_test.go index d46b4ba23..d01665980 100644 --- a/app/vmalert/config/config_test.go +++ b/app/vmalert/config/config_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" + "gopkg.in/yaml.v2" ) func TestMain(m *testing.M) { @@ -320,3 +321,36 @@ func TestHashRule(t *testing.T) { } } } + +func TestGroupChecksum(t *testing.T) { + data := ` +name: TestGroup +rules: + - alert: ExampleAlertAlwaysFiring + expr: sum by(job) (up == 1) + - record: handler:requests:rate5m + expr: sum(rate(prometheus_http_requests_total[5m])) by (handler) +` + var g Group + if err := yaml.Unmarshal([]byte(data), &g); err != nil { + t.Fatalf("failed to unmarshal: %s", err) + } + if g.Checksum == "" { + t.Fatalf("expected to get non-empty checksum") + } + newData := ` +name: TestGroup +rules: + - record: handler:requests:rate5m + expr: sum(rate(prometheus_http_requests_total[5m])) by (handler) + - alert: ExampleAlertAlwaysFiring + expr: sum by(job) (up == 1) +` + var ng Group + if err := yaml.Unmarshal([]byte(newData), &g); err != nil { + t.Fatalf("failed to unmarshal: %s", err) + } + if g.Checksum == ng.Checksum { + t.Fatalf("expected to get different checksums") + } +} diff --git a/app/vmalert/group.go b/app/vmalert/group.go index e94dec112..f2a1b5bd7 100644 --- a/app/vmalert/group.go +++ b/app/vmalert/group.go @@ -24,6 +24,7 @@ type Group struct { Rules []Rule Interval time.Duration Concurrency int + Checksum string doneCh chan struct{} finishedCh chan struct{} @@ -53,6 +54,7 @@ func newGroup(cfg config.Group, defaultInterval time.Duration, labels map[string File: cfg.File, Interval: cfg.Interval, Concurrency: cfg.Concurrency, + Checksum: cfg.Checksum, doneCh: make(chan struct{}), finishedCh: make(chan struct{}), updateCh: make(chan *Group), @@ -156,6 +158,7 @@ func (g *Group) updateWith(newGroup *Group) error { newRules = append(newRules, nr) } g.Concurrency = newGroup.Concurrency + g.Checksum = newGroup.Checksum g.Rules = newRules return nil } diff --git a/app/vmalert/manager.go b/app/vmalert/manager.go index 2c749b830..3abdd1340 100644 --- a/app/vmalert/manager.go +++ b/app/vmalert/manager.go @@ -93,31 +93,53 @@ func (m *manager) update(ctx context.Context, path []string, validateTpl, valida groupsRegistry[ng.ID()] = ng } + type updateItem struct { + old *Group + new *Group + } + var toUpdate []updateItem + m.groupsMu.Lock() for _, og := range m.groups { ng, ok := groupsRegistry[og.ID()] if !ok { - // old group is not present in new list - // and must be stopped and deleted + // old group is not present in new list, + // so must be stopped and deleted og.close() delete(m.groups, og.ID()) og = nil continue } - og.updateCh <- ng delete(groupsRegistry, ng.ID()) + if og.Checksum != ng.Checksum { + toUpdate = append(toUpdate, updateItem{old: og, new: ng}) + } } - for _, ng := range groupsRegistry { m.startGroup(ctx, ng, restore) } m.groupsMu.Unlock() + + if len(toUpdate) > 0 { + var wg sync.WaitGroup + for _, item := range toUpdate { + wg.Add(1) + go func(old *Group, new *Group) { + old.updateCh <- new + wg.Done() + }(item.old, item.new) + } + wg.Wait() + } return nil } func (g *Group) toAPI() APIGroup { + g.mu.RLock() + defer g.mu.RUnlock() + ag := APIGroup{ - // encode as strings to avoid rounding + // encode as string to avoid rounding ID: fmt.Sprintf("%d", g.ID()), Name: g.Name, File: g.File,