From 3023b68d429371690613c45cad9277927acd5c1b Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin <valyala@victoriametrics.com>
Date: Mon, 15 Jan 2024 13:37:02 +0200
Subject: [PATCH] lib/pushmetrics: wait until the background goroutines, which
 push metrics, are stopped at pushmetrics.Stop()

Previously the was a race condition when the background goroutine still could try collecting metrics
from already stopped resources after returning from pushmetrics.Stop().
Now the pushmetrics.Stop() waits until the background goroutine is stopped before returning.

This is a follow-up for https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5549
and the commit fe2d9f6646a49c347b1ee03a0285971eaf681ed6 .

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5548
---
 app/victoria-logs/main.go      | 5 ++---
 app/vmagent/main.go            | 4 ++--
 app/vmalert/main.go            | 5 +++--
 app/vmauth/main.go             | 4 ++--
 app/vmbackup/main.go           | 4 ++--
 app/vminsert/main.go           | 5 ++---
 app/vmrestore/main.go          | 4 ++--
 app/vmselect/main.go           | 5 ++---
 app/vmstorage/main.go          | 5 ++---
 docs/CHANGELOG.md              | 2 +-
 lib/pushmetrics/pushmetrics.go | 5 +++++
 11 files changed, 25 insertions(+), 23 deletions(-)

diff --git a/app/victoria-logs/main.go b/app/victoria-logs/main.go
index 62d952c07a..1a3aea2a96 100644
--- a/app/victoria-logs/main.go
+++ b/app/victoria-logs/main.go
@@ -37,7 +37,6 @@ func main() {
 	cgroup.SetGOGC(*gogc)
 	buildinfo.Init()
 	logger.Init()
-	pushmetrics.Init()
 
 	logger.Infof("starting VictoriaLogs at %q...", *httpListenAddr)
 	startTime := time.Now()
@@ -49,8 +48,10 @@ func main() {
 	go httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler)
 	logger.Infof("started VictoriaLogs in %.3f seconds; see https://docs.victoriametrics.com/VictoriaLogs/", time.Since(startTime).Seconds())
 
+	pushmetrics.Init()
 	sig := procutil.WaitForSigterm()
 	logger.Infof("received signal %s", sig)
+	pushmetrics.Stop()
 
 	logger.Infof("gracefully shutting down webservice at %q", *httpListenAddr)
 	startTime = time.Now()
@@ -59,8 +60,6 @@ func main() {
 	}
 	logger.Infof("successfully shut down the webservice in %.3f seconds", time.Since(startTime).Seconds())
 
-	pushmetrics.Stop()
-
 	vlinsert.Stop()
 	vlselect.Stop()
 	vlstorage.Stop()
diff --git a/app/vmagent/main.go b/app/vmagent/main.go
index 151e957c4e..96043fd634 100644
--- a/app/vmagent/main.go
+++ b/app/vmagent/main.go
@@ -95,7 +95,6 @@ func main() {
 	remotewrite.InitSecretFlags()
 	buildinfo.Init()
 	logger.Init()
-	pushmetrics.Init()
 
 	if promscrape.IsDryRun() {
 		if err := promscrape.CheckConfig(); err != nil {
@@ -146,8 +145,10 @@ func main() {
 	}
 	logger.Infof("started vmagent in %.3f seconds", time.Since(startTime).Seconds())
 
+	pushmetrics.Init()
 	sig := procutil.WaitForSigterm()
 	logger.Infof("received signal %s", sig)
+	pushmetrics.Stop()
 
 	startTime = time.Now()
 	if len(*httpListenAddr) > 0 {
@@ -158,7 +159,6 @@ func main() {
 		logger.Infof("successfully shut down the webservice in %.3f seconds", time.Since(startTime).Seconds())
 	}
 
-	pushmetrics.Stop()
 	promscrape.Stop()
 
 	if len(*influxListenAddr) > 0 {
diff --git a/app/vmalert/main.go b/app/vmalert/main.go
index bbf22dc842..b47c58f6cb 100644
--- a/app/vmalert/main.go
+++ b/app/vmalert/main.go
@@ -104,7 +104,6 @@ func main() {
 	notifier.InitSecretFlags()
 	buildinfo.Init()
 	logger.Init()
-	pushmetrics.Init()
 
 	if !*remoteReadIgnoreRestoreErrors {
 		logger.Warnf("flag `remoteRead.ignoreRestoreErrors` is deprecated and will be removed in next releases.")
@@ -190,12 +189,14 @@ func main() {
 	rh := &requestHandler{m: manager}
 	go httpserver.Serve(*httpListenAddr, *useProxyProtocol, rh.handler)
 
+	pushmetrics.Init()
 	sig := procutil.WaitForSigterm()
 	logger.Infof("service received signal %s", sig)
+	pushmetrics.Stop()
+
 	if err := httpserver.Stop(*httpListenAddr); err != nil {
 		logger.Fatalf("cannot stop the webservice: %s", err)
 	}
-	pushmetrics.Stop()
 	cancel()
 	manager.close()
 }
diff --git a/app/vmauth/main.go b/app/vmauth/main.go
index 17b1baf27b..71c5a0d252 100644
--- a/app/vmauth/main.go
+++ b/app/vmauth/main.go
@@ -51,7 +51,6 @@ func main() {
 	envflag.Parse()
 	buildinfo.Init()
 	logger.Init()
-	pushmetrics.Init()
 
 	logger.Infof("starting vmauth at %q...", *httpListenAddr)
 	startTime := time.Now()
@@ -59,15 +58,16 @@ func main() {
 	go httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler)
 	logger.Infof("started vmauth in %.3f seconds", time.Since(startTime).Seconds())
 
+	pushmetrics.Init()
 	sig := procutil.WaitForSigterm()
 	logger.Infof("received signal %s", sig)
+	pushmetrics.Stop()
 
 	startTime = time.Now()
 	logger.Infof("gracefully shutting down webservice at %q", *httpListenAddr)
 	if err := httpserver.Stop(*httpListenAddr); err != nil {
 		logger.Fatalf("cannot stop the webservice: %s", err)
 	}
-	pushmetrics.Stop()
 	logger.Infof("successfully shut down the webservice in %.3f seconds", time.Since(startTime).Seconds())
 	stopAuthConfig()
 	logger.Infof("successfully stopped vmauth in %.3f seconds", time.Since(startTime).Seconds())
diff --git a/app/vmbackup/main.go b/app/vmbackup/main.go
index 9b3f79996f..1e978c1dde 100644
--- a/app/vmbackup/main.go
+++ b/app/vmbackup/main.go
@@ -47,7 +47,6 @@ func main() {
 	envflag.Parse()
 	buildinfo.Init()
 	logger.Init()
-	pushmetrics.Init()
 
 	// Storing snapshot delete function to be able to call it in case
 	// of error since logger.Fatal will exit the program without
@@ -98,18 +97,19 @@ func main() {
 
 	go httpserver.Serve(*httpListenAddr, false, nil)
 
+	pushmetrics.Init()
 	err := makeBackup()
 	deleteSnapshot()
 	if err != nil {
 		logger.Fatalf("cannot create backup: %s", err)
 	}
+	pushmetrics.Stop()
 
 	startTime := time.Now()
 	logger.Infof("gracefully shutting down http server for metrics at %q", *httpListenAddr)
 	if err := httpserver.Stop(*httpListenAddr); err != nil {
 		logger.Fatalf("cannot stop http server for metrics: %s", err)
 	}
-	pushmetrics.Stop()
 	logger.Infof("successfully shut down http server for metrics in %.3f seconds", time.Since(startTime).Seconds())
 }
 
diff --git a/app/vminsert/main.go b/app/vminsert/main.go
index cd0ae83e16..d1dc06fdad 100644
--- a/app/vminsert/main.go
+++ b/app/vminsert/main.go
@@ -93,7 +93,6 @@ func main() {
 	envflag.Parse()
 	buildinfo.Init()
 	logger.Init()
-	pushmetrics.Init()
 
 	logger.Infof("initializing netstorage for storageNodes %s...", *storageNodes)
 	startTime := time.Now()
@@ -148,8 +147,10 @@ func main() {
 		httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler)
 	}()
 
+	pushmetrics.Init()
 	sig := procutil.WaitForSigterm()
 	logger.Infof("service received signal %s", sig)
+	pushmetrics.Stop()
 
 	logger.Infof("gracefully shutting down http service at %q", *httpListenAddr)
 	startTime = time.Now()
@@ -158,8 +159,6 @@ func main() {
 	}
 	logger.Infof("successfully shut down http service in %.3f seconds", time.Since(startTime).Seconds())
 
-	pushmetrics.Stop()
-
 	if len(*clusternativeListenAddr) > 0 {
 		clusternativeServer.MustStop()
 	}
diff --git a/app/vmrestore/main.go b/app/vmrestore/main.go
index 799d56ba7e..c0389bc432 100644
--- a/app/vmrestore/main.go
+++ b/app/vmrestore/main.go
@@ -36,7 +36,6 @@ func main() {
 	envflag.Parse()
 	buildinfo.Init()
 	logger.Init()
-	pushmetrics.Init()
 
 	go httpserver.Serve(*httpListenAddr, false, nil)
 
@@ -54,9 +53,11 @@ func main() {
 		Dst:                     dstFS,
 		SkipBackupCompleteCheck: *skipBackupCompleteCheck,
 	}
+	pushmetrics.Init()
 	if err := a.Run(); err != nil {
 		logger.Fatalf("cannot restore from backup: %s", err)
 	}
+	pushmetrics.Stop()
 	srcFS.MustStop()
 	dstFS.MustStop()
 
@@ -65,7 +66,6 @@ func main() {
 	if err := httpserver.Stop(*httpListenAddr); err != nil {
 		logger.Fatalf("cannot stop http server for metrics: %s", err)
 	}
-	pushmetrics.Stop()
 	logger.Infof("successfully shut down http server for metrics in %.3f seconds", time.Since(startTime).Seconds())
 }
 
diff --git a/app/vmselect/main.go b/app/vmselect/main.go
index 906a8d4fdb..70c8d14506 100644
--- a/app/vmselect/main.go
+++ b/app/vmselect/main.go
@@ -91,7 +91,6 @@ func main() {
 	envflag.Parse()
 	buildinfo.Init()
 	logger.Init()
-	pushmetrics.Init()
 
 	logger.Infof("starting netstorage at storageNodes %s", *storageNodes)
 	startTime := time.Now()
@@ -135,8 +134,10 @@ func main() {
 		httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler)
 	}()
 
+	pushmetrics.Init()
 	sig := procutil.WaitForSigterm()
 	logger.Infof("service received signal %s", sig)
+	pushmetrics.Stop()
 
 	logger.Infof("gracefully shutting down http service at %q", *httpListenAddr)
 	startTime = time.Now()
@@ -145,8 +146,6 @@ func main() {
 	}
 	logger.Infof("successfully shut down http service in %.3f seconds", time.Since(startTime).Seconds())
 
-	pushmetrics.Stop()
-
 	if vmselectapiServer != nil {
 		logger.Infof("stopping vmselectapi server...")
 		vmselectapiServer.MustStop()
diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go
index f232e93bfc..e6710bb645 100644
--- a/app/vmstorage/main.go
+++ b/app/vmstorage/main.go
@@ -85,7 +85,6 @@ func main() {
 	envflag.Parse()
 	buildinfo.Init()
 	logger.Init()
-	pushmetrics.Init()
 
 	storage.SetDedupInterval(*minScrapeInterval)
 	storage.SetDataFlushInterval(*inmemoryDataFlushInterval)
@@ -134,8 +133,10 @@ func main() {
 		httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler)
 	}()
 
+	pushmetrics.Init()
 	sig := procutil.WaitForSigterm()
 	logger.Infof("service received signal %s", sig)
+	pushmetrics.Stop()
 
 	logger.Infof("gracefully shutting down http service at %q", *httpListenAddr)
 	startTime = time.Now()
@@ -144,8 +145,6 @@ func main() {
 	}
 	logger.Infof("successfully shut down http service in %.3f seconds", time.Since(startTime).Seconds())
 
-	pushmetrics.Stop()
-
 	logger.Infof("gracefully shutting down the service")
 	startTime = time.Now()
 	stopStaleSnapshotsRemover()
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 8f1e1bdfe5..dba93ec7dd 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -18,7 +18,7 @@ The following `tip` changes can be tested by building VictoriaMetrics components
 * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly return results from [bottomk](https://docs.victoriametrics.com/MetricsQL.html#bottomk) and `bottomk_*()` functions when some of these results contain NaN values. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5506). Thanks to @xiaozongyang for [the fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5509).
 * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly handle queries, which wrap [rollup functions](https://docs.victoriametrics.com/MetricsQL.html#rollup-functions) with multiple arguments without explicitly specified lookbehind window in square brackets into [aggregate functions](https://docs.victoriametrics.com/MetricsQL.html#aggregate-functions). For example, `sum(quantile_over_time(0.5, process_resident_memory_bytes))` was resulting to `expecting at least 2 args to ...; got 1 args` error. Thanks to @atykhyy for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5414).
 * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly assume role with [AWS IRSA authorization](https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts.html). Previously role chaining was not supported. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3822) for details.
-* BUGFIX: all: fix potential panic during components shutdown when `-pushmetrics.url` is configured. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5548). Thanks to @zhdd99 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5549).
+* BUGFIX: all: fix potential panic during components shutdown when [metrics push](https://docs.victoriametrics.com/#push-metrics) is configured. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5548). Thanks to @zhdd99 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5549).
 * BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl.html): check for Error field in response from influx client during migration. Before, only network errors were checked. Thanks to @wozz for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5446).
 * BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl.html): retry on import errors in `vm-native` mode. Before, retries happened only on writes into a network connection between source and destination. But errors returned by server after all the data was transmitted were logged, but not retried.
 
diff --git a/lib/pushmetrics/pushmetrics.go b/lib/pushmetrics/pushmetrics.go
index 5528d80d36..45e1e0a1d4 100644
--- a/lib/pushmetrics/pushmetrics.go
+++ b/lib/pushmetrics/pushmetrics.go
@@ -1,8 +1,10 @@
 package pushmetrics
 
 import (
+	"context"
 	"flag"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/appmetrics"
@@ -26,6 +28,7 @@ func init() {
 
 var (
 	pushCtx, cancelPushCtx = context.WithCancel(context.Background())
+	wgDone                 sync.WaitGroup
 )
 
 // Init must be called after logger.Init
@@ -34,6 +37,7 @@ func Init() {
 	for _, pu := range *pushURL {
 		opts := &metrics.PushOptions{
 			ExtraLabels: extraLabels,
+			WaitGroup:   &wgDone,
 		}
 		if err := metrics.InitPushExtWithOptions(pushCtx, pu, *pushInterval, appmetrics.WritePrometheusMetrics, opts); err != nil {
 			logger.Fatalf("cannot initialize pushmetrics: %s", err)
@@ -48,4 +52,5 @@ func Init() {
 // Stop must be called after Init.
 func Stop() {
 	cancelPushCtx()
+	wgDone.Wait()
 }