diff --git a/app/vmagent/remotewrite/relabel.go b/app/vmagent/remotewrite/relabel.go index b6fe3d097..42a28e5f7 100644 --- a/app/vmagent/remotewrite/relabel.go +++ b/app/vmagent/remotewrite/relabel.go @@ -50,12 +50,11 @@ func resetRelabel() { prcs = nil } -func (rctx *relabelCtx) applyRelabeling(wr *prompbmarshal.WriteRequest) { +func (rctx *relabelCtx) applyRelabeling(tss []prompbmarshal.TimeSeries) []prompbmarshal.TimeSeries { if len(extraLabels) == 0 && len(prcs) == 0 { // Nothing to change. - return + return tss } - tss := wr.Timeseries tssDst := tss[:0] labels := rctx.labels[:0] for i := range tss { @@ -83,7 +82,7 @@ func (rctx *relabelCtx) applyRelabeling(wr *prompbmarshal.WriteRequest) { }) } rctx.labels = labels - wr.Timeseries = tssDst + return tssDst } type relabelCtx struct { diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index 8bb927c71..9f0dedc1a 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -105,11 +105,18 @@ func Stop() { // Each timeseries in wr.Timeseries must contain one sample. func Push(wr *prompbmarshal.WriteRequest) { rctx := relabelCtxPool.Get().(*relabelCtx) - rctx.applyRelabeling(wr) - - idx := atomic.AddUint64(&pssNextIdx, 1) % uint64(len(pss)) - pss[idx].Push(wr.Timeseries) - + tss := wr.Timeseries + for len(tss) > 0 { + // Process big tss in smaller blocks in order to reduce maxmimum memory usage + tssBlock := tss + if len(tssBlock) > maxRowsPerBlock { + tssBlock = tss[:maxRowsPerBlock] + tss = tss[maxRowsPerBlock:] + } + tssBlock = rctx.applyRelabeling(tssBlock) + idx := atomic.AddUint64(&pssNextIdx, 1) % uint64(len(pss)) + pss[idx].Push(tssBlock) + } rctx.reset() relabelCtxPool.Put(rctx) }