mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vminsert/netstorage: reduce the maximum buffer size for rerouted rows, so it occupies less RAM
This commit is contained in:
parent
b101064f8b
commit
ed50b8792b
1 changed files with 25 additions and 24 deletions
|
@ -107,14 +107,14 @@ func (sn *storageNode) flushBufLocked() error {
|
|||
}
|
||||
|
||||
func (sn *storageNode) sendBufLocked(buf []byte) error {
|
||||
if len(buf) == 0 {
|
||||
return nil
|
||||
}
|
||||
if sn.bc == nil {
|
||||
if err := sn.dial(); err != nil {
|
||||
return fmt.Errorf("cannot dial %q: %s", sn.dialer.Addr(), err)
|
||||
}
|
||||
}
|
||||
if len(buf) == 0 {
|
||||
return nil
|
||||
}
|
||||
timeoutSeconds := len(buf) / 1e6
|
||||
if timeoutSeconds < 10 {
|
||||
timeoutSeconds = 10
|
||||
|
@ -317,7 +317,7 @@ func InitStorageNodes(addrs []string) {
|
|||
}(addr)
|
||||
}
|
||||
|
||||
reroutedBufMaxSize = memory.Allowed() / 8
|
||||
reroutedBufMaxSize = memory.Allowed() / 16
|
||||
rerouteWorkerWG.Add(1)
|
||||
go func() {
|
||||
rerouteWorker(rerouteWorkerStopCh)
|
||||
|
@ -396,27 +396,28 @@ func spreadReroutedBufToStorageNodes(swapBuf []byte) ([]byte, error) {
|
|||
idx = 0
|
||||
}
|
||||
attempts++
|
||||
if attempts == len(healthyStorageNodes) {
|
||||
// There are no healthy nodes.
|
||||
// Try returning the remaining data to reroutedBuf if it has enough free space.
|
||||
rowsRemaining := rows - rowsProcessed
|
||||
recovered := false
|
||||
reroutedLock.Lock()
|
||||
if len(rowBuf)+len(tail)+len(reroutedBuf) <= reroutedBufMaxSize {
|
||||
swapBuf = append(swapBuf[:0], rowBuf...)
|
||||
swapBuf = append(swapBuf, tail...)
|
||||
swapBuf = append(swapBuf, reroutedBuf...)
|
||||
reroutedBuf, swapBuf = swapBuf, reroutedBuf[:0]
|
||||
reroutedRows += rowsRemaining
|
||||
recovered = true
|
||||
}
|
||||
reroutedLock.Unlock()
|
||||
if recovered {
|
||||
return swapBuf, nil
|
||||
}
|
||||
rowsLostTotal.Add(rowsRemaining)
|
||||
return swapBuf, fmt.Errorf("all the %d vmstorage nodes are unavailable; lost %d rows; last error: %s", len(storageNodes), rowsRemaining, err)
|
||||
if attempts < len(healthyStorageNodes) {
|
||||
continue
|
||||
}
|
||||
// There are no healthy nodes.
|
||||
// Try returning the remaining data to reroutedBuf if it has enough free space.
|
||||
rowsRemaining := rows - rowsProcessed
|
||||
recovered := false
|
||||
reroutedLock.Lock()
|
||||
if len(rowBuf)+len(tail)+len(reroutedBuf) <= reroutedBufMaxSize {
|
||||
swapBuf = append(swapBuf[:0], rowBuf...)
|
||||
swapBuf = append(swapBuf, tail...)
|
||||
swapBuf = append(swapBuf, reroutedBuf...)
|
||||
reroutedBuf, swapBuf = swapBuf, reroutedBuf[:0]
|
||||
reroutedRows += rowsRemaining
|
||||
recovered = true
|
||||
}
|
||||
reroutedLock.Unlock()
|
||||
if recovered {
|
||||
return swapBuf, nil
|
||||
}
|
||||
rowsLostTotal.Add(rowsRemaining)
|
||||
return swapBuf, fmt.Errorf("all the %d vmstorage nodes are unavailable; lost %d rows; last error: %s", len(storageNodes), rowsRemaining, err)
|
||||
}
|
||||
rowsProcessed++
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue