diff --git a/app/vminsert/netstorage/netstorage.go b/app/vminsert/netstorage/netstorage.go
index 1bcfbc745..bbd6aa01b 100644
--- a/app/vminsert/netstorage/netstorage.go
+++ b/app/vminsert/netstorage/netstorage.go
@@ -95,6 +95,7 @@ again:
 		return fmt.Errorf("cannot send %d rows because of graceful shutdown", rows)
 	default:
 	}
+
 	if !sn.isReady() {
 		if len(sns) == 1 {
 			// There are no other storage nodes to re-route to. So wait until the current node becomes healthy.
@@ -102,11 +103,12 @@ again:
 			goto again
 		}
 		if *disableReroutingOnUnavailable {
-			// We should not send timeseries from currently unavailable storage to alive storage nodes
+			// We should not send timeseries from currently unavailable storage to alive storage nodes.
 			sn.brCond.Wait()
 			goto again
 		}
 		sn.brLock.Unlock()
+
 		// The vmstorage node isn't ready for data processing. Re-route buf to healthy vmstorage nodes even if disableRerouting is set.
 		rowsProcessed, err := rerouteRowsToReadyStorageNodes(snb, sn, buf)
 		rows -= rowsProcessed
@@ -115,6 +117,7 @@ again:
 		}
 		return nil
 	}
+
 	if len(sn.br.buf)+len(buf) <= maxBufSizePerStorageNode {
 		// Fast path: the buf contents fits sn.buf.
 		sn.br.buf = append(sn.br.buf, buf...)
@@ -136,12 +139,6 @@ again:
 	return nil
 }
 
-var closedCh = func() <-chan struct{} {
-	ch := make(chan struct{})
-	close(ch)
-	return ch
-}()
-
 func (sn *storageNode) run(snb *storageNodesBucket, snIdx int) {
 	replicas := *replicationFactor
 	if replicas <= 0 {
@@ -164,29 +161,26 @@ func (sn *storageNode) run(snb *storageNodesBucket, snIdx int) {
 	defer ticker.Stop()
 	var br bufRows
 	brLastResetTime := fasttime.UnixTimestamp()
-	var waitCh <-chan struct{}
 	mustStop := false
 	for !mustStop {
 		sn.brLock.Lock()
-		bufLen := len(sn.br.buf)
+		waitForNewData := len(sn.br.buf) == 0
 		sn.brLock.Unlock()
-		waitCh = nil
-		if bufLen > 0 {
-			// Do not sleep if sn.br.buf isn't empty.
-			waitCh = closedCh
-		}
-		select {
-		case <-sn.stopCh:
-			mustStop = true
-			// Make sure the sn.buf is flushed last time before returning
-			// in order to send the remaining bits of data.
-		case <-ticker.C:
-		case <-waitCh:
+		if waitForNewData {
+			select {
+			case <-sn.stopCh:
+				mustStop = true
+				// Make sure the br.buf is flushed last time before returning
+				// in order to send the remaining bits of data.
+			case <-ticker.C:
+			}
 		}
+
 		sn.brLock.Lock()
 		sn.br, br = br, sn.br
 		sn.brCond.Broadcast()
 		sn.brLock.Unlock()
+
 		currentTime := fasttime.UnixTimestamp()
 		if len(br.buf) < cap(br.buf)/4 && currentTime-brLastResetTime > 10 {
 			// Free up capacity space occupied by br.buf in order to reduce memory usage after spikes.
@@ -205,6 +199,7 @@ func (sn *storageNode) run(snb *storageNodesBucket, snIdx int) {
 		select {
 		case <-sn.stopCh:
 			timerpool.Put(t)
+			logger.Errorf("dropping %d rows on graceful shutdown, since all the vmstorage nodes are unavailable", br.rows)
 			return
 		case <-t.C:
 			timerpool.Put(t)
@@ -294,6 +289,7 @@ func (sn *storageNode) sendBufRowsNonblocking(br *bufRows) bool {
 	if !sn.isReady() {
 		return false
 	}
+
 	sn.bcLock.Lock()
 	defer sn.bcLock.Unlock()
 
@@ -673,7 +669,7 @@ func rerouteRowsToReadyStorageNodes(snb *storageNodesBucket, snSource *storageNo
 		idx := nodesHash.getNodeIdx(h, idxsExcludeNew)
 		snNew := sns[idx]
 		if !snNew.trySendBuf(rowBuf, 1) {
-			// The row cannot be sent to both snSource and to sn without blocking.
+			// The row cannot be sent to snSource, sn or snNew without blocking.
 			// Sleep for a while and try sending the row to snSource again.
 			time.Sleep(100 * time.Millisecond)
 			goto again
@@ -785,6 +781,11 @@ func getNotReadyStorageNodeIdxs(snb *storageNodesBucket, dst []int, snExtra *sto
 }
 
 func (sn *storageNode) trySendBuf(buf []byte, rows int) bool {
+	if !sn.isReady() {
+		// Fast path without locking the sn.brLock.
+		return false
+	}
+
 	sent := false
 	sn.brLock.Lock()
 	if sn.isReady() && len(sn.br.buf)+len(buf) <= maxBufSizePerStorageNode {
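
For context on the main refactoring in `run()`: the removed `closedCh` was a pre-closed channel whose receive case is always ready, so the old `select` returned immediately whenever `sn.br.buf` was non-empty. The new code expresses the same intent directly by entering the blocking `select` only when there is no buffered data. Below is a minimal, self-contained sketch of that control flow, using hypothetical names (`node`, a `Printf` standing in for the real flush); it illustrates the pattern, not the actual VictoriaMetrics code:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// node is a stripped-down stand-in for the storageNode in the diff above.
type node struct {
	mu     sync.Mutex
	buf    []byte
	stopCh chan struct{}
	done   chan struct{}
}

func (n *node) run() {
	defer close(n.done)
	ticker := time.NewTicker(200 * time.Millisecond)
	defer ticker.Stop()
	mustStop := false
	for !mustStop {
		n.mu.Lock()
		waitForNewData := len(n.buf) == 0
		n.mu.Unlock()
		if waitForNewData {
			// Block only while there is nothing to flush. When data is
			// already buffered, the loop falls straight through to the
			// flush below, which is what the removed closedCh trick emulated.
			select {
			case <-n.stopCh:
				mustStop = true // still flush once more before returning
			case <-ticker.C:
			}
		}
		n.mu.Lock()
		buf := n.buf
		n.buf = nil
		n.mu.Unlock()
		if len(buf) > 0 {
			fmt.Printf("flushed %d bytes\n", len(buf))
		}
	}
}

func main() {
	n := &node{stopCh: make(chan struct{}), done: make(chan struct{})}
	go n.run()
	n.mu.Lock()
	n.buf = append(n.buf, "some rows"...)
	n.mu.Unlock()
	time.Sleep(50 * time.Millisecond)
	close(n.stopCh) // graceful shutdown: run() flushes remaining data, then exits
	<-n.done
}
```

Receiving from a closed channel never blocks, which is why the old `closedCh` idiom worked; the rewrite trades that indirection for an explicit branch with the same semantics.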
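The `trySendBuf` hunk at the end adds a double-checked fast path: readiness is probed once without taking `sn.brLock`, so callers hammering an unavailable node create no mutex contention, and then re-checked under the lock because it may have flipped in between. Here is a sketch of the idiom under assumed types; the `atomic.Bool` ready flag is an assumption, since this diff does not show how `isReady()` is implemented:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const maxBufSize = 1024 // stand-in for maxBufSizePerStorageNode

// sender mimics the shape of trySendBuf: a lock-free readiness probe
// first, then the same probe again under the mutex.
type sender struct {
	ready atomic.Bool
	mu    sync.Mutex
	buf   []byte
}

func (s *sender) trySend(buf []byte) bool {
	if !s.ready.Load() {
		// Fast path: don't take the mutex for a node that is already
		// known to be unavailable.
		return false
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	// Re-check under the lock: readiness may have changed between the
	// unlocked probe and acquiring the mutex.
	if !s.ready.Load() || len(s.buf)+len(buf) > maxBufSize {
		return false
	}
	s.buf = append(s.buf, buf...)
	return true
}

func main() {
	s := &sender{}
	s.ready.Store(true)
	fmt.Println(s.trySend([]byte("row"))) // true: buffered under the lock
	s.ready.Store(false)
	fmt.Println(s.trySend([]byte("row"))) // false: rejected on the fast path
}
```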