From b351f73984c10dafadbdbb37b9d699254e6d8953 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 27 Jul 2023 14:47:53 -0700 Subject: [PATCH] lib/promscrape/discovery: close unused HTTP connections to service discovery servers This should prevent connection leaks. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4724 --- app/vmagent/remotewrite/remotewrite.go | 4 ++-- docs/CHANGELOG.md | 1 + lib/promscrape/discovery/azure/azure.go | 6 +++++- lib/promscrape/discovery/azure/machine_test.go | 1 + lib/promscrape/discovery/consul/api.go | 1 + lib/promscrape/discovery/digitalocean/digitalocean.go | 6 +++++- lib/promscrape/discovery/docker/docker.go | 6 +++++- lib/promscrape/discovery/dockerswarm/dockerswarm.go | 6 +++++- lib/promscrape/discovery/eureka/eureka.go | 6 +++++- lib/promscrape/discovery/gce/api.go | 3 +++ lib/promscrape/discovery/gce/gce.go | 6 +++++- lib/promscrape/discovery/http/http.go | 6 +++++- lib/promscrape/discovery/openstack/api.go | 4 ++++ lib/promscrape/discovery/openstack/openstack.go | 6 +++++- lib/promscrape/discoveryutils/client.go | 8 ++++++++ 15 files changed, 60 insertions(+), 10 deletions(-) diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index 7e7dc138f4..7ff6f942cf 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -526,8 +526,8 @@ func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxI func (rwctx *remoteWriteCtx) MustStop() { // sas must be stopped before rwctx is closed // because sas can write pending series to rwctx.pss if there are any - sas := rwctx.sas.Swap(nil) - sas.MustStop() + rwctx.sas.MustStop() + rwctx.sas = nil for _, ps := range rwctx.pss { ps.MustStop() diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index ae4db82230..83eb7459ea 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -19,6 +19,7 @@ The following tip changes can be tested by building VictoriaMetrics 
components f * SECURITY: upgrade base docker image (Alpine) from 3.18.2 to 3.18.3. See [alpine 3.18.3 release notes](https://alpinelinux.org/posts/Alpine-3.15.10-3.16.7-3.17.5-3.18.3-released.html). * BUGFIX: vminsert: fixed decoding of label values with slash when accepting data via [pushgateway protocol](https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format). This fixes Prometheus golang client compatibility. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4692). +* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): close HTTP connections to [service discovery](https://docs.victoriametrics.com/sd_configs.html) servers when they are no longer needed. This should prevent possible connection exhaustion in some cases. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4724). * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly apply `if` filters during [relabeling](https://docs.victoriametrics.com/vmagent.html#relabeling-enhancements). Previously the `if` filter could improperly work. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4806) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4816). * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix possible panic at shutdown when [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) is enabled. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4407) for details. * BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth.html): Properly handle LOCAL command for proxy protocol. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3335#issuecomment-1569864108). 
diff --git a/lib/promscrape/discovery/azure/azure.go b/lib/promscrape/discovery/azure/azure.go index b1fe35fe96..f9ec794e2d 100644 --- a/lib/promscrape/discovery/azure/azure.go +++ b/lib/promscrape/discovery/azure/azure.go @@ -58,7 +58,11 @@ func (sdc *SDConfig) GetLabels(baseDir string) ([]*promutils.Labels, error) { // MustStop stops further usage for sdc. func (sdc *SDConfig) MustStop() { - configMap.Delete(sdc) + v := configMap.Delete(sdc) + if v != nil { + cfg := v.(*apiConfig) + cfg.c.Stop() + } } func appendMachineLabels(vms []virtualMachine, port int, sdc *SDConfig) []*promutils.Labels { diff --git a/lib/promscrape/discovery/azure/machine_test.go b/lib/promscrape/discovery/azure/machine_test.go index f2ba253cad..64a3e336d6 100644 --- a/lib/promscrape/discovery/azure/machine_test.go +++ b/lib/promscrape/discovery/azure/machine_test.go @@ -70,6 +70,7 @@ func TestGetVirtualMachinesSuccess(t *testing.T) { if err != nil { t.Fatalf("unexpected error at client create: %s", err) } + defer c.Stop() ac := &apiConfig{ c: c, subscriptionID: "some-id", diff --git a/lib/promscrape/discovery/consul/api.go b/lib/promscrape/discovery/consul/api.go index 5f9d7e2246..837f0f6e57 100644 --- a/lib/promscrape/discovery/consul/api.go +++ b/lib/promscrape/discovery/consul/api.go @@ -90,6 +90,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { } dc, err := getDatacenter(client, sdc.Datacenter) if err != nil { + client.Stop() return nil, fmt.Errorf("cannot obtain consul datacenter: %w", err) } diff --git a/lib/promscrape/discovery/digitalocean/digitalocean.go b/lib/promscrape/discovery/digitalocean/digitalocean.go index cf25f2da3c..aa4534c587 100644 --- a/lib/promscrape/discovery/digitalocean/digitalocean.go +++ b/lib/promscrape/discovery/digitalocean/digitalocean.go @@ -155,5 +155,9 @@ func addDropletLabels(droplets []droplet, defaultPort int) []*promutils.Labels { // MustStop stops further usage for sdc. 
func (sdc *SDConfig) MustStop() { - configMap.Delete(sdc) + v := configMap.Delete(sdc) + if v != nil { + cfg := v.(*apiConfig) + cfg.client.Stop() + } } diff --git a/lib/promscrape/discovery/docker/docker.go b/lib/promscrape/discovery/docker/docker.go index 6be9086541..ade73572f9 100644 --- a/lib/promscrape/discovery/docker/docker.go +++ b/lib/promscrape/discovery/docker/docker.go @@ -47,5 +47,9 @@ func (sdc *SDConfig) GetLabels(baseDir string) ([]*promutils.Labels, error) { // MustStop stops further usage for sdc. func (sdc *SDConfig) MustStop() { - configMap.Delete(sdc) + v := configMap.Delete(sdc) + if v != nil { + cfg := v.(*apiConfig) + cfg.client.Stop() + } } diff --git a/lib/promscrape/discovery/dockerswarm/dockerswarm.go b/lib/promscrape/discovery/dockerswarm/dockerswarm.go index bfe7a06971..f527a51fc1 100644 --- a/lib/promscrape/discovery/dockerswarm/dockerswarm.go +++ b/lib/promscrape/discovery/dockerswarm/dockerswarm.go @@ -56,5 +56,9 @@ func (sdc *SDConfig) GetLabels(baseDir string) ([]*promutils.Labels, error) { // MustStop stops further usage for sdc. func (sdc *SDConfig) MustStop() { - configMap.Delete(sdc) + v := configMap.Delete(sdc) + if v != nil { + cfg := v.(*apiConfig) + cfg.client.Stop() + } } diff --git a/lib/promscrape/discovery/eureka/eureka.go b/lib/promscrape/discovery/eureka/eureka.go index 4b73cf81c4..e677e1ef23 100644 --- a/lib/promscrape/discovery/eureka/eureka.go +++ b/lib/promscrape/discovery/eureka/eureka.go @@ -101,7 +101,11 @@ func (sdc *SDConfig) GetLabels(baseDir string) ([]*promutils.Labels, error) { // MustStop stops further usage for sdc. 
func (sdc *SDConfig) MustStop() { - configMap.Delete(sdc) + v := configMap.Delete(sdc) + if v != nil { + cfg := v.(*apiConfig) + cfg.client.Stop() + } } func addInstanceLabels(apps *applications) []*promutils.Labels { diff --git a/lib/promscrape/discovery/gce/api.go b/lib/promscrape/discovery/gce/api.go index fc6c514387..5fa989b60e 100644 --- a/lib/promscrape/discovery/gce/api.go +++ b/lib/promscrape/discovery/gce/api.go @@ -42,6 +42,7 @@ func newAPIConfig(sdc *SDConfig) (*apiConfig, error) { if len(project) == 0 { proj, err := getCurrentProject() if err != nil { + client.CloseIdleConnections() return nil, fmt.Errorf("cannot determine the current project; make sure `vmagent` runs inside GCE; error: %w", err) } project = proj @@ -52,6 +53,7 @@ func newAPIConfig(sdc *SDConfig) (*apiConfig, error) { // Autodetect the current zone. zone, err := getCurrentZone() if err != nil { + client.CloseIdleConnections() return nil, fmt.Errorf("cannot determine the current zone; make sure `vmagent` runs inside GCE; error: %w", err) } zones = append(zones, zone) @@ -62,6 +64,7 @@ func newAPIConfig(sdc *SDConfig) (*apiConfig, error) { // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3202 zs, err := getZonesForProject(client, project) if err != nil { + client.CloseIdleConnections() return nil, fmt.Errorf("cannot obtain zones for project %q: %w", project, err) } zones = zs diff --git a/lib/promscrape/discovery/gce/gce.go b/lib/promscrape/discovery/gce/gce.go index 70ece8a04f..a52e359626 100644 --- a/lib/promscrape/discovery/gce/gce.go +++ b/lib/promscrape/discovery/gce/gce.go @@ -73,5 +73,9 @@ func (sdc *SDConfig) GetLabels(baseDir string) ([]*promutils.Labels, error) { // MustStop stops further usage for sdc. 
func (sdc *SDConfig) MustStop() { - configMap.Delete(sdc) + v := configMap.Delete(sdc) + if v != nil { + cfg := v.(*apiConfig) + cfg.client.CloseIdleConnections() + } } diff --git a/lib/promscrape/discovery/http/http.go b/lib/promscrape/discovery/http/http.go index a6eb180c7a..b6a370c2de 100644 --- a/lib/promscrape/discovery/http/http.go +++ b/lib/promscrape/discovery/http/http.go @@ -56,5 +56,9 @@ func addHTTPTargetLabels(src []httpGroupTarget, sourceURL string) []*promutils.L // MustStop stops further usage for sdc. func (sdc *SDConfig) MustStop() { - configMap.Delete(sdc) + v := configMap.Delete(sdc) + if v != nil { + cfg := v.(*apiConfig) + cfg.client.Stop() + } } diff --git a/lib/promscrape/discovery/openstack/api.go b/lib/promscrape/discovery/openstack/api.go index 4dbc741ad5..6b95805a2a 100644 --- a/lib/promscrape/discovery/openstack/api.go +++ b/lib/promscrape/discovery/openstack/api.go @@ -87,6 +87,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { } ac, err := opts.NewConfig() if err != nil { + cfg.client.CloseIdleConnections() return nil, err } cfg.client.Transport = &http.Transport{ @@ -107,6 +108,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { sdcAuth = readCredentialsFromEnv() } if strings.HasSuffix(sdcAuth.IdentityEndpoint, "v2.0") { + cfg.client.CloseIdleConnections() return nil, errors.New("identity_endpoint v2.0 is not supported") } // trim .0 from v3.0 for prometheus cfg compatibility @@ -114,11 +116,13 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { parsedURL, err := url.Parse(sdcAuth.IdentityEndpoint) if err != nil { + cfg.client.CloseIdleConnections() return nil, fmt.Errorf("cannot parse identity_endpoint: %s as url, err: %w", sdcAuth.IdentityEndpoint, err) } cfg.endpoint = parsedURL tokenReq, err := buildAuthRequestBody(&sdcAuth) if err != nil { + cfg.client.CloseIdleConnections() return nil, err } cfg.authTokenReq = tokenReq diff --git 
a/lib/promscrape/discovery/openstack/openstack.go b/lib/promscrape/discovery/openstack/openstack.go index df26b92b1c..e380f66f8d 100644 --- a/lib/promscrape/discovery/openstack/openstack.go +++ b/lib/promscrape/discovery/openstack/openstack.go @@ -57,5 +57,9 @@ func (sdc *SDConfig) GetLabels(baseDir string) ([]*promutils.Labels, error) { // MustStop stops further usage for sdc. func (sdc *SDConfig) MustStop() { - configMap.Delete(sdc) + v := configMap.Delete(sdc) + if v != nil { + cfg := v.(*apiConfig) + cfg.client.CloseIdleConnections() + } } diff --git a/lib/promscrape/discoveryutils/client.go b/lib/promscrape/discoveryutils/client.go index 9aea85a28c..7d0db2cf3d 100644 --- a/lib/promscrape/discoveryutils/client.go +++ b/lib/promscrape/discoveryutils/client.go @@ -74,6 +74,12 @@ type HTTPClient struct { ReadTimeout time.Duration } +func (hc *HTTPClient) stop() { + // Close idle connections to server in order to free up resources. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4724 + hc.client.CloseIdleConnections() +} + var defaultDialer = &net.Dialer{} // NewClient returns new Client for the given args. @@ -254,6 +260,8 @@ func (c *Client) APIServer() string { // Stop cancels all in-flight requests func (c *Client) Stop() { c.clientCancel() + c.client.stop() + c.blockingClient.stop() } func doRequestWithPossibleRetry(hc *HTTPClient, req *http.Request) (*http.Response, error) {