mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-01 14:47:38 +00:00
feat: spread load of rule evaluation by group when starting new groups (#724)
* feat: spread load of rule evaluation by group when starting new groups * review: reduce the resulting diff. * Update app/vmalert/group.go Co-authored-by: Roman Khavronenko <hagen1778@gmail.com> Co-authored-by: Aliaksandr Valialkin <valyala@gmail.com> Co-authored-by: Roman Khavronenko <hagen1778@gmail.com>
This commit is contained in:
parent
4fa97430d7
commit
85f49ad439
1 changed files with 13 additions and 1 deletions
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"hash/fnv"
|
"hash/fnv"
|
||||||
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -14,6 +15,7 @@ import (
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/metrics"
|
"github.com/VictoriaMetrics/metrics"
|
||||||
|
"github.com/cespare/xxhash/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Group is an entity for grouping rules
|
// Group is an entity for grouping rules
|
||||||
|
@ -182,7 +184,17 @@ func (g *Group) close() {
|
||||||
|
|
||||||
func (g *Group) start(ctx context.Context, querier datasource.Querier, nts []notifier.Notifier, rw *remotewrite.Client) {
|
func (g *Group) start(ctx context.Context, querier datasource.Querier, nts []notifier.Notifier, rw *remotewrite.Client) {
|
||||||
defer func() { close(g.finishedCh) }()
|
defer func() { close(g.finishedCh) }()
|
||||||
logger.Infof("group %q started; interval=%v; concurrency=%d", g.Name, g.Interval, g.Concurrency)
|
// This should spread load of rule evaluation by group
|
||||||
|
h := uint32(xxhash.Sum64([]byte(strconv.FormatUint(g.ID(), 10))))
|
||||||
|
randSleep := uint64(float64(g.Interval) * (float64(h) / (1 << 32)))
|
||||||
|
sleeper := time.NewTimer(time.Duration(randSleep))
|
||||||
|
select {
|
||||||
|
case <-g.finishedCh:
|
||||||
|
sleeper.Stop()
|
||||||
|
return
|
||||||
|
case <-sleeper.C:
|
||||||
|
}
|
||||||
|
logger.Infof("group %q started with delay %v; interval=%v; concurrency=%d", time.Duration(randSleep), g.Name, g.Interval, g.Concurrency)
|
||||||
e := &executor{querier, nts, rw}
|
e := &executor{querier, nts, rw}
|
||||||
t := time.NewTicker(g.Interval)
|
t := time.NewTicker(g.Interval)
|
||||||
defer t.Stop()
|
defer t.Stop()
|
||||||
|
|
Loading…
Reference in a new issue