VictoriaMetrics/app/vminsert/netstorage/netstorage.go
Nikolay 2be76d4bc3
app/vminsert: fixes readonly check (#4892)
* app/vminsert: fixes readonly check
previously vminsert doesn't check readOnly state for vmstorage, since check was never performed for nil buffer
In this case every 30 second storage node loss readonly state and received some data.
It caused re-routing and possible slow down for ingestion
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4870

* wip

---------

Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2023-08-30 16:26:05 +02:00

822 lines
26 KiB
Go

package netstorage
import (
"errors"
"flag"
"fmt"
"io"
"net"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/consts"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/metrics"
"github.com/cespare/xxhash/v2"
)
var (
disableRPCCompression = flag.Bool(`rpc.disableCompression`, false, "Whether to disable compression for the data sent from vminsert to vmstorage. This reduces CPU usage at the cost of higher network bandwidth usage")
replicationFactor = flag.Int("replicationFactor", 1, "Replication factor for the ingested data, i.e. how many copies to make among distinct -storageNode instances. "+
"Note that vmselect must run with -dedup.minScrapeInterval=1ms for data de-duplication when replicationFactor is greater than 1. "+
"Higher values for -dedup.minScrapeInterval at vmselect is OK")
disableRerouting = flag.Bool("disableRerouting", true, "Whether to disable re-routing when some of vmstorage nodes accept incoming data at slower speed compared to other storage nodes. Disabled re-routing limits the ingestion rate by the slowest vmstorage node. On the other side, disabled re-routing minimizes the number of active time series in the cluster during rolling restarts and during spikes in series churn rate. See also -dropSamplesOnOverload")
dropSamplesOnOverload = flag.Bool("dropSamplesOnOverload", false, "Whether to drop incoming samples if the destination vmstorage node is overloaded and/or unavailable. This prioritizes cluster availability over consistency, e.g. the cluster continues accepting all the ingested samples, but some of them may be dropped if vmstorage nodes are temporarily unavailable and/or overloaded. The drop of samples happens before the replication, so it's not recommended to use this flag with -replicationFactor enabled.")
vmstorageDialTimeout = flag.Duration("vmstorageDialTimeout", 5*time.Second, "Timeout for establishing RPC connections from vminsert to vmstorage")
)
var errStorageReadOnly = errors.New("storage node is read only")
func (sn *storageNode) isReady() bool {
return atomic.LoadUint32(&sn.broken) == 0 && atomic.LoadUint32(&sn.isReadOnly) == 0
}
// push pushes buf to sn internal bufs.
//
// This function doesn't block on fast path.
// It may block only if storage nodes cannot handle the incoming ingestion rate.
// This blocking provides backpressure to the caller.
//
// The function falls back to sending data to other vmstorage nodes
// if sn is currently unavailable or overloaded.
//
// rows must match the number of rows in the buf.
func (sn *storageNode) push(snb *storageNodesBucket, buf []byte, rows int) error {
if len(buf) > maxBufSizePerStorageNode {
logger.Panicf("BUG: len(buf)=%d cannot exceed %d", len(buf), maxBufSizePerStorageNode)
}
sn.rowsPushed.Add(rows)
if sn.trySendBuf(buf, rows) {
// Fast path - the buffer is successfully sent to sn.
return nil
}
if *dropSamplesOnOverload && atomic.LoadUint32(&sn.isReadOnly) == 0 {
sn.rowsDroppedOnOverload.Add(rows)
dropSamplesOnOverloadLogger.Warnf("some rows dropped, because -dropSamplesOnOverload is set and vmstorage %s cannot accept new rows now. "+
"See vm_rpc_rows_dropped_on_overload_total metric at /metrics page", sn.dialer.Addr())
return nil
}
// Slow path - sn cannot accept buf now, so re-route it to other vmstorage nodes.
if err := sn.rerouteBufToOtherStorageNodes(snb, buf, rows); err != nil {
return fmt.Errorf("error when re-routing rows from %s: %w", sn.dialer.Addr(), err)
}
return nil
}
var dropSamplesOnOverloadLogger = logger.WithThrottler("droppedSamplesOnOverload", 5*time.Second)
func (sn *storageNode) rerouteBufToOtherStorageNodes(snb *storageNodesBucket, buf []byte, rows int) error {
sns := snb.sns
sn.brLock.Lock()
again:
select {
case <-sn.stopCh:
sn.brLock.Unlock()
return fmt.Errorf("cannot send %d rows because of graceful shutdown", rows)
default:
}
if !sn.isReady() {
if len(sns) == 1 {
// There are no other storage nodes to re-route to. So wait until the current node becomes healthy.
sn.brCond.Wait()
goto again
}
sn.brLock.Unlock()
// The vmstorage node isn't ready for data processing. Re-route buf to healthy vmstorage nodes even if disableRerouting is set.
rowsProcessed, err := rerouteRowsToReadyStorageNodes(snb, sn, buf)
rows -= rowsProcessed
if err != nil {
return fmt.Errorf("%d rows dropped because the current vsmtorage is unavailable and %w", rows, err)
}
return nil
}
if len(sn.br.buf)+len(buf) <= maxBufSizePerStorageNode {
// Fast path: the buf contents fits sn.buf.
sn.br.buf = append(sn.br.buf, buf...)
sn.br.rows += rows
sn.brLock.Unlock()
return nil
}
// Slow path: the buf contents doesn't fit sn.buf, so try re-routing it to other vmstorage nodes.
if *disableRerouting || len(sns) == 1 {
sn.brCond.Wait()
goto again
}
sn.brLock.Unlock()
rowsProcessed, err := rerouteRowsToFreeStorageNodes(snb, sn, buf)
rows -= rowsProcessed
if err != nil {
return fmt.Errorf("%d rows dropped because the current vmstorage buf is full and %w", rows, err)
}
return nil
}
var closedCh = func() <-chan struct{} {
ch := make(chan struct{})
close(ch)
return ch
}()
func (sn *storageNode) run(snb *storageNodesBucket, snIdx int) {
replicas := *replicationFactor
if replicas <= 0 {
replicas = 1
}
sns := snb.sns
if replicas > len(sns) {
replicas = len(sns)
}
sn.readOnlyCheckerWG.Add(1)
go func() {
defer sn.readOnlyCheckerWG.Done()
sn.readOnlyChecker()
}()
defer sn.readOnlyCheckerWG.Wait()
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
var br bufRows
brLastResetTime := fasttime.UnixTimestamp()
var waitCh <-chan struct{}
mustStop := false
for !mustStop {
sn.brLock.Lock()
bufLen := len(sn.br.buf)
sn.brLock.Unlock()
waitCh = nil
if bufLen > 0 {
// Do not sleep if sn.br.buf isn't empty.
waitCh = closedCh
}
select {
case <-sn.stopCh:
mustStop = true
// Make sure the sn.buf is flushed last time before returning
// in order to send the remaining bits of data.
case <-ticker.C:
case <-waitCh:
}
sn.brLock.Lock()
sn.br, br = br, sn.br
sn.brCond.Broadcast()
sn.brLock.Unlock()
currentTime := fasttime.UnixTimestamp()
if len(br.buf) < cap(br.buf)/4 && currentTime-brLastResetTime > 10 {
// Free up capacity space occupied by br.buf in order to reduce memory usage after spikes.
br.buf = append(br.buf[:0:0], br.buf...)
brLastResetTime = currentTime
}
sn.checkHealth()
if len(br.buf) == 0 {
// Nothing to send.
continue
}
// Send br to replicas storage nodes starting from snIdx.
for !sendBufToReplicasNonblocking(snb, &br, snIdx, replicas) {
t := timerpool.Get(200 * time.Millisecond)
select {
case <-sn.stopCh:
timerpool.Put(t)
return
case <-t.C:
timerpool.Put(t)
sn.checkHealth()
}
}
br.reset()
}
}
func sendBufToReplicasNonblocking(snb *storageNodesBucket, br *bufRows, snIdx, replicas int) bool {
usedStorageNodes := make(map[*storageNode]struct{}, replicas)
sns := snb.sns
for i := 0; i < replicas; i++ {
idx := snIdx + i
attempts := 0
for {
attempts++
if attempts > len(sns) {
if i == 0 {
// The data wasn't replicated at all.
cannotReplicateLogger.Warnf("cannot push %d bytes with %d rows to storage nodes, since all the nodes are temporarily unavailable; "+
"re-trying to send the data soon", len(br.buf), br.rows)
return false
}
// The data is partially replicated, so just emit a warning and return true.
// We could retry sending the data again, but this may result in uncontrolled duplicate data.
// So it is better returning true.
rowsIncompletelyReplicatedTotal.Add(br.rows)
incompleteReplicationLogger.Warnf("cannot make a copy #%d out of %d copies according to -replicationFactor=%d for %d bytes with %d rows, "+
"since a part of storage nodes is temporarily unavailable", i+1, replicas, *replicationFactor, len(br.buf), br.rows)
return true
}
if idx >= len(sns) {
idx %= len(sns)
}
sn := sns[idx]
idx++
if _, ok := usedStorageNodes[sn]; ok {
// The br has been already replicated to sn. Skip it.
continue
}
if !sn.sendBufRowsNonblocking(br) {
// Cannot send data to sn. Go to the next sn.
continue
}
// Successfully sent data to sn.
usedStorageNodes[sn] = struct{}{}
break
}
}
return true
}
var (
cannotReplicateLogger = logger.WithThrottler("cannotReplicateDataBecauseNoStorageNodes", 5*time.Second)
incompleteReplicationLogger = logger.WithThrottler("incompleteReplication", 5*time.Second)
)
func (sn *storageNode) checkHealth() {
sn.bcLock.Lock()
defer sn.bcLock.Unlock()
if sn.bc != nil {
// The sn looks healthy.
return
}
bc, err := sn.dial()
if err != nil {
atomic.StoreUint32(&sn.broken, 1)
sn.brCond.Broadcast()
if sn.lastDialErr == nil {
// Log the error only once.
sn.lastDialErr = err
logger.Warnf("cannot dial storageNode %q: %s", sn.dialer.Addr(), err)
}
return
}
logger.Infof("successfully dialed -storageNode=%q", sn.dialer.Addr())
sn.lastDialErr = nil
sn.bc = bc
atomic.StoreUint32(&sn.broken, 0)
sn.brCond.Broadcast()
}
func (sn *storageNode) sendBufRowsNonblocking(br *bufRows) bool {
if !sn.isReady() {
return false
}
sn.bcLock.Lock()
defer sn.bcLock.Unlock()
if sn.bc == nil {
// Do not call sn.dial() here in order to prevent long blocking on sn.bcLock.Lock(),
// which can negatively impact data sending in sendBufToReplicasNonblocking().
// sn.dial() should be called by sn.checkHealth() on unsuccessful call to sendBufToReplicasNonblocking().
return false
}
startTime := time.Now()
err := sendToConn(sn.bc, br.buf)
duration := time.Since(startTime)
sn.sendDurationSeconds.Add(duration.Seconds())
if err == nil {
// Successfully sent buf to bc.
sn.rowsSent.Add(br.rows)
return true
}
if errors.Is(err, errStorageReadOnly) {
// The vmstorage is transitioned to readonly mode.
atomic.StoreUint32(&sn.isReadOnly, 1)
sn.brCond.Broadcast()
// Signal the caller that the data wasn't accepted by the vmstorage,
// so it will be re-routed to the remaining vmstorage nodes.
return false
}
// Couldn't flush buf to sn. Mark sn as broken.
cannotSendBufsLogger.Warnf("cannot send %d bytes with %d rows to -storageNode=%q: %s; closing the connection to storageNode and "+
"re-routing this data to healthy storage nodes", len(br.buf), br.rows, sn.dialer.Addr(), err)
if err = sn.bc.Close(); err != nil {
cannotCloseStorageNodeConnLogger.Warnf("cannot close connection to storageNode %q: %s", sn.dialer.Addr(), err)
}
sn.bc = nil
atomic.StoreUint32(&sn.broken, 1)
sn.brCond.Broadcast()
sn.connectionErrors.Inc()
return false
}
var cannotCloseStorageNodeConnLogger = logger.WithThrottler("cannotCloseStorageNodeConn", 5*time.Second)
var cannotSendBufsLogger = logger.WithThrottler("cannotSendBufRows", 5*time.Second)
func sendToConn(bc *handshake.BufferedConn, buf []byte) error {
// if len(buf) == 0, it must be sent to the vmstorage too in order to check for vmstorage health
// See checkReadOnlyMode() and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4870
timeoutSeconds := len(buf) / 3e5
if timeoutSeconds < 60 {
timeoutSeconds = 60
}
timeout := time.Duration(timeoutSeconds) * time.Second
deadline := time.Now().Add(timeout)
if err := bc.SetWriteDeadline(deadline); err != nil {
return fmt.Errorf("cannot set write deadline to %s: %w", deadline, err)
}
// sizeBuf guarantees that the rows batch will be either fully
// read or fully discarded on the vmstorage side.
// sizeBuf is used for read optimization in vmstorage.
sizeBuf := sizeBufPool.Get()
defer sizeBufPool.Put(sizeBuf)
sizeBuf.B = encoding.MarshalUint64(sizeBuf.B[:0], uint64(len(buf)))
if _, err := bc.Write(sizeBuf.B); err != nil {
return fmt.Errorf("cannot write data size %d: %w", len(buf), err)
}
if _, err := bc.Write(buf); err != nil {
return fmt.Errorf("cannot write data with size %d: %w", len(buf), err)
}
if err := bc.Flush(); err != nil {
return fmt.Errorf("cannot flush data with size %d: %w", len(buf), err)
}
// Wait for `ack` from vmstorage.
// This guarantees that the message has been fully received by vmstorage.
deadline = time.Now().Add(timeout)
if err := bc.SetReadDeadline(deadline); err != nil {
return fmt.Errorf("cannot set read deadline for reading `ack` to vmstorage: %w", err)
}
if _, err := io.ReadFull(bc, sizeBuf.B[:1]); err != nil {
return fmt.Errorf("cannot read `ack` from vmstorage: %w", err)
}
ackResp := sizeBuf.B[0]
switch ackResp {
case 1:
// ok response, data successfully accepted by vmstorage
case 2:
// vmstorage is in readonly mode
return errStorageReadOnly
default:
return fmt.Errorf("unexpected `ack` received from vmstorage; got %d; want 1 or 2", sizeBuf.B[0])
}
return nil
}
var sizeBufPool bytesutil.ByteBufferPool
func (sn *storageNode) dial() (*handshake.BufferedConn, error) {
c, err := sn.dialer.Dial()
if err != nil {
sn.dialErrors.Inc()
return nil, err
}
compressionLevel := 1
if *disableRPCCompression {
compressionLevel = 0
}
bc, err := handshake.VMInsertClient(c, compressionLevel)
if err != nil {
_ = c.Close()
sn.handshakeErrors.Inc()
return nil, fmt.Errorf("handshake error: %w", err)
}
return bc, nil
}
// storageNode is a client sending data to vmstorage node.
type storageNode struct {
// broken is set to non-zero if the given vmstorage node is temporarily unhealthy.
// In this case the data is re-routed to the remaining healthy vmstorage nodes.
broken uint32
// isReadOnly is set to non-zero if the given vmstorage node is read only
// In this case the data is re-routed to the remaining healthy vmstorage nodes.
isReadOnly uint32
// brLock protects br.
brLock sync.Mutex
// brCond is used for waiting for free space in br.
brCond *sync.Cond
// Buffer with data that needs to be written to the storage node.
// It must be accessed under brLock.
br bufRows
// bcLock protects bc.
bcLock sync.Mutex
// waitGroup for readOnlyChecker
readOnlyCheckerWG sync.WaitGroup
// bc is a single connection to vmstorage for data transfer.
// It must be accessed under bcLock.
bc *handshake.BufferedConn
dialer *netutil.TCPDialer
stopCh chan struct{}
// last error during dial.
lastDialErr error
// The number of dial errors to vmstorage node.
dialErrors *metrics.Counter
// The number of handshake errors to vmstorage node.
handshakeErrors *metrics.Counter
// The number of connection errors to vmstorage node.
connectionErrors *metrics.Counter
// The number of rows pushed to storageNode with push method.
rowsPushed *metrics.Counter
// The number of rows sent to vmstorage node.
rowsSent *metrics.Counter
// The number of rows dropped on overload if -dropSamplesOnOverload is set.
rowsDroppedOnOverload *metrics.Counter
// The number of rows rerouted from the given vmstorage node
// to healthy nodes when the given node was unhealthy.
rowsReroutedFromHere *metrics.Counter
// The number of rows rerouted to the given vmstorage node
// from other nodes when they were unhealthy.
rowsReroutedToHere *metrics.Counter
// The total duration spent for sending data to vmstorage node.
// This metric is useful for determining the saturation of vminsert->vmstorage link.
sendDurationSeconds *metrics.FloatCounter
}
type storageNodesBucket struct {
ms *metrics.Set
// nodesHash is used for consistently selecting a storage node by key.
nodesHash *consistentHash
// sns is a list of storage nodes.
sns []*storageNode
stopCh chan struct{}
wg *sync.WaitGroup
}
// storageNodes contains a list of vmstorage node clients.
var storageNodes atomic.Pointer[storageNodesBucket]
func getStorageNodesBucket() *storageNodesBucket {
return storageNodes.Load()
}
func setStorageNodesBucket(snb *storageNodesBucket) {
storageNodes.Store(snb)
}
// Init initializes vmstorage nodes' connections to the given addrs.
//
// hashSeed is used for changing the distribution of input time series among addrs.
//
// Call MustStop when the initialized vmstorage connections are no longer needed.
func Init(addrs []string, hashSeed uint64) {
snb := initStorageNodes(addrs, hashSeed)
setStorageNodesBucket(snb)
}
// MustStop stops netstorage.
func MustStop() {
snb := getStorageNodesBucket()
mustStopStorageNodes(snb)
}
func initStorageNodes(addrs []string, hashSeed uint64) *storageNodesBucket {
if len(addrs) == 0 {
logger.Panicf("BUG: addrs must be non-empty")
}
ms := metrics.NewSet()
nodesHash := newConsistentHash(addrs, hashSeed)
sns := make([]*storageNode, 0, len(addrs))
stopCh := make(chan struct{})
for _, addr := range addrs {
if _, _, err := net.SplitHostPort(addr); err != nil {
// Automatically add missing port.
addr += ":8400"
}
sn := &storageNode{
dialer: netutil.NewTCPDialer(ms, "vminsert", addr, *vmstorageDialTimeout),
stopCh: stopCh,
dialErrors: ms.NewCounter(fmt.Sprintf(`vm_rpc_dial_errors_total{name="vminsert", addr=%q}`, addr)),
handshakeErrors: ms.NewCounter(fmt.Sprintf(`vm_rpc_handshake_errors_total{name="vminsert", addr=%q}`, addr)),
connectionErrors: ms.NewCounter(fmt.Sprintf(`vm_rpc_connection_errors_total{name="vminsert", addr=%q}`, addr)),
rowsPushed: ms.NewCounter(fmt.Sprintf(`vm_rpc_rows_pushed_total{name="vminsert", addr=%q}`, addr)),
rowsSent: ms.NewCounter(fmt.Sprintf(`vm_rpc_rows_sent_total{name="vminsert", addr=%q}`, addr)),
rowsDroppedOnOverload: ms.NewCounter(fmt.Sprintf(`vm_rpc_rows_dropped_on_overload_total{name="vminsert", addr=%q}`, addr)),
rowsReroutedFromHere: ms.NewCounter(fmt.Sprintf(`vm_rpc_rows_rerouted_from_here_total{name="vminsert", addr=%q}`, addr)),
rowsReroutedToHere: ms.NewCounter(fmt.Sprintf(`vm_rpc_rows_rerouted_to_here_total{name="vminsert", addr=%q}`, addr)),
sendDurationSeconds: ms.NewFloatCounter(fmt.Sprintf(`vm_rpc_send_duration_seconds_total{name="vminsert", addr=%q}`, addr)),
}
sn.brCond = sync.NewCond(&sn.brLock)
_ = ms.NewGauge(fmt.Sprintf(`vm_rpc_rows_pending{name="vminsert", addr=%q}`, addr), func() float64 {
sn.brLock.Lock()
n := sn.br.rows
sn.brLock.Unlock()
return float64(n)
})
_ = ms.NewGauge(fmt.Sprintf(`vm_rpc_buf_pending_bytes{name="vminsert", addr=%q}`, addr), func() float64 {
sn.brLock.Lock()
n := len(sn.br.buf)
sn.brLock.Unlock()
return float64(n)
})
_ = ms.NewGauge(fmt.Sprintf(`vm_rpc_vmstorage_is_reachable{name="vminsert", addr=%q}`, addr), func() float64 {
if atomic.LoadUint32(&sn.broken) != 0 {
return 0
}
return 1
})
_ = ms.NewGauge(fmt.Sprintf(`vm_rpc_vmstorage_is_read_only{name="vminsert", addr=%q}`, addr), func() float64 {
return float64(atomic.LoadUint32(&sn.isReadOnly))
})
sns = append(sns, sn)
}
maxBufSizePerStorageNode = memory.Allowed() / 8 / len(sns)
if maxBufSizePerStorageNode > consts.MaxInsertPacketSizeForVMInsert {
maxBufSizePerStorageNode = consts.MaxInsertPacketSizeForVMInsert
}
metrics.RegisterSet(ms)
var wg sync.WaitGroup
snb := &storageNodesBucket{
ms: ms,
nodesHash: nodesHash,
sns: sns,
stopCh: stopCh,
wg: &wg,
}
for idx, sn := range sns {
wg.Add(1)
go func(sn *storageNode, idx int) {
sn.run(snb, idx)
wg.Done()
}(sn, idx)
}
return snb
}
func mustStopStorageNodes(snb *storageNodesBucket) {
close(snb.stopCh)
for _, sn := range snb.sns {
sn.brCond.Broadcast()
}
snb.wg.Wait()
metrics.UnregisterSet(snb.ms)
snb.ms.UnregisterAllMetrics()
}
// rerouteRowsToReadyStorageNodes reroutes src from not ready snSource to ready storage nodes.
//
// The function blocks until src is fully re-routed.
func rerouteRowsToReadyStorageNodes(snb *storageNodesBucket, snSource *storageNode, src []byte) (int, error) {
reroutesTotal.Inc()
rowsProcessed := 0
var idxsExclude, idxsExcludeNew []int
nodesHash := snb.nodesHash
sns := snb.sns
idxsExclude = getNotReadyStorageNodeIdxsBlocking(snb, idxsExclude[:0], nil)
var mr storage.MetricRow
for len(src) > 0 {
tail, err := mr.UnmarshalX(src)
if err != nil {
logger.Panicf("BUG: cannot unmarshal MetricRow: %s", err)
}
rowBuf := src[:len(src)-len(tail)]
src = tail
reroutedRowsProcessed.Inc()
h := xxhash.Sum64(mr.MetricNameRaw)
mr.ResetX()
var sn *storageNode
for {
idx := nodesHash.getNodeIdx(h, idxsExclude)
sn = sns[idx]
if sn.isReady() {
break
}
// re-generate idxsExclude list, since sn must be put there.
idxsExclude = getNotReadyStorageNodeIdxsBlocking(snb, idxsExclude[:0], nil)
}
if *disableRerouting {
if !sn.sendBufMayBlock(rowBuf) {
return rowsProcessed, fmt.Errorf("graceful shutdown started")
}
rowsProcessed++
if sn != snSource {
snSource.rowsReroutedFromHere.Inc()
sn.rowsReroutedToHere.Inc()
}
continue
}
again:
if sn.trySendBuf(rowBuf, 1) {
rowsProcessed++
if sn != snSource {
snSource.rowsReroutedFromHere.Inc()
sn.rowsReroutedToHere.Inc()
}
continue
}
// If the re-routing is enabled, then try sending the row to another storage node.
idxsExcludeNew = getNotReadyStorageNodeIdxs(snb, idxsExcludeNew[:0], sn)
idx := nodesHash.getNodeIdx(h, idxsExcludeNew)
snNew := sns[idx]
if snNew.trySendBuf(rowBuf, 1) {
rowsProcessed++
if snNew != snSource {
snSource.rowsReroutedFromHere.Inc()
snNew.rowsReroutedToHere.Inc()
}
continue
}
// The row cannot be sent to both snSource and the re-routed sn without blocking.
// Sleep for a while and try sending the row to snSource again.
time.Sleep(100 * time.Millisecond)
goto again
}
return rowsProcessed, nil
}
// reouteRowsToFreeStorageNodes re-routes src from snSource to other storage nodes.
//
// It is expected that snSource has no enough buffer for sending src.
// It is expected than *dsableRerouting isn't set when calling this function.
func rerouteRowsToFreeStorageNodes(snb *storageNodesBucket, snSource *storageNode, src []byte) (int, error) {
if *disableRerouting {
logger.Panicf("BUG: disableRerouting must be disabled when calling rerouteRowsToFreeStorageNodes")
}
reroutesTotal.Inc()
rowsProcessed := 0
var idxsExclude []int
nodesHash := snb.nodesHash
sns := snb.sns
idxsExclude = getNotReadyStorageNodeIdxs(snb, idxsExclude[:0], snSource)
var mr storage.MetricRow
for len(src) > 0 {
tail, err := mr.UnmarshalX(src)
if err != nil {
logger.Panicf("BUG: cannot unmarshal MetricRow: %s", err)
}
rowBuf := src[:len(src)-len(tail)]
src = tail
reroutedRowsProcessed.Inc()
h := xxhash.Sum64(mr.MetricNameRaw)
mr.ResetX()
// Try sending the row to snSource in order to minimize re-routing.
again:
if snSource.trySendBuf(rowBuf, 1) {
rowsProcessed++
continue
}
// The row couldn't be sent to snSrouce. Try re-routing it to other nodes.
var sn *storageNode
for {
idx := nodesHash.getNodeIdx(h, idxsExclude)
sn = sns[idx]
if sn.isReady() {
break
}
// re-generate idxsExclude list, since sn must be put there.
idxsExclude = getNotReadyStorageNodeIdxs(snb, idxsExclude[:0], snSource)
}
if sn.trySendBuf(rowBuf, 1) {
rowsProcessed++
snSource.rowsReroutedFromHere.Inc()
sn.rowsReroutedToHere.Inc()
continue
}
// The row cannot be sent to both snSource and the re-routed sn without blocking.
// Sleep for a while and try sending the row to snSource again.
time.Sleep(100 * time.Millisecond)
goto again
}
return rowsProcessed, nil
}
func getNotReadyStorageNodeIdxsBlocking(snb *storageNodesBucket, dst []int, snExtra *storageNode) []int {
dst = getNotReadyStorageNodeIdxs(snb, dst[:0], snExtra)
sns := snb.sns
if len(dst) < len(sns) {
return dst
}
noStorageNodesLogger.Warnf("all the vmstorage nodes are unavailable; stopping data processing util at least a single node becomes available")
for {
time.Sleep(time.Second)
dst = getNotReadyStorageNodeIdxs(snb, dst[:0], snExtra)
if availableNodes := len(sns) - len(dst); availableNodes > 0 {
storageNodesBecameAvailableLogger.Warnf("%d vmstorage nodes became available, so continue data processing", availableNodes)
return dst
}
}
}
var storageNodesBecameAvailableLogger = logger.WithThrottler("storageNodesBecameAvailable", 5*time.Second)
var noStorageNodesLogger = logger.WithThrottler("storageNodesUnavailable", 5*time.Second)
func getNotReadyStorageNodeIdxs(snb *storageNodesBucket, dst []int, snExtra *storageNode) []int {
dst = dst[:0]
for i, sn := range snb.sns {
if sn == snExtra || !sn.isReady() {
dst = append(dst, i)
}
}
return dst
}
func (sn *storageNode) trySendBuf(buf []byte, rows int) bool {
sent := false
sn.brLock.Lock()
if sn.isReady() && len(sn.br.buf)+len(buf) <= maxBufSizePerStorageNode {
sn.br.buf = append(sn.br.buf, buf...)
sn.br.rows += rows
sent = true
}
sn.brLock.Unlock()
return sent
}
func (sn *storageNode) sendBufMayBlock(buf []byte) bool {
sn.brLock.Lock()
for len(sn.br.buf)+len(buf) > maxBufSizePerStorageNode {
select {
case <-sn.stopCh:
sn.brLock.Unlock()
return false
default:
}
sn.brCond.Wait()
}
sn.br.buf = append(sn.br.buf, buf...)
sn.br.rows++
sn.brLock.Unlock()
return true
}
func (sn *storageNode) readOnlyChecker() {
ticker := time.NewTicker(time.Second * 30)
defer ticker.Stop()
for {
select {
case <-sn.stopCh:
return
case <-ticker.C:
sn.checkReadOnlyMode()
}
}
}
func (sn *storageNode) checkReadOnlyMode() {
if atomic.LoadUint32(&sn.isReadOnly) == 0 {
// fast path - the sn isn't in readonly mode
return
}
// Check whether the storage remains in readonly mode
sn.bcLock.Lock()
defer sn.bcLock.Unlock()
if sn.bc == nil {
return
}
// send nil buff to check ack response from storage
err := sendToConn(sn.bc, nil)
if err == nil {
// The storage switched from readonly to non-readonly mode
atomic.StoreUint32(&sn.isReadOnly, 0)
return
}
if !errors.Is(err, errStorageReadOnly) {
logger.Errorf("cannot check storage readonly mode for -storageNode=%q: %s", sn.dialer.Addr(), err)
}
}
var (
maxBufSizePerStorageNode int
reroutedRowsProcessed = metrics.NewCounter(`vm_rpc_rerouted_rows_processed_total{name="vminsert"}`)
reroutesTotal = metrics.NewCounter(`vm_rpc_reroutes_total{name="vminsert"}`)
rowsIncompletelyReplicatedTotal = metrics.NewCounter(`vm_rpc_rows_incompletely_replicated_total{name="vminsert"}`)
)