diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index 6a5df13dc..a15308996 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -87,6 +87,8 @@ func Stop() { // Push sends wr to remote storage systems set via `-remoteWrite.url`. // // Each timeseries in wr.Timeseries must contain one sample. +// +// Note that wr may be modified by Push due to relabeling. func Push(wr *prompbmarshal.WriteRequest) { var rctx *relabelCtx if len(prcsGlobal) > 0 || len(labelsGlobal) > 0 { @@ -128,6 +130,8 @@ type remoteWriteCtx struct { pss []*pendingSeries pssNextIdx uint64 + tss []prompbmarshal.TimeSeries + relabelMetricsDropped *metrics.Counter } @@ -181,6 +185,11 @@ func (rwctx *remoteWriteCtx) MustStop() { func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) { var rctx *relabelCtx if len(rwctx.prcs) > 0 { + // Make a copy of tss before applying relabeling in order to prevent + // from affecting time series for other remoteWrite.url configs. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467 for details. + rwctx.tss = append(rwctx.tss[:0], tss...) + tss = rwctx.tss rctx = getRelabelCtx() tssLen := len(tss) tss = rctx.applyRelabeling(tss, nil, rwctx.prcs) @@ -191,5 +200,7 @@ func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) { pss[idx].Push(tss) if rctx != nil { putRelabelCtx(rctx) + // Zero rwctx.tss in order to free up GC references. + rwctx.tss = prompbmarshal.ResetTimeSeries(rwctx.tss) } } diff --git a/lib/prompbmarshal/util.go b/lib/prompbmarshal/util.go index 72d45e954..2dcbc5f81 100644 --- a/lib/prompbmarshal/util.go +++ b/lib/prompbmarshal/util.go @@ -21,10 +21,15 @@ func MarshalWriteRequest(dst []byte, wr *WriteRequest) []byte { // ResetWriteRequest resets wr. func ResetWriteRequest(wr *WriteRequest) { - for i := range wr.Timeseries { - ts := wr.Timeseries[i] + wr.Timeseries = ResetTimeSeries(wr.Timeseries) +} + +// ResetTimeSeries clears all the GC references from tss and returns an empty tss ready for further use. +func ResetTimeSeries(tss []TimeSeries) []TimeSeries { + for i := range tss { + ts := tss[i] ts.Labels = nil ts.Samples = nil } - wr.Timeseries = wr.Timeseries[:0] + return tss[:0] }