app/vminsert: add -disableRerouting command-line flag for disabling re-routing when some vmstorage nodes are slower than others

Refactor the re-routing mechanism and make it more resilient to cases when some vmstorage nodes are temporarily unavailable.

Reduce the probability of a re-routing storm.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/791
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1054
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1165
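
A hedged usage sketch (node addresses are placeholders, not from this commit): with the new flag set, vminsert blocks the ingestion stream instead of spreading rows across other nodes when one vmstorage falls behind:

```console
./vminsert -storageNode=vmstorage-1:8400 -storageNode=vmstorage-2:8400 -disableRerouting
```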
Aliaksandr Valialkin 2021-06-04 04:33:49 +03:00
parent fc2565b4ee
commit 1c09e71f5b
4 changed files with 111 additions and 356 deletions

View file

@@ -23,10 +23,11 @@ import (
)
var (
disableRPCCompression = flag.Bool(`rpc.disableCompression`, false, "Disable compression of RPC traffic. This reduces CPU usage at the cost of higher network bandwidth usage")
disableRPCCompression = flag.Bool(`rpc.disableCompression`, false, "Whether to disable compression of RPC traffic. This reduces CPU usage at the cost of higher network bandwidth usage")
replicationFactor = flag.Int("replicationFactor", 1, "Replication factor for the ingested data, i.e. how many copies to make among distinct -storageNode instances. "+
"Note that vmselect must run with -dedup.minScrapeInterval=1ms for data de-duplication when replicationFactor is greater than 1. "+
"Higher values for -dedup.minScrapeInterval at vmselect is OK")
disableRerouting = flag.Bool(`disableRerouting`, false, "Whether to disable re-routing when some vmstorage nodes accept incoming data at a slower speed compared to other storage nodes. By default re-routing is enabled. Disabled re-routing limits the ingestion rate by the slowest vmstorage node. On the other hand, disabled re-routing minimizes the number of active time series in the cluster")
)
func (sn *storageNode) isBroken() bool {
@@ -49,16 +50,27 @@ func (sn *storageNode) push(buf []byte, rows int) error {
}
sn.rowsPushed.Add(rows)
sn.brLock.Lock()
again:
select {
case <-storageNodesStopCh:
sn.brLock.Unlock()
return fmt.Errorf("cannot send %d rows because of graceful shutdown", rows)
default:
}
if sn.isBroken() {
// The vmstorage node is temporarily broken. Re-route buf to healthy vmstorage nodes.
if err := addToReroutedBufMayBlock(buf, rows); err != nil {
if len(storageNodes) == 1 {
// There are no other storage nodes to re-route to. So wait until the current node becomes healthy.
sn.brCond.Wait()
goto again
}
sn.brLock.Unlock()
// The vmstorage node is temporarily broken. Re-route buf to healthy vmstorage nodes even if *disableRerouting==true.
if err := rerouteRowsMayBlock(sn, buf, rows); err != nil {
return fmt.Errorf("%d rows dropped because the current vsmtorage is unavailable and %w", rows, err)
}
sn.rowsReroutedFromHere.Add(rows)
return nil
}
sn.brLock.Lock()
if len(sn.br.buf)+len(buf) <= maxBufSizePerStorageNode {
// Fast path: the buf contents fits sn.buf.
sn.br.buf = append(sn.br.buf, buf...)
@@ -66,15 +78,18 @@ func (sn *storageNode) push(buf []byte, rows int) error {
sn.brLock.Unlock()
return nil
}
if *disableRerouting || len(storageNodes) == 1 {
sn.brCond.Wait()
goto again
}
sn.brLock.Unlock()
// Slow path: the buf contents doesn't fit sn.buf.
// The buf contents doesn't fit sn.buf.
// This means that the current vmstorage is slow or will become broken soon.
// Re-route buf to healthy vmstorage nodes.
if err := addToReroutedBufMayBlock(buf, rows); err != nil {
// Spread buf among all the vmstorage nodes.
if err := rerouteRowsMayBlock(sn, buf, rows); err != nil {
return fmt.Errorf("%d rows dropped because the current vmstorage buf is full and %w", rows, err)
}
sn.rowsReroutedFromHere.Add(rows)
return nil
}
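
The reworked push() above replaces the shared rerouted buffer with per-node blocking on sn.brCond. Below is a minimal, self-contained sketch of that sync.Cond backpressure pattern, under the assumption that a writer blocks while the buffer is full and the drainer broadcasts after swapping the buffer out; boundedBuf and maxBufSize are illustrative names, not the vminsert identifiers:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// maxBufSize is a tiny stand-in for maxBufSizePerStorageNode.
const maxBufSize = 8

type boundedBuf struct {
	mu   sync.Mutex
	cond *sync.Cond
	buf  []byte
}

func newBoundedBuf() *boundedBuf {
	b := &boundedBuf{}
	b.cond = sync.NewCond(&b.mu)
	return b
}

// push blocks while the buffer is full, mirroring the sn.brCond.Wait() loop.
func (b *boundedBuf) push(data []byte) {
	b.mu.Lock()
	for len(b.buf)+len(data) > maxBufSize {
		b.cond.Wait() // releases mu while waiting, re-acquires on wakeup
	}
	b.buf = append(b.buf, data...)
	b.mu.Unlock()
}

// drain swaps the buffer out and wakes blocked writers,
// like run() does after `sn.br, br = br, sn.br` plus sn.brCond.Broadcast().
func (b *boundedBuf) drain() []byte {
	b.mu.Lock()
	data := b.buf
	b.buf = nil
	b.cond.Broadcast()
	b.mu.Unlock()
	return data
}

func main() {
	b := newBoundedBuf()
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < 4; i++ {
			b.push([]byte("rows!")) // 5 bytes each; blocks once the 8-byte cap is hit
		}
	}()
	for i := 0; i < 4; i++ {
		time.Sleep(10 * time.Millisecond)
		fmt.Printf("drained %d bytes\n", len(b.drain()))
	}
	wg.Wait()
}
```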
@@ -118,6 +133,7 @@ func (sn *storageNode) run(stopCh <-chan struct{}, snIdx int) {
}
sn.brLock.Lock()
sn.br, br = br, sn.br
sn.brCond.Broadcast()
sn.brLock.Unlock()
currentTime := fasttime.UnixTimestamp()
if len(br.buf) < cap(br.buf)/4 && currentTime-brLastResetTime > 10 {
@@ -200,6 +216,7 @@ func (sn *storageNode) checkHealth() {
bc, err := sn.dial()
if err != nil {
atomic.StoreUint32(&sn.broken, 1)
sn.brCond.Broadcast()
if sn.lastDialErr == nil {
// Log the error only once.
sn.lastDialErr = err
@@ -211,6 +228,7 @@ func (sn *storageNode) checkHealth() {
sn.lastDialErr = nil
sn.bc = bc
atomic.StoreUint32(&sn.broken, 0)
sn.brCond.Broadcast()
}
func (sn *storageNode) sendBufRowsNonblocking(br *bufRows) bool {
@@ -240,6 +258,7 @@ func (sn *storageNode) sendBufRowsNonblocking(br *bufRows) bool {
}
sn.bc = nil
atomic.StoreUint32(&sn.broken, 1)
sn.brCond.Broadcast()
sn.connectionErrors.Inc()
return false
}
@@ -310,53 +329,11 @@ func (sn *storageNode) dial() (*handshake.BufferedConn, error) {
return bc, nil
}
func rerouteWorker(stopCh <-chan struct{}) {
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
var br bufRows
brLastResetTime := fasttime.UnixTimestamp()
var waitCh <-chan struct{}
mustStop := false
for !mustStop {
reroutedBRLock.Lock()
bufLen := len(reroutedBR.buf)
reroutedBRLock.Unlock()
waitCh = nil
if bufLen > 0 {
// Do not sleep if reroutedBR contains data to process.
waitCh = closedCh
}
select {
case <-stopCh:
mustStop = true
// Make sure reroutedBR is re-routed last time before returning
// in order to reroute the remaining data to healthy vmstorage nodes.
case <-ticker.C:
case <-waitCh:
}
reroutedBRLock.Lock()
reroutedBR, br = br, reroutedBR
reroutedBRLock.Unlock()
reroutedBRCond.Broadcast()
currentTime := fasttime.UnixTimestamp()
if len(br.buf) < cap(br.buf)/4 && currentTime-brLastResetTime > 10 {
// Free up capacity space occupied by br.buf in order to reduce memory usage after spikes.
br.buf = append(br.buf[:0:0], br.buf...)
brLastResetTime = currentTime
}
if len(br.buf) == 0 {
// Nothing to re-route.
continue
}
spreadReroutedBufToStorageNodesBlocking(stopCh, &br)
br.reset()
}
// Notify all the blocked addToReroutedBufMayBlock callers, so they may finish the work.
reroutedBRCond.Broadcast()
}
// storageNode is a client sending data to vmstorage node.
type storageNode struct {
// The last time for the re-routing.
lastRerouteTime uint64
// broken is set to non-zero if the given vmstorage node is temporarily unhealthy.
// In this case the data is re-routed to the remaining healthy vmstorage nodes.
broken uint32
@@ -364,6 +341,9 @@ type storageNode struct {
// brLock protects br.
brLock sync.Mutex
// brCond is used for waiting for free space in br.
brCond *sync.Cond
// Buffer with data that needs to be written to the storage node.
// It must be accessed under brLock.
br bufRows
@@ -407,15 +387,9 @@ type storageNode struct {
// storageNodes contains a list of vmstorage node clients.
var storageNodes []*storageNode
var (
storageNodesWG sync.WaitGroup
rerouteWorkerWG sync.WaitGroup
)
var storageNodesWG sync.WaitGroup
var (
storageNodesStopCh = make(chan struct{})
rerouteWorkerStopCh = make(chan struct{})
)
var storageNodesStopCh = make(chan struct{})
// InitStorageNodes initializes vmstorage nodes' connections to the given addrs.
func InitStorageNodes(addrs []string) {
@@ -439,6 +413,7 @@ func InitStorageNodes(addrs []string) {
rowsReroutedFromHere: metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_rerouted_from_here_total{name="vminsert", addr=%q}`, addr)),
rowsReroutedToHere: metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_rerouted_to_here_total{name="vminsert", addr=%q}`, addr)),
}
sn.brCond = sync.NewCond(&sn.brLock)
_ = metrics.NewGauge(fmt.Sprintf(`vm_rpc_rows_pending{name="vminsert", addr=%q}`, addr), func() float64 {
sn.brLock.Lock()
n := sn.br.rows
@@ -464,13 +439,6 @@ func InitStorageNodes(addrs []string) {
if maxBufSizePerStorageNode > consts.MaxInsertPacketSize {
maxBufSizePerStorageNode = consts.MaxInsertPacketSize
}
reroutedBufMaxSize = memory.Allowed() / 16
if reroutedBufMaxSize < maxBufSizePerStorageNode {
reroutedBufMaxSize = maxBufSizePerStorageNode
}
if reroutedBufMaxSize > maxBufSizePerStorageNode*len(storageNodes) {
reroutedBufMaxSize = maxBufSizePerStorageNode * len(storageNodes)
}
for idx, sn := range storageNodes {
storageNodesWG.Add(1)
@@ -479,208 +447,117 @@ func InitStorageNodes(addrs []string) {
storageNodesWG.Done()
}(sn, idx)
}
rerouteWorkerWG.Add(1)
go func() {
rerouteWorker(rerouteWorkerStopCh)
rerouteWorkerWG.Done()
}()
}
// Stop gracefully stops netstorage.
func Stop() {
close(rerouteWorkerStopCh)
rerouteWorkerWG.Wait()
close(storageNodesStopCh)
for _, sn := range storageNodes {
sn.brCond.Broadcast()
}
storageNodesWG.Wait()
}
// addToReroutedBufMayBlock adds buf to reroutedBR.
// rerouteRowsMayBlock re-routes rows from buf among healthy storage nodes excluding the snExclude.
//
// It waits until the reroutedBR has enough space for buf or if Stop is called.
// It waits until healthy storage nodes have enough space for the re-routed rows.
// This guarantees backpressure if the ingestion rate exceeds vmstorage nodes'
// ingestion rate capacity.
//
// It returns non-nil error only in the following cases:
//
// - if all the storage nodes are unhealthy.
// - if Stop is called.
func addToReroutedBufMayBlock(buf []byte, rows int) error {
if len(buf) > reroutedBufMaxSize {
logger.Panicf("BUG: len(buf)=%d cannot exceed reroutedBufMaxSize=%d", len(buf), reroutedBufMaxSize)
// It returns non-nil error only if Stop is called.
func rerouteRowsMayBlock(snExclude *storageNode, buf []byte, rows int) error {
if len(storageNodes) < 2 {
logger.Panicf("BUG: re-routing can work only if at least 2 storage nodes are configured; got %d nodes", len(storageNodes))
}
reroutedBRLock.Lock()
defer reroutedBRLock.Unlock()
for len(reroutedBR.buf)+len(buf) > reroutedBufMaxSize {
if getHealthyStorageNodesCount() == 0 {
rowsLostTotal.Add(rows)
return fmt.Errorf("all the vmstorage nodes are unavailable and reroutedBR has no enough space for storing %d bytes; "+
"only %d free bytes left out of %d bytes in reroutedBR",
len(buf), reroutedBufMaxSize-len(reroutedBR.buf), reroutedBufMaxSize)
}
select {
case <-rerouteWorkerStopCh:
rowsLostTotal.Add(rows)
return fmt.Errorf("rerouteWorker cannot send the data since it is stopped")
default:
}
// The reroutedBR.buf doesn't have enough space for len(buf). Wait until the reroutedBR.buf is sent by rerouteWorker.
reroutedBufWaits.Inc()
reroutedBRCond.Wait()
}
reroutedBR.buf = append(reroutedBR.buf, buf...)
reroutedBR.rows += rows
reroutesTotal.Inc()
return nil
}
func getHealthyStorageNodesCount() int {
n := 0
for _, sn := range storageNodes {
if !sn.isBroken() {
n++
}
}
return n
}
func getHealthyStorageNodes() []*storageNode {
sns := make([]*storageNode, 0, len(storageNodes)-1)
for _, sn := range storageNodes {
if !sn.isBroken() {
sns = append(sns, sn)
}
}
return sns
}
func getHealthyStorageNodesBlocking(stopCh <-chan struct{}) []*storageNode {
// Wait for at least a single healthy storage node.
for {
// Wake up goroutines blocked at addToReroutedBufMayBlock, so they can return error to the caller if reroutedBR is full.
// This fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/896
reroutedBRCond.Broadcast()
sns := getHealthyStorageNodes()
if len(sns) > 0 {
return sns
}
// There are no healthy storage nodes.
// Wait for a while until such nodes appear.
t := timerpool.Get(time.Second)
select {
case <-stopCh:
timerpool.Put(t)
return nil
case <-t.C:
timerpool.Put(t)
}
}
}
func spreadReroutedBufToStorageNodesBlocking(stopCh <-chan struct{}, br *bufRows) {
var mr storage.MetricRow
rowsProcessed := 0
defer func() {
reroutedRowsProcessed.Add(rowsProcessed)
}()
sns := getHealthyStorageNodesBlocking(stopCh)
if len(sns) == 0 {
// stopCh is notified to stop.
return
sns := getStorageNodesForRerouting(snExclude)
if len(sns) == 1 {
// Fast path: only a single storage node is available for re-routing.
sn := sns[0]
if sn.sendBufMayBlock(buf) {
if sn != snExclude {
snExclude.rowsReroutedFromHere.Add(rows)
sn.rowsReroutedToHere.Add(rows)
}
return nil
}
}
src := br.buf
reroutesTotal.Inc()
atomic.StoreUint64(&snExclude.lastRerouteTime, fasttime.UnixTimestamp())
src := buf
for len(src) > 0 {
tail, err := mr.UnmarshalX(src)
if err != nil {
logger.Panicf("BUG: cannot unmarshal MetricRow from reroutedBR.buf: %s", err)
logger.Panicf("BUG: cannot unmarshal MetricRow: %s", err)
}
rowBuf := src[:len(src)-len(tail)]
src = tail
rowsProcessed++
var h uint64
if len(storageNodes) > 1 {
// Do not use jump.Hash(h, int32(len(sns))) here,
// since this leads to uneven distribution of rerouted rows among sns -
// they all go to the original or to the next sn.
h = xxhash.Sum64(mr.MetricNameRaw)
}
reroutedRowsProcessed.Inc()
h := xxhash.Sum64(mr.MetricNameRaw)
mr.ResetX()
for {
idx := h % uint64(len(sns))
sn := sns[idx]
if sn.sendReroutedRow(rowBuf) {
if sn.sendBufMayBlock(rowBuf) {
// The row has been successfully re-routed to sn.
if sn != snExclude {
snExclude.rowsReroutedFromHere.Inc()
sn.rowsReroutedToHere.Inc()
}
break
}
// The row cannot be re-routed to sn. Wait for a while and try again.
// Do not re-route the row to the remaining storage nodes,
// since this may result in increased resource usage (CPU, memory, disk IO) on these nodes,
// because they'll have to accept and register new time series (this is resource-intensive operation).
//
// Do not skip rowBuf in the hope it may be sent later, since this wastes CPU time for no reason.
rerouteErrors.Inc()
t := timerpool.Get(200 * time.Millisecond)
select {
case <-stopCh:
// stopCh is notified to stop.
timerpool.Put(t)
return
case <-t.C:
timerpool.Put(t)
}
// Obtain fresh list of healthy storage nodes after the delay, since it may be already updated.
sns = getHealthyStorageNodesBlocking(stopCh)
if len(sns) == 0 {
// stopCh is notified to stop.
return
case <-storageNodesStopCh:
return fmt.Errorf("cannot re-route %d rows because of graceful shutdown", rows)
default:
}
// Refresh the list of healthy storage nodes, since sn became broken.
sns = getStorageNodesForRerouting(snExclude)
}
}
return nil
}
func (sn *storageNode) sendReroutedRow(buf []byte) bool {
sn.brLock.Lock()
ok := len(sn.br.buf)+len(buf) <= maxBufSizePerStorageNode
if ok {
sn.br.buf = append(sn.br.buf, buf...)
sn.br.rows++
sn.rowsReroutedToHere.Inc()
func getStorageNodesForRerouting(snExclude *storageNode) []*storageNode {
sns := make([]*storageNode, 0, len(storageNodes))
currentTime := fasttime.UnixTimestamp()
for _, sn := range storageNodes {
if sn != snExclude && !sn.isBroken() && currentTime > atomic.LoadUint64(&sn.lastRerouteTime)+5 {
sns = append(sns, sn)
}
}
if len(sns) == 0 {
// There are no suitable storage nodes for re-routing. Fall back to snExclude.
sns = append(sns, snExclude)
}
return sns
}
func (sn *storageNode) sendBufMayBlock(buf []byte) bool {
sn.brLock.Lock()
for len(sn.br.buf)+len(buf) > maxBufSizePerStorageNode {
select {
case <-storageNodesStopCh:
sn.brLock.Unlock()
return false
default:
}
if sn.isBroken() {
sn.brLock.Unlock()
return false
}
sn.brCond.Wait()
}
sn.br.buf = append(sn.br.buf, buf...)
sn.br.rows++
sn.brLock.Unlock()
return ok
return true
}
var (
maxBufSizePerStorageNode int
reroutedBR bufRows
reroutedBRLock sync.Mutex
reroutedBRCond = sync.NewCond(&reroutedBRLock)
reroutedBufMaxSize int
reroutedRowsProcessed = metrics.NewCounter(`vm_rpc_rerouted_rows_processed_total{name="vminsert"}`)
reroutedBufWaits = metrics.NewCounter(`vm_rpc_rerouted_buf_waits_total{name="vminsert"}`)
reroutesTotal = metrics.NewCounter(`vm_rpc_reroutes_total{name="vminsert"}`)
_ = metrics.NewGauge(`vm_rpc_rerouted_rows_pending{name="vminsert"}`, func() float64 {
reroutedBRLock.Lock()
n := reroutedBR.rows
reroutedBRLock.Unlock()
return float64(n)
})
_ = metrics.NewGauge(`vm_rpc_rerouted_buf_pending_bytes{name="vminsert"}`, func() float64 {
reroutedBRLock.Lock()
n := len(reroutedBR.buf)
reroutedBRLock.Unlock()
return float64(n)
})
rerouteErrors = metrics.NewCounter(`vm_rpc_reroute_errors_total{name="vminsert"}`)
rowsLostTotal = metrics.NewCounter(`vm_rpc_rows_lost_total{name="vminsert"}`)
reroutedRowsProcessed = metrics.NewCounter(`vm_rpc_rerouted_rows_processed_total{name="vminsert"}`)
reroutesTotal = metrics.NewCounter(`vm_rpc_reroutes_total{name="vminsert"}`)
rowsIncompletelyReplicatedTotal = metrics.NewCounter(`vm_rpc_rows_incompletely_replicated_total{name="vminsert"}`)
)
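
For reference, a hedged sketch of the metric-name hashing that rerouteRowsMayBlock uses to spread rows; pickNode is a hypothetical helper, and the real code also excludes broken nodes and retries on failure. Hashing xxhash.Sum64(mr.MetricNameRaw) pins all samples of a given series to one target node, so re-routing doesn't inflate the number of active time series across the cluster:

```go
package main

import (
	"fmt"

	"github.com/cespare/xxhash/v2"
)

// pickNode maps a raw metric name to a node, as in
// `h := xxhash.Sum64(mr.MetricNameRaw); idx := h % uint64(len(sns))`.
func pickNode(metricNameRaw []byte, nodes []string) string {
	h := xxhash.Sum64(metricNameRaw)
	return nodes[h%uint64(len(nodes))]
}

func main() {
	nodes := []string{"vmstorage-1:8400", "vmstorage-2:8400", "vmstorage-3:8400"}
	for _, name := range []string{`cpu_usage{host="a"}`, `cpu_usage{host="b"}`} {
		// The same series always hashes to the same node.
		fmt.Printf("%s -> %s\n", name, pickNode([]byte(name), nodes))
	}
}
```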

View file

@@ -1481,13 +1481,6 @@
"intervalFactor": 1,
"legendFormat": "Handshake",
"refId": "E"
},
{
"expr": "sum(rate(vm_rpc_reroute_errors_total{job=~\"$job\",instance=~\"$instance.*\"}[5m]))",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "Reroute",
"refId": "C"
}
],
"thresholds": [],
@@ -3252,13 +3245,6 @@
"intervalFactor": 1,
"legendFormat": "Handshake",
"refId": "E"
},
{
"expr": "sum(rate(vm_rpc_reroute_errors_total{job=~\"$job\",instance=~\"$instance\"}[5m]))",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "Reroute",
"refId": "C"
}
],
"thresholds": [],
@@ -3610,102 +3596,6 @@
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "$ds",
"description": "The number of rows that were lost because of unhealthy vmstorage nodes.",
"fieldConfig": {
"defaults": {
"links": []
},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 62
},
"hiddenSeries": false,
"id": 84,
"legend": {
"alignAsTable": true,
"avg": true,
"current": true,
"max": false,
"min": false,
"show": false,
"total": false,
"values": true
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.5.1",
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "sum(rate(vm_rpc_rows_lost_total{job=~\"$job\", instance=~\"$instance\"}[5m]))",
"legendFormat": "rows",
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Rows lost ($instance)",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": "0",
"show": true
},
{
"format": "bytes",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
@@ -6383,4 +6273,4 @@
"title": "VictoriaMetrics - cluster",
"uid": "oS7Bi_0Wz",
"version": 1
}
}

View file

@@ -89,8 +86,6 @@ groups:
sum(increase(vm_rpc_dial_errors_total[5m])) by(job, instance)
+
sum(increase(vm_rpc_handshake_errors_total[5m])) by(job, instance)
+
sum(increase(vm_rpc_reroute_errors_total[5m])) by(job, instance)
) > 0
for: 15m
labels:
@@ -208,17 +206,6 @@ groups:
This prevents ingesting metrics with too many labels. Please verify that `-maxLabelsPerTimeseries` is configured
correctly or that clients which send these metrics aren't misbehaving."
- alert: VminsertIsDroppingRows
expr: sum(rate(vm_rpc_rows_lost_total[5m])) by(instance) > 0
for: 15m
labels:
severity: warning
annotations:
dashboard: "http://localhost:3000/d/oS7Bi_0Wz?viewPanel=84&var-instance={{ $labels.instance }}"
summary: "VMinsert \"{{ $labels.job }}\" on instance {{ $labels.instance }} drops rows due to RPC errors."
description: "VMinsert starts to drop rows if there are no healthy VMstorage nodes where it can route
insert requests to. Check the health state of VMstorage nodes and RPC metrics."
# Alerts group for vmagent assumes that Grafana dashboard
# https://grafana.com/grafana/dashboards/12683 is installed.

View file

@@ -9,6 +9,7 @@ sort: 15
* FEATURE: vmalert: add a command-line flag `-rule.configCheckInterval` for automatic re-reading of `-rule` files without the need to send a SIGHUP signal. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/512).
* FEATURE: vmagent: respect the `sample_limit` and `-promscrape.maxScrapeSize` values when scraping targets in [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode). See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1331).
* FEATURE: vmauth: add ability to specify multiple `url_prefix` entries for balancing the load among multiple `vmselect` and/or `vminsert` nodes in a cluster. See [these docs](https://docs.victoriametrics.com/vmauth.html#load-balancing).
* FEATURE: vminsert: add `-disableRerouting` command-line flag for forcibly disabling re-routing. This should help resolve [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/791) and [this one](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1054).
* BUGFIX: reduce CPU usage by up to 2x when querying a database with a big number of active daily time series. The issue was introduced in `v1.59.0`.