diff --git a/app/vminsert/netstorage/netstorage.go b/app/vminsert/netstorage/netstorage.go index f8e25b7aa..6f7431f72 100644 --- a/app/vminsert/netstorage/netstorage.go +++ b/app/vminsert/netstorage/netstorage.go @@ -50,7 +50,7 @@ func (sn *storageNode) push(buf []byte, rows int) error { if sn.isBroken() { // The vmstorage node is temporarily broken. Re-route buf to healthy vmstorage nodes. - if err := addToReroutedBuf(buf, rows); err != nil { + if err := addToReroutedBufMayBlock(buf, rows); err != nil { return fmt.Errorf("%d rows dropped because the current vsmtorage is unavailable and %s", rows, err) } sn.rowsReroutedFromHere.Add(rows) @@ -70,7 +70,7 @@ func (sn *storageNode) push(buf []byte, rows int) error { // Slow path: the buf contents doesn't fit sn.buf. // This means that the current vmstorage is slow or will become broken soon. // Re-route buf to healthy vmstorage nodes. - if err := addToReroutedBuf(buf, rows); err != nil { + if err := addToReroutedBufMayBlock(buf, rows); err != nil { return fmt.Errorf("%d rows dropped because the current vmstorage buf is full and %s", rows, err) } sn.rowsReroutedFromHere.Add(rows) @@ -352,7 +352,7 @@ func rerouteWorker(stopCh <-chan struct{}) { spreadReroutedBufToStorageNodes(sns, &br) // There is no need in br.reset() here, since it is already done in spreadReroutedBufToStorageNodes. } - // Notify all the blocked addToReroutedBuf callers, so they may finish the work. + // Notify all the blocked addToReroutedBufMayBlock callers, so they may finish the work. reroutedBRCond.Broadcast() } @@ -488,7 +488,7 @@ func Stop() { storageNodesWG.Wait() } -// addToReroutedBuf adds buf to reroutedBR. +// addToReroutedBufMayBlock adds buf to reroutedBR. // // It waits until the reroutedBR has enough space for buf or if Stop is called. // This guarantees backpressure if the ingestion rate exceeds vmstorage nodes' @@ -498,7 +498,7 @@ func Stop() { // // - if all the storage nodes are unhealthy. // - if Stop is called. -func addToReroutedBuf(buf []byte, rows int) error { +func addToReroutedBufMayBlock(buf []byte, rows int) error { if len(buf) > reroutedBufMaxSize { logger.Panicf("BUG: len(buf)=%d cannot exceed reroutedBufMaxSize=%d", len(buf), reroutedBufMaxSize) } @@ -552,7 +552,11 @@ func getHealthyStorageNodes() []*storageNode { func spreadReroutedBufToStorageNodes(sns []*storageNode, br *bufRows) { var mr storage.MetricRow rowsProcessed := 0 + defer reroutedRowsProcessed.Add(rowsProcessed) + src := br.buf + dst := br.buf[:0] + dstRows := 0 for len(src) > 0 { tail, err := mr.Unmarshal(src) if err != nil { @@ -560,6 +564,7 @@ func spreadReroutedBufToStorageNodes(sns []*storageNode, br *bufRows) { } rowBuf := src[:len(src)-len(tail)] src = tail + rowsProcessed++ idx := uint64(0) if len(sns) > 1 { @@ -572,41 +577,40 @@ func spreadReroutedBufToStorageNodes(sns []*storageNode, br *bufRows) { attempts := 0 for { sn := sns[idx] - if sn.sendReroutedRow(rowBuf) { - // The row has been successfully re-routed to sn. - break - } - - // Cannot re-route data to sn. Try sending to the next vmstorage node. idx++ if idx >= uint64(len(sns)) { idx = 0 } attempts++ - if attempts < len(sns) { + if attempts > len(sns) { + // All the storage nodes are broken. + // Return the remaining data to br.buf, so it may be processed later. + dst = append(dst, rowBuf...) + dst = append(dst, src...) + br.buf = dst + br.rows = dstRows + (br.rows - rowsProcessed + 1) + return + } + if sn.isBroken() { + // The sn is broken. Go to the next one. continue } - - // There is no enough buffer space in all the vmstorage nodes. - // Return the remaining data to br.buf, so it may be processed later. - br.buf = append(br.buf[:0], rowBuf...) - br.buf = append(br.buf, src...) - br.rows -= rowsProcessed - return + if !sn.sendReroutedRow(rowBuf) { + // The row cannot be re-routed to sn. Return it back to the buf for rerouting. + // Do not re-route the row to the remaining storage nodes, + // since this may result in increased resource usage (CPU, memory, disk IO) on these nodes, + // because they'll have to accept and register new time series (this is resource-intensive operation). + dst = append(dst, rowBuf...) + dstRows++ + } + break } - rowsProcessed++ } - if rowsProcessed != br.rows { - logger.Panicf("BUG: unexpected number of rows processed; got %d; want %d", rowsProcessed, br.rows) - } - reroutedRowsProcessed.Add(rowsProcessed) - br.reset() + br.buf = dst + br.rows = dstRows } func (sn *storageNode) sendReroutedRow(buf []byte) bool { - if sn.isBroken() { - return false - } sn.brLock.Lock() ok := len(sn.br.buf)+len(buf) <= maxBufSizePerStorageNode if ok {