mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
app/{vminsert,vmselect}: add -vmstorageDialTimeout
command-line flag for tuning the maximum time needed for establishing connections to vmstorage
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/711
This commit is contained in:
parent
0abf46d66a
commit
b28c6febf9
4 changed files with 10 additions and 10 deletions
|
@ -31,6 +31,7 @@ var (
|
|||
"Higher values for -dedup.minScrapeInterval at vmselect is OK")
|
||||
disableRerouting = flag.Bool("disableRerouting", true, "Whether to disable re-routing when some of vmstorage nodes accept incoming data at slower speed compared to other storage nodes. Disabled re-routing limits the ingestion rate by the slowest vmstorage node. On the other side, disabled re-routing minimizes the number of active time series in the cluster during rolling restarts and during spikes in series churn rate. See also -dropSamplesOnOverload")
|
||||
dropSamplesOnOverload = flag.Bool("dropSamplesOnOverload", false, "Whether to drop incoming samples if the destination vmstorage node is overloaded and/or unavailable. This prioritizes cluster availability over consistency, e.g. the cluster continues accepting all the ingested samples, but some of them may be dropped if vmstorage nodes are temporarily unavailable and/or overloaded")
|
||||
vmstorageDialTimeout = flag.Duration("vmstorageDialTimeout", 5*time.Second, "Timeout for establishing RPC connections from vminsert to vmstorage")
|
||||
)
|
||||
|
||||
var errStorageReadOnly = errors.New("storage node is read only")
|
||||
|
@ -474,7 +475,7 @@ func InitStorageNodes(addrs []string, hashSeed uint64) {
|
|||
addr += ":8400"
|
||||
}
|
||||
sn := &storageNode{
|
||||
dialer: netutil.NewTCPDialer("vminsert", addr),
|
||||
dialer: netutil.NewTCPDialer("vminsert", addr, *vmstorageDialTimeout),
|
||||
|
||||
dialErrors: metrics.NewCounter(fmt.Sprintf(`vm_rpc_dial_errors_total{name="vminsert", addr=%q}`, addr)),
|
||||
handshakeErrors: metrics.NewCounter(fmt.Sprintf(`vm_rpc_handshake_errors_total{name="vminsert", addr=%q}`, addr)),
|
||||
|
|
|
@ -36,8 +36,9 @@ import (
|
|||
var (
|
||||
replicationFactor = flag.Int("replicationFactor", 1, "How many copies of every time series is available on vmstorage nodes. "+
|
||||
"See -replicationFactor command-line flag for vminsert nodes")
|
||||
maxSamplesPerSeries = flag.Int("search.maxSamplesPerSeries", 30e6, "The maximum number of raw samples a single query can scan per each time series. See also -search.maxSamplesPerQuery")
|
||||
maxSamplesPerQuery = flag.Int("search.maxSamplesPerQuery", 1e9, "The maximum number of raw samples a single query can process across all time series. This protects from heavy queries, which select unexpectedly high number of raw samples. See also -search.maxSamplesPerSeries")
|
||||
maxSamplesPerSeries = flag.Int("search.maxSamplesPerSeries", 30e6, "The maximum number of raw samples a single query can scan per each time series. See also -search.maxSamplesPerQuery")
|
||||
maxSamplesPerQuery = flag.Int("search.maxSamplesPerQuery", 1e9, "The maximum number of raw samples a single query can process across all time series. This protects from heavy queries, which select unexpectedly high number of raw samples. See also -search.maxSamplesPerSeries")
|
||||
vmstorageDialTimeout = flag.Duration("vmstorageDialTimeout", 5*time.Second, "Timeout for establishing RPC connections from vmselect to vmstorage")
|
||||
)
|
||||
|
||||
// Result is a single timeseries result.
|
||||
|
@ -2275,7 +2276,7 @@ func InitStorageNodes(addrs []string) {
|
|||
}
|
||||
sn := &storageNode{
|
||||
// There is no need in requests compression, since they are usually very small.
|
||||
connPool: netutil.NewConnPool("vmselect", addr, handshake.VMSelectClient, 0),
|
||||
connPool: netutil.NewConnPool("vmselect", addr, handshake.VMSelectClient, 0, *vmstorageDialTimeout),
|
||||
|
||||
concurrentQueries: metrics.NewCounter(fmt.Sprintf(`vm_concurrent_queries{name="vmselect", addr=%q}`, addr)),
|
||||
|
||||
|
|
|
@ -38,9 +38,9 @@ type connWithTimestamp struct {
|
|||
// Name is used in exported metrics.
|
||||
// handshakeFunc is used for handshaking after the connection establishing.
|
||||
// The compression is disabled if compressionLevel <= 0.
|
||||
func NewConnPool(name, addr string, handshakeFunc handshake.Func, compressionLevel int) *ConnPool {
|
||||
func NewConnPool(name, addr string, handshakeFunc handshake.Func, compressionLevel int, dialTimeout time.Duration) *ConnPool {
|
||||
cp := &ConnPool{
|
||||
d: NewTCPDialer(name, addr),
|
||||
d: NewTCPDialer(name, addr, dialTimeout),
|
||||
concurrentDialsCh: make(chan struct{}, 8),
|
||||
|
||||
name: name,
|
||||
|
|
|
@ -12,12 +12,10 @@ import (
|
|||
//
|
||||
// The name is used in metric tags for the returned dialer.
|
||||
// The name must be unique among dialers.
|
||||
func NewTCPDialer(name, addr string) *TCPDialer {
|
||||
func NewTCPDialer(name, addr string, dialTimeout time.Duration) *TCPDialer {
|
||||
d := &TCPDialer{
|
||||
d: &net.Dialer{
|
||||
// The timeout for establishing a TCP connection.
|
||||
// 5 seconds should be enough for the majority of cases.
|
||||
Timeout: 5 * time.Second,
|
||||
Timeout: dialTimeout,
|
||||
|
||||
// How frequently to send keep-alive packets over established TCP connections.
|
||||
KeepAlive: time.Second,
|
||||
|
|
Loading…
Reference in a new issue