mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-02-09 15:27:11 +00:00
lib/netutil: limit the number of concurrently established connections when calling ConnPool.Get()
This should reduce potential spikes in the number of established connections in the following cases: - when the connection establishing procedure becomes temporarily slow - after a temporary spike in the rate of ConnPool.Get() calls See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2552
This commit is contained in:
parent
0a420c4708
commit
d8a276fbe4
2 changed files with 38 additions and 12 deletions
|
@ -18,6 +18,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
|||
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add `influx-prometheus-mode` command-line flag, which allows to restore the original time series written from Prometheus into InfluxDB during data migration from InfluxDB to VictoriaMetrics. See [this feature request](https://github.com/VictoriaMetrics/vmctl/issues/8). Thanks to @mback2k for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2545).
|
||||
|
||||
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): do not return values from [label_value()](https://docs.victoriametrics.com/MetricsQL.html#label_value) functionif the original time series has no values at the selected timestamps.
|
||||
* BUGFIX: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): limit the number of concurrently established connections from vmselect to vmstorage. This should prevent from potentially high spikes in the number of established connections after temporary slowdown in connection handshake procedure between vmselect and vmstorage because of spikes in workload. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2552).
|
||||
|
||||
## [v1.77.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.77.1)
|
||||
|
||||
|
|
|
@ -15,6 +15,12 @@ type ConnPool struct {
|
|||
mu sync.Mutex
|
||||
d *TCPDialer
|
||||
|
||||
// concurrentDialsCh limits the number of concurrent dials the ConnPool can make.
|
||||
// This should prevent from creating an excees number of connections during temporary
|
||||
// spikes in workload at vmselect and vmstorage nodes.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2552
|
||||
concurrentDialsCh chan struct{}
|
||||
|
||||
name string
|
||||
handshakeFunc handshake.Func
|
||||
compressionLevel int
|
||||
|
@ -34,7 +40,8 @@ type connWithTimestamp struct {
|
|||
// The compression is disabled if compressionLevel <= 0.
|
||||
func NewConnPool(name, addr string, handshakeFunc handshake.Func, compressionLevel int) *ConnPool {
|
||||
cp := &ConnPool{
|
||||
d: NewTCPDialer(name, addr),
|
||||
d: NewTCPDialer(name, addr),
|
||||
concurrentDialsCh: make(chan struct{}, 8),
|
||||
|
||||
name: name,
|
||||
handshakeFunc: handshakeFunc,
|
||||
|
@ -59,25 +66,28 @@ func (cp *ConnPool) Addr() string {
|
|||
|
||||
// Get returns free connection from the pool.
|
||||
func (cp *ConnPool) Get() (*handshake.BufferedConn, error) {
|
||||
var bc *handshake.BufferedConn
|
||||
cp.mu.Lock()
|
||||
if len(cp.conns) > 0 {
|
||||
c := cp.conns[len(cp.conns)-1]
|
||||
bc = c.bc
|
||||
c.bc = nil
|
||||
cp.conns = cp.conns[:len(cp.conns)-1]
|
||||
}
|
||||
cp.mu.Unlock()
|
||||
if bc != nil {
|
||||
if bc := cp.tryGetConn(); bc != nil {
|
||||
return bc, nil
|
||||
}
|
||||
|
||||
// Limit the number of concurrent dials.
|
||||
// This should help https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2552
|
||||
cp.concurrentDialsCh <- struct{}{}
|
||||
defer func() {
|
||||
<-cp.concurrentDialsCh
|
||||
}()
|
||||
// Make an attempt to get already established connections from the pool.
|
||||
// It may appear there while waiting for cp.concurrentDialsCh.
|
||||
if bc := cp.tryGetConn(); bc != nil {
|
||||
return bc, nil
|
||||
}
|
||||
// Pool is empty. Create new connection.
|
||||
c, err := cp.d.Dial()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot dial %s: %w", cp.d.Addr(), err)
|
||||
}
|
||||
if bc, err = cp.handshakeFunc(c, cp.compressionLevel); err != nil {
|
||||
bc, err := cp.handshakeFunc(c, cp.compressionLevel)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("cannot perform %q handshake with server %q: %w", cp.name, cp.d.Addr(), err)
|
||||
_ = c.Close()
|
||||
return nil, err
|
||||
|
@ -85,6 +95,19 @@ func (cp *ConnPool) Get() (*handshake.BufferedConn, error) {
|
|||
return bc, nil
|
||||
}
|
||||
|
||||
func (cp *ConnPool) tryGetConn() *handshake.BufferedConn {
|
||||
cp.mu.Lock()
|
||||
defer cp.mu.Unlock()
|
||||
if len(cp.conns) == 0 {
|
||||
return nil
|
||||
}
|
||||
c := cp.conns[len(cp.conns)-1]
|
||||
bc := c.bc
|
||||
c.bc = nil
|
||||
cp.conns = cp.conns[:len(cp.conns)-1]
|
||||
return bc
|
||||
}
|
||||
|
||||
// Put puts bc back to the pool.
|
||||
//
|
||||
// Do not put broken and closed connections to the pool!
|
||||
|
@ -105,6 +128,8 @@ func (cp *ConnPool) Put(bc *handshake.BufferedConn) {
|
|||
|
||||
func (cp *ConnPool) closeIdleConns() {
|
||||
// Close connections, which were idle for more than 30 seconds.
|
||||
// This should reduce the number of connections after sudden spikes in query rate.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2508
|
||||
deadline := fasttime.UnixTimestamp() - 30
|
||||
var activeConns []connWithTimestamp
|
||||
cp.mu.Lock()
|
||||
|
|
Loading…
Reference in a new issue