diff --git a/app/vminsert/netstorage/netstorage.go b/app/vminsert/netstorage/netstorage.go index b5004add88..091c635040 100644 --- a/app/vminsert/netstorage/netstorage.go +++ b/app/vminsert/netstorage/netstorage.go @@ -576,6 +576,11 @@ func spreadReroutedBufToStorageNodesBlocking(stopCh <-chan struct{}, br *bufRows rowsProcessed := 0 defer reroutedRowsProcessed.Add(rowsProcessed) + sns := getHealthyStorageNodesBlocking(stopCh) + if len(sns) == 0 { + // stopCh is notified to stop. + return + } src := br.buf for len(src) > 0 { tail, err := mr.Unmarshal(src) @@ -593,12 +598,6 @@ func spreadReroutedBufToStorageNodesBlocking(stopCh <-chan struct{}, br *bufRows h = xxhash.Sum64(mr.MetricNameRaw) } for { - // Obtain fresh list of healthy storage nodes, since it may change with every iteration. - sns := getHealthyStorageNodesBlocking(stopCh) - if len(sns) == 0 { - // stopCh is notified to stop. - return - } idx := h % uint64(len(sns)) sn := sns[idx] if sn.sendReroutedRow(rowBuf) { @@ -621,6 +620,12 @@ func spreadReroutedBufToStorageNodesBlocking(stopCh <-chan struct{}, br *bufRows case <-t.C: timerpool.Put(t) } + // Obtain fresh list of healthy storage nodes after the delay, since it may be already updated. + sns = getHealthyStorageNodesBlocking(stopCh) + if len(sns) == 0 { + // stopCh is notified to stop. + return + } } } }