app/vminsert/netstorage: refactor snb rebuild

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
This commit is contained in:
Zakhar Bessarab 2024-07-29 17:23:22 +04:00
parent 2b39ee785c
commit 1272a7f743
No known key found for this signature in database
GPG key ID: 932B34D6FE062023

View file

@ -648,8 +648,39 @@ func initStorageNodes(addrs []string, hashSeed uint64) *storageNodesBucket {
// Watch for node become healthy and rebuild snb.
for _, sn := range brokenNodes {
wg.Add(1)
go func(sn *storageNode) {
sn := sn
go watchStorageNodeHealthy(sn, func() {
defer wg.Done()
// rebuild snb in order to update consistent hash with an ID of the healthy storage node
for {
currentSnb := getStorageNodesBucket()
newSnb := initStorageNodes(addrs, hashSeed)
if !storageNodes.CompareAndSwap(currentSnb, newSnb) {
// snb has been changed, so we need to stop the newSnb and try again
mustStopStorageNodes(newSnb)
continue
}
// stop previous snb and exit
mustStopStorageNodes(currentSnb)
break
}
})
}
return snb
}
func mustStopStorageNodes(snb *storageNodesBucket) {
close(snb.stopCh)
for _, sn := range snb.sns {
sn.brCond.Broadcast()
}
snb.wg.Wait()
metrics.UnregisterSet(snb.ms, true)
}
// watchStorageNodeHealthy watches for sn become healthy and calls cb once it is ready.
func watchStorageNodeHealthy(sn *storageNode, cb func()) {
for {
sn.brLock.Lock()
for !sn.isReady() {
@ -670,31 +701,10 @@ func initStorageNodes(addrs []string, hashSeed uint64) *storageNodesBucket {
}
if sn.isReady() {
again:
currentSnb := getStorageNodesBucket()
newSnb := initStorageNodes(addrs, hashSeed)
if !storageNodes.CompareAndSwap(currentSnb, newSnb) {
mustStopStorageNodes(newSnb)
goto again
}
mustStopStorageNodes(currentSnb)
break
cb()
return
}
}
}(sn)
}
return snb
}
func mustStopStorageNodes(snb *storageNodesBucket) {
close(snb.stopCh)
for _, sn := range snb.sns {
sn.brCond.Broadcast()
}
snb.wg.Wait()
metrics.UnregisterSet(snb.ms, true)
}
// rerouteRowsToReadyStorageNodes reroutes src from not ready snSource to ready storage nodes.