From a586b8b6d4f0ead2b60eb38711f59d0a120d82d7 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 25 Jun 2020 16:38:42 +0300 Subject: [PATCH] app/vminsert/netstorage: do not re-route every time series to more than two vmstorage nodes when certain vmstorage nodes are temporarily slower than the rest of them Previously vminsert may spread data for a single time series across all the available vmstorage nodes when vmstorage nodes couldn't handle the given ingestion rate. This could lead to increased usage of CPU and memory on every vmstorage node, since every vmstorage node had to register all the time series seen in the cluster. Now a time series may spread to maximum two vmstorage nodes under heavy load. Every time series is routed to a single vmstorage node under normal load. --- app/vminsert/netstorage/netstorage.go | 60 ++++++++++++++------------- 1 file changed, 32 insertions(+), 28 deletions(-) diff --git a/app/vminsert/netstorage/netstorage.go b/app/vminsert/netstorage/netstorage.go index f8e25b7aa1..6f7431f72d 100644 --- a/app/vminsert/netstorage/netstorage.go +++ b/app/vminsert/netstorage/netstorage.go @@ -50,7 +50,7 @@ func (sn *storageNode) push(buf []byte, rows int) error { if sn.isBroken() { // The vmstorage node is temporarily broken. Re-route buf to healthy vmstorage nodes. - if err := addToReroutedBuf(buf, rows); err != nil { + if err := addToReroutedBufMayBlock(buf, rows); err != nil { return fmt.Errorf("%d rows dropped because the current vsmtorage is unavailable and %s", rows, err) } sn.rowsReroutedFromHere.Add(rows) @@ -70,7 +70,7 @@ func (sn *storageNode) push(buf []byte, rows int) error { // Slow path: the buf contents doesn't fit sn.buf. // This means that the current vmstorage is slow or will become broken soon. // Re-route buf to healthy vmstorage nodes. - if err := addToReroutedBuf(buf, rows); err != nil { + if err := addToReroutedBufMayBlock(buf, rows); err != nil { return fmt.Errorf("%d rows dropped because the current vmstorage buf is full and %s", rows, err) } sn.rowsReroutedFromHere.Add(rows) @@ -352,7 +352,7 @@ func rerouteWorker(stopCh <-chan struct{}) { spreadReroutedBufToStorageNodes(sns, &br) // There is no need in br.reset() here, since it is already done in spreadReroutedBufToStorageNodes. } - // Notify all the blocked addToReroutedBuf callers, so they may finish the work. + // Notify all the blocked addToReroutedBufMayBlock callers, so they may finish the work. reroutedBRCond.Broadcast() } @@ -488,7 +488,7 @@ func Stop() { storageNodesWG.Wait() } -// addToReroutedBuf adds buf to reroutedBR. +// addToReroutedBufMayBlock adds buf to reroutedBR. // // It waits until the reroutedBR has enough space for buf or if Stop is called. // This guarantees backpressure if the ingestion rate exceeds vmstorage nodes' @@ -498,7 +498,7 @@ func Stop() { // // - if all the storage nodes are unhealthy. // - if Stop is called. -func addToReroutedBuf(buf []byte, rows int) error { +func addToReroutedBufMayBlock(buf []byte, rows int) error { if len(buf) > reroutedBufMaxSize { logger.Panicf("BUG: len(buf)=%d cannot exceed reroutedBufMaxSize=%d", len(buf), reroutedBufMaxSize) } @@ -552,7 +552,11 @@ func getHealthyStorageNodes() []*storageNode { func spreadReroutedBufToStorageNodes(sns []*storageNode, br *bufRows) { var mr storage.MetricRow rowsProcessed := 0 + defer reroutedRowsProcessed.Add(rowsProcessed) + src := br.buf + dst := br.buf[:0] + dstRows := 0 for len(src) > 0 { tail, err := mr.Unmarshal(src) if err != nil { @@ -560,6 +564,7 @@ func spreadReroutedBufToStorageNodes(sns []*storageNode, br *bufRows) { } rowBuf := src[:len(src)-len(tail)] src = tail + rowsProcessed++ idx := uint64(0) if len(sns) > 1 { @@ -572,41 +577,40 @@ func spreadReroutedBufToStorageNodes(sns []*storageNode, br *bufRows) { attempts := 0 for { sn := sns[idx] - if sn.sendReroutedRow(rowBuf) { - // The row has been successfully re-routed to sn. - break - } - - // Cannot re-route data to sn. Try sending to the next vmstorage node. idx++ if idx >= uint64(len(sns)) { idx = 0 } attempts++ - if attempts < len(sns) { + if attempts > len(sns) { + // All the storage nodes are broken. + // Return the remaining data to br.buf, so it may be processed later. + dst = append(dst, rowBuf...) + dst = append(dst, src...) + br.buf = dst + br.rows = dstRows + (br.rows - rowsProcessed + 1) + return + } + if sn.isBroken() { + // The sn is broken. Go to the next one. continue } - - // There is no enough buffer space in all the vmstorage nodes. - // Return the remaining data to br.buf, so it may be processed later. - br.buf = append(br.buf[:0], rowBuf...) - br.buf = append(br.buf, src...) - br.rows -= rowsProcessed - return + if !sn.sendReroutedRow(rowBuf) { + // The row cannot be re-routed to sn. Return it back to the buf for rerouting. + // Do not re-route the row to the remaining storage nodes, + // since this may result in increased resource usage (CPU, memory, disk IO) on these nodes, + // because they'll have to accept and register new time series (this is resource-intensive operation). + dst = append(dst, rowBuf...) + dstRows++ + } + break } - rowsProcessed++ } - if rowsProcessed != br.rows { - logger.Panicf("BUG: unexpected number of rows processed; got %d; want %d", rowsProcessed, br.rows) - } - reroutedRowsProcessed.Add(rowsProcessed) - br.reset() + br.buf = dst + br.rows = dstRows } func (sn *storageNode) sendReroutedRow(buf []byte) bool { - if sn.isBroken() { - return false - } sn.brLock.Lock() ok := len(sn.br.buf)+len(buf) <= maxBufSizePerStorageNode if ok {