app/vmagent/remotewrite: limit the concurrency for marshaling time series before sending them to remote storage

There is no point in running more than GOMAXPROCS concurrent marshalers,
since marshaling is CPU-bound. Additional concurrent marshalers do not increase the marshaling bandwidth,
but they may increase RAM usage.
Aliaksandr Valialkin 2024-01-30 12:18:17 +02:00
parent 61562cdee9
commit 3b18659487
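
The mechanism behind the commit message is a counting semaphore built from a buffered channel sized to the number of available CPU cores. Below is a minimal, self-contained sketch of that pattern; it uses runtime.GOMAXPROCS(0) and gzip as stand-ins for the repository's cgroup.AvailableCPUs() and the zstd/snappy compressors, and the push callback is hypothetical. As in the diff that follows, the semaphore slot is released right after the CPU-bound compression, before the block is pushed, so a slow remote storage endpoint does not keep a CPU slot occupied.

package main

import (
    "bytes"
    "compress/gzip"
    "fmt"
    "runtime"
    "sync"
)

// marshalConcurrencyCh is a counting semaphore with one slot per available CPU core.
// runtime.GOMAXPROCS(0) stands in for the repository's cgroup.AvailableCPUs().
var marshalConcurrencyCh = make(chan struct{}, runtime.GOMAXPROCS(0))

// compressAndPush mirrors the shape of the change: only the CPU-bound compression
// runs under the semaphore; the potentially slow push happens after the slot is
// released, so a stalled remote endpoint cannot pin a CPU slot.
func compressAndPush(block []byte, push func([]byte) bool) bool {
    marshalConcurrencyCh <- struct{}{} // acquire a slot; blocks while all slots are busy
    var buf bytes.Buffer
    zw := gzip.NewWriter(&buf)
    _, _ = zw.Write(block) // CPU-bound work
    _ = zw.Close()
    <-marshalConcurrencyCh // release the slot before the network call
    return push(buf.Bytes())
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 64; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            compressAndPush(make([]byte, 64*1024), func(b []byte) bool {
                return len(b) > 0 // stand-in for pushing the block to remote storage
            })
        }()
    }
    wg.Wait()
    fmt.Println("done")
}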

@@ -7,6 +7,7 @@ import (
 	"time"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/zstd"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@@ -109,11 +110,12 @@ type writeRequest struct {
 	wr prompbmarshal.WriteRequest
 
 	tss []prompbmarshal.TimeSeries
 
 	labels  []prompbmarshal.Label
 	samples []prompbmarshal.Sample
-	buf     []byte
+
+	// buf holds labels data
+	buf []byte
 }
 
 func (wr *writeRequest) reset() {
@@ -224,33 +226,45 @@ func (wr *writeRequest) copyTimeSeries(dst, src *prompbmarshal.TimeSeries) {
 	wr.buf = buf
 }
 
+// marshalConcurrency limits the maximum number of concurrent workers, which marshal and compress WriteRequest.
+var marshalConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
+
 func tryPushWriteRequest(wr *prompbmarshal.WriteRequest, tryPushBlock func(block []byte) bool, isVMRemoteWrite bool) bool {
 	if len(wr.Timeseries) == 0 {
 		// Nothing to push
 		return true
 	}
+
+	marshalConcurrencyCh <- struct{}{}
+
 	bb := writeRequestBufPool.Get()
 	bb.B = wr.MarshalProtobuf(bb.B[:0])
 	if len(bb.B) <= maxUnpackedBlockSize.IntN() {
-		zb := snappyBufPool.Get()
+		zb := compressBufPool.Get()
 		if isVMRemoteWrite {
 			zb.B = zstd.CompressLevel(zb.B[:0], bb.B, *vmProtoCompressLevel)
 		} else {
 			zb.B = snappy.Encode(zb.B[:cap(zb.B)], bb.B)
 		}
 		writeRequestBufPool.Put(bb)
+
+		<-marshalConcurrencyCh
+
 		if len(zb.B) <= persistentqueue.MaxBlockSize {
-			if !tryPushBlock(zb.B) {
-				return false
+			zbLen := len(zb.B)
+			ok := tryPushBlock(zb.B)
+			compressBufPool.Put(zb)
+			if ok {
+				blockSizeRows.Update(float64(len(wr.Timeseries)))
+				blockSizeBytes.Update(float64(zbLen))
 			}
-			blockSizeRows.Update(float64(len(wr.Timeseries)))
-			blockSizeBytes.Update(float64(len(zb.B)))
-			snappyBufPool.Put(zb)
-			return true
+			return ok
 		}
-		snappyBufPool.Put(zb)
+		compressBufPool.Put(zb)
 	} else {
 		writeRequestBufPool.Put(bb)
+
+		<-marshalConcurrencyCh
 	}
 
 	// Too big block. Recursively split it into smaller parts if possible.
@@ -296,5 +310,7 @@ var (
 	blockSizeRows  = metrics.NewHistogram(`vmagent_remotewrite_block_size_rows`)
 )
 
-var writeRequestBufPool bytesutil.ByteBufferPool
-var snappyBufPool bytesutil.ByteBufferPool
+var (
+	writeRequestBufPool bytesutil.ByteBufferPool
+	compressBufPool     bytesutil.ByteBufferPool
+)