mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-11 15:34:56 +00:00
app/vminsert: do not drop pending rows if all the vmstorage backends are unavailable
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/294
This commit is contained in:
parent
36a1a21d6e
commit
4d70a81e18
1 changed file with 11 additions and 9 deletions
|
@ -36,9 +36,9 @@ func (sn *storageNode) push(buf []byte, rows int) error {
|
|||
|
||||
if sn.broken {
|
||||
// The vmstorage node is broken. Re-route buf to healthy vmstorage nodes.
|
||||
if err := addToReroutedBuf(buf, rows); err != nil {
|
||||
if !addToReroutedBuf(buf, rows) {
|
||||
rowsLostTotal.Add(rows)
|
||||
return err
|
||||
return fmt.Errorf("%d rows dropped because of reroutedBuf overflows %d bytes", rows, reroutedBufMaxSize)
|
||||
}
|
||||
sn.rowsReroutedFromHere.Add(rows)
|
||||
return nil
|
||||
|
@ -97,11 +97,13 @@ func (sn *storageNode) flushBufLocked() error {
|
|||
|
||||
// Couldn't flush sn.buf to vmstorage. Mark sn as broken
|
||||
// and try re-routing sn.buf to healthy vmstorage nodes.
|
||||
logger.Errorf("cannot send data to vmstorage %s: %s; re-routing data to healthy vmstorage nodes", sn.dialer.Addr(), err)
|
||||
sn.broken = true
|
||||
err = addToReroutedBuf(sn.buf, sn.rows)
|
||||
if err != nil {
|
||||
rowsLostTotal.Add(sn.rows)
|
||||
if !addToReroutedBuf(sn.buf, sn.rows) {
|
||||
// Preserve sn.buf when it cannot be sent to healthy nodes
|
||||
// in the hope the error will disappear on the next call to flushBufLocked.
|
||||
//
|
||||
// This should fix https://github.com/VictoriaMetrics/VictoriaMetrics/issues/294 .
|
||||
return err
|
||||
}
|
||||
sn.buf = sn.buf[:0]
|
||||
sn.rows = 0
|
||||
|
@ -339,17 +341,17 @@ func Stop() {
|
|||
storageNodesWG.Wait()
|
||||
}
|
||||
|
||||
func addToReroutedBuf(buf []byte, rows int) error {
|
||||
func addToReroutedBuf(buf []byte, rows int) bool {
|
||||
reroutedLock.Lock()
|
||||
defer reroutedLock.Unlock()
|
||||
if len(reroutedBuf)+len(buf) > reroutedBufMaxSize {
|
||||
reroutedBufOverflows.Inc()
|
||||
return fmt.Errorf("%d rows dropped because of reroutedBuf overflows %d bytes", rows, reroutedBufMaxSize)
|
||||
return false
|
||||
}
|
||||
reroutedBuf = append(reroutedBuf, buf...)
|
||||
reroutedRows += rows
|
||||
reroutesTotal.Inc()
|
||||
return nil
|
||||
return true
|
||||
}
|
||||
|
||||
func spreadReroutedBufToStorageNodes(swapBuf []byte) ([]byte, error) {
|
||||
|
|
Loading…
Reference in a new issue