app/vmagent: add -remoteWrite.proxyURL command-line option

This option allows writing data to `-remoteWrite.url` via http, https or socks5 proxy.
This is similar to `proxy_url` option in `remote_write` section of Prometheus.
See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write
This commit is contained in:
Aliaksandr Valialkin 2020-07-20 19:27:25 +03:00
parent efcbb51968
commit ad6290953c
2 changed files with 77 additions and 99 deletions

View file

@ -1,10 +1,14 @@
package remotewrite package remotewrite
import ( import (
"bytes"
"crypto/tls" "crypto/tls"
"encoding/base64" "encoding/base64"
"flag" "flag"
"fmt" "fmt"
"io/ioutil"
"net/http"
"net/url"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -13,12 +17,13 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue" "github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/fasthttp"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
var ( var (
sendTimeout = flag.Duration("remoteWrite.sendTimeout", time.Minute, "Timeout for sending a single block of data to -remoteWrite.url") sendTimeout = flag.Duration("remoteWrite.sendTimeout", time.Minute, "Timeout for sending a single block of data to -remoteWrite.url")
proxyURL = flagutil.NewArray("remoteWrite.proxyURL", "Optional proxy URL for writing data to -remoteWrite.url. Supported proxies: http, https, socks5. "+
"Example: -remoteWrite.proxyURL=socks5://proxy:1234")
tlsInsecureSkipVerify = flag.Bool("remoteWrite.tlsInsecureSkipVerify", false, "Whether to skip tls verification when connecting to -remoteWrite.url") tlsInsecureSkipVerify = flag.Bool("remoteWrite.tlsInsecureSkipVerify", false, "Whether to skip tls verification when connecting to -remoteWrite.url")
tlsCertFile = flagutil.NewArray("remoteWrite.tlsCertFile", "Optional path to client-side TLS certificate file to use when connecting to -remoteWrite.url. "+ tlsCertFile = flagutil.NewArray("remoteWrite.tlsCertFile", "Optional path to client-side TLS certificate file to use when connecting to -remoteWrite.url. "+
@ -41,11 +46,9 @@ var (
type client struct { type client struct {
urlLabelValue string urlLabelValue string
remoteWriteURL string remoteWriteURL string
host string
requestURI string
authHeader string authHeader string
fq *persistentqueue.FastQueue fq *persistentqueue.FastQueue
hc *fasthttp.HostClient hc *http.Client
requestDuration *metrics.Histogram requestDuration *metrics.Histogram
requestsOKCount *metrics.Counter requestsOKCount *metrics.Counter
@ -57,6 +60,28 @@ type client struct {
} }
func newClient(argIdx int, remoteWriteURL, urlLabelValue string, fq *persistentqueue.FastQueue, concurrency int) *client { func newClient(argIdx int, remoteWriteURL, urlLabelValue string, fq *persistentqueue.FastQueue, concurrency int) *client {
tlsCfg, err := getTLSConfig(argIdx)
if err != nil {
logger.Panicf("FATAL: cannot initialize TLS config: %s", err)
}
tr := &http.Transport{
Dial: statDial,
TLSClientConfig: tlsCfg,
TLSHandshakeTimeout: 5 * time.Second,
MaxConnsPerHost: 2 * concurrency,
WriteBufferSize: 16 * 1024,
}
pURL := proxyURL.GetOptionalArg(argIdx)
if len(pURL) > 0 {
if !strings.Contains(pURL, "://") {
logger.Fatalf("cannot parse -remoteWrite.proxyURL=%q: it must start with `http://`, `https://` or `socks5://`", pURL)
}
urlProxy, err := url.Parse(pURL)
if err != nil {
logger.Fatalf("cannot parse -remoteWrite.proxyURL=%q: %s", pURL, err)
}
tr.Proxy = http.ProxyURL(urlProxy)
}
authHeader := "" authHeader := ""
username := basicAuthUsername.GetOptionalArg(argIdx) username := basicAuthUsername.GetOptionalArg(argIdx)
password := basicAuthPassword.GetOptionalArg(argIdx) password := basicAuthPassword.GetOptionalArg(argIdx)
@ -73,62 +98,15 @@ func newClient(argIdx int, remoteWriteURL, urlLabelValue string, fq *persistentq
} }
authHeader = "Bearer " + token authHeader = "Bearer " + token
} }
readTimeout := *sendTimeout
if readTimeout <= 0 {
readTimeout = time.Minute
}
writeTimeout := readTimeout
var u fasthttp.URI
u.Update(remoteWriteURL)
scheme := string(u.Scheme())
switch scheme {
case "http", "https":
default:
logger.Fatalf("unsupported scheme in -remoteWrite.url=%q: %q. It must be http or https", remoteWriteURL, scheme)
}
host := string(u.Host())
if len(host) == 0 {
logger.Fatalf("invalid -remoteWrite.url=%q: host cannot be empty. Make sure the url looks like `http://host:port/path`", remoteWriteURL)
}
requestURI := string(u.RequestURI())
isTLS := scheme == "https"
var tlsCfg *tls.Config
if isTLS {
var err error
tlsCfg, err = getTLSConfig(argIdx)
if err != nil {
logger.Panicf("FATAL: cannot initialize TLS config: %s", err)
}
}
if !strings.Contains(host, ":") {
if isTLS {
host += ":443"
} else {
host += ":80"
}
}
maxConns := 2 * concurrency
hc := &fasthttp.HostClient{
Addr: host,
Name: "vmagent",
Dial: statDial,
IsTLS: isTLS,
TLSConfig: tlsCfg,
MaxConns: maxConns,
MaxIdleConnDuration: 10 * readTimeout,
ReadTimeout: readTimeout,
WriteTimeout: writeTimeout,
MaxResponseBodySize: 1024 * 1024,
}
c := &client{ c := &client{
urlLabelValue: urlLabelValue, urlLabelValue: urlLabelValue,
remoteWriteURL: remoteWriteURL, remoteWriteURL: remoteWriteURL,
host: host,
requestURI: requestURI,
authHeader: authHeader, authHeader: authHeader,
fq: fq, fq: fq,
hc: hc, hc: &http.Client{
Transport: tr,
Timeout: *sendTimeout,
},
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
} }
c.requestDuration = metrics.GetOrCreateHistogram(fmt.Sprintf(`vmagent_remotewrite_duration_seconds{url=%q}`, c.urlLabelValue)) c.requestDuration = metrics.GetOrCreateHistogram(fmt.Sprintf(`vmagent_remotewrite_duration_seconds{url=%q}`, c.urlLabelValue))
@ -153,14 +131,17 @@ func (c *client) MustStop() {
} }
func getTLSConfig(argIdx int) (*tls.Config, error) { func getTLSConfig(argIdx int) (*tls.Config, error) {
tlsConfig := &promauth.TLSConfig{ c := &promauth.TLSConfig{
CAFile: tlsCAFile.GetOptionalArg(argIdx), CAFile: tlsCAFile.GetOptionalArg(argIdx),
CertFile: tlsCertFile.GetOptionalArg(argIdx), CertFile: tlsCertFile.GetOptionalArg(argIdx),
KeyFile: tlsKeyFile.GetOptionalArg(argIdx), KeyFile: tlsKeyFile.GetOptionalArg(argIdx),
ServerName: tlsServerName.GetOptionalArg(argIdx), ServerName: tlsServerName.GetOptionalArg(argIdx),
InsecureSkipVerify: *tlsInsecureSkipVerify, InsecureSkipVerify: *tlsInsecureSkipVerify,
} }
cfg, err := promauth.NewConfig(".", nil, "", "", tlsConfig) if c.CAFile == "" && c.CertFile == "" && c.KeyFile == "" && c.ServerName == "" && !c.InsecureSkipVerify {
return nil, nil
}
cfg, err := promauth.NewConfig(".", nil, "", "", c)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot populate TLS config: %w", err) return nil, fmt.Errorf("cannot populate TLS config: %w", err)
} }
@ -201,32 +182,24 @@ func (c *client) runWorker() {
} }
func (c *client) sendBlock(block []byte) { func (c *client) sendBlock(block []byte) {
req := fasthttp.AcquireRequest() req, err := http.NewRequest("POST", c.remoteWriteURL, bytes.NewBuffer(block))
req.SetRequestURI(c.requestURI) if err != nil {
req.SetHost(c.host) logger.Panicf("BUG: unexected error from http.NewRequest(%q): %s", c.remoteWriteURL, err)
req.Header.SetMethod("POST") }
req.Header.Add("Content-Type", "application/x-protobuf") h := req.Header
req.Header.Add("Content-Encoding", "snappy") h.Set("User-Agent", "vmagent")
req.Header.Add("X-Prometheus-Remote-Write-Version", "0.1.0") h.Set("Content-Type", "application/x-protobuf")
h.Set("Content-Encoding", "snappy")
h.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
if c.authHeader != "" { if c.authHeader != "" {
req.Header.Set("Authorization", c.authHeader) req.Header.Set("Authorization", c.authHeader)
} }
req.SetBody(block)
retryDuration := time.Second retryDuration := time.Second
resp := fasthttp.AcquireResponse()
again: again:
select {
case <-c.stopCh:
fasthttp.ReleaseRequest(req)
fasthttp.ReleaseResponse(resp)
return
default:
}
startTime := time.Now() startTime := time.Now()
err := doRequestWithPossibleRetry(c.hc, req, resp) resp, err := c.hc.Do(req)
c.requestDuration.UpdateDuration(startTime) c.requestDuration.UpdateDuration(startTime)
if err != nil { if err != nil {
c.errorsCount.Inc() c.errorsCount.Inc()
@ -236,39 +209,39 @@ again:
} }
logger.Errorf("couldn't send a block with size %d bytes to %q: %s; re-sending the block in %.3f seconds", logger.Errorf("couldn't send a block with size %d bytes to %q: %s; re-sending the block in %.3f seconds",
len(block), c.remoteWriteURL, err, retryDuration.Seconds()) len(block), c.remoteWriteURL, err, retryDuration.Seconds())
time.Sleep(retryDuration) t := time.NewTimer(retryDuration)
select {
case <-c.stopCh:
t.Stop()
return
case <-t.C:
}
c.retriesCount.Inc() c.retriesCount.Inc()
goto again goto again
} }
statusCode := resp.StatusCode() statusCode := resp.StatusCode
if statusCode/100 != 2 { if statusCode/100 != 2 {
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_requests_total{url=%q, status_code="%d"}`, c.urlLabelValue, statusCode)).Inc() metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_requests_total{url=%q, status_code="%d"}`, c.urlLabelValue, statusCode)).Inc()
retryDuration *= 2 retryDuration *= 2
if retryDuration > time.Minute { if retryDuration > time.Minute {
retryDuration = time.Minute retryDuration = time.Minute
} }
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
logger.Errorf("cannot read response body from %q: %s", c.remoteWriteURL, err)
} else {
logger.Errorf("unexpected status code received after sending a block with size %d bytes to %q: %d; response body=%q; re-sending the block in %.3f seconds", logger.Errorf("unexpected status code received after sending a block with size %d bytes to %q: %d; response body=%q; re-sending the block in %.3f seconds",
len(block), c.remoteWriteURL, statusCode, resp.Body(), retryDuration.Seconds()) len(block), c.remoteWriteURL, statusCode, body, retryDuration.Seconds())
time.Sleep(retryDuration) }
t := time.NewTimer(retryDuration)
select {
case <-c.stopCh:
t.Stop()
return
case <-t.C:
}
c.retriesCount.Inc() c.retriesCount.Inc()
goto again goto again
} }
c.requestsOKCount.Inc() c.requestsOKCount.Inc()
// The block has been successfully sent to the remote storage.
fasthttp.ReleaseResponse(resp)
fasthttp.ReleaseRequest(req)
}
func doRequestWithPossibleRetry(hc *fasthttp.HostClient, req *fasthttp.Request, resp *fasthttp.Response) error {
// There is no need in calling DoTimeout, since the timeout must be already set in hc.ReadTimeout.
err := hc.Do(req, resp)
if err == nil {
return nil
}
if err != fasthttp.ErrConnectionClosed {
return err
}
// Retry request if the server closed the keep-alive connection during the first attempt.
return hc.Do(req, resp)
} }

View file

@ -1,7 +1,9 @@
package remotewrite package remotewrite
import ( import (
"fmt"
"net" "net"
"strings"
"sync/atomic" "sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
@ -9,7 +11,10 @@ import (
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
func statDial(addr string) (conn net.Conn, err error) { func statDial(network, addr string) (conn net.Conn, err error) {
if !strings.HasPrefix(network, "tcp") {
return nil, fmt.Errorf("unexpected network passed to statDial: %q; it must start from `tcp`", network)
}
if netutil.TCP6Enabled() { if netutil.TCP6Enabled() {
conn, err = fasthttp.DialDualStack(addr) conn, err = fasthttp.DialDualStack(addr)
} else { } else {