From 032c88561b47380ac388b005b9a4fdea49f8e9a5 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 3 Mar 2020 19:58:45 +0200 Subject: [PATCH] app/vminsert/prompush: limit memory usage by pushing promscrape data in smaller blocks --- app/vminsert/prompush/push.go | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/app/vminsert/prompush/push.go b/app/vminsert/prompush/push.go index 6235a7c9a9..b4734b7cbc 100644 --- a/app/vminsert/prompush/push.go +++ b/app/vminsert/prompush/push.go @@ -17,22 +17,38 @@ var ( rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="promscrape"}`) ) -// Push pushes wr to to storage. +const maxRowsPerBlock = 10000 + +// Push pushes wr to storage. func Push(wr *prompbmarshal.WriteRequest) { ctx := getPushCtx() defer putPushCtx(ctx) - timeseries := wr.Timeseries + tss := wr.Timeseries + for len(tss) > 0 { + // Process big tss in smaller blocks in order to reduce maxmimum memory usage + tssBlock := tss + if len(tssBlock) > maxRowsPerBlock { + tssBlock = tss[:maxRowsPerBlock] + tss = tss[maxRowsPerBlock:] + } else { + tss = nil + } + ctx.push(tssBlock) + } +} + +func (ctx *pushCtx) push(tss []prompbmarshal.TimeSeries) { rowsLen := 0 - for i := range timeseries { - rowsLen += len(timeseries[i].Samples) + for i := range tss { + rowsLen += len(tss[i].Samples) } ic := &ctx.Common ic.Reset(rowsLen) rowsTotal := 0 labels := ctx.labels[:0] - for i := range timeseries { - ts := ×eries[i] + for i := range tss { + ts := &tss[i] labels = labels[:0] for j := range ts.Labels { label := &ts.Labels[j]