app/vminsert/netstorage: exclude unavailable nodes from consistent hash on start

Exclude unhealthy storage nodes from the consistent hash when persistent storage node IDs are enabled.
This is needed to avoid uneven load distribution caused by the default ID (uint64(0)) being assigned to such storage nodes.

Stop generating a fallback ID from the node IP address, since this causes a re-distribution of series once the storage node becomes available and its ID changes.

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
This commit is contained in:
Zakhar Bessarab 2024-07-08 13:15:24 +04:00
parent e44c6f38c2
commit 88bfad9535
No known key found for this signature in database
GPG key ID: 932B34D6FE062023

View file

@ -404,12 +404,12 @@ func (sn *storageNode) dial() (*handshake.BufferedConn, error) {
compressionLevel = 0
}
bc, id, err := handshake.VMInsertClient(c, compressionLevel)
sn.id.CompareAndSwap(0, id)
if err != nil {
_ = c.Close()
sn.handshakeErrors.Inc()
return nil, fmt.Errorf("handshake error: %w", err)
}
sn.id.CompareAndSwap(0, id)
return bc, nil
}
@ -418,15 +418,6 @@ func (sn *storageNode) getID() uint64 {
if sn.id.Load() == 0 {
sn.checkHealth()
}
// If the id is still not populated after checkHealth then storage node is not reachable
// build a unique id based on the address
if sn.id.Load() == 0 {
id := xxhash.Sum64String(sn.dialer.Addr())
sn.id.CompareAndSwap(0, id)
return id
}
return sn.id.Load()
}
@ -541,29 +532,20 @@ func MustStop() {
mustStopStorageNodes(snb)
}
var (
nodeID uint64
nodeIDOnce sync.Once
)
// GetNodeID returns unique identifier for underlying storage nodes.
func GetNodeID() uint64 {
nodeIDOnce.Do(func() {
snb := getStorageNodesBucket()
snIDs := make([]uint64, 0, len(snb.sns))
for _, sn := range snb.sns {
snIDs = append(snIDs, sn.getID())
}
slices.Sort(snIDs)
idsM := make([]byte, 0)
for _, id := range snIDs {
idsM = encoding.MarshalUint64(idsM, id)
}
snb := getStorageNodesBucket()
snIDs := make([]uint64, 0, len(snb.sns))
for _, sn := range snb.sns {
snIDs = append(snIDs, sn.getID())
}
slices.Sort(snIDs)
idsM := make([]byte, 0)
for _, id := range snIDs {
idsM = encoding.MarshalUint64(idsM, id)
}
nodeID = xxhash.Sum64(idsM)
})
return nodeID
return xxhash.Sum64(idsM)
}
func initStorageNodes(addrs []string, hashSeed uint64) *storageNodesBucket {
@ -572,6 +554,7 @@ func initStorageNodes(addrs []string, hashSeed uint64) *storageNodesBucket {
}
ms := metrics.NewSet()
sns := make([]*storageNode, 0, len(addrs))
brokenNodes := make([]*storageNode, 0)
stopCh := make(chan struct{})
nodeIDs := make([]uint64, 0, len(addrs))
for _, addr := range addrs {
@ -622,16 +605,19 @@ func initStorageNodes(addrs []string, hashSeed uint64) *storageNodesBucket {
var nodeID uint64
if *usePersistentStorageNodeID {
nodeID = sn.getID()
if nodeID == 0 {
brokenNodes = append(brokenNodes, sn)
continue
}
} else {
nodeID = xxhash.Sum64String(addr)
}
nodeIDs = append(nodeIDs, nodeID)
sns = append(sns, sn)
}
nodesHash := newConsistentHash(nodeIDs, hashSeed)
maxBufSizePerStorageNode = memory.Allowed() / 8 / len(sns)
maxBufSizePerStorageNode = memory.Allowed() / 8 / len(addrs)
if maxBufSizePerStorageNode > consts.MaxInsertPacketSizeForVMInsert {
maxBufSizePerStorageNode = consts.MaxInsertPacketSizeForVMInsert
}
@ -646,7 +632,12 @@ func initStorageNodes(addrs []string, hashSeed uint64) *storageNodesBucket {
wg: &wg,
}
for idx, sn := range sns {
// add broken nodes to the end of the list
// this is needed because consistent hash slots will be populated with IDs of available
// storage nodes (if there are any) and indexes of consistent hash must be linked to healthy storage nodes
snb.sns = append(snb.sns, brokenNodes...)
for idx, sn := range snb.sns {
wg.Add(1)
go func(sn *storageNode, idx int) {
sn.run(snb, idx)
@ -654,6 +645,52 @@ func initStorageNodes(addrs []string, hashSeed uint64) *storageNodesBucket {
}(sn, idx)
}
// Watch for each broken node to become healthy and then add it to the consistent hash.
for _, sn := range brokenNodes {
wg.Add(1)
go func(sn *storageNode) {
defer wg.Done()
for {
sn.brLock.Lock()
for !sn.isReady() {
select {
case <-sn.stopCh:
sn.brLock.Unlock()
return
default:
sn.brCond.Wait()
}
}
sn.brLock.Unlock()
select {
case <-sn.stopCh:
return
default:
}
if sn.isReady() {
again:
oldSnb := getStorageNodesBucket()
snbNew := storageNodesBucket{
ms: oldSnb.ms,
wg: oldSnb.wg,
stopCh: oldSnb.stopCh,
sns: oldSnb.sns,
}
newNodeIds := append(snb.nodesHash.nodeHashes, sn.getID())
snbNew.nodesHash = newConsistentHash(newNodeIds, hashSeed)
if !storageNodes.CompareAndSwap(oldSnb, &snbNew) {
goto again
}
break
}
}
}(sn)
}
return snb
}