mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/pushmetrics: wait until the background goroutines, which push metrics, are stopped at pushmetrics.Stop()
Previously the was a race condition when the background goroutine still could try collecting metrics
from already stopped resources after returning from pushmetrics.Stop().
Now the pushmetrics.Stop() waits until the background goroutine is stopped before returning.
This is a follow-up for https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5549
and the commit fe2d9f6646
.
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5548
This commit is contained in:
parent
fce76b2fb0
commit
9e5e514faf
15 changed files with 41 additions and 27 deletions
|
@ -37,7 +37,6 @@ func main() {
|
||||||
cgroup.SetGOGC(*gogc)
|
cgroup.SetGOGC(*gogc)
|
||||||
buildinfo.Init()
|
buildinfo.Init()
|
||||||
logger.Init()
|
logger.Init()
|
||||||
pushmetrics.Init()
|
|
||||||
|
|
||||||
logger.Infof("starting VictoriaLogs at %q...", *httpListenAddr)
|
logger.Infof("starting VictoriaLogs at %q...", *httpListenAddr)
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
|
@ -49,8 +48,10 @@ func main() {
|
||||||
go httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler)
|
go httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler)
|
||||||
logger.Infof("started VictoriaLogs in %.3f seconds; see https://docs.victoriametrics.com/VictoriaLogs/", time.Since(startTime).Seconds())
|
logger.Infof("started VictoriaLogs in %.3f seconds; see https://docs.victoriametrics.com/VictoriaLogs/", time.Since(startTime).Seconds())
|
||||||
|
|
||||||
|
pushmetrics.Init()
|
||||||
sig := procutil.WaitForSigterm()
|
sig := procutil.WaitForSigterm()
|
||||||
logger.Infof("received signal %s", sig)
|
logger.Infof("received signal %s", sig)
|
||||||
|
pushmetrics.Stop()
|
||||||
|
|
||||||
logger.Infof("gracefully shutting down webservice at %q", *httpListenAddr)
|
logger.Infof("gracefully shutting down webservice at %q", *httpListenAddr)
|
||||||
startTime = time.Now()
|
startTime = time.Now()
|
||||||
|
@ -59,8 +60,6 @@ func main() {
|
||||||
}
|
}
|
||||||
logger.Infof("successfully shut down the webservice in %.3f seconds", time.Since(startTime).Seconds())
|
logger.Infof("successfully shut down the webservice in %.3f seconds", time.Since(startTime).Seconds())
|
||||||
|
|
||||||
pushmetrics.Stop()
|
|
||||||
|
|
||||||
vlinsert.Stop()
|
vlinsert.Stop()
|
||||||
vlselect.Stop()
|
vlselect.Stop()
|
||||||
vlstorage.Stop()
|
vlstorage.Stop()
|
||||||
|
|
|
@ -96,7 +96,6 @@ func main() {
|
||||||
remotewrite.InitSecretFlags()
|
remotewrite.InitSecretFlags()
|
||||||
buildinfo.Init()
|
buildinfo.Init()
|
||||||
logger.Init()
|
logger.Init()
|
||||||
pushmetrics.Init()
|
|
||||||
|
|
||||||
if promscrape.IsDryRun() {
|
if promscrape.IsDryRun() {
|
||||||
if err := promscrape.CheckConfig(); err != nil {
|
if err := promscrape.CheckConfig(); err != nil {
|
||||||
|
@ -147,8 +146,10 @@ func main() {
|
||||||
}
|
}
|
||||||
logger.Infof("started vmagent in %.3f seconds", time.Since(startTime).Seconds())
|
logger.Infof("started vmagent in %.3f seconds", time.Since(startTime).Seconds())
|
||||||
|
|
||||||
|
pushmetrics.Init()
|
||||||
sig := procutil.WaitForSigterm()
|
sig := procutil.WaitForSigterm()
|
||||||
logger.Infof("received signal %s", sig)
|
logger.Infof("received signal %s", sig)
|
||||||
|
pushmetrics.Stop()
|
||||||
|
|
||||||
startTime = time.Now()
|
startTime = time.Now()
|
||||||
if len(*httpListenAddr) > 0 {
|
if len(*httpListenAddr) > 0 {
|
||||||
|
@ -159,7 +160,6 @@ func main() {
|
||||||
logger.Infof("successfully shut down the webservice in %.3f seconds", time.Since(startTime).Seconds())
|
logger.Infof("successfully shut down the webservice in %.3f seconds", time.Since(startTime).Seconds())
|
||||||
}
|
}
|
||||||
|
|
||||||
pushmetrics.Stop()
|
|
||||||
promscrape.Stop()
|
promscrape.Stop()
|
||||||
|
|
||||||
if len(*influxListenAddr) > 0 {
|
if len(*influxListenAddr) > 0 {
|
||||||
|
|
|
@ -96,7 +96,6 @@ func main() {
|
||||||
notifier.InitSecretFlags()
|
notifier.InitSecretFlags()
|
||||||
buildinfo.Init()
|
buildinfo.Init()
|
||||||
logger.Init()
|
logger.Init()
|
||||||
pushmetrics.Init()
|
|
||||||
|
|
||||||
if !*remoteReadIgnoreRestoreErrors {
|
if !*remoteReadIgnoreRestoreErrors {
|
||||||
logger.Warnf("flag `remoteRead.ignoreRestoreErrors` is deprecated and will be removed in next releases.")
|
logger.Warnf("flag `remoteRead.ignoreRestoreErrors` is deprecated and will be removed in next releases.")
|
||||||
|
@ -182,12 +181,14 @@ func main() {
|
||||||
rh := &requestHandler{m: manager}
|
rh := &requestHandler{m: manager}
|
||||||
go httpserver.Serve(*httpListenAddr, *useProxyProtocol, rh.handler)
|
go httpserver.Serve(*httpListenAddr, *useProxyProtocol, rh.handler)
|
||||||
|
|
||||||
|
pushmetrics.Init()
|
||||||
sig := procutil.WaitForSigterm()
|
sig := procutil.WaitForSigterm()
|
||||||
logger.Infof("service received signal %s", sig)
|
logger.Infof("service received signal %s", sig)
|
||||||
|
pushmetrics.Stop()
|
||||||
|
|
||||||
if err := httpserver.Stop(*httpListenAddr); err != nil {
|
if err := httpserver.Stop(*httpListenAddr); err != nil {
|
||||||
logger.Fatalf("cannot stop the webservice: %s", err)
|
logger.Fatalf("cannot stop the webservice: %s", err)
|
||||||
}
|
}
|
||||||
pushmetrics.Stop()
|
|
||||||
cancel()
|
cancel()
|
||||||
manager.close()
|
manager.close()
|
||||||
}
|
}
|
||||||
|
|
|
@ -64,7 +64,6 @@ func main() {
|
||||||
envflag.Parse()
|
envflag.Parse()
|
||||||
buildinfo.Init()
|
buildinfo.Init()
|
||||||
logger.Init()
|
logger.Init()
|
||||||
pushmetrics.Init()
|
|
||||||
|
|
||||||
logger.Infof("starting vmauth at %q...", *httpListenAddr)
|
logger.Infof("starting vmauth at %q...", *httpListenAddr)
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
|
@ -72,15 +71,16 @@ func main() {
|
||||||
go httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler)
|
go httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler)
|
||||||
logger.Infof("started vmauth in %.3f seconds", time.Since(startTime).Seconds())
|
logger.Infof("started vmauth in %.3f seconds", time.Since(startTime).Seconds())
|
||||||
|
|
||||||
|
pushmetrics.Init()
|
||||||
sig := procutil.WaitForSigterm()
|
sig := procutil.WaitForSigterm()
|
||||||
logger.Infof("received signal %s", sig)
|
logger.Infof("received signal %s", sig)
|
||||||
|
pushmetrics.Stop()
|
||||||
|
|
||||||
startTime = time.Now()
|
startTime = time.Now()
|
||||||
logger.Infof("gracefully shutting down webservice at %q", *httpListenAddr)
|
logger.Infof("gracefully shutting down webservice at %q", *httpListenAddr)
|
||||||
if err := httpserver.Stop(*httpListenAddr); err != nil {
|
if err := httpserver.Stop(*httpListenAddr); err != nil {
|
||||||
logger.Fatalf("cannot stop the webservice: %s", err)
|
logger.Fatalf("cannot stop the webservice: %s", err)
|
||||||
}
|
}
|
||||||
pushmetrics.Stop()
|
|
||||||
logger.Infof("successfully shut down the webservice in %.3f seconds", time.Since(startTime).Seconds())
|
logger.Infof("successfully shut down the webservice in %.3f seconds", time.Since(startTime).Seconds())
|
||||||
stopAuthConfig()
|
stopAuthConfig()
|
||||||
logger.Infof("successfully stopped vmauth in %.3f seconds", time.Since(startTime).Seconds())
|
logger.Infof("successfully stopped vmauth in %.3f seconds", time.Since(startTime).Seconds())
|
||||||
|
|
|
@ -47,7 +47,6 @@ func main() {
|
||||||
envflag.Parse()
|
envflag.Parse()
|
||||||
buildinfo.Init()
|
buildinfo.Init()
|
||||||
logger.Init()
|
logger.Init()
|
||||||
pushmetrics.Init()
|
|
||||||
|
|
||||||
// Storing snapshot delete function to be able to call it in case
|
// Storing snapshot delete function to be able to call it in case
|
||||||
// of error since logger.Fatal will exit the program without
|
// of error since logger.Fatal will exit the program without
|
||||||
|
@ -96,18 +95,19 @@ func main() {
|
||||||
|
|
||||||
go httpserver.Serve(*httpListenAddr, false, nil)
|
go httpserver.Serve(*httpListenAddr, false, nil)
|
||||||
|
|
||||||
|
pushmetrics.Init()
|
||||||
err := makeBackup()
|
err := makeBackup()
|
||||||
deleteSnapshot()
|
deleteSnapshot()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Fatalf("cannot create backup: %s", err)
|
logger.Fatalf("cannot create backup: %s", err)
|
||||||
}
|
}
|
||||||
|
pushmetrics.Stop()
|
||||||
|
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
logger.Infof("gracefully shutting down http server for metrics at %q", *httpListenAddr)
|
logger.Infof("gracefully shutting down http server for metrics at %q", *httpListenAddr)
|
||||||
if err := httpserver.Stop(*httpListenAddr); err != nil {
|
if err := httpserver.Stop(*httpListenAddr); err != nil {
|
||||||
logger.Fatalf("cannot stop http server for metrics: %s", err)
|
logger.Fatalf("cannot stop http server for metrics: %s", err)
|
||||||
}
|
}
|
||||||
pushmetrics.Stop()
|
|
||||||
logger.Infof("successfully shut down http server for metrics in %.3f seconds", time.Since(startTime).Seconds())
|
logger.Infof("successfully shut down http server for metrics in %.3f seconds", time.Since(startTime).Seconds())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -96,7 +96,6 @@ func main() {
|
||||||
envflag.Parse()
|
envflag.Parse()
|
||||||
buildinfo.Init()
|
buildinfo.Init()
|
||||||
logger.Init()
|
logger.Init()
|
||||||
pushmetrics.Init()
|
|
||||||
|
|
||||||
logger.Infof("initializing netstorage for storageNodes %s...", *storageNodes)
|
logger.Infof("initializing netstorage for storageNodes %s...", *storageNodes)
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
|
@ -151,8 +150,10 @@ func main() {
|
||||||
httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler)
|
httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
pushmetrics.Init()
|
||||||
sig := procutil.WaitForSigterm()
|
sig := procutil.WaitForSigterm()
|
||||||
logger.Infof("service received signal %s", sig)
|
logger.Infof("service received signal %s", sig)
|
||||||
|
pushmetrics.Stop()
|
||||||
|
|
||||||
logger.Infof("gracefully shutting down http service at %q", *httpListenAddr)
|
logger.Infof("gracefully shutting down http service at %q", *httpListenAddr)
|
||||||
startTime = time.Now()
|
startTime = time.Now()
|
||||||
|
@ -161,8 +162,6 @@ func main() {
|
||||||
}
|
}
|
||||||
logger.Infof("successfully shut down http service in %.3f seconds", time.Since(startTime).Seconds())
|
logger.Infof("successfully shut down http service in %.3f seconds", time.Since(startTime).Seconds())
|
||||||
|
|
||||||
pushmetrics.Stop()
|
|
||||||
|
|
||||||
if len(*clusternativeListenAddr) > 0 {
|
if len(*clusternativeListenAddr) > 0 {
|
||||||
clusternativeServer.MustStop()
|
clusternativeServer.MustStop()
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,7 +36,6 @@ func main() {
|
||||||
envflag.Parse()
|
envflag.Parse()
|
||||||
buildinfo.Init()
|
buildinfo.Init()
|
||||||
logger.Init()
|
logger.Init()
|
||||||
pushmetrics.Init()
|
|
||||||
|
|
||||||
go httpserver.Serve(*httpListenAddr, false, nil)
|
go httpserver.Serve(*httpListenAddr, false, nil)
|
||||||
|
|
||||||
|
@ -54,9 +53,11 @@ func main() {
|
||||||
Dst: dstFS,
|
Dst: dstFS,
|
||||||
SkipBackupCompleteCheck: *skipBackupCompleteCheck,
|
SkipBackupCompleteCheck: *skipBackupCompleteCheck,
|
||||||
}
|
}
|
||||||
|
pushmetrics.Init()
|
||||||
if err := a.Run(); err != nil {
|
if err := a.Run(); err != nil {
|
||||||
logger.Fatalf("cannot restore from backup: %s", err)
|
logger.Fatalf("cannot restore from backup: %s", err)
|
||||||
}
|
}
|
||||||
|
pushmetrics.Stop()
|
||||||
srcFS.MustStop()
|
srcFS.MustStop()
|
||||||
dstFS.MustStop()
|
dstFS.MustStop()
|
||||||
|
|
||||||
|
@ -65,7 +66,6 @@ func main() {
|
||||||
if err := httpserver.Stop(*httpListenAddr); err != nil {
|
if err := httpserver.Stop(*httpListenAddr); err != nil {
|
||||||
logger.Fatalf("cannot stop http server for metrics: %s", err)
|
logger.Fatalf("cannot stop http server for metrics: %s", err)
|
||||||
}
|
}
|
||||||
pushmetrics.Stop()
|
|
||||||
logger.Infof("successfully shut down http server for metrics in %.3f seconds", time.Since(startTime).Seconds())
|
logger.Infof("successfully shut down http server for metrics in %.3f seconds", time.Since(startTime).Seconds())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -91,7 +91,6 @@ func main() {
|
||||||
envflag.Parse()
|
envflag.Parse()
|
||||||
buildinfo.Init()
|
buildinfo.Init()
|
||||||
logger.Init()
|
logger.Init()
|
||||||
pushmetrics.Init()
|
|
||||||
|
|
||||||
logger.Infof("starting netstorage at storageNodes %s", *storageNodes)
|
logger.Infof("starting netstorage at storageNodes %s", *storageNodes)
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
|
@ -135,8 +134,10 @@ func main() {
|
||||||
httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler)
|
httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
pushmetrics.Init()
|
||||||
sig := procutil.WaitForSigterm()
|
sig := procutil.WaitForSigterm()
|
||||||
logger.Infof("service received signal %s", sig)
|
logger.Infof("service received signal %s", sig)
|
||||||
|
pushmetrics.Stop()
|
||||||
|
|
||||||
logger.Infof("gracefully shutting down http service at %q", *httpListenAddr)
|
logger.Infof("gracefully shutting down http service at %q", *httpListenAddr)
|
||||||
startTime = time.Now()
|
startTime = time.Now()
|
||||||
|
@ -145,8 +146,6 @@ func main() {
|
||||||
}
|
}
|
||||||
logger.Infof("successfully shut down http service in %.3f seconds", time.Since(startTime).Seconds())
|
logger.Infof("successfully shut down http service in %.3f seconds", time.Since(startTime).Seconds())
|
||||||
|
|
||||||
pushmetrics.Stop()
|
|
||||||
|
|
||||||
if vmselectapiServer != nil {
|
if vmselectapiServer != nil {
|
||||||
logger.Infof("stopping vmselectapi server...")
|
logger.Infof("stopping vmselectapi server...")
|
||||||
vmselectapiServer.MustStop()
|
vmselectapiServer.MustStop()
|
||||||
|
|
|
@ -85,7 +85,6 @@ func main() {
|
||||||
envflag.Parse()
|
envflag.Parse()
|
||||||
buildinfo.Init()
|
buildinfo.Init()
|
||||||
logger.Init()
|
logger.Init()
|
||||||
pushmetrics.Init()
|
|
||||||
|
|
||||||
storage.SetDedupInterval(*minScrapeInterval)
|
storage.SetDedupInterval(*minScrapeInterval)
|
||||||
storage.SetDataFlushInterval(*inmemoryDataFlushInterval)
|
storage.SetDataFlushInterval(*inmemoryDataFlushInterval)
|
||||||
|
@ -134,8 +133,10 @@ func main() {
|
||||||
httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler)
|
httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
pushmetrics.Init()
|
||||||
sig := procutil.WaitForSigterm()
|
sig := procutil.WaitForSigterm()
|
||||||
logger.Infof("service received signal %s", sig)
|
logger.Infof("service received signal %s", sig)
|
||||||
|
pushmetrics.Stop()
|
||||||
|
|
||||||
logger.Infof("gracefully shutting down http service at %q", *httpListenAddr)
|
logger.Infof("gracefully shutting down http service at %q", *httpListenAddr)
|
||||||
startTime = time.Now()
|
startTime = time.Now()
|
||||||
|
@ -144,8 +145,6 @@ func main() {
|
||||||
}
|
}
|
||||||
logger.Infof("successfully shut down http service in %.3f seconds", time.Since(startTime).Seconds())
|
logger.Infof("successfully shut down http service in %.3f seconds", time.Since(startTime).Seconds())
|
||||||
|
|
||||||
pushmetrics.Stop()
|
|
||||||
|
|
||||||
logger.Infof("gracefully shutting down the service")
|
logger.Infof("gracefully shutting down the service")
|
||||||
startTime = time.Now()
|
startTime = time.Now()
|
||||||
stopStaleSnapshotsRemover()
|
stopStaleSnapshotsRemover()
|
||||||
|
|
|
@ -54,7 +54,7 @@ The sandbox cluster installation is running under the constant load generated by
|
||||||
* BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl.html): retry on import errors in `vm-native` mode. Before, retries happened only on writes into a network connection between source and destination. But errors returned by server after all the data was transmitted were logged, but not retried.
|
* BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl.html): retry on import errors in `vm-native` mode. Before, retries happened only on writes into a network connection between source and destination. But errors returned by server after all the data was transmitted were logged, but not retried.
|
||||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly assume role with [AWS IRSA authorization](https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts.html). Previously role chaining was not supported. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3822) for details.
|
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly assume role with [AWS IRSA authorization](https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts.html). Previously role chaining was not supported. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3822) for details.
|
||||||
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix a link for the statistic inaccuracy explanation in the cardinality explorer tool. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5460).
|
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix a link for the statistic inaccuracy explanation in the cardinality explorer tool. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5460).
|
||||||
* BUGFIX: all: fix potential panic during components shutdown when `-pushmetrics.url` is configured. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5548). Thanks to @zhdd99 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5549).
|
* BUGFIX: all: fix potential panic during components shutdown when [metrics push](https://docs.victoriametrics.com/#push-metrics) is configured. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5548). Thanks to @zhdd99 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5549).
|
||||||
|
|
||||||
## [v1.96.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.96.0)
|
## [v1.96.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.96.0)
|
||||||
|
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -12,7 +12,7 @@ require (
|
||||||
// Do not use the original github.com/valyala/fasthttp because of issues
|
// Do not use the original github.com/valyala/fasthttp because of issues
|
||||||
// like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b
|
// like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b
|
||||||
github.com/VictoriaMetrics/fasthttp v1.2.0
|
github.com/VictoriaMetrics/fasthttp v1.2.0
|
||||||
github.com/VictoriaMetrics/metrics v1.29.1
|
github.com/VictoriaMetrics/metrics v1.30.0
|
||||||
github.com/VictoriaMetrics/metricsql v0.70.0
|
github.com/VictoriaMetrics/metricsql v0.70.0
|
||||||
github.com/aws/aws-sdk-go-v2 v1.24.0
|
github.com/aws/aws-sdk-go-v2 v1.24.0
|
||||||
github.com/aws/aws-sdk-go-v2/config v1.26.1
|
github.com/aws/aws-sdk-go-v2/config v1.26.1
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -65,8 +65,8 @@ github.com/VictoriaMetrics/fastcache v1.12.2/go.mod h1:AmC+Nzz1+3G2eCPapF6UcsnkT
|
||||||
github.com/VictoriaMetrics/fasthttp v1.2.0 h1:nd9Wng4DlNtaI27WlYh5mGXCJOmee/2c2blTJwfyU9I=
|
github.com/VictoriaMetrics/fasthttp v1.2.0 h1:nd9Wng4DlNtaI27WlYh5mGXCJOmee/2c2blTJwfyU9I=
|
||||||
github.com/VictoriaMetrics/fasthttp v1.2.0/go.mod h1:zv5YSmasAoSyv8sBVexfArzFDIGGTN4TfCKAtAw7IfE=
|
github.com/VictoriaMetrics/fasthttp v1.2.0/go.mod h1:zv5YSmasAoSyv8sBVexfArzFDIGGTN4TfCKAtAw7IfE=
|
||||||
github.com/VictoriaMetrics/metrics v1.24.0/go.mod h1:eFT25kvsTidQFHb6U0oa0rTrDRdz4xTYjpL8+UPohys=
|
github.com/VictoriaMetrics/metrics v1.24.0/go.mod h1:eFT25kvsTidQFHb6U0oa0rTrDRdz4xTYjpL8+UPohys=
|
||||||
github.com/VictoriaMetrics/metrics v1.29.1 h1:yTORfGeO1T0C6P/tEeT4Mf7rBU5TUu3kjmHvmlaoeO8=
|
github.com/VictoriaMetrics/metrics v1.30.0 h1:m8o1sEDTpvFGwvliAmcaxxCDrIYS16rJPmOhwQNgavo=
|
||||||
github.com/VictoriaMetrics/metrics v1.29.1/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8=
|
github.com/VictoriaMetrics/metrics v1.30.0/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8=
|
||||||
github.com/VictoriaMetrics/metricsql v0.70.0 h1:G0k/m1yAF6pmk0dM3VT9/XI5PZ8dL7EbcLhREf4bgeI=
|
github.com/VictoriaMetrics/metricsql v0.70.0 h1:G0k/m1yAF6pmk0dM3VT9/XI5PZ8dL7EbcLhREf4bgeI=
|
||||||
github.com/VictoriaMetrics/metricsql v0.70.0/go.mod h1:k4UaP/+CjuZslIjd+kCigNG9TQmUqh5v0TP/nMEy90I=
|
github.com/VictoriaMetrics/metricsql v0.70.0/go.mod h1:k4UaP/+CjuZslIjd+kCigNG9TQmUqh5v0TP/nMEy90I=
|
||||||
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
|
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"flag"
|
"flag"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/appmetrics"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/appmetrics"
|
||||||
|
@ -30,6 +31,7 @@ func init() {
|
||||||
|
|
||||||
var (
|
var (
|
||||||
pushCtx, cancelPushCtx = context.WithCancel(context.Background())
|
pushCtx, cancelPushCtx = context.WithCancel(context.Background())
|
||||||
|
wgDone sync.WaitGroup
|
||||||
)
|
)
|
||||||
|
|
||||||
// Init must be called after logger.Init
|
// Init must be called after logger.Init
|
||||||
|
@ -40,6 +42,7 @@ func Init() {
|
||||||
ExtraLabels: extraLabels,
|
ExtraLabels: extraLabels,
|
||||||
Headers: *pushHeader,
|
Headers: *pushHeader,
|
||||||
DisableCompression: *disableCompression,
|
DisableCompression: *disableCompression,
|
||||||
|
WaitGroup: &wgDone,
|
||||||
}
|
}
|
||||||
if err := metrics.InitPushExtWithOptions(pushCtx, pu, *pushInterval, appmetrics.WritePrometheusMetrics, opts); err != nil {
|
if err := metrics.InitPushExtWithOptions(pushCtx, pu, *pushInterval, appmetrics.WritePrometheusMetrics, opts); err != nil {
|
||||||
logger.Fatalf("cannot initialize pushmetrics: %s", err)
|
logger.Fatalf("cannot initialize pushmetrics: %s", err)
|
||||||
|
@ -54,4 +57,5 @@ func Init() {
|
||||||
// Stop must be called after Init.
|
// Stop must be called after Init.
|
||||||
func Stop() {
|
func Stop() {
|
||||||
cancelPushCtx()
|
cancelPushCtx()
|
||||||
|
wgDone.Wait()
|
||||||
}
|
}
|
||||||
|
|
13
vendor/github.com/VictoriaMetrics/metrics/push.go
generated
vendored
13
vendor/github.com/VictoriaMetrics/metrics/push.go
generated
vendored
|
@ -31,6 +31,9 @@ type PushOptions struct {
|
||||||
//
|
//
|
||||||
// By default the compression is enabled.
|
// By default the compression is enabled.
|
||||||
DisableCompression bool
|
DisableCompression bool
|
||||||
|
|
||||||
|
// Optional WaitGroup for waiting until all the push workers created with this WaitGroup are stopped.
|
||||||
|
WaitGroup *sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
// InitPushWithOptions sets up periodic push for globally registered metrics to the given pushURL with the given interval.
|
// InitPushWithOptions sets up periodic push for globally registered metrics to the given pushURL with the given interval.
|
||||||
|
@ -207,6 +210,13 @@ func InitPushExtWithOptions(ctx context.Context, pushURL string, interval time.D
|
||||||
}
|
}
|
||||||
pushMetricsSet.GetOrCreateFloatCounter(fmt.Sprintf(`metrics_push_interval_seconds{url=%q}`, pc.pushURLRedacted)).Set(interval.Seconds())
|
pushMetricsSet.GetOrCreateFloatCounter(fmt.Sprintf(`metrics_push_interval_seconds{url=%q}`, pc.pushURLRedacted)).Set(interval.Seconds())
|
||||||
|
|
||||||
|
var wg *sync.WaitGroup
|
||||||
|
if opts != nil {
|
||||||
|
wg = opts.WaitGroup
|
||||||
|
if wg != nil {
|
||||||
|
wg.Add(1)
|
||||||
|
}
|
||||||
|
}
|
||||||
go func() {
|
go func() {
|
||||||
ticker := time.NewTicker(interval)
|
ticker := time.NewTicker(interval)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
@ -221,6 +231,9 @@ func InitPushExtWithOptions(ctx context.Context, pushURL string, interval time.D
|
||||||
log.Printf("ERROR: metrics.push: %s", err)
|
log.Printf("ERROR: metrics.push: %s", err)
|
||||||
}
|
}
|
||||||
case <-stopCh:
|
case <-stopCh:
|
||||||
|
if wg != nil {
|
||||||
|
wg.Done()
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
2
vendor/modules.txt
vendored
2
vendor/modules.txt
vendored
|
@ -100,7 +100,7 @@ github.com/VictoriaMetrics/fastcache
|
||||||
github.com/VictoriaMetrics/fasthttp
|
github.com/VictoriaMetrics/fasthttp
|
||||||
github.com/VictoriaMetrics/fasthttp/fasthttputil
|
github.com/VictoriaMetrics/fasthttp/fasthttputil
|
||||||
github.com/VictoriaMetrics/fasthttp/stackless
|
github.com/VictoriaMetrics/fasthttp/stackless
|
||||||
# github.com/VictoriaMetrics/metrics v1.29.1
|
# github.com/VictoriaMetrics/metrics v1.30.0
|
||||||
## explicit; go 1.17
|
## explicit; go 1.17
|
||||||
github.com/VictoriaMetrics/metrics
|
github.com/VictoriaMetrics/metrics
|
||||||
# github.com/VictoriaMetrics/metricsql v0.70.0
|
# github.com/VictoriaMetrics/metricsql v0.70.0
|
||||||
|
|
Loading…
Reference in a new issue