app/vminsert: properly limit the size of reroutedBuf

This commit is contained in:
Aliaksandr Valialkin 2019-08-23 10:29:40 +03:00
parent c50975e12d
commit a5dc54efc3

View file

@ -186,6 +186,7 @@ func (sn *storageNode) run(stopCh <-chan struct{}) {
t.Reset(time.Second) t.Reset(time.Second)
} }
t.Stop()
} }
func rerouteWorker(stopCh <-chan struct{}) { func rerouteWorker(stopCh <-chan struct{}) {
@ -209,6 +210,7 @@ func rerouteWorker(stopCh <-chan struct{}) {
} }
t.Reset(time.Second) t.Reset(time.Second)
} }
t.Stop()
} }
// storageNode is a client sending data to vmstorage node. // storageNode is a client sending data to vmstorage node.
@ -310,6 +312,7 @@ func InitStorageNodes(addrs []string) {
}(addr) }(addr)
} }
reroutedBufMaxSize = memory.Allowed() / 8
rerouteWorkerWG.Add(1) rerouteWorkerWG.Add(1)
go func() { go func() {
rerouteWorker(rerouteWorkerStopCh) rerouteWorker(rerouteWorkerStopCh)
@ -327,7 +330,6 @@ func Stop() {
} }
func addToReroutedBuf(buf []byte, rows int) error { func addToReroutedBuf(buf []byte, rows int) error {
reroutedBufMaxSize := memory.Allowed() / 8
reroutedLock.Lock() reroutedLock.Lock()
defer reroutedLock.Unlock() defer reroutedLock.Unlock()
if len(reroutedBuf)+len(buf) > reroutedBufMaxSize { if len(reroutedBuf)+len(buf) > reroutedBufMaxSize {
@ -358,7 +360,7 @@ func spreadReroutedBufToStorageNodes(swapBuf []byte) ([]byte, error) {
// Try returning the the data to reroutedBuf if it has enough free space. // Try returning the the data to reroutedBuf if it has enough free space.
recovered := false recovered := false
reroutedLock.Lock() reroutedLock.Lock()
if len(swapBuf)+len(reroutedBuf) <= consts.MaxInsertPacketSize { if len(swapBuf)+len(reroutedBuf) <= reroutedBufMaxSize {
swapBuf = append(swapBuf, reroutedBuf...) swapBuf = append(swapBuf, reroutedBuf...)
reroutedBuf, swapBuf = swapBuf, reroutedBuf[:0] reroutedBuf, swapBuf = swapBuf, reroutedBuf[:0]
reroutedRows += rows reroutedRows += rows
@ -409,7 +411,7 @@ func spreadReroutedBufToStorageNodes(swapBuf []byte) ([]byte, error) {
rowsRemaining := rows - rowsProcessed rowsRemaining := rows - rowsProcessed
recovered := false recovered := false
reroutedLock.Lock() reroutedLock.Lock()
if len(rowBuf)+len(tail)+len(reroutedBuf) <= consts.MaxInsertPacketSize { if len(rowBuf)+len(tail)+len(reroutedBuf) <= reroutedBufMaxSize {
swapBuf = append(swapBuf[:0], rowBuf...) swapBuf = append(swapBuf[:0], rowBuf...)
swapBuf = append(swapBuf, tail...) swapBuf = append(swapBuf, tail...)
swapBuf = append(swapBuf, reroutedBuf...) swapBuf = append(swapBuf, reroutedBuf...)
@ -435,9 +437,10 @@ func spreadReroutedBufToStorageNodes(swapBuf []byte) ([]byte, error) {
} }
var ( var (
reroutedLock sync.Mutex reroutedLock sync.Mutex
reroutedBuf []byte reroutedBuf []byte
reroutedRows int reroutedRows int
reroutedBufMaxSize int
reroutedRowsProcessed = metrics.NewCounter(`vm_rpc_rerouted_rows_processed_total{name="vminsert"}`) reroutedRowsProcessed = metrics.NewCounter(`vm_rpc_rerouted_rows_processed_total{name="vminsert"}`)
reroutedBufOverflows = metrics.NewCounter(`vm_rpc_rerouted_buf_overflows_total{name="vminsert"}`) reroutedBufOverflows = metrics.NewCounter(`vm_rpc_rerouted_buf_overflows_total{name="vminsert"}`)