diff --git a/app/vminsert/netstorage/netstorage.go b/app/vminsert/netstorage/netstorage.go index 0a50e6dbb..19e4e2b6d 100644 --- a/app/vminsert/netstorage/netstorage.go +++ b/app/vminsert/netstorage/netstorage.go @@ -27,8 +27,8 @@ var disableRPCCompression = flag.Bool(`rpc.disableCompression`, false, "Disable // // rows is the number of rows in the buf. func (sn *storageNode) push(buf []byte, rows int) error { - if len(buf) > consts.MaxInsertPacketSize { - logger.Panicf("BUG: len(buf)=%d cannot exceed %d", len(buf), consts.MaxInsertPacketSize) + if len(buf) > maxBufSizePerStorageNode { + logger.Panicf("BUG: len(buf)=%d cannot exceed %d", len(buf), maxBufSizePerStorageNode) } sn.rowsPushed.Add(rows) @@ -45,7 +45,7 @@ func (sn *storageNode) push(buf []byte, rows int) error { return nil } - if len(sn.buf)+len(buf) <= consts.MaxInsertPacketSize { + if len(sn.buf)+len(buf) <= maxBufSizePerStorageNode { // Fast path: the buf contents fits sn.buf. sn.buf = append(sn.buf, buf...) sn.rows += rows @@ -75,8 +75,8 @@ func (sn *storageNode) sendReroutedRow(buf []byte) error { if sn.broken { return errBrokenStorageNode } - if len(sn.buf)+len(buf) > consts.MaxInsertPacketSize { - return fmt.Errorf("cannot put %d bytes into vmstorage buffer, since its size cannot exceed %d bytes", len(sn.buf)+len(buf), consts.MaxInsertPacketSize) + if len(sn.buf)+len(buf) > maxBufSizePerStorageNode { + return fmt.Errorf("cannot put %d bytes into vmstorage buffer, since its size cannot exceed %d bytes", len(sn.buf)+len(buf), maxBufSizePerStorageNode) } sn.buf = append(sn.buf, buf...) sn.rows++ @@ -340,6 +340,10 @@ func InitStorageNodes(addrs []string) { }(addr) } + maxBufSizePerStorageNode = memory.Allowed() / 2 / len(storageNodes) + if maxBufSizePerStorageNode > consts.MaxInsertPacketSize { + maxBufSizePerStorageNode = consts.MaxInsertPacketSize + } reroutedBufMaxSize = memory.Allowed() / 16 rerouteWorkerWG.Add(1) go func() { @@ -452,6 +456,8 @@ func spreadReroutedBufToStorageNodes(swapBuf []byte) ([]byte, error) { } var ( + maxBufSizePerStorageNode int + reroutedLock sync.Mutex reroutedBuf []byte reroutedRows int