mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
adds proxy_url support, (#980)
* adds proxy_url support, adds proxy_url to the dockerswarm, eureka, kubernetes and consul service discovery, adds proxy_url to the scrape_config for targets scrapping, http based proxy is supported atm, https://github.com/VictoriaMetrics/VictoriaMetrics/issues/503 * fixes imports
This commit is contained in:
parent
9e4ed5e591
commit
8dd03ecf19
14 changed files with 192 additions and 29 deletions
124
lib/netutil/proxy.go
Normal file
124
lib/netutil/proxy.go
Normal file
|
@ -0,0 +1,124 @@
|
|||
package netutil
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/url"
|
||||
"strings"
|
||||
|
||||
"github.com/VictoriaMetrics/fasthttp"
|
||||
)
|
||||
|
||||
// ProxyURL implements marshal interfaces for url.URL.
|
||||
type ProxyURL struct {
|
||||
url *url.URL
|
||||
}
|
||||
|
||||
// URL returns *url.URL.
|
||||
func (pu ProxyURL) URL() *url.URL {
|
||||
return pu.url
|
||||
}
|
||||
|
||||
// String implements String interface.
|
||||
func (pu ProxyURL) String() string {
|
||||
if pu.url == nil {
|
||||
return ""
|
||||
}
|
||||
return pu.url.String()
|
||||
}
|
||||
|
||||
// MarshalYAML implements yaml.Marshaler interface.
|
||||
func (pu ProxyURL) MarshalYAML() (interface{}, error) {
|
||||
if pu.url == nil {
|
||||
return nil, nil
|
||||
}
|
||||
return pu.url.String(), nil
|
||||
}
|
||||
|
||||
// UnmarshalYAML implements yaml.Unmarshaler interface.
|
||||
func (pu *ProxyURL) UnmarshalYAML(unmarshal func(interface{}) error) error {
|
||||
var s string
|
||||
if err := unmarshal(&s); err != nil {
|
||||
return err
|
||||
}
|
||||
parsedURL, err := url.Parse(s)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed parse proxy_url=%q as *url.URL, err=%w", s, err)
|
||||
}
|
||||
pu.url = parsedURL
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetProxyDialFunc returns dial proxy func for the given proxy url.
|
||||
// currently only http based proxy is supported.
|
||||
func GetProxyDialFunc(proxyURL *url.URL) (fasthttp.DialFunc, error) {
|
||||
if strings.HasPrefix(proxyURL.Scheme, "http") {
|
||||
return httpProxy(proxyURL.Host, MakeBasicAuthHeader(nil, proxyURL)), nil
|
||||
}
|
||||
return nil, fmt.Errorf("unknown scheme=%q for proxy_url: %q, must be http or https", proxyURL.Scheme, proxyURL)
|
||||
}
|
||||
|
||||
func httpProxy(proxyAddr string, auth []byte) fasthttp.DialFunc {
|
||||
return func(addr string) (net.Conn, error) {
|
||||
var (
|
||||
conn net.Conn
|
||||
err error
|
||||
)
|
||||
if TCP6Enabled() {
|
||||
conn, err = fasthttp.DialDualStack(proxyAddr)
|
||||
} else {
|
||||
conn, err = fasthttp.Dial(proxyAddr)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot connect to the proxy=%q,err=%w", proxyAddr, err)
|
||||
}
|
||||
if err := MakeProxyConnectCall(conn, []byte(addr), auth); err != nil {
|
||||
_ = conn.Close()
|
||||
return nil, err
|
||||
}
|
||||
return conn, nil
|
||||
}
|
||||
}
|
||||
|
||||
// MakeBasicAuthHeader encodes and writes basic auth http header from url into given dst and returns it.
|
||||
func MakeBasicAuthHeader(dst []byte, url *url.URL) []byte {
|
||||
if url == nil || url.User == nil {
|
||||
return dst
|
||||
}
|
||||
if len(url.User.Username()) > 0 {
|
||||
dst = append(dst, "Proxy-Authorization: Basic "...)
|
||||
dst = append(dst, base64.StdEncoding.EncodeToString([]byte(url.User.String()))...)
|
||||
}
|
||||
return dst
|
||||
}
|
||||
|
||||
// MakeProxyConnectCall execute CONNECT method to proxy with given destination address.
|
||||
func MakeProxyConnectCall(conn net.Conn, dstAddr, auth []byte) error {
|
||||
conReq := make([]byte, 0, 10)
|
||||
conReq = append(conReq, []byte("CONNECT ")...)
|
||||
conReq = append(conReq, dstAddr...)
|
||||
conReq = append(conReq, []byte(" HTTP/1.1\r\n")...)
|
||||
if len(auth) > 0 {
|
||||
conReq = append(conReq, auth...)
|
||||
conReq = append(conReq, []byte("\r\n")...)
|
||||
}
|
||||
conReq = append(conReq, []byte("\r\n")...)
|
||||
|
||||
res := fasthttp.AcquireResponse()
|
||||
defer fasthttp.ReleaseResponse(res)
|
||||
res.SkipBody = true
|
||||
if _, err := conn.Write(conReq); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := res.Read(bufio.NewReader(conn)); err != nil {
|
||||
_ = conn.Close()
|
||||
return fmt.Errorf("cannot read CONNECT response from proxy, err=%w", err)
|
||||
}
|
||||
if res.Header.StatusCode() != 200 {
|
||||
_ = conn.Close()
|
||||
return fmt.Errorf("unexpected proxy response status code, want: 200, get: %d", res.Header.StatusCode())
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -69,7 +69,7 @@ func newClient(sw *ScrapeWork) *client {
|
|||
hc := &fasthttp.HostClient{
|
||||
Addr: host,
|
||||
Name: "vm_promscrape",
|
||||
Dial: statDial,
|
||||
Dial: getDialStatConn(sw.ProxyURL),
|
||||
IsTLS: isTLS,
|
||||
TLSConfig: tlsCfg,
|
||||
MaxIdleConnDuration: 2 * sw.ScrapeInterval,
|
||||
|
@ -83,6 +83,7 @@ func newClient(sw *ScrapeWork) *client {
|
|||
sc = &http.Client{
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: tlsCfg,
|
||||
Proxy: http.ProxyURL(sw.ProxyURL),
|
||||
TLSHandshakeTimeout: 10 * time.Second,
|
||||
IdleConnTimeout: 2 * sw.ScrapeInterval,
|
||||
DisableCompression: *disableCompression || sw.DisableCompression,
|
||||
|
@ -93,9 +94,8 @@ func newClient(sw *ScrapeWork) *client {
|
|||
}
|
||||
}
|
||||
return &client{
|
||||
hc: hc,
|
||||
sc: sc,
|
||||
|
||||
hc: hc,
|
||||
sc: sc,
|
||||
scrapeURL: sw.ScrapeURL,
|
||||
host: host,
|
||||
requestURI: requestURI,
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
||||
|
@ -71,6 +72,7 @@ type ScrapeConfig struct {
|
|||
BasicAuth *promauth.BasicAuthConfig `yaml:"basic_auth,omitempty"`
|
||||
BearerToken string `yaml:"bearer_token,omitempty"`
|
||||
BearerTokenFile string `yaml:"bearer_token_file,omitempty"`
|
||||
ProxyURL netutil.ProxyURL `yaml:"proxy_url,omitempty"`
|
||||
TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"`
|
||||
StaticConfigs []StaticConfig `yaml:"static_configs,omitempty"`
|
||||
FileSDConfigs []FileSDConfig `yaml:"file_sd_configs,omitempty"`
|
||||
|
@ -495,6 +497,7 @@ func getScrapeWorkConfig(sc *ScrapeConfig, baseDir string, globalCfg *GlobalConf
|
|||
metricsPath: metricsPath,
|
||||
scheme: scheme,
|
||||
params: params,
|
||||
proxyURL: sc.ProxyURL.URL(),
|
||||
authConfig: ac,
|
||||
honorLabels: honorLabels,
|
||||
honorTimestamps: honorTimestamps,
|
||||
|
@ -516,6 +519,7 @@ type scrapeWorkConfig struct {
|
|||
metricsPath string
|
||||
scheme string
|
||||
params map[string][]string
|
||||
proxyURL *url.URL
|
||||
authConfig *promauth.Config
|
||||
honorLabels bool
|
||||
honorTimestamps bool
|
||||
|
@ -750,6 +754,7 @@ func appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConfig, target string, e
|
|||
HonorTimestamps: swc.honorTimestamps,
|
||||
OriginalLabels: originalLabels,
|
||||
Labels: labels,
|
||||
ProxyURL: swc.proxyURL,
|
||||
AuthConfig: swc.authConfig,
|
||||
MetricRelabelConfigs: swc.metricRelabelConfigs,
|
||||
SampleLimit: swc.sampleLimit,
|
||||
|
|
|
@ -58,7 +58,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
|
|||
}
|
||||
apiServer = scheme + "://" + apiServer
|
||||
}
|
||||
client, err := discoveryutils.NewClient(apiServer, ac)
|
||||
client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL.URL())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot create HTTP client for %q: %w", apiServer, err)
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package consul
|
|||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||
)
|
||||
|
||||
|
@ -16,6 +17,7 @@ type SDConfig struct {
|
|||
Scheme string `yaml:"scheme,omitempty"`
|
||||
Username string `yaml:"username"`
|
||||
Password string `yaml:"password"`
|
||||
ProxyURL netutil.ProxyURL `yaml:"proxy_url,omitempty"`
|
||||
TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"`
|
||||
Services []string `yaml:"services,omitempty"`
|
||||
Tags []string `yaml:"tags,omitempty"`
|
||||
|
|
|
@ -34,11 +34,12 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
|
|||
port: sdc.Port,
|
||||
filtersQueryArg: getFiltersQueryArg(sdc.Filters),
|
||||
}
|
||||
|
||||
ac, err := promauth.NewConfig(baseDir, sdc.BasicAuth, sdc.BearerToken, sdc.BearerTokenFile, sdc.TLSConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
client, err := discoveryutils.NewClient(sdc.Host, ac)
|
||||
client, err := discoveryutils.NewClient(sdc.Host, ac, sdc.ProxyURL.URL())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot create HTTP client for %q: %w", sdc.Host, err)
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package dockerswarm
|
|||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||
)
|
||||
|
||||
|
@ -15,7 +16,7 @@ type SDConfig struct {
|
|||
Port int `yaml:"port,omitempty"`
|
||||
Filters []Filter `yaml:"filters,omitempty"`
|
||||
|
||||
// TODO: add support for proxy_url
|
||||
ProxyURL netutil.ProxyURL `yaml:"proxy_url,omitempty"`
|
||||
TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"`
|
||||
// refresh_interval is obtained from `-promscrape.dockerswarmSDCheckInterval` command-line option
|
||||
BasicAuth *promauth.BasicAuthConfig `yaml:"basic_auth,omitempty"`
|
||||
|
|
|
@ -43,7 +43,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
|
|||
}
|
||||
apiServer = scheme + "://" + apiServer
|
||||
}
|
||||
client, err := discoveryutils.NewClient(apiServer, ac)
|
||||
client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL.URL())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot create HTTP client for %q: %w", apiServer, err)
|
||||
}
|
||||
|
|
|
@ -5,9 +5,9 @@ import (
|
|||
"fmt"
|
||||
"strconv"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||
)
|
||||
|
||||
const appsAPIPath = "/apps"
|
||||
|
@ -22,6 +22,7 @@ type SDConfig struct {
|
|||
Scheme string `yaml:"scheme,omitempty"`
|
||||
Username string `yaml:"username"`
|
||||
Password string `yaml:"password"`
|
||||
ProxyURL netutil.ProxyURL `yaml:"proxy_url,omitempty"`
|
||||
TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"`
|
||||
// RefreshInterval time.Duration `yaml:"refresh_interval"`
|
||||
// refresh_interval is obtained from `-promscrape.ec2SDCheckInterval` command-line option.
|
||||
|
|
|
@ -56,7 +56,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
|
|||
}
|
||||
ac = acNew
|
||||
}
|
||||
client, err := discoveryutils.NewClient(apiServer, ac)
|
||||
client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL.URL())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot create HTTP client for %q: %w", apiServer, err)
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package kubernetes
|
|||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||
)
|
||||
|
||||
|
@ -15,6 +16,7 @@ type SDConfig struct {
|
|||
BasicAuth *promauth.BasicAuthConfig `yaml:"basic_auth,omitempty"`
|
||||
BearerToken string `yaml:"bearer_token,omitempty"`
|
||||
BearerTokenFile string `yaml:"bearer_token_file,omitempty"`
|
||||
ProxyURL netutil.ProxyURL `yaml:"proxy_url,omitempty"`
|
||||
TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"`
|
||||
Namespaces Namespaces `yaml:"namespaces,omitempty"`
|
||||
Selectors []Selector `yaml:"selectors,omitempty"`
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -45,11 +46,12 @@ type Client struct {
|
|||
}
|
||||
|
||||
// NewClient returns new Client for the given apiServer and the given ac.
|
||||
func NewClient(apiServer string, ac *promauth.Config) (*Client, error) {
|
||||
func NewClient(apiServer string, ac *promauth.Config, proxyURL *url.URL) (*Client, error) {
|
||||
var (
|
||||
dialFunc fasthttp.DialFunc
|
||||
tlsCfg *tls.Config
|
||||
u fasthttp.URI
|
||||
err error
|
||||
)
|
||||
u.Update(apiServer)
|
||||
|
||||
|
@ -61,6 +63,13 @@ func NewClient(apiServer string, ac *promauth.Config) (*Client, error) {
|
|||
return net.Dial("unix", dialAddr)
|
||||
}
|
||||
}
|
||||
if proxyURL != nil {
|
||||
dialFunc, err = netutil.GetProxyDialFunc(proxyURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
hostPort := string(u.Host())
|
||||
isTLS := string(u.Scheme()) == "https"
|
||||
if isTLS && ac != nil {
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
"math"
|
||||
"math/bits"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
@ -70,6 +71,9 @@ type ScrapeWork struct {
|
|||
// Auth config
|
||||
AuthConfig *promauth.Config
|
||||
|
||||
// ProxyURL HTTP proxy url
|
||||
ProxyURL *url.URL
|
||||
|
||||
// Optional `metric_relabel_configs`.
|
||||
MetricRelabelConfigs []promrelabel.ParsedRelabelConfig
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/url"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
@ -47,25 +48,38 @@ var (
|
|||
stdDialerOnce sync.Once
|
||||
)
|
||||
|
||||
func statDial(addr string) (conn net.Conn, err error) {
|
||||
if netutil.TCP6Enabled() {
|
||||
conn, err = fasthttp.DialDualStack(addr)
|
||||
} else {
|
||||
conn, err = fasthttp.Dial(addr)
|
||||
}
|
||||
dialsTotal.Inc()
|
||||
if err != nil {
|
||||
dialErrors.Inc()
|
||||
if !netutil.TCP6Enabled() {
|
||||
err = fmt.Errorf("%w; try -enableTCP6 command-line flag if you scrape ipv6 addresses", err)
|
||||
func getDialStatConn(proxyURL *url.URL) fasthttp.DialFunc {
|
||||
auth := netutil.MakeBasicAuthHeader(nil, proxyURL)
|
||||
return func(addr string) (conn net.Conn, err error) {
|
||||
dialAddr := addr
|
||||
if proxyURL != nil {
|
||||
dialAddr = proxyURL.Host
|
||||
}
|
||||
return nil, err
|
||||
if netutil.TCP6Enabled() {
|
||||
conn, err = fasthttp.DialDualStack(dialAddr)
|
||||
} else {
|
||||
conn, err = fasthttp.Dial(dialAddr)
|
||||
}
|
||||
dialsTotal.Inc()
|
||||
if err != nil {
|
||||
dialErrors.Inc()
|
||||
if !netutil.TCP6Enabled() {
|
||||
err = fmt.Errorf("%w; try -enableTCP6 command-line flag if you scrape ipv6 addresses", err)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
conns.Inc()
|
||||
if proxyURL != nil {
|
||||
if err := netutil.MakeProxyConnectCall(conn, []byte(addr), auth); err != nil {
|
||||
_ = conn.Close()
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
sc := &statConn{
|
||||
Conn: conn,
|
||||
}
|
||||
return sc, nil
|
||||
}
|
||||
conns.Inc()
|
||||
sc := &statConn{
|
||||
Conn: conn,
|
||||
}
|
||||
return sc, nil
|
||||
}
|
||||
|
||||
var (
|
||||
|
|
Loading…
Reference in a new issue