From 4f3764b140e14c5d57537313d6331d6520713657 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 15 Jan 2024 13:37:02 +0200 Subject: [PATCH] lib/pushmetrics: wait until the background goroutines, which push metrics, are stopped at pushmetrics.Stop() Previously the was a race condition when the background goroutine still could try collecting metrics from already stopped resources after returning from pushmetrics.Stop(). Now the pushmetrics.Stop() waits until the background goroutine is stopped before returning. This is a follow-up for https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5549 and the commit fe2d9f6646a49c347b1ee03a0285971eaf681ed6 . Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5548 --- app/vmagent/main.go | 4 ++-- app/vmalert/main.go | 5 +++-- app/vmauth/main.go | 4 ++-- app/vmbackup/main.go | 4 ++-- app/vminsert/main.go | 5 ++--- app/vmrestore/main.go | 4 ++-- app/vmselect/main.go | 5 ++--- app/vmstorage/main.go | 5 ++--- docs/CHANGELOG.md | 2 +- lib/pushmetrics/pushmetrics.go | 5 +++++ 10 files changed, 23 insertions(+), 20 deletions(-) diff --git a/app/vmagent/main.go b/app/vmagent/main.go index 92ab52514..5e5d7125d 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -94,7 +94,6 @@ func main() { remotewrite.InitSecretFlags() buildinfo.Init() logger.Init() - pushmetrics.Init() if promscrape.IsDryRun() { if err := promscrape.CheckConfig(); err != nil { @@ -142,8 +141,10 @@ func main() { } logger.Infof("started vmagent in %.3f seconds", time.Since(startTime).Seconds()) + pushmetrics.Init() sig := procutil.WaitForSigterm() logger.Infof("received signal %s", sig) + pushmetrics.Stop() startTime = time.Now() if len(*httpListenAddr) > 0 { @@ -154,7 +155,6 @@ func main() { logger.Infof("successfully shut down the webservice in %.3f seconds", time.Since(startTime).Seconds()) } - pushmetrics.Stop() promscrape.Stop() if len(*influxListenAddr) > 0 { diff --git a/app/vmalert/main.go b/app/vmalert/main.go index 96f9cca53..389e135bb 100644 --- a/app/vmalert/main.go +++ b/app/vmalert/main.go @@ -93,7 +93,6 @@ func main() { datasource.InitSecretFlags() buildinfo.Init() logger.Init() - pushmetrics.Init() if !*remoteReadIgnoreRestoreErrors { logger.Warnf("flag `remoteRead.ignoreRestoreErrors` is deprecated and will be removed in next releases.") @@ -179,12 +178,14 @@ func main() { rh := &requestHandler{m: manager} go httpserver.Serve(*httpListenAddr, *useProxyProtocol, rh.handler) + pushmetrics.Init() sig := procutil.WaitForSigterm() logger.Infof("service received signal %s", sig) + pushmetrics.Stop() + if err := httpserver.Stop(*httpListenAddr); err != nil { logger.Fatalf("cannot stop the webservice: %s", err) } - pushmetrics.Stop() cancel() manager.close() } diff --git a/app/vmauth/main.go b/app/vmauth/main.go index 00c08f953..b13f06333 100644 --- a/app/vmauth/main.go +++ b/app/vmauth/main.go @@ -47,7 +47,6 @@ func main() { envflag.Parse() buildinfo.Init() logger.Init() - pushmetrics.Init() logger.Infof("starting vmauth at %q...", *httpListenAddr) startTime := time.Now() @@ -55,15 +54,16 @@ func main() { go httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler) logger.Infof("started vmauth in %.3f seconds", time.Since(startTime).Seconds()) + pushmetrics.Init() sig := procutil.WaitForSigterm() logger.Infof("received signal %s", sig) + pushmetrics.Stop() startTime = time.Now() logger.Infof("gracefully shutting down webservice at %q", *httpListenAddr) if err := httpserver.Stop(*httpListenAddr); err != nil { logger.Fatalf("cannot stop the webservice: %s", err) } - pushmetrics.Stop() logger.Infof("successfully shut down the webservice in %.3f seconds", time.Since(startTime).Seconds()) stopAuthConfig() logger.Infof("successfully stopped vmauth in %.3f seconds", time.Since(startTime).Seconds()) diff --git a/app/vmbackup/main.go b/app/vmbackup/main.go index dc157459b..87b85cb72 100644 --- a/app/vmbackup/main.go +++ b/app/vmbackup/main.go @@ -47,7 +47,6 @@ func main() { envflag.Parse() buildinfo.Init() logger.Init() - pushmetrics.Init() // Storing snapshot delete function to be able to call it in case // of error since logger.Fatal will exit the program without @@ -98,18 +97,19 @@ func main() { go httpserver.Serve(*httpListenAddr, false, nil) + pushmetrics.Init() err := makeBackup() deleteSnapshot() if err != nil { logger.Fatalf("cannot create backup: %s", err) } + pushmetrics.Stop() startTime := time.Now() logger.Infof("gracefully shutting down http server for metrics at %q", *httpListenAddr) if err := httpserver.Stop(*httpListenAddr); err != nil { logger.Fatalf("cannot stop http server for metrics: %s", err) } - pushmetrics.Stop() logger.Infof("successfully shut down http server for metrics in %.3f seconds", time.Since(startTime).Seconds()) } diff --git a/app/vminsert/main.go b/app/vminsert/main.go index cc2ce3a91..51a126f52 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -92,7 +92,6 @@ func main() { envflag.Parse() buildinfo.Init() logger.Init() - pushmetrics.Init() logger.Infof("initializing netstorage for storageNodes %s...", *storageNodes) startTime := time.Now() @@ -147,8 +146,10 @@ func main() { httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler) }() + pushmetrics.Init() sig := procutil.WaitForSigterm() logger.Infof("service received signal %s", sig) + pushmetrics.Stop() logger.Infof("gracefully shutting down http service at %q", *httpListenAddr) startTime = time.Now() @@ -157,8 +158,6 @@ func main() { } logger.Infof("successfully shut down http service in %.3f seconds", time.Since(startTime).Seconds()) - pushmetrics.Stop() - if len(*clusternativeListenAddr) > 0 { clusternativeServer.MustStop() } diff --git a/app/vmrestore/main.go b/app/vmrestore/main.go index 799d56ba7..c0389bc43 100644 --- a/app/vmrestore/main.go +++ b/app/vmrestore/main.go @@ -36,7 +36,6 @@ func main() { envflag.Parse() buildinfo.Init() logger.Init() - pushmetrics.Init() go httpserver.Serve(*httpListenAddr, false, nil) @@ -54,9 +53,11 @@ func main() { Dst: dstFS, SkipBackupCompleteCheck: *skipBackupCompleteCheck, } + pushmetrics.Init() if err := a.Run(); err != nil { logger.Fatalf("cannot restore from backup: %s", err) } + pushmetrics.Stop() srcFS.MustStop() dstFS.MustStop() @@ -65,7 +66,6 @@ func main() { if err := httpserver.Stop(*httpListenAddr); err != nil { logger.Fatalf("cannot stop http server for metrics: %s", err) } - pushmetrics.Stop() logger.Infof("successfully shut down http server for metrics in %.3f seconds", time.Since(startTime).Seconds()) } diff --git a/app/vmselect/main.go b/app/vmselect/main.go index 56141d6e3..7d8586ee7 100644 --- a/app/vmselect/main.go +++ b/app/vmselect/main.go @@ -89,7 +89,6 @@ func main() { envflag.Parse() buildinfo.Init() logger.Init() - pushmetrics.Init() logger.Infof("starting netstorage at storageNodes %s", *storageNodes) startTime := time.Now() @@ -133,8 +132,10 @@ func main() { httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler) }() + pushmetrics.Init() sig := procutil.WaitForSigterm() logger.Infof("service received signal %s", sig) + pushmetrics.Stop() logger.Infof("gracefully shutting down http service at %q", *httpListenAddr) startTime = time.Now() @@ -143,8 +144,6 @@ func main() { } logger.Infof("successfully shut down http service in %.3f seconds", time.Since(startTime).Seconds()) - pushmetrics.Stop() - if vmselectapiServer != nil { logger.Infof("stopping vmselectapi server...") vmselectapiServer.MustStop() diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 4e30d8c8a..0e1c310b0 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -83,7 +83,6 @@ func main() { envflag.Parse() buildinfo.Init() logger.Init() - pushmetrics.Init() storage.SetDedupInterval(*minScrapeInterval) storage.SetDataFlushInterval(*inmemoryDataFlushInterval) @@ -135,8 +134,10 @@ func main() { httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler) }() + pushmetrics.Init() sig := procutil.WaitForSigterm() logger.Infof("service received signal %s", sig) + pushmetrics.Stop() logger.Infof("gracefully shutting down http service at %q", *httpListenAddr) startTime = time.Now() @@ -145,8 +146,6 @@ func main() { } logger.Infof("successfully shut down http service in %.3f seconds", time.Since(startTime).Seconds()) - pushmetrics.Stop() - logger.Infof("gracefully shutting down the service") startTime = time.Now() stopStaleSnapshotsRemover() diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 9a89d6dec..9a887c8ba 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -19,7 +19,7 @@ The following tip changes can be tested by building VictoriaMetrics components f * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly handle queries, which wrap [rollup functions](https://docs.victoriametrics.com/MetricsQL.html#rollup-functions) with multiple arguments without explicitly specified lookbehind window in square brackets into [aggregate functions](https://docs.victoriametrics.com/MetricsQL.html#aggregate-functions). For example, `sum(quantile_over_time(0.5, process_resident_memory_bytes))` was resulting to `expecting at least 2 args to ...; got 1 args' error. Thanks to @atykhyy for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5414). * BUGFIX: `vmstorage`: properly expire `storage/prefetchedMetricIDs` cache. Previously this cache was never expired, so it could grow big under [high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate). This could result in increasing CPU load over time. * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly return results from [bottomk](https://docs.victoriametrics.com/MetricsQL.html#bottomk) and `bottomk_...()` functions when some of these results contain NaN values. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5506). Thanks to @xiaozongyang for [the fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5509). -* BUGFIX: all: fix potential panic during components shutdown when `-pushmetrics.url` is configured. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5548). Thanks to @zhdd99 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5549). +* BUGFIX: all: fix potential panic during components shutdown when [metrics push](https://docs.victoriametrics.com/#push-metrics) is configured. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5548). Thanks to @zhdd99 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5549). ## [v1.87.12](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.12) diff --git a/lib/pushmetrics/pushmetrics.go b/lib/pushmetrics/pushmetrics.go index 626866890..ca96cd8ba 100644 --- a/lib/pushmetrics/pushmetrics.go +++ b/lib/pushmetrics/pushmetrics.go @@ -1,8 +1,10 @@ package pushmetrics import ( + "context" "flag" "strings" + "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/appmetrics" @@ -26,6 +28,7 @@ func init() { var ( pushCtx, cancelPushCtx = context.WithCancel(context.Background()) + wgDone sync.WaitGroup ) // Init must be called after logger.Init @@ -34,6 +37,7 @@ func Init() { for _, pu := range *pushURL { opts := &metrics.PushOptions{ ExtraLabels: extraLabels, + WaitGroup: &wgDone, } if err := metrics.InitPushExtWithOptions(pushCtx, pu, *pushInterval, appmetrics.WritePrometheusMetrics, opts); err != nil { logger.Fatalf("cannot initialize pushmetrics: %s", err) @@ -48,4 +52,5 @@ func Init() { // Stop must be called after Init. func Stop() { cancelPushCtx() + wgDone.Wait() }