diff --git a/lib/netutil/proxy.go b/lib/netutil/proxy.go new file mode 100644 index 000000000..d2a899cb2 --- /dev/null +++ b/lib/netutil/proxy.go @@ -0,0 +1,124 @@ +package netutil + +import ( + "bufio" + "encoding/base64" + "fmt" + "net" + "net/url" + "strings" + + "github.com/VictoriaMetrics/fasthttp" +) + +// ProxyURL implements marshal interfaces for url.URL. +type ProxyURL struct { + url *url.URL +} + +// URL returns *url.URL. +func (pu ProxyURL) URL() *url.URL { + return pu.url +} + +// String implements String interface. +func (pu ProxyURL) String() string { + if pu.url == nil { + return "" + } + return pu.url.String() +} + +// MarshalYAML implements yaml.Marshaler interface. +func (pu ProxyURL) MarshalYAML() (interface{}, error) { + if pu.url == nil { + return nil, nil + } + return pu.url.String(), nil +} + +// UnmarshalYAML implements yaml.Unmarshaler interface. +func (pu *ProxyURL) UnmarshalYAML(unmarshal func(interface{}) error) error { + var s string + if err := unmarshal(&s); err != nil { + return err + } + parsedURL, err := url.Parse(s) + if err != nil { + return fmt.Errorf("failed parse proxy_url=%q as *url.URL, err=%w", s, err) + } + pu.url = parsedURL + return nil +} + +// GetProxyDialFunc returns dial proxy func for the given proxy url. +// currently only http based proxy is supported. +func GetProxyDialFunc(proxyURL *url.URL) (fasthttp.DialFunc, error) { + if strings.HasPrefix(proxyURL.Scheme, "http") { + return httpProxy(proxyURL.Host, MakeBasicAuthHeader(nil, proxyURL)), nil + } + return nil, fmt.Errorf("unknown scheme=%q for proxy_url: %q, must be http or https", proxyURL.Scheme, proxyURL) +} + +func httpProxy(proxyAddr string, auth []byte) fasthttp.DialFunc { + return func(addr string) (net.Conn, error) { + var ( + conn net.Conn + err error + ) + if TCP6Enabled() { + conn, err = fasthttp.DialDualStack(proxyAddr) + } else { + conn, err = fasthttp.Dial(proxyAddr) + } + if err != nil { + return nil, fmt.Errorf("cannot connect to the proxy=%q,err=%w", proxyAddr, err) + } + if err := MakeProxyConnectCall(conn, []byte(addr), auth); err != nil { + _ = conn.Close() + return nil, err + } + return conn, nil + } +} + +// MakeBasicAuthHeader encodes and writes basic auth http header from url into given dst and returns it. +func MakeBasicAuthHeader(dst []byte, url *url.URL) []byte { + if url == nil || url.User == nil { + return dst + } + if len(url.User.Username()) > 0 { + dst = append(dst, "Proxy-Authorization: Basic "...) + dst = append(dst, base64.StdEncoding.EncodeToString([]byte(url.User.String()))...) + } + return dst +} + +// MakeProxyConnectCall execute CONNECT method to proxy with given destination address. +func MakeProxyConnectCall(conn net.Conn, dstAddr, auth []byte) error { + conReq := make([]byte, 0, 10) + conReq = append(conReq, []byte("CONNECT ")...) + conReq = append(conReq, dstAddr...) + conReq = append(conReq, []byte(" HTTP/1.1\r\n")...) + if len(auth) > 0 { + conReq = append(conReq, auth...) + conReq = append(conReq, []byte("\r\n")...) + } + conReq = append(conReq, []byte("\r\n")...) + + res := fasthttp.AcquireResponse() + defer fasthttp.ReleaseResponse(res) + res.SkipBody = true + if _, err := conn.Write(conReq); err != nil { + return err + } + if err := res.Read(bufio.NewReader(conn)); err != nil { + _ = conn.Close() + return fmt.Errorf("cannot read CONNECT response from proxy, err=%w", err) + } + if res.Header.StatusCode() != 200 { + _ = conn.Close() + return fmt.Errorf("unexpected proxy response status code, want: 200, get: %d", res.Header.StatusCode()) + } + return nil +} diff --git a/lib/promscrape/client.go b/lib/promscrape/client.go index 98e1ff230..2e24e8a9f 100644 --- a/lib/promscrape/client.go +++ b/lib/promscrape/client.go @@ -69,7 +69,7 @@ func newClient(sw *ScrapeWork) *client { hc := &fasthttp.HostClient{ Addr: host, Name: "vm_promscrape", - Dial: statDial, + Dial: getDialStatConn(sw.ProxyURL), IsTLS: isTLS, TLSConfig: tlsCfg, MaxIdleConnDuration: 2 * sw.ScrapeInterval, @@ -83,6 +83,7 @@ func newClient(sw *ScrapeWork) *client { sc = &http.Client{ Transport: &http.Transport{ TLSClientConfig: tlsCfg, + Proxy: http.ProxyURL(sw.ProxyURL), TLSHandshakeTimeout: 10 * time.Second, IdleConnTimeout: 2 * sw.ScrapeInterval, DisableCompression: *disableCompression || sw.DisableCompression, @@ -93,9 +94,8 @@ func newClient(sw *ScrapeWork) *client { } } return &client{ - hc: hc, - sc: sc, - + hc: hc, + sc: sc, scrapeURL: sw.ScrapeURL, host: host, requestURI: requestURI, diff --git a/lib/promscrape/config.go b/lib/promscrape/config.go index e4aaca37e..65cd03cfc 100644 --- a/lib/promscrape/config.go +++ b/lib/promscrape/config.go @@ -12,6 +12,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" @@ -71,6 +72,7 @@ type ScrapeConfig struct { BasicAuth *promauth.BasicAuthConfig `yaml:"basic_auth,omitempty"` BearerToken string `yaml:"bearer_token,omitempty"` BearerTokenFile string `yaml:"bearer_token_file,omitempty"` + ProxyURL netutil.ProxyURL `yaml:"proxy_url,omitempty"` TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"` StaticConfigs []StaticConfig `yaml:"static_configs,omitempty"` FileSDConfigs []FileSDConfig `yaml:"file_sd_configs,omitempty"` @@ -495,6 +497,7 @@ func getScrapeWorkConfig(sc *ScrapeConfig, baseDir string, globalCfg *GlobalConf metricsPath: metricsPath, scheme: scheme, params: params, + proxyURL: sc.ProxyURL.URL(), authConfig: ac, honorLabels: honorLabels, honorTimestamps: honorTimestamps, @@ -516,6 +519,7 @@ type scrapeWorkConfig struct { metricsPath string scheme string params map[string][]string + proxyURL *url.URL authConfig *promauth.Config honorLabels bool honorTimestamps bool @@ -750,6 +754,7 @@ func appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConfig, target string, e HonorTimestamps: swc.honorTimestamps, OriginalLabels: originalLabels, Labels: labels, + ProxyURL: swc.proxyURL, AuthConfig: swc.authConfig, MetricRelabelConfigs: swc.metricRelabelConfigs, SampleLimit: swc.sampleLimit, diff --git a/lib/promscrape/discovery/consul/api.go b/lib/promscrape/discovery/consul/api.go index 6a888eabb..a487eda61 100644 --- a/lib/promscrape/discovery/consul/api.go +++ b/lib/promscrape/discovery/consul/api.go @@ -58,7 +58,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { } apiServer = scheme + "://" + apiServer } - client, err := discoveryutils.NewClient(apiServer, ac) + client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL.URL()) if err != nil { return nil, fmt.Errorf("cannot create HTTP client for %q: %w", apiServer, err) } diff --git a/lib/promscrape/discovery/consul/consul.go b/lib/promscrape/discovery/consul/consul.go index 6376d4062..2e59f9747 100644 --- a/lib/promscrape/discovery/consul/consul.go +++ b/lib/promscrape/discovery/consul/consul.go @@ -3,6 +3,7 @@ package consul import ( "fmt" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" ) @@ -16,6 +17,7 @@ type SDConfig struct { Scheme string `yaml:"scheme,omitempty"` Username string `yaml:"username"` Password string `yaml:"password"` + ProxyURL netutil.ProxyURL `yaml:"proxy_url,omitempty"` TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"` Services []string `yaml:"services,omitempty"` Tags []string `yaml:"tags,omitempty"` diff --git a/lib/promscrape/discovery/dockerswarm/api.go b/lib/promscrape/discovery/dockerswarm/api.go index 3853600a4..fd72a13ee 100644 --- a/lib/promscrape/discovery/dockerswarm/api.go +++ b/lib/promscrape/discovery/dockerswarm/api.go @@ -34,11 +34,12 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { port: sdc.Port, filtersQueryArg: getFiltersQueryArg(sdc.Filters), } + ac, err := promauth.NewConfig(baseDir, sdc.BasicAuth, sdc.BearerToken, sdc.BearerTokenFile, sdc.TLSConfig) if err != nil { return nil, err } - client, err := discoveryutils.NewClient(sdc.Host, ac) + client, err := discoveryutils.NewClient(sdc.Host, ac, sdc.ProxyURL.URL()) if err != nil { return nil, fmt.Errorf("cannot create HTTP client for %q: %w", sdc.Host, err) } diff --git a/lib/promscrape/discovery/dockerswarm/dockerswarm.go b/lib/promscrape/discovery/dockerswarm/dockerswarm.go index 61b4b02f6..e09f27315 100644 --- a/lib/promscrape/discovery/dockerswarm/dockerswarm.go +++ b/lib/promscrape/discovery/dockerswarm/dockerswarm.go @@ -3,6 +3,7 @@ package dockerswarm import ( "fmt" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" ) @@ -15,7 +16,7 @@ type SDConfig struct { Port int `yaml:"port,omitempty"` Filters []Filter `yaml:"filters,omitempty"` - // TODO: add support for proxy_url + ProxyURL netutil.ProxyURL `yaml:"proxy_url,omitempty"` TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"` // refresh_interval is obtained from `-promscrape.dockerswarmSDCheckInterval` command-line option BasicAuth *promauth.BasicAuthConfig `yaml:"basic_auth,omitempty"` diff --git a/lib/promscrape/discovery/eureka/api.go b/lib/promscrape/discovery/eureka/api.go index f1a319bc3..cfb121497 100644 --- a/lib/promscrape/discovery/eureka/api.go +++ b/lib/promscrape/discovery/eureka/api.go @@ -43,7 +43,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { } apiServer = scheme + "://" + apiServer } - client, err := discoveryutils.NewClient(apiServer, ac) + client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL.URL()) if err != nil { return nil, fmt.Errorf("cannot create HTTP client for %q: %w", apiServer, err) } diff --git a/lib/promscrape/discovery/eureka/eureka.go b/lib/promscrape/discovery/eureka/eureka.go index 5dca81937..53b68fbb3 100644 --- a/lib/promscrape/discovery/eureka/eureka.go +++ b/lib/promscrape/discovery/eureka/eureka.go @@ -5,9 +5,9 @@ import ( "fmt" "strconv" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" - + "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" ) const appsAPIPath = "/apps" @@ -22,6 +22,7 @@ type SDConfig struct { Scheme string `yaml:"scheme,omitempty"` Username string `yaml:"username"` Password string `yaml:"password"` + ProxyURL netutil.ProxyURL `yaml:"proxy_url,omitempty"` TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"` // RefreshInterval time.Duration `yaml:"refresh_interval"` // refresh_interval is obtained from `-promscrape.ec2SDCheckInterval` command-line option. diff --git a/lib/promscrape/discovery/kubernetes/api.go b/lib/promscrape/discovery/kubernetes/api.go index caa330643..39c09e8d8 100644 --- a/lib/promscrape/discovery/kubernetes/api.go +++ b/lib/promscrape/discovery/kubernetes/api.go @@ -56,7 +56,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { } ac = acNew } - client, err := discoveryutils.NewClient(apiServer, ac) + client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL.URL()) if err != nil { return nil, fmt.Errorf("cannot create HTTP client for %q: %w", apiServer, err) } diff --git a/lib/promscrape/discovery/kubernetes/kubernetes.go b/lib/promscrape/discovery/kubernetes/kubernetes.go index 87c381963..f98343694 100644 --- a/lib/promscrape/discovery/kubernetes/kubernetes.go +++ b/lib/promscrape/discovery/kubernetes/kubernetes.go @@ -3,6 +3,7 @@ package kubernetes import ( "fmt" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" ) @@ -15,6 +16,7 @@ type SDConfig struct { BasicAuth *promauth.BasicAuthConfig `yaml:"basic_auth,omitempty"` BearerToken string `yaml:"bearer_token,omitempty"` BearerTokenFile string `yaml:"bearer_token_file,omitempty"` + ProxyURL netutil.ProxyURL `yaml:"proxy_url,omitempty"` TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"` Namespaces Namespaces `yaml:"namespaces,omitempty"` Selectors []Selector `yaml:"selectors,omitempty"` diff --git a/lib/promscrape/discoveryutils/client.go b/lib/promscrape/discoveryutils/client.go index 9469fe06d..10963894a 100644 --- a/lib/promscrape/discoveryutils/client.go +++ b/lib/promscrape/discoveryutils/client.go @@ -6,6 +6,7 @@ import ( "fmt" "net" "net/http" + "net/url" "strings" "sync" "time" @@ -45,11 +46,12 @@ type Client struct { } // NewClient returns new Client for the given apiServer and the given ac. -func NewClient(apiServer string, ac *promauth.Config) (*Client, error) { +func NewClient(apiServer string, ac *promauth.Config, proxyURL *url.URL) (*Client, error) { var ( dialFunc fasthttp.DialFunc tlsCfg *tls.Config u fasthttp.URI + err error ) u.Update(apiServer) @@ -61,6 +63,13 @@ func NewClient(apiServer string, ac *promauth.Config) (*Client, error) { return net.Dial("unix", dialAddr) } } + if proxyURL != nil { + dialFunc, err = netutil.GetProxyDialFunc(proxyURL) + if err != nil { + return nil, err + } + } + hostPort := string(u.Host()) isTLS := string(u.Scheme()) == "https" if isTLS && ac != nil { diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index 62f759d58..59d84a510 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -5,6 +5,7 @@ import ( "fmt" "math" "math/bits" + "net/url" "strconv" "strings" "sync" @@ -70,6 +71,9 @@ type ScrapeWork struct { // Auth config AuthConfig *promauth.Config + // ProxyURL HTTP proxy url + ProxyURL *url.URL + // Optional `metric_relabel_configs`. MetricRelabelConfigs []promrelabel.ParsedRelabelConfig diff --git a/lib/promscrape/statconn.go b/lib/promscrape/statconn.go index 73c6dcc94..893beefca 100644 --- a/lib/promscrape/statconn.go +++ b/lib/promscrape/statconn.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net" + "net/url" "sync" "sync/atomic" "time" @@ -47,25 +48,38 @@ var ( stdDialerOnce sync.Once ) -func statDial(addr string) (conn net.Conn, err error) { - if netutil.TCP6Enabled() { - conn, err = fasthttp.DialDualStack(addr) - } else { - conn, err = fasthttp.Dial(addr) - } - dialsTotal.Inc() - if err != nil { - dialErrors.Inc() - if !netutil.TCP6Enabled() { - err = fmt.Errorf("%w; try -enableTCP6 command-line flag if you scrape ipv6 addresses", err) +func getDialStatConn(proxyURL *url.URL) fasthttp.DialFunc { + auth := netutil.MakeBasicAuthHeader(nil, proxyURL) + return func(addr string) (conn net.Conn, err error) { + dialAddr := addr + if proxyURL != nil { + dialAddr = proxyURL.Host } - return nil, err + if netutil.TCP6Enabled() { + conn, err = fasthttp.DialDualStack(dialAddr) + } else { + conn, err = fasthttp.Dial(dialAddr) + } + dialsTotal.Inc() + if err != nil { + dialErrors.Inc() + if !netutil.TCP6Enabled() { + err = fmt.Errorf("%w; try -enableTCP6 command-line flag if you scrape ipv6 addresses", err) + } + return nil, err + } + conns.Inc() + if proxyURL != nil { + if err := netutil.MakeProxyConnectCall(conn, []byte(addr), auth); err != nil { + _ = conn.Close() + return nil, err + } + } + sc := &statConn{ + Conn: conn, + } + return sc, nil } - conns.Inc() - sc := &statConn{ - Conn: conn, - } - return sc, nil } var (