diff --git a/app/vminsert/netstorage/netstorage.go b/app/vminsert/netstorage/netstorage.go
index f8db1caacb..5550c7e55c 100644
--- a/app/vminsert/netstorage/netstorage.go
+++ b/app/vminsert/netstorage/netstorage.go
@@ -23,10 +23,11 @@ import (
 )
 
 var (
-	disableRPCCompression = flag.Bool(`rpc.disableCompression`, false, "Disable compression of RPC traffic. This reduces CPU usage at the cost of higher network bandwidth usage")
+	disableRPCCompression = flag.Bool(`rpc.disableCompression`, false, "Whether to disable compression of RPC traffic. This reduces CPU usage at the cost of higher network bandwidth usage")
 	replicationFactor = flag.Int("replicationFactor", 1, "Replication factor for the ingested data, i.e. how many copies to make among distinct -storageNode instances. "+
 		"Note that vmselect must run with -dedup.minScrapeInterval=1ms for data de-duplication when replicationFactor is greater than 1. "+
 		"Higher values for -dedup.minScrapeInterval at vmselect is OK")
+	disableRerouting = flag.Bool(`disableRerouting`, false, "Whether to disable re-routing when some of vmstorage nodes accept incoming data at slower speed compared to other storage nodes. By default the re-routing is enabled. Disabled re-routing limits the ingestion rate by the slowest vmstorage node. On the other side, disabled re-routing minimizes the number of active time series in the cluster")
 )
 
 func (sn *storageNode) isBroken() bool {
@@ -49,16 +50,27 @@ func (sn *storageNode) push(buf []byte, rows int) error {
 	}
 	sn.rowsPushed.Add(rows)
 
+	sn.brLock.Lock()
+again:
+	select {
+	case <-storageNodesStopCh:
+		sn.brLock.Unlock()
+		return fmt.Errorf("cannot send %d rows because of graceful shutdown", rows)
+	default:
+	}
 	if sn.isBroken() {
-		// The vmstorage node is temporarily broken. Re-route buf to healthy vmstorage nodes.
-		if err := addToReroutedBufMayBlock(buf, rows); err != nil {
+		if len(storageNodes) == 1 {
+			// There are no other storage nodes to re-route to. So wait until the current node becomes healthy.
+			sn.brCond.Wait()
+			goto again
+		}
+		sn.brLock.Unlock()
+		// The vmstorage node is temporarily broken. Re-route buf to healthy vmstorage nodes even if *disableRerouting==true.
+		if err := rerouteRowsMayBlock(sn, buf, rows); err != nil {
 			return fmt.Errorf("%d rows dropped because the current vsmtorage is unavailable and %w", rows, err)
 		}
-		sn.rowsReroutedFromHere.Add(rows)
 		return nil
 	}
-
-	sn.brLock.Lock()
 	if len(sn.br.buf)+len(buf) <= maxBufSizePerStorageNode {
 		// Fast path: the buf contents fits sn.buf.
 		sn.br.buf = append(sn.br.buf, buf...)
@@ -66,15 +78,18 @@ func (sn *storageNode) push(buf []byte, rows int) error {
 		sn.brLock.Unlock()
 		return nil
 	}
+	if *disableRerouting || len(storageNodes) == 1 {
+		sn.brCond.Wait()
+		goto again
+	}
 	sn.brLock.Unlock()
 
-	// Slow path: the buf contents doesn't fit sn.buf.
+	// The buf contents doesn't fit sn.buf.
 	// This means that the current vmstorage is slow or will become broken soon.
-	// Re-route buf to healthy vmstorage nodes.
-	if err := addToReroutedBufMayBlock(buf, rows); err != nil {
+	// Spread buf among all the vmstorage nodes.
+	if err := rerouteRowsMayBlock(sn, buf, rows); err != nil {
 		return fmt.Errorf("%d rows dropped because the current vmstorage buf is full and %w", rows, err)
 	}
-	sn.rowsReroutedFromHere.Add(rows)
 	return nil
 }
 
@@ -118,6 +133,7 @@ func (sn *storageNode) run(stopCh <-chan struct{}, snIdx int) {
 		}
 		sn.brLock.Lock()
 		sn.br, br = br, sn.br
+		sn.brCond.Broadcast()
 		sn.brLock.Unlock()
 		currentTime := fasttime.UnixTimestamp()
 		if len(br.buf) < cap(br.buf)/4 && currentTime-brLastResetTime > 10 {
@@ -200,6 +216,7 @@ func (sn *storageNode) checkHealth() {
 	bc, err := sn.dial()
 	if err != nil {
 		atomic.StoreUint32(&sn.broken, 1)
+		sn.brCond.Broadcast()
 		if sn.lastDialErr == nil {
 			// Log the error only once.
 			sn.lastDialErr = err
@@ -211,6 +228,7 @@ func (sn *storageNode) checkHealth() {
 	sn.lastDialErr = nil
 	sn.bc = bc
 	atomic.StoreUint32(&sn.broken, 0)
+	sn.brCond.Broadcast()
 }
 
 func (sn *storageNode) sendBufRowsNonblocking(br *bufRows) bool {
@@ -240,6 +258,7 @@ func (sn *storageNode) sendBufRowsNonblocking(br *bufRows) bool {
 		}
 		sn.bc = nil
 		atomic.StoreUint32(&sn.broken, 1)
+		sn.brCond.Broadcast()
 		sn.connectionErrors.Inc()
 		return false
 	}
@@ -310,53 +329,11 @@ func (sn *storageNode) dial() (*handshake.BufferedConn, error) {
 	return bc, nil
 }
 
-func rerouteWorker(stopCh <-chan struct{}) {
-	ticker := time.NewTicker(200 * time.Millisecond)
-	defer ticker.Stop()
-	var br bufRows
-	brLastResetTime := fasttime.UnixTimestamp()
-	var waitCh <-chan struct{}
-	mustStop := false
-	for !mustStop {
-		reroutedBRLock.Lock()
-		bufLen := len(reroutedBR.buf)
-		reroutedBRLock.Unlock()
-		waitCh = nil
-		if bufLen > 0 {
-			// Do not sleep if reroutedBR contains data to process.
-			waitCh = closedCh
-		}
-		select {
-		case <-stopCh:
-			mustStop = true
-			// Make sure reroutedBR is re-routed last time before returning
-			// in order to reroute the remaining data to healthy vmstorage nodes.
-		case <-ticker.C:
-		case <-waitCh:
-		}
-		reroutedBRLock.Lock()
-		reroutedBR, br = br, reroutedBR
-		reroutedBRLock.Unlock()
-		reroutedBRCond.Broadcast()
-		currentTime := fasttime.UnixTimestamp()
-		if len(br.buf) < cap(br.buf)/4 && currentTime-brLastResetTime > 10 {
-			// Free up capacity space occupied by br.buf in order to reduce memory usage after spikes.
-			br.buf = append(br.buf[:0:0], br.buf...)
-			brLastResetTime = currentTime
-		}
-		if len(br.buf) == 0 {
-			// Nothing to re-route.
-			continue
-		}
-		spreadReroutedBufToStorageNodesBlocking(stopCh, &br)
-		br.reset()
-	}
-	// Notify all the blocked addToReroutedBufMayBlock callers, so they may finish the work.
-	reroutedBRCond.Broadcast()
-}
-
 // storageNode is a client sending data to vmstorage node.
 type storageNode struct {
+	// The last time for the re-routing.
+	lastRerouteTime uint64
+
 	// broken is set to non-zero if the given vmstorage node is temporarily unhealthy.
 	// In this case the data is re-routed to the remaining healthy vmstorage nodes.
 	broken uint32
@@ -364,6 +341,9 @@ type storageNode struct {
 	// brLock protects br.
 	brLock sync.Mutex
 
+	// brCond is used for waiting for free space in br.
+	brCond *sync.Cond
+
 	// Buffer with data that needs to be written to the storage node.
 	// It must be accessed under brLock.
 	br bufRows
@@ -407,15 +387,9 @@ type storageNode struct {
 // storageNodes contains a list of vmstorage node clients.
 var storageNodes []*storageNode
 
-var (
-	storageNodesWG sync.WaitGroup
-	rerouteWorkerWG sync.WaitGroup
-)
+var storageNodesWG sync.WaitGroup
 
-var (
-	storageNodesStopCh = make(chan struct{})
-	rerouteWorkerStopCh = make(chan struct{})
-)
+var storageNodesStopCh = make(chan struct{})
 
 // InitStorageNodes initializes vmstorage nodes' connections to the given addrs.
 func InitStorageNodes(addrs []string) {
@@ -439,6 +413,7 @@ func InitStorageNodes(addrs []string) {
 			rowsReroutedFromHere: metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_rerouted_from_here_total{name="vminsert", addr=%q}`, addr)),
 			rowsReroutedToHere: metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_rerouted_to_here_total{name="vminsert", addr=%q}`, addr)),
 		}
+		sn.brCond = sync.NewCond(&sn.brLock)
 		_ = metrics.NewGauge(fmt.Sprintf(`vm_rpc_rows_pending{name="vminsert", addr=%q}`, addr), func() float64 {
 			sn.brLock.Lock()
 			n := sn.br.rows
@@ -464,13 +439,6 @@ func InitStorageNodes(addrs []string) {
 	if maxBufSizePerStorageNode > consts.MaxInsertPacketSize {
 		maxBufSizePerStorageNode = consts.MaxInsertPacketSize
 	}
-	reroutedBufMaxSize = memory.Allowed() / 16
-	if reroutedBufMaxSize < maxBufSizePerStorageNode {
-		reroutedBufMaxSize = maxBufSizePerStorageNode
-	}
-	if reroutedBufMaxSize > maxBufSizePerStorageNode*len(storageNodes) {
-		reroutedBufMaxSize = maxBufSizePerStorageNode * len(storageNodes)
-	}
 
 	for idx, sn := range storageNodes {
 		storageNodesWG.Add(1)
@@ -479,208 +447,117 @@ func InitStorageNodes(addrs []string) {
 			storageNodesWG.Done()
 		}(sn, idx)
 	}
-
-	rerouteWorkerWG.Add(1)
-	go func() {
-		rerouteWorker(rerouteWorkerStopCh)
-		rerouteWorkerWG.Done()
-	}()
 }
 
 // Stop gracefully stops netstorage.
 func Stop() {
-	close(rerouteWorkerStopCh)
-	rerouteWorkerWG.Wait()
-
 	close(storageNodesStopCh)
+	for _, sn := range storageNodes {
+		sn.brCond.Broadcast()
+	}
 	storageNodesWG.Wait()
 }
 
-// addToReroutedBufMayBlock adds buf to reroutedBR.
+// rerouteRowsMayBlock re-routes rows from buf among healthy storage nodes excluding the snExclude.
 //
-// It waits until the reroutedBR has enough space for buf or if Stop is called.
+// It waits until healthy storage nodes have enough space for the re-routed rows.
 // This guarantees backpressure if the ingestion rate exceeds vmstorage nodes'
 // ingestion rate capacity.
 //
-// It returns non-nil error only in the following cases:
-//
-// - if all the storage nodes are unhealthy.
-// - if Stop is called.
-func addToReroutedBufMayBlock(buf []byte, rows int) error {
-	if len(buf) > reroutedBufMaxSize {
-		logger.Panicf("BUG: len(buf)=%d cannot exceed reroutedBufMaxSize=%d", len(buf), reroutedBufMaxSize)
+// It returns non-nil error only if Stop is called.
+func rerouteRowsMayBlock(snExclude *storageNode, buf []byte, rows int) error {
+	if len(storageNodes) < 2 {
+		logger.Panicf("BUG: re-routing can work only if at least 2 storage nodes are configured; got %d nodes", len(storageNodes))
 	}
-
-	reroutedBRLock.Lock()
-	defer reroutedBRLock.Unlock()
-
-	for len(reroutedBR.buf)+len(buf) > reroutedBufMaxSize {
-		if getHealthyStorageNodesCount() == 0 {
-			rowsLostTotal.Add(rows)
-			return fmt.Errorf("all the vmstorage nodes are unavailable and reroutedBR has no enough space for storing %d bytes; "+
-				"only %d free bytes left out of %d bytes in reroutedBR",
-				len(buf), reroutedBufMaxSize-len(reroutedBR.buf), reroutedBufMaxSize)
-		}
-		select {
-		case <-rerouteWorkerStopCh:
-			rowsLostTotal.Add(rows)
-			return fmt.Errorf("rerouteWorker cannot send the data since it is stopped")
-		default:
-		}
-
-		// The reroutedBR.buf has no enough space for len(buf). Wait while the reroutedBR.buf is sent by rerouteWorker.
-		reroutedBufWaits.Inc()
-		reroutedBRCond.Wait()
-	}
-	reroutedBR.buf = append(reroutedBR.buf, buf...)
-	reroutedBR.rows += rows
-	reroutesTotal.Inc()
-	return nil
-}
-
-func getHealthyStorageNodesCount() int {
-	n := 0
-	for _, sn := range storageNodes {
-		if !sn.isBroken() {
-			n++
-		}
-	}
-	return n
-}
-
-func getHealthyStorageNodes() []*storageNode {
-	sns := make([]*storageNode, 0, len(storageNodes)-1)
-	for _, sn := range storageNodes {
-		if !sn.isBroken() {
-			sns = append(sns, sn)
-		}
-	}
-	return sns
-}
-
-func getHealthyStorageNodesBlocking(stopCh <-chan struct{}) []*storageNode {
-	// Wait for at least a single healthy storage node.
-	for {
-		// Wake up goroutines blocked at addToReroutedBufMayBlock, so they can return error to the caller if reroutedBR is full.
-		// This fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/896
-		reroutedBRCond.Broadcast()
-
-		sns := getHealthyStorageNodes()
-		if len(sns) > 0 {
-			return sns
-		}
-		// There is no healthy storage nodes.
-		// Wait for a while until such nodes appear.
-		t := timerpool.Get(time.Second)
-		select {
-		case <-stopCh:
-			timerpool.Put(t)
-			return nil
-		case <-t.C:
-			timerpool.Put(t)
-		}
-	}
-}
-
-func spreadReroutedBufToStorageNodesBlocking(stopCh <-chan struct{}, br *bufRows) {
 	var mr storage.MetricRow
-	rowsProcessed := 0
-	defer func() {
-		reroutedRowsProcessed.Add(rowsProcessed)
-	}()
-
-	sns := getHealthyStorageNodesBlocking(stopCh)
-	if len(sns) == 0 {
-		// stopCh is notified to stop.
-		return
+	sns := getStorageNodesForRerouting(snExclude)
+	if len(sns) == 1 {
+		// Fast path: only a single storage node is available for re-routing.
+		sn := sns[0]
+		if sn.sendBufMayBlock(buf) {
+			if sn != snExclude {
+				snExclude.rowsReroutedFromHere.Add(rows)
+				sn.rowsReroutedToHere.Add(rows)
+			}
+			return nil
+		}
 	}
-	src := br.buf
+	reroutesTotal.Inc()
+	atomic.StoreUint64(&snExclude.lastRerouteTime, fasttime.UnixTimestamp())
+	src := buf
 	for len(src) > 0 {
 		tail, err := mr.UnmarshalX(src)
 		if err != nil {
-			logger.Panicf("BUG: cannot unmarshal MetricRow from reroutedBR.buf: %s", err)
+			logger.Panicf("BUG: cannot unmarshal MetricRow: %s", err)
 		}
 		rowBuf := src[:len(src)-len(tail)]
 		src = tail
-		rowsProcessed++
-		var h uint64
-		if len(storageNodes) > 1 {
-			// Do not use jump.Hash(h, int32(len(sns))) here,
-			// since this leads to uneven distribution of rerouted rows among sns -
-			// they all go to the original or to the next sn.
-			h = xxhash.Sum64(mr.MetricNameRaw)
-		}
+		reroutedRowsProcessed.Inc()
+		h := xxhash.Sum64(mr.MetricNameRaw)
 		mr.ResetX()
 		for {
 			idx := h % uint64(len(sns))
 			sn := sns[idx]
-			if sn.sendReroutedRow(rowBuf) {
+			if sn.sendBufMayBlock(rowBuf) {
 				// The row has been successfully re-routed to sn.
+				if sn != snExclude {
+					snExclude.rowsReroutedFromHere.Inc()
+					sn.rowsReroutedToHere.Inc()
+				}
 				break
 			}
-			// The row cannot be re-routed to sn. Wait for a while and try again.
-			// Do not re-route the row to the remaining storage nodes,
-			// since this may result in increased resource usage (CPU, memory, disk IO) on these nodes,
-			// because they'll have to accept and register new time series (this is resource-intensive operation).
-			//
-			// Do not skip rowBuf in the hope it may be sent later, since this wastes CPU time for no reason.
-			rerouteErrors.Inc()
-			t := timerpool.Get(200 * time.Millisecond)
 			select {
-			case <-stopCh:
-				// stopCh is notified to stop.
-				timerpool.Put(t)
-				return
-			case <-t.C:
-				timerpool.Put(t)
-			}
-			// Obtain fresh list of healthy storage nodes after the delay, since it may be already updated.
-			sns = getHealthyStorageNodesBlocking(stopCh)
-			if len(sns) == 0 {
-				// stopCh is notified to stop.
-				return
+			case <-storageNodesStopCh:
+				return fmt.Errorf("cannot re-route %d rows because of graceful shutdown", rows)
+			default:
 			}
+			// Refresh the list of healthy storage nodes, since sn became broken.
+			sns = getStorageNodesForRerouting(snExclude)
 		}
 	}
+	return nil
 }
 
-func (sn *storageNode) sendReroutedRow(buf []byte) bool {
-	sn.brLock.Lock()
-	ok := len(sn.br.buf)+len(buf) <= maxBufSizePerStorageNode
-	if ok {
-		sn.br.buf = append(sn.br.buf, buf...)
-		sn.br.rows++
-		sn.rowsReroutedToHere.Inc()
+func getStorageNodesForRerouting(snExclude *storageNode) []*storageNode {
+	sns := make([]*storageNode, 0, len(storageNodes))
+	currentTime := fasttime.UnixTimestamp()
+	for _, sn := range storageNodes {
+		if sn != snExclude && !sn.isBroken() && currentTime > atomic.LoadUint64(&sn.lastRerouteTime)+5 {
+			sns = append(sns, sn)
+		}
 	}
+	if len(sns) == 0 {
+		// There are no suitable storage nodes for re-routing. Fall back to snExclude.
+		sns = append(sns, snExclude)
+	}
+	return sns
+}
+
+func (sn *storageNode) sendBufMayBlock(buf []byte) bool {
+	sn.brLock.Lock()
+	for len(sn.br.buf)+len(buf) > maxBufSizePerStorageNode {
+		select {
+		case <-storageNodesStopCh:
+			sn.brLock.Unlock()
+			return false
+		default:
+		}
+		if sn.isBroken() {
+			sn.brLock.Unlock()
+			return false
+		}
+		sn.brCond.Wait()
+	}
+	sn.br.buf = append(sn.br.buf, buf...)
+	sn.br.rows++
 	sn.brLock.Unlock()
-	return ok
+	return true
 }
 
 var (
 	maxBufSizePerStorageNode int
 
-	reroutedBR bufRows
-	reroutedBRLock sync.Mutex
-	reroutedBRCond = sync.NewCond(&reroutedBRLock)
-	reroutedBufMaxSize int
-
-	reroutedRowsProcessed = metrics.NewCounter(`vm_rpc_rerouted_rows_processed_total{name="vminsert"}`)
-	reroutedBufWaits = metrics.NewCounter(`vm_rpc_rerouted_buf_waits_total{name="vminsert"}`)
-	reroutesTotal = metrics.NewCounter(`vm_rpc_reroutes_total{name="vminsert"}`)
-	_ = metrics.NewGauge(`vm_rpc_rerouted_rows_pending{name="vminsert"}`, func() float64 {
-		reroutedBRLock.Lock()
-		n := reroutedBR.rows
-		reroutedBRLock.Unlock()
-		return float64(n)
-	})
-	_ = metrics.NewGauge(`vm_rpc_rerouted_buf_pending_bytes{name="vminsert"}`, func() float64 {
-		reroutedBRLock.Lock()
-		n := len(reroutedBR.buf)
-		reroutedBRLock.Unlock()
-		return float64(n)
-	})
-
-	rerouteErrors = metrics.NewCounter(`vm_rpc_reroute_errors_total{name="vminsert"}`)
-	rowsLostTotal = metrics.NewCounter(`vm_rpc_rows_lost_total{name="vminsert"}`)
+	reroutedRowsProcessed = metrics.NewCounter(`vm_rpc_rerouted_rows_processed_total{name="vminsert"}`)
+	reroutesTotal = metrics.NewCounter(`vm_rpc_reroutes_total{name="vminsert"}`)
 
 	rowsIncompletelyReplicatedTotal = metrics.NewCounter(`vm_rpc_rows_incompletely_replicated_total{name="vminsert"}`)
 )
diff --git a/dashboards/victoriametrics.json b/dashboards/victoriametrics.json
index 54227b18c4..f8ccd2a773 100644
--- a/dashboards/victoriametrics.json
+++ b/dashboards/victoriametrics.json
@@ -1481,13 +1481,6 @@
           "intervalFactor": 1,
           "legendFormat": "Handshake",
           "refId": "E"
-        },
-        {
-          "expr": "sum(rate(vm_rpc_reroute_errors_total{job=~\"$job\",instance=~\"$instance.*\"}[5m]))",
-          "format": "time_series",
-          "intervalFactor": 1,
-          "legendFormat": "Reroute",
-          "refId": "C"
         }
       ],
       "thresholds": [],
@@ -3252,13 +3245,6 @@
           "intervalFactor": 1,
           "legendFormat": "Handshake",
           "refId": "E"
-        },
-        {
-          "expr": "sum(rate(vm_rpc_reroute_errors_total{job=~\"$job\",instance=~\"$instance\"}[5m]))",
-          "format": "time_series",
-          "intervalFactor": 1,
-          "legendFormat": "Reroute",
-          "refId": "C"
         }
       ],
       "thresholds": [],
@@ -3610,102 +3596,6 @@
         "alignLevel": null
       }
     },
-    {
-      "aliasColors": {},
-      "bars": false,
-      "dashLength": 10,
-      "dashes": false,
-      "datasource": "$ds",
-      "description": "The number of rows that were lost because of unhealthy vmstorage nodes.",
-      "fieldConfig": {
-        "defaults": {
-          "links": []
-        },
-        "overrides": []
-      },
-      "fill": 1,
-      "fillGradient": 0,
-      "gridPos": {
-        "h": 8,
-        "w": 12,
-        "x": 12,
-        "y": 62
-      },
-      "hiddenSeries": false,
-      "id": 84,
-      "legend": {
-        "alignAsTable": true,
-        "avg": true,
-        "current": true,
-        "max": false,
-        "min": false,
-        "show": false,
-        "total": false,
-        "values": true
-      },
-      "lines": true,
-      "linewidth": 1,
-      "nullPointMode": "null",
-      "options": {
-        "alertThreshold": true
-      },
-      "percentage": false,
-      "pluginVersion": "7.5.1",
-      "pointradius": 2,
-      "points": false,
-      "renderer": "flot",
-      "seriesOverrides": [],
-      "spaceLength": 10,
-      "stack": false,
-      "steppedLine": false,
-      "targets": [
-        {
-          "expr": "sum(rate(vm_rpc_rows_lost_total{job=~\"$job\", instance=~\"$instance\"}[5m]))",
-          "legendFormat": "rows",
-          "refId": "A"
-        }
-      ],
-      "thresholds": [],
-      "timeFrom": null,
-      "timeRegions": [],
-      "timeShift": null,
-      "title": "Rows lost ($instance)",
-      "tooltip": {
-        "shared": true,
-        "sort": 0,
-        "value_type": "individual"
-      },
-      "type": "graph",
-      "xaxis": {
-        "buckets": null,
-        "mode": "time",
-        "name": null,
-        "show": true,
-        "values": []
-      },
-      "yaxes": [
-        {
-          "format": "short",
-          "label": null,
-          "logBase": 1,
-          "max": null,
-          "min": "0",
-          "show": true
-        },
-        {
-          "format": "bytes",
-          "label": null,
-          "logBase": 1,
-          "max": null,
-          "min": null,
-          "show": true
-        }
-      ],
-      "yaxis": {
-        "align": false,
-        "alignLevel": null
-      }
-    },
     {
       "aliasColors": {},
       "bars": false,
@@ -6383,4 +6273,4 @@
   "title": "VictoriaMetrics - cluster",
   "uid": "oS7Bi_0Wz",
   "version": 1
-}
\ No newline at end of file
+}
diff --git a/deployment/docker/alerts.yml b/deployment/docker/alerts.yml
index c537bf8e28..dccbe8ec81 100644
--- a/deployment/docker/alerts.yml
+++ b/deployment/docker/alerts.yml
@@ -89,8 +89,6 @@ groups:
             sum(increase(vm_rpc_dial_errors_total[5m])) by(job, instance)
             +
             sum(increase(vm_rpc_handshake_errors_total[5m])) by(job, instance)
-            +
-            sum(increase(vm_rpc_reroute_errors_total[5m])) by(job, instance)
           ) > 0
         for: 15m
         labels:
@@ -208,17 +206,6 @@ groups:
           This prevents from ingesting metrics with too many labels. Please verify that `-maxLabelsPerTimeseries`
           is configured correctly or that clients which send these metrics aren't misbehaving."
 
-      - alert: VminsertIsDroppingRows
-        expr: sum(rate(vm_rpc_rows_lost_total[5m])) by(instance) > 0
-        for: 15m
-        labels:
-          severity: warning
-        annotations:
-          dashboard: "http://localhost:3000/d/oS7Bi_0Wz?viewPanel=84&var-instance={{ $labels.instance }}"
-          summary: "VMinsert \"{{ $labels.job }}\" on instance {{ $labels.instance }} drops rows due to RPC errors."
-          description: "VMinsert starts to drop rows if there are no healthy VMstorage nodes where it can route
-            insert requests to. Check the health state of VMstorage nodes and RPC metrics."
-
   # Alerts group for vmagent assumes that Grafana dashboard
   # https://grafana.com/grafana/dashboards/12683 is installed.
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 39f144b435..07694d6069 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -9,6 +9,7 @@ sort: 15
 * FEATURE: vmalert: add a command-line flag `-rule.configCheckInterval` for automatic re-reading of `-rule` files without the need to send SIGHUP signal. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/512).
 * FEATURE: vmagent: respect the `sample_limit` and `-promscrape.maxScrapeSize` values when scraping targets in [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode). See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1331).
 * FEATURE: vmauth: add ability to specify mutliple `url_prefix` entries for balancing the load among multiple `vmselect` and/or `vminsert` nodes in a cluster. See [these docs](https://docs.victoriametrics.com/vmauth.html#load-balancing).
+* FEATURE: vminsert: add `-disableRerouting` command-line flag for forcibly disabling the rerouting. This should help resolving [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/791) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1054) issues.
 * BUGFIX: reduce CPU usage by up to 2x during querying a database with big number of active daily time series. The issue has been introduced in `v1.59.0`.