mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/promscrape/discovery: close unused HTTP connections to service discovery servers
This should prevent from connection leaks See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4724
This commit is contained in:
parent
4c211028af
commit
b351f73984
15 changed files with 60 additions and 10 deletions
|
@ -526,8 +526,8 @@ func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxI
|
||||||
func (rwctx *remoteWriteCtx) MustStop() {
|
func (rwctx *remoteWriteCtx) MustStop() {
|
||||||
// sas must be stopped before rwctx is closed
|
// sas must be stopped before rwctx is closed
|
||||||
// because sas can write pending series to rwctx.pss if there are any
|
// because sas can write pending series to rwctx.pss if there are any
|
||||||
sas := rwctx.sas.Swap(nil)
|
rwctx.sas.MustStop()
|
||||||
sas.MustStop()
|
rwctx.sas = nil
|
||||||
|
|
||||||
for _, ps := range rwctx.pss {
|
for _, ps := range rwctx.pss {
|
||||||
ps.MustStop()
|
ps.MustStop()
|
||||||
|
|
|
@ -19,6 +19,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
||||||
* SECURITY: upgrade base docker image (Alpine) from 3.18.2 to 3.18.3. See [alpine 3.18.3 release notes](https://alpinelinux.org/posts/Alpine-3.15.10-3.16.7-3.17.5-3.18.3-released.html).
|
* SECURITY: upgrade base docker image (Alpine) from 3.18.2 to 3.18.3. See [alpine 3.18.3 release notes](https://alpinelinux.org/posts/Alpine-3.15.10-3.16.7-3.17.5-3.18.3-released.html).
|
||||||
|
|
||||||
* BUGFIX: vminsert: fixed decoding of label values with slash when accepting data via [pushgateway protocol](https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format). This fixes Prometheus golang client compatibility. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4692).
|
* BUGFIX: vminsert: fixed decoding of label values with slash when accepting data via [pushgateway protocol](https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format). This fixes Prometheus golang client compatibility. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4692).
|
||||||
|
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): close HTTP connections to [service discovery](https://docs.victoriametrics.com/sd_configs.html) servers when they are no longer needed. This should prevent from possible connection exhasution in some cases. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4724).
|
||||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly apply `if` filters during [relabeling](https://docs.victoriametrics.com/vmagent.html#relabeling-enhancements). Previously the `if` filter could improperly work. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4806) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4816).
|
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly apply `if` filters during [relabeling](https://docs.victoriametrics.com/vmagent.html#relabeling-enhancements). Previously the `if` filter could improperly work. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4806) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4816).
|
||||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix possible panic at shutdown when [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) is enabled. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4407) for details.
|
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix possible panic at shutdown when [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) is enabled. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4407) for details.
|
||||||
* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth.html): Properly handle LOCAL command for proxy protocol. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3335#issuecomment-1569864108).
|
* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth.html): Properly handle LOCAL command for proxy protocol. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3335#issuecomment-1569864108).
|
||||||
|
|
|
@ -58,7 +58,11 @@ func (sdc *SDConfig) GetLabels(baseDir string) ([]*promutils.Labels, error) {
|
||||||
|
|
||||||
// MustStop stops further usage for sdc.
|
// MustStop stops further usage for sdc.
|
||||||
func (sdc *SDConfig) MustStop() {
|
func (sdc *SDConfig) MustStop() {
|
||||||
configMap.Delete(sdc)
|
v := configMap.Delete(sdc)
|
||||||
|
if v != nil {
|
||||||
|
cfg := v.(*apiConfig)
|
||||||
|
cfg.c.Stop()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func appendMachineLabels(vms []virtualMachine, port int, sdc *SDConfig) []*promutils.Labels {
|
func appendMachineLabels(vms []virtualMachine, port int, sdc *SDConfig) []*promutils.Labels {
|
||||||
|
|
|
@ -70,6 +70,7 @@ func TestGetVirtualMachinesSuccess(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error at client create: %s", err)
|
t.Fatalf("unexpected error at client create: %s", err)
|
||||||
}
|
}
|
||||||
|
defer c.Stop()
|
||||||
ac := &apiConfig{
|
ac := &apiConfig{
|
||||||
c: c,
|
c: c,
|
||||||
subscriptionID: "some-id",
|
subscriptionID: "some-id",
|
||||||
|
|
|
@ -90,6 +90,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
|
||||||
}
|
}
|
||||||
dc, err := getDatacenter(client, sdc.Datacenter)
|
dc, err := getDatacenter(client, sdc.Datacenter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
client.Stop()
|
||||||
return nil, fmt.Errorf("cannot obtain consul datacenter: %w", err)
|
return nil, fmt.Errorf("cannot obtain consul datacenter: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -155,5 +155,9 @@ func addDropletLabels(droplets []droplet, defaultPort int) []*promutils.Labels {
|
||||||
|
|
||||||
// MustStop stops further usage for sdc.
|
// MustStop stops further usage for sdc.
|
||||||
func (sdc *SDConfig) MustStop() {
|
func (sdc *SDConfig) MustStop() {
|
||||||
configMap.Delete(sdc)
|
v := configMap.Delete(sdc)
|
||||||
|
if v != nil {
|
||||||
|
cfg := v.(*apiConfig)
|
||||||
|
cfg.client.Stop()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,5 +47,9 @@ func (sdc *SDConfig) GetLabels(baseDir string) ([]*promutils.Labels, error) {
|
||||||
|
|
||||||
// MustStop stops further usage for sdc.
|
// MustStop stops further usage for sdc.
|
||||||
func (sdc *SDConfig) MustStop() {
|
func (sdc *SDConfig) MustStop() {
|
||||||
configMap.Delete(sdc)
|
v := configMap.Delete(sdc)
|
||||||
|
if v != nil {
|
||||||
|
cfg := v.(*apiConfig)
|
||||||
|
cfg.client.Stop()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,5 +56,9 @@ func (sdc *SDConfig) GetLabels(baseDir string) ([]*promutils.Labels, error) {
|
||||||
|
|
||||||
// MustStop stops further usage for sdc.
|
// MustStop stops further usage for sdc.
|
||||||
func (sdc *SDConfig) MustStop() {
|
func (sdc *SDConfig) MustStop() {
|
||||||
configMap.Delete(sdc)
|
v := configMap.Delete(sdc)
|
||||||
|
if v != nil {
|
||||||
|
cfg := v.(*apiConfig)
|
||||||
|
cfg.client.Stop()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -101,7 +101,11 @@ func (sdc *SDConfig) GetLabels(baseDir string) ([]*promutils.Labels, error) {
|
||||||
|
|
||||||
// MustStop stops further usage for sdc.
|
// MustStop stops further usage for sdc.
|
||||||
func (sdc *SDConfig) MustStop() {
|
func (sdc *SDConfig) MustStop() {
|
||||||
configMap.Delete(sdc)
|
v := configMap.Delete(sdc)
|
||||||
|
if v != nil {
|
||||||
|
cfg := v.(*apiConfig)
|
||||||
|
cfg.client.Stop()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func addInstanceLabels(apps *applications) []*promutils.Labels {
|
func addInstanceLabels(apps *applications) []*promutils.Labels {
|
||||||
|
|
|
@ -42,6 +42,7 @@ func newAPIConfig(sdc *SDConfig) (*apiConfig, error) {
|
||||||
if len(project) == 0 {
|
if len(project) == 0 {
|
||||||
proj, err := getCurrentProject()
|
proj, err := getCurrentProject()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
client.CloseIdleConnections()
|
||||||
return nil, fmt.Errorf("cannot determine the current project; make sure `vmagent` runs inside GCE; error: %w", err)
|
return nil, fmt.Errorf("cannot determine the current project; make sure `vmagent` runs inside GCE; error: %w", err)
|
||||||
}
|
}
|
||||||
project = proj
|
project = proj
|
||||||
|
@ -52,6 +53,7 @@ func newAPIConfig(sdc *SDConfig) (*apiConfig, error) {
|
||||||
// Autodetect the current zone.
|
// Autodetect the current zone.
|
||||||
zone, err := getCurrentZone()
|
zone, err := getCurrentZone()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
client.CloseIdleConnections()
|
||||||
return nil, fmt.Errorf("cannot determine the current zone; make sure `vmagent` runs inside GCE; error: %w", err)
|
return nil, fmt.Errorf("cannot determine the current zone; make sure `vmagent` runs inside GCE; error: %w", err)
|
||||||
}
|
}
|
||||||
zones = append(zones, zone)
|
zones = append(zones, zone)
|
||||||
|
@ -62,6 +64,7 @@ func newAPIConfig(sdc *SDConfig) (*apiConfig, error) {
|
||||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3202
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3202
|
||||||
zs, err := getZonesForProject(client, project)
|
zs, err := getZonesForProject(client, project)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
client.CloseIdleConnections()
|
||||||
return nil, fmt.Errorf("cannot obtain zones for project %q: %w", project, err)
|
return nil, fmt.Errorf("cannot obtain zones for project %q: %w", project, err)
|
||||||
}
|
}
|
||||||
zones = zs
|
zones = zs
|
||||||
|
|
|
@ -73,5 +73,9 @@ func (sdc *SDConfig) GetLabels(baseDir string) ([]*promutils.Labels, error) {
|
||||||
|
|
||||||
// MustStop stops further usage for sdc.
|
// MustStop stops further usage for sdc.
|
||||||
func (sdc *SDConfig) MustStop() {
|
func (sdc *SDConfig) MustStop() {
|
||||||
configMap.Delete(sdc)
|
v := configMap.Delete(sdc)
|
||||||
|
if v != nil {
|
||||||
|
cfg := v.(*apiConfig)
|
||||||
|
cfg.client.CloseIdleConnections()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,5 +56,9 @@ func addHTTPTargetLabels(src []httpGroupTarget, sourceURL string) []*promutils.L
|
||||||
|
|
||||||
// MustStop stops further usage for sdc.
|
// MustStop stops further usage for sdc.
|
||||||
func (sdc *SDConfig) MustStop() {
|
func (sdc *SDConfig) MustStop() {
|
||||||
configMap.Delete(sdc)
|
v := configMap.Delete(sdc)
|
||||||
|
if v != nil {
|
||||||
|
cfg := v.(*apiConfig)
|
||||||
|
cfg.client.Stop()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -87,6 +87,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
|
||||||
}
|
}
|
||||||
ac, err := opts.NewConfig()
|
ac, err := opts.NewConfig()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
cfg.client.CloseIdleConnections()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
cfg.client.Transport = &http.Transport{
|
cfg.client.Transport = &http.Transport{
|
||||||
|
@ -107,6 +108,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
|
||||||
sdcAuth = readCredentialsFromEnv()
|
sdcAuth = readCredentialsFromEnv()
|
||||||
}
|
}
|
||||||
if strings.HasSuffix(sdcAuth.IdentityEndpoint, "v2.0") {
|
if strings.HasSuffix(sdcAuth.IdentityEndpoint, "v2.0") {
|
||||||
|
cfg.client.CloseIdleConnections()
|
||||||
return nil, errors.New("identity_endpoint v2.0 is not supported")
|
return nil, errors.New("identity_endpoint v2.0 is not supported")
|
||||||
}
|
}
|
||||||
// trim .0 from v3.0 for prometheus cfg compatibility
|
// trim .0 from v3.0 for prometheus cfg compatibility
|
||||||
|
@ -114,11 +116,13 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
|
||||||
|
|
||||||
parsedURL, err := url.Parse(sdcAuth.IdentityEndpoint)
|
parsedURL, err := url.Parse(sdcAuth.IdentityEndpoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
cfg.client.CloseIdleConnections()
|
||||||
return nil, fmt.Errorf("cannot parse identity_endpoint: %s as url, err: %w", sdcAuth.IdentityEndpoint, err)
|
return nil, fmt.Errorf("cannot parse identity_endpoint: %s as url, err: %w", sdcAuth.IdentityEndpoint, err)
|
||||||
}
|
}
|
||||||
cfg.endpoint = parsedURL
|
cfg.endpoint = parsedURL
|
||||||
tokenReq, err := buildAuthRequestBody(&sdcAuth)
|
tokenReq, err := buildAuthRequestBody(&sdcAuth)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
cfg.client.CloseIdleConnections()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
cfg.authTokenReq = tokenReq
|
cfg.authTokenReq = tokenReq
|
||||||
|
|
|
@ -57,5 +57,9 @@ func (sdc *SDConfig) GetLabels(baseDir string) ([]*promutils.Labels, error) {
|
||||||
|
|
||||||
// MustStop stops further usage for sdc.
|
// MustStop stops further usage for sdc.
|
||||||
func (sdc *SDConfig) MustStop() {
|
func (sdc *SDConfig) MustStop() {
|
||||||
configMap.Delete(sdc)
|
v := configMap.Delete(sdc)
|
||||||
|
if v != nil {
|
||||||
|
cfg := v.(*apiConfig)
|
||||||
|
cfg.client.CloseIdleConnections()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -74,6 +74,12 @@ type HTTPClient struct {
|
||||||
ReadTimeout time.Duration
|
ReadTimeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (hc *HTTPClient) stop() {
|
||||||
|
// Close idle connections to server in order to free up resources.
|
||||||
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4724
|
||||||
|
hc.client.CloseIdleConnections()
|
||||||
|
}
|
||||||
|
|
||||||
var defaultDialer = &net.Dialer{}
|
var defaultDialer = &net.Dialer{}
|
||||||
|
|
||||||
// NewClient returns new Client for the given args.
|
// NewClient returns new Client for the given args.
|
||||||
|
@ -254,6 +260,8 @@ func (c *Client) APIServer() string {
|
||||||
// Stop cancels all in-flight requests
|
// Stop cancels all in-flight requests
|
||||||
func (c *Client) Stop() {
|
func (c *Client) Stop() {
|
||||||
c.clientCancel()
|
c.clientCancel()
|
||||||
|
c.client.stop()
|
||||||
|
c.blockingClient.stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
func doRequestWithPossibleRetry(hc *HTTPClient, req *http.Request) (*http.Response, error) {
|
func doRequestWithPossibleRetry(hc *HTTPClient, req *http.Request) (*http.Response, error) {
|
||||||
|
|
Loading…
Reference in a new issue