lib/promscrape: support for simple HTTP proxies without CONNECT method support such as https://github.com/prometheus-community/PushProx

See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179
This commit is contained in:
Aliaksandr Valialkin 2021-04-04 00:40:08 +03:00
parent 4c56b1a6dd
commit 5153410ced
14 changed files with 169 additions and 70 deletions

View file

@ -178,7 +178,7 @@ The following scrape types in [scrape_config](https://prometheus.io/docs/prometh
Please file feature requests to [our issue tracker](https://github.com/VictoriaMetrics/VictoriaMetrics/issues) if you need other service discovery mechanisms to be supported by `vmagent`.
`vmagent` also support the following additional options in `scrape_config` section:
`vmagent` also support the following additional options in `scrape_configs` section:
* `disable_compression: true` - to disable response compression on a per-job basis. By default `vmagent` requests compressed responses from scrape targets
to save network bandwidth.

View file

@ -10,8 +10,10 @@
* FEATURE: vmagent: add support for `authorization` section in `-promscrape.config` in the same way as [Prometheus 2.26 does](https://github.com/prometheus/prometheus/pull/8512).
* FEATURE: vmagent: reduce memory usage when `-remoteWrite.queues` is set to a big value. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1167).
* FEATURE: vmagent: add AWS IAM roles for tasks support for EC2 service discovery according to [these docs](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html).
* FEATURE: vmagent: add support for `proxy_tls_config`, `proxy_authorization`, `proxy_basic_auth`, `proxy_bearer_token` and `proxy_bearer_token_file` options to `consul_sd_config`, `dockerswarm_sd_config` and `eureka_sd_config` sections.
* FEATURE: vmauth: add support for authorization via [bearer token](https://swagger.io/docs/specification/authentication/bearer-authentication/). See [the docs](https://victoriametrics.github.io/vmauth.html#auth-config) for details.
* BUGFIX: vmagent: properly work with simple HTTP proxies which don't support `CONNECT` method. For example, [PushProx](https://github.com/prometheus-community/PushProx). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179).
* BUGFIX: vmagent: properly discover targets if multiple namespace selectors are put inside `kubernetes_sd_config`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1170).
* BUGFIX: properly generate filename for `*.tar.gz` archive inside `_checksums.txt` file posted at [releases page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1171).

View file

@ -178,7 +178,7 @@ The following scrape types in [scrape_config](https://prometheus.io/docs/prometh
Please file feature requests to [our issue tracker](https://github.com/VictoriaMetrics/VictoriaMetrics/issues) if you need other service discovery mechanisms to be supported by `vmagent`.
`vmagent` also support the following additional options in `scrape_config` section:
`vmagent` also support the following additional options in `scrape_configs` section:
* `disable_compression: true` - to disable response compression on a per-job basis. By default `vmagent` requests compressed responses from scrape targets
to save network bandwidth.

View file

@ -45,6 +45,15 @@ type HTTPClientConfig struct {
TLSConfig *TLSConfig `yaml:"tls_config,omitempty"`
}
// ProxyClientConfig represents proxy client config.
type ProxyClientConfig struct {
Authorization *Authorization `yaml:"proxy_authorization,omitempty"`
BasicAuth *BasicAuthConfig `yaml:"proxy_basic_auth,omitempty"`
BearerToken string `yaml:"proxy_bearer_token,omitempty"`
BearerTokenFile string `yaml:"proxy_bearer_token_file,omitempty"`
TLSConfig *TLSConfig `yaml:"proxy_tls_config,omitempty"`
}
// Config is auth config.
type Config struct {
// Optional `Authorization` header.
@ -103,6 +112,11 @@ func (hcc *HTTPClientConfig) NewConfig(baseDir string) (*Config, error) {
return NewConfig(baseDir, hcc.Authorization, hcc.BasicAuth, hcc.BearerToken, hcc.BearerTokenFile, hcc.TLSConfig)
}
// NewConfig creates auth config for the given pcc.
func (pcc *ProxyClientConfig) NewConfig(baseDir string) (*Config, error) {
return NewConfig(baseDir, pcc.Authorization, pcc.BasicAuth, pcc.BearerToken, pcc.BearerTokenFile, pcc.TLSConfig)
}
// NewConfig creates auth config from the given args.
func NewConfig(baseDir string, az *Authorization, basicAuth *BasicAuthConfig, bearerToken, bearerTokenFile string, tlsConfig *TLSConfig) (*Config, error) {
var authorization string

View file

@ -15,6 +15,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
"github.com/VictoriaMetrics/fasthttp"
"github.com/VictoriaMetrics/metrics"
)
@ -46,6 +47,7 @@ type client struct {
host string
requestURI string
authHeader string
proxyAuthHeader string
denyRedirects bool
disableCompression bool
disableKeepAlive bool
@ -61,6 +63,22 @@ func newClient(sw *ScrapeWork) *client {
if isTLS {
tlsCfg = sw.AuthConfig.NewTLSConfig()
}
proxyAuthHeader := ""
proxyURL := sw.ProxyURL
if !isTLS && proxyURL.IsHTTPOrHTTPS() {
// Send full sw.ScrapeURL in requests to a proxy host for non-TLS scrape targets
// like net/http package from Go does.
// See https://en.wikipedia.org/wiki/Proxy_server#Web_proxy_servers
pu := proxyURL.URL()
host = pu.Host
requestURI = sw.ScrapeURL
isTLS = pu.Scheme == "https"
if isTLS {
tlsCfg = sw.ProxyAuthConfig.NewTLSConfig()
}
proxyAuthHeader = proxyURL.GetAuthHeader(sw.ProxyAuthConfig)
proxyURL = proxy.URL{}
}
if !strings.Contains(host, ":") {
if !isTLS {
host += ":80"
@ -68,7 +86,7 @@ func newClient(sw *ScrapeWork) *client {
host += ":443"
}
}
dialFunc, err := newStatDialFunc(sw.ProxyURL, sw.ProxyAuthConfig)
dialFunc, err := newStatDialFunc(proxyURL, sw.ProxyAuthConfig)
if err != nil {
logger.Fatalf("cannot create dial func: %s", err)
}
@ -86,14 +104,14 @@ func newClient(sw *ScrapeWork) *client {
}
var sc *http.Client
if *streamParse || sw.StreamParse {
var proxy func(*http.Request) (*url.URL, error)
var proxyURLFunc func(*http.Request) (*url.URL, error)
if proxyURL := sw.ProxyURL.URL(); proxyURL != nil {
proxy = http.ProxyURL(proxyURL)
proxyURLFunc = http.ProxyURL(proxyURL)
}
sc = &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
Proxy: proxy,
Proxy: proxyURLFunc,
TLSHandshakeTimeout: 10 * time.Second,
IdleConnTimeout: 2 * sw.ScrapeInterval,
DisableCompression: *disableCompression || sw.DisableCompression,
@ -115,6 +133,7 @@ func newClient(sw *ScrapeWork) *client {
host: host,
requestURI: requestURI,
authHeader: sw.AuthConfig.Authorization,
proxyAuthHeader: proxyAuthHeader,
denyRedirects: sw.DenyRedirects,
disableCompression: sw.DisableCompression,
disableKeepAlive: sw.DisableKeepAlive,
@ -138,6 +157,9 @@ func (c *client) GetStreamReader() (*streamReader, error) {
if c.authHeader != "" {
req.Header.Set("Authorization", c.authHeader)
}
if c.proxyAuthHeader != "" {
req.Header.Set("Proxy-Authorization", c.proxyAuthHeader)
}
resp, err := c.sc.Do(req)
if err != nil {
cancel()
@ -169,15 +191,18 @@ func (c *client) ReadData(dst []byte) ([]byte, error) {
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/608 for details.
// Do not bloat the `Accept` header with OpenMetrics shit, since it looks like dead standard now.
req.Header.Set("Accept", "text/plain;version=0.0.4;q=1,*/*;q=0.1")
if c.authHeader != "" {
req.Header.Set("Authorization", c.authHeader)
}
if c.proxyAuthHeader != "" {
req.Header.Set("Proxy-Authorization", c.proxyAuthHeader)
}
if !*disableCompression && !c.disableCompression {
req.Header.Set("Accept-Encoding", "gzip")
}
if *disableKeepAlive || c.disableKeepAlive {
req.SetConnectionClose()
}
if c.authHeader != "" {
req.Header.Set("Authorization", c.authHeader)
}
resp := fasthttp.AcquireResponse()
swapResponseBodies := len(dst) == 0
if swapResponseBodies {

View file

@ -109,16 +109,12 @@ type ScrapeConfig struct {
GCESDConfigs []gce.SDConfig `yaml:"gce_sd_configs,omitempty"`
// These options are supported only by lib/promscrape.
DisableCompression bool `yaml:"disable_compression,omitempty"`
DisableKeepAlive bool `yaml:"disable_keepalive,omitempty"`
StreamParse bool `yaml:"stream_parse,omitempty"`
ScrapeAlignInterval time.Duration `yaml:"scrape_align_interval,omitempty"`
ScrapeOffset time.Duration `yaml:"scrape_offset,omitempty"`
ProxyAuthorization *promauth.Authorization `yaml:"proxy_authorization,omitempty"`
ProxyBasicAuth *promauth.BasicAuthConfig `yaml:"proxy_basic_auth,omitempty"`
ProxyBearerToken string `yaml:"proxy_bearer_token,omitempty"`
ProxyBearerTokenFile string `yaml:"proxy_bearer_token_file,omitempty"`
ProxyTLSConfig *promauth.TLSConfig `yaml:"proxy_tls_config,omitempty"`
DisableCompression bool `yaml:"disable_compression,omitempty"`
DisableKeepAlive bool `yaml:"disable_keepalive,omitempty"`
StreamParse bool `yaml:"stream_parse,omitempty"`
ScrapeAlignInterval time.Duration `yaml:"scrape_align_interval,omitempty"`
ScrapeOffset time.Duration `yaml:"scrape_offset,omitempty"`
ProxyClientConfig promauth.ProxyClientConfig `yaml:",inline"`
// This is set in loadConfig
swc *scrapeWorkConfig
@ -551,7 +547,7 @@ func getScrapeWorkConfig(sc *ScrapeConfig, baseDir string, globalCfg *GlobalConf
if err != nil {
return nil, fmt.Errorf("cannot parse auth config for `job_name` %q: %w", jobName, err)
}
proxyAC, err := promauth.NewConfig(baseDir, sc.ProxyAuthorization, sc.ProxyBasicAuth, sc.ProxyBearerToken, sc.ProxyBearerTokenFile, sc.ProxyTLSConfig)
proxyAC, err := sc.ProxyClientConfig.NewConfig(baseDir)
if err != nil {
return nil, fmt.Errorf("cannot parse proxy auth config for `job_name` %q: %w", jobName, err)
}

View file

@ -65,7 +65,11 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
}
apiServer = scheme + "://" + apiServer
}
client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL)
proxyAC, err := sdc.ProxyClientConfig.NewConfig(baseDir)
if err != nil {
return nil, fmt.Errorf("cannot parse proxy auth config: %w", err)
}
client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL, proxyAC)
if err != nil {
return nil, fmt.Errorf("cannot create HTTP client for %q: %w", apiServer, err)
}

View file

@ -11,19 +11,20 @@ import (
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config
type SDConfig struct {
Server string `yaml:"server,omitempty"`
Token *string `yaml:"token"`
Datacenter string `yaml:"datacenter"`
Scheme string `yaml:"scheme,omitempty"`
Username string `yaml:"username"`
Password string `yaml:"password"`
ProxyURL proxy.URL `yaml:"proxy_url,omitempty"`
TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"`
Services []string `yaml:"services,omitempty"`
Tags []string `yaml:"tags,omitempty"`
NodeMeta map[string]string `yaml:"node_meta,omitempty"`
TagSeparator *string `yaml:"tag_separator,omitempty"`
AllowStale bool `yaml:"allow_stale,omitempty"`
Server string `yaml:"server,omitempty"`
Token *string `yaml:"token"`
Datacenter string `yaml:"datacenter"`
Scheme string `yaml:"scheme,omitempty"`
Username string `yaml:"username"`
Password string `yaml:"password"`
ProxyURL proxy.URL `yaml:"proxy_url,omitempty"`
ProxyClientConfig promauth.ProxyClientConfig `yaml:",inline"`
TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"`
Services []string `yaml:"services,omitempty"`
Tags []string `yaml:"tags,omitempty"`
NodeMeta map[string]string `yaml:"node_meta,omitempty"`
TagSeparator *string `yaml:"tag_separator,omitempty"`
AllowStale bool `yaml:"allow_stale,omitempty"`
// RefreshInterval time.Duration `yaml:"refresh_interval"`
// refresh_interval is obtained from `-promscrape.consulSDCheckInterval` command-line option.
}

View file

@ -35,9 +35,13 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
}
ac, err := sdc.HTTPClientConfig.NewConfig(baseDir)
if err != nil {
return nil, err
return nil, fmt.Errorf("cannot parse auth config: %w", err)
}
client, err := discoveryutils.NewClient(sdc.Host, ac, sdc.ProxyURL)
proxyAC, err := sdc.ProxyClientConfig.NewConfig(baseDir)
if err != nil {
return nil, fmt.Errorf("cannot parse proxy auth config: %w", err)
}
client, err := discoveryutils.NewClient(sdc.Host, ac, sdc.ProxyURL, proxyAC)
if err != nil {
return nil, fmt.Errorf("cannot create HTTP client for %q: %w", sdc.Host, err)
}

View file

@ -16,8 +16,9 @@ type SDConfig struct {
Port int `yaml:"port,omitempty"`
Filters []Filter `yaml:"filters,omitempty"`
ProxyURL proxy.URL `yaml:"proxy_url,omitempty"`
HTTPClientConfig promauth.HTTPClientConfig `yaml:",inline"`
HTTPClientConfig promauth.HTTPClientConfig `yaml:",inline"`
ProxyURL proxy.URL `yaml:"proxy_url,omitempty"`
ProxyClientConfig promauth.ProxyClientConfig `yaml:",inline"`
// refresh_interval is obtained from `-promscrape.dockerswarmSDCheckInterval` command-line option
}

View file

@ -30,7 +30,11 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
}
apiServer = scheme + "://" + apiServer
}
client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL)
proxyAC, err := sdc.ProxyClientConfig.NewConfig(baseDir)
if err != nil {
return nil, fmt.Errorf("cannot parse proxy auth config: %w", err)
}
client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL, proxyAC)
if err != nil {
return nil, fmt.Errorf("cannot create HTTP client for %q: %w", apiServer, err)
}

View file

@ -16,9 +16,10 @@ const appsAPIPath = "/apps"
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#eureka
type SDConfig struct {
Server string `yaml:"server,omitempty"`
ProxyURL proxy.URL `yaml:"proxy_url,omitempty"`
HTTPClientConfig promauth.HTTPClientConfig `ymal:",inline"`
Server string `yaml:"server,omitempty"`
HTTPClientConfig promauth.HTTPClientConfig `ymal:",inline"`
ProxyURL proxy.URL `yaml:"proxy_url,omitempty"`
ProxyClientConfig promauth.ProxyClientConfig `yaml:",inline"`
// RefreshInterval time.Duration `yaml:"refresh_interval"`
// refresh_interval is obtained from `-promscrape.ec2SDCheckInterval` command-line option.
}

View file

@ -40,22 +40,20 @@ type Client struct {
// blockingClient is used for long-polling requests.
blockingClient *fasthttp.HostClient
ac *promauth.Config
apiServer string
hostPort string
authHeader string
proxyAuthHeader string
sendFullURL bool
}
// NewClient returns new Client for the given apiServer and the given ac.
func NewClient(apiServer string, ac *promauth.Config, proxyURL proxy.URL) (*Client, error) {
var (
dialFunc fasthttp.DialFunc
tlsCfg *tls.Config
u fasthttp.URI
err error
)
// NewClient returns new Client for the given args.
func NewClient(apiServer string, ac *promauth.Config, proxyURL proxy.URL, proxyAC *promauth.Config) (*Client, error) {
var u fasthttp.URI
u.Update(apiServer)
// special case for unix socket connection
var dialFunc fasthttp.DialFunc
if string(u.Scheme()) == "unix" {
dialAddr := string(u.Path())
apiServer = "http://"
@ -66,9 +64,25 @@ func NewClient(apiServer string, ac *promauth.Config, proxyURL proxy.URL) (*Clie
hostPort := string(u.Host())
isTLS := string(u.Scheme()) == "https"
var tlsCfg *tls.Config
if isTLS {
tlsCfg = ac.NewTLSConfig()
}
sendFullURL := !isTLS && proxyURL.IsHTTPOrHTTPS()
proxyAuthHeader := ""
if sendFullURL {
// Send full urls in requests to a proxy host for non-TLS apiServer
// like net/http package from Go does.
// See https://en.wikipedia.org/wiki/Proxy_server#Web_proxy_servers
pu := proxyURL.URL()
hostPort = pu.Host
isTLS = pu.Scheme == "https"
if isTLS {
tlsCfg = proxyAC.NewTLSConfig()
}
proxyAuthHeader = proxyURL.GetAuthHeader(proxyAC)
proxyURL = proxy.URL{}
}
if !strings.Contains(hostPort, ":") {
port := "80"
if isTLS {
@ -77,7 +91,8 @@ func NewClient(apiServer string, ac *promauth.Config, proxyURL proxy.URL) (*Clie
hostPort = net.JoinHostPort(hostPort, port)
}
if dialFunc == nil {
dialFunc, err = proxyURL.NewDialFunc(ac)
var err error
dialFunc, err = proxyURL.NewDialFunc(proxyAC)
if err != nil {
return nil, err
}
@ -104,12 +119,17 @@ func NewClient(apiServer string, ac *promauth.Config, proxyURL proxy.URL) (*Clie
MaxConns: 64 * 1024,
Dial: dialFunc,
}
authHeader := ""
if ac != nil {
authHeader = ac.Authorization
}
return &Client{
hc: hc,
blockingClient: blockingClient,
ac: ac,
apiServer: apiServer,
hostPort: hostPort,
hc: hc,
blockingClient: blockingClient,
apiServer: apiServer,
authHeader: authHeader,
proxyAuthHeader: proxyAuthHeader,
sendFullURL: sendFullURL,
}, nil
}
@ -159,11 +179,18 @@ func (c *Client) getAPIResponseWithParamsAndClient(client *fasthttp.HostClient,
var u fasthttp.URI
u.Update(requestURL)
var req fasthttp.Request
req.SetRequestURIBytes(u.RequestURI())
req.SetHost(c.hostPort)
if c.sendFullURL {
req.SetRequestURIBytes(u.FullURI())
} else {
req.SetRequestURIBytes(u.RequestURI())
req.SetHostBytes(u.Host())
}
req.Header.Set("Accept-Encoding", "gzip")
if c.ac != nil && c.ac.Authorization != "" {
req.Header.Set("Authorization", c.ac.Authorization)
if c.authHeader != "" {
req.Header.Set("Authorization", c.authHeader)
}
if c.proxyAuthHeader != "" {
req.Header.Set("Proxy-Authorization", c.proxyAuthHeader)
}
var resp fasthttp.Response

View file

@ -40,6 +40,16 @@ func (u *URL) URL() *url.URL {
return u.url
}
// IsHTTPOrHTTPS returns true if u is http or https
func (u *URL) IsHTTPOrHTTPS() bool {
pu := u.URL()
if pu == nil {
return false
}
scheme := u.url.Scheme
return scheme == "http" || scheme == "https"
}
// String returns string representation of u.
func (u *URL) String() string {
pu := u.URL()
@ -49,6 +59,23 @@ func (u *URL) String() string {
return pu.String()
}
// GetAuthHeader returns Proxy-Authorization auth header for the given u and ac.
func (u *URL) GetAuthHeader(ac *promauth.Config) string {
authHeader := ""
if ac != nil {
authHeader = ac.Authorization
}
if u == nil || u.url == nil {
return authHeader
}
pu := u.url
if pu.User != nil && len(pu.User.Username()) > 0 {
userPasswordEncoded := base64.StdEncoding.EncodeToString([]byte(pu.User.String()))
authHeader = "Basic " + userPasswordEncoded
}
return authHeader
}
// MarshalYAML implements yaml.Marshaler interface.
func (u *URL) MarshalYAML() (interface{}, error) {
if u.url == nil {
@ -82,14 +109,7 @@ func (u *URL) NewDialFunc(ac *promauth.Config) (fasthttp.DialFunc, error) {
}
isTLS := pu.Scheme == "https"
proxyAddr := addMissingPort(pu.Host, isTLS)
var authHeader string
if ac != nil {
authHeader = ac.Authorization
}
if pu.User != nil && len(pu.User.Username()) > 0 {
userPasswordEncoded := base64.StdEncoding.EncodeToString([]byte(pu.User.String()))
authHeader = "Basic " + userPasswordEncoded
}
authHeader := u.GetAuthHeader(ac)
if authHeader != "" {
authHeader = "Proxy-Authorization: " + authHeader + "\r\n"
}