diff --git a/app/vminsert/netstorage/netstorage.go b/app/vminsert/netstorage/netstorage.go index 843f9752f..c1c320d1e 100644 --- a/app/vminsert/netstorage/netstorage.go +++ b/app/vminsert/netstorage/netstorage.go @@ -186,6 +186,7 @@ func (sn *storageNode) run(stopCh <-chan struct{}) { t.Reset(time.Second) } + t.Stop() } func rerouteWorker(stopCh <-chan struct{}) { @@ -209,6 +210,7 @@ func rerouteWorker(stopCh <-chan struct{}) { } t.Reset(time.Second) } + t.Stop() } // storageNode is a client sending data to vmstorage node. @@ -310,6 +312,7 @@ func InitStorageNodes(addrs []string) { }(addr) } + reroutedBufMaxSize = memory.Allowed() / 8 rerouteWorkerWG.Add(1) go func() { rerouteWorker(rerouteWorkerStopCh) @@ -327,7 +330,6 @@ func Stop() { } func addToReroutedBuf(buf []byte, rows int) error { - reroutedBufMaxSize := memory.Allowed() / 8 reroutedLock.Lock() defer reroutedLock.Unlock() if len(reroutedBuf)+len(buf) > reroutedBufMaxSize { @@ -358,7 +360,7 @@ func spreadReroutedBufToStorageNodes(swapBuf []byte) ([]byte, error) { // Try returning the the data to reroutedBuf if it has enough free space. recovered := false reroutedLock.Lock() - if len(swapBuf)+len(reroutedBuf) <= consts.MaxInsertPacketSize { + if len(swapBuf)+len(reroutedBuf) <= reroutedBufMaxSize { swapBuf = append(swapBuf, reroutedBuf...) reroutedBuf, swapBuf = swapBuf, reroutedBuf[:0] reroutedRows += rows @@ -409,7 +411,7 @@ func spreadReroutedBufToStorageNodes(swapBuf []byte) ([]byte, error) { rowsRemaining := rows - rowsProcessed recovered := false reroutedLock.Lock() - if len(rowBuf)+len(tail)+len(reroutedBuf) <= consts.MaxInsertPacketSize { + if len(rowBuf)+len(tail)+len(reroutedBuf) <= reroutedBufMaxSize { swapBuf = append(swapBuf[:0], rowBuf...) swapBuf = append(swapBuf, tail...) swapBuf = append(swapBuf, reroutedBuf...) @@ -435,9 +437,10 @@ func spreadReroutedBufToStorageNodes(swapBuf []byte) ([]byte, error) { } var ( - reroutedLock sync.Mutex - reroutedBuf []byte - reroutedRows int + reroutedLock sync.Mutex + reroutedBuf []byte + reroutedRows int + reroutedBufMaxSize int reroutedRowsProcessed = metrics.NewCounter(`vm_rpc_rerouted_rows_processed_total{name="vminsert"}`) reroutedBufOverflows = metrics.NewCounter(`vm_rpc_rerouted_buf_overflows_total{name="vminsert"}`)