From 16e0bb496e139348e67967ea1d71f463dc1c03cd Mon Sep 17 00:00:00 2001
From: Roman Khavronenko <hagen1778@gmail.com>
Date: Fri, 11 Sep 2020 20:14:30 +0100
Subject: [PATCH] vmalert: update groups on config reload only if changes
 detected (#759)

On config reload event `vmalert` reloads configuration for every group. While
it works for simple configurations, the more complex and heavy installations may
suffer from frequent config reloads.
The change introduces the `checksum` field for every group and is set to md5 hash
of yaml configuration. The checksum will change if on any change to group
definition like rules order or annotation change. Comparing the `checksum` field
on config reload event helps to detect if group should be updated.
The groups update is now done concurrently, so reload duration will be limited by
the slowest group now.

Partially solves #691 by improving config reload speed.
---
 app/vmalert/config/config.go      | 48 ++++++++++++++++++++++++-------
 app/vmalert/config/config_test.go | 34 ++++++++++++++++++++++
 app/vmalert/group.go              |  3 ++
 app/vmalert/manager.go            | 32 +++++++++++++++++----
 4 files changed, 101 insertions(+), 16 deletions(-)

diff --git a/app/vmalert/config/config.go b/app/vmalert/config/config.go
index 6c38d9e678..f027dad59d 100644
--- a/app/vmalert/config/config.go
+++ b/app/vmalert/config/config.go
@@ -1,6 +1,7 @@
 package config
 
 import (
+	"crypto/md5"
 	"fmt"
 	"hash/fnv"
 	"io/ioutil"
@@ -24,11 +25,30 @@ type Group struct {
 	Interval    time.Duration `yaml:"interval,omitempty"`
 	Rules       []Rule        `yaml:"rules"`
 	Concurrency int           `yaml:"concurrency"`
+	// Checksum stores the hash of yaml definition for this group.
+	// May be used to detect any changes like rules re-ordering etc.
+	Checksum string
 
 	// Catches all undefined fields and must be empty after parsing.
 	XXX map[string]interface{} `yaml:",inline"`
 }
 
+// UnmarshalYAML implements the yaml.Unmarshaler interface.
+func (g *Group) UnmarshalYAML(unmarshal func(interface{}) error) error {
+	type group Group
+	if err := unmarshal((*group)(g)); err != nil {
+		return err
+	}
+	b, err := yaml.Marshal(g)
+	if err != nil {
+		return fmt.Errorf("failed to marshal group configuration for checksum: %s", err)
+	}
+	h := md5.New()
+	h.Write(b)
+	g.Checksum = fmt.Sprintf("%x", h.Sum(nil))
+	return nil
+}
+
 // Validate check for internal Group or Rule configuration errors
 func (g *Group) Validate(validateAnnotations, validateExpressions bool) error {
 	if g.Name == "" {
@@ -101,7 +121,7 @@ func (r *Rule) Name() string {
 }
 
 // HashRule hashes significant Rule fields into
-// unique hash value
+// unique hash that supposed to define Rule uniqueness
 func HashRule(r Rule) uint64 {
 	h := fnv.New64a()
 	h.Write([]byte(r.Expr))
@@ -112,16 +132,7 @@ func HashRule(r Rule) uint64 {
 		h.Write([]byte("alerting"))
 		h.Write([]byte(r.Alert))
 	}
-	type item struct {
-		key, value string
-	}
-	var kv []item
-	for k, v := range r.Labels {
-		kv = append(kv, item{key: k, value: v})
-	}
-	sort.Slice(kv, func(i, j int) bool {
-		return kv[i].key < kv[j].key
-	})
+	kv := sortMap(r.Labels)
 	for _, i := range kv {
 		h.Write([]byte(i.key))
 		h.Write([]byte(i.value))
@@ -204,3 +215,18 @@ func checkOverflow(m map[string]interface{}, ctx string) error {
 	}
 	return nil
 }
+
+type item struct {
+	key, value string
+}
+
+func sortMap(m map[string]string) []item {
+	var kv []item
+	for k, v := range m {
+		kv = append(kv, item{key: k, value: v})
+	}
+	sort.Slice(kv, func(i, j int) bool {
+		return kv[i].key < kv[j].key
+	})
+	return kv
+}
diff --git a/app/vmalert/config/config_test.go b/app/vmalert/config/config_test.go
index d46b4ba239..d01665980d 100644
--- a/app/vmalert/config/config_test.go
+++ b/app/vmalert/config/config_test.go
@@ -8,6 +8,7 @@ import (
 	"time"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
+	"gopkg.in/yaml.v2"
 )
 
 func TestMain(m *testing.M) {
@@ -320,3 +321,36 @@ func TestHashRule(t *testing.T) {
 		}
 	}
 }
+
+func TestGroupChecksum(t *testing.T) {
+	data := `
+name: TestGroup
+rules:
+  - alert: ExampleAlertAlwaysFiring
+    expr: sum by(job) (up == 1)
+  - record: handler:requests:rate5m
+    expr: sum(rate(prometheus_http_requests_total[5m])) by (handler)
+`
+	var g Group
+	if err := yaml.Unmarshal([]byte(data), &g); err != nil {
+		t.Fatalf("failed to unmarshal: %s", err)
+	}
+	if g.Checksum == "" {
+		t.Fatalf("expected to get non-empty checksum")
+	}
+	newData := `
+name: TestGroup
+rules:
+  - record: handler:requests:rate5m
+    expr: sum(rate(prometheus_http_requests_total[5m])) by (handler)
+  - alert: ExampleAlertAlwaysFiring
+    expr: sum by(job) (up == 1)
+`
+	var ng Group
+	if err := yaml.Unmarshal([]byte(newData), &g); err != nil {
+		t.Fatalf("failed to unmarshal: %s", err)
+	}
+	if g.Checksum == ng.Checksum {
+		t.Fatalf("expected to get different checksums")
+	}
+}
diff --git a/app/vmalert/group.go b/app/vmalert/group.go
index e94dec112a..f2a1b5bd7d 100644
--- a/app/vmalert/group.go
+++ b/app/vmalert/group.go
@@ -24,6 +24,7 @@ type Group struct {
 	Rules       []Rule
 	Interval    time.Duration
 	Concurrency int
+	Checksum    string
 
 	doneCh     chan struct{}
 	finishedCh chan struct{}
@@ -53,6 +54,7 @@ func newGroup(cfg config.Group, defaultInterval time.Duration, labels map[string
 		File:        cfg.File,
 		Interval:    cfg.Interval,
 		Concurrency: cfg.Concurrency,
+		Checksum:    cfg.Checksum,
 		doneCh:      make(chan struct{}),
 		finishedCh:  make(chan struct{}),
 		updateCh:    make(chan *Group),
@@ -156,6 +158,7 @@ func (g *Group) updateWith(newGroup *Group) error {
 		newRules = append(newRules, nr)
 	}
 	g.Concurrency = newGroup.Concurrency
+	g.Checksum = newGroup.Checksum
 	g.Rules = newRules
 	return nil
 }
diff --git a/app/vmalert/manager.go b/app/vmalert/manager.go
index 2c749b830e..3abdd1340c 100644
--- a/app/vmalert/manager.go
+++ b/app/vmalert/manager.go
@@ -93,31 +93,53 @@ func (m *manager) update(ctx context.Context, path []string, validateTpl, valida
 		groupsRegistry[ng.ID()] = ng
 	}
 
+	type updateItem struct {
+		old *Group
+		new *Group
+	}
+	var toUpdate []updateItem
+
 	m.groupsMu.Lock()
 	for _, og := range m.groups {
 		ng, ok := groupsRegistry[og.ID()]
 		if !ok {
-			// old group is not present in new list
-			// and must be stopped and deleted
+			// old group is not present in new list,
+			// so must be stopped and deleted
 			og.close()
 			delete(m.groups, og.ID())
 			og = nil
 			continue
 		}
-		og.updateCh <- ng
 		delete(groupsRegistry, ng.ID())
+		if og.Checksum != ng.Checksum {
+			toUpdate = append(toUpdate, updateItem{old: og, new: ng})
+		}
 	}
-
 	for _, ng := range groupsRegistry {
 		m.startGroup(ctx, ng, restore)
 	}
 	m.groupsMu.Unlock()
+
+	if len(toUpdate) > 0 {
+		var wg sync.WaitGroup
+		for _, item := range toUpdate {
+			wg.Add(1)
+			go func(old *Group, new *Group) {
+				old.updateCh <- new
+				wg.Done()
+			}(item.old, item.new)
+		}
+		wg.Wait()
+	}
 	return nil
 }
 
 func (g *Group) toAPI() APIGroup {
+	g.mu.RLock()
+	defer g.mu.RUnlock()
+
 	ag := APIGroup{
-		// encode as strings to avoid rounding
+		// encode as string to avoid rounding
 		ID:          fmt.Sprintf("%d", g.ID()),
 		Name:        g.Name,
 		File:        g.File,