VictoriaMetrics/lib/ingestserver/conns_map.go

94 lines
2.3 KiB
Go
Raw Normal View History

package ingestserver
import (
"net"
vmcluster: re-routing enhancement (#5293) * app/vmstorage: close vminsert connections gradually before stopping storage Implements graceful shutdown approach suggested here - https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4922#issuecomment-1768146878 Test results for this can be found here - https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4922#issuecomment-1790640274 Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * app/vmstorage: update graceful shutdown logic - close connections from vminsert in determenistic order - update flag description - lower default timeout to 25 seconds. 25 seconds value was chosen because the lowest default value used in default configuration deployments is 30s(default value in Kubernetes and ansible-playbooks). Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * docs/cluster: add information about re-routing enhancement during restart Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * docs/changelog: add entry for new command-line flag Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * {app/vmstorage,lib/ingestserver}: address review feedback Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * docs/cluster: add note to update workload scheduler timeout Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * wip --------- Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2023-11-14 00:00:42 +00:00
"sort"
"sync"
vmcluster: re-routing enhancement (#5293) * app/vmstorage: close vminsert connections gradually before stopping storage Implements graceful shutdown approach suggested here - https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4922#issuecomment-1768146878 Test results for this can be found here - https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4922#issuecomment-1790640274 Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * app/vmstorage: update graceful shutdown logic - close connections from vminsert in determenistic order - update flag description - lower default timeout to 25 seconds. 25 seconds value was chosen because the lowest default value used in default configuration deployments is 30s(default value in Kubernetes and ansible-playbooks). Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * docs/cluster: add information about re-routing enhancement during restart Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * docs/changelog: add entry for new command-line flag Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * {app/vmstorage,lib/ingestserver}: address review feedback Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * docs/cluster: add note to update workload scheduler timeout Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * wip --------- Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2023-11-14 00:00:42 +00:00
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// ConnsMap is used for tracking active connections.
type ConnsMap struct {
vmcluster: re-routing enhancement (#5293) * app/vmstorage: close vminsert connections gradually before stopping storage Implements graceful shutdown approach suggested here - https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4922#issuecomment-1768146878 Test results for this can be found here - https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4922#issuecomment-1790640274 Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * app/vmstorage: update graceful shutdown logic - close connections from vminsert in determenistic order - update flag description - lower default timeout to 25 seconds. 25 seconds value was chosen because the lowest default value used in default configuration deployments is 30s(default value in Kubernetes and ansible-playbooks). Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * docs/cluster: add information about re-routing enhancement during restart Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * docs/changelog: add entry for new command-line flag Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * {app/vmstorage,lib/ingestserver}: address review feedback Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * docs/cluster: add note to update workload scheduler timeout Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * wip --------- Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2023-11-14 00:00:42 +00:00
clientName string
mu sync.Mutex
m map[net.Conn]struct{}
isClosed bool
}
// Init initializes cm.
vmcluster: re-routing enhancement (#5293) * app/vmstorage: close vminsert connections gradually before stopping storage Implements graceful shutdown approach suggested here - https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4922#issuecomment-1768146878 Test results for this can be found here - https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4922#issuecomment-1790640274 Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * app/vmstorage: update graceful shutdown logic - close connections from vminsert in determenistic order - update flag description - lower default timeout to 25 seconds. 25 seconds value was chosen because the lowest default value used in default configuration deployments is 30s(default value in Kubernetes and ansible-playbooks). Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * docs/cluster: add information about re-routing enhancement during restart Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * docs/changelog: add entry for new command-line flag Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * {app/vmstorage,lib/ingestserver}: address review feedback Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * docs/cluster: add note to update workload scheduler timeout Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * wip --------- Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2023-11-14 00:00:42 +00:00
func (cm *ConnsMap) Init(clientName string) {
cm.clientName = clientName
cm.m = make(map[net.Conn]struct{})
cm.isClosed = false
}
// Add adds c to cm.
func (cm *ConnsMap) Add(c net.Conn) bool {
cm.mu.Lock()
ok := !cm.isClosed
if ok {
cm.m[c] = struct{}{}
}
cm.mu.Unlock()
return ok
}
// Delete deletes c from cm.
func (cm *ConnsMap) Delete(c net.Conn) {
cm.mu.Lock()
delete(cm.m, c)
cm.mu.Unlock()
}
vmcluster: re-routing enhancement (#5293) * app/vmstorage: close vminsert connections gradually before stopping storage Implements graceful shutdown approach suggested here - https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4922#issuecomment-1768146878 Test results for this can be found here - https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4922#issuecomment-1790640274 Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * app/vmstorage: update graceful shutdown logic - close connections from vminsert in determenistic order - update flag description - lower default timeout to 25 seconds. 25 seconds value was chosen because the lowest default value used in default configuration deployments is 30s(default value in Kubernetes and ansible-playbooks). Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * docs/cluster: add information about re-routing enhancement during restart Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * docs/changelog: add entry for new command-line flag Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * {app/vmstorage,lib/ingestserver}: address review feedback Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * docs/cluster: add note to update workload scheduler timeout Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * wip --------- Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2023-11-14 00:00:42 +00:00
// CloseAll gradually closes all the cm conns with during the given shutdownDuration.
func (cm *ConnsMap) CloseAll(shutdownDuration time.Duration) {
cm.mu.Lock()
conns := make([]net.Conn, 0, len(cm.m))
for c := range cm.m {
vmcluster: re-routing enhancement (#5293) * app/vmstorage: close vminsert connections gradually before stopping storage Implements graceful shutdown approach suggested here - https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4922#issuecomment-1768146878 Test results for this can be found here - https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4922#issuecomment-1790640274 Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * app/vmstorage: update graceful shutdown logic - close connections from vminsert in determenistic order - update flag description - lower default timeout to 25 seconds. 25 seconds value was chosen because the lowest default value used in default configuration deployments is 30s(default value in Kubernetes and ansible-playbooks). Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * docs/cluster: add information about re-routing enhancement during restart Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * docs/changelog: add entry for new command-line flag Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * {app/vmstorage,lib/ingestserver}: address review feedback Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * docs/cluster: add note to update workload scheduler timeout Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * wip --------- Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2023-11-14 00:00:42 +00:00
conns = append(conns, c)
delete(cm.m, c)
}
cm.isClosed = true
cm.mu.Unlock()
vmcluster: re-routing enhancement (#5293) * app/vmstorage: close vminsert connections gradually before stopping storage Implements graceful shutdown approach suggested here - https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4922#issuecomment-1768146878 Test results for this can be found here - https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4922#issuecomment-1790640274 Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * app/vmstorage: update graceful shutdown logic - close connections from vminsert in determenistic order - update flag description - lower default timeout to 25 seconds. 25 seconds value was chosen because the lowest default value used in default configuration deployments is 30s(default value in Kubernetes and ansible-playbooks). Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * docs/cluster: add information about re-routing enhancement during restart Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * docs/changelog: add entry for new command-line flag Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * {app/vmstorage,lib/ingestserver}: address review feedback Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * docs/cluster: add note to update workload scheduler timeout Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * wip --------- Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2023-11-14 00:00:42 +00:00
if shutdownDuration <= 0 {
// Close all the connections at once.
for _, c := range conns {
_ = c.Close()
}
return
}
if len(conns) == 0 {
return
}
if len(conns) == 1 {
// Simple case - just close a single connection and that's it!
_ = conns[0].Close()
return
}
// Sort vminsert conns in order to make the order of closing connections deterministic across vmstorage nodes.
// This should reduce resource usage spikes at vmstorage nodes during rolling restarts.
sort.Slice(conns, func(i, j int) bool {
return conns[i].RemoteAddr().String() < conns[j].RemoteAddr().String()
})
shutdownInterval := shutdownDuration / time.Duration(len(conns)-1)
startTime := time.Now()
logger.Infof("closing %d %s connections with %dms interval between them", len(conns), cm.clientName, shutdownInterval.Milliseconds())
remoteAddr := conns[0].RemoteAddr().String()
_ = conns[0].Close()
logger.Infof("closed %s connection %s", cm.clientName, remoteAddr)
conns = conns[1:]
for _, c := range conns {
time.Sleep(shutdownInterval)
remoteAddr := c.RemoteAddr().String()
_ = c.Close()
logger.Infof("closed %s connection %s", cm.clientName, remoteAddr)
}
logger.Infof("closed %d %s connections in %s", len(conns), cm.clientName, time.Since(startTime))
}