vmalert: protect executor's field from concurrent access (#2387)

Executor recently gain field for storing previously sent series.
Since the same executor object can be used in multiple goroutines,
the access to this field should be serialized.

Signed-off-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
Roman Khavronenko 2022-03-30 12:37:27 +02:00 committed by Aliaksandr Valialkin
parent ab10178c85
commit 7aa9d0f5f6
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1

View file

@ -339,6 +339,7 @@ type executor struct {
notifiers func() []notifier.Notifier notifiers func() []notifier.Notifier
rw *remotewrite.Client rw *remotewrite.Client
previouslySentSeriesToRWMu sync.Mutex
// previouslySentSeriesToRW stores series sent to RW on previous iteration // previouslySentSeriesToRW stores series sent to RW on previous iteration
// map[ruleID]map[ruleLabels][]prompb.Label // map[ruleID]map[ruleLabels][]prompb.Label
// where `ruleID` is ID of the Rule within a Group // where `ruleID` is ID of the Rule within a Group
@ -430,7 +431,7 @@ func (e *executor) exec(ctx context.Context, rule Rule, ts time.Time, resolveDur
// getStaledSeries checks whether there are stale series from previously sent ones. // getStaledSeries checks whether there are stale series from previously sent ones.
func (e *executor) getStaleSeries(rule Rule, tss []prompbmarshal.TimeSeries, timestamp time.Time) []prompbmarshal.TimeSeries { func (e *executor) getStaleSeries(rule Rule, tss []prompbmarshal.TimeSeries, timestamp time.Time) []prompbmarshal.TimeSeries {
ruleLabels := make(map[string][]prompbmarshal.Label) ruleLabels := make(map[string][]prompbmarshal.Label, len(tss))
for _, ts := range tss { for _, ts := range tss {
// convert labels to strings so we can compare with previously sent series // convert labels to strings so we can compare with previously sent series
key := labelsToString(ts.Labels) key := labelsToString(ts.Labels)
@ -440,6 +441,7 @@ func (e *executor) getStaleSeries(rule Rule, tss []prompbmarshal.TimeSeries, tim
rID := rule.ID() rID := rule.ID()
var staleS []prompbmarshal.TimeSeries var staleS []prompbmarshal.TimeSeries
// check whether there are series which disappeared and need to be marked as stale // check whether there are series which disappeared and need to be marked as stale
e.previouslySentSeriesToRWMu.Lock()
for key, labels := range e.previouslySentSeriesToRW[rID] { for key, labels := range e.previouslySentSeriesToRW[rID] {
if _, ok := ruleLabels[key]; ok { if _, ok := ruleLabels[key]; ok {
continue continue
@ -450,6 +452,7 @@ func (e *executor) getStaleSeries(rule Rule, tss []prompbmarshal.TimeSeries, tim
} }
// set previous series to current // set previous series to current
e.previouslySentSeriesToRW[rID] = ruleLabels e.previouslySentSeriesToRW[rID] = ruleLabels
e.previouslySentSeriesToRWMu.Unlock()
return staleS return staleS
} }