From 464682f380501b87555f7f280ba5006d4d60a373 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 18 Jun 2020 20:41:33 +0300 Subject: [PATCH] app/vminsert/netstorage: periodically check for each `-storageNode` health, so it could be marked as healthy when it is ready to accept data This fixes uneven data routing in cluster version when `-replicationFactor` is set to 1 (default value), i.e. when the replication is disabled. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/546 --- app/vminsert/netstorage/netstorage.go | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/app/vminsert/netstorage/netstorage.go b/app/vminsert/netstorage/netstorage.go index 6f6c22bdb..da504af56 100644 --- a/app/vminsert/netstorage/netstorage.go +++ b/app/vminsert/netstorage/netstorage.go @@ -127,7 +127,8 @@ func (sn *storageNode) run(stopCh <-chan struct{}, snIdx int) { brLastResetTime = currentTime } if len(br.buf) == 0 { - // Nothing to send. + // Nothing to send. Just check sn health, so it could be returned to non-broken state. + sn.checkHealth() continue } @@ -183,6 +184,25 @@ func sendBufToReplicas(br *bufRows, snIdx, replicas int) bool { return true } +func (sn *storageNode) checkHealth() { + if !sn.isBroken() { + return + } + + sn.bcLock.Lock() + defer sn.bcLock.Unlock() + + if sn.bc != nil { + logger.Panicf("BUG: sn.bc must be nil when sn is broken; got %p", sn.bc) + } + bc, err := sn.dial() + if err != nil { + logger.Warnf("cannot dial storageNode %q: %s", sn.dialer.Addr(), err) + } + sn.bc = bc + atomic.StoreUint32(&sn.broken, 0) +} + func (sn *storageNode) sendBufRows(br *bufRows) bool { sn.bcLock.Lock() defer sn.bcLock.Unlock()