diff --git a/app/vminsert/netstorage/netstorage.go b/app/vminsert/netstorage/netstorage.go index 6f6c22bdb..da504af56 100644 --- a/app/vminsert/netstorage/netstorage.go +++ b/app/vminsert/netstorage/netstorage.go @@ -127,7 +127,8 @@ func (sn *storageNode) run(stopCh <-chan struct{}, snIdx int) { brLastResetTime = currentTime } if len(br.buf) == 0 { - // Nothing to send. + // Nothing to send. Just check sn health, so it could be returned to non-broken state. + sn.checkHealth() continue } @@ -183,6 +184,25 @@ func sendBufToReplicas(br *bufRows, snIdx, replicas int) bool { return true } +func (sn *storageNode) checkHealth() { + if !sn.isBroken() { + return + } + + sn.bcLock.Lock() + defer sn.bcLock.Unlock() + + if sn.bc != nil { + logger.Panicf("BUG: sn.bc must be nil when sn is broken; got %p", sn.bc) + } + bc, err := sn.dial() + if err != nil { + logger.Warnf("cannot dial storageNode %q: %s", sn.dialer.Addr(), err) + } + sn.bc = bc + atomic.StoreUint32(&sn.broken, 0) +} + func (sn *storageNode) sendBufRows(br *bufRows) bool { sn.bcLock.Lock() defer sn.bcLock.Unlock()