From cc39c9d74b63e4c00dc08988efd73a9cb268a709 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 28 Feb 2020 18:57:45 +0200 Subject: [PATCH] app/vmagent/remotewrite: limit memory usage when big scrape blocks are pushed to remote storage --- app/vmagent/remotewrite/relabel.go | 7 +++---- app/vmagent/remotewrite/remotewrite.go | 17 ++++++++++++----- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/app/vmagent/remotewrite/relabel.go b/app/vmagent/remotewrite/relabel.go index b6fe3d097f..42a28e5f75 100644 --- a/app/vmagent/remotewrite/relabel.go +++ b/app/vmagent/remotewrite/relabel.go @@ -50,12 +50,11 @@ func resetRelabel() { prcs = nil } -func (rctx *relabelCtx) applyRelabeling(wr *prompbmarshal.WriteRequest) { +func (rctx *relabelCtx) applyRelabeling(tss []prompbmarshal.TimeSeries) []prompbmarshal.TimeSeries { if len(extraLabels) == 0 && len(prcs) == 0 { // Nothing to change. - return + return tss } - tss := wr.Timeseries tssDst := tss[:0] labels := rctx.labels[:0] for i := range tss { @@ -83,7 +82,7 @@ func (rctx *relabelCtx) applyRelabeling(wr *prompbmarshal.WriteRequest) { }) } rctx.labels = labels - wr.Timeseries = tssDst + return tssDst } type relabelCtx struct { diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index 8bb927c714..9f0dedc1a7 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -105,11 +105,18 @@ func Stop() { // Each timeseries in wr.Timeseries must contain one sample. func Push(wr *prompbmarshal.WriteRequest) { rctx := relabelCtxPool.Get().(*relabelCtx) - rctx.applyRelabeling(wr) - - idx := atomic.AddUint64(&pssNextIdx, 1) % uint64(len(pss)) - pss[idx].Push(wr.Timeseries) - + tss := wr.Timeseries + for len(tss) > 0 { + // Process big tss in smaller blocks in order to reduce maxmimum memory usage + tssBlock := tss + if len(tssBlock) > maxRowsPerBlock { + tssBlock = tss[:maxRowsPerBlock] + tss = tss[maxRowsPerBlock:] + } + tssBlock = rctx.applyRelabeling(tssBlock) + idx := atomic.AddUint64(&pssNextIdx, 1) % uint64(len(pss)) + pss[idx].Push(tssBlock) + } rctx.reset() relabelCtxPool.Put(rctx) }