app/{vminsert,vmselect}/netstorage: allow calling Init()+MustStop() in a loop

Previously netstorage.MustStop() call didn't free up all the resources,
so the subsequent call to nestorage.Init() would panic.

This allows writing tests, which call nestorage.Init() + nestorage.MustStop() in a loop.
This commit is contained in:
Aliaksandr Valialkin 2022-10-25 14:41:56 +03:00
parent 9ccd22c1f6
commit d9bbf24183
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
7 changed files with 72 additions and 36 deletions

2
go.mod
View file

@ -10,7 +10,7 @@ require (
// Do not use the original github.com/valyala/fasthttp because of issues
// like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b
github.com/VictoriaMetrics/fasthttp v1.1.0
github.com/VictoriaMetrics/metrics v1.22.2
github.com/VictoriaMetrics/metrics v1.23.0
github.com/VictoriaMetrics/metricsql v0.45.0
github.com/aws/aws-sdk-go-v2 v1.17.0
github.com/aws/aws-sdk-go-v2/config v1.17.9

4
go.sum
View file

@ -95,8 +95,8 @@ github.com/VictoriaMetrics/fastcache v1.12.0/go.mod h1:tjiYeEfYXCqacuvYw/7UoDIeJ
github.com/VictoriaMetrics/fasthttp v1.1.0 h1:3crd4YWHsMwu60GUXRH6OstowiFvqrwS4a/ueoLdLL0=
github.com/VictoriaMetrics/fasthttp v1.1.0/go.mod h1:/7DMcogqd+aaD3G3Hg5kFgoFwlR2uydjiWvoLp5ZTqQ=
github.com/VictoriaMetrics/metrics v1.18.1/go.mod h1:ArjwVz7WpgpegX/JpB0zpNF2h2232kErkEnzH1sxMmA=
github.com/VictoriaMetrics/metrics v1.22.2 h1:A6LsNidYwkAHetxsvNFaUWjtzu5ltdgNEoS6i7Bn+6I=
github.com/VictoriaMetrics/metrics v1.22.2/go.mod h1:rAr/llLpEnAdTehiNlUxKgnjcOuROSzpw0GvjpEbvFc=
github.com/VictoriaMetrics/metrics v1.23.0 h1:WzfqyzCaxUZip+OBbg1+lV33WChDSu4ssYII3nxtpeA=
github.com/VictoriaMetrics/metrics v1.23.0/go.mod h1:rAr/llLpEnAdTehiNlUxKgnjcOuROSzpw0GvjpEbvFc=
github.com/VictoriaMetrics/metricsql v0.45.0 h1:kVQHnkDJm4qyJ8f5msTclmwqAtlUdPbbEJ7zoa/FTNs=
github.com/VictoriaMetrics/metricsql v0.45.0/go.mod h1:6pP1ZeLVJHqJrHlF6Ij3gmpQIznSsgktEcZgsAWYel0=
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=

View file

@ -26,20 +26,20 @@ type connMetrics struct {
conns *metrics.Counter
}
func (cm *connMetrics) init(group, name, addr string) {
cm.readCalls = metrics.NewCounter(fmt.Sprintf(`%s_read_calls_total{name=%q, addr=%q}`, group, name, addr))
cm.readBytes = metrics.NewCounter(fmt.Sprintf(`%s_read_bytes_total{name=%q, addr=%q}`, group, name, addr))
cm.readErrors = metrics.NewCounter(fmt.Sprintf(`%s_errors_total{name=%q, addr=%q, type="read"}`, group, name, addr))
cm.readTimeouts = metrics.NewCounter(fmt.Sprintf(`%s_read_timeouts_total{name=%q, addr=%q}`, group, name, addr))
func (cm *connMetrics) init(ms *metrics.Set, group, name, addr string) {
cm.readCalls = ms.NewCounter(fmt.Sprintf(`%s_read_calls_total{name=%q, addr=%q}`, group, name, addr))
cm.readBytes = ms.NewCounter(fmt.Sprintf(`%s_read_bytes_total{name=%q, addr=%q}`, group, name, addr))
cm.readErrors = ms.NewCounter(fmt.Sprintf(`%s_errors_total{name=%q, addr=%q, type="read"}`, group, name, addr))
cm.readTimeouts = ms.NewCounter(fmt.Sprintf(`%s_read_timeouts_total{name=%q, addr=%q}`, group, name, addr))
cm.writeCalls = metrics.NewCounter(fmt.Sprintf(`%s_write_calls_total{name=%q, addr=%q}`, group, name, addr))
cm.writtenBytes = metrics.NewCounter(fmt.Sprintf(`%s_written_bytes_total{name=%q, addr=%q}`, group, name, addr))
cm.writeErrors = metrics.NewCounter(fmt.Sprintf(`%s_errors_total{name=%q, addr=%q, type="write"}`, group, name, addr))
cm.writeTimeouts = metrics.NewCounter(fmt.Sprintf(`%s_write_timeouts_total{name=%q, addr=%q}`, group, name, addr))
cm.writeCalls = ms.NewCounter(fmt.Sprintf(`%s_write_calls_total{name=%q, addr=%q}`, group, name, addr))
cm.writtenBytes = ms.NewCounter(fmt.Sprintf(`%s_written_bytes_total{name=%q, addr=%q}`, group, name, addr))
cm.writeErrors = ms.NewCounter(fmt.Sprintf(`%s_errors_total{name=%q, addr=%q, type="write"}`, group, name, addr))
cm.writeTimeouts = ms.NewCounter(fmt.Sprintf(`%s_write_timeouts_total{name=%q, addr=%q}`, group, name, addr))
cm.closeErrors = metrics.NewCounter(fmt.Sprintf(`%s_errors_total{name=%q, addr=%q, type="close"}`, group, name, addr))
cm.closeErrors = ms.NewCounter(fmt.Sprintf(`%s_errors_total{name=%q, addr=%q, type="close"}`, group, name, addr))
cm.conns = metrics.NewCounter(fmt.Sprintf(`%s_conns{name=%q, addr=%q}`, group, name, addr))
cm.conns = ms.NewCounter(fmt.Sprintf(`%s_conns{name=%q, addr=%q}`, group, name, addr))
}
type statConn struct {

View file

@ -16,8 +16,7 @@ var enableTCP6 = flag.Bool("enableTCP6", false, "Whether to enable IPv6 for list
// NewTCPListener returns new TCP listener for the given addr and optional tlsConfig.
//
// name is used for exported metrics. Each listener in the program must have
// distinct name.
// name is used for metrics registered in ms. Each listener in the program must have distinct name.
func NewTCPListener(name, addr string, tlsConfig *tls.Config) (*TCPListener, error) {
network := GetTCPNetwork()
ln, err := net.Listen(network, addr)
@ -27,13 +26,14 @@ func NewTCPListener(name, addr string, tlsConfig *tls.Config) (*TCPListener, err
if tlsConfig != nil {
ln = tls.NewListener(ln, tlsConfig)
}
ms := metrics.GetDefaultSet()
tln := &TCPListener{
Listener: ln,
accepts: metrics.NewCounter(fmt.Sprintf(`vm_tcplistener_accepts_total{name=%q, addr=%q}`, name, addr)),
acceptErrors: metrics.NewCounter(fmt.Sprintf(`vm_tcplistener_errors_total{name=%q, addr=%q, type="accept"}`, name, addr)),
accepts: ms.NewCounter(fmt.Sprintf(`vm_tcplistener_accepts_total{name=%q, addr=%q}`, name, addr)),
acceptErrors: ms.NewCounter(fmt.Sprintf(`vm_tcplistener_errors_total{name=%q, addr=%q, type="accept"}`, name, addr)),
}
tln.connMetrics.init("vm_tcplistener", name, addr)
tln.connMetrics.init(ms, "vm_tcplistener", name, addr)
return tln, err
}

View file

@ -22,6 +22,7 @@ import (
type namedMetric struct {
name string
metric metric
isAux bool
}
type metric interface {
@ -49,6 +50,8 @@ func RegisterSet(s *Set) {
}
// UnregisterSet stops exporting metrics for the given s via global WritePrometheus() call.
//
// Call s.UnregisterAllMetrics() after unregistering s if it is no longer used.
func UnregisterSet(s *Set) {
registeredSetsLock.Lock()
delete(registeredSets, s)
@ -180,11 +183,23 @@ func WriteFDMetrics(w io.Writer) {
}
// UnregisterMetric removes metric with the given name from default set.
//
// See also UnregisterAllMetrics.
func UnregisterMetric(name string) bool {
return defaultSet.UnregisterMetric(name)
}
// ListMetricNames returns a list of all the metric names from default set.
// UnregisterAllMetrics unregisters all the metrics from default set.
func UnregisterAllMetrics() {
defaultSet.UnregisterAllMetrics()
}
// ListMetricNames returns sorted list of all the metric names from default set.
func ListMetricNames() []string {
return defaultSet.ListMetricNames()
}
// GetDefaultSet returns the default metrics set.
func GetDefaultSet() *Set {
return defaultSet
}

View file

@ -336,7 +336,7 @@ func (s *Set) NewSummaryExt(name string, window time.Duration, quantiles []float
// checks in tests
defer s.mu.Unlock()
s.mustRegisterLocked(name, sm)
s.mustRegisterLocked(name, sm, false)
registerSummaryLocked(sm)
s.registerSummaryQuantilesLocked(name, sm)
s.summaries = append(s.summaries, sm)
@ -420,7 +420,7 @@ func (s *Set) registerSummaryQuantilesLocked(name string, sm *Summary) {
sm: sm,
idx: i,
}
s.mustRegisterLocked(quantileValueName, qv)
s.mustRegisterLocked(quantileValueName, qv, true)
}
}
@ -432,18 +432,19 @@ func (s *Set) registerMetric(name string, m metric) {
// defer will unlock in case of panic
// checks in test
defer s.mu.Unlock()
s.mustRegisterLocked(name, m)
s.mustRegisterLocked(name, m, false)
}
// mustRegisterLocked registers given metric with
// the given name. Panics if the given name was
// already registered before.
func (s *Set) mustRegisterLocked(name string, m metric) {
// mustRegisterLocked registers given metric with the given name.
//
// Panics if the given name was already registered before.
func (s *Set) mustRegisterLocked(name string, m metric, isAux bool) {
nm, ok := s.m[name]
if !ok {
nm = &namedMetric{
name: name,
metric: m,
isAux: isAux,
}
s.m[name] = nm
s.a = append(s.a, nm)
@ -465,8 +466,16 @@ func (s *Set) UnregisterMetric(name string) bool {
if !ok {
return false
}
m := nm.metric
if nm.isAux {
// Do not allow deleting auxiliary metrics such as summary_metric{quantile="..."}
// Such metrics must be deleted via parent metric name, e.g. summary_metric .
return false
}
return s.unregisterMetricLocked(nm)
}
func (s *Set) unregisterMetricLocked(nm *namedMetric) bool {
name := nm.name
delete(s.m, name)
deleteFromList := func(metricName string) {
@ -482,9 +491,9 @@ func (s *Set) UnregisterMetric(name string) bool {
// remove metric from s.a
deleteFromList(name)
sm, ok := m.(*Summary)
sm, ok := nm.metric.(*Summary)
if !ok {
// There is no need in cleaning up summary.
// There is no need in cleaning up non-summary metrics.
return true
}
@ -511,13 +520,25 @@ func (s *Set) UnregisterMetric(name string) bool {
return true
}
// ListMetricNames returns a list of all the metrics in s.
// UnregisterAllMetrics de-registers all metrics registered in s.
func (s *Set) UnregisterAllMetrics() {
metricNames := s.ListMetricNames()
for _, name := range metricNames {
s.UnregisterMetric(name)
}
}
// ListMetricNames returns sorted list of all the metrics in s.
func (s *Set) ListMetricNames() []string {
s.mu.Lock()
defer s.mu.Unlock()
var list []string
for name := range s.m {
list = append(list, name)
metricNames := make([]string, 0, len(s.m))
for _, nm := range s.m {
if nm.isAux {
continue
}
metricNames = append(metricNames, nm.name)
}
return list
sort.Strings(metricNames)
return metricNames
}

2
vendor/modules.txt vendored
View file

@ -62,7 +62,7 @@ github.com/VictoriaMetrics/fastcache
github.com/VictoriaMetrics/fasthttp
github.com/VictoriaMetrics/fasthttp/fasthttputil
github.com/VictoriaMetrics/fasthttp/stackless
# github.com/VictoriaMetrics/metrics v1.22.2
# github.com/VictoriaMetrics/metrics v1.23.0
## explicit; go 1.15
github.com/VictoriaMetrics/metrics
# github.com/VictoriaMetrics/metricsql v0.45.0