VictoriaMetrics/app/vminsert/netstorage/netstorage.go

461 lines
13 KiB
Go

package netstorage
import (
"flag"
"fmt"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/consts"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
xxhash "github.com/cespare/xxhash/v2"
)
var disableRPCCompression = flag.Bool(`rpc.disableCompression`, false, "Disable compression of RPC traffic. This reduces CPU usage at the cost of higher network bandwidth usage")
// push pushes buf to sn.
//
// It falls back to sending data to another vmstorage node if sn is currently
// unavailable.
//
// rows is the number of rows in the buf.
func (sn *storageNode) push(buf []byte, rows int) error {
if len(buf) > consts.MaxInsertPacketSize {
logger.Panicf("BUG: len(buf)=%d cannot exceed %d", len(buf), consts.MaxInsertPacketSize)
}
sn.rowsPushed.Add(rows)
sn.mu.Lock()
defer sn.mu.Unlock()
if sn.broken {
// The vmstorage node is broken. Re-route buf to healthy vmstorage nodes.
if err := addToReroutedBuf(buf, rows); err != nil {
rowsLostTotal.Add(rows)
return err
}
sn.rowsReroutedFromHere.Add(rows)
return nil
}
if len(sn.buf)+len(buf) <= consts.MaxInsertPacketSize {
// Fast path: the buf contents fits sn.buf.
sn.buf = append(sn.buf, buf...)
sn.rows += rows
return nil
}
// Slow path: the buf contents doesn't fit sn.buf.
// Flush sn.buf to vmstorage and then add buf to sn.buf.
if err := sn.flushBufLocked(); err != nil {
// Failed to flush or re-route sn.buf to vmstorage nodes.
// The sn.buf is already dropped by flushBufLocked.
// Drop buf too, since there is litte sense in trying to rescue it.
rowsLostTotal.Add(rows)
return err
}
// Successful flush.
sn.buf = append(sn.buf, buf...)
sn.rows += rows
return nil
}
func (sn *storageNode) sendReroutedRow(buf []byte) error {
sn.mu.Lock()
defer sn.mu.Unlock()
if sn.broken {
return errBrokenStorageNode
}
if len(sn.buf)+len(buf) > consts.MaxInsertPacketSize {
return fmt.Errorf("cannot put %d bytes into vmstorage buffer, since its size cannot exceed %d bytes", len(sn.buf)+len(buf), consts.MaxInsertPacketSize)
}
sn.buf = append(sn.buf, buf...)
sn.rows++
return nil
}
var errBrokenStorageNode = fmt.Errorf("the vmstorage node is temporarily broken")
func (sn *storageNode) flushBufLocked() error {
if err := sn.sendBufLocked(sn.buf); err == nil {
// Successful flush. Remove broken flag.
sn.broken = false
sn.rowsSent.Add(sn.rows)
sn.buf = sn.buf[:0]
sn.rows = 0
return nil
}
// Couldn't flush sn.buf to vmstorage. Mark sn as broken
// and try re-routing sn.buf to healthy vmstorage nodes.
sn.broken = true
err := addToReroutedBuf(sn.buf, sn.rows)
if err != nil {
rowsLostTotal.Add(sn.rows)
}
sn.buf = sn.buf[:0]
sn.rows = 0
return err
}
func (sn *storageNode) sendBufLocked(buf []byte) error {
// sizeBuf guarantees that the rows batch will be either fully
// read or fully discarded on the vmstorage side.
// sizeBuf is used for read optimization in vmstorage.
if sn.bc == nil {
if err := sn.dial(); err != nil {
return fmt.Errorf("cannot dial %q: %s", sn.dialer.Addr(), err)
}
}
if len(buf) == 0 {
return nil
}
deadline := time.Now().Add(30 * time.Second)
if err := sn.bc.SetWriteDeadline(deadline); err != nil {
sn.closeBrokenConn()
return fmt.Errorf("cannot set write deadline to %s: %s", deadline, err)
}
sn.sizeBuf = encoding.MarshalUint64(sn.sizeBuf[:0], uint64(len(buf)))
if _, err := sn.bc.Write(sn.sizeBuf); err != nil {
sn.closeBrokenConn()
return fmt.Errorf("cannot write data size %d: %s", len(buf), err)
}
if _, err := sn.bc.Write(buf); err != nil {
sn.closeBrokenConn()
return fmt.Errorf("cannot write data: %s", err)
}
if err := sn.bc.Flush(); err != nil {
sn.closeBrokenConn()
return fmt.Errorf("cannot flush data: %s", err)
}
return nil
}
func (sn *storageNode) dial() error {
c, err := sn.dialer.Dial()
if err != nil {
sn.dialErrors.Inc()
return err
}
compressionLevel := 1
if *disableRPCCompression {
compressionLevel = 0
}
bc, err := handshake.VMInsertClient(c, compressionLevel)
if err != nil {
_ = c.Close()
sn.handshakeErrors.Inc()
return fmt.Errorf("handshake error: %s", err)
}
sn.bc = bc
return nil
}
func (sn *storageNode) closeBrokenConn() {
_ = sn.bc.Close()
sn.bc = nil
sn.connectionErrors.Inc()
}
func (sn *storageNode) run(stopCh <-chan struct{}) {
t := time.NewTimer(time.Second)
mustStop := false
for !mustStop {
select {
case <-stopCh:
mustStop = true
// Make sure flushBufLocked is called last time before returning
// in order to send the remaining bits of data.
case <-t.C:
}
sn.mu.Lock()
if err := sn.flushBufLocked(); err != nil {
sn.closeBrokenConn()
logger.Errorf("cannot flush data to storageNode %q: %s", sn.dialer.Addr(), err)
}
sn.mu.Unlock()
t.Reset(time.Second)
}
t.Stop()
}
func rerouteWorker(stopCh <-chan struct{}) {
t := time.NewTimer(time.Second)
var buf []byte
mustStop := false
for !mustStop {
select {
case <-stopCh:
mustStop = true
// Make sure spreadReroutedBufToStorageNodes is called last time before returning
// in order to reroute the remaining data to healthy vmstorage nodes.
case <-t.C:
}
var err error
buf, err = spreadReroutedBufToStorageNodes(buf[:0])
if err != nil {
rerouteErrors.Inc()
logger.Errorf("cannot reroute data among healthy vmstorage nodes: %s", err)
}
t.Reset(time.Second)
}
t.Stop()
}
// storageNode is a client sending data to vmstorage node.
type storageNode struct {
mu sync.Mutex
// Buffer with data that needs to be written to vmstorage node.
buf []byte
// The number of rows buf contains at the moment.
rows int
// Temporary buffer for encoding marshaled buf size.
sizeBuf []byte
// broken is set to true if the given vmstorage node is temporarily unhealthy.
// In this case the data is re-routed to the remaining healthy vmstorage nodes.
broken bool
dialer *netutil.TCPDialer
bc *handshake.BufferedConn
// The number of dial errors to vmstorage node.
dialErrors *metrics.Counter
// The number of handshake errors to vmstorage node.
handshakeErrors *metrics.Counter
// The number of connection errors to vmstorage node.
connectionErrors *metrics.Counter
// The number of rows pushed to storageNode with push method.
rowsPushed *metrics.Counter
// The number of rows sent to vmstorage node.
rowsSent *metrics.Counter
// The number of rows rerouted from the given vmstorage node
// to healthy nodes when the given node was unhealthy.
rowsReroutedFromHere *metrics.Counter
// The number of rows rerouted to the given vmstorage node
// from other nodes when they were unhealthy.
rowsReroutedToHere *metrics.Counter
}
// storageNodes contains a list of vmstorage node clients.
var storageNodes []*storageNode
var (
storageNodesWG sync.WaitGroup
rerouteWorkerWG sync.WaitGroup
)
var (
storageNodesStopCh = make(chan struct{})
rerouteWorkerStopCh = make(chan struct{})
)
// InitStorageNodes initializes vmstorage nodes' connections to the given addrs.
func InitStorageNodes(addrs []string) {
if len(addrs) == 0 {
logger.Panicf("BUG: addrs must be non-empty")
}
if len(addrs) > 255 {
logger.Panicf("BUG: too much addresses: %d; max supported %d addresses", len(addrs), 255)
}
for _, addr := range addrs {
sn := &storageNode{
dialer: netutil.NewTCPDialer("vminsert", addr),
dialErrors: metrics.NewCounter(fmt.Sprintf(`vm_rpc_dial_errors_total{name="vminsert", addr=%q}`, addr)),
handshakeErrors: metrics.NewCounter(fmt.Sprintf(`vm_rpc_handshake_errors_total{name="vminsert", addr=%q}`, addr)),
connectionErrors: metrics.NewCounter(fmt.Sprintf(`vm_rpc_connection_errors_total{name="vminsert", addr=%q}`, addr)),
rowsPushed: metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_pushed_total{name="vminsert", addr=%q}`, addr)),
rowsSent: metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_sent_total{name="vminsert", addr=%q}`, addr)),
rowsReroutedFromHere: metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_rerouted_from_here_total{name="vminsert", addr=%q}`, addr)),
rowsReroutedToHere: metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_rerouted_to_here_total{name="vminsert", addr=%q}`, addr)),
}
_ = metrics.NewGauge(fmt.Sprintf(`vm_rpc_rows_pending{name="vminsert", addr=%q}`, addr), func() float64 {
sn.mu.Lock()
n := sn.rows
sn.mu.Unlock()
return float64(n)
})
_ = metrics.NewGauge(fmt.Sprintf(`vm_rpc_buf_pending_bytes{name="vminsert", addr=%q}`, addr), func() float64 {
sn.mu.Lock()
n := len(sn.buf)
sn.mu.Unlock()
return float64(n)
})
storageNodes = append(storageNodes, sn)
storageNodesWG.Add(1)
go func(addr string) {
sn.run(storageNodesStopCh)
storageNodesWG.Done()
}(addr)
}
reroutedBufMaxSize = memory.Allowed() / 8
rerouteWorkerWG.Add(1)
go func() {
rerouteWorker(rerouteWorkerStopCh)
rerouteWorkerWG.Done()
}()
}
// Stop gracefully stops netstorage.
func Stop() {
close(rerouteWorkerStopCh)
rerouteWorkerWG.Wait()
close(storageNodesStopCh)
storageNodesWG.Wait()
}
func addToReroutedBuf(buf []byte, rows int) error {
reroutedLock.Lock()
defer reroutedLock.Unlock()
if len(reroutedBuf)+len(buf) > reroutedBufMaxSize {
reroutedBufOverflows.Inc()
return fmt.Errorf("%d rows dropped because of reroutedBuf overflows %d bytes", rows, reroutedBufMaxSize)
}
reroutedBuf = append(reroutedBuf, buf...)
reroutedRows += rows
reroutesTotal.Inc()
return nil
}
func spreadReroutedBufToStorageNodes(swapBuf []byte) ([]byte, error) {
healthyStorageNodes := getHealthyStorageNodes()
if len(healthyStorageNodes) == 0 {
// No more vmstorage nodes to write data to.
return swapBuf, fmt.Errorf("all the storage nodes are unhealthy")
}
reroutedLock.Lock()
reroutedBuf, swapBuf = swapBuf[:0], reroutedBuf
rows := reroutedRows
reroutedRows = 0
reroutedLock.Unlock()
if len(swapBuf) == 0 {
// Nothing to re-route.
return swapBuf, nil
}
var mr storage.MetricRow
src := swapBuf
rowsProcessed := 0
for len(src) > 0 {
tail, err := mr.Unmarshal(src)
if err != nil {
logger.Panicf("BUG: cannot unmarshal recently marshaled MetricRow: %s", err)
}
rowBuf := src[:len(src)-len(tail)]
src = tail
// Use non-consistent hashing instead of jump hash in order to re-route rows
// equally among healthy vmstorage nodes.
// This should spread the increased load among healthy vmstorage nodes.
h := xxhash.Sum64(mr.MetricNameRaw)
idx := h % uint64(len(healthyStorageNodes))
attempts := 0
for {
sn := healthyStorageNodes[idx]
err := sn.sendReroutedRow(rowBuf)
if err == nil {
sn.rowsReroutedToHere.Inc()
break
}
// Cannot send data to sn. Try sending to the next vmstorage node.
idx++
if idx >= uint64(len(healthyStorageNodes)) {
idx = 0
}
attempts++
if attempts == len(healthyStorageNodes) {
// There are no healthy nodes.
// Try returning the remaining data to reroutedBuf if it has enough free space.
rowsRemaining := rows - rowsProcessed
recovered := false
reroutedLock.Lock()
if len(rowBuf)+len(tail)+len(reroutedBuf) <= reroutedBufMaxSize {
swapBuf = append(swapBuf[:0], rowBuf...)
swapBuf = append(swapBuf, tail...)
swapBuf = append(swapBuf, reroutedBuf...)
reroutedBuf, swapBuf = swapBuf, reroutedBuf[:0]
reroutedRows += rowsRemaining
recovered = true
}
reroutedLock.Unlock()
if recovered {
return swapBuf, nil
}
rowsLostTotal.Add(rowsRemaining)
return swapBuf, fmt.Errorf("all the %d vmstorage nodes are unavailable; lost %d rows; last error: %s", len(storageNodes), rowsRemaining, err)
}
}
rowsProcessed++
}
if rowsProcessed != rows {
logger.Panicf("BUG: unexpected number of rows processed; got %d; want %d", rowsProcessed, rows)
}
reroutedRowsProcessed.Add(rowsProcessed)
return swapBuf, nil
}
var (
reroutedLock sync.Mutex
reroutedBuf []byte
reroutedRows int
reroutedBufMaxSize int
reroutedRowsProcessed = metrics.NewCounter(`vm_rpc_rerouted_rows_processed_total{name="vminsert"}`)
reroutedBufOverflows = metrics.NewCounter(`vm_rpc_rerouted_buf_overflows_total{name="vminsert"}`)
reroutesTotal = metrics.NewCounter(`vm_rpc_reroutes_total{name="vminsert"}`)
_ = metrics.NewGauge(`vm_rpc_rerouted_rows_pending{name="vminsert"}`, func() float64 {
reroutedLock.Lock()
n := reroutedRows
reroutedLock.Unlock()
return float64(n)
})
_ = metrics.NewGauge(`vm_rpc_rerouted_buf_pending_bytes{name="vminsert"}`, func() float64 {
reroutedLock.Lock()
n := len(reroutedBuf)
reroutedLock.Unlock()
return float64(n)
})
rerouteErrors = metrics.NewCounter(`vm_rpc_reroute_errors_total{name="vminsert"}`)
rowsLostTotal = metrics.NewCounter(`vm_rpc_rows_lost_total{name="vminsert"}`)
)
func getHealthyStorageNodes() []*storageNode {
sns := make([]*storageNode, 0, len(storageNodes)-1)
for _, sn := range storageNodes {
sn.mu.Lock()
if !sn.broken {
sns = append(sns, sn)
}
sn.mu.Unlock()
}
return sns
}