feat: spread load of rule evaluation by group when starting new groups (#724)

* feat: spread load of rule evaluation by group when starting new groups

* review: reduce the resulting diff.

* Update app/vmalert/group.go

Co-authored-by: Roman Khavronenko <hagen1778@gmail.com>

Co-authored-by: Aliaksandr Valialkin <valyala@gmail.com>
Co-authored-by: Roman Khavronenko <hagen1778@gmail.com>
This commit is contained in:
DexterZhang 2020-09-03 05:58:54 +08:00 committed by GitHub
parent f41b36bb9a
commit 3f932c2db1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"hash/fnv" "hash/fnv"
"strconv"
"sync" "sync"
"time" "time"
@ -14,6 +15,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
"github.com/cespare/xxhash/v2"
) )
// Group is an entity for grouping rules // Group is an entity for grouping rules
@ -182,7 +184,17 @@ func (g *Group) close() {
func (g *Group) start(ctx context.Context, querier datasource.Querier, nts []notifier.Notifier, rw *remotewrite.Client) { func (g *Group) start(ctx context.Context, querier datasource.Querier, nts []notifier.Notifier, rw *remotewrite.Client) {
defer func() { close(g.finishedCh) }() defer func() { close(g.finishedCh) }()
logger.Infof("group %q started; interval=%v; concurrency=%d", g.Name, g.Interval, g.Concurrency) // This should spread load of rule evaluation by group
h := uint32(xxhash.Sum64([]byte(strconv.FormatUint(g.ID(), 10))))
randSleep := uint64(float64(g.Interval) * (float64(h) / (1 << 32)))
sleeper := time.NewTimer(time.Duration(randSleep))
select {
case <-g.finishedCh:
sleeper.Stop()
return
case <-sleeper.C:
}
logger.Infof("group %q started with delay %v; interval=%v; concurrency=%d", time.Duration(randSleep), g.Name, g.Interval, g.Concurrency)
e := &executor{querier, nts, rw} e := &executor{querier, nts, rw}
t := time.NewTicker(g.Interval) t := time.NewTicker(g.Interval)
defer t.Stop() defer t.Stop()