mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-21 15:45:01 +00:00
app/vminsert: limit the summary buffer sizes for all the storage nodes to a half of the allowed memory
This commit is contained in:
parent
97feac596f
commit
4a82631e44
1 changed files with 11 additions and 5 deletions
|
@ -27,8 +27,8 @@ var disableRPCCompression = flag.Bool(`rpc.disableCompression`, false, "Disable
|
||||||
//
|
//
|
||||||
// rows is the number of rows in the buf.
|
// rows is the number of rows in the buf.
|
||||||
func (sn *storageNode) push(buf []byte, rows int) error {
|
func (sn *storageNode) push(buf []byte, rows int) error {
|
||||||
if len(buf) > consts.MaxInsertPacketSize {
|
if len(buf) > maxBufSizePerStorageNode {
|
||||||
logger.Panicf("BUG: len(buf)=%d cannot exceed %d", len(buf), consts.MaxInsertPacketSize)
|
logger.Panicf("BUG: len(buf)=%d cannot exceed %d", len(buf), maxBufSizePerStorageNode)
|
||||||
}
|
}
|
||||||
sn.rowsPushed.Add(rows)
|
sn.rowsPushed.Add(rows)
|
||||||
|
|
||||||
|
@ -45,7 +45,7 @@ func (sn *storageNode) push(buf []byte, rows int) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(sn.buf)+len(buf) <= consts.MaxInsertPacketSize {
|
if len(sn.buf)+len(buf) <= maxBufSizePerStorageNode {
|
||||||
// Fast path: the buf contents fits sn.buf.
|
// Fast path: the buf contents fits sn.buf.
|
||||||
sn.buf = append(sn.buf, buf...)
|
sn.buf = append(sn.buf, buf...)
|
||||||
sn.rows += rows
|
sn.rows += rows
|
||||||
|
@ -75,8 +75,8 @@ func (sn *storageNode) sendReroutedRow(buf []byte) error {
|
||||||
if sn.broken {
|
if sn.broken {
|
||||||
return errBrokenStorageNode
|
return errBrokenStorageNode
|
||||||
}
|
}
|
||||||
if len(sn.buf)+len(buf) > consts.MaxInsertPacketSize {
|
if len(sn.buf)+len(buf) > maxBufSizePerStorageNode {
|
||||||
return fmt.Errorf("cannot put %d bytes into vmstorage buffer, since its size cannot exceed %d bytes", len(sn.buf)+len(buf), consts.MaxInsertPacketSize)
|
return fmt.Errorf("cannot put %d bytes into vmstorage buffer, since its size cannot exceed %d bytes", len(sn.buf)+len(buf), maxBufSizePerStorageNode)
|
||||||
}
|
}
|
||||||
sn.buf = append(sn.buf, buf...)
|
sn.buf = append(sn.buf, buf...)
|
||||||
sn.rows++
|
sn.rows++
|
||||||
|
@ -340,6 +340,10 @@ func InitStorageNodes(addrs []string) {
|
||||||
}(addr)
|
}(addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
maxBufSizePerStorageNode = memory.Allowed() / 2 / len(storageNodes)
|
||||||
|
if maxBufSizePerStorageNode > consts.MaxInsertPacketSize {
|
||||||
|
maxBufSizePerStorageNode = consts.MaxInsertPacketSize
|
||||||
|
}
|
||||||
reroutedBufMaxSize = memory.Allowed() / 16
|
reroutedBufMaxSize = memory.Allowed() / 16
|
||||||
rerouteWorkerWG.Add(1)
|
rerouteWorkerWG.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -452,6 +456,8 @@ func spreadReroutedBufToStorageNodes(swapBuf []byte) ([]byte, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
maxBufSizePerStorageNode int
|
||||||
|
|
||||||
reroutedLock sync.Mutex
|
reroutedLock sync.Mutex
|
||||||
reroutedBuf []byte
|
reroutedBuf []byte
|
||||||
reroutedRows int
|
reroutedRows int
|
||||||
|
|
Loading…
Reference in a new issue