diff --git a/app/vminsert/netstorage/netstorage.go b/app/vminsert/netstorage/netstorage.go index 40a5d77a3..e07f5da86 100644 --- a/app/vminsert/netstorage/netstorage.go +++ b/app/vminsert/netstorage/netstorage.go @@ -648,41 +648,23 @@ func initStorageNodes(addrs []string, hashSeed uint64) *storageNodesBucket { // Watch for node become healthy and rebuild snb. for _, sn := range brokenNodes { wg.Add(1) - go func(sn *storageNode) { + sn := sn + go watchStorageNodeHealthy(sn, func() { defer wg.Done() + // rebuild snb in order to update consistent hash with an ID of the healthy storage node for { - sn.brLock.Lock() - for !sn.isReady() { - select { - case <-sn.stopCh: - sn.brLock.Unlock() - return - default: - sn.brCond.Wait() - } - } - sn.brLock.Unlock() - - select { - case <-sn.stopCh: - return - default: - } - - if sn.isReady() { - again: - currentSnb := getStorageNodesBucket() - newSnb := initStorageNodes(addrs, hashSeed) - if !storageNodes.CompareAndSwap(currentSnb, newSnb) { - mustStopStorageNodes(newSnb) - goto again - } - - mustStopStorageNodes(currentSnb) - break + currentSnb := getStorageNodesBucket() + newSnb := initStorageNodes(addrs, hashSeed) + if !storageNodes.CompareAndSwap(currentSnb, newSnb) { + // snb has been changed, so we need to stop the newSnb and try again + mustStopStorageNodes(newSnb) + continue } + // stop previous snb and exit + mustStopStorageNodes(currentSnb) + break } - }(sn) + }) } return snb @@ -697,6 +679,34 @@ func mustStopStorageNodes(snb *storageNodesBucket) { metrics.UnregisterSet(snb.ms, true) } +// watchStorageNodeHealthy watches for sn become healthy and calls cb once it is ready. +func watchStorageNodeHealthy(sn *storageNode, cb func()) { + for { + sn.brLock.Lock() + for !sn.isReady() { + select { + case <-sn.stopCh: + sn.brLock.Unlock() + return + default: + sn.brCond.Wait() + } + } + sn.brLock.Unlock() + + select { + case <-sn.stopCh: + return + default: + } + + if sn.isReady() { + cb() + return + } + } +} + // rerouteRowsToReadyStorageNodes reroutes src from not ready snSource to ready storage nodes. // // The function blocks until src is fully re-routed.