From ad6290953cec458b891c9effddf2210b2ca90dd2 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 20 Jul 2020 19:27:25 +0300 Subject: [PATCH] app/vmagent: add `-remoteWrite.proxyURL` command-line option This option allows writing data to `-remoteWrite.url` via http, https or socks5 proxy. This is similar to `proxy_url` option in `remote_write` section of Prometheus. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write --- app/vmagent/remotewrite/client.go | 169 ++++++++++++---------------- app/vmagent/remotewrite/statconn.go | 7 +- 2 files changed, 77 insertions(+), 99 deletions(-) diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go index 76a58656e..7dcadcdfe 100644 --- a/app/vmagent/remotewrite/client.go +++ b/app/vmagent/remotewrite/client.go @@ -1,10 +1,14 @@ package remotewrite import ( + "bytes" "crypto/tls" "encoding/base64" "flag" "fmt" + "io/ioutil" + "net/http" + "net/url" "strings" "sync" "time" @@ -13,12 +17,13 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" - "github.com/VictoriaMetrics/fasthttp" "github.com/VictoriaMetrics/metrics" ) var ( sendTimeout = flag.Duration("remoteWrite.sendTimeout", time.Minute, "Timeout for sending a single block of data to -remoteWrite.url") + proxyURL = flagutil.NewArray("remoteWrite.proxyURL", "Optional proxy URL for writing data to -remoteWrite.url. Supported proxies: http, https, socks5. "+ + "Example: -remoteWrite.proxyURL=socks5://proxy:1234") tlsInsecureSkipVerify = flag.Bool("remoteWrite.tlsInsecureSkipVerify", false, "Whether to skip tls verification when connecting to -remoteWrite.url") tlsCertFile = flagutil.NewArray("remoteWrite.tlsCertFile", "Optional path to client-side TLS certificate file to use when connecting to -remoteWrite.url. "+ @@ -41,11 +46,9 @@ var ( type client struct { urlLabelValue string remoteWriteURL string - host string - requestURI string authHeader string fq *persistentqueue.FastQueue - hc *fasthttp.HostClient + hc *http.Client requestDuration *metrics.Histogram requestsOKCount *metrics.Counter @@ -57,6 +60,28 @@ type client struct { } func newClient(argIdx int, remoteWriteURL, urlLabelValue string, fq *persistentqueue.FastQueue, concurrency int) *client { + tlsCfg, err := getTLSConfig(argIdx) + if err != nil { + logger.Panicf("FATAL: cannot initialize TLS config: %s", err) + } + tr := &http.Transport{ + Dial: statDial, + TLSClientConfig: tlsCfg, + TLSHandshakeTimeout: 5 * time.Second, + MaxConnsPerHost: 2 * concurrency, + WriteBufferSize: 16 * 1024, + } + pURL := proxyURL.GetOptionalArg(argIdx) + if len(pURL) > 0 { + if !strings.Contains(pURL, "://") { + logger.Fatalf("cannot parse -remoteWrite.proxyURL=%q: it must start with `http://`, `https://` or `socks5://`", pURL) + } + urlProxy, err := url.Parse(pURL) + if err != nil { + logger.Fatalf("cannot parse -remoteWrite.proxyURL=%q: %s", pURL, err) + } + tr.Proxy = http.ProxyURL(urlProxy) + } authHeader := "" username := basicAuthUsername.GetOptionalArg(argIdx) password := basicAuthPassword.GetOptionalArg(argIdx) @@ -73,63 +98,16 @@ func newClient(argIdx int, remoteWriteURL, urlLabelValue string, fq *persistentq } authHeader = "Bearer " + token } - - readTimeout := *sendTimeout - if readTimeout <= 0 { - readTimeout = time.Minute - } - writeTimeout := readTimeout - var u fasthttp.URI - u.Update(remoteWriteURL) - scheme := string(u.Scheme()) - switch scheme { - case "http", "https": - default: - logger.Fatalf("unsupported scheme in -remoteWrite.url=%q: %q. It must be http or https", remoteWriteURL, scheme) - } - host := string(u.Host()) - if len(host) == 0 { - logger.Fatalf("invalid -remoteWrite.url=%q: host cannot be empty. Make sure the url looks like `http://host:port/path`", remoteWriteURL) - } - requestURI := string(u.RequestURI()) - isTLS := scheme == "https" - var tlsCfg *tls.Config - if isTLS { - var err error - tlsCfg, err = getTLSConfig(argIdx) - if err != nil { - logger.Panicf("FATAL: cannot initialize TLS config: %s", err) - } - } - if !strings.Contains(host, ":") { - if isTLS { - host += ":443" - } else { - host += ":80" - } - } - maxConns := 2 * concurrency - hc := &fasthttp.HostClient{ - Addr: host, - Name: "vmagent", - Dial: statDial, - IsTLS: isTLS, - TLSConfig: tlsCfg, - MaxConns: maxConns, - MaxIdleConnDuration: 10 * readTimeout, - ReadTimeout: readTimeout, - WriteTimeout: writeTimeout, - MaxResponseBodySize: 1024 * 1024, - } c := &client{ urlLabelValue: urlLabelValue, remoteWriteURL: remoteWriteURL, - host: host, - requestURI: requestURI, authHeader: authHeader, fq: fq, - hc: hc, - stopCh: make(chan struct{}), + hc: &http.Client{ + Transport: tr, + Timeout: *sendTimeout, + }, + stopCh: make(chan struct{}), } c.requestDuration = metrics.GetOrCreateHistogram(fmt.Sprintf(`vmagent_remotewrite_duration_seconds{url=%q}`, c.urlLabelValue)) c.requestsOKCount = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_requests_total{url=%q, status_code="2XX"}`, c.urlLabelValue)) @@ -153,14 +131,17 @@ func (c *client) MustStop() { } func getTLSConfig(argIdx int) (*tls.Config, error) { - tlsConfig := &promauth.TLSConfig{ + c := &promauth.TLSConfig{ CAFile: tlsCAFile.GetOptionalArg(argIdx), CertFile: tlsCertFile.GetOptionalArg(argIdx), KeyFile: tlsKeyFile.GetOptionalArg(argIdx), ServerName: tlsServerName.GetOptionalArg(argIdx), InsecureSkipVerify: *tlsInsecureSkipVerify, } - cfg, err := promauth.NewConfig(".", nil, "", "", tlsConfig) + if c.CAFile == "" && c.CertFile == "" && c.KeyFile == "" && c.ServerName == "" && !c.InsecureSkipVerify { + return nil, nil + } + cfg, err := promauth.NewConfig(".", nil, "", "", c) if err != nil { return nil, fmt.Errorf("cannot populate TLS config: %w", err) } @@ -201,32 +182,24 @@ func (c *client) runWorker() { } func (c *client) sendBlock(block []byte) { - req := fasthttp.AcquireRequest() - req.SetRequestURI(c.requestURI) - req.SetHost(c.host) - req.Header.SetMethod("POST") - req.Header.Add("Content-Type", "application/x-protobuf") - req.Header.Add("Content-Encoding", "snappy") - req.Header.Add("X-Prometheus-Remote-Write-Version", "0.1.0") + req, err := http.NewRequest("POST", c.remoteWriteURL, bytes.NewBuffer(block)) + if err != nil { + logger.Panicf("BUG: unexected error from http.NewRequest(%q): %s", c.remoteWriteURL, err) + } + h := req.Header + h.Set("User-Agent", "vmagent") + h.Set("Content-Type", "application/x-protobuf") + h.Set("Content-Encoding", "snappy") + h.Set("X-Prometheus-Remote-Write-Version", "0.1.0") if c.authHeader != "" { req.Header.Set("Authorization", c.authHeader) } - req.SetBody(block) retryDuration := time.Second - resp := fasthttp.AcquireResponse() again: - select { - case <-c.stopCh: - fasthttp.ReleaseRequest(req) - fasthttp.ReleaseResponse(resp) - return - default: - } - startTime := time.Now() - err := doRequestWithPossibleRetry(c.hc, req, resp) + resp, err := c.hc.Do(req) c.requestDuration.UpdateDuration(startTime) if err != nil { c.errorsCount.Inc() @@ -236,39 +209,39 @@ again: } logger.Errorf("couldn't send a block with size %d bytes to %q: %s; re-sending the block in %.3f seconds", len(block), c.remoteWriteURL, err, retryDuration.Seconds()) - time.Sleep(retryDuration) + t := time.NewTimer(retryDuration) + select { + case <-c.stopCh: + t.Stop() + return + case <-t.C: + } c.retriesCount.Inc() goto again } - statusCode := resp.StatusCode() + statusCode := resp.StatusCode if statusCode/100 != 2 { metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_requests_total{url=%q, status_code="%d"}`, c.urlLabelValue, statusCode)).Inc() retryDuration *= 2 if retryDuration > time.Minute { retryDuration = time.Minute } - logger.Errorf("unexpected status code received after sending a block with size %d bytes to %q: %d; response body=%q; re-sending the block in %.3f seconds", - len(block), c.remoteWriteURL, statusCode, resp.Body(), retryDuration.Seconds()) - time.Sleep(retryDuration) + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + logger.Errorf("cannot read response body from %q: %s", c.remoteWriteURL, err) + } else { + logger.Errorf("unexpected status code received after sending a block with size %d bytes to %q: %d; response body=%q; re-sending the block in %.3f seconds", + len(block), c.remoteWriteURL, statusCode, body, retryDuration.Seconds()) + } + t := time.NewTimer(retryDuration) + select { + case <-c.stopCh: + t.Stop() + return + case <-t.C: + } c.retriesCount.Inc() goto again } c.requestsOKCount.Inc() - - // The block has been successfully sent to the remote storage. - fasthttp.ReleaseResponse(resp) - fasthttp.ReleaseRequest(req) -} - -func doRequestWithPossibleRetry(hc *fasthttp.HostClient, req *fasthttp.Request, resp *fasthttp.Response) error { - // There is no need in calling DoTimeout, since the timeout must be already set in hc.ReadTimeout. - err := hc.Do(req, resp) - if err == nil { - return nil - } - if err != fasthttp.ErrConnectionClosed { - return err - } - // Retry request if the server closed the keep-alive connection during the first attempt. - return hc.Do(req, resp) } diff --git a/app/vmagent/remotewrite/statconn.go b/app/vmagent/remotewrite/statconn.go index 581dd8410..93cbf0a15 100644 --- a/app/vmagent/remotewrite/statconn.go +++ b/app/vmagent/remotewrite/statconn.go @@ -1,7 +1,9 @@ package remotewrite import ( + "fmt" "net" + "strings" "sync/atomic" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" @@ -9,7 +11,10 @@ import ( "github.com/VictoriaMetrics/metrics" ) -func statDial(addr string) (conn net.Conn, err error) { +func statDial(network, addr string) (conn net.Conn, err error) { + if !strings.HasPrefix(network, "tcp") { + return nil, fmt.Errorf("unexpected network passed to statDial: %q; it must start from `tcp`", network) + } if netutil.TCP6Enabled() { conn, err = fasthttp.DialDualStack(addr) } else {