mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 15:16:42 +00:00
f03e81c693
- Make sure that invalid/missing TLS CA file or TLS client certificate files at vmagent startup don't prevent from processing the corresponding scrape targets after the file becomes correct, without the need to restart vmagent. Previously scrape targets with invalid TLS CA file or TLS client certificate files were permanently dropped after the first attempt to initialize them, and they didn't appear until the next vmagent reload or the next change in other places of the loaded scrape configs. - Make sure that TLS CA is properly re-loaded from file after it changes without the need to restart vmagent. Previously the old TLS CA was used until vmagent restart. - Properly handle errors during http request creation for the second attempt to send data to remote system at vmagent and vmalert. Previously failed request creation could result in nil pointer dereferencing, since the returned request is nil on error. - Add more context to the logged error during AWS sigv4 request signing before sending the data to -remoteWrite.url at vmagent. Previously it could miss details on the source of the request. - Do not create a new HTTP client per second when generating OAuth2 token needed to put in Authorization header of every http request issued by vmagent during service discovery or target scraping. Re-use the HTTP client instead until the corresponding scrape config changes. - Cache error at lib/promauth.Config.GetAuthHeader() in the same way as the auth header is cached, e.g. the error is cached for a second now. This should reduce load on CPU and OAuth2 server when auth header cannot be obtained because of temporary error. - Share tls.Config.GetClientCertificate function among multiple scrape targets with the same tls_config. Cache the loaded certificate and the error for one second. This should significantly reduce CPU load when scraping big number of targets with the same tls_config. - Allow loading TLS certificates from HTTP and HTTPs urls by specifying these urls at `tls_config->cert_file` and `tls_config->key_file`. - Improve test coverage at lib/promauth - Skip unreachable or invalid files specified at `scrape_config_files` during vmagent startup, since these files may become valid later. Previously vmagent was exitting in this case. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4959
359 lines
11 KiB
Go
359 lines
11 KiB
Go
package discoveryutils
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"errors"
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"net/http"
|
|
"net/url"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
)
|
|
|
|
var (
|
|
maxConcurrency = flag.Int("promscrape.discovery.concurrency", 100, "The maximum number of concurrent requests to Prometheus autodiscovery API (Consul, Kubernetes, etc.)")
|
|
maxWaitTime = flag.Duration("promscrape.discovery.concurrentWaitTime", time.Minute, "The maximum duration for waiting to perform API requests "+
|
|
"if more than -promscrape.discovery.concurrency requests are simultaneously performed")
|
|
)
|
|
|
|
var defaultClient = &http.Client{
|
|
Timeout: 30 * time.Second,
|
|
}
|
|
|
|
var (
|
|
concurrencyLimitCh chan struct{}
|
|
concurrencyLimitChOnce sync.Once
|
|
)
|
|
|
|
const (
|
|
// BlockingClientReadTimeout is the maximum duration for waiting the response from GetBlockingAPI*
|
|
BlockingClientReadTimeout = 10 * time.Minute
|
|
|
|
// DefaultClientReadTimeout is the maximum duration for waiting the response from GetAPI*
|
|
DefaultClientReadTimeout = time.Minute
|
|
)
|
|
|
|
// RequestCallback is called on the request before sending the request to the server.
|
|
type RequestCallback func(req *http.Request)
|
|
|
|
// ResponseCallback is called on the response before validating and returning the response to the caller.
|
|
type ResponseCallback func(resp *http.Response)
|
|
|
|
func concurrencyLimitChInit() {
|
|
concurrencyLimitCh = make(chan struct{}, *maxConcurrency)
|
|
}
|
|
|
|
// GetHTTPClient returns default client for http API requests.
|
|
func GetHTTPClient() *http.Client {
|
|
return defaultClient
|
|
}
|
|
|
|
// Client is http client, which talks to the given apiServer passed to NewClient().
|
|
type Client struct {
|
|
// client is used for short requests.
|
|
client *HTTPClient
|
|
|
|
// blockingClient is used for long-polling requests.
|
|
blockingClient *HTTPClient
|
|
|
|
apiServer string
|
|
|
|
setHTTPHeaders func(req *http.Request) error
|
|
setHTTPProxyHeaders func(req *http.Request) error
|
|
|
|
clientCtx context.Context
|
|
clientCancel context.CancelFunc
|
|
}
|
|
|
|
// HTTPClient is a wrapper around http.Client with timeouts.
|
|
type HTTPClient struct {
|
|
client *http.Client
|
|
ReadTimeout time.Duration
|
|
}
|
|
|
|
func (hc *HTTPClient) stop() {
|
|
// Close idle connections to server in order to free up resources.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4724
|
|
hc.client.CloseIdleConnections()
|
|
}
|
|
|
|
var defaultDialer = &net.Dialer{}
|
|
|
|
// NewClient returns new Client for the given args.
|
|
func NewClient(apiServer string, ac *promauth.Config, proxyURL *proxy.URL, proxyAC *promauth.Config, httpCfg *promauth.HTTPClientConfig) (*Client, error) {
|
|
u, err := url.Parse(apiServer)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse apiServer=%q: %w", apiServer, err)
|
|
}
|
|
|
|
dialFunc := defaultDialer.DialContext
|
|
if u.Scheme == "unix" {
|
|
// special case for unix socket connection
|
|
dialAddr := u.Path
|
|
apiServer = "http://unix"
|
|
dialFunc = func(ctx context.Context, _, _ string) (net.Conn, error) {
|
|
return defaultDialer.DialContext(ctx, "unix", dialAddr)
|
|
}
|
|
}
|
|
|
|
isTLS := u.Scheme == "https"
|
|
var tlsCfg *tls.Config
|
|
if isTLS {
|
|
var err error
|
|
tlsCfg, err = ac.NewTLSConfig()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot initialize tls config: %w", err)
|
|
}
|
|
}
|
|
|
|
var proxyURLFunc func(*http.Request) (*url.URL, error)
|
|
if pu := proxyURL.GetURL(); pu != nil {
|
|
proxyURLFunc = http.ProxyURL(pu)
|
|
}
|
|
|
|
client := &http.Client{
|
|
Timeout: DefaultClientReadTimeout,
|
|
Transport: &http.Transport{
|
|
TLSClientConfig: tlsCfg,
|
|
Proxy: proxyURLFunc,
|
|
TLSHandshakeTimeout: 10 * time.Second,
|
|
MaxIdleConnsPerHost: *maxConcurrency,
|
|
ResponseHeaderTimeout: DefaultClientReadTimeout,
|
|
DialContext: dialFunc,
|
|
},
|
|
}
|
|
blockingClient := &http.Client{
|
|
Timeout: BlockingClientReadTimeout,
|
|
Transport: &http.Transport{
|
|
TLSClientConfig: tlsCfg,
|
|
Proxy: proxyURLFunc,
|
|
TLSHandshakeTimeout: 10 * time.Second,
|
|
MaxIdleConnsPerHost: 1000,
|
|
ResponseHeaderTimeout: BlockingClientReadTimeout,
|
|
DialContext: dialFunc,
|
|
},
|
|
}
|
|
|
|
setHTTPHeaders := func(req *http.Request) error { return nil }
|
|
if ac != nil {
|
|
setHTTPHeaders = func(req *http.Request) error {
|
|
return ac.SetHeaders(req, true)
|
|
}
|
|
}
|
|
if httpCfg.FollowRedirects != nil && !*httpCfg.FollowRedirects {
|
|
checkRedirect := func(req *http.Request, via []*http.Request) error {
|
|
return http.ErrUseLastResponse
|
|
}
|
|
client.CheckRedirect = checkRedirect
|
|
blockingClient.CheckRedirect = checkRedirect
|
|
}
|
|
setHTTPProxyHeaders := func(req *http.Request) error { return nil }
|
|
if proxyAC != nil {
|
|
setHTTPProxyHeaders = func(req *http.Request) error {
|
|
return proxyURL.SetHeaders(proxyAC, req)
|
|
}
|
|
}
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
c := &Client{
|
|
client: &HTTPClient{
|
|
client: client,
|
|
ReadTimeout: DefaultClientReadTimeout,
|
|
},
|
|
blockingClient: &HTTPClient{
|
|
client: blockingClient,
|
|
ReadTimeout: BlockingClientReadTimeout,
|
|
},
|
|
apiServer: apiServer,
|
|
setHTTPHeaders: setHTTPHeaders,
|
|
setHTTPProxyHeaders: setHTTPProxyHeaders,
|
|
clientCtx: ctx,
|
|
clientCancel: cancel,
|
|
}
|
|
return c, nil
|
|
}
|
|
|
|
// Context returns context for the client requests.
|
|
func (c *Client) Context() context.Context {
|
|
return c.clientCtx
|
|
}
|
|
|
|
// GetAPIResponseWithParamsCtx returns response for given absolute path with blocking client and optional callback for api response,
|
|
func (c *Client) GetAPIResponseWithParamsCtx(ctx context.Context, path string, modifyRequest RequestCallback, inspectResponse ResponseCallback) ([]byte, error) {
|
|
return c.getAPIResponseWithConcurrencyLimit(ctx, c.client, path, modifyRequest, inspectResponse)
|
|
}
|
|
|
|
// GetAPIResponseWithReqParams returns response for given absolute path with optional callback for request.
|
|
func (c *Client) GetAPIResponseWithReqParams(path string, modifyRequest RequestCallback) ([]byte, error) {
|
|
return c.getAPIResponseWithConcurrencyLimit(c.clientCtx, c.client, path, modifyRequest, nil)
|
|
}
|
|
|
|
// GetAPIResponse returns response for the given absolute path.
|
|
func (c *Client) GetAPIResponse(path string) ([]byte, error) {
|
|
return c.getAPIResponseWithConcurrencyLimit(c.clientCtx, c.client, path, nil, nil)
|
|
}
|
|
|
|
func (c *Client) getAPIResponseWithConcurrencyLimit(ctx context.Context, client *HTTPClient, path string,
|
|
modifyRequest RequestCallback, inspectResponse ResponseCallback,
|
|
) ([]byte, error) {
|
|
// Limit the number of concurrent API requests.
|
|
concurrencyLimitChOnce.Do(concurrencyLimitChInit)
|
|
t := timerpool.Get(*maxWaitTime)
|
|
select {
|
|
case concurrencyLimitCh <- struct{}{}:
|
|
timerpool.Put(t)
|
|
case <-t.C:
|
|
timerpool.Put(t)
|
|
return nil, fmt.Errorf("too many outstanding requests to %q; try increasing -promscrape.discovery.concurrentWaitTime=%s or -promscrape.discovery.concurrency=%d",
|
|
c.apiServer, *maxWaitTime, *maxConcurrency)
|
|
case <-ctx.Done():
|
|
timerpool.Put(t)
|
|
return nil, ctx.Err()
|
|
}
|
|
data, err := c.getAPIResponseWithParamsAndClientCtx(ctx, client, path, modifyRequest, inspectResponse)
|
|
<-concurrencyLimitCh
|
|
return data, err
|
|
}
|
|
|
|
// GetBlockingAPIResponse returns response for given absolute path with blocking client and optional callback for api response,
|
|
func (c *Client) GetBlockingAPIResponse(path string, inspectResponse ResponseCallback) ([]byte, error) {
|
|
return c.getAPIResponseWithParamsAndClientCtx(c.clientCtx, c.blockingClient, path, nil, inspectResponse)
|
|
}
|
|
|
|
// GetBlockingAPIResponseCtx returns response for given absolute path with blocking client and optional callback for api response,
|
|
func (c *Client) GetBlockingAPIResponseCtx(ctx context.Context, path string, inspectResponse ResponseCallback) ([]byte, error) {
|
|
return c.getAPIResponseWithParamsAndClientCtx(ctx, c.blockingClient, path, nil, inspectResponse)
|
|
}
|
|
|
|
// getAPIResponseWithParamsAndClient returns response for the given absolute path with optional callback for request and for response.
|
|
func (c *Client) getAPIResponseWithParamsAndClientCtx(ctx context.Context, client *HTTPClient, path string, modifyRequest RequestCallback, inspectResponse ResponseCallback) ([]byte, error) {
|
|
requestURL := c.apiServer + path
|
|
u, err := url.Parse(requestURL)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse %q: %w", requestURL, err)
|
|
}
|
|
|
|
deadline := time.Now().Add(client.ReadTimeout)
|
|
ctx, cancel := context.WithDeadline(ctx, deadline)
|
|
defer cancel()
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot create request for %q: %w", requestURL, err)
|
|
}
|
|
|
|
if err := c.setHTTPHeaders(req); err != nil {
|
|
return nil, fmt.Errorf("cannot set request headers for %q: %w", requestURL, err)
|
|
}
|
|
if err := c.setHTTPProxyHeaders(req); err != nil {
|
|
return nil, fmt.Errorf("cannot set request proxy headers for %q: %w", requestURL, err)
|
|
}
|
|
if modifyRequest != nil {
|
|
modifyRequest(req)
|
|
}
|
|
|
|
resp, err := doRequestWithPossibleRetry(client, req)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot fetch %q: %w", requestURL, err)
|
|
}
|
|
data, err := io.ReadAll(resp.Body)
|
|
_ = resp.Body.Close()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read response from %q: %w", requestURL, err)
|
|
}
|
|
|
|
if inspectResponse != nil {
|
|
inspectResponse(resp)
|
|
}
|
|
statusCode := resp.StatusCode
|
|
if statusCode != http.StatusOK {
|
|
return nil, fmt.Errorf("unexpected status code returned from %q: %d; expecting %d; response body: %q",
|
|
requestURL, statusCode, http.StatusOK, data)
|
|
}
|
|
return data, nil
|
|
}
|
|
|
|
// APIServer returns the API server address
|
|
func (c *Client) APIServer() string {
|
|
return c.apiServer
|
|
}
|
|
|
|
// Stop cancels all in-flight requests
|
|
func (c *Client) Stop() {
|
|
c.clientCancel()
|
|
c.client.stop()
|
|
c.blockingClient.stop()
|
|
}
|
|
|
|
func doRequestWithPossibleRetry(hc *HTTPClient, req *http.Request) (*http.Response, error) {
|
|
discoveryRequests.Inc()
|
|
|
|
var (
|
|
reqErr error
|
|
resp *http.Response
|
|
)
|
|
// Return true if the request execution is completed and retry is not required
|
|
attempt := func() bool {
|
|
resp, reqErr = hc.client.Do(req)
|
|
if reqErr == nil {
|
|
statusCode := resp.StatusCode
|
|
if statusCode != http.StatusTooManyRequests {
|
|
return true
|
|
}
|
|
} else if !errors.Is(reqErr, net.ErrClosed) && !strings.Contains(reqErr.Error(), "broken pipe") {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
if attempt() {
|
|
return resp, reqErr
|
|
}
|
|
|
|
// The first attempt was unsuccessful. Use exponential backoff for further attempts.
|
|
// Perform the second attempt immediately after the first attempt - this should help
|
|
// in cases when the remote side closes the keep-alive connection before the first attempt.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3293
|
|
sleepTime := time.Second
|
|
// It is expected that the deadline is already set to req.Context(), so the loop below
|
|
// should eventually finish if all the attempt() calls are unsuccessful.
|
|
ctx := req.Context()
|
|
for {
|
|
discoveryRetries.Inc()
|
|
if attempt() {
|
|
return resp, reqErr
|
|
}
|
|
sleepTime += sleepTime
|
|
if !SleepCtx(ctx, sleepTime) {
|
|
return resp, reqErr
|
|
}
|
|
}
|
|
}
|
|
|
|
var (
|
|
discoveryRequests = metrics.NewCounter(`vm_promscrape_discovery_requests_total`)
|
|
discoveryRetries = metrics.NewCounter(`vm_promscrape_discovery_retries_total`)
|
|
)
|
|
|
|
// SleepCtx sleeps for sleepDuration.
|
|
//
|
|
// It immediately returns false on ctx cancel or deadline, without waiting for sleepDuration.
|
|
func SleepCtx(ctx context.Context, sleepDuration time.Duration) bool {
|
|
t := timerpool.Get(sleepDuration)
|
|
defer timerpool.Put(t)
|
|
select {
|
|
case <-ctx.Done():
|
|
return false
|
|
case <-t.C:
|
|
return true
|
|
}
|
|
}
|