From 5f16ceb294946a48a65e958fa8acd91a2e266ad9 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 3 Sep 2020 01:00:55 +0300 Subject: [PATCH] app/vmalert: imrovements over 3f932c2db17276d3390176d29feb3f1e1e86d43b --- app/vmalert/group.go | 35 +++++++++++++++++++++++------------ app/vmalert/group_test.go | 6 ++++++ 2 files changed, 29 insertions(+), 12 deletions(-) diff --git a/app/vmalert/group.go b/app/vmalert/group.go index ec1eddcd8..e94dec112 100644 --- a/app/vmalert/group.go +++ b/app/vmalert/group.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "hash/fnv" - "strconv" "sync" "time" @@ -15,7 +14,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/metrics" - "github.com/cespare/xxhash/v2" ) // Group is an entity for grouping rules @@ -182,19 +180,32 @@ func (g *Group) close() { } } +var skipRandSleepOnGroupStart bool + func (g *Group) start(ctx context.Context, querier datasource.Querier, nts []notifier.Notifier, rw *remotewrite.Client) { defer func() { close(g.finishedCh) }() - // This should spread load of rule evaluation by group - h := uint32(xxhash.Sum64([]byte(strconv.FormatUint(g.ID(), 10)))) - randSleep := uint64(float64(g.Interval) * (float64(h) / (1 << 32))) - sleeper := time.NewTimer(time.Duration(randSleep)) - select { - case <-g.finishedCh: - sleeper.Stop() - return - case <-sleeper.C: + + // Spread group rules evaluation over time in order to reduce load on VictoriaMetrics. + if !skipRandSleepOnGroupStart { + randSleep := uint64(float64(g.Interval) * (float64(uint32(g.ID())) / (1 << 32))) + sleepOffset := uint64(time.Now().UnixNano()) % uint64(g.Interval) + if randSleep < sleepOffset { + randSleep += uint64(g.Interval) + } + randSleep -= sleepOffset + sleepTimer := time.NewTimer(time.Duration(randSleep)) + select { + case <-ctx.Done(): + sleepTimer.Stop() + return + case <-g.doneCh: + sleepTimer.Stop() + return + case <-sleepTimer.C: + } } - logger.Infof("group %q started with delay %v; interval=%v; concurrency=%d", time.Duration(randSleep), g.Name, g.Interval, g.Concurrency) + + logger.Infof("group %q started; interval=%v; concurrency=%d", g.Name, g.Interval, g.Concurrency) e := &executor{querier, nts, rw} t := time.NewTicker(g.Interval) defer t.Stop() diff --git a/app/vmalert/group_test.go b/app/vmalert/group_test.go index f664868ca..c18f8d487 100644 --- a/app/vmalert/group_test.go +++ b/app/vmalert/group_test.go @@ -10,6 +10,12 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" ) +func init() { + // Disable rand sleep on group start during tests in order to speed up test execution. + // Rand sleep is needed only in prod code. + skipRandSleepOnGroupStart = true +} + func TestUpdateWith(t *testing.T) { testCases := []struct { name string