mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
vmalert: protect executor's field from concurrent access (#2387)
Executor recently gain field for storing previously sent series. Since the same executor object can be used in multiple goroutines, the access to this field should be serialized. Signed-off-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
parent
0989649ad0
commit
1354e6d712
1 changed files with 4 additions and 1 deletions
|
@ -339,6 +339,7 @@ type executor struct {
|
|||
notifiers func() []notifier.Notifier
|
||||
rw *remotewrite.Client
|
||||
|
||||
previouslySentSeriesToRWMu sync.Mutex
|
||||
// previouslySentSeriesToRW stores series sent to RW on previous iteration
|
||||
// map[ruleID]map[ruleLabels][]prompb.Label
|
||||
// where `ruleID` is ID of the Rule within a Group
|
||||
|
@ -430,7 +431,7 @@ func (e *executor) exec(ctx context.Context, rule Rule, ts time.Time, resolveDur
|
|||
|
||||
// getStaledSeries checks whether there are stale series from previously sent ones.
|
||||
func (e *executor) getStaleSeries(rule Rule, tss []prompbmarshal.TimeSeries, timestamp time.Time) []prompbmarshal.TimeSeries {
|
||||
ruleLabels := make(map[string][]prompbmarshal.Label)
|
||||
ruleLabels := make(map[string][]prompbmarshal.Label, len(tss))
|
||||
for _, ts := range tss {
|
||||
// convert labels to strings so we can compare with previously sent series
|
||||
key := labelsToString(ts.Labels)
|
||||
|
@ -440,6 +441,7 @@ func (e *executor) getStaleSeries(rule Rule, tss []prompbmarshal.TimeSeries, tim
|
|||
rID := rule.ID()
|
||||
var staleS []prompbmarshal.TimeSeries
|
||||
// check whether there are series which disappeared and need to be marked as stale
|
||||
e.previouslySentSeriesToRWMu.Lock()
|
||||
for key, labels := range e.previouslySentSeriesToRW[rID] {
|
||||
if _, ok := ruleLabels[key]; ok {
|
||||
continue
|
||||
|
@ -450,6 +452,7 @@ func (e *executor) getStaleSeries(rule Rule, tss []prompbmarshal.TimeSeries, tim
|
|||
}
|
||||
// set previous series to current
|
||||
e.previouslySentSeriesToRW[rID] = ruleLabels
|
||||
e.previouslySentSeriesToRWMu.Unlock()
|
||||
|
||||
return staleS
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue