diff --git a/app/vmagent/remotewrite/relabel.go b/app/vmagent/remotewrite/relabel.go index 6adede560..f1d48b3dd 100644 --- a/app/vmagent/remotewrite/relabel.go +++ b/app/vmagent/remotewrite/relabel.go @@ -59,7 +59,6 @@ type relabelConfigs struct { // initLabelsGlobal must be called after parsing command-line flags. func initLabelsGlobal() { - // Init labelsGlobal labelsGlobal = nil for _, s := range *unparsedLabelsGlobal { n := strings.IndexByte(s, '=') diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index 1646b1751..fdd1d021d 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -128,7 +128,7 @@ func Push(wr *prompbmarshal.WriteRequest) { } tss := wr.Timeseries for len(tss) > 0 { - // Process big tss in smaller blocks in order to reduce maxmimum memory usage + // Process big tss in smaller blocks in order to reduce the maximum memory usage tssBlock := tss if len(tssBlock) > maxRowsPerBlock { tssBlock = tss[:maxRowsPerBlock] @@ -162,8 +162,6 @@ type remoteWriteCtx struct { pss []*pendingSeries pssNextIdx uint64 - tss []prompbmarshal.TimeSeries - relabelMetricsDropped *metrics.Counter } @@ -208,15 +206,17 @@ func (rwctx *remoteWriteCtx) MustStop() { func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) { var rctx *relabelCtx + var v *[]prompbmarshal.TimeSeries rcs := allRelabelConfigs.Load().(*relabelConfigs) prcs := rcs.perURL[rwctx.idx] if len(prcs) > 0 { + rctx = getRelabelCtx() // Make a copy of tss before applying relabeling in order to prevent // from affecting time series for other remoteWrite.url configs. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467 for details. - rwctx.tss = append(rwctx.tss[:0], tss...) - tss = rwctx.tss - rctx = getRelabelCtx() + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467 + // and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/599 + v = tssRelabelPool.Get().(*[]prompbmarshal.TimeSeries) + tss = append(*v, tss...) tssLen := len(tss) tss = rctx.applyRelabeling(tss, nil, prcs) rwctx.relabelMetricsDropped.Add(tssLen - len(tss)) @@ -225,8 +225,14 @@ func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) { idx := atomic.AddUint64(&rwctx.pssNextIdx, 1) % uint64(len(pss)) pss[idx].Push(tss) if rctx != nil { + *v = prompbmarshal.ResetTimeSeries(tss) + tssRelabelPool.Put(v) putRelabelCtx(rctx) - // Zero rwctx.tss in order to free up GC references. - rwctx.tss = prompbmarshal.ResetTimeSeries(rwctx.tss) } } + +var tssRelabelPool = &sync.Pool{ + New: func() interface{} { + return []prompbmarshal.TimeSeries{} + }, +}