From ed50b8792bfa8f4b2745b5002c2d4678e59a6901 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 11 Sep 2019 14:25:53 +0300 Subject: [PATCH] app/vminsert/netstorage: reduce the maximum buffer size for rerouted rows, so it occupies less RAM --- app/vminsert/netstorage/netstorage.go | 49 ++++++++++++++------------- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/app/vminsert/netstorage/netstorage.go b/app/vminsert/netstorage/netstorage.go index 76b89f6de8..fa75703bc5 100644 --- a/app/vminsert/netstorage/netstorage.go +++ b/app/vminsert/netstorage/netstorage.go @@ -107,14 +107,14 @@ func (sn *storageNode) flushBufLocked() error { } func (sn *storageNode) sendBufLocked(buf []byte) error { + if len(buf) == 0 { + return nil + } if sn.bc == nil { if err := sn.dial(); err != nil { return fmt.Errorf("cannot dial %q: %s", sn.dialer.Addr(), err) } } - if len(buf) == 0 { - return nil - } timeoutSeconds := len(buf) / 1e6 if timeoutSeconds < 10 { timeoutSeconds = 10 @@ -317,7 +317,7 @@ func InitStorageNodes(addrs []string) { }(addr) } - reroutedBufMaxSize = memory.Allowed() / 8 + reroutedBufMaxSize = memory.Allowed() / 16 rerouteWorkerWG.Add(1) go func() { rerouteWorker(rerouteWorkerStopCh) @@ -396,27 +396,28 @@ func spreadReroutedBufToStorageNodes(swapBuf []byte) ([]byte, error) { idx = 0 } attempts++ - if attempts == len(healthyStorageNodes) { - // There are no healthy nodes. - // Try returning the remaining data to reroutedBuf if it has enough free space. - rowsRemaining := rows - rowsProcessed - recovered := false - reroutedLock.Lock() - if len(rowBuf)+len(tail)+len(reroutedBuf) <= reroutedBufMaxSize { - swapBuf = append(swapBuf[:0], rowBuf...) - swapBuf = append(swapBuf, tail...) - swapBuf = append(swapBuf, reroutedBuf...) - reroutedBuf, swapBuf = swapBuf, reroutedBuf[:0] - reroutedRows += rowsRemaining - recovered = true - } - reroutedLock.Unlock() - if recovered { - return swapBuf, nil - } - rowsLostTotal.Add(rowsRemaining) - return swapBuf, fmt.Errorf("all the %d vmstorage nodes are unavailable; lost %d rows; last error: %s", len(storageNodes), rowsRemaining, err) + if attempts < len(healthyStorageNodes) { + continue } + // There are no healthy nodes. + // Try returning the remaining data to reroutedBuf if it has enough free space. + rowsRemaining := rows - rowsProcessed + recovered := false + reroutedLock.Lock() + if len(rowBuf)+len(tail)+len(reroutedBuf) <= reroutedBufMaxSize { + swapBuf = append(swapBuf[:0], rowBuf...) + swapBuf = append(swapBuf, tail...) + swapBuf = append(swapBuf, reroutedBuf...) + reroutedBuf, swapBuf = swapBuf, reroutedBuf[:0] + reroutedRows += rowsRemaining + recovered = true + } + reroutedLock.Unlock() + if recovered { + return swapBuf, nil + } + rowsLostTotal.Add(rowsRemaining) + return swapBuf, fmt.Errorf("all the %d vmstorage nodes are unavailable; lost %d rows; last error: %s", len(storageNodes), rowsRemaining, err) } rowsProcessed++ }