package netstorage import ( "flag" "fmt" "io" "sort" "sync" "sync/atomic" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/consts" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" "github.com/VictoriaMetrics/metrics" xxhash "github.com/cespare/xxhash/v2" ) var ( disableRPCCompression = flag.Bool(`rpc.disableCompression`, false, "Whether to disable compression of RPC traffic. This reduces CPU usage at the cost of higher network bandwidth usage") replicationFactor = flag.Int("replicationFactor", 1, "Replication factor for the ingested data, i.e. how many copies to make among distinct -storageNode instances. "+ "Note that vmselect must run with -dedup.minScrapeInterval=1ms for data de-duplication when replicationFactor is greater than 1. "+ "Higher values for -dedup.minScrapeInterval at vmselect is OK") disableRerouting = flag.Bool(`disableRerouting`, false, "Whether to disable re-routing when some of vmstorage nodes accept incoming data at slower speed compared to other storage nodes. By default the re-routing is enabled. Disabled re-routing limits the ingestion rate by the slowest vmstorage node. On the other side, disabled re-routing minimizes the number of active time series in the cluster") ) func (sn *storageNode) isBroken() bool { return atomic.LoadUint32(&sn.broken) != 0 } // push pushes buf to sn internal bufs. // // This function doesn't block on fast path. // It may block only if all the storageNodes cannot handle the incoming ingestion rate. // This blocking provides backpressure to the caller. // // The function falls back to sending data to other vmstorage nodes // if sn is currently unavailable or overloaded. // // rows is the number of rows in the buf. func (sn *storageNode) push(buf []byte, rows int) error { if len(buf) > maxBufSizePerStorageNode { logger.Panicf("BUG: len(buf)=%d cannot exceed %d", len(buf), maxBufSizePerStorageNode) } sn.rowsPushed.Add(rows) sn.brLock.Lock() again: select { case <-storageNodesStopCh: sn.brLock.Unlock() return fmt.Errorf("cannot send %d rows because of graceful shutdown", rows) default: } if sn.isBroken() { if len(storageNodes) == 1 { // There are no other storage nodes to re-route to. So wait until the current node becomes healthy. sn.brCond.Wait() goto again } sn.brLock.Unlock() // The vmstorage node is temporarily broken. Re-route buf to healthy vmstorage nodes even if *disableRerouting==true. if err := rerouteRowsMayBlock(sn, false, buf, rows); err != nil { return fmt.Errorf("%d rows dropped because the current vsmtorage is unavailable and %w", rows, err) } return nil } if len(sn.br.buf)+len(buf) <= maxBufSizePerStorageNode { // Fast path: the buf contents fits sn.buf. sn.br.buf = append(sn.br.buf, buf...) sn.br.rows += rows sn.brLock.Unlock() return nil } if *disableRerouting || len(storageNodes) == 1 { sn.brCond.Wait() goto again } sn.brLock.Unlock() // The buf contents doesn't fit sn.buf. // This means that the current vmstorage is slow or will become broken soon. // Spread buf among all the vmstorage nodes. if err := rerouteRowsMayBlock(sn, true, buf, rows); err != nil { return fmt.Errorf("%d rows dropped because the current vmstorage buf is full and %w", rows, err) } return nil } var closedCh = func() <-chan struct{} { ch := make(chan struct{}) close(ch) return ch }() func (sn *storageNode) run(stopCh <-chan struct{}, snIdx int) { replicas := *replicationFactor if replicas <= 0 { replicas = 1 } if replicas > len(storageNodes) { replicas = len(storageNodes) } ticker := time.NewTicker(200 * time.Millisecond) defer ticker.Stop() var br bufRows brLastResetTime := fasttime.UnixTimestamp() var waitCh <-chan struct{} mustStop := false for !mustStop { sn.brLock.Lock() bufLen := len(sn.br.buf) sn.brLock.Unlock() waitCh = nil if bufLen > 0 { // Do not sleep if sn.br.buf isn't empty. waitCh = closedCh } select { case <-stopCh: mustStop = true // Make sure the sn.buf is flushed last time before returning // in order to send the remaining bits of data. case <-ticker.C: case <-waitCh: } sn.brLock.Lock() sn.br, br = br, sn.br sn.brCond.Broadcast() sn.brLock.Unlock() currentTime := fasttime.UnixTimestamp() if len(br.buf) < cap(br.buf)/4 && currentTime-brLastResetTime > 10 { // Free up capacity space occupied by br.buf in order to reduce memory usage after spikes. br.buf = append(br.buf[:0:0], br.buf...) brLastResetTime = currentTime } sn.checkHealth() if len(br.buf) == 0 { // Nothing to send. continue } // Send br to replicas storageNodes starting from snIdx. for !sendBufToReplicasNonblocking(&br, snIdx, replicas) { t := timerpool.Get(200 * time.Millisecond) select { case <-stopCh: timerpool.Put(t) return case <-t.C: timerpool.Put(t) sn.checkHealth() } } br.reset() } } func sendBufToReplicasNonblocking(br *bufRows, snIdx, replicas int) bool { usedStorageNodes := make(map[*storageNode]bool, replicas) for i := 0; i < replicas; i++ { idx := snIdx + i attempts := 0 for { attempts++ if attempts > len(storageNodes) { if i == 0 { // The data wasn't replicated at all. logger.Warnf("cannot push %d bytes with %d rows to storage nodes, since all the nodes are temporarily unavailable; "+ "re-trying to send the data soon", len(br.buf), br.rows) return false } // The data is partially replicated, so just emit a warning and return true. // We could retry sending the data again, but this may result in uncontrolled duplicate data. // So it is better returning true. rowsIncompletelyReplicatedTotal.Add(br.rows) logger.Warnf("cannot make a copy #%d out of %d copies according to -replicationFactor=%d for %d bytes with %d rows, "+ "since a part of storage nodes is temporarily unavailable", i+1, replicas, *replicationFactor, len(br.buf), br.rows) return true } if idx >= len(storageNodes) { idx %= len(storageNodes) } sn := storageNodes[idx] idx++ if usedStorageNodes[sn] { // The br has been already replicated to sn. Skip it. continue } if !sn.sendBufRowsNonblocking(br) { // Cannot send data to sn. Go to the next sn. continue } // Successfully sent data to sn. usedStorageNodes[sn] = true break } } return true } func (sn *storageNode) checkHealth() { sn.bcLock.Lock() defer sn.bcLock.Unlock() if sn.bc != nil { // The sn looks healthy. return } bc, err := sn.dial() if err != nil { atomic.StoreUint32(&sn.broken, 1) sn.brCond.Broadcast() if sn.lastDialErr == nil { // Log the error only once. sn.lastDialErr = err logger.Warnf("cannot dial storageNode %q: %s", sn.dialer.Addr(), err) } return } logger.Infof("successfully dialed -storageNode=%q", sn.dialer.Addr()) sn.lastDialErr = nil sn.bc = bc atomic.StoreUint32(&sn.broken, 0) sn.brCond.Broadcast() } func (sn *storageNode) sendBufRowsNonblocking(br *bufRows) bool { if sn.isBroken() { return false } sn.bcLock.Lock() defer sn.bcLock.Unlock() if sn.bc == nil { // Do not call sn.dial() here in order to prevent long blocking on sn.bcLock.Lock(), // which can negatively impact data sending in sendBufToReplicasNonblocking(). // sn.dial() should be called by sn.checkHealth() on unsuccessful call to sendBufToReplicasNonblocking(). return false } startTime := time.Now() err := sendToConn(sn.bc, br.buf) duration := time.Since(startTime) sn.sendDurationSeconds.Add(duration.Seconds()) if err == nil { // Successfully sent buf to bc. sn.rowsSent.Add(br.rows) return true } // Couldn't flush buf to sn. Mark sn as broken. logger.Warnf("cannot send %d bytes with %d rows to -storageNode=%q: %s; closing the connection to storageNode and "+ "re-routing this data to healthy storage nodes", len(br.buf), br.rows, sn.dialer.Addr(), err) if err = sn.bc.Close(); err != nil { logger.Warnf("cannot close connection to storageNode %q: %s", sn.dialer.Addr(), err) } sn.bc = nil atomic.StoreUint32(&sn.broken, 1) sn.brCond.Broadcast() sn.connectionErrors.Inc() return false } func sendToConn(bc *handshake.BufferedConn, buf []byte) error { if len(buf) == 0 { // Nothing to send return nil } timeoutSeconds := len(buf) / 3e5 if timeoutSeconds < 60 { timeoutSeconds = 60 } timeout := time.Duration(timeoutSeconds) * time.Second deadline := time.Now().Add(timeout) if err := bc.SetWriteDeadline(deadline); err != nil { return fmt.Errorf("cannot set write deadline to %s: %w", deadline, err) } // sizeBuf guarantees that the rows batch will be either fully // read or fully discarded on the vmstorage side. // sizeBuf is used for read optimization in vmstorage. sizeBuf := sizeBufPool.Get() defer sizeBufPool.Put(sizeBuf) sizeBuf.B = encoding.MarshalUint64(sizeBuf.B[:0], uint64(len(buf))) if _, err := bc.Write(sizeBuf.B); err != nil { return fmt.Errorf("cannot write data size %d: %w", len(buf), err) } if _, err := bc.Write(buf); err != nil { return fmt.Errorf("cannot write data with size %d: %w", len(buf), err) } if err := bc.Flush(); err != nil { return fmt.Errorf("cannot flush data with size %d: %w", len(buf), err) } // Wait for `ack` from vmstorage. // This guarantees that the message has been fully received by vmstorage. deadline = time.Now().Add(timeout) if err := bc.SetReadDeadline(deadline); err != nil { return fmt.Errorf("cannot set read deadline for reading `ack` to vmstorage: %w", err) } if _, err := io.ReadFull(bc, sizeBuf.B[:1]); err != nil { return fmt.Errorf("cannot read `ack` from vmstorage: %w", err) } if sizeBuf.B[0] != 1 { return fmt.Errorf("unexpected `ack` received from vmstorage; got %d; want %d", sizeBuf.B[0], 1) } return nil } var sizeBufPool bytesutil.ByteBufferPool func (sn *storageNode) dial() (*handshake.BufferedConn, error) { c, err := sn.dialer.Dial() if err != nil { sn.dialErrors.Inc() return nil, err } compressionLevel := 1 if *disableRPCCompression { compressionLevel = 0 } bc, err := handshake.VMInsertClient(c, compressionLevel) if err != nil { _ = c.Close() sn.handshakeErrors.Inc() return nil, fmt.Errorf("handshake error: %w", err) } return bc, nil } // storageNode is a client sending data to vmstorage node. type storageNode struct { // The last time for the re-routing. lastRerouteTime uint64 // broken is set to non-zero if the given vmstorage node is temporarily unhealthy. // In this case the data is re-routed to the remaining healthy vmstorage nodes. broken uint32 // brLock protects br. brLock sync.Mutex // brCond is used for waiting for free space in br. brCond *sync.Cond // Buffer with data that needs to be written to the storage node. // It must be accessed under brLock. br bufRows // bcLock protects bc. bcLock sync.Mutex // bc is a single connection to vmstorage for data transfer. // It must be accessed under bcLock. bc *handshake.BufferedConn dialer *netutil.TCPDialer // last error during dial. lastDialErr error // The number of dial errors to vmstorage node. dialErrors *metrics.Counter // The number of handshake errors to vmstorage node. handshakeErrors *metrics.Counter // The number of connection errors to vmstorage node. connectionErrors *metrics.Counter // The number of rows pushed to storageNode with push method. rowsPushed *metrics.Counter // The number of rows sent to vmstorage node. rowsSent *metrics.Counter // The number of rows rerouted from the given vmstorage node // to healthy nodes when the given node was unhealthy. rowsReroutedFromHere *metrics.Counter // The number of rows rerouted to the given vmstorage node // from other nodes when they were unhealthy. rowsReroutedToHere *metrics.Counter // The total duration spent for sending data to vmstorage node. // This metric is useful for determining the saturation of vminsert->vmstorage link. sendDurationSeconds *metrics.FloatCounter } // storageNodes contains a list of vmstorage node clients. var storageNodes []*storageNode var storageNodesWG sync.WaitGroup var storageNodesStopCh = make(chan struct{}) // InitStorageNodes initializes vmstorage nodes' connections to the given addrs. func InitStorageNodes(addrs []string) { if len(addrs) == 0 { logger.Panicf("BUG: addrs must be non-empty") } if len(addrs) > 255 { logger.Panicf("BUG: too much addresses: %d; max supported %d addresses", len(addrs), 255) } // Sort addrs in order to guarantee identical series->vmstorage mapping across all the vminsert nodes. addrsCopy := append([]string{}, addrs...) sort.Strings(addrsCopy) addrs = addrsCopy storageNodes = storageNodes[:0] for _, addr := range addrs { sn := &storageNode{ dialer: netutil.NewTCPDialer("vminsert", addr), dialErrors: metrics.NewCounter(fmt.Sprintf(`vm_rpc_dial_errors_total{name="vminsert", addr=%q}`, addr)), handshakeErrors: metrics.NewCounter(fmt.Sprintf(`vm_rpc_handshake_errors_total{name="vminsert", addr=%q}`, addr)), connectionErrors: metrics.NewCounter(fmt.Sprintf(`vm_rpc_connection_errors_total{name="vminsert", addr=%q}`, addr)), rowsPushed: metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_pushed_total{name="vminsert", addr=%q}`, addr)), rowsSent: metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_sent_total{name="vminsert", addr=%q}`, addr)), rowsReroutedFromHere: metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_rerouted_from_here_total{name="vminsert", addr=%q}`, addr)), rowsReroutedToHere: metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_rerouted_to_here_total{name="vminsert", addr=%q}`, addr)), sendDurationSeconds: metrics.NewFloatCounter(fmt.Sprintf(`vm_rpc_send_duration_seconds_total{name="vminsert", addr=%q}`, addr)), } sn.brCond = sync.NewCond(&sn.brLock) _ = metrics.NewGauge(fmt.Sprintf(`vm_rpc_rows_pending{name="vminsert", addr=%q}`, addr), func() float64 { sn.brLock.Lock() n := sn.br.rows sn.brLock.Unlock() return float64(n) }) _ = metrics.NewGauge(fmt.Sprintf(`vm_rpc_buf_pending_bytes{name="vminsert", addr=%q}`, addr), func() float64 { sn.brLock.Lock() n := len(sn.br.buf) sn.brLock.Unlock() return float64(n) }) _ = metrics.NewGauge(fmt.Sprintf(`vm_rpc_vmstorage_is_reachable{name="vminsert", addr=%q}`, addr), func() float64 { if sn.isBroken() { return 0 } return 1 }) storageNodes = append(storageNodes, sn) } maxBufSizePerStorageNode = memory.Allowed() / 8 / len(storageNodes) if maxBufSizePerStorageNode > consts.MaxInsertPacketSize { maxBufSizePerStorageNode = consts.MaxInsertPacketSize } for idx, sn := range storageNodes { storageNodesWG.Add(1) go func(sn *storageNode, idx int) { sn.run(storageNodesStopCh, idx) storageNodesWG.Done() }(sn, idx) } } // Stop gracefully stops netstorage. func Stop() { close(storageNodesStopCh) for _, sn := range storageNodes { sn.brCond.Broadcast() } storageNodesWG.Wait() } // rerouteRowsMayBlock re-routes rows from buf among healthy storage nodes. // // It waits until healthy storage nodes have enough space for the re-routed rows. // This guarantees backpressure if the ingestion rate exceeds vmstorage nodes' // ingestion rate capacity. // // It returns non-nil error only if Stop is called. func rerouteRowsMayBlock(snSource *storageNode, mayUseSNSource bool, buf []byte, rows int) error { if len(storageNodes) < 2 { logger.Panicf("BUG: re-routing can work only if at least 2 storage nodes are configured; got %d nodes", len(storageNodes)) } reroutesTotal.Inc() atomic.StoreUint64(&snSource.lastRerouteTime, fasttime.UnixTimestamp()) sns := getStorageNodesMapForRerouting(snSource, mayUseSNSource) if areStorageNodesEqual(sns) { // Fast path - all the storage nodes are the same - send the buf to them. sn := sns[0] if !sn.sendBufMayBlock(buf) { return fmt.Errorf("cannot re-route data because of graceful shutdown") } if sn != snSource { snSource.rowsReroutedFromHere.Add(rows) sn.rowsReroutedToHere.Add(rows) } return nil } src := buf var mr storage.MetricRow for len(src) > 0 { tail, err := mr.UnmarshalX(src) if err != nil { logger.Panicf("BUG: cannot unmarshal MetricRow: %s", err) } rowBuf := src[:len(src)-len(tail)] src = tail reroutedRowsProcessed.Inc() h := xxhash.Sum64(mr.MetricNameRaw) mr.ResetX() idx := h % uint64(len(sns)) sn := sns[idx] if !sn.sendBufMayBlock(rowBuf) { return fmt.Errorf("cannot re-route data because of graceful shutdown") } if sn != snSource { snSource.rowsReroutedFromHere.Inc() sn.rowsReroutedToHere.Inc() } } return nil } func (sn *storageNode) sendBufMayBlock(buf []byte) bool { sn.brLock.Lock() for len(sn.br.buf)+len(buf) > maxBufSizePerStorageNode { select { case <-storageNodesStopCh: sn.brLock.Unlock() return false default: } sn.brCond.Wait() } sn.br.buf = append(sn.br.buf, buf...) sn.br.rows++ sn.brLock.Unlock() return true } func getStorageNodesMapForRerouting(snExclude *storageNode, mayUseSNExclude bool) []*storageNode { sns := getStorageNodesForRerouting(snExclude, true) if len(sns) == len(storageNodes) { return sns } if !mayUseSNExclude { sns = getStorageNodesForRerouting(snExclude, false) } for len(sns) < len(storageNodes) { sns = append(sns, snExclude) } return sns } func areStorageNodesEqual(sns []*storageNode) bool { snOrigin := sns[0] for _, sn := range sns[1:] { if sn != snOrigin { return false } } return true } func getStorageNodesForRerouting(snExclude *storageNode, skipRecentlyReroutedNodes bool) []*storageNode { sns := make([]*storageNode, 0, len(storageNodes)) currentTime := fasttime.UnixTimestamp() for i, sn := range storageNodes { if sn == snExclude || sn.isBroken() { // Skip snExclude and broken storage nodes. continue } if skipRecentlyReroutedNodes && currentTime <= atomic.LoadUint64(&sn.lastRerouteTime)+5 { // Skip nodes, which were re-routed recently. continue } for len(sns) <= i { sns = append(sns, sn) } } if len(sns) > 0 { for len(sns) < len(storageNodes) { sns = append(sns, sns[0]) } } return sns } var ( maxBufSizePerStorageNode int reroutedRowsProcessed = metrics.NewCounter(`vm_rpc_rerouted_rows_processed_total{name="vminsert"}`) reroutesTotal = metrics.NewCounter(`vm_rpc_reroutes_total{name="vminsert"}`) rowsIncompletelyReplicatedTotal = metrics.NewCounter(`vm_rpc_rows_incompletely_replicated_total{name="vminsert"}`) )