mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
vmcluster: re-routing enhancement (#5293)
* app/vmstorage: close vminsert connections gradually before stopping storage

  Implements graceful shutdown approach suggested here - https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4922#issuecomment-1768146878

  Test results for this can be found here - https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4922#issuecomment-1790640274

  Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* app/vmstorage: update graceful shutdown logic

  - close connections from vminsert in deterministic order
  - update flag description
  - lower default timeout to 25 seconds. The 25 seconds value was chosen because the lowest default value used in default configuration deployments is 30s (default value in Kubernetes and ansible-playbooks).

  Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* docs/cluster: add information about re-routing enhancement during restart

  Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* docs/changelog: add entry for new command-line flag

  Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* {app/vmstorage,lib/ingestserver}: address review feedback

  Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* docs/cluster: add note to update workload scheduler timeout

  Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* wip

---------

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
parent
c1f651a9f9
commit
f7834767c1
10 changed files with 132 additions and 22 deletions
26
README.md
|
@ -312,6 +312,10 @@ See more details about cardinality limiter in [these docs](https://docs.victoria
|
|||
|
||||
## Troubleshooting

- If your VictoriaMetrics cluster experiences data ingestion delays during
  [rolling restarts and configuration updates](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#updating--reconfiguring-cluster-nodes),
  then see [these docs](#improving-re-routing-performance-during-restart).

[Troubleshooting docs for single-node VictoriaMetrics](https://docs.victoriametrics.com/Troubleshooting.html) apply to VictoriaMetrics cluster as well.

## Readonly mode
|
||||
|
@ -459,8 +463,18 @@ This strategy allows upgrading the cluster without downtime if the following con
|
|||
- The updated config / upgraded binary is compatible with the remaining components in the cluster.
|
||||
See the [CHANGELOG](https://docs.victoriametrics.com/CHANGELOG.html) for compatibility notes between different releases.
|
||||
|
||||
If at least a single condition isn't met, then the rolling restart may result in cluster unavailability
|
||||
during the config update / version upgrade. In this case the following strategy is recommended.
|
||||
If at least a single condition isn't met, then the rolling restart may result in cluster unavailability
|
||||
during the config update / version upgrade. In this case the following strategy is recommended.
|
||||
|
||||
#### Improving re-routing performance during restart

`vmstorage` nodes may experience increased CPU, RAM and disk IO usage during
[rolling restarts](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#no-downtime-strategy),
since they need to process higher load when some `vmstorage` nodes are temporarily unavailable in the cluster.
It is possible to reduce these resource usage spikes by running more `vminsert` nodes and by passing bigger values
to the `-storage.vminsertConnsShutdownDuration` command-line flag at `vmstorage` nodes.
Make sure that `-storage.vminsertConnsShutdownDuration` is smaller than the graceful shutdown timeout configured in the system which manages `vmstorage`
(e.g. Docker, Kubernetes, systemd, etc.). Otherwise the system may kill the `vmstorage` node before it finishes gradually closing `vminsert` connections.

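The timeout relationship described in this section can be sketched in Go. This is only an illustration: `checkShutdownBudget` and the 2s safety margin are assumptions made for the example and are not part of VictoriaMetrics, while the 25s and 30s values come from the commit message.

```go
package main

import (
	"fmt"
	"time"
)

// checkShutdownBudget is a hypothetical helper (not part of VictoriaMetrics).
// It returns an error when the gradual-close window plus a safety margin
// does not fit into the grace period granted by the process manager
// (Docker, Kubernetes, systemd, etc.).
func checkShutdownBudget(connsShutdown, gracePeriod, margin time.Duration) error {
	if connsShutdown+margin > gracePeriod {
		return fmt.Errorf("shutdown duration %s plus margin %s exceeds grace period %s",
			connsShutdown, margin, gracePeriod)
	}
	return nil
}

func main() {
	// The 25s default against a 30s grace period, with an assumed 2s margin
	// for the remaining shutdown work.
	fmt.Println(checkShutdownBudget(25*time.Second, 30*time.Second, 2*time.Second))

	// A bigger 60s window does not fit unless the grace period is raised too.
	fmt.Println(checkShutdownBudget(60*time.Second, 30*time.Second, 2*time.Second))
}
```

A check of this kind could run in deployment tooling before a rollout, so that a bigger flag value is always accompanied by a bigger grace period.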
### Minimum downtime strategy
|
||||
|
||||
|
@ -855,6 +869,8 @@ Below is the output for `/path/to/vminsert -help`:
|
|||
Whether to skip verification of TLS certificates provided by -storageNode nodes if -cluster.tls flag is set. Note that disabled TLS certificate verification breaks security. This flag is available only in VictoriaMetrics enterprise. See https://docs.victoriametrics.com/enterprise.html
|
||||
-cluster.tlsKeyFile string
|
||||
Path to client-side TLS key file to use when connecting to -storageNode if -cluster.tls flag is set. See https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#mtls-protection . This flag is available only in VictoriaMetrics enterprise. See https://docs.victoriametrics.com/enterprise.html
|
||||
-clusternative.vminsertConnsShutdownDuration duration
|
||||
The time needed for gradual closing of upstream vminsert connections during graceful shutdown. Bigger duration reduces spikes in CPU, RAM and disk IO load on the remaining lower-level clusters during rolling restart. Smaller duration reduces the time needed to close all the upstream vminsert connections, thus reducing the time for graceful shutdown. See https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#improving-re-routing-performance-during-restart (default 25s)
|
||||
-clusternativeListenAddr string
|
||||
TCP address to listen for data from other vminsert nodes in multi-level cluster setup. See https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multi-level-cluster-setup . Usually :8400 should be set to match default vmstorage port for vminsert. Disabled work if empty
|
||||
-csvTrimTimestamp duration
|
||||
|
@ -963,6 +979,8 @@ Below is the output for `/path/to/vminsert -help`:
|
|||
Allows renaming fields in JSON formatted logs. Example: "ts:timestamp,msg:message" renames "ts" to "timestamp" and "msg" to "message". Supported fields: ts, level, caller, msg
|
||||
-loggerLevel string
|
||||
Minimum level of errors to log. Possible values: INFO, WARN, ERROR, FATAL, PANIC (default "INFO")
|
||||
-loggerMaxArgLen int
|
||||
The maximum length of a single logged argument. Longer arguments are replaced with 'arg_start..arg_end', where 'arg_start' and 'arg_end' is prefix and suffix of the arg with the length not exceeding -loggerMaxArgLen / 2 (default 500)
|
||||
-loggerOutput string
|
||||
Output for the logs. Supported values: stderr, stdout (default "stderr")
|
||||
-loggerTimezone string
|
||||
|
@ -1421,6 +1439,8 @@ Below is the output for `/path/to/vmstorage -help`:
|
|||
Allows renaming fields in JSON formatted logs. Example: "ts:timestamp,msg:message" renames "ts" to "timestamp" and "msg" to "message". Supported fields: ts, level, caller, msg
|
||||
-loggerLevel string
|
||||
Minimum level of errors to log. Possible values: INFO, WARN, ERROR, FATAL, PANIC (default "INFO")
|
||||
-loggerMaxArgLen int
|
||||
The maximum length of a single logged argument. Longer arguments are replaced with 'arg_start..arg_end', where 'arg_start' and 'arg_end' is prefix and suffix of the arg with the length not exceeding -loggerMaxArgLen / 2 (default 500)
|
||||
-loggerOutput string
|
||||
Output for the logs. Supported values: stderr, stdout (default "stderr")
|
||||
-loggerTimezone string
|
||||
|
@ -1500,6 +1520,8 @@ Below is the output for `/path/to/vmstorage -help`:
|
|||
-storage.minFreeDiskSpaceBytes size
|
||||
The minimum free disk space at -storageDataPath after which the storage stops accepting new data
|
||||
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 10000000)
|
||||
-storage.vminsertConnsShutdownDuration duration
|
||||
The time needed for gradual closing of vminsert connections during graceful shutdown. Bigger duration reduces spikes in CPU, RAM and disk IO load on the remaining vmstorage nodes during rolling restart. Smaller duration reduces the time needed to close all the vminsert connections, thus reducing the time for graceful shutdown. See https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#improving-re-routing-performance-during-restart (default 25s)
|
||||
-storageDataPath string
|
||||
Path to storage data (default "vmstorage-data")
|
||||
-tls
|
||||
|
|
|
@ -7,6 +7,9 @@ import (
|
|||
"net"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
|
||||
|
@ -15,10 +18,16 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/clusternative/stream"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
var precisionBits = flag.Int("precisionBits", 64, "The number of precision bits to store per each value. Lower precision bits improves data compression at the cost of precision loss")
|
||||
var (
|
||||
precisionBits = flag.Int("precisionBits", 64, "The number of precision bits to store per each value. Lower precision bits improves data compression "+
|
||||
"at the cost of precision loss")
|
||||
vminsertConnsShutdownDuration = flag.Duration("storage.vminsertConnsShutdownDuration", 25*time.Second, "The time needed for gradual closing of vminsert connections during "+
|
||||
"graceful shutdown. Bigger duration reduces spikes in CPU, RAM and disk IO load on the remaining vmstorage nodes during rolling restart. "+
|
||||
"Smaller duration reduces the time needed to close all the vminsert connections, thus reducing the time for graceful shutdown. "+
|
||||
"See https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#improving-re-routing-performance-during-restart")
|
||||
)
|
||||
|
||||
// VMInsertServer processes connections from vminsert.
|
||||
type VMInsertServer struct {
|
||||
|
@ -52,7 +61,7 @@ func NewVMInsertServer(addr string, storage *storage.Storage) (*VMInsertServer,
|
|||
storage: storage,
|
||||
ln: ln,
|
||||
}
|
||||
s.connsMap.Init()
|
||||
s.connsMap.Init("vminsert")
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
s.run()
|
||||
|
@ -145,7 +154,7 @@ func (s *VMInsertServer) MustStop() {
|
|||
|
||||
// Close existing connections from vminsert, so the goroutines
|
||||
// processing these connections are finished.
|
||||
s.connsMap.CloseAll()
|
||||
s.connsMap.CloseAll(*vminsertConnsShutdownDuration)
|
||||
|
||||
// Wait until all the goroutines processing vminsert conns are finished.
|
||||
s.wg.Wait()
|
||||
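The flag added in this file is a regular `time.Duration` flag from Go's standard `flag` package. The sketch below shows how such a flag parses values like `40s`. It assumes a private `flag.FlagSet` so that it can run in isolation; `parseShutdownDuration` is a hypothetical helper, and the real code registers the flag on the global flag set instead.

```go
package main

import (
	"flag"
	"fmt"
	"time"
)

// parseShutdownDuration is a hypothetical helper for illustration only.
// It registers a duration flag with the same name and default as in the diff
// on a private FlagSet and returns the parsed value.
func parseShutdownDuration(args []string) (time.Duration, error) {
	fs := flag.NewFlagSet("vmstorage-sketch", flag.ContinueOnError)
	d := fs.Duration("storage.vminsertConnsShutdownDuration", 25*time.Second,
		"The time needed for gradual closing of vminsert connections during graceful shutdown")
	if err := fs.Parse(args); err != nil {
		return 0, err
	}
	return *d, nil
}

func main() {
	// No arguments: the 25s default applies.
	d, err := parseShutdownDuration(nil)
	fmt.Println(d, err)

	// Explicit value in Go duration syntax.
	d, err = parseShutdownDuration([]string{"-storage.vminsertConnsShutdownDuration=40s"})
	fmt.Println(d, err)
}
```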
|
|
|
@ -81,6 +81,7 @@ The sandbox cluster installation is running under the constant load generated by
|
|||
* FEATURE: all: track requests with wrong auth key and wrong basic auth at `vm_http_request_errors_total` [metric](https://docs.victoriametrics.com/#monitoring) with `reason="wrong_auth_key"` and `reason="wrong_basic_auth"`. See [this issue](https://github.com/victoriaMetrics/victoriaMetrics/issues/4590). Thanks to @venkatbvc for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5166).
|
||||
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): add ability to drop the specified number of `/`-delimited prefix parts from the request path before proxying the request to the matching backend. See [these docs](https://docs.victoriametrics.com/vmauth.html#dropping-request-path-prefix).
|
||||
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): add ability to skip TLS verification and to specify TLS Root CA when connecting to backends. See [these docs](https://docs.victoriametrics.com/vmauth.html#backend-tls-setup) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5240).
|
||||
* FEATURE: `vmstorage`: gradually close `vminsert` connections during 25 seconds at [graceful shutdown](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#updating--reconfiguring-cluster-nodes). This should reduce data ingestion slowdown during rolling restarts. The duration for gradual closing of `vminsert` connections can be configured via `-storage.vminsertConnsShutdownDuration` command-line flag. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4922) and [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#improving-re-routing-performance-during-restart) for details.
|
||||
* FEATURE: `vmstorage`: add `-blockcache.missesBeforeCaching` command-line flag, which can be used for fine-tuning RAM usage for `indexdb/dataBlocks` cache when queries touching big number of time series are executed.
|
||||
* FEATURE: add `-loggerMaxArgLen` command-line flag for fine-tuning the maximum lengths of logged args.
|
||||
|
||||
|
|
|
@ -323,6 +323,10 @@ See more details about cardinality limiter in [these docs](https://docs.victoria
|
|||
|
||||
## Troubleshooting

- If your VictoriaMetrics cluster experiences data ingestion delays during
  [rolling restarts and configuration updates](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#updating--reconfiguring-cluster-nodes),
  then see [these docs](#improving-re-routing-performance-during-restart).

[Troubleshooting docs for single-node VictoriaMetrics](https://docs.victoriametrics.com/Troubleshooting.html) apply to VictoriaMetrics cluster as well.

## Readonly mode
|
||||
|
@ -470,8 +474,18 @@ This strategy allows upgrading the cluster without downtime if the following con
|
|||
- The updated config / upgraded binary is compatible with the remaining components in the cluster.
|
||||
See the [CHANGELOG](https://docs.victoriametrics.com/CHANGELOG.html) for compatibility notes between different releases.
|
||||
|
||||
If at least a single condition isn't met, then the rolling restart may result in cluster unavailability
|
||||
during the config update / version upgrade. In this case the following strategy is recommended.
|
||||
If at least a single condition isn't met, then the rolling restart may result in cluster unavailability
|
||||
during the config update / version upgrade. In this case the following strategy is recommended.
|
||||
|
||||
#### Improving re-routing performance during restart

`vmstorage` nodes may experience increased CPU, RAM and disk IO usage during
[rolling restarts](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#no-downtime-strategy),
since they need to process higher load when some `vmstorage` nodes are temporarily unavailable in the cluster.
It is possible to reduce these resource usage spikes by running more `vminsert` nodes and by passing bigger values
to the `-storage.vminsertConnsShutdownDuration` command-line flag at `vmstorage` nodes.
Make sure that `-storage.vminsertConnsShutdownDuration` is smaller than the graceful shutdown timeout configured in the system which manages `vmstorage`
(e.g. Docker, Kubernetes, systemd, etc.). Otherwise the system may kill the `vmstorage` node before it finishes gradually closing `vminsert` connections.

### Minimum downtime strategy
|
||||
|
||||
|
@ -866,6 +880,8 @@ Below is the output for `/path/to/vminsert -help`:
|
|||
Whether to skip verification of TLS certificates provided by -storageNode nodes if -cluster.tls flag is set. Note that disabled TLS certificate verification breaks security. This flag is available only in VictoriaMetrics enterprise. See https://docs.victoriametrics.com/enterprise.html
|
||||
-cluster.tlsKeyFile string
|
||||
Path to client-side TLS key file to use when connecting to -storageNode if -cluster.tls flag is set. See https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#mtls-protection . This flag is available only in VictoriaMetrics enterprise. See https://docs.victoriametrics.com/enterprise.html
|
||||
-clusternative.vminsertConnsShutdownDuration duration
|
||||
The time needed for gradual closing of upstream vminsert connections during graceful shutdown. Bigger duration reduces spikes in CPU, RAM and disk IO load on the remaining lower-level clusters during rolling restart. Smaller duration reduces the time needed to close all the upstream vminsert connections, thus reducing the time for graceful shutdown. See https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#improving-re-routing-performance-during-restart (default 25s)
|
||||
-clusternativeListenAddr string
|
||||
TCP address to listen for data from other vminsert nodes in multi-level cluster setup. See https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multi-level-cluster-setup . Usually :8400 should be set to match default vmstorage port for vminsert. Disabled work if empty
|
||||
-csvTrimTimestamp duration
|
||||
|
@ -974,6 +990,8 @@ Below is the output for `/path/to/vminsert -help`:
|
|||
Allows renaming fields in JSON formatted logs. Example: "ts:timestamp,msg:message" renames "ts" to "timestamp" and "msg" to "message". Supported fields: ts, level, caller, msg
|
||||
-loggerLevel string
|
||||
Minimum level of errors to log. Possible values: INFO, WARN, ERROR, FATAL, PANIC (default "INFO")
|
||||
-loggerMaxArgLen int
|
||||
The maximum length of a single logged argument. Longer arguments are replaced with 'arg_start..arg_end', where 'arg_start' and 'arg_end' is prefix and suffix of the arg with the length not exceeding -loggerMaxArgLen / 2 (default 500)
|
||||
-loggerOutput string
|
||||
Output for the logs. Supported values: stderr, stdout (default "stderr")
|
||||
-loggerTimezone string
|
||||
|
@ -1177,6 +1195,8 @@ Below is the output for `/path/to/vmselect -help`:
|
|||
Allows renaming fields in JSON formatted logs. Example: "ts:timestamp,msg:message" renames "ts" to "timestamp" and "msg" to "message". Supported fields: ts, level, caller, msg
|
||||
-loggerLevel string
|
||||
Minimum level of errors to log. Possible values: INFO, WARN, ERROR, FATAL, PANIC (default "INFO")
|
||||
-loggerMaxArgLen int
|
||||
The maximum length of a single logged argument. Longer arguments are replaced with 'arg_start..arg_end', where 'arg_start' and 'arg_end' is prefix and suffix of the arg with the length not exceeding -loggerMaxArgLen / 2 (default 500)
|
||||
-loggerOutput string
|
||||
Output for the logs. Supported values: stderr, stdout (default "stderr")
|
||||
-loggerTimezone string
|
||||
|
@ -1430,6 +1450,8 @@ Below is the output for `/path/to/vmstorage -help`:
|
|||
Allows renaming fields in JSON formatted logs. Example: "ts:timestamp,msg:message" renames "ts" to "timestamp" and "msg" to "message". Supported fields: ts, level, caller, msg
|
||||
-loggerLevel string
|
||||
Minimum level of errors to log. Possible values: INFO, WARN, ERROR, FATAL, PANIC (default "INFO")
|
||||
-loggerMaxArgLen int
|
||||
The maximum length of a single logged argument. Longer arguments are replaced with 'arg_start..arg_end', where 'arg_start' and 'arg_end' is prefix and suffix of the arg with the length not exceeding -loggerMaxArgLen / 2 (default 500)
|
||||
-loggerOutput string
|
||||
Output for the logs. Supported values: stderr, stdout (default "stderr")
|
||||
-loggerTimezone string
|
||||
|
@ -1509,6 +1531,8 @@ Below is the output for `/path/to/vmstorage -help`:
|
|||
-storage.minFreeDiskSpaceBytes size
|
||||
The minimum free disk space at -storageDataPath after which the storage stops accepting new data
|
||||
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 10000000)
|
||||
-storage.vminsertConnsShutdownDuration duration
|
||||
The time needed for gradual closing of vminsert connections during graceful shutdown. Bigger duration reduces spikes in CPU, RAM and disk IO load on the remaining vmstorage nodes during rolling restart. Smaller duration reduces the time needed to close all the vminsert connections, thus reducing the time for graceful shutdown. See https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#improving-re-routing-performance-during-restart (default 25s)
|
||||
-storageDataPath string
|
||||
Path to storage data (default "vmstorage-data")
|
||||
-tls
|
||||
|
|
|
@ -2,6 +2,7 @@ package clusternative
|
|||
|
||||
import (
|
||||
"errors"
|
||||
"flag"
|
||||
"net"
|
||||
"strings"
|
||||
"sync"
|
||||
|
@ -13,6 +14,13 @@ import (
|
|||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
vminsertConnsShutdownDuration = flag.Duration("clusternative.vminsertConnsShutdownDuration", 25*time.Second, "The time needed for gradual closing of upstream "+
|
||||
"vminsert connections during graceful shutdown. Bigger duration reduces spikes in CPU, RAM and disk IO load on the remaining lower-level clusters "+
|
||||
"during rolling restart. Smaller duration reduces the time needed to close all the upstream vminsert connections, thus reducing the time for graceful shutdown. "+
|
||||
"See https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#improving-re-routing-performance-during-restart")
|
||||
)
|
||||
|
||||
var (
|
||||
writeRequestsTCP = metrics.NewCounter(`vm_ingestserver_requests_total{type="clusternative", net="tcp"}`)
|
||||
writeErrorsTCP = metrics.NewCounter(`vm_ingestserver_request_errors_total{type="clusternative", net="tcp"}`)
|
||||
|
@ -41,7 +49,7 @@ func MustStart(addr string, insertHandler func(c net.Conn) error) *Server {
|
|||
addr: addr,
|
||||
lnTCP: lnTCP,
|
||||
}
|
||||
s.cm.Init()
|
||||
s.cm.Init("vminsert_upstream")
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
defer s.wg.Done()
|
||||
|
@ -57,7 +65,7 @@ func (s *Server) MustStop() {
|
|||
if err := s.lnTCP.Close(); err != nil {
|
||||
logger.Errorf("cannot close TCP clusternative server: %s", err)
|
||||
}
|
||||
s.cm.CloseAll()
|
||||
s.cm.CloseAll(*vminsertConnsShutdownDuration)
|
||||
s.wg.Wait()
|
||||
logger.Infof("TCP clusternative server at %q has been stopped", s.addr)
|
||||
}
|
||||
|
|
|
@ -2,18 +2,25 @@ package ingestserver
|
|||
|
||||
import (
|
||||
"net"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
)
|
||||
|
||||
// ConnsMap is used for tracking active connections.
|
||||
type ConnsMap struct {
|
||||
clientName string
|
||||
|
||||
mu sync.Mutex
|
||||
m map[net.Conn]struct{}
|
||||
isClosed bool
|
||||
}
|
||||
|
||||
// Init initializes cm.
|
||||
func (cm *ConnsMap) Init() {
|
||||
func (cm *ConnsMap) Init(clientName string) {
|
||||
cm.clientName = clientName
|
||||
cm.m = make(map[net.Conn]struct{})
|
||||
cm.isClosed = false
|
||||
}
|
||||
|
@ -36,12 +43,51 @@ func (cm *ConnsMap) Delete(c net.Conn) {
|
|||
cm.mu.Unlock()
|
||||
}
|
||||
|
||||
// CloseAll closes all the added conns.
|
||||
func (cm *ConnsMap) CloseAll() {
|
||||
// CloseAll gradually closes all the cm conns during the given shutdownDuration.
|
||||
func (cm *ConnsMap) CloseAll(shutdownDuration time.Duration) {
|
||||
cm.mu.Lock()
|
||||
conns := make([]net.Conn, 0, len(cm.m))
|
||||
for c := range cm.m {
|
||||
_ = c.Close()
|
||||
conns = append(conns, c)
|
||||
delete(cm.m, c)
|
||||
}
|
||||
cm.isClosed = true
|
||||
cm.mu.Unlock()
|
||||
|
||||
if shutdownDuration <= 0 {
|
||||
// Close all the connections at once.
|
||||
for _, c := range conns {
|
||||
_ = c.Close()
|
||||
}
|
||||
return
|
||||
}
|
||||
if len(conns) == 0 {
|
||||
return
|
||||
}
|
||||
if len(conns) == 1 {
|
||||
// Simple case - just close a single connection and that's it!
|
||||
_ = conns[0].Close()
|
||||
return
|
||||
}
|
||||
|
||||
// Sort vminsert conns in order to make the order of closing connections deterministic across vmstorage nodes.
|
||||
// This should reduce resource usage spikes at vmstorage nodes during rolling restarts.
|
||||
sort.Slice(conns, func(i, j int) bool {
|
||||
return conns[i].RemoteAddr().String() < conns[j].RemoteAddr().String()
|
||||
})
|
||||
|
||||
shutdownInterval := shutdownDuration / time.Duration(len(conns)-1)
|
||||
startTime := time.Now()
|
||||
logger.Infof("closing %d %s connections with %dms interval between them", len(conns), cm.clientName, shutdownInterval.Milliseconds())
|
||||
remoteAddr := conns[0].RemoteAddr().String()
|
||||
_ = conns[0].Close()
|
||||
logger.Infof("closed %s connection %s", cm.clientName, remoteAddr)
|
||||
conns = conns[1:]
|
||||
for _, c := range conns {
|
||||
time.Sleep(shutdownInterval)
|
||||
remoteAddr := c.RemoteAddr().String()
|
||||
_ = c.Close()
|
||||
logger.Infof("closed %s connection %s", cm.clientName, remoteAddr)
|
||||
}
|
||||
logger.Infof("closed %d %s connections in %s", len(conns)+1, cm.clientName, time.Since(startTime))
|
||||
}
|
||||
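The closing schedule implemented by `CloseAll` can be sketched without real network connections. The example below is an assumption-laden illustration, not the library code: `closeStep` and `closeSchedule` are hypothetical names, and remote addresses are plain strings. It mirrors the same two ideas: sort the addresses so the order is deterministic, then spread the closes evenly so the first happens immediately and the last happens at `shutdownDuration`.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// closeStep describes when a connection with the given remote address
// would be closed, relative to the start of the shutdown.
type closeStep struct {
	Addr  string
	After time.Duration
}

// closeSchedule is a hypothetical helper for illustration only.
// It sorts addrs lexicographically and assigns evenly spaced close times
// using the interval shutdownDuration / (n-1), as in the diff above.
func closeSchedule(addrs []string, shutdownDuration time.Duration) []closeStep {
	// Copy before sorting, so the caller's slice is left untouched.
	sorted := append([]string(nil), addrs...)
	sort.Strings(sorted)

	// With a non-positive duration or fewer than two connections
	// everything is closed at once, so the interval stays zero.
	var interval time.Duration
	if shutdownDuration > 0 && len(sorted) > 1 {
		interval = shutdownDuration / time.Duration(len(sorted)-1)
	}

	steps := make([]closeStep, 0, len(sorted))
	for i, addr := range sorted {
		steps = append(steps, closeStep{Addr: addr, After: time.Duration(i) * interval})
	}
	return steps
}

func main() {
	addrs := []string{"10.0.0.3:5000", "10.0.0.1:5000", "10.0.0.2:5000"}
	for _, s := range closeSchedule(addrs, 20*time.Second) {
		fmt.Printf("close %s after %s\n", s.Addr, s.After)
	}
}
```

Note that the comparison is on address strings, so `10.0.0.10:5000` sorts before `10.0.0.2:5000`. This does not matter for the goal stated in the code comment, since the order only needs to be the same on every `vmstorage` node.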
|
|
|
@ -59,7 +59,7 @@ func MustStart(addr string, useProxyProtocol bool, insertHandler func(r io.Reade
|
|||
lnTCP: lnTCP,
|
||||
lnUDP: lnUDP,
|
||||
}
|
||||
s.cm.Init()
|
||||
s.cm.Init("graphite")
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
defer s.wg.Done()
|
||||
|
@ -85,7 +85,7 @@ func (s *Server) MustStop() {
|
|||
if err := s.lnUDP.Close(); err != nil {
|
||||
logger.Errorf("cannot close UDP Graphite server: %s", err)
|
||||
}
|
||||
s.cm.CloseAll()
|
||||
s.cm.CloseAll(0)
|
||||
s.wg.Wait()
|
||||
logger.Infof("TCP and UDP Graphite servers at %q have been stopped", s.addr)
|
||||
}
|
||||
|
|
|
@ -59,7 +59,7 @@ func MustStart(addr string, useProxyProtocol bool, insertHandler func(r io.Reade
|
|||
lnTCP: lnTCP,
|
||||
lnUDP: lnUDP,
|
||||
}
|
||||
s.cm.Init()
|
||||
s.cm.Init("influx")
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
defer s.wg.Done()
|
||||
|
@ -85,7 +85,7 @@ func (s *Server) MustStop() {
|
|||
if err := s.lnUDP.Close(); err != nil {
|
||||
logger.Errorf("cannot close UDP InfluxDB server: %s", err)
|
||||
}
|
||||
s.cm.CloseAll()
|
||||
s.cm.CloseAll(0)
|
||||
s.wg.Wait()
|
||||
logger.Infof("TCP and UDP InfluxDB servers at %q have been stopped", s.addr)
|
||||
}
|
||||
|
|
|
@ -67,7 +67,7 @@ func MustStart(addr string, useProxyProtocol bool, telnetInsertHandler func(r io
|
|||
httpServer: httpServer,
|
||||
lnUDP: lnUDP,
|
||||
}
|
||||
s.cm.Init()
|
||||
s.cm.Init("opentsdb")
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
defer s.wg.Done()
|
||||
|
@ -103,7 +103,7 @@ func (s *Server) MustStop() {
|
|||
if err := s.lnUDP.Close(); err != nil {
|
||||
logger.Errorf("cannot stop UDP OpenTSDB server: %s", err)
|
||||
}
|
||||
s.cm.CloseAll()
|
||||
s.cm.CloseAll(0)
|
||||
s.wg.Wait()
|
||||
logger.Infof("TCP and UDP OpenTSDB servers at %q have been stopped", s.addr)
|
||||
}
|
||||
|
|
|
@ -145,7 +145,7 @@ func NewServer(addr string, api API, limits Limits, disableResponseCompression b
|
|||
metricRowsRead: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_metric_rows_read_total{addr=%q}`, addr)),
|
||||
}
|
||||
|
||||
s.connsMap.Init()
|
||||
s.connsMap.Init("vmselect")
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
s.run()
|
||||
|
@ -232,7 +232,7 @@ func (s *Server) MustStop() {
|
|||
|
||||
// Close existing connections from vmselect, so the goroutines
|
||||
// processing these connections are finished.
|
||||
s.connsMap.CloseAll()
|
||||
s.connsMap.CloseAll(0)
|
||||
|
||||
// Wait until all the goroutines processing vmselect conns are finished.
|
||||
s.wg.Wait()
|
||||
|
|