diff --git a/app/victoria-logs/main.go b/app/victoria-logs/main.go index 62d952c07a..1a3aea2a96 100644 --- a/app/victoria-logs/main.go +++ b/app/victoria-logs/main.go @@ -37,7 +37,6 @@ func main() { cgroup.SetGOGC(*gogc) buildinfo.Init() logger.Init() - pushmetrics.Init() logger.Infof("starting VictoriaLogs at %q...", *httpListenAddr) startTime := time.Now() @@ -49,8 +48,10 @@ func main() { go httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler) logger.Infof("started VictoriaLogs in %.3f seconds; see https://docs.victoriametrics.com/VictoriaLogs/", time.Since(startTime).Seconds()) + pushmetrics.Init() sig := procutil.WaitForSigterm() logger.Infof("received signal %s", sig) + pushmetrics.Stop() logger.Infof("gracefully shutting down webservice at %q", *httpListenAddr) startTime = time.Now() @@ -59,8 +60,6 @@ func main() { } logger.Infof("successfully shut down the webservice in %.3f seconds", time.Since(startTime).Seconds()) - pushmetrics.Stop() - vlinsert.Stop() vlselect.Stop() vlstorage.Stop() diff --git a/app/vmagent/main.go b/app/vmagent/main.go index 151e957c4e..96043fd634 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -95,7 +95,6 @@ func main() { remotewrite.InitSecretFlags() buildinfo.Init() logger.Init() - pushmetrics.Init() if promscrape.IsDryRun() { if err := promscrape.CheckConfig(); err != nil { @@ -146,8 +145,10 @@ func main() { } logger.Infof("started vmagent in %.3f seconds", time.Since(startTime).Seconds()) + pushmetrics.Init() sig := procutil.WaitForSigterm() logger.Infof("received signal %s", sig) + pushmetrics.Stop() startTime = time.Now() if len(*httpListenAddr) > 0 { @@ -158,7 +159,6 @@ func main() { logger.Infof("successfully shut down the webservice in %.3f seconds", time.Since(startTime).Seconds()) } - pushmetrics.Stop() promscrape.Stop() if len(*influxListenAddr) > 0 { diff --git a/app/vmalert/main.go b/app/vmalert/main.go index bbf22dc842..b47c58f6cb 100644 --- a/app/vmalert/main.go +++ b/app/vmalert/main.go @@ -104,7 +104,6 @@ func main() { notifier.InitSecretFlags() buildinfo.Init() logger.Init() - pushmetrics.Init() if !*remoteReadIgnoreRestoreErrors { logger.Warnf("flag `remoteRead.ignoreRestoreErrors` is deprecated and will be removed in next releases.") @@ -190,12 +189,14 @@ func main() { rh := &requestHandler{m: manager} go httpserver.Serve(*httpListenAddr, *useProxyProtocol, rh.handler) + pushmetrics.Init() sig := procutil.WaitForSigterm() logger.Infof("service received signal %s", sig) + pushmetrics.Stop() + if err := httpserver.Stop(*httpListenAddr); err != nil { logger.Fatalf("cannot stop the webservice: %s", err) } - pushmetrics.Stop() cancel() manager.close() } diff --git a/app/vmauth/main.go b/app/vmauth/main.go index 17b1baf27b..71c5a0d252 100644 --- a/app/vmauth/main.go +++ b/app/vmauth/main.go @@ -51,7 +51,6 @@ func main() { envflag.Parse() buildinfo.Init() logger.Init() - pushmetrics.Init() logger.Infof("starting vmauth at %q...", *httpListenAddr) startTime := time.Now() @@ -59,15 +58,16 @@ func main() { go httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler) logger.Infof("started vmauth in %.3f seconds", time.Since(startTime).Seconds()) + pushmetrics.Init() sig := procutil.WaitForSigterm() logger.Infof("received signal %s", sig) + pushmetrics.Stop() startTime = time.Now() logger.Infof("gracefully shutting down webservice at %q", *httpListenAddr) if err := httpserver.Stop(*httpListenAddr); err != nil { logger.Fatalf("cannot stop the webservice: %s", err) } - pushmetrics.Stop() logger.Infof("successfully shut down the webservice in %.3f seconds", time.Since(startTime).Seconds()) stopAuthConfig() logger.Infof("successfully stopped vmauth in %.3f seconds", time.Since(startTime).Seconds()) diff --git a/app/vmbackup/main.go b/app/vmbackup/main.go index 9b3f79996f..1e978c1dde 100644 --- a/app/vmbackup/main.go +++ b/app/vmbackup/main.go @@ -47,7 +47,6 @@ func main() { envflag.Parse() buildinfo.Init() logger.Init() - pushmetrics.Init() // Storing snapshot delete function to be able to call it in case // of error since logger.Fatal will exit the program without @@ -98,18 +97,19 @@ func main() { go httpserver.Serve(*httpListenAddr, false, nil) + pushmetrics.Init() err := makeBackup() deleteSnapshot() if err != nil { logger.Fatalf("cannot create backup: %s", err) } + pushmetrics.Stop() startTime := time.Now() logger.Infof("gracefully shutting down http server for metrics at %q", *httpListenAddr) if err := httpserver.Stop(*httpListenAddr); err != nil { logger.Fatalf("cannot stop http server for metrics: %s", err) } - pushmetrics.Stop() logger.Infof("successfully shut down http server for metrics in %.3f seconds", time.Since(startTime).Seconds()) } diff --git a/app/vminsert/main.go b/app/vminsert/main.go index cd0ae83e16..d1dc06fdad 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -93,7 +93,6 @@ func main() { envflag.Parse() buildinfo.Init() logger.Init() - pushmetrics.Init() logger.Infof("initializing netstorage for storageNodes %s...", *storageNodes) startTime := time.Now() @@ -148,8 +147,10 @@ func main() { httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler) }() + pushmetrics.Init() sig := procutil.WaitForSigterm() logger.Infof("service received signal %s", sig) + pushmetrics.Stop() logger.Infof("gracefully shutting down http service at %q", *httpListenAddr) startTime = time.Now() @@ -158,8 +159,6 @@ func main() { } logger.Infof("successfully shut down http service in %.3f seconds", time.Since(startTime).Seconds()) - pushmetrics.Stop() - if len(*clusternativeListenAddr) > 0 { clusternativeServer.MustStop() } diff --git a/app/vmrestore/main.go b/app/vmrestore/main.go index 799d56ba7e..c0389bc432 100644 --- a/app/vmrestore/main.go +++ b/app/vmrestore/main.go @@ -36,7 +36,6 @@ func main() { envflag.Parse() buildinfo.Init() logger.Init() - pushmetrics.Init() go httpserver.Serve(*httpListenAddr, false, nil) @@ -54,9 +53,11 @@ func main() { Dst: dstFS, SkipBackupCompleteCheck: *skipBackupCompleteCheck, } + pushmetrics.Init() if err := a.Run(); err != nil { logger.Fatalf("cannot restore from backup: %s", err) } + pushmetrics.Stop() srcFS.MustStop() dstFS.MustStop() @@ -65,7 +66,6 @@ func main() { if err := httpserver.Stop(*httpListenAddr); err != nil { logger.Fatalf("cannot stop http server for metrics: %s", err) } - pushmetrics.Stop() logger.Infof("successfully shut down http server for metrics in %.3f seconds", time.Since(startTime).Seconds()) } diff --git a/app/vmselect/main.go b/app/vmselect/main.go index 906a8d4fdb..70c8d14506 100644 --- a/app/vmselect/main.go +++ b/app/vmselect/main.go @@ -91,7 +91,6 @@ func main() { envflag.Parse() buildinfo.Init() logger.Init() - pushmetrics.Init() logger.Infof("starting netstorage at storageNodes %s", *storageNodes) startTime := time.Now() @@ -135,8 +134,10 @@ func main() { httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler) }() + pushmetrics.Init() sig := procutil.WaitForSigterm() logger.Infof("service received signal %s", sig) + pushmetrics.Stop() logger.Infof("gracefully shutting down http service at %q", *httpListenAddr) startTime = time.Now() @@ -145,8 +146,6 @@ func main() { } logger.Infof("successfully shut down http service in %.3f seconds", time.Since(startTime).Seconds()) - pushmetrics.Stop() - if vmselectapiServer != nil { logger.Infof("stopping vmselectapi server...") vmselectapiServer.MustStop() diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index f232e93bfc..e6710bb645 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -85,7 +85,6 @@ func main() { envflag.Parse() buildinfo.Init() logger.Init() - pushmetrics.Init() storage.SetDedupInterval(*minScrapeInterval) storage.SetDataFlushInterval(*inmemoryDataFlushInterval) @@ -134,8 +133,10 @@ func main() { httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler) }() + pushmetrics.Init() sig := procutil.WaitForSigterm() logger.Infof("service received signal %s", sig) + pushmetrics.Stop() logger.Infof("gracefully shutting down http service at %q", *httpListenAddr) startTime = time.Now() @@ -144,8 +145,6 @@ func main() { } logger.Infof("successfully shut down http service in %.3f seconds", time.Since(startTime).Seconds()) - pushmetrics.Stop() - logger.Infof("gracefully shutting down the service") startTime = time.Now() stopStaleSnapshotsRemover() diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 8f1e1bdfe5..dba93ec7dd 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -18,7 +18,7 @@ The following `tip` changes can be tested by building VictoriaMetrics components * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly return results from [bottomk](https://docs.victoriametrics.com/MetricsQL.html#bottomk) and `bottomk_*()` functions when some of these results contain NaN values. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5506). Thanks to @xiaozongyang for [the fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5509). * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly handle queries, which wrap [rollup functions](https://docs.victoriametrics.com/MetricsQL.html#rollup-functions) with multiple arguments without explicitly specified lookbehind window in square brackets into [aggregate functions](https://docs.victoriametrics.com/MetricsQL.html#aggregate-functions). For example, `sum(quantile_over_time(0.5, process_resident_memory_bytes))` was resulting to `expecting at least 2 args to ...; got 1 args` error. Thanks to @atykhyy for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5414). * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly assume role with [AWS IRSA authorization](https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts.html). Previously role chaining was not supported. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3822) for details. -* BUGFIX: all: fix potential panic during components shutdown when `-pushmetrics.url` is configured. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5548). Thanks to @zhdd99 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5549). +* BUGFIX: all: fix potential panic during components shutdown when [metrics push](https://docs.victoriametrics.com/#push-metrics) is configured. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5548). Thanks to @zhdd99 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5549). * BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl.html): check for Error field in response from influx client during migration. Before, only network errors were checked. Thanks to @wozz for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5446). * BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl.html): retry on import errors in `vm-native` mode. Before, retries happened only on writes into a network connection between source and destination. But errors returned by server after all the data was transmitted were logged, but not retried. diff --git a/lib/pushmetrics/pushmetrics.go b/lib/pushmetrics/pushmetrics.go index 5528d80d36..45e1e0a1d4 100644 --- a/lib/pushmetrics/pushmetrics.go +++ b/lib/pushmetrics/pushmetrics.go @@ -1,8 +1,10 @@ package pushmetrics import ( + "context" "flag" "strings" + "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/appmetrics" @@ -26,6 +28,7 @@ func init() { var ( pushCtx, cancelPushCtx = context.WithCancel(context.Background()) + wgDone sync.WaitGroup ) // Init must be called after logger.Init @@ -34,6 +37,7 @@ func Init() { for _, pu := range *pushURL { opts := &metrics.PushOptions{ ExtraLabels: extraLabels, + WaitGroup: &wgDone, } if err := metrics.InitPushExtWithOptions(pushCtx, pu, *pushInterval, appmetrics.WritePrometheusMetrics, opts); err != nil { logger.Fatalf("cannot initialize pushmetrics: %s", err) @@ -48,4 +52,5 @@ func Init() { // Stop must be called after Init. func Stop() { cancelPushCtx() + wgDone.Wait() }