app/vminsert/netstorage: make sure that the the data is always replicated among -replicationFactor vmstorage nodes

Previously vminsert could write multiple copies of the data to a single vmstorage node when the ingestion rate
exceeds the maximum throughput for connections to vmstorage nodes.
This commit is contained in:
Aliaksandr Valialkin 2020-05-28 19:57:05 +03:00
parent 981caa6f0b
commit 7a61357b5d
2 changed files with 131 additions and 97 deletions

View file

@ -1,7 +1,6 @@
package netstorage
import (
"flag"
"fmt"
"net/http"
@ -15,10 +14,6 @@ import (
jump "github.com/lithammer/go-jump-consistent-hash"
)
var replicationFactor = flag.Int("replicationFactor", 1, "Replication factor for the ingested data, i.e. how many copies to make among distinct -storageNode instances. "+
"Note that vmselect must run with -dedup.minScrapeInterval=1ms for data de-duplication when replicationFactor is greater than 1. "+
"Higher values for -dedup.minScrapeInterval at vmselect is OK")
// InsertCtx is a generic context for inserting data.
//
// InsertCtx.Reset must be called before the first usage.
@ -120,38 +115,20 @@ func (ctx *InsertCtx) WriteDataPoint(at *auth.Token, labels []prompb.Label, time
// WriteDataPointExt writes the given metricNameRaw with (timestmap, value) to ctx buffer with the given storageNodeIdx.
func (ctx *InsertCtx) WriteDataPointExt(at *auth.Token, storageNodeIdx int, metricNameRaw []byte, timestamp int64, value float64) error {
idx := storageNodeIdx
replicas := *replicationFactor
if replicas <= 0 {
replicas = 1
}
if replicas > len(storageNodes) {
replicas = len(storageNodes)
}
for {
br := &ctx.bufRowss[idx]
sn := storageNodes[idx]
bufNew := storage.MarshalMetricRow(br.buf, metricNameRaw, timestamp, value)
if len(bufNew) >= maxBufSizePerStorageNode {
// Send buf to storageNode, since it is too big.
if err := br.pushTo(sn); err != nil {
return err
}
br.buf = storage.MarshalMetricRow(bufNew[:0], metricNameRaw, timestamp, value)
} else {
br.buf = bufNew
}
br.rows++
replicas--
if replicas == 0 {
return nil
}
idx++
if idx >= len(storageNodes) {
idx = 0
br := &ctx.bufRowss[storageNodeIdx]
sn := storageNodes[storageNodeIdx]
bufNew := storage.MarshalMetricRow(br.buf, metricNameRaw, timestamp, value)
if len(bufNew) >= maxBufSizePerStorageNode {
// Send buf to storageNode, since it is too big.
if err := br.pushTo(sn); err != nil {
return err
}
br.buf = storage.MarshalMetricRow(bufNew[:0], metricNameRaw, timestamp, value)
} else {
br.buf = bufNew
}
br.rows++
return nil
}
// FlushBufs flushes ctx bufs to remote storage nodes.

View file

@ -21,7 +21,12 @@ import (
jump "github.com/lithammer/go-jump-consistent-hash"
)
var disableRPCCompression = flag.Bool(`rpc.disableCompression`, false, "Disable compression of RPC traffic. This reduces CPU usage at the cost of higher network bandwidth usage")
var (
disableRPCCompression = flag.Bool(`rpc.disableCompression`, false, "Disable compression of RPC traffic. This reduces CPU usage at the cost of higher network bandwidth usage")
replicationFactor = flag.Int("replicationFactor", 1, "Replication factor for the ingested data, i.e. how many copies to make among distinct -storageNode instances. "+
"Note that vmselect must run with -dedup.minScrapeInterval=1ms for data de-duplication when replicationFactor is greater than 1. "+
"Higher values for -dedup.minScrapeInterval at vmselect is OK")
)
func (sn *storageNode) isBroken() bool {
return atomic.LoadUint32(&sn.broken) != 0
@ -78,12 +83,18 @@ var closedCh = func() <-chan struct{} {
return ch
}()
func (sn *storageNode) run(stopCh <-chan struct{}) {
func (sn *storageNode) run(stopCh <-chan struct{}, snIdx int) {
replicas := *replicationFactor
if replicas <= 0 {
replicas = 1
}
if replicas > len(storageNodes) {
replicas = len(storageNodes)
}
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
var br bufRows
var bc *handshake.BufferedConn
var err error
var waitCh <-chan struct{}
mustStop := false
for !mustStop {
@ -108,44 +119,94 @@ func (sn *storageNode) run(stopCh <-chan struct{}) {
sn.br, br = br, sn.br
sn.brLock.Unlock()
}
if bc == nil {
bc, err = sn.dial()
if err != nil {
// Mark sn as broken in order to prevent sending additional data to it until it is recovered.
atomic.StoreUint32(&sn.broken, 1)
if len(br.buf) == 0 {
continue
}
logger.Warnf("re-routing %d bytes with %d rows to other storage nodes because cannot dial storageNode %q: %s",
len(br.buf), br.rows, sn.dialer.Addr(), err)
if addToReroutedBufNonblock(br.buf, br.rows) {
sn.rowsReroutedFromHere.Add(br.rows)
br.reset()
}
continue
}
}
if err = sendToConn(bc, br.buf); err == nil {
// Successfully sent buf to bc. Remove broken flag from sn.
atomic.StoreUint32(&sn.broken, 0)
sn.rowsSent.Add(br.rows)
br.reset()
if len(br.buf) == 0 {
// Nothing to send.
continue
}
// Couldn't flush buf to sn. Mark sn as broken
// and try re-routing buf to healthy vmstorage nodes.
if err = bc.Close(); err != nil {
logger.Warnf("cannot close connection to storageNode %q: %s", sn.dialer.Addr(), err)
// continue executing the code below.
// Send br to replicas storageNodes starting from snIdx.
if !sendBufToReplicas(&br, snIdx, replicas) {
// do not reset br in the hope it will be sent next time.
continue
}
bc = nil
sn.connectionErrors.Inc()
atomic.StoreUint32(&sn.broken, 1)
if addToReroutedBufNonblock(br.buf, br.rows) {
sn.rowsReroutedFromHere.Add(br.rows)
br.reset()
br.reset()
}
}
func sendBufToReplicas(br *bufRows, snIdx, replicas int) bool {
usedStorageNodes := make(map[*storageNode]bool, replicas)
for i := 0; i < replicas; i++ {
idx := snIdx + i
attempts := 0
for {
attempts++
if attempts > len(storageNodes) {
if i == 0 {
// The data wasn't replicated at all.
rowsLostTotal.Add(br.rows)
logger.Warnf("cannot push %d bytes with %d rows to storage nodes, since all the nodes are temporarily unavailable; "+
"re-trying to send the data soon", len(br.buf), br.rows)
return false
}
// The data is partially replicated, so just emit a warning and return true.
// We could retry sending the data again, but this may result in uncontrolled duplicate data.
// So it is better returning true.
rowsIncompletelyReplicatedTotal.Add(br.rows)
logger.Warnf("cannot make a copy #%d out of %d copies according to -replicationFactor=%d for %d bytes with %d rows, "+
"since a part of storage nodes is temporarily unavailable", i+1, replicas, *replicationFactor, len(br.buf), br.rows)
return true
}
if idx >= len(storageNodes) {
idx = 0
}
sn := storageNodes[idx]
idx++
if usedStorageNodes[sn] {
// The br has been already replicated to sn. Skip it.
continue
}
if !sn.sendBufRows(br) {
// Cannot send data to sn. Go to the next sn.
continue
}
// Successfully sent data to sn.
usedStorageNodes[sn] = true
break
}
}
return true
}
func (sn *storageNode) sendBufRows(br *bufRows) bool {
sn.bcLock.Lock()
defer sn.bcLock.Unlock()
if sn.bc == nil {
bc, err := sn.dial()
if err != nil {
// Mark sn as broken in order to prevent sending additional data to it until it is recovered.
atomic.StoreUint32(&sn.broken, 1)
logger.Warnf("cannot dial storageNode %q: %s", sn.dialer.Addr(), err)
return false
}
sn.bc = bc
}
err := sendToConn(sn.bc, br.buf)
if err == nil {
// Successfully sent buf to bc. Remove broken flag from sn.
atomic.StoreUint32(&sn.broken, 0)
sn.rowsSent.Add(br.rows)
return true
}
// Couldn't flush buf to sn. Mark sn as broken.
logger.Warnf("cannot send %d bytes with %d rows to %q: %s; re-routing this data to healthy storage nodes", len(br.buf), br.rows, sn.dialer.Addr(), err)
if err = sn.bc.Close(); err != nil {
logger.Warnf("cannot close connection to storageNode %q: %s", sn.dialer.Addr(), err)
}
sn.bc = nil
sn.connectionErrors.Inc()
atomic.StoreUint32(&sn.broken, 1)
return false
}
func sendToConn(bc *handshake.BufferedConn, buf []byte) error {
@ -272,8 +333,16 @@ type storageNode struct {
brLock sync.Mutex
// Buffer with data that needs to be written to the storage node.
// It must be accessed under brLock.
br bufRows
// bcLock protects bc.
bcLock sync.Mutex
// bc is a single connection to vmstorage for data transfer.
// It must be accessed under bcLock.
bc *handshake.BufferedConn
dialer *netutil.TCPDialer
// The number of dial errors to vmstorage node.
@ -322,6 +391,7 @@ func InitStorageNodes(addrs []string) {
logger.Panicf("BUG: too much addresses: %d; max supported %d addresses", len(addrs), 255)
}
storageNodes = storageNodes[:0]
for _, addr := range addrs {
sn := &storageNode{
dialer: netutil.NewTCPDialer("vminsert", addr),
@ -347,11 +417,6 @@ func InitStorageNodes(addrs []string) {
return float64(n)
})
storageNodes = append(storageNodes, sn)
storageNodesWG.Add(1)
go func(addr string) {
sn.run(storageNodesStopCh)
storageNodesWG.Done()
}(addr)
}
maxBufSizePerStorageNode = memory.Allowed() / 8 / len(storageNodes)
@ -365,6 +430,15 @@ func InitStorageNodes(addrs []string) {
if reroutedBufMaxSize > maxBufSizePerStorageNode*len(storageNodes) {
reroutedBufMaxSize = maxBufSizePerStorageNode * len(storageNodes)
}
for idx, sn := range storageNodes {
storageNodesWG.Add(1)
go func(sn *storageNode, idx int) {
sn.run(storageNodesStopCh, idx)
storageNodesWG.Done()
}(sn, idx)
}
rerouteWorkerWG.Add(1)
go func() {
rerouteWorker(rerouteWorkerStopCh)
@ -422,24 +496,6 @@ func addToReroutedBuf(buf []byte, rows int) error {
return nil
}
// addToReroutedBufNonblock adds buf to reroutedBR.
//
// It returns true if buf has been successfully added to reroutedBR.
func addToReroutedBufNonblock(buf []byte, rows int) bool {
if len(buf) > reroutedBufMaxSize {
logger.Panicf("BUG: len(buf)=%d cannot exceed reroutedBufMaxSize=%d", len(buf), reroutedBufMaxSize)
}
reroutedBRLock.Lock()
ok := len(reroutedBR.buf)+len(buf) <= reroutedBufMaxSize
if ok {
reroutedBR.buf = append(reroutedBR.buf, buf...)
reroutedBR.rows += rows
reroutesTotal.Inc()
}
reroutedBRLock.Unlock()
return ok
}
func getHealthyStorageNodesCount() int {
n := 0
for _, sn := range storageNodes {
@ -550,6 +606,7 @@ var (
return float64(n)
})
rerouteErrors = metrics.NewCounter(`vm_rpc_reroute_errors_total{name="vminsert"}`)
rowsLostTotal = metrics.NewCounter(`vm_rpc_rows_lost_total{name="vminsert"}`)
rerouteErrors = metrics.NewCounter(`vm_rpc_reroute_errors_total{name="vminsert"}`)
rowsLostTotal = metrics.NewCounter(`vm_rpc_rows_lost_total{name="vminsert"}`)
rowsIncompletelyReplicatedTotal = metrics.NewCounter(`vm_rpc_rows_incompletely_replicated_total{name="vminsert"}`)
)