From c6c7843e93e53bb6cd37ab66b42338bee21c2447 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 25 Feb 2020 19:57:47 +0200 Subject: [PATCH] app/vmagent: add `-remoteWrite.maxBlockSize` command-line flag for limiting the maximum size of unpacked block to send to remote storage --- app/vmagent/remotewrite/pendingseries.go | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/app/vmagent/remotewrite/pendingseries.go b/app/vmagent/remotewrite/pendingseries.go index e5895e238..3b64d3212 100644 --- a/app/vmagent/remotewrite/pendingseries.go +++ b/app/vmagent/remotewrite/pendingseries.go @@ -13,8 +13,12 @@ import ( "github.com/golang/snappy" ) -var flushInterval = flag.Duration("remoteWrite.flushInterval", time.Second, "Interval for flushing the data to remote storage. "+ - "Higher value reduces network bandwidth usage at the cost of delayed push of scraped data to remote storage") +var ( + flushInterval = flag.Duration("remoteWrite.flushInterval", time.Second, "Interval for flushing the data to remote storage. "+ + "Higher value reduces network bandwidth usage at the cost of delayed push of scraped data to remote storage") + maxUnpackedBlockSize = flag.Int("remoteWrite.maxBlockSize", 32*1024*1024, "The maximum size in bytes of unpacked request to send to remote storage. "+ + "It shouldn't exceed -maxInsertRequestSize from VictoriaMetrics") +) // the maximum number of rows to send per each block. const maxRowsPerBlock = 10000 @@ -160,17 +164,21 @@ func pushWriteRequest(wr *prompbmarshal.WriteRequest, pushBlock func(block []byt } bb := writeRequestBufPool.Get() bb.B = prompbmarshal.MarshalWriteRequest(bb.B[:0], wr) - if len(bb.B) <= persistentqueue.MaxBlockSize { + if len(bb.B) <= *maxUnpackedBlockSize { zb := snappyBufPool.Get() zb.B = snappy.Encode(zb.B[:cap(zb.B)], bb.B) writeRequestBufPool.Put(bb) - pushBlock(zb.B) - blockSizeRows.Update(float64(len(wr.Timeseries))) - blockSizeBytes.Update(float64(len(zb.B))) + if len(zb.B) <= persistentqueue.MaxBlockSize { + pushBlock(zb.B) + blockSizeRows.Update(float64(len(wr.Timeseries))) + blockSizeBytes.Update(float64(len(zb.B))) + snappyBufPool.Put(zb) + return + } snappyBufPool.Put(zb) - return + } else { + writeRequestBufPool.Put(bb) } - writeRequestBufPool.Put(bb) // Too big block. Recursively split it into smaller parts. timeseries := wr.Timeseries