mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vmagent: fix data race when multiple -remoteWrite.urlRelabelConfig
options are set
Previously multiple goroutines could access remoteWriteCtx.tss concurrently, which could lead to data race and improper relabeling. Now each goroutine has its own copy of tss during relabeling. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/599
This commit is contained in:
parent
508ad46e0e
commit
a730b3f6a1
2 changed files with 15 additions and 10 deletions
|
@ -59,7 +59,6 @@ type relabelConfigs struct {
|
|||
|
||||
// initLabelsGlobal must be called after parsing command-line flags.
|
||||
func initLabelsGlobal() {
|
||||
// Init labelsGlobal
|
||||
labelsGlobal = nil
|
||||
for _, s := range *unparsedLabelsGlobal {
|
||||
n := strings.IndexByte(s, '=')
|
||||
|
|
|
@ -128,7 +128,7 @@ func Push(wr *prompbmarshal.WriteRequest) {
|
|||
}
|
||||
tss := wr.Timeseries
|
||||
for len(tss) > 0 {
|
||||
// Process big tss in smaller blocks in order to reduce maxmimum memory usage
|
||||
// Process big tss in smaller blocks in order to reduce the maximum memory usage
|
||||
tssBlock := tss
|
||||
if len(tssBlock) > maxRowsPerBlock {
|
||||
tssBlock = tss[:maxRowsPerBlock]
|
||||
|
@ -162,8 +162,6 @@ type remoteWriteCtx struct {
|
|||
pss []*pendingSeries
|
||||
pssNextIdx uint64
|
||||
|
||||
tss []prompbmarshal.TimeSeries
|
||||
|
||||
relabelMetricsDropped *metrics.Counter
|
||||
}
|
||||
|
||||
|
@ -208,15 +206,17 @@ func (rwctx *remoteWriteCtx) MustStop() {
|
|||
|
||||
func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) {
|
||||
var rctx *relabelCtx
|
||||
var v *[]prompbmarshal.TimeSeries
|
||||
rcs := allRelabelConfigs.Load().(*relabelConfigs)
|
||||
prcs := rcs.perURL[rwctx.idx]
|
||||
if len(prcs) > 0 {
|
||||
rctx = getRelabelCtx()
|
||||
// Make a copy of tss before applying relabeling in order to prevent
|
||||
// from affecting time series for other remoteWrite.url configs.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467 for details.
|
||||
rwctx.tss = append(rwctx.tss[:0], tss...)
|
||||
tss = rwctx.tss
|
||||
rctx = getRelabelCtx()
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467
|
||||
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/599
|
||||
v = tssRelabelPool.Get().(*[]prompbmarshal.TimeSeries)
|
||||
tss = append(*v, tss...)
|
||||
tssLen := len(tss)
|
||||
tss = rctx.applyRelabeling(tss, nil, prcs)
|
||||
rwctx.relabelMetricsDropped.Add(tssLen - len(tss))
|
||||
|
@ -225,8 +225,14 @@ func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) {
|
|||
idx := atomic.AddUint64(&rwctx.pssNextIdx, 1) % uint64(len(pss))
|
||||
pss[idx].Push(tss)
|
||||
if rctx != nil {
|
||||
*v = prompbmarshal.ResetTimeSeries(tss)
|
||||
tssRelabelPool.Put(v)
|
||||
putRelabelCtx(rctx)
|
||||
// Zero rwctx.tss in order to free up GC references.
|
||||
rwctx.tss = prompbmarshal.ResetTimeSeries(rwctx.tss)
|
||||
}
|
||||
}
|
||||
|
||||
var tssRelabelPool = &sync.Pool{
|
||||
New: func() interface{} {
|
||||
return []prompbmarshal.TimeSeries{}
|
||||
},
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue