app/vminsert/netstorage: tune re-routing algorithm further

This commit is contained in:
Aliaksandr Valialkin 2022-02-07 14:35:39 +02:00
parent 1a5546006d
commit 5aee6eb406
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1

View file

@ -71,7 +71,6 @@ func (sn *storageNode) push(buf []byte, rows int) error {
} }
func (sn *storageNode) rerouteBufToOtherStorageNodes(buf []byte, rows int) error { func (sn *storageNode) rerouteBufToOtherStorageNodes(buf []byte, rows int) error {
reroutesTotal.Inc()
sn.brLock.Lock() sn.brLock.Lock()
again: again:
select { select {
@ -534,6 +533,7 @@ func Stop() {
// //
// The function blocks until src is fully re-routed. // The function blocks until src is fully re-routed.
func rerouteRowsToReadyStorageNodes(snSource *storageNode, src []byte) (int, error) { func rerouteRowsToReadyStorageNodes(snSource *storageNode, src []byte) (int, error) {
reroutesTotal.Inc()
rowsProcessed := 0 rowsProcessed := 0
var idxsExclude, idxsExcludeNew []int var idxsExclude, idxsExcludeNew []int
idxsExclude = getNotReadyStorageNodeIdxsBlocking(idxsExclude[:0], nil) idxsExclude = getNotReadyStorageNodeIdxsBlocking(idxsExclude[:0], nil)
@ -569,6 +569,7 @@ func rerouteRowsToReadyStorageNodes(snSource *storageNode, src []byte) (int, err
} }
continue continue
} }
again:
if sn.trySendBuf(rowBuf, 1) { if sn.trySendBuf(rowBuf, 1) {
rowsProcessed++ rowsProcessed++
if sn != snSource { if sn != snSource {
@ -578,7 +579,7 @@ func rerouteRowsToReadyStorageNodes(snSource *storageNode, src []byte) (int, err
continue continue
} }
// If the re-routing is enabled, then try sending the row to another storage node. // If the re-routing is enabled, then try sending the row to another storage node.
idxsExcludeNew = getNotReadyStorageNodeIdxsBlocking(idxsExcludeNew[:0], sn) idxsExcludeNew = getNotReadyStorageNodeIdxs(idxsExcludeNew[:0], sn)
idx := nodesHash.getNodeIdx(h, idxsExcludeNew) idx := nodesHash.getNodeIdx(h, idxsExcludeNew)
snNew := storageNodes[idx] snNew := storageNodes[idx]
if snNew.trySendBuf(rowBuf, 1) { if snNew.trySendBuf(rowBuf, 1) {
@ -589,15 +590,10 @@ func rerouteRowsToReadyStorageNodes(snSource *storageNode, src []byte) (int, err
} }
continue continue
} }
// Fall back to sending the row to sn in order to minimize re-routing. // The row cannot be sent to either sn or the re-routed snNew without blocking.
if !sn.sendBufMayBlock(rowBuf) { // Sleep for a while and try sending the row to sn again.
return rowsProcessed, fmt.Errorf("graceful shutdown started") time.Sleep(100 * time.Millisecond)
} goto again
rowsProcessed++
if sn != snSource {
snSource.rowsReroutedFromHere.Inc()
sn.rowsReroutedToHere.Inc()
}
} }
return rowsProcessed, nil return rowsProcessed, nil
} }
@ -610,9 +606,10 @@ func rerouteRowsToFreeStorageNodes(snSource *storageNode, src []byte) (int, erro
if *disableRerouting { if *disableRerouting {
logger.Panicf("BUG: disableRerouting must be disabled when calling rerouteRowsToFreeStorageNodes") logger.Panicf("BUG: disableRerouting must be disabled when calling rerouteRowsToFreeStorageNodes")
} }
reroutesTotal.Inc()
rowsProcessed := 0 rowsProcessed := 0
var idxsExclude []int var idxsExclude []int
idxsExclude = getNotReadyStorageNodeIdxsBlocking(idxsExclude[:0], snSource) idxsExclude = getNotReadyStorageNodeIdxs(idxsExclude[:0], snSource)
var mr storage.MetricRow var mr storage.MetricRow
for len(src) > 0 { for len(src) > 0 {
tail, err := mr.UnmarshalX(src) tail, err := mr.UnmarshalX(src)
@ -625,6 +622,7 @@ func rerouteRowsToFreeStorageNodes(snSource *storageNode, src []byte) (int, erro
h := xxhash.Sum64(mr.MetricNameRaw) h := xxhash.Sum64(mr.MetricNameRaw)
mr.ResetX() mr.ResetX()
// Try sending the row to snSource in order to minimize re-routing. // Try sending the row to snSource in order to minimize re-routing.
again:
if snSource.trySendBuf(rowBuf, 1) { if snSource.trySendBuf(rowBuf, 1) {
rowsProcessed++ rowsProcessed++
continue continue
@ -638,7 +636,7 @@ func rerouteRowsToFreeStorageNodes(snSource *storageNode, src []byte) (int, erro
break break
} }
// re-generate idxsExclude list, since sn must be put there. // re-generate idxsExclude list, since sn must be put there.
idxsExclude = getNotReadyStorageNodeIdxsBlocking(idxsExclude[:0], snSource) idxsExclude = getNotReadyStorageNodeIdxs(idxsExclude[:0], snSource)
} }
if sn.trySendBuf(rowBuf, 1) { if sn.trySendBuf(rowBuf, 1) {
rowsProcessed++ rowsProcessed++
@ -646,11 +644,10 @@ func rerouteRowsToFreeStorageNodes(snSource *storageNode, src []byte) (int, erro
sn.rowsReroutedToHere.Inc() sn.rowsReroutedToHere.Inc()
continue continue
} }
// Fall back sending the row to snSource in order to minimize re-routing. // The row cannot be sent to both snSource and the re-routed sn without blocking.
if !snSource.sendBufMayBlock(rowBuf) { // Sleep for a while and try sending the row to snSource again.
return rowsProcessed, fmt.Errorf("graceful shutdown started") time.Sleep(100 * time.Millisecond)
} goto again
rowsProcessed++
} }
return rowsProcessed, nil return rowsProcessed, nil
} }