Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git (synced 2025-01-30 15:22:07 +00:00)
app/vminsert/netstorage: prevent an infinite loop when re-routing is enabled and all the vmstorage nodes are unavailable
This commit is contained in:
parent 1109521806
commit 05f56c411e

2 changed files with 58 additions and 36 deletions
@@ -28,6 +28,13 @@ func (rh *consistentHash) getNodeIdx(h uint64, excludeIdxs []int) int {
 	var mMax uint64
 	var idx int
 	h ^= rh.hashSeed
+
+	if len(excludeIdxs) == len(rh.nodeHashes) {
+		// All the nodes are excluded. Treat this case as no nodes are excluded.
+		// This is better from load-balancing PoV than selecting some static node.
+		excludeIdxs = nil
+	}
+
 next:
 	for i, nh := range rh.nodeHashes {
 		for _, j := range excludeIdxs {
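The hunk above is the core fix: when the exclude list covers every node, `getNodeIdx` now drops the exclusions instead of searching for a candidate that cannot exist. A self-contained sketch of the same idea, assuming a rendezvous-hash style selection; `pickNodeIdx`, the XOR mixing, and the sample hashes are illustrative stand-ins rather than the actual VictoriaMetrics implementation:

```go
package main

import "fmt"

// pickNodeIdx picks an index into nodeHashes for hash h while skipping
// the indexes listed in excludeIdxs. If every index is excluded, the
// exclusion list is ignored, so a valid index is always returned
// instead of searching forever.
func pickNodeIdx(h uint64, nodeHashes []uint64, excludeIdxs []int) int {
	if len(excludeIdxs) == len(nodeHashes) {
		// All the nodes are excluded; treat this as no exclusions.
		// Spreading the load is better than pinning a single static node.
		excludeIdxs = nil
	}
	idx := -1
	var mMax uint64
next:
	for i, nh := range nodeHashes {
		for _, j := range excludeIdxs {
			if i == j {
				continue next
			}
		}
		// Rendezvous-style pick: keep the node with the largest mixed hash.
		if m := nh ^ h; idx < 0 || m > mMax {
			mMax = m
			idx = i
		}
	}
	return idx
}

func main() {
	nodeHashes := []uint64{0x1111, 0x2222, 0x3333}
	fmt.Println(pickNodeIdx(42, nodeHashes, []int{1}))       // skips node 1
	fmt.Println(pickNodeIdx(42, nodeHashes, []int{0, 1, 2})) // all excluded: still returns a node
}
```

Falling back to "no exclusions" keeps the choice hash-based, so rows stay spread across nodes rather than being funneled to one arbitrary node once exclusions no longer make sense.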
@@ -620,7 +620,7 @@ func rerouteRowsToReadyStorageNodes(snb *storageNodesBucket, snSource *storageNo
 	var idxsExclude, idxsExcludeNew []int
 	nodesHash := snb.nodesHash
 	sns := snb.sns
-	idxsExclude = getNotReadyStorageNodeIdxsBlocking(snb, idxsExclude[:0], nil)
+	idxsExclude = getNotReadyStorageNodeIdxsBlocking(snb, idxsExclude[:0])
 	var mr storage.MetricRow
 	for len(src) > 0 {
 		tail, err := mr.UnmarshalX(src)
@@ -639,8 +639,14 @@ func rerouteRowsToReadyStorageNodes(snb *storageNodesBucket, snSource *storageNo
 		if sn.isReady() {
 			break
 		}
+		select {
+		case <-sn.stopCh:
+			return rowsProcessed, fmt.Errorf("graceful shutdown started")
+		default:
+		}
+
 		// re-generate idxsExclude list, since sn must be put there.
-		idxsExclude = getNotReadyStorageNodeIdxsBlocking(snb, idxsExclude[:0], nil)
+		idxsExclude = getNotReadyStorageNodeIdxsBlocking(snb, idxsExclude[:0])
 	}
 	if *disableRerouting {
 		if !sn.sendBufMayBlock(rowBuf) {
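The added `select` on `sn.stopCh` is the standard non-blocking check of a stop channel: if graceful shutdown has begun, the wait-for-a-ready-node loop returns instead of spinning until a node recovers. A minimal sketch of the pattern, with `waitUntilReady`, `isReady`, and `stopCh` as illustrative names rather than the actual vminsert types:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// waitUntilReady blocks until isReady reports true or stopCh is closed.
// The select with a default case makes the stop check non-blocking, so
// the loop keeps polling while shutdown has not been requested.
func waitUntilReady(isReady func() bool, stopCh <-chan struct{}) error {
	for {
		if isReady() {
			return nil
		}
		select {
		case <-stopCh:
			return errors.New("graceful shutdown started")
		default:
		}
		time.Sleep(10 * time.Millisecond)
	}
}

func main() {
	stopCh := make(chan struct{})
	// Simulate a shutdown request arriving shortly after start.
	go func() {
		time.Sleep(50 * time.Millisecond)
		close(stopCh)
	}()
	err := waitUntilReady(func() bool { return false }, stopCh)
	fmt.Println(err)
}
```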
@ -666,18 +672,17 @@ func rerouteRowsToReadyStorageNodes(snb *storageNodesBucket, snSource *storageNo
|
|||
idxsExcludeNew = getNotReadyStorageNodeIdxs(snb, idxsExcludeNew[:0], sn)
|
||||
idx := nodesHash.getNodeIdx(h, idxsExcludeNew)
|
||||
snNew := sns[idx]
|
||||
if snNew.trySendBuf(rowBuf, 1) {
|
||||
rowsProcessed++
|
||||
if snNew != snSource {
|
||||
snSource.rowsReroutedFromHere.Inc()
|
||||
snNew.rowsReroutedToHere.Inc()
|
||||
}
|
||||
continue
|
||||
if !snNew.trySendBuf(rowBuf, 1) {
|
||||
// The row cannot be sent to both snSource and to sn without blocking.
|
||||
// Sleep for a while and try sending the row to snSource again.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
goto again
|
||||
}
|
||||
rowsProcessed++
|
||||
if snNew != snSource {
|
||||
snSource.rowsReroutedFromHere.Inc()
|
||||
snNew.rowsReroutedToHere.Inc()
|
||||
}
|
||||
// The row cannot be sent to both snSource and the re-routed sn without blocking.
|
||||
// Sleep for a while and try sending the row to snSource again.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
goto again
|
||||
}
|
||||
return rowsProcessed, nil
|
||||
}
|
||||
|
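After the rewrite there is a single success path: if neither `snSource` nor the re-routed `snNew` accepts the row without blocking, the code sleeps for 100ms and jumps back to `again:` to start over from `snSource`. A rough sketch of that control flow, using a plain loop and invented `node`/`trySend` types in place of the real storageNode and its `trySendBuf`:

```go
package main

import (
	"fmt"
	"time"
)

// node is a stand-in for a storage node with a bounded send buffer.
type node struct {
	name string
	ch   chan []byte
}

// trySend attempts a non-blocking send: it reports false instead of
// blocking when the buffer is full.
func (n *node) trySend(row []byte) bool {
	select {
	case n.ch <- row:
		return true
	default:
		return false
	}
}

// sendWithRetry first tries the preferred node, then a fallback node,
// and backs off briefly before retrying when both are busy. The diff
// above re-picks the fallback via consistent hashing on every retry
// and uses a goto label instead of a loop; this sketch simplifies that.
func sendWithRetry(preferred, fallback *node, row []byte) string {
	for {
		if preferred.trySend(row) {
			return preferred.name
		}
		if fallback.trySend(row) {
			return fallback.name
		}
		// Neither node can accept the row without blocking.
		time.Sleep(100 * time.Millisecond)
	}
}

func main() {
	busy := &node{name: "snSource", ch: make(chan []byte)} // no receiver, no capacity: always busy
	free := &node{name: "snNew", ch: make(chan []byte, 1)} // room for one row
	fmt.Println(sendWithRetry(busy, free, []byte("row")))  // prints "snNew"
}
```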
@@ -686,15 +691,19 @@ func rerouteRowsToReadyStorageNodes(snb *storageNodesBucket, snSource *storageNo
 //
 // It is expected that snSource has no enough buffer for sending src.
-// It is expected than *dsableRerouting isn't set when calling this function.
-// It is expected that len(snb.sns) >= 2
 func rerouteRowsToFreeStorageNodes(snb *storageNodesBucket, snSource *storageNode, src []byte) (int, error) {
+	if *disableRerouting {
+		logger.Panicf("BUG: disableRerouting must be disabled when calling rerouteRowsToFreeStorageNodes")
+	}
+	sns := snb.sns
+	if len(sns) < 2 {
+		logger.Panicf("BUG: the number of storage nodes is too small for calling rerouteRowsToFreeStorageNodes: %d", len(sns))
+	}
 	reroutesTotal.Inc()
 	rowsProcessed := 0
 	var idxsExclude []int
 	nodesHash := snb.nodesHash
-	sns := snb.sns
 	idxsExclude = getNotReadyStorageNodeIdxs(snb, idxsExclude[:0], snSource)
 	var mr storage.MetricRow
 	for len(src) > 0 {
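The preconditions that used to live only in comments are now enforced with `logger.Panicf("BUG: ...")`, so a caller that re-routes while re-routing is disabled, or with fewer than two storage nodes, fails loudly. A minimal sketch of the same guard style using the standard library `log` package instead of VictoriaMetrics' `lib/logger`; `rerouteRows` and its parameters are placeholders:

```go
package main

import (
	"fmt"
	"log"
)

// rerouteRows is a placeholder for a function that only makes sense when
// re-routing is enabled and there are at least two nodes to re-route
// between. Violations are programmer errors, so they are reported as
// panics rather than returned as errors.
func rerouteRows(disableRerouting bool, nodes []string) {
	if disableRerouting {
		log.Panicf("BUG: disableRerouting must be disabled when calling rerouteRows")
	}
	if len(nodes) < 2 {
		log.Panicf("BUG: the number of storage nodes is too small for calling rerouteRows: %d", len(nodes))
	}
	fmt.Printf("re-routing across %d nodes\n", len(nodes))
}

func main() {
	rerouteRows(false, []string{"vmstorage-1", "vmstorage-2"})
}
```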
@@ -707,47 +716,53 @@ func rerouteRowsToFreeStorageNodes(snb *storageNodesBucket, snSource *storageNod
 		reroutedRowsProcessed.Inc()
 		h := xxhash.Sum64(mr.MetricNameRaw)
 		mr.ResetX()
-		// Try sending the row to snSource in order to minimize re-routing.
 	again:
+		// Try sending the row to snSource in order to minimize re-routing.
 		if snSource.trySendBuf(rowBuf, 1) {
 			rowsProcessed++
 			continue
 		}
-		// The row couldn't be sent to snSrouce. Try re-routing it to other nodes.
-		idx := nodesHash.getNodeIdx(h, idxsExclude)
-		sn := sns[idx]
-		for !sn.isReady() && len(idxsExclude) < len(sns) {
-			// re-generate idxsExclude list, since sn and snSource must be put there.
-			idxsExclude = getNotReadyStorageNodeIdxs(snb, idxsExclude[:0], snSource)
+		var sn *storageNode
+		for {
+			// The row couldn't be sent to snSource. Try re-routing it to another node.
 			idx := nodesHash.getNodeIdx(h, idxsExclude)
 			sn = sns[idx]
+			if sn.isReady() {
+				break
+			}
+			// re-generate idxsExclude list, since sn must be put there.
+			idxsExclude = getNotReadyStorageNodeIdxs(snb, idxsExclude[:0], snSource)
 		}
-		if sn.trySendBuf(rowBuf, 1) {
-			rowsProcessed++
-			snSource.rowsReroutedFromHere.Inc()
-			sn.rowsReroutedToHere.Inc()
-			continue
+		if !sn.trySendBuf(rowBuf, 1) {
+			// The row cannot be sent to both snSource and sn without blocking.
+			// Sleep for a while and try sending the row to snSource again.
+			time.Sleep(100 * time.Millisecond)
+			goto again
 		}
-		// The row cannot be sent to both snSource and the re-routed sn without blocking.
-		// Sleep for a while and try sending the row to snSource again.
-		time.Sleep(100 * time.Millisecond)
-		goto again
+		rowsProcessed++
+		snSource.rowsReroutedFromHere.Inc()
+		sn.rowsReroutedToHere.Inc()
 	}
 	return rowsProcessed, nil
 }
 
-func getNotReadyStorageNodeIdxsBlocking(snb *storageNodesBucket, dst []int, snExtra *storageNode) []int {
-	dst = getNotReadyStorageNodeIdxs(snb, dst[:0], snExtra)
+func getNotReadyStorageNodeIdxsBlocking(snb *storageNodesBucket, dst []int) []int {
+	dst = getNotReadyStorageNodeIdxs(snb, dst[:0], nil)
 	sns := snb.sns
 	if len(dst) < len(sns) {
 		return dst
 	}
 	noStorageNodesLogger.Warnf("all the vmstorage nodes are unavailable; stopping data processing util at least a single node becomes available")
 	for {
-		time.Sleep(time.Second)
-		dst = getNotReadyStorageNodeIdxs(snb, dst[:0], snExtra)
+		tc := timerpool.Get(time.Second)
+		select {
+		case <-snb.stopCh:
+			timerpool.Put(tc)
+			return dst
+		case <-tc.C:
+			timerpool.Put(tc)
+		}
+		dst = getNotReadyStorageNodeIdxs(snb, dst[:0], nil)
 		if availableNodes := len(sns) - len(dst); availableNodes > 0 {
 			storageNodesBecameAvailableLogger.Warnf("%d vmstorage nodes became available, so continue data processing", availableNodes)
 			return dst
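In `getNotReadyStorageNodeIdxsBlocking`, the bare `time.Sleep(time.Second)` becomes a timer plus `select`, so the once-per-second re-check can also be cut short by `snb.stopCh` during shutdown. A self-contained sketch of that wait loop, using `time.NewTimer` where the original reuses timers via the internal `timerpool` package; `waitForAvailableNodes` and `allNodesUnavailable` are illustrative names:

```go
package main

import (
	"fmt"
	"time"
)

// waitForAvailableNodes blocks until allNodesUnavailable reports false or
// stopCh is closed. Re-checking once per second mirrors the original loop;
// the select lets a shutdown request cut the wait short.
func waitForAvailableNodes(allNodesUnavailable func() bool, stopCh <-chan struct{}) bool {
	for allNodesUnavailable() {
		t := time.NewTimer(time.Second)
		select {
		case <-stopCh:
			t.Stop()
			return false // shutting down; give up waiting
		case <-t.C:
		}
	}
	return true // at least one node became available
}

func main() {
	deadline := time.Now().Add(2500 * time.Millisecond)
	stopCh := make(chan struct{})
	ok := waitForAvailableNodes(func() bool {
		// Pretend the nodes come back after ~2.5s.
		return time.Now().Before(deadline)
	}, stopCh)
	fmt.Println("nodes available:", ok)
}
```

Pooling the timers, as the original does with timerpool, avoids allocating a fresh timer on every iteration; `time.NewTimer` keeps the sketch dependency-free.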