diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index aba2c78f9..3a05f6677 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -2,6 +2,7 @@ # tip +* FEATURE: vmagent: add support for `proxy_url` config option in Prometheus scrape configs. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/503 * FEATURE: remove parts with stale data as soon as they go outside the configured `-retentionPeriod`. Previously such parts may remain active for long periods of time. This should help reducing disk usage for `-retentionPeriod` smaller than one month. * FEATURE: vmalert: allow setting multiple values for `-notifier.tlsInsecureSkipVerify` command-line flag per each `-notifier.url`. diff --git a/lib/netutil/proxy.go b/lib/netutil/proxy.go deleted file mode 100644 index d2a899cb2..000000000 --- a/lib/netutil/proxy.go +++ /dev/null @@ -1,124 +0,0 @@ -package netutil - -import ( - "bufio" - "encoding/base64" - "fmt" - "net" - "net/url" - "strings" - - "github.com/VictoriaMetrics/fasthttp" -) - -// ProxyURL implements marshal interfaces for url.URL. -type ProxyURL struct { - url *url.URL -} - -// URL returns *url.URL. -func (pu ProxyURL) URL() *url.URL { - return pu.url -} - -// String implements String interface. -func (pu ProxyURL) String() string { - if pu.url == nil { - return "" - } - return pu.url.String() -} - -// MarshalYAML implements yaml.Marshaler interface. -func (pu ProxyURL) MarshalYAML() (interface{}, error) { - if pu.url == nil { - return nil, nil - } - return pu.url.String(), nil -} - -// UnmarshalYAML implements yaml.Unmarshaler interface. -func (pu *ProxyURL) UnmarshalYAML(unmarshal func(interface{}) error) error { - var s string - if err := unmarshal(&s); err != nil { - return err - } - parsedURL, err := url.Parse(s) - if err != nil { - return fmt.Errorf("failed parse proxy_url=%q as *url.URL, err=%w", s, err) - } - pu.url = parsedURL - return nil -} - -// GetProxyDialFunc returns dial proxy func for the given proxy url. -// currently only http based proxy is supported. -func GetProxyDialFunc(proxyURL *url.URL) (fasthttp.DialFunc, error) { - if strings.HasPrefix(proxyURL.Scheme, "http") { - return httpProxy(proxyURL.Host, MakeBasicAuthHeader(nil, proxyURL)), nil - } - return nil, fmt.Errorf("unknown scheme=%q for proxy_url: %q, must be http or https", proxyURL.Scheme, proxyURL) -} - -func httpProxy(proxyAddr string, auth []byte) fasthttp.DialFunc { - return func(addr string) (net.Conn, error) { - var ( - conn net.Conn - err error - ) - if TCP6Enabled() { - conn, err = fasthttp.DialDualStack(proxyAddr) - } else { - conn, err = fasthttp.Dial(proxyAddr) - } - if err != nil { - return nil, fmt.Errorf("cannot connect to the proxy=%q,err=%w", proxyAddr, err) - } - if err := MakeProxyConnectCall(conn, []byte(addr), auth); err != nil { - _ = conn.Close() - return nil, err - } - return conn, nil - } -} - -// MakeBasicAuthHeader encodes and writes basic auth http header from url into given dst and returns it. -func MakeBasicAuthHeader(dst []byte, url *url.URL) []byte { - if url == nil || url.User == nil { - return dst - } - if len(url.User.Username()) > 0 { - dst = append(dst, "Proxy-Authorization: Basic "...) - dst = append(dst, base64.StdEncoding.EncodeToString([]byte(url.User.String()))...) - } - return dst -} - -// MakeProxyConnectCall execute CONNECT method to proxy with given destination address. -func MakeProxyConnectCall(conn net.Conn, dstAddr, auth []byte) error { - conReq := make([]byte, 0, 10) - conReq = append(conReq, []byte("CONNECT ")...) - conReq = append(conReq, dstAddr...) - conReq = append(conReq, []byte(" HTTP/1.1\r\n")...) - if len(auth) > 0 { - conReq = append(conReq, auth...) - conReq = append(conReq, []byte("\r\n")...) - } - conReq = append(conReq, []byte("\r\n")...) - - res := fasthttp.AcquireResponse() - defer fasthttp.ReleaseResponse(res) - res.SkipBody = true - if _, err := conn.Write(conReq); err != nil { - return err - } - if err := res.Read(bufio.NewReader(conn)); err != nil { - _ = conn.Close() - return fmt.Errorf("cannot read CONNECT response from proxy, err=%w", err) - } - if res.Header.StatusCode() != 200 { - _ = conn.Close() - return fmt.Errorf("unexpected proxy response status code, want: 200, get: %d", res.Header.StatusCode()) - } - return nil -} diff --git a/lib/promscrape/client.go b/lib/promscrape/client.go index 2e24e8a9f..bec3ef23e 100644 --- a/lib/promscrape/client.go +++ b/lib/promscrape/client.go @@ -8,6 +8,7 @@ import ( "io" "io/ioutil" "net/http" + "net/url" "strings" "time" @@ -66,10 +67,14 @@ func newClient(sw *ScrapeWork) *client { host += ":443" } } + dialFunc, err := newStatDialFunc(sw.ProxyURL, tlsCfg) + if err != nil { + logger.Fatalf("cannot create dial func: %s", err) + } hc := &fasthttp.HostClient{ Addr: host, Name: "vm_promscrape", - Dial: getDialStatConn(sw.ProxyURL), + Dial: dialFunc, IsTLS: isTLS, TLSConfig: tlsCfg, MaxIdleConnDuration: 2 * sw.ScrapeInterval, @@ -80,10 +85,14 @@ func newClient(sw *ScrapeWork) *client { } var sc *http.Client if *streamParse || sw.StreamParse { + var proxy func(*http.Request) (*url.URL, error) + if proxyURL := sw.ProxyURL.URL(); proxyURL != nil { + proxy = http.ProxyURL(proxyURL) + } sc = &http.Client{ Transport: &http.Transport{ TLSClientConfig: tlsCfg, - Proxy: http.ProxyURL(sw.ProxyURL), + Proxy: proxy, TLSHandshakeTimeout: 10 * time.Second, IdleConnTimeout: 2 * sw.ScrapeInterval, DisableCompression: *disableCompression || sw.DisableCompression, diff --git a/lib/promscrape/config.go b/lib/promscrape/config.go index 65cd03cfc..31e1ce858 100644 --- a/lib/promscrape/config.go +++ b/lib/promscrape/config.go @@ -12,7 +12,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" @@ -24,6 +23,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/gce" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kubernetes" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/openstack" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy" "gopkg.in/yaml.v2" ) @@ -72,7 +72,7 @@ type ScrapeConfig struct { BasicAuth *promauth.BasicAuthConfig `yaml:"basic_auth,omitempty"` BearerToken string `yaml:"bearer_token,omitempty"` BearerTokenFile string `yaml:"bearer_token_file,omitempty"` - ProxyURL netutil.ProxyURL `yaml:"proxy_url,omitempty"` + ProxyURL proxy.URL `yaml:"proxy_url,omitempty"` TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"` StaticConfigs []StaticConfig `yaml:"static_configs,omitempty"` FileSDConfigs []FileSDConfig `yaml:"file_sd_configs,omitempty"` @@ -497,7 +497,7 @@ func getScrapeWorkConfig(sc *ScrapeConfig, baseDir string, globalCfg *GlobalConf metricsPath: metricsPath, scheme: scheme, params: params, - proxyURL: sc.ProxyURL.URL(), + proxyURL: sc.ProxyURL, authConfig: ac, honorLabels: honorLabels, honorTimestamps: honorTimestamps, @@ -519,7 +519,7 @@ type scrapeWorkConfig struct { metricsPath string scheme string params map[string][]string - proxyURL *url.URL + proxyURL proxy.URL authConfig *promauth.Config honorLabels bool honorTimestamps bool diff --git a/lib/promscrape/discovery/consul/api.go b/lib/promscrape/discovery/consul/api.go index a487eda61..eec5a3c9a 100644 --- a/lib/promscrape/discovery/consul/api.go +++ b/lib/promscrape/discovery/consul/api.go @@ -58,7 +58,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { } apiServer = scheme + "://" + apiServer } - client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL.URL()) + client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL) if err != nil { return nil, fmt.Errorf("cannot create HTTP client for %q: %w", apiServer, err) } diff --git a/lib/promscrape/discovery/consul/consul.go b/lib/promscrape/discovery/consul/consul.go index 2e59f9747..9c980f779 100644 --- a/lib/promscrape/discovery/consul/consul.go +++ b/lib/promscrape/discovery/consul/consul.go @@ -3,8 +3,8 @@ package consul import ( "fmt" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy" ) // SDConfig represents service discovery config for Consul. @@ -17,7 +17,7 @@ type SDConfig struct { Scheme string `yaml:"scheme,omitempty"` Username string `yaml:"username"` Password string `yaml:"password"` - ProxyURL netutil.ProxyURL `yaml:"proxy_url,omitempty"` + ProxyURL proxy.URL `yaml:"proxy_url,omitempty"` TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"` Services []string `yaml:"services,omitempty"` Tags []string `yaml:"tags,omitempty"` diff --git a/lib/promscrape/discovery/dockerswarm/api.go b/lib/promscrape/discovery/dockerswarm/api.go index fd72a13ee..c79d12fc7 100644 --- a/lib/promscrape/discovery/dockerswarm/api.go +++ b/lib/promscrape/discovery/dockerswarm/api.go @@ -39,7 +39,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { if err != nil { return nil, err } - client, err := discoveryutils.NewClient(sdc.Host, ac, sdc.ProxyURL.URL()) + client, err := discoveryutils.NewClient(sdc.Host, ac, sdc.ProxyURL) if err != nil { return nil, fmt.Errorf("cannot create HTTP client for %q: %w", sdc.Host, err) } diff --git a/lib/promscrape/discovery/dockerswarm/dockerswarm.go b/lib/promscrape/discovery/dockerswarm/dockerswarm.go index e09f27315..bd9564306 100644 --- a/lib/promscrape/discovery/dockerswarm/dockerswarm.go +++ b/lib/promscrape/discovery/dockerswarm/dockerswarm.go @@ -3,8 +3,8 @@ package dockerswarm import ( "fmt" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy" ) // SDConfig represents docker swarm service discovery configuration @@ -16,7 +16,7 @@ type SDConfig struct { Port int `yaml:"port,omitempty"` Filters []Filter `yaml:"filters,omitempty"` - ProxyURL netutil.ProxyURL `yaml:"proxy_url,omitempty"` + ProxyURL proxy.URL `yaml:"proxy_url,omitempty"` TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"` // refresh_interval is obtained from `-promscrape.dockerswarmSDCheckInterval` command-line option BasicAuth *promauth.BasicAuthConfig `yaml:"basic_auth,omitempty"` diff --git a/lib/promscrape/discovery/eureka/api.go b/lib/promscrape/discovery/eureka/api.go index cfb121497..255c9005d 100644 --- a/lib/promscrape/discovery/eureka/api.go +++ b/lib/promscrape/discovery/eureka/api.go @@ -43,7 +43,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { } apiServer = scheme + "://" + apiServer } - client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL.URL()) + client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL) if err != nil { return nil, fmt.Errorf("cannot create HTTP client for %q: %w", apiServer, err) } diff --git a/lib/promscrape/discovery/eureka/eureka.go b/lib/promscrape/discovery/eureka/eureka.go index 53b68fbb3..c8ebc55ef 100644 --- a/lib/promscrape/discovery/eureka/eureka.go +++ b/lib/promscrape/discovery/eureka/eureka.go @@ -5,9 +5,9 @@ import ( "fmt" "strconv" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy" ) const appsAPIPath = "/apps" @@ -22,7 +22,7 @@ type SDConfig struct { Scheme string `yaml:"scheme,omitempty"` Username string `yaml:"username"` Password string `yaml:"password"` - ProxyURL netutil.ProxyURL `yaml:"proxy_url,omitempty"` + ProxyURL proxy.URL `yaml:"proxy_url,omitempty"` TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"` // RefreshInterval time.Duration `yaml:"refresh_interval"` // refresh_interval is obtained from `-promscrape.ec2SDCheckInterval` command-line option. diff --git a/lib/promscrape/discovery/kubernetes/api.go b/lib/promscrape/discovery/kubernetes/api.go index 39c09e8d8..2c0463214 100644 --- a/lib/promscrape/discovery/kubernetes/api.go +++ b/lib/promscrape/discovery/kubernetes/api.go @@ -56,7 +56,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { } ac = acNew } - client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL.URL()) + client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL) if err != nil { return nil, fmt.Errorf("cannot create HTTP client for %q: %w", apiServer, err) } diff --git a/lib/promscrape/discovery/kubernetes/kubernetes.go b/lib/promscrape/discovery/kubernetes/kubernetes.go index f98343694..2c7a48995 100644 --- a/lib/promscrape/discovery/kubernetes/kubernetes.go +++ b/lib/promscrape/discovery/kubernetes/kubernetes.go @@ -3,8 +3,8 @@ package kubernetes import ( "fmt" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy" ) // SDConfig represents kubernetes-based service discovery config. @@ -16,7 +16,7 @@ type SDConfig struct { BasicAuth *promauth.BasicAuthConfig `yaml:"basic_auth,omitempty"` BearerToken string `yaml:"bearer_token,omitempty"` BearerTokenFile string `yaml:"bearer_token_file,omitempty"` - ProxyURL netutil.ProxyURL `yaml:"proxy_url,omitempty"` + ProxyURL proxy.URL `yaml:"proxy_url,omitempty"` TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"` Namespaces Namespaces `yaml:"namespaces,omitempty"` Selectors []Selector `yaml:"selectors,omitempty"` diff --git a/lib/promscrape/discoveryutils/client.go b/lib/promscrape/discoveryutils/client.go index 10963894a..b971be8ec 100644 --- a/lib/promscrape/discoveryutils/client.go +++ b/lib/promscrape/discoveryutils/client.go @@ -6,13 +6,12 @@ import ( "fmt" "net" "net/http" - "net/url" "strings" "sync" "time" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" "github.com/VictoriaMetrics/fasthttp" ) @@ -46,7 +45,7 @@ type Client struct { } // NewClient returns new Client for the given apiServer and the given ac. -func NewClient(apiServer string, ac *promauth.Config, proxyURL *url.URL) (*Client, error) { +func NewClient(apiServer string, ac *promauth.Config, proxyURL proxy.URL) (*Client, error) { var ( dialFunc fasthttp.DialFunc tlsCfg *tls.Config @@ -63,12 +62,6 @@ func NewClient(apiServer string, ac *promauth.Config, proxyURL *url.URL) (*Clien return net.Dial("unix", dialAddr) } } - if proxyURL != nil { - dialFunc, err = netutil.GetProxyDialFunc(proxyURL) - if err != nil { - return nil, err - } - } hostPort := string(u.Host()) isTLS := string(u.Scheme()) == "https" @@ -82,10 +75,15 @@ func NewClient(apiServer string, ac *promauth.Config, proxyURL *url.URL) (*Clien } hostPort = net.JoinHostPort(hostPort, port) } + if dialFunc == nil { + dialFunc, err = proxyURL.NewDialFunc(tlsCfg) + if err != nil { + return nil, err + } + } hc := &fasthttp.HostClient{ Addr: hostPort, Name: "vm_promscrape/discovery", - DialDualStack: netutil.TCP6Enabled(), IsTLS: isTLS, TLSConfig: tlsCfg, ReadTimeout: time.Minute, @@ -97,7 +95,6 @@ func NewClient(apiServer string, ac *promauth.Config, proxyURL *url.URL) (*Clien blockingClient := &fasthttp.HostClient{ Addr: hostPort, Name: "vm_promscrape/discovery", - DialDualStack: netutil.TCP6Enabled(), IsTLS: isTLS, TLSConfig: tlsCfg, ReadTimeout: BlockingClientReadTimeout, diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index 59d84a510..30f28e3a3 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -5,7 +5,6 @@ import ( "fmt" "math" "math/bits" - "net/url" "strconv" "strings" "sync" @@ -18,6 +17,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy" "github.com/VictoriaMetrics/metrics" xxhash "github.com/cespare/xxhash/v2" ) @@ -72,7 +72,7 @@ type ScrapeWork struct { AuthConfig *promauth.Config // ProxyURL HTTP proxy url - ProxyURL *url.URL + ProxyURL proxy.URL // Optional `metric_relabel_configs`. MetricRelabelConfigs []promrelabel.ParsedRelabelConfig diff --git a/lib/promscrape/statconn.go b/lib/promscrape/statconn.go index 893beefca..2fefaf420 100644 --- a/lib/promscrape/statconn.go +++ b/lib/promscrape/statconn.go @@ -2,14 +2,15 @@ package promscrape import ( "context" + "crypto/tls" "fmt" "net" - "net/url" "sync" "sync/atomic" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy" "github.com/VictoriaMetrics/fasthttp" "github.com/VictoriaMetrics/metrics" ) @@ -48,18 +49,13 @@ var ( stdDialerOnce sync.Once ) -func getDialStatConn(proxyURL *url.URL) fasthttp.DialFunc { - auth := netutil.MakeBasicAuthHeader(nil, proxyURL) - return func(addr string) (conn net.Conn, err error) { - dialAddr := addr - if proxyURL != nil { - dialAddr = proxyURL.Host - } - if netutil.TCP6Enabled() { - conn, err = fasthttp.DialDualStack(dialAddr) - } else { - conn, err = fasthttp.Dial(dialAddr) - } +func newStatDialFunc(proxyURL proxy.URL, tlsConfig *tls.Config) (fasthttp.DialFunc, error) { + dialFunc, err := proxyURL.NewDialFunc(tlsConfig) + if err != nil { + return nil, err + } + statDialFunc := func(addr string) (net.Conn, error) { + conn, err := dialFunc(addr) dialsTotal.Inc() if err != nil { dialErrors.Inc() @@ -69,17 +65,12 @@ func getDialStatConn(proxyURL *url.URL) fasthttp.DialFunc { return nil, err } conns.Inc() - if proxyURL != nil { - if err := netutil.MakeProxyConnectCall(conn, []byte(addr), auth); err != nil { - _ = conn.Close() - return nil, err - } - } sc := &statConn{ Conn: conn, } return sc, nil } + return statDialFunc, nil } var ( diff --git a/lib/proxy/proxy.go b/lib/proxy/proxy.go new file mode 100644 index 000000000..82cb7b46a --- /dev/null +++ b/lib/proxy/proxy.go @@ -0,0 +1,117 @@ +package proxy + +import ( + "bufio" + "crypto/tls" + "encoding/base64" + "fmt" + "net" + "net/url" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" + "github.com/VictoriaMetrics/fasthttp" +) + +// URL implements YAML.Marshaler and yaml.Unmarshaler interfaces for url.URL. +type URL struct { + url *url.URL +} + +// URL return the underlying url. +func (u *URL) URL() *url.URL { + if u == nil || u.url == nil { + return nil + } + return u.url +} + +// MarshalYAML implements yaml.Marshaler interface. +func (u *URL) MarshalYAML() (interface{}, error) { + if u.url == nil { + return nil, nil + } + return u.url.String(), nil +} + +// UnmarshalYAML implements yaml.Unmarshaler interface. +func (u *URL) UnmarshalYAML(unmarshal func(interface{}) error) error { + var s string + if err := unmarshal(&s); err != nil { + return err + } + parsedURL, err := url.Parse(s) + if err != nil { + return fmt.Errorf("cannot parse proxy_url=%q as *url.URL: %w", s, err) + } + u.url = parsedURL + return nil +} + +// NewDialFunc returns dial func for the given pu and tlsConfig. +func (u *URL) NewDialFunc(tlsConfig *tls.Config) (fasthttp.DialFunc, error) { + if u == nil || u.url == nil { + return defaultDialFunc, nil + } + pu := u.url + if pu.Scheme != "http" && pu.Scheme != "https" { + return nil, fmt.Errorf("unknown scheme=%q for proxy_url=%q, must be http or https", pu.Scheme, pu) + } + var authHeader string + if pu.User != nil && len(pu.User.Username()) > 0 { + userPasswordEncoded := base64.StdEncoding.EncodeToString([]byte(pu.User.String())) + authHeader = "Proxy-Authorization: Basic " + userPasswordEncoded + "\r\n" + } + dialFunc := func(addr string) (net.Conn, error) { + proxyConn, err := defaultDialFunc(pu.Host) + if err != nil { + return nil, fmt.Errorf("cannot connect to proxy %q: %w", pu, err) + } + if pu.Scheme == "https" { + proxyConn = tls.Client(proxyConn, tlsConfig) + } + conn, err := sendConnectRequest(proxyConn, addr, authHeader) + if err != nil { + _ = proxyConn.Close() + return nil, fmt.Errorf("error when sending CONNECT request to proxy %q: %w", pu, err) + } + return conn, nil + } + return dialFunc, nil +} + +func defaultDialFunc(addr string) (net.Conn, error) { + if netutil.TCP6Enabled() { + return fasthttp.DialDualStack(addr) + } + return fasthttp.Dial(addr) +} + +// sendConnectRequest sends CONNECT request to proxyConn for the given addr and authHeader and returns the established connection to dstAddr. +func sendConnectRequest(proxyConn net.Conn, dstAddr, authHeader string) (net.Conn, error) { + req := "CONNECT " + dstAddr + " HTTP/1.1\r\nHost: " + dstAddr + "\r\n" + authHeader + "\r\n" + if _, err := proxyConn.Write([]byte(req)); err != nil { + return nil, fmt.Errorf("cannot send CONNECT request for dstAddr=%q: %w", dstAddr, err) + } + var res fasthttp.Response + res.SkipBody = true + conn := &bufferedReaderConn{ + br: bufio.NewReader(proxyConn), + Conn: proxyConn, + } + if err := res.Read(conn.br); err != nil { + return nil, fmt.Errorf("cannot read CONNECT response for dstAddr=%q: %w", dstAddr, err) + } + if statusCode := res.Header.StatusCode(); statusCode != 200 { + return nil, fmt.Errorf("unexpected status code received: %d; want: 200", statusCode) + } + return conn, nil +} + +type bufferedReaderConn struct { + net.Conn + br *bufio.Reader +} + +func (brc *bufferedReaderConn) Read(p []byte) (int, error) { + return brc.br.Read(p) +}