lib/netutil.ConnPool: skip dialing remote address if the previous dial attempt was unsuccessful

If the previous dial attempt was unsuccessful, then all the new dial attempts are skipped
until the background goroutine determines that the given address can be successfully dialed.

This reduces query latency when some of the vmstorage nodes are unavailable and dialing them is slow.

This should help with https://github.com/VictoriaMetrics/VictoriaMetrics/issues/711

This commit is based on ideas from https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2756

The main differences are:

- The check for healthy/unhealthy storage nodes is moved one level lower from app/vmselect/netstorage to lib/netutil.ConnPool.
  This makes it possible to reuse this feature everywhere lib/netutil.ConnPool is used.
- The check doesn't take into account handshake errors for already established connections.
  Handshake errors usually mean improperly configured VictoriaMetrics cluster, so they shouldn't be ignored.
This commit is contained in:
Aliaksandr Valialkin 2022-06-20 17:25:08 +03:00
parent 45e9732764
commit a1629bd3be
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
2 changed files with 79 additions and 11 deletions

View file

@ -39,6 +39,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-promscrape.cluster.name` command-line flag, which allows proper data de-duplication when the same target is scraped from multiple [vmagent clusters](https://docs.victoriametrics.com/vmagent.html#scraping-big-number-of-targets). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2679).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `action: graphite` relabeling rules optimized for extracting labels from Graphite-style metric names. See [these docs](https://docs.victoriametrics.com/vmagent.html#graphite-relabeling) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2737).
* FEATURE: [VictoriaMetrics enterprise](https://victoriametrics.com/products/enterprise/): expose `vm_downsampling_partitions_scheduled` and `vm_downsampling_partitions_scheduled_size_bytes` metrics, which can be used for tracking the progress of initial [downsampling](https://docs.victoriametrics.com/#downsampling) for historical data. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2612).
* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): do not spend up to 5 seconds when trying to connect to unavailable `vmstorage` nodes. This should improve query latency when some of `vmstorage` nodes aren't available. Expose `vm_tcpdialer_addr_available{addr="..."}` metric at `http://vmselect:8481/metrics` for determining whether the given `addr` is available for establishing new connections. See [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/711#issuecomment-1160363187).
* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): add `-vmstorageDialTimeout` command-line flag to `vmselect` and `vminsert` for tuning the maximum duration for establishing connections to `vmstorage` nodes. This should help resolve [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/711).
* BUGFIX: support for data ingestion in [DataDog format](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent) from legacy clients / agents. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2670). Thanks to @elProxy for the fix.
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): do not expose `vm_promscrape_service_discovery_duration_seconds_bucket` metric for unused service discovery types. This reduces the number of metrics exported at `http://vmagent:8429/metrics`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2671).

View file

@ -26,6 +26,12 @@ type ConnPool struct {
compressionLevel int
conns []connWithTimestamp
// lastDialError contains the last error seen when dialing remote addr.
// When it is non-nil and conns is empty, then ConnPool.Get() returns this error.
// This reduces the time needed for dialing unavailable remote storage systems.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/711#issuecomment-1160363187
lastDialError error
}
type connWithTimestamp struct {
@ -47,12 +53,22 @@ func NewConnPool(name, addr string, handshakeFunc handshake.Func, compressionLev
handshakeFunc: handshakeFunc,
compressionLevel: compressionLevel,
}
cp.checkAvailability(true)
_ = metrics.NewGauge(fmt.Sprintf(`vm_tcpdialer_conns_idle{name=%q, addr=%q}`, name, addr), func() float64 {
cp.mu.Lock()
n := len(cp.conns)
cp.mu.Unlock()
return float64(n)
})
_ = metrics.NewGauge(fmt.Sprintf(`vm_tcpdialer_addr_available{name=%q, addr=%q}`, name, addr), func() float64 {
cp.mu.Lock()
isAvailable := len(cp.conns) > 0 || cp.lastDialError == nil
cp.mu.Unlock()
if isAvailable {
return 1
}
return 0
})
connPoolsMu.Lock()
connPools = append(connPools, cp)
connPoolsMu.Unlock()
@ -66,10 +82,18 @@ func (cp *ConnPool) Addr() string {
// Get returns free connection from the pool.
//
// If the pool is empty and the last dial attempt failed, then the stored dial
// error is returned immediately without dialing the remote address.
// Otherwise a new connection is obtained via getConnSlow.
func (cp *ConnPool) Get() (*handshake.BufferedConn, error) {
if bc := cp.tryGetConn(); bc != nil {
bc, err := cp.tryGetConn()
// A non-nil err means the pool is empty and cp.lastDialError is set,
// so fail fast instead of spending time on dialing an unavailable address.
if err != nil {
return nil, err
}
if bc != nil {
// Fast path - obtained the connection from pool.
return bc, nil
}
// Slow path - the pool is empty and the last dial attempt didn't fail.
return cp.getConnSlow()
}
func (cp *ConnPool) getConnSlow() (*handshake.BufferedConn, error) {
// Limit the number of concurrent dials.
// This should help https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2552
cp.concurrentDialsCh <- struct{}{}
@ -78,34 +102,50 @@ func (cp *ConnPool) Get() (*handshake.BufferedConn, error) {
}()
// Make an attempt to get already established connections from the pool.
// It may appear there while waiting for cp.concurrentDialsCh.
if bc := cp.tryGetConn(); bc != nil {
bc, err := cp.tryGetConn()
if err != nil {
return nil, err
}
if bc != nil {
return bc, nil
}
// Pool is empty. Create new connection.
return cp.dialAndHandshake()
}
// dialAndHandshake dials cp.d and performs the handshake on the new connection.
//
// The result of the dial attempt (nil on success) is stored in cp.lastDialError
// under cp.mu. Handshake errors are returned to the caller, but they aren't stored
// in cp.lastDialError; the dialed connection is closed in this case.
func (cp *ConnPool) dialAndHandshake() (*handshake.BufferedConn, error) {
c, err := cp.d.Dial()
if err != nil {
return nil, fmt.Errorf("cannot dial %s: %w", cp.d.Addr(), err)
err = fmt.Errorf("cannot dial %s: %w", cp.d.Addr(), err)
}
// Record both success (nil) and failure, so a successful dial resets
// the previously stored error.
cp.mu.Lock()
cp.lastDialError = err
cp.mu.Unlock()
if err != nil {
return nil, err
}
bc, err := cp.handshakeFunc(c, cp.compressionLevel)
if err != nil {
// Do not put handshake error to cp.lastDialError, because handshake
// is performed on an already established connection.
err = fmt.Errorf("cannot perform %q handshake with server %q: %w", cp.name, cp.d.Addr(), err)
_ = c.Close()
return nil, err
}
return bc, nil
return bc, err
}
// tryGetConn takes the last connection from cp.conns and returns it.
//
// If cp.conns is empty, then nil is returned together with cp.lastDialError,
// which is nil when the most recent dial attempt was successful.
func (cp *ConnPool) tryGetConn() *handshake.BufferedConn {
func (cp *ConnPool) tryGetConn() (*handshake.BufferedConn, error) {
cp.mu.Lock()
defer cp.mu.Unlock()
if len(cp.conns) == 0 {
return nil
return nil, cp.lastDialError
}
c := cp.conns[len(cp.conns)-1]
bc := c.bc
// Clear the local copy's reference to the connection before shrinking cp.conns.
c.bc = nil
cp.conns = cp.conns[:len(cp.conns)-1]
return bc
return bc, nil
}
// Put puts bc back to the pool.
@ -146,18 +186,44 @@ func (cp *ConnPool) closeIdleConns() {
cp.mu.Unlock()
}
// checkAvailability makes a dial attempt to the remote address if the previous
// dial attempt has failed or if force is set.
//
// The successfully established connection is put to the pool, so it can be
// re-used by subsequent Get() calls.
func (cp *ConnPool) checkAvailability(force bool) {
	cp.mu.Lock()
	prevDialErr := cp.lastDialError
	cp.mu.Unlock()

	if !force && prevDialErr == nil {
		// The last dial attempt was successful - nothing to re-check.
		return
	}

	// The error is intentionally ignored here: dialAndHandshake records
	// the dial result in cp.lastDialError on its own.
	conn, _ := cp.dialAndHandshake()
	if conn == nil {
		return
	}
	cp.Put(conn)
}
// init starts two never-ending background goroutines:
//
//   - the first one calls closeIdleConns on every registered ConnPool every 17 seconds;
//   - the second one calls checkAvailability(false) on every registered ConnPool
//     every second, so pools with failed dial attempts are periodically re-checked.
func init() {
go func() {
for {
time.Sleep(17 * time.Second)
connPoolsMu.Lock()
for _, cp := range connPools {
forEachConnPool(func(cp *ConnPool) {
cp.closeIdleConns()
}
connPoolsMu.Unlock()
})
}
}()
go func() {
for {
time.Sleep(time.Second)
forEachConnPool(func(cp *ConnPool) {
// force=false: dial only the pools whose last dial attempt has failed.
cp.checkAvailability(false)
})
}
}()
}
// connPoolsMu protects connPools.
var connPoolsMu sync.Mutex

// connPools contains all the pools registered by NewConnPool.
var connPools []*ConnPool

// forEachConnPool calls f for every ConnPool registered at the time of the call.
//
// f is called without holding connPoolsMu. Callbacks may be slow - for instance,
// checkAvailability dials the remote address, which may take up to the dial timeout
// for unavailable addresses. Holding the global lock during such calls would block
// NewConnPool and other forEachConnPool callers for the whole duration.
//
// Pools registered while forEachConnPool is running aren't visited by the current call.
func forEachConnPool(f func(cp *ConnPool)) {
	// Take a snapshot under the lock, so f can run unlocked.
	connPoolsMu.Lock()
	cps := append([]*ConnPool(nil), connPools...)
	connPoolsMu.Unlock()

	for _, cp := range cps {
		f(cp)
	}
}