diff --git a/app/vminsert/netstorage/netstorage.go b/app/vminsert/netstorage/netstorage.go
index 5550c7e55c..964595c8e9 100644
--- a/app/vminsert/netstorage/netstorage.go
+++ b/app/vminsert/netstorage/netstorage.go
@@ -66,7 +66,7 @@ again:
 	}
 	sn.brLock.Unlock()
 	// The vmstorage node is temporarily broken. Re-route buf to healthy vmstorage nodes even if *disableRerouting==true.
-	if err := rerouteRowsMayBlock(sn, buf, rows); err != nil {
+	if err := rerouteRowsMayBlock(sn, false, buf, rows); err != nil {
 		return fmt.Errorf("%d rows dropped because the current vmstorage is unavailable and %w", rows, err)
 	}
 	return nil
@@ -87,7 +87,7 @@ again:
 	// The buf contents doesn't fit sn.buf.
 	// This means that the current vmstorage is slow or will become broken soon.
 	// Spread buf among all the vmstorage nodes.
-	if err := rerouteRowsMayBlock(sn, buf, rows); err != nil {
+	if err := rerouteRowsMayBlock(sn, true, buf, rows); err != nil {
 		return fmt.Errorf("%d rows dropped because the current vmstorage buf is full and %w", rows, err)
 	}
 	return nil
@@ -458,33 +458,34 @@ func Stop() {
 	storageNodesWG.Wait()
 }
 
-// rerouteRowsMayBlock re-routes rows from buf among healthy storage nodes excluding the snExclude.
+// rerouteRowsMayBlock re-routes rows from buf among healthy storage nodes.
 //
 // It waits until healthy storage nodes have enough space for the re-routed rows.
 // This guarantees backpressure if the ingestion rate exceeds vmstorage nodes'
 // ingestion rate capacity.
 //
 // It returns non-nil error only if Stop is called.
-func rerouteRowsMayBlock(snExclude *storageNode, buf []byte, rows int) error {
+func rerouteRowsMayBlock(snSource *storageNode, mayUseSNSource bool, buf []byte, rows int) error {
 	if len(storageNodes) < 2 {
 		logger.Panicf("BUG: re-routing can work only if at least 2 storage nodes are configured; got %d nodes", len(storageNodes))
 	}
-	var mr storage.MetricRow
-	sns := getStorageNodesForRerouting(snExclude)
-	if len(sns) == 1 {
-		// Fast path: only a single storage node is available for re-routing.
-		sn := sns[0]
-		if sn.sendBufMayBlock(buf) {
-			if sn != snExclude {
-				snExclude.rowsReroutedFromHere.Add(rows)
-				sn.rowsReroutedToHere.Add(rows)
-			}
-			return nil
-		}
-	}
 	reroutesTotal.Inc()
-	atomic.StoreUint64(&snExclude.lastRerouteTime, fasttime.UnixTimestamp())
+	sns := getStorageNodesMapForRerouting(snSource, mayUseSNSource)
+	if areStorageNodesEqual(sns) {
+		// Fast path - all the storage nodes are the same - send the buf to them.
+		sn := sns[0]
+		if !sn.sendBufMayBlock(buf) {
+			return fmt.Errorf("cannot re-route data because of graceful shutdown")
+		}
+		if sn != snSource {
+			snSource.rowsReroutedFromHere.Add(rows)
+			sn.rowsReroutedToHere.Add(rows)
+		}
+		return nil
+	}
+	atomic.StoreUint64(&snSource.lastRerouteTime, fasttime.UnixTimestamp())
 	src := buf
+	var mr storage.MetricRow
 	for len(src) > 0 {
 		tail, err := mr.UnmarshalX(src)
 		if err != nil {
@@ -495,44 +496,19 @@ func rerouteRowsMayBlock(snExclude *storageNode, buf []byte, rows int) error {
 		reroutedRowsProcessed.Inc()
 		h := xxhash.Sum64(mr.MetricNameRaw)
 		mr.ResetX()
-		for {
-			idx := h % uint64(len(sns))
-			sn := sns[idx]
-			if sn.sendBufMayBlock(rowBuf) {
-				// The row has been successfully re-routed to sn.
-				if sn != snExclude {
-					snExclude.rowsReroutedFromHere.Inc()
-					sn.rowsReroutedToHere.Inc()
-				}
-				break
-			}
-			select {
-			case <-storageNodesStopCh:
-				return fmt.Errorf("cannot re-route %d rows because of graceful shutdown", rows)
-			default:
-			}
-			// Refresh the list of healthy storage nodes, since sn became broken.
-			sns = getStorageNodesForRerouting(snExclude)
-		}
+		idx := h % uint64(len(sns))
+		sn := sns[idx]
+		if !sn.sendBufMayBlock(rowBuf) {
+			return fmt.Errorf("cannot re-route data because of graceful shutdown")
+		}
+		if sn != snSource {
+			snSource.rowsReroutedFromHere.Inc()
+			sn.rowsReroutedToHere.Inc()
+		}
 	}
 	return nil
 }
 
-func getStorageNodesForRerouting(snExclude *storageNode) []*storageNode {
-	sns := make([]*storageNode, 0, len(storageNodes))
-	currentTime := fasttime.UnixTimestamp()
-	for _, sn := range storageNodes {
-		if sn != snExclude && !sn.isBroken() && currentTime > atomic.LoadUint64(&sn.lastRerouteTime)+5 {
-			sns = append(sns, sn)
-		}
-	}
-	if len(sns) == 0 {
-		// There are no suitable storage nodes for re-routing. Fall back to snExclude.
-		sns = append(sns, snExclude)
-	}
-	return sns
-}
-
 func (sn *storageNode) sendBufMayBlock(buf []byte) bool {
 	sn.brLock.Lock()
 	for len(sn.br.buf)+len(buf) > maxBufSizePerStorageNode {
@@ -542,10 +518,6 @@ func (sn *storageNode) sendBufMayBlock(buf []byte) bool {
 			return false
 		default:
 		}
-		if sn.isBroken() {
-			sn.brLock.Unlock()
-			return false
-		}
 		sn.brCond.Wait()
 	}
 	sn.br.buf = append(sn.br.buf, buf...)
@@ -554,6 +526,54 @@ func (sn *storageNode) sendBufMayBlock(buf []byte) bool {
 	return true
 }
 
+func getStorageNodesMapForRerouting(snExclude *storageNode, mayUseSNExclude bool) []*storageNode {
+	sns := getStorageNodesForRerouting(snExclude, true)
+	if len(sns) == len(storageNodes) {
+		return sns
+	}
+	if !mayUseSNExclude {
+		sns = getStorageNodesForRerouting(snExclude, false)
+	}
+	for len(sns) < len(storageNodes) {
+		sns = append(sns, snExclude)
+	}
+	return sns
+}
+
+func areStorageNodesEqual(sns []*storageNode) bool {
+	snOrigin := sns[0]
+	for _, sn := range sns[1:] {
+		if sn != snOrigin {
+			return false
+		}
+	}
+	return true
+}
+
+func getStorageNodesForRerouting(snExclude *storageNode, skipRecentlyReroutedNodes bool) []*storageNode {
+	sns := make([]*storageNode, 0, len(storageNodes))
+	currentTime := fasttime.UnixTimestamp()
+	for i, sn := range storageNodes {
+		if sn == snExclude || sn.isBroken() {
+			// Skip snExclude and broken storage nodes.
+			continue
+		}
+		if skipRecentlyReroutedNodes && currentTime <= atomic.LoadUint64(&sn.lastRerouteTime)+5 {
+			// Skip nodes, which were re-routed recently.
+			continue
+		}
+		for len(sns) <= i {
+			sns = append(sns, sn)
+		}
+	}
+	if len(sns) > 0 {
+		for len(sns) < len(storageNodes) {
+			sns = append(sns, sns[0])
+		}
+	}
+	return sns
+}
+
 var (
 	maxBufSizePerStorageNode int
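
Note on the re-routing map (not part of the patch): the new `getStorageNodesForRerouting` always returns a slice with `len(storageNodes)` entries, keeping every healthy node at its original index, padding gaps from skipped nodes with the next healthy node, and padding trailing gaps with the first entry (or with `snExclude` inside `getStorageNodesMapForRerouting`). Since `rerouteRowsMayBlock` picks the target via `h % uint64(len(sns))`, the divisor never changes, so rows whose usual target node is still healthy keep hashing to it, which appears to be why the old refresh-and-retry loop could be dropped from the hot path. Below is a minimal, self-contained sketch of that idea; `buildReroutingMap` is a hypothetical name, it uses plain strings instead of `*storageNode`, and it folds the `mayUseSNExclude` fallback into a single empty-result check:

```go
package main

import "fmt"

// buildReroutingMap is a simplified model of the map built by the patch above:
// it always returns len(nodes) entries, so an index computed as hash%len(result)
// keeps pointing at the same slot regardless of which nodes are skipped.
func buildReroutingMap(nodes []string, broken map[string]bool, exclude string) []string {
	m := make([]string, 0, len(nodes))
	for i, n := range nodes {
		if n == exclude || broken[n] {
			// Skip the re-route source and broken nodes.
			continue
		}
		// Keep every healthy node at its original index i; any gap left by
		// skipped nodes before it is padded with this (next healthy) node.
		for len(m) <= i {
			m = append(m, n)
		}
	}
	if len(m) == 0 {
		// No healthy nodes at all - fall back to the source node, mirroring
		// the snExclude padding in getStorageNodesMapForRerouting.
		m = append(m, exclude)
	}
	// Pad trailing gaps with the first entry, mirroring sns = append(sns, sns[0]).
	for len(m) < len(nodes) {
		m = append(m, m[0])
	}
	return m
}

func main() {
	nodes := []string{"sn0", "sn1", "sn2", "sn3"}
	// sn1 is broken and sn2 is the re-route source: slots 0 and 3 still map to
	// sn0 and sn3, so rows hashed there keep their usual destination.
	fmt.Println(buildReroutingMap(nodes, map[string]bool{"sn1": true}, "sn2")) // [sn0 sn3 sn3 sn3]
}
```

In this 4-node example only the slots of the skipped nodes are absorbed by a neighbor, deterministically, instead of every row being reshuffled whenever the healthy set changes.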