mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vminsert/netstorage: log the error message when pending data wasn't sent to vmstorage nodes because they were unavailable at graceful shutdown
This commit is contained in:
parent
7d619deacc
commit
be3f5d1c64
1 changed files with 23 additions and 22 deletions
|
@ -95,6 +95,7 @@ again:
|
|||
return fmt.Errorf("cannot send %d rows because of graceful shutdown", rows)
|
||||
default:
|
||||
}
|
||||
|
||||
if !sn.isReady() {
|
||||
if len(sns) == 1 {
|
||||
// There are no other storage nodes to re-route to. So wait until the current node becomes healthy.
|
||||
|
@ -102,11 +103,12 @@ again:
|
|||
goto again
|
||||
}
|
||||
if *disableReroutingOnUnavailable {
|
||||
// We should not send timeseries from currently unavailable storage to alive storage nodes
|
||||
// We should not send timeseries from currently unavailable storage to alive storage nodes.
|
||||
sn.brCond.Wait()
|
||||
goto again
|
||||
}
|
||||
sn.brLock.Unlock()
|
||||
|
||||
// The vmstorage node isn't ready for data processing. Re-route buf to healthy vmstorage nodes even if disableRerouting is set.
|
||||
rowsProcessed, err := rerouteRowsToReadyStorageNodes(snb, sn, buf)
|
||||
rows -= rowsProcessed
|
||||
|
@ -115,6 +117,7 @@ again:
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(sn.br.buf)+len(buf) <= maxBufSizePerStorageNode {
|
||||
// Fast path: the buf contents fits sn.buf.
|
||||
sn.br.buf = append(sn.br.buf, buf...)
|
||||
|
@ -136,12 +139,6 @@ again:
|
|||
return nil
|
||||
}
|
||||
|
||||
var closedCh = func() <-chan struct{} {
|
||||
ch := make(chan struct{})
|
||||
close(ch)
|
||||
return ch
|
||||
}()
|
||||
|
||||
func (sn *storageNode) run(snb *storageNodesBucket, snIdx int) {
|
||||
replicas := *replicationFactor
|
||||
if replicas <= 0 {
|
||||
|
@ -164,29 +161,26 @@ func (sn *storageNode) run(snb *storageNodesBucket, snIdx int) {
|
|||
defer ticker.Stop()
|
||||
var br bufRows
|
||||
brLastResetTime := fasttime.UnixTimestamp()
|
||||
var waitCh <-chan struct{}
|
||||
mustStop := false
|
||||
for !mustStop {
|
||||
sn.brLock.Lock()
|
||||
bufLen := len(sn.br.buf)
|
||||
waitForNewData := len(sn.br.buf) == 0
|
||||
sn.brLock.Unlock()
|
||||
waitCh = nil
|
||||
if bufLen > 0 {
|
||||
// Do not sleep if sn.br.buf isn't empty.
|
||||
waitCh = closedCh
|
||||
}
|
||||
if waitForNewData {
|
||||
select {
|
||||
case <-sn.stopCh:
|
||||
mustStop = true
|
||||
// Make sure the sn.buf is flushed last time before returning
|
||||
// Make sure the br.buf is flushed last time before returning
|
||||
// in order to send the remaining bits of data.
|
||||
case <-ticker.C:
|
||||
case <-waitCh:
|
||||
}
|
||||
}
|
||||
|
||||
sn.brLock.Lock()
|
||||
sn.br, br = br, sn.br
|
||||
sn.brCond.Broadcast()
|
||||
sn.brLock.Unlock()
|
||||
|
||||
currentTime := fasttime.UnixTimestamp()
|
||||
if len(br.buf) < cap(br.buf)/4 && currentTime-brLastResetTime > 10 {
|
||||
// Free up capacity space occupied by br.buf in order to reduce memory usage after spikes.
|
||||
|
@ -205,6 +199,7 @@ func (sn *storageNode) run(snb *storageNodesBucket, snIdx int) {
|
|||
select {
|
||||
case <-sn.stopCh:
|
||||
timerpool.Put(t)
|
||||
logger.Errorf("dropping %d rows on graceful shutdown, since all the vmstorage nodes are unavailable", br.rows)
|
||||
return
|
||||
case <-t.C:
|
||||
timerpool.Put(t)
|
||||
|
@ -294,6 +289,7 @@ func (sn *storageNode) sendBufRowsNonblocking(br *bufRows) bool {
|
|||
if !sn.isReady() {
|
||||
return false
|
||||
}
|
||||
|
||||
sn.bcLock.Lock()
|
||||
defer sn.bcLock.Unlock()
|
||||
|
||||
|
@ -673,7 +669,7 @@ func rerouteRowsToReadyStorageNodes(snb *storageNodesBucket, snSource *storageNo
|
|||
idx := nodesHash.getNodeIdx(h, idxsExcludeNew)
|
||||
snNew := sns[idx]
|
||||
if !snNew.trySendBuf(rowBuf, 1) {
|
||||
// The row cannot be sent to both snSource and to sn without blocking.
|
||||
// The row cannot be sent to both snSource, sn and snNew without blocking.
|
||||
// Sleep for a while and try sending the row to snSource again.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
goto again
|
||||
|
@ -785,6 +781,11 @@ func getNotReadyStorageNodeIdxs(snb *storageNodesBucket, dst []int, snExtra *sto
|
|||
}
|
||||
|
||||
func (sn *storageNode) trySendBuf(buf []byte, rows int) bool {
|
||||
if !sn.isReady() {
|
||||
// Fast path without locking the sn.brLock.
|
||||
return false
|
||||
}
|
||||
|
||||
sent := false
|
||||
sn.brLock.Lock()
|
||||
if sn.isReady() && len(sn.br.buf)+len(buf) <= maxBufSizePerStorageNode {
|
||||
|
|
Loading…
Reference in a new issue