mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/promscrape: code prettifying for 8dd03ecf19
This commit is contained in:
parent
8dd03ecf19
commit
820669da69
16 changed files with 165 additions and 174 deletions
|
@ -2,6 +2,7 @@
|
||||||
|
|
||||||
# tip
|
# tip
|
||||||
|
|
||||||
|
* FEATURE: vmagent: add support for `proxy_url` config option in Prometheus scrape configs. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/503
|
||||||
* FEATURE: remove parts with stale data as soon as they go outside the configured `-retentionPeriod`. Previously such parts may remain active for long periods of time. This should help reducing disk usage for `-retentionPeriod` smaller than one month.
|
* FEATURE: remove parts with stale data as soon as they go outside the configured `-retentionPeriod`. Previously such parts may remain active for long periods of time. This should help reducing disk usage for `-retentionPeriod` smaller than one month.
|
||||||
* FEATURE: vmalert: allow setting multiple values for `-notifier.tlsInsecureSkipVerify` command-line flag per each `-notifier.url`.
|
* FEATURE: vmalert: allow setting multiple values for `-notifier.tlsInsecureSkipVerify` command-line flag per each `-notifier.url`.
|
||||||
|
|
||||||
|
|
|
@ -1,124 +0,0 @@
|
||||||
package netutil
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bufio"
|
|
||||||
"encoding/base64"
|
|
||||||
"fmt"
|
|
||||||
"net"
|
|
||||||
"net/url"
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/fasthttp"
|
|
||||||
)
|
|
||||||
|
|
||||||
// ProxyURL implements marshal interfaces for url.URL.
|
|
||||||
type ProxyURL struct {
|
|
||||||
url *url.URL
|
|
||||||
}
|
|
||||||
|
|
||||||
// URL returns *url.URL.
|
|
||||||
func (pu ProxyURL) URL() *url.URL {
|
|
||||||
return pu.url
|
|
||||||
}
|
|
||||||
|
|
||||||
// String implements String interface.
|
|
||||||
func (pu ProxyURL) String() string {
|
|
||||||
if pu.url == nil {
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
return pu.url.String()
|
|
||||||
}
|
|
||||||
|
|
||||||
// MarshalYAML implements yaml.Marshaler interface.
|
|
||||||
func (pu ProxyURL) MarshalYAML() (interface{}, error) {
|
|
||||||
if pu.url == nil {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
return pu.url.String(), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// UnmarshalYAML implements yaml.Unmarshaler interface.
|
|
||||||
func (pu *ProxyURL) UnmarshalYAML(unmarshal func(interface{}) error) error {
|
|
||||||
var s string
|
|
||||||
if err := unmarshal(&s); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
parsedURL, err := url.Parse(s)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed parse proxy_url=%q as *url.URL, err=%w", s, err)
|
|
||||||
}
|
|
||||||
pu.url = parsedURL
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetProxyDialFunc returns dial proxy func for the given proxy url.
|
|
||||||
// currently only http based proxy is supported.
|
|
||||||
func GetProxyDialFunc(proxyURL *url.URL) (fasthttp.DialFunc, error) {
|
|
||||||
if strings.HasPrefix(proxyURL.Scheme, "http") {
|
|
||||||
return httpProxy(proxyURL.Host, MakeBasicAuthHeader(nil, proxyURL)), nil
|
|
||||||
}
|
|
||||||
return nil, fmt.Errorf("unknown scheme=%q for proxy_url: %q, must be http or https", proxyURL.Scheme, proxyURL)
|
|
||||||
}
|
|
||||||
|
|
||||||
func httpProxy(proxyAddr string, auth []byte) fasthttp.DialFunc {
|
|
||||||
return func(addr string) (net.Conn, error) {
|
|
||||||
var (
|
|
||||||
conn net.Conn
|
|
||||||
err error
|
|
||||||
)
|
|
||||||
if TCP6Enabled() {
|
|
||||||
conn, err = fasthttp.DialDualStack(proxyAddr)
|
|
||||||
} else {
|
|
||||||
conn, err = fasthttp.Dial(proxyAddr)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("cannot connect to the proxy=%q,err=%w", proxyAddr, err)
|
|
||||||
}
|
|
||||||
if err := MakeProxyConnectCall(conn, []byte(addr), auth); err != nil {
|
|
||||||
_ = conn.Close()
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return conn, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// MakeBasicAuthHeader encodes and writes basic auth http header from url into given dst and returns it.
|
|
||||||
func MakeBasicAuthHeader(dst []byte, url *url.URL) []byte {
|
|
||||||
if url == nil || url.User == nil {
|
|
||||||
return dst
|
|
||||||
}
|
|
||||||
if len(url.User.Username()) > 0 {
|
|
||||||
dst = append(dst, "Proxy-Authorization: Basic "...)
|
|
||||||
dst = append(dst, base64.StdEncoding.EncodeToString([]byte(url.User.String()))...)
|
|
||||||
}
|
|
||||||
return dst
|
|
||||||
}
|
|
||||||
|
|
||||||
// MakeProxyConnectCall execute CONNECT method to proxy with given destination address.
|
|
||||||
func MakeProxyConnectCall(conn net.Conn, dstAddr, auth []byte) error {
|
|
||||||
conReq := make([]byte, 0, 10)
|
|
||||||
conReq = append(conReq, []byte("CONNECT ")...)
|
|
||||||
conReq = append(conReq, dstAddr...)
|
|
||||||
conReq = append(conReq, []byte(" HTTP/1.1\r\n")...)
|
|
||||||
if len(auth) > 0 {
|
|
||||||
conReq = append(conReq, auth...)
|
|
||||||
conReq = append(conReq, []byte("\r\n")...)
|
|
||||||
}
|
|
||||||
conReq = append(conReq, []byte("\r\n")...)
|
|
||||||
|
|
||||||
res := fasthttp.AcquireResponse()
|
|
||||||
defer fasthttp.ReleaseResponse(res)
|
|
||||||
res.SkipBody = true
|
|
||||||
if _, err := conn.Write(conReq); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := res.Read(bufio.NewReader(conn)); err != nil {
|
|
||||||
_ = conn.Close()
|
|
||||||
return fmt.Errorf("cannot read CONNECT response from proxy, err=%w", err)
|
|
||||||
}
|
|
||||||
if res.Header.StatusCode() != 200 {
|
|
||||||
_ = conn.Close()
|
|
||||||
return fmt.Errorf("unexpected proxy response status code, want: 200, get: %d", res.Header.StatusCode())
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"net/url"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -66,10 +67,14 @@ func newClient(sw *ScrapeWork) *client {
|
||||||
host += ":443"
|
host += ":443"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
dialFunc, err := newStatDialFunc(sw.ProxyURL, tlsCfg)
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatalf("cannot create dial func: %s", err)
|
||||||
|
}
|
||||||
hc := &fasthttp.HostClient{
|
hc := &fasthttp.HostClient{
|
||||||
Addr: host,
|
Addr: host,
|
||||||
Name: "vm_promscrape",
|
Name: "vm_promscrape",
|
||||||
Dial: getDialStatConn(sw.ProxyURL),
|
Dial: dialFunc,
|
||||||
IsTLS: isTLS,
|
IsTLS: isTLS,
|
||||||
TLSConfig: tlsCfg,
|
TLSConfig: tlsCfg,
|
||||||
MaxIdleConnDuration: 2 * sw.ScrapeInterval,
|
MaxIdleConnDuration: 2 * sw.ScrapeInterval,
|
||||||
|
@ -80,10 +85,14 @@ func newClient(sw *ScrapeWork) *client {
|
||||||
}
|
}
|
||||||
var sc *http.Client
|
var sc *http.Client
|
||||||
if *streamParse || sw.StreamParse {
|
if *streamParse || sw.StreamParse {
|
||||||
|
var proxy func(*http.Request) (*url.URL, error)
|
||||||
|
if proxyURL := sw.ProxyURL.URL(); proxyURL != nil {
|
||||||
|
proxy = http.ProxyURL(proxyURL)
|
||||||
|
}
|
||||||
sc = &http.Client{
|
sc = &http.Client{
|
||||||
Transport: &http.Transport{
|
Transport: &http.Transport{
|
||||||
TLSClientConfig: tlsCfg,
|
TLSClientConfig: tlsCfg,
|
||||||
Proxy: http.ProxyURL(sw.ProxyURL),
|
Proxy: proxy,
|
||||||
TLSHandshakeTimeout: 10 * time.Second,
|
TLSHandshakeTimeout: 10 * time.Second,
|
||||||
IdleConnTimeout: 2 * sw.ScrapeInterval,
|
IdleConnTimeout: 2 * sw.ScrapeInterval,
|
||||||
DisableCompression: *disableCompression || sw.DisableCompression,
|
DisableCompression: *disableCompression || sw.DisableCompression,
|
||||||
|
|
|
@ -12,7 +12,6 @@ import (
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
||||||
|
@ -24,6 +23,7 @@ import (
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/gce"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/gce"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kubernetes"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kubernetes"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/openstack"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/openstack"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
|
||||||
"gopkg.in/yaml.v2"
|
"gopkg.in/yaml.v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -72,7 +72,7 @@ type ScrapeConfig struct {
|
||||||
BasicAuth *promauth.BasicAuthConfig `yaml:"basic_auth,omitempty"`
|
BasicAuth *promauth.BasicAuthConfig `yaml:"basic_auth,omitempty"`
|
||||||
BearerToken string `yaml:"bearer_token,omitempty"`
|
BearerToken string `yaml:"bearer_token,omitempty"`
|
||||||
BearerTokenFile string `yaml:"bearer_token_file,omitempty"`
|
BearerTokenFile string `yaml:"bearer_token_file,omitempty"`
|
||||||
ProxyURL netutil.ProxyURL `yaml:"proxy_url,omitempty"`
|
ProxyURL proxy.URL `yaml:"proxy_url,omitempty"`
|
||||||
TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"`
|
TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"`
|
||||||
StaticConfigs []StaticConfig `yaml:"static_configs,omitempty"`
|
StaticConfigs []StaticConfig `yaml:"static_configs,omitempty"`
|
||||||
FileSDConfigs []FileSDConfig `yaml:"file_sd_configs,omitempty"`
|
FileSDConfigs []FileSDConfig `yaml:"file_sd_configs,omitempty"`
|
||||||
|
@ -497,7 +497,7 @@ func getScrapeWorkConfig(sc *ScrapeConfig, baseDir string, globalCfg *GlobalConf
|
||||||
metricsPath: metricsPath,
|
metricsPath: metricsPath,
|
||||||
scheme: scheme,
|
scheme: scheme,
|
||||||
params: params,
|
params: params,
|
||||||
proxyURL: sc.ProxyURL.URL(),
|
proxyURL: sc.ProxyURL,
|
||||||
authConfig: ac,
|
authConfig: ac,
|
||||||
honorLabels: honorLabels,
|
honorLabels: honorLabels,
|
||||||
honorTimestamps: honorTimestamps,
|
honorTimestamps: honorTimestamps,
|
||||||
|
@ -519,7 +519,7 @@ type scrapeWorkConfig struct {
|
||||||
metricsPath string
|
metricsPath string
|
||||||
scheme string
|
scheme string
|
||||||
params map[string][]string
|
params map[string][]string
|
||||||
proxyURL *url.URL
|
proxyURL proxy.URL
|
||||||
authConfig *promauth.Config
|
authConfig *promauth.Config
|
||||||
honorLabels bool
|
honorLabels bool
|
||||||
honorTimestamps bool
|
honorTimestamps bool
|
||||||
|
|
|
@ -58,7 +58,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
|
||||||
}
|
}
|
||||||
apiServer = scheme + "://" + apiServer
|
apiServer = scheme + "://" + apiServer
|
||||||
}
|
}
|
||||||
client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL.URL())
|
client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("cannot create HTTP client for %q: %w", apiServer, err)
|
return nil, fmt.Errorf("cannot create HTTP client for %q: %w", apiServer, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,8 +3,8 @@ package consul
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
|
||||||
)
|
)
|
||||||
|
|
||||||
// SDConfig represents service discovery config for Consul.
|
// SDConfig represents service discovery config for Consul.
|
||||||
|
@ -17,7 +17,7 @@ type SDConfig struct {
|
||||||
Scheme string `yaml:"scheme,omitempty"`
|
Scheme string `yaml:"scheme,omitempty"`
|
||||||
Username string `yaml:"username"`
|
Username string `yaml:"username"`
|
||||||
Password string `yaml:"password"`
|
Password string `yaml:"password"`
|
||||||
ProxyURL netutil.ProxyURL `yaml:"proxy_url,omitempty"`
|
ProxyURL proxy.URL `yaml:"proxy_url,omitempty"`
|
||||||
TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"`
|
TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"`
|
||||||
Services []string `yaml:"services,omitempty"`
|
Services []string `yaml:"services,omitempty"`
|
||||||
Tags []string `yaml:"tags,omitempty"`
|
Tags []string `yaml:"tags,omitempty"`
|
||||||
|
|
|
@ -39,7 +39,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
client, err := discoveryutils.NewClient(sdc.Host, ac, sdc.ProxyURL.URL())
|
client, err := discoveryutils.NewClient(sdc.Host, ac, sdc.ProxyURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("cannot create HTTP client for %q: %w", sdc.Host, err)
|
return nil, fmt.Errorf("cannot create HTTP client for %q: %w", sdc.Host, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,8 +3,8 @@ package dockerswarm
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
|
||||||
)
|
)
|
||||||
|
|
||||||
// SDConfig represents docker swarm service discovery configuration
|
// SDConfig represents docker swarm service discovery configuration
|
||||||
|
@ -16,7 +16,7 @@ type SDConfig struct {
|
||||||
Port int `yaml:"port,omitempty"`
|
Port int `yaml:"port,omitempty"`
|
||||||
Filters []Filter `yaml:"filters,omitempty"`
|
Filters []Filter `yaml:"filters,omitempty"`
|
||||||
|
|
||||||
ProxyURL netutil.ProxyURL `yaml:"proxy_url,omitempty"`
|
ProxyURL proxy.URL `yaml:"proxy_url,omitempty"`
|
||||||
TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"`
|
TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"`
|
||||||
// refresh_interval is obtained from `-promscrape.dockerswarmSDCheckInterval` command-line option
|
// refresh_interval is obtained from `-promscrape.dockerswarmSDCheckInterval` command-line option
|
||||||
BasicAuth *promauth.BasicAuthConfig `yaml:"basic_auth,omitempty"`
|
BasicAuth *promauth.BasicAuthConfig `yaml:"basic_auth,omitempty"`
|
||||||
|
|
|
@ -43,7 +43,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
|
||||||
}
|
}
|
||||||
apiServer = scheme + "://" + apiServer
|
apiServer = scheme + "://" + apiServer
|
||||||
}
|
}
|
||||||
client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL.URL())
|
client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("cannot create HTTP client for %q: %w", apiServer, err)
|
return nil, fmt.Errorf("cannot create HTTP client for %q: %w", apiServer, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,9 +5,9 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
|
||||||
)
|
)
|
||||||
|
|
||||||
const appsAPIPath = "/apps"
|
const appsAPIPath = "/apps"
|
||||||
|
@ -22,7 +22,7 @@ type SDConfig struct {
|
||||||
Scheme string `yaml:"scheme,omitempty"`
|
Scheme string `yaml:"scheme,omitempty"`
|
||||||
Username string `yaml:"username"`
|
Username string `yaml:"username"`
|
||||||
Password string `yaml:"password"`
|
Password string `yaml:"password"`
|
||||||
ProxyURL netutil.ProxyURL `yaml:"proxy_url,omitempty"`
|
ProxyURL proxy.URL `yaml:"proxy_url,omitempty"`
|
||||||
TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"`
|
TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"`
|
||||||
// RefreshInterval time.Duration `yaml:"refresh_interval"`
|
// RefreshInterval time.Duration `yaml:"refresh_interval"`
|
||||||
// refresh_interval is obtained from `-promscrape.ec2SDCheckInterval` command-line option.
|
// refresh_interval is obtained from `-promscrape.ec2SDCheckInterval` command-line option.
|
||||||
|
|
|
@ -56,7 +56,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
|
||||||
}
|
}
|
||||||
ac = acNew
|
ac = acNew
|
||||||
}
|
}
|
||||||
client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL.URL())
|
client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("cannot create HTTP client for %q: %w", apiServer, err)
|
return nil, fmt.Errorf("cannot create HTTP client for %q: %w", apiServer, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,8 +3,8 @@ package kubernetes
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
|
||||||
)
|
)
|
||||||
|
|
||||||
// SDConfig represents kubernetes-based service discovery config.
|
// SDConfig represents kubernetes-based service discovery config.
|
||||||
|
@ -16,7 +16,7 @@ type SDConfig struct {
|
||||||
BasicAuth *promauth.BasicAuthConfig `yaml:"basic_auth,omitempty"`
|
BasicAuth *promauth.BasicAuthConfig `yaml:"basic_auth,omitempty"`
|
||||||
BearerToken string `yaml:"bearer_token,omitempty"`
|
BearerToken string `yaml:"bearer_token,omitempty"`
|
||||||
BearerTokenFile string `yaml:"bearer_token_file,omitempty"`
|
BearerTokenFile string `yaml:"bearer_token_file,omitempty"`
|
||||||
ProxyURL netutil.ProxyURL `yaml:"proxy_url,omitempty"`
|
ProxyURL proxy.URL `yaml:"proxy_url,omitempty"`
|
||||||
TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"`
|
TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"`
|
||||||
Namespaces Namespaces `yaml:"namespaces,omitempty"`
|
Namespaces Namespaces `yaml:"namespaces,omitempty"`
|
||||||
Selectors []Selector `yaml:"selectors,omitempty"`
|
Selectors []Selector `yaml:"selectors,omitempty"`
|
||||||
|
|
|
@ -6,13 +6,12 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
|
||||||
"github.com/VictoriaMetrics/fasthttp"
|
"github.com/VictoriaMetrics/fasthttp"
|
||||||
)
|
)
|
||||||
|
@ -46,7 +45,7 @@ type Client struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewClient returns new Client for the given apiServer and the given ac.
|
// NewClient returns new Client for the given apiServer and the given ac.
|
||||||
func NewClient(apiServer string, ac *promauth.Config, proxyURL *url.URL) (*Client, error) {
|
func NewClient(apiServer string, ac *promauth.Config, proxyURL proxy.URL) (*Client, error) {
|
||||||
var (
|
var (
|
||||||
dialFunc fasthttp.DialFunc
|
dialFunc fasthttp.DialFunc
|
||||||
tlsCfg *tls.Config
|
tlsCfg *tls.Config
|
||||||
|
@ -63,12 +62,6 @@ func NewClient(apiServer string, ac *promauth.Config, proxyURL *url.URL) (*Clien
|
||||||
return net.Dial("unix", dialAddr)
|
return net.Dial("unix", dialAddr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if proxyURL != nil {
|
|
||||||
dialFunc, err = netutil.GetProxyDialFunc(proxyURL)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
hostPort := string(u.Host())
|
hostPort := string(u.Host())
|
||||||
isTLS := string(u.Scheme()) == "https"
|
isTLS := string(u.Scheme()) == "https"
|
||||||
|
@ -82,10 +75,15 @@ func NewClient(apiServer string, ac *promauth.Config, proxyURL *url.URL) (*Clien
|
||||||
}
|
}
|
||||||
hostPort = net.JoinHostPort(hostPort, port)
|
hostPort = net.JoinHostPort(hostPort, port)
|
||||||
}
|
}
|
||||||
|
if dialFunc == nil {
|
||||||
|
dialFunc, err = proxyURL.NewDialFunc(tlsCfg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
hc := &fasthttp.HostClient{
|
hc := &fasthttp.HostClient{
|
||||||
Addr: hostPort,
|
Addr: hostPort,
|
||||||
Name: "vm_promscrape/discovery",
|
Name: "vm_promscrape/discovery",
|
||||||
DialDualStack: netutil.TCP6Enabled(),
|
|
||||||
IsTLS: isTLS,
|
IsTLS: isTLS,
|
||||||
TLSConfig: tlsCfg,
|
TLSConfig: tlsCfg,
|
||||||
ReadTimeout: time.Minute,
|
ReadTimeout: time.Minute,
|
||||||
|
@ -97,7 +95,6 @@ func NewClient(apiServer string, ac *promauth.Config, proxyURL *url.URL) (*Clien
|
||||||
blockingClient := &fasthttp.HostClient{
|
blockingClient := &fasthttp.HostClient{
|
||||||
Addr: hostPort,
|
Addr: hostPort,
|
||||||
Name: "vm_promscrape/discovery",
|
Name: "vm_promscrape/discovery",
|
||||||
DialDualStack: netutil.TCP6Enabled(),
|
|
||||||
IsTLS: isTLS,
|
IsTLS: isTLS,
|
||||||
TLSConfig: tlsCfg,
|
TLSConfig: tlsCfg,
|
||||||
ReadTimeout: BlockingClientReadTimeout,
|
ReadTimeout: BlockingClientReadTimeout,
|
||||||
|
|
|
@ -5,7 +5,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"math/bits"
|
"math/bits"
|
||||||
"net/url"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -18,6 +17,7 @@ import (
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
||||||
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
|
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
|
||||||
"github.com/VictoriaMetrics/metrics"
|
"github.com/VictoriaMetrics/metrics"
|
||||||
xxhash "github.com/cespare/xxhash/v2"
|
xxhash "github.com/cespare/xxhash/v2"
|
||||||
)
|
)
|
||||||
|
@ -72,7 +72,7 @@ type ScrapeWork struct {
|
||||||
AuthConfig *promauth.Config
|
AuthConfig *promauth.Config
|
||||||
|
|
||||||
// ProxyURL HTTP proxy url
|
// ProxyURL HTTP proxy url
|
||||||
ProxyURL *url.URL
|
ProxyURL proxy.URL
|
||||||
|
|
||||||
// Optional `metric_relabel_configs`.
|
// Optional `metric_relabel_configs`.
|
||||||
MetricRelabelConfigs []promrelabel.ParsedRelabelConfig
|
MetricRelabelConfigs []promrelabel.ParsedRelabelConfig
|
||||||
|
|
|
@ -2,14 +2,15 @@ package promscrape
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/tls"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"net/url"
|
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
|
||||||
"github.com/VictoriaMetrics/fasthttp"
|
"github.com/VictoriaMetrics/fasthttp"
|
||||||
"github.com/VictoriaMetrics/metrics"
|
"github.com/VictoriaMetrics/metrics"
|
||||||
)
|
)
|
||||||
|
@ -48,18 +49,13 @@ var (
|
||||||
stdDialerOnce sync.Once
|
stdDialerOnce sync.Once
|
||||||
)
|
)
|
||||||
|
|
||||||
func getDialStatConn(proxyURL *url.URL) fasthttp.DialFunc {
|
func newStatDialFunc(proxyURL proxy.URL, tlsConfig *tls.Config) (fasthttp.DialFunc, error) {
|
||||||
auth := netutil.MakeBasicAuthHeader(nil, proxyURL)
|
dialFunc, err := proxyURL.NewDialFunc(tlsConfig)
|
||||||
return func(addr string) (conn net.Conn, err error) {
|
if err != nil {
|
||||||
dialAddr := addr
|
return nil, err
|
||||||
if proxyURL != nil {
|
|
||||||
dialAddr = proxyURL.Host
|
|
||||||
}
|
|
||||||
if netutil.TCP6Enabled() {
|
|
||||||
conn, err = fasthttp.DialDualStack(dialAddr)
|
|
||||||
} else {
|
|
||||||
conn, err = fasthttp.Dial(dialAddr)
|
|
||||||
}
|
}
|
||||||
|
statDialFunc := func(addr string) (net.Conn, error) {
|
||||||
|
conn, err := dialFunc(addr)
|
||||||
dialsTotal.Inc()
|
dialsTotal.Inc()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
dialErrors.Inc()
|
dialErrors.Inc()
|
||||||
|
@ -69,17 +65,12 @@ func getDialStatConn(proxyURL *url.URL) fasthttp.DialFunc {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
conns.Inc()
|
conns.Inc()
|
||||||
if proxyURL != nil {
|
|
||||||
if err := netutil.MakeProxyConnectCall(conn, []byte(addr), auth); err != nil {
|
|
||||||
_ = conn.Close()
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
sc := &statConn{
|
sc := &statConn{
|
||||||
Conn: conn,
|
Conn: conn,
|
||||||
}
|
}
|
||||||
return sc, nil
|
return sc, nil
|
||||||
}
|
}
|
||||||
|
return statDialFunc, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
117
lib/proxy/proxy.go
Normal file
117
lib/proxy/proxy.go
Normal file
|
@ -0,0 +1,117 @@
|
||||||
|
package proxy
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"crypto/tls"
|
||||||
|
"encoding/base64"
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"net/url"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
||||||
|
"github.com/VictoriaMetrics/fasthttp"
|
||||||
|
)
|
||||||
|
|
||||||
|
// URL implements YAML.Marshaler and yaml.Unmarshaler interfaces for url.URL.
|
||||||
|
type URL struct {
|
||||||
|
url *url.URL
|
||||||
|
}
|
||||||
|
|
||||||
|
// URL return the underlying url.
|
||||||
|
func (u *URL) URL() *url.URL {
|
||||||
|
if u == nil || u.url == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return u.url
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarshalYAML implements yaml.Marshaler interface.
|
||||||
|
func (u *URL) MarshalYAML() (interface{}, error) {
|
||||||
|
if u.url == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
return u.url.String(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalYAML implements yaml.Unmarshaler interface.
|
||||||
|
func (u *URL) UnmarshalYAML(unmarshal func(interface{}) error) error {
|
||||||
|
var s string
|
||||||
|
if err := unmarshal(&s); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
parsedURL, err := url.Parse(s)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("cannot parse proxy_url=%q as *url.URL: %w", s, err)
|
||||||
|
}
|
||||||
|
u.url = parsedURL
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewDialFunc returns dial func for the given pu and tlsConfig.
|
||||||
|
func (u *URL) NewDialFunc(tlsConfig *tls.Config) (fasthttp.DialFunc, error) {
|
||||||
|
if u == nil || u.url == nil {
|
||||||
|
return defaultDialFunc, nil
|
||||||
|
}
|
||||||
|
pu := u.url
|
||||||
|
if pu.Scheme != "http" && pu.Scheme != "https" {
|
||||||
|
return nil, fmt.Errorf("unknown scheme=%q for proxy_url=%q, must be http or https", pu.Scheme, pu)
|
||||||
|
}
|
||||||
|
var authHeader string
|
||||||
|
if pu.User != nil && len(pu.User.Username()) > 0 {
|
||||||
|
userPasswordEncoded := base64.StdEncoding.EncodeToString([]byte(pu.User.String()))
|
||||||
|
authHeader = "Proxy-Authorization: Basic " + userPasswordEncoded + "\r\n"
|
||||||
|
}
|
||||||
|
dialFunc := func(addr string) (net.Conn, error) {
|
||||||
|
proxyConn, err := defaultDialFunc(pu.Host)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("cannot connect to proxy %q: %w", pu, err)
|
||||||
|
}
|
||||||
|
if pu.Scheme == "https" {
|
||||||
|
proxyConn = tls.Client(proxyConn, tlsConfig)
|
||||||
|
}
|
||||||
|
conn, err := sendConnectRequest(proxyConn, addr, authHeader)
|
||||||
|
if err != nil {
|
||||||
|
_ = proxyConn.Close()
|
||||||
|
return nil, fmt.Errorf("error when sending CONNECT request to proxy %q: %w", pu, err)
|
||||||
|
}
|
||||||
|
return conn, nil
|
||||||
|
}
|
||||||
|
return dialFunc, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func defaultDialFunc(addr string) (net.Conn, error) {
|
||||||
|
if netutil.TCP6Enabled() {
|
||||||
|
return fasthttp.DialDualStack(addr)
|
||||||
|
}
|
||||||
|
return fasthttp.Dial(addr)
|
||||||
|
}
|
||||||
|
|
||||||
|
// sendConnectRequest sends CONNECT request to proxyConn for the given addr and authHeader and returns the established connection to dstAddr.
|
||||||
|
func sendConnectRequest(proxyConn net.Conn, dstAddr, authHeader string) (net.Conn, error) {
|
||||||
|
req := "CONNECT " + dstAddr + " HTTP/1.1\r\nHost: " + dstAddr + "\r\n" + authHeader + "\r\n"
|
||||||
|
if _, err := proxyConn.Write([]byte(req)); err != nil {
|
||||||
|
return nil, fmt.Errorf("cannot send CONNECT request for dstAddr=%q: %w", dstAddr, err)
|
||||||
|
}
|
||||||
|
var res fasthttp.Response
|
||||||
|
res.SkipBody = true
|
||||||
|
conn := &bufferedReaderConn{
|
||||||
|
br: bufio.NewReader(proxyConn),
|
||||||
|
Conn: proxyConn,
|
||||||
|
}
|
||||||
|
if err := res.Read(conn.br); err != nil {
|
||||||
|
return nil, fmt.Errorf("cannot read CONNECT response for dstAddr=%q: %w", dstAddr, err)
|
||||||
|
}
|
||||||
|
if statusCode := res.Header.StatusCode(); statusCode != 200 {
|
||||||
|
return nil, fmt.Errorf("unexpected status code received: %d; want: 200", statusCode)
|
||||||
|
}
|
||||||
|
return conn, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type bufferedReaderConn struct {
|
||||||
|
net.Conn
|
||||||
|
br *bufio.Reader
|
||||||
|
}
|
||||||
|
|
||||||
|
func (brc *bufferedReaderConn) Read(p []byte) (int, error) {
|
||||||
|
return brc.br.Read(p)
|
||||||
|
}
|
Loading…
Reference in a new issue