Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files

This commit is contained in:
Aliaksandr Valialkin 2020-12-24 11:49:03 +02:00
commit a0ea5777f0
23 changed files with 293 additions and 42 deletions

View file

@ -175,9 +175,9 @@ The shortlist of configuration flags is the following:
-datasource.basicAuth.username string
Optional basic auth username for -datasource.url
-datasource.lookback duration
Lookback defines how far to look into past when evaluating queries. For example, if datasource.lookback=5m then param "time" with value now()-5m will be added to every query.
Lookback defines how far to look into past when evaluating queries. For example, if datasource.lookback=5m then param "time" with value now()-5m will be added to every query.
-datasource.maxIdleConnections int
Defines the number of idle (keep-alive connections) to configured datasource.Consider to set this value equal to the value: groups_total * group.concurrency. Too low value may result into high number of sockets in TIME_WAIT state. (default 100)
Defines the number of idle (keep-alive connections) to configured datasource.Consider to set this value equal to the value: groups_total * group.concurrency. Too low value may result into high number of sockets in TIME_WAIT state. (default 100)
-datasource.tlsCAFile string
Optional path to TLS CA file to use for verifying connections to -datasource.url. By default system CA is used
-datasource.tlsCertFile string
@ -190,6 +190,8 @@ The shortlist of configuration flags is the following:
Optional TLS server name to use for connections to -datasource.url. By default the server name from -datasource.url is used
-datasource.url string
Victoria Metrics or VMSelect url. Required parameter. E.g. http://127.0.0.1:8428
-dryRun -rule
Whether to check only config files without running vmalert. The rules file are validated. The -rule flag must be specified.
-enableTCP6
Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP is used
-envflag.enable
@ -224,14 +226,18 @@ The shortlist of configuration flags is the following:
Username for HTTP Basic Auth. The authentication is disabled if empty. See also -httpAuth.password
-httpListenAddr string
Address to listen for http connections (default ":8880")
-loggerDisableTimestamps
Whether to disable writing timestamps in logs
-loggerErrorsPerSecondLimit int
Per-second limit on the number of ERROR messages. If more than the given number of errors are emitted per second, then the remaining errors are suppressed. Zero value disables the rate limit (default 10)
Per-second limit on the number of ERROR messages. If more than the given number of errors are emitted per second, then the remaining errors are suppressed. Zero value disables the rate limit
-loggerFormat string
Format for logs. Possible values: default, json (default "default")
-loggerLevel string
Minimum level of errors to log. Possible values: INFO, WARN, ERROR, FATAL, PANIC (default "INFO")
-loggerOutput string
Output for the logs. Supported values: stderr, stdout (default "stderr")
-loggerWarnsPerSecondLimit int
Per-second limit on the number of WARN messages. If more than the given number of warns are emitted per second, then the remaining warns are suppressed. Zero value disables the rate limit
-memory.allowedBytes value
Allowed size of system memory VictoriaMetrics caches may occupy. This option overrides -memory.allowedPercent if set to non-zero value. Too low value may increase cache miss rate, which usually results in higher CPU and disk IO usage. Too high value may evict too much data from OS page cache, which will result in higher disk IO usage
Supports the following optional suffixes for values: KB, MB, GB, KiB, MiB, GiB (default 0)
@ -251,8 +257,9 @@ The shortlist of configuration flags is the following:
-notifier.tlsCertFile array
Optional path to client-side TLS certificate file to use when connecting to -notifier.url
Supports array of values separated by comma or specified via multiple flags.
-notifier.tlsInsecureSkipVerify
-notifier.tlsInsecureSkipVerify array
Whether to skip tls verification when connecting to -notifier.url
Supports array of values separated by comma or specified via multiple flags.
-notifier.tlsKeyFile array
Optional path to client-side TLS certificate key to use when connecting to -notifier.url
Supports array of values separated by comma or specified via multiple flags.

View file

@ -1,7 +1,6 @@
package notifier
import (
"flag"
"fmt"
"net/http"
@ -14,7 +13,7 @@ var (
basicAuthUsername = flagutil.NewArray("notifier.basicAuth.username", "Optional basic auth username for -datasource.url")
basicAuthPassword = flagutil.NewArray("notifier.basicAuth.password", "Optional basic auth password for -datasource.url")
tlsInsecureSkipVerify = flag.Bool("notifier.tlsInsecureSkipVerify", false, "Whether to skip tls verification when connecting to -notifier.url")
tlsInsecureSkipVerify = flagutil.NewArrayBool("notifier.tlsInsecureSkipVerify", "Whether to skip tls verification when connecting to -notifier.url")
tlsCertFile = flagutil.NewArray("notifier.tlsCertFile", "Optional path to client-side TLS certificate file to use when connecting to -notifier.url")
tlsKeyFile = flagutil.NewArray("notifier.tlsKeyFile", "Optional path to client-side TLS certificate key to use when connecting to -notifier.url")
tlsCAFile = flagutil.NewArray("notifier.tlsCAFile", "Optional path to TLS CA file to use for verifying connections to -notifier.url. "+
@ -33,7 +32,7 @@ func Init(gen AlertURLGenerator) ([]Notifier, error) {
for i, addr := range *addrs {
cert, key := tlsCertFile.GetOptionalArg(i), tlsKeyFile.GetOptionalArg(i)
ca, serverName := tlsCAFile.GetOptionalArg(i), tlsServerName.GetOptionalArg(i)
tr, err := utils.Transport(addr, cert, key, ca, serverName, *tlsInsecureSkipVerify)
tr, err := utils.Transport(addr, cert, key, ca, serverName, tlsInsecureSkipVerify.GetOptionalArg(i))
if err != nil {
return nil, fmt.Errorf("failed to create transport: %w", err)
}

View file

@ -2,6 +2,12 @@
# tip
* FEATURE: vmagent: add support for `proxy_url` config option in Prometheus scrape configs. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/503
* FEATURE: remove parts with stale data as soon as they go outside the configured `-retentionPeriod`. Previously such parts may remain active for long periods of time. This should help reducing disk usage for `-retentionPeriod` smaller than one month.
* FEATURE: vmalert: allow setting multiple values for `-notifier.tlsInsecureSkipVerify` command-line flag per each `-notifier.url`.
* BUGFIX: vmagent: set missing `__meta_kubernetes_service_*` labels in `kubernetes_sd_config` for `endpoints` and `endpointslices` roles. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/982
# [v1.50.2](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.50.2)

View file

@ -15,6 +15,7 @@ Release process guidance
1. Publish message in slack (victoriametrics.slack.com, general channel)
2. Post twit with release notes URL
3. Post in subreddit https://www.reddit.com/r/VictoriaMetrics/
4. Post in linkedin
## Helm Charts

View file

@ -175,9 +175,9 @@ The shortlist of configuration flags is the following:
-datasource.basicAuth.username string
Optional basic auth username for -datasource.url
-datasource.lookback duration
Lookback defines how far to look into past when evaluating queries. For example, if datasource.lookback=5m then param "time" with value now()-5m will be added to every query.
Lookback defines how far to look into past when evaluating queries. For example, if datasource.lookback=5m then param "time" with value now()-5m will be added to every query.
-datasource.maxIdleConnections int
Defines the number of idle (keep-alive connections) to configured datasource.Consider to set this value equal to the value: groups_total * group.concurrency. Too low value may result into high number of sockets in TIME_WAIT state. (default 100)
Defines the number of idle (keep-alive connections) to configured datasource.Consider to set this value equal to the value: groups_total * group.concurrency. Too low value may result into high number of sockets in TIME_WAIT state. (default 100)
-datasource.tlsCAFile string
Optional path to TLS CA file to use for verifying connections to -datasource.url. By default system CA is used
-datasource.tlsCertFile string
@ -190,6 +190,8 @@ The shortlist of configuration flags is the following:
Optional TLS server name to use for connections to -datasource.url. By default the server name from -datasource.url is used
-datasource.url string
Victoria Metrics or VMSelect url. Required parameter. E.g. http://127.0.0.1:8428
-dryRun -rule
Whether to check only config files without running vmalert. The rules file are validated. The -rule flag must be specified.
-enableTCP6
Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP is used
-envflag.enable
@ -224,14 +226,18 @@ The shortlist of configuration flags is the following:
Username for HTTP Basic Auth. The authentication is disabled if empty. See also -httpAuth.password
-httpListenAddr string
Address to listen for http connections (default ":8880")
-loggerDisableTimestamps
Whether to disable writing timestamps in logs
-loggerErrorsPerSecondLimit int
Per-second limit on the number of ERROR messages. If more than the given number of errors are emitted per second, then the remaining errors are suppressed. Zero value disables the rate limit (default 10)
Per-second limit on the number of ERROR messages. If more than the given number of errors are emitted per second, then the remaining errors are suppressed. Zero value disables the rate limit
-loggerFormat string
Format for logs. Possible values: default, json (default "default")
-loggerLevel string
Minimum level of errors to log. Possible values: INFO, WARN, ERROR, FATAL, PANIC (default "INFO")
-loggerOutput string
Output for the logs. Supported values: stderr, stdout (default "stderr")
-loggerWarnsPerSecondLimit int
Per-second limit on the number of WARN messages. If more than the given number of warns are emitted per second, then the remaining warns are suppressed. Zero value disables the rate limit
-memory.allowedBytes value
Allowed size of system memory VictoriaMetrics caches may occupy. This option overrides -memory.allowedPercent if set to non-zero value. Too low value may increase cache miss rate, which usually results in higher CPU and disk IO usage. Too high value may evict too much data from OS page cache, which will result in higher disk IO usage
Supports the following optional suffixes for values: KB, MB, GB, KiB, MiB, GiB (default 0)
@ -251,8 +257,9 @@ The shortlist of configuration flags is the following:
-notifier.tlsCertFile array
Optional path to client-side TLS certificate file to use when connecting to -notifier.url
Supports array of values separated by comma or specified via multiple flags.
-notifier.tlsInsecureSkipVerify
-notifier.tlsInsecureSkipVerify array
Whether to skip tls verification when connecting to -notifier.url
Supports array of values separated by comma or specified via multiple flags.
-notifier.tlsKeyFile array
Optional path to client-side TLS certificate key to use when connecting to -notifier.url
Supports array of values separated by comma or specified via multiple flags.

View file

@ -8,6 +8,7 @@ import (
"io"
"io/ioutil"
"net/http"
"net/url"
"strings"
"time"
@ -66,10 +67,14 @@ func newClient(sw *ScrapeWork) *client {
host += ":443"
}
}
dialFunc, err := newStatDialFunc(sw.ProxyURL, tlsCfg)
if err != nil {
logger.Fatalf("cannot create dial func: %s", err)
}
hc := &fasthttp.HostClient{
Addr: host,
Name: "vm_promscrape",
Dial: statDial,
Dial: dialFunc,
IsTLS: isTLS,
TLSConfig: tlsCfg,
MaxIdleConnDuration: 2 * sw.ScrapeInterval,
@ -80,9 +85,14 @@ func newClient(sw *ScrapeWork) *client {
}
var sc *http.Client
if *streamParse || sw.StreamParse {
var proxy func(*http.Request) (*url.URL, error)
if proxyURL := sw.ProxyURL.URL(); proxyURL != nil {
proxy = http.ProxyURL(proxyURL)
}
sc = &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
Proxy: proxy,
TLSHandshakeTimeout: 10 * time.Second,
IdleConnTimeout: 2 * sw.ScrapeInterval,
DisableCompression: *disableCompression || sw.DisableCompression,
@ -93,9 +103,8 @@ func newClient(sw *ScrapeWork) *client {
}
}
return &client{
hc: hc,
sc: sc,
hc: hc,
sc: sc,
scrapeURL: sw.ScrapeURL,
host: host,
requestURI: requestURI,

View file

@ -23,6 +23,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/gce"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kubernetes"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/openstack"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
"gopkg.in/yaml.v2"
)
@ -71,6 +72,7 @@ type ScrapeConfig struct {
BasicAuth *promauth.BasicAuthConfig `yaml:"basic_auth,omitempty"`
BearerToken string `yaml:"bearer_token,omitempty"`
BearerTokenFile string `yaml:"bearer_token_file,omitempty"`
ProxyURL proxy.URL `yaml:"proxy_url,omitempty"`
TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"`
StaticConfigs []StaticConfig `yaml:"static_configs,omitempty"`
FileSDConfigs []FileSDConfig `yaml:"file_sd_configs,omitempty"`
@ -495,6 +497,7 @@ func getScrapeWorkConfig(sc *ScrapeConfig, baseDir string, globalCfg *GlobalConf
metricsPath: metricsPath,
scheme: scheme,
params: params,
proxyURL: sc.ProxyURL,
authConfig: ac,
honorLabels: honorLabels,
honorTimestamps: honorTimestamps,
@ -516,6 +519,7 @@ type scrapeWorkConfig struct {
metricsPath string
scheme string
params map[string][]string
proxyURL proxy.URL
authConfig *promauth.Config
honorLabels bool
honorTimestamps bool
@ -750,6 +754,7 @@ func appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConfig, target string, e
HonorTimestamps: swc.honorTimestamps,
OriginalLabels: originalLabels,
Labels: labels,
ProxyURL: swc.proxyURL,
AuthConfig: swc.authConfig,
MetricRelabelConfigs: swc.metricRelabelConfigs,
SampleLimit: swc.sampleLimit,

View file

@ -58,7 +58,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
}
apiServer = scheme + "://" + apiServer
}
client, err := discoveryutils.NewClient(apiServer, ac)
client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL)
if err != nil {
return nil, fmt.Errorf("cannot create HTTP client for %q: %w", apiServer, err)
}

View file

@ -4,6 +4,7 @@ import (
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
)
// SDConfig represents service discovery config for Consul.
@ -16,6 +17,7 @@ type SDConfig struct {
Scheme string `yaml:"scheme,omitempty"`
Username string `yaml:"username"`
Password string `yaml:"password"`
ProxyURL proxy.URL `yaml:"proxy_url,omitempty"`
TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"`
Services []string `yaml:"services,omitempty"`
Tags []string `yaml:"tags,omitempty"`

View file

@ -34,11 +34,12 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
port: sdc.Port,
filtersQueryArg: getFiltersQueryArg(sdc.Filters),
}
ac, err := promauth.NewConfig(baseDir, sdc.BasicAuth, sdc.BearerToken, sdc.BearerTokenFile, sdc.TLSConfig)
if err != nil {
return nil, err
}
client, err := discoveryutils.NewClient(sdc.Host, ac)
client, err := discoveryutils.NewClient(sdc.Host, ac, sdc.ProxyURL)
if err != nil {
return nil, fmt.Errorf("cannot create HTTP client for %q: %w", sdc.Host, err)
}

View file

@ -4,6 +4,7 @@ import (
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
)
// SDConfig represents docker swarm service discovery configuration
@ -15,7 +16,7 @@ type SDConfig struct {
Port int `yaml:"port,omitempty"`
Filters []Filter `yaml:"filters,omitempty"`
// TODO: add support for proxy_url
ProxyURL proxy.URL `yaml:"proxy_url,omitempty"`
TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"`
// refresh_interval is obtained from `-promscrape.dockerswarmSDCheckInterval` command-line option
BasicAuth *promauth.BasicAuthConfig `yaml:"basic_auth,omitempty"`

View file

@ -43,7 +43,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
}
apiServer = scheme + "://" + apiServer
}
client, err := discoveryutils.NewClient(apiServer, ac)
client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL)
if err != nil {
return nil, fmt.Errorf("cannot create HTTP client for %q: %w", apiServer, err)
}

View file

@ -5,9 +5,9 @@ import (
"fmt"
"strconv"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
)
const appsAPIPath = "/apps"
@ -22,6 +22,7 @@ type SDConfig struct {
Scheme string `yaml:"scheme,omitempty"`
Username string `yaml:"username"`
Password string `yaml:"password"`
ProxyURL proxy.URL `yaml:"proxy_url,omitempty"`
TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"`
// RefreshInterval time.Duration `yaml:"refresh_interval"`
// refresh_interval is obtained from `-promscrape.ec2SDCheckInterval` command-line option.

View file

@ -56,7 +56,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
}
ac = acNew
}
client, err := discoveryutils.NewClient(apiServer, ac)
client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL)
if err != nil {
return nil, fmt.Errorf("cannot create HTTP client for %q: %w", apiServer, err)
}

View file

@ -158,6 +158,9 @@ func (eps *Endpoints) appendTargetLabels(ms []map[string]string, pods []Pod, svc
}
p.appendCommonLabels(m)
p.appendContainerLabels(m, c, &cp)
if svc != nil {
svc.appendCommonLabels(m)
}
ms = append(ms, m)
}
}

View file

@ -113,6 +113,9 @@ func (eps *EndpointSlice) appendTargetLabels(ms []map[string]string, pods []Pod,
}
p.appendCommonLabels(m)
p.appendContainerLabels(m, c, &cp)
if svc != nil {
svc.appendCommonLabels(m)
}
ms = append(ms, m)
}
}

View file

@ -420,6 +420,13 @@ func TestEndpointSlice_appendTargetLabels(t *testing.T) {
"__meta_kubernetes_pod_phase": "",
"__meta_kubernetes_pod_ready": "unknown",
"__meta_kubernetes_pod_uid": "some-pod-uuid",
"__meta_kubernetes_service_cluster_ip": "",
"__meta_kubernetes_service_label_service_label_1": "value-1",
"__meta_kubernetes_service_label_service_label_2": "value-2",
"__meta_kubernetes_service_labelpresent_service_label_1": "true",
"__meta_kubernetes_service_labelpresent_service_label_2": "true",
"__meta_kubernetes_service_name": "custom-esl",
"__meta_kubernetes_service_type": "ClusterIP",
}),
},
},

View file

@ -4,6 +4,7 @@ import (
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
)
// SDConfig represents kubernetes-based service discovery config.
@ -15,6 +16,7 @@ type SDConfig struct {
BasicAuth *promauth.BasicAuthConfig `yaml:"basic_auth,omitempty"`
BearerToken string `yaml:"bearer_token,omitempty"`
BearerTokenFile string `yaml:"bearer_token_file,omitempty"`
ProxyURL proxy.URL `yaml:"proxy_url,omitempty"`
TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"`
Namespaces Namespaces `yaml:"namespaces,omitempty"`
Selectors []Selector `yaml:"selectors,omitempty"`

View file

@ -10,8 +10,8 @@ import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/fasthttp"
)
@ -45,11 +45,12 @@ type Client struct {
}
// NewClient returns new Client for the given apiServer and the given ac.
func NewClient(apiServer string, ac *promauth.Config) (*Client, error) {
func NewClient(apiServer string, ac *promauth.Config, proxyURL proxy.URL) (*Client, error) {
var (
dialFunc fasthttp.DialFunc
tlsCfg *tls.Config
u fasthttp.URI
err error
)
u.Update(apiServer)
@ -61,6 +62,7 @@ func NewClient(apiServer string, ac *promauth.Config) (*Client, error) {
return net.Dial("unix", dialAddr)
}
}
hostPort := string(u.Host())
isTLS := string(u.Scheme()) == "https"
if isTLS && ac != nil {
@ -73,10 +75,15 @@ func NewClient(apiServer string, ac *promauth.Config) (*Client, error) {
}
hostPort = net.JoinHostPort(hostPort, port)
}
if dialFunc == nil {
dialFunc, err = proxyURL.NewDialFunc(tlsCfg)
if err != nil {
return nil, err
}
}
hc := &fasthttp.HostClient{
Addr: hostPort,
Name: "vm_promscrape/discovery",
DialDualStack: netutil.TCP6Enabled(),
IsTLS: isTLS,
TLSConfig: tlsCfg,
ReadTimeout: time.Minute,
@ -88,7 +95,6 @@ func NewClient(apiServer string, ac *promauth.Config) (*Client, error) {
blockingClient := &fasthttp.HostClient{
Addr: hostPort,
Name: "vm_promscrape/discovery",
DialDualStack: netutil.TCP6Enabled(),
IsTLS: isTLS,
TLSConfig: tlsCfg,
ReadTimeout: BlockingClientReadTimeout,

View file

@ -17,6 +17,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
"github.com/VictoriaMetrics/metrics"
xxhash "github.com/cespare/xxhash/v2"
)
@ -70,6 +71,9 @@ type ScrapeWork struct {
// Auth config
AuthConfig *promauth.Config
// ProxyURL HTTP proxy url
ProxyURL proxy.URL
// Optional `metric_relabel_configs`.
MetricRelabelConfigs []promrelabel.ParsedRelabelConfig

View file

@ -2,6 +2,7 @@ package promscrape
import (
"context"
"crypto/tls"
"fmt"
"net"
"sync"
@ -9,6 +10,7 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
"github.com/VictoriaMetrics/fasthttp"
"github.com/VictoriaMetrics/metrics"
)
@ -47,25 +49,28 @@ var (
stdDialerOnce sync.Once
)
func statDial(addr string) (conn net.Conn, err error) {
if netutil.TCP6Enabled() {
conn, err = fasthttp.DialDualStack(addr)
} else {
conn, err = fasthttp.Dial(addr)
}
dialsTotal.Inc()
func newStatDialFunc(proxyURL proxy.URL, tlsConfig *tls.Config) (fasthttp.DialFunc, error) {
dialFunc, err := proxyURL.NewDialFunc(tlsConfig)
if err != nil {
dialErrors.Inc()
if !netutil.TCP6Enabled() {
err = fmt.Errorf("%w; try -enableTCP6 command-line flag if you scrape ipv6 addresses", err)
}
return nil, err
}
conns.Inc()
sc := &statConn{
Conn: conn,
statDialFunc := func(addr string) (net.Conn, error) {
conn, err := dialFunc(addr)
dialsTotal.Inc()
if err != nil {
dialErrors.Inc()
if !netutil.TCP6Enabled() {
err = fmt.Errorf("%w; try -enableTCP6 command-line flag if you scrape ipv6 addresses", err)
}
return nil, err
}
conns.Inc()
sc := &statConn{
Conn: conn,
}
return sc, nil
}
return sc, nil
return statDialFunc, nil
}
var (

117
lib/proxy/proxy.go Normal file
View file

@ -0,0 +1,117 @@
package proxy
import (
"bufio"
"crypto/tls"
"encoding/base64"
"fmt"
"net"
"net/url"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/fasthttp"
)
// URL implements YAML.Marshaler and yaml.Unmarshaler interfaces for url.URL.
type URL struct {
url *url.URL
}
// URL return the underlying url.
func (u *URL) URL() *url.URL {
if u == nil || u.url == nil {
return nil
}
return u.url
}
// MarshalYAML implements yaml.Marshaler interface.
func (u *URL) MarshalYAML() (interface{}, error) {
if u.url == nil {
return nil, nil
}
return u.url.String(), nil
}
// UnmarshalYAML implements yaml.Unmarshaler interface.
func (u *URL) UnmarshalYAML(unmarshal func(interface{}) error) error {
var s string
if err := unmarshal(&s); err != nil {
return err
}
parsedURL, err := url.Parse(s)
if err != nil {
return fmt.Errorf("cannot parse proxy_url=%q as *url.URL: %w", s, err)
}
u.url = parsedURL
return nil
}
// NewDialFunc returns dial func for the given pu and tlsConfig.
func (u *URL) NewDialFunc(tlsConfig *tls.Config) (fasthttp.DialFunc, error) {
if u == nil || u.url == nil {
return defaultDialFunc, nil
}
pu := u.url
if pu.Scheme != "http" && pu.Scheme != "https" {
return nil, fmt.Errorf("unknown scheme=%q for proxy_url=%q, must be http or https", pu.Scheme, pu)
}
var authHeader string
if pu.User != nil && len(pu.User.Username()) > 0 {
userPasswordEncoded := base64.StdEncoding.EncodeToString([]byte(pu.User.String()))
authHeader = "Proxy-Authorization: Basic " + userPasswordEncoded + "\r\n"
}
dialFunc := func(addr string) (net.Conn, error) {
proxyConn, err := defaultDialFunc(pu.Host)
if err != nil {
return nil, fmt.Errorf("cannot connect to proxy %q: %w", pu, err)
}
if pu.Scheme == "https" {
proxyConn = tls.Client(proxyConn, tlsConfig)
}
conn, err := sendConnectRequest(proxyConn, addr, authHeader)
if err != nil {
_ = proxyConn.Close()
return nil, fmt.Errorf("error when sending CONNECT request to proxy %q: %w", pu, err)
}
return conn, nil
}
return dialFunc, nil
}
func defaultDialFunc(addr string) (net.Conn, error) {
if netutil.TCP6Enabled() {
return fasthttp.DialDualStack(addr)
}
return fasthttp.Dial(addr)
}
// sendConnectRequest sends CONNECT request to proxyConn for the given addr and authHeader and returns the established connection to dstAddr.
func sendConnectRequest(proxyConn net.Conn, dstAddr, authHeader string) (net.Conn, error) {
req := "CONNECT " + dstAddr + " HTTP/1.1\r\nHost: " + dstAddr + "\r\n" + authHeader + "\r\n"
if _, err := proxyConn.Write([]byte(req)); err != nil {
return nil, fmt.Errorf("cannot send CONNECT request for dstAddr=%q: %w", dstAddr, err)
}
var res fasthttp.Response
res.SkipBody = true
conn := &bufferedReaderConn{
br: bufio.NewReader(proxyConn),
Conn: proxyConn,
}
if err := res.Read(conn.br); err != nil {
return nil, fmt.Errorf("cannot read CONNECT response for dstAddr=%q: %w", dstAddr, err)
}
if statusCode := res.Header.StatusCode(); statusCode != 200 {
return nil, fmt.Errorf("unexpected status code received: %d; want: 200", statusCode)
}
return conn, nil
}
type bufferedReaderConn struct {
net.Conn
br *bufio.Reader
}
func (brc *bufferedReaderConn) Read(p []byte) (int, error) {
return brc.br.Read(p)
}

View file

@ -167,6 +167,7 @@ type partition struct {
bigPartsMergerWG sync.WaitGroup
rawRowsFlusherWG sync.WaitGroup
inmemoryPartsFlusherWG sync.WaitGroup
stalePartsRemoverWG sync.WaitGroup
}
// partWrapper is a wrapper for the part.
@ -278,6 +279,7 @@ func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func
pt.startMergeWorkers()
pt.startRawRowsFlusher()
pt.startInmemoryPartsFlusher()
pt.startStalePartsRemover()
return pt, nil
}
@ -641,8 +643,13 @@ func (pt *partition) PutParts(pws []*partWrapper) {
func (pt *partition) MustClose() {
close(pt.stopCh)
logger.Infof("waiting for inmemory parts flusher to stop on %q...", pt.smallPartsPath)
logger.Infof("waiting for stale parts remover to stop on %q...", pt.smallPartsPath)
startTime := time.Now()
pt.stalePartsRemoverWG.Wait()
logger.Infof("stale parts remover stopped in %.3f seconds on %q", time.Since(startTime).Seconds(), pt.smallPartsPath)
logger.Infof("waiting for inmemory parts flusher to stop on %q...", pt.smallPartsPath)
startTime = time.Now()
pt.inmemoryPartsFlusherWG.Wait()
logger.Infof("inmemory parts flusher stopped in %.3f seconds on %q", time.Since(startTime).Seconds(), pt.smallPartsPath)
@ -1289,6 +1296,64 @@ func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]bool, isBig
return dst, removedParts
}
func (pt *partition) startStalePartsRemover() {
pt.stalePartsRemoverWG.Add(1)
go func() {
pt.stalePartsRemover()
pt.stalePartsRemoverWG.Done()
}()
}
func (pt *partition) stalePartsRemover() {
ticker := time.NewTicker(7 * time.Minute)
defer ticker.Stop()
for {
select {
case <-pt.stopCh:
return
case <-ticker.C:
pt.removeStaleParts()
}
}
}
func (pt *partition) removeStaleParts() {
m := make(map[*partWrapper]bool)
startTime := time.Now()
retentionDeadline := timestampFromTime(startTime) - pt.retentionMsecs
pt.partsLock.Lock()
for _, pw := range pt.bigParts {
if !pw.isInMerge && pw.p.ph.MaxTimestamp < retentionDeadline {
atomic.AddUint64(&pt.bigRowsDeleted, pw.p.ph.RowsCount)
m[pw] = true
}
}
for _, pw := range pt.smallParts {
if !pw.isInMerge && pw.p.ph.MaxTimestamp < retentionDeadline {
atomic.AddUint64(&pt.smallRowsDeleted, pw.p.ph.RowsCount)
m[pw] = true
}
}
removedSmallParts := 0
removedBigParts := 0
if len(m) > 0 {
pt.smallParts, removedSmallParts = removeParts(pt.smallParts, m, false)
pt.bigParts, removedBigParts = removeParts(pt.bigParts, m, true)
}
pt.partsLock.Unlock()
if removedSmallParts+removedBigParts != len(m) {
logger.Panicf("BUG: unexpected number of stale parts removed; got %d, want %d", removedSmallParts+removedBigParts, len(m))
}
// Remove partition references from removed parts, so they are eventually deleted when nobody reads from them.
for pw := range m {
logger.Infof("removing part %q, since its data is out of the configured retention (%d secs)", pw.p.path, retentionDeadline/1000)
pw.decRef()
}
}
// getPartsToMerge returns optimal parts to merge from pws.
//
// The returned parts will contain less than maxRows rows.