diff --git a/app/vmagent/remotewrite/pendingseries.go b/app/vmagent/remotewrite/pendingseries.go
index 5fa35f5110..f2b3c8341b 100644
--- a/app/vmagent/remotewrite/pendingseries.go
+++ b/app/vmagent/remotewrite/pendingseries.go
@@ -7,6 +7,7 @@ import (
 	"time"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/zstd"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@@ -109,11 +110,12 @@ type writeRequest struct {
 
 	wr prompbmarshal.WriteRequest
 
-	tss []prompbmarshal.TimeSeries
-
+	tss     []prompbmarshal.TimeSeries
 	labels  []prompbmarshal.Label
 	samples []prompbmarshal.Sample
-	buf     []byte
+
+	// buf holds labels data
+	buf []byte
 }
 
 func (wr *writeRequest) reset() {
@@ -224,33 +226,45 @@ func (wr *writeRequest) copyTimeSeries(dst, src *prompbmarshal.TimeSeries) {
 	wr.buf = buf
 }
 
+// marshalConcurrency limits the maximum number of concurrent workers, which marshal and compress WriteRequest.
+var marshalConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
+
 func tryPushWriteRequest(wr *prompbmarshal.WriteRequest, tryPushBlock func(block []byte) bool, isVMRemoteWrite bool) bool {
 	if len(wr.Timeseries) == 0 {
 		// Nothing to push
 		return true
 	}
+
+	marshalConcurrencyCh <- struct{}{}
+
 	bb := writeRequestBufPool.Get()
 	bb.B = wr.MarshalProtobuf(bb.B[:0])
 	if len(bb.B) <= maxUnpackedBlockSize.IntN() {
-		zb := snappyBufPool.Get()
+		zb := compressBufPool.Get()
 		if isVMRemoteWrite {
 			zb.B = zstd.CompressLevel(zb.B[:0], bb.B, *vmProtoCompressLevel)
 		} else {
 			zb.B = snappy.Encode(zb.B[:cap(zb.B)], bb.B)
 		}
 		writeRequestBufPool.Put(bb)
+
+		<-marshalConcurrencyCh
+
 		if len(zb.B) <= persistentqueue.MaxBlockSize {
-			if !tryPushBlock(zb.B) {
-				return false
+			zbLen := len(zb.B)
+			ok := tryPushBlock(zb.B)
+			compressBufPool.Put(zb)
+			if ok {
+				blockSizeRows.Update(float64(len(wr.Timeseries)))
+				blockSizeBytes.Update(float64(zbLen))
 			}
-			blockSizeRows.Update(float64(len(wr.Timeseries)))
-			blockSizeBytes.Update(float64(len(zb.B)))
-			snappyBufPool.Put(zb)
-			return true
+			return ok
 		}
-		snappyBufPool.Put(zb)
+		compressBufPool.Put(zb)
 	} else {
 		writeRequestBufPool.Put(bb)
+
+		<-marshalConcurrencyCh
 	}
 
 	// Too big block. Recursively split it into smaller parts if possible.
@@ -296,5 +310,7 @@ var (
 	blockSizeRows = metrics.NewHistogram(`vmagent_remotewrite_block_size_rows`)
 )
 
-var writeRequestBufPool bytesutil.ByteBufferPool
-var snappyBufPool bytesutil.ByteBufferPool
+var (
+	writeRequestBufPool bytesutil.ByteBufferPool
+	compressBufPool     bytesutil.ByteBufferPool
+)
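
Note: for reference, below is a minimal standalone sketch of the concurrency-limiting pattern this diff introduces: a buffered channel sized to the CPU count acts as a counting semaphore, so only the CPU-bound marshal/compress phase is throttled, while the potentially blocking push of the resulting block runs after the slot is released. The helper names (limitCh, marshalAndCompress, pushBlock) are illustrative only and are not part of vmagent; the real change sizes the channel with cgroup.AvailableCPUs(), whereas the sketch uses runtime.NumCPU() to stay self-contained.

    // Sketch of the semaphore pattern used in tryPushWriteRequest (assumed names, not vmagent code).
    package main

    import (
    	"fmt"
    	"runtime"
    	"sync"
    )

    // limitCh is a counting semaphore: at most NumCPU goroutines may hold a slot at once.
    var limitCh = make(chan struct{}, runtime.NumCPU())

    // marshalAndCompress stands in for the CPU-bound work (protobuf marshaling plus snappy/zstd compression).
    func marshalAndCompress(data []byte) []byte {
    	out := make([]byte, len(data))
    	copy(out, data)
    	return out
    }

    // pushBlock stands in for handing the compressed block to a persistent queue; it may block on I/O.
    func pushBlock(block []byte) bool {
    	return len(block) > 0
    }

    func process(data []byte) bool {
    	limitCh <- struct{}{}             // acquire a slot before the CPU-bound phase
    	block := marshalAndCompress(data) // throttled to the number of CPUs
    	<-limitCh                         // release the slot before the potentially blocking push
    	return pushBlock(block)
    }

    func main() {
    	var wg sync.WaitGroup
    	for i := 0; i < 100; i++ {
    		wg.Add(1)
    		go func(i int) {
    			defer wg.Done()
    			process([]byte(fmt.Sprintf("block-%d", i)))
    		}(i)
    	}
    	wg.Wait()
    	fmt.Println("done")
    }

Releasing the slot before pushBlock mirrors the diff above, where <-marshalConcurrencyCh happens before tryPushBlock, so a slow or blocked queue does not pin CPU-sized semaphore slots.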