VictoriaMetrics/lib/promscrape/client.go
Aliaksandr Valialkin f03e81c693
lib/promauth: follow-up for e16d3f5639
- Make sure that invalid/missing TLS CA file or TLS client certificate files at vmagent startup
  don't prevent from processing the corresponding scrape targets after the file becomes correct,
  without the need to restart vmagent.
  Previously scrape targets with invalid TLS CA file or TLS client certificate files
  were permanently dropped after the first attempt to initialize them, and they didn't
  appear until the next vmagent reload or the next change in other places of the loaded scrape configs.

- Make sure that TLS CA is properly re-loaded from file after it changes without the need to restart vmagent.
  Previously the old TLS CA was used until vmagent restart.

- Properly handle errors during http request creation for the second attempt to send data to remote system
  at vmagent and vmalert. Previously failed request creation could result in nil pointer dereferencing,
  since the returned request is nil on error.

- Add more context to the logged error during AWS sigv4 request signing before sending the data to -remoteWrite.url at vmagent.
  Previously it could miss details on the source of the request.

- Do not create a new HTTP client per second when generating OAuth2 token needed to put in Authorization header
  of every http request issued by vmagent during service discovery or target scraping.
  Re-use the HTTP client instead until the corresponding scrape config changes.

- Cache error at lib/promauth.Config.GetAuthHeader() in the same way as the auth header is cached,
  e.g. the error is cached for a second now. This should reduce load on CPU and OAuth2 server
  when auth header cannot be obtained because of temporary error.

- Share tls.Config.GetClientCertificate function among multiple scrape targets with the same tls_config.
  Cache the loaded certificate and the error for one second. This should significantly reduce CPU load
  when scraping big number of targets with the same tls_config.

- Allow loading TLS certificates from HTTP and HTTPs urls by specifying these urls at `tls_config->cert_file` and `tls_config->key_file`.

- Improve test coverage at lib/promauth

- Skip unreachable or invalid files specified at `scrape_config_files` during vmagent startup, since these files may become valid later.
  Previously vmagent was exitting in this case.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4959
2023-10-26 09:55:47 +02:00

443 lines
17 KiB
Go

package promscrape
import (
"context"
"crypto/tls"
"flag"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
"github.com/VictoriaMetrics/fasthttp"
"github.com/VictoriaMetrics/metrics"
)
var (
maxScrapeSize = flagutil.NewBytes("promscrape.maxScrapeSize", 16*1024*1024, "The maximum size of scrape response in bytes to process from Prometheus targets. "+
"Bigger responses are rejected")
maxResponseHeadersSize = flagutil.NewBytes("promscrape.maxResponseHeadersSize", 4096, "The maximum size of http response headers from Prometheus scrape targets")
disableCompression = flag.Bool("promscrape.disableCompression", false, "Whether to disable sending 'Accept-Encoding: gzip' request headers to all the scrape targets. "+
"This may reduce CPU usage on scrape targets at the cost of higher network bandwidth utilization. "+
"It is possible to set 'disable_compression: true' individually per each 'scrape_config' section in '-promscrape.config' for fine-grained control")
disableKeepAlive = flag.Bool("promscrape.disableKeepAlive", false, "Whether to disable HTTP keep-alive connections when scraping all the targets. "+
"This may be useful when targets has no support for HTTP keep-alive connection. "+
"It is possible to set 'disable_keepalive: true' individually per each 'scrape_config' section in '-promscrape.config' for fine-grained control. "+
"Note that disabling HTTP keep-alive may increase load on both vmagent and scrape targets")
streamParse = flag.Bool("promscrape.streamParse", false, "Whether to enable stream parsing for metrics obtained from scrape targets. This may be useful "+
"for reducing memory usage when millions of metrics are exposed per each scrape target. "+
"It is possible to set 'stream_parse: true' individually per each 'scrape_config' section in '-promscrape.config' for fine-grained control")
)
type client struct {
// hc is the default client optimized for common case of scraping targets with moderate number of metrics.
hc *fasthttp.HostClient
// sc (aka `stream client`) is used instead of hc if ScrapeWork.StreamParse is set.
// It may be useful for scraping targets with millions of metrics per target.
sc *http.Client
ctx context.Context
scrapeURL string
scrapeTimeoutSecondsStr string
hostPort string
requestURI string
setHeaders func(req *http.Request) error
setProxyHeaders func(req *http.Request) error
setFasthttpHeaders func(req *fasthttp.Request) error
setFasthttpProxyHeaders func(req *fasthttp.Request) error
denyRedirects bool
disableCompression bool
disableKeepAlive bool
}
func addMissingPort(addr string, isTLS bool) string {
if strings.Contains(addr, ":") {
return addr
}
if isTLS {
return concatTwoStrings(addr, ":443")
}
return concatTwoStrings(addr, ":80")
}
func concatTwoStrings(x, y string) string {
bb := bbPool.Get()
b := bb.B[:0]
b = append(b, x...)
b = append(b, y...)
s := bytesutil.InternBytes(b)
bb.B = b
bbPool.Put(bb)
return s
}
const scrapeUserAgent = "vm_promscrape"
func newClient(ctx context.Context, sw *ScrapeWork) (*client, error) {
var u fasthttp.URI
u.Update(sw.ScrapeURL)
hostPort := string(u.Host())
dialAddr := hostPort
requestURI := string(u.RequestURI())
isTLS := string(u.Scheme()) == "https"
var tlsCfg *tls.Config
if isTLS {
var err error
tlsCfg, err = sw.AuthConfig.NewTLSConfig()
if err != nil {
return nil, fmt.Errorf("cannot initialize tls config: %w", err)
}
}
setProxyHeaders := func(req *http.Request) error { return nil }
setFasthttpProxyHeaders := func(req *fasthttp.Request) error { return nil }
proxyURL := sw.ProxyURL
if !isTLS && proxyURL.IsHTTPOrHTTPS() {
// Send full sw.ScrapeURL in requests to a proxy host for non-TLS scrape targets
// like net/http package from Go does.
// See https://en.wikipedia.org/wiki/Proxy_server#Web_proxy_servers
pu := proxyURL.GetURL()
dialAddr = pu.Host
requestURI = sw.ScrapeURL
isTLS = pu.Scheme == "https"
if isTLS {
var err error
tlsCfg, err = sw.ProxyAuthConfig.NewTLSConfig()
if err != nil {
return nil, fmt.Errorf("cannot initialize proxy tls config: %w", err)
}
}
proxyURLOrig := proxyURL
setProxyHeaders = func(req *http.Request) error {
return proxyURLOrig.SetHeaders(sw.ProxyAuthConfig, req)
}
setFasthttpProxyHeaders = func(req *fasthttp.Request) error {
return proxyURLOrig.SetFasthttpHeaders(sw.ProxyAuthConfig, req)
}
proxyURL = &proxy.URL{}
}
hostPort = addMissingPort(hostPort, isTLS)
dialAddr = addMissingPort(dialAddr, isTLS)
dialFunc, err := newStatDialFunc(proxyURL, sw.ProxyAuthConfig)
if err != nil {
return nil, fmt.Errorf("cannot create dial func: %w", err)
}
hc := &fasthttp.HostClient{
Addr: dialAddr,
// Name used in User-Agent request header
Name: scrapeUserAgent,
Dial: dialFunc,
IsTLS: isTLS,
TLSConfig: tlsCfg,
MaxIdleConnDuration: 2 * sw.ScrapeInterval,
ReadTimeout: sw.ScrapeTimeout,
WriteTimeout: 10 * time.Second,
MaxResponseBodySize: maxScrapeSize.IntN(),
MaxIdempotentRequestAttempts: 1,
ReadBufferSize: maxResponseHeadersSize.IntN(),
}
var sc *http.Client
var proxyURLFunc func(*http.Request) (*url.URL, error)
if pu := sw.ProxyURL.GetURL(); pu != nil {
proxyURLFunc = http.ProxyURL(pu)
}
sc = &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
Proxy: proxyURLFunc,
TLSHandshakeTimeout: 10 * time.Second,
IdleConnTimeout: 2 * sw.ScrapeInterval,
DisableCompression: *disableCompression || sw.DisableCompression,
DisableKeepAlives: *disableKeepAlive || sw.DisableKeepAlive,
DialContext: statStdDial,
MaxIdleConnsPerHost: 100,
MaxResponseHeaderBytes: int64(maxResponseHeadersSize.N),
},
Timeout: sw.ScrapeTimeout,
}
if sw.DenyRedirects {
sc.CheckRedirect = func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
}
}
c := &client{
hc: hc,
ctx: ctx,
sc: sc,
scrapeURL: sw.ScrapeURL,
scrapeTimeoutSecondsStr: fmt.Sprintf("%.3f", sw.ScrapeTimeout.Seconds()),
hostPort: hostPort,
requestURI: requestURI,
setHeaders: func(req *http.Request) error {
return sw.AuthConfig.SetHeaders(req, true)
},
setProxyHeaders: setProxyHeaders,
setFasthttpHeaders: func(req *fasthttp.Request) error {
return sw.AuthConfig.SetFasthttpHeaders(req, true)
},
setFasthttpProxyHeaders: setFasthttpProxyHeaders,
denyRedirects: sw.DenyRedirects,
disableCompression: sw.DisableCompression,
disableKeepAlive: sw.DisableKeepAlive,
}
return c, nil
}
func (c *client) GetStreamReader() (*streamReader, error) {
deadline := time.Now().Add(c.sc.Timeout)
ctx, cancel := context.WithDeadline(c.ctx, deadline)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.scrapeURL, nil)
if err != nil {
cancel()
return nil, fmt.Errorf("cannot create request for %q: %w", c.scrapeURL, err)
}
// The following `Accept` header has been copied from Prometheus sources.
// See https://github.com/prometheus/prometheus/blob/f9d21f10ecd2a343a381044f131ea4e46381ce09/scrape/scrape.go#L532 .
// This is needed as a workaround for scraping stupid Java-based servers such as Spring Boot.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/608 for details.
// Do not bloat the `Accept` header with OpenMetrics shit, since it looks like dead standard now.
req.Header.Set("Accept", "text/plain;version=0.0.4;q=1,*/*;q=0.1")
// Set X-Prometheus-Scrape-Timeout-Seconds like Prometheus does, since it is used by some exporters such as PushProx.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179#issuecomment-813117162
req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", c.scrapeTimeoutSecondsStr)
req.Header.Set("User-Agent", scrapeUserAgent)
if err := c.setHeaders(req); err != nil {
cancel()
return nil, fmt.Errorf("failed to set request headers for %q: %w", c.scrapeURL, err)
}
if err := c.setProxyHeaders(req); err != nil {
cancel()
return nil, fmt.Errorf("failed to set proxy request headers for %q: %w", c.scrapeURL, err)
}
scrapeRequests.Inc()
resp, err := c.sc.Do(req)
if err != nil {
cancel()
return nil, fmt.Errorf("cannot scrape %q: %w", c.scrapeURL, err)
}
if resp.StatusCode != http.StatusOK {
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_scrapes_total{status_code="%d"}`, resp.StatusCode)).Inc()
respBody, _ := io.ReadAll(resp.Body)
_ = resp.Body.Close()
cancel()
return nil, fmt.Errorf("unexpected status code returned when scraping %q: %d; expecting %d; response body: %q",
c.scrapeURL, resp.StatusCode, http.StatusOK, respBody)
}
scrapesOK.Inc()
sr := &streamReader{
r: resp.Body,
cancel: cancel,
scrapeURL: c.scrapeURL,
maxBodySize: int64(c.hc.MaxResponseBodySize),
}
return sr, nil
}
// checks fasthttp status code for redirect as standard http/client does.
func isStatusRedirect(statusCode int) bool {
switch statusCode {
case 301, 302, 303, 307, 308:
return true
}
return false
}
func (c *client) ReadData(dst []byte) ([]byte, error) {
deadline := time.Now().Add(c.hc.ReadTimeout)
req := fasthttp.AcquireRequest()
req.SetRequestURI(c.requestURI)
req.Header.SetHost(c.hostPort)
// The following `Accept` header has been copied from Prometheus sources.
// See https://github.com/prometheus/prometheus/blob/f9d21f10ecd2a343a381044f131ea4e46381ce09/scrape/scrape.go#L532 .
// This is needed as a workaround for scraping stupid Java-based servers such as Spring Boot.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/608 for details.
// Do not bloat the `Accept` header with OpenMetrics shit, since it looks like dead standard now.
req.Header.Set("Accept", "text/plain;version=0.0.4;q=1,*/*;q=0.1")
// Set X-Prometheus-Scrape-Timeout-Seconds like Prometheus does, since it is used by some exporters such as PushProx.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179#issuecomment-813117162
req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", c.scrapeTimeoutSecondsStr)
if err := c.setFasthttpHeaders(req); err != nil {
return nil, fmt.Errorf("failed to set request headers for %q: %w", c.scrapeURL, err)
}
if err := c.setFasthttpProxyHeaders(req); err != nil {
return nil, fmt.Errorf("failed to set proxy request headers for %q: %w", c.scrapeURL, err)
}
if !*disableCompression && !c.disableCompression {
req.Header.Set("Accept-Encoding", "gzip")
}
if *disableKeepAlive || c.disableKeepAlive {
req.SetConnectionClose()
}
resp := fasthttp.AcquireResponse()
swapResponseBodies := len(dst) == 0
if swapResponseBodies {
// An optimization: write response directly to dst.
// This should reduce memory usage when scraping big targets.
dst = resp.SwapBody(dst)
}
ctx, cancel := context.WithDeadline(c.ctx, deadline)
defer cancel()
err := doRequestWithPossibleRetry(ctx, c.hc, req, resp)
statusCode := resp.StatusCode()
redirectsCount := 0
for err == nil && isStatusRedirect(statusCode) {
if redirectsCount > 5 {
err = fmt.Errorf("too many redirects")
break
}
if c.denyRedirects {
err = fmt.Errorf("cannot follow redirects if `follow_redirects: false` is set")
break
}
// It is expected that the redirect is made on the same host.
// Otherwise it won't work.
location := resp.Header.Peek("Location")
if len(location) == 0 {
err = fmt.Errorf("missing Location header")
break
}
req.URI().UpdateBytes(location)
err = doRequestWithPossibleRetry(ctx, c.hc, req, resp)
statusCode = resp.StatusCode()
redirectsCount++
}
if swapResponseBodies {
dst = resp.SwapBody(dst)
}
fasthttp.ReleaseRequest(req)
if err != nil {
fasthttp.ReleaseResponse(resp)
if err == fasthttp.ErrTimeout {
scrapesTimedout.Inc()
return dst, fmt.Errorf("error when scraping %q with timeout %s: %w", c.scrapeURL, c.hc.ReadTimeout, err)
}
if err == fasthttp.ErrBodyTooLarge {
maxScrapeSizeExceeded.Inc()
return dst, fmt.Errorf("the response from %q exceeds -promscrape.maxScrapeSize=%d; "+
"either reduce the response size for the target or increase -promscrape.maxScrapeSize", c.scrapeURL, maxScrapeSize.N)
}
return dst, fmt.Errorf("error when scraping %q: %w", c.scrapeURL, err)
}
if ce := resp.Header.Peek("Content-Encoding"); string(ce) == "gzip" {
var err error
if swapResponseBodies {
zb := gunzipBufPool.Get()
zb.B, err = fasthttp.AppendGunzipBytes(zb.B[:0], dst)
dst = append(dst[:0], zb.B...)
gunzipBufPool.Put(zb)
} else {
dst, err = fasthttp.AppendGunzipBytes(dst, resp.Body())
}
if err != nil {
fasthttp.ReleaseResponse(resp)
scrapesGunzipFailed.Inc()
return dst, fmt.Errorf("cannot ungzip response from %q: %w", c.scrapeURL, err)
}
scrapesGunzipped.Inc()
} else if !swapResponseBodies {
dst = append(dst, resp.Body()...)
}
fasthttp.ReleaseResponse(resp)
if len(dst) > c.hc.MaxResponseBodySize {
maxScrapeSizeExceeded.Inc()
return dst, fmt.Errorf("the response from %q exceeds -promscrape.maxScrapeSize=%d (the actual response size is %d bytes); "+
"either reduce the response size for the target or increase -promscrape.maxScrapeSize", c.scrapeURL, maxScrapeSize.N, len(dst))
}
if statusCode != fasthttp.StatusOK {
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_scrapes_total{status_code="%d"}`, statusCode)).Inc()
return dst, fmt.Errorf("unexpected status code returned when scraping %q: %d; expecting %d; response body: %q",
c.scrapeURL, statusCode, fasthttp.StatusOK, dst)
}
scrapesOK.Inc()
return dst, nil
}
var gunzipBufPool bytesutil.ByteBufferPool
var (
maxScrapeSizeExceeded = metrics.NewCounter(`vm_promscrape_max_scrape_size_exceeded_errors_total`)
scrapesTimedout = metrics.NewCounter(`vm_promscrape_scrapes_timed_out_total`)
scrapesOK = metrics.NewCounter(`vm_promscrape_scrapes_total{status_code="200"}`)
scrapesGunzipped = metrics.NewCounter(`vm_promscrape_scrapes_gunziped_total`)
scrapesGunzipFailed = metrics.NewCounter(`vm_promscrape_scrapes_gunzip_failed_total`)
scrapeRequests = metrics.NewCounter(`vm_promscrape_scrape_requests_total`)
scrapeRetries = metrics.NewCounter(`vm_promscrape_scrape_retries_total`)
)
func doRequestWithPossibleRetry(ctx context.Context, hc *fasthttp.HostClient, req *fasthttp.Request, resp *fasthttp.Response) error {
scrapeRequests.Inc()
var reqErr error
// Return true if the request execution is completed and retry is not required
attempt := func() bool {
// Use DoCtx instead of Do in order to support context cancellation
reqErr = hc.DoCtx(ctx, req, resp)
if reqErr == nil {
statusCode := resp.StatusCode()
if statusCode != fasthttp.StatusTooManyRequests {
return true
}
} else if reqErr != fasthttp.ErrConnectionClosed && !strings.Contains(reqErr.Error(), "broken pipe") {
return true
}
return false
}
if attempt() {
return reqErr
}
// The first attempt was unsuccessful. Use exponential backoff for further attempts.
// Perform the second attempt immediately after the first attempt - this should help
// in cases when the remote side closes the keep-alive connection before the first attempt.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3293
sleepTime := time.Second
// It is expected that the deadline is already set to ctx, so the loop below
// should eventually finish if all the attempt() calls are unsuccessful.
for {
scrapeRetries.Inc()
if attempt() {
return reqErr
}
sleepTime += sleepTime
if !discoveryutils.SleepCtx(ctx, sleepTime) {
return reqErr
}
}
}
type streamReader struct {
r io.ReadCloser
cancel context.CancelFunc
bytesRead int64
scrapeURL string
maxBodySize int64
}
func (sr *streamReader) Read(p []byte) (int, error) {
n, err := sr.r.Read(p)
sr.bytesRead += int64(n)
if err == nil && sr.bytesRead > sr.maxBodySize {
maxScrapeSizeExceeded.Inc()
err = fmt.Errorf("the response from %q exceeds -promscrape.maxScrapeSize=%d; "+
"either reduce the response size for the target or increase -promscrape.maxScrapeSize", sr.scrapeURL, sr.maxBodySize)
}
return n, err
}
func (sr *streamReader) MustClose() {
sr.cancel()
if err := sr.r.Close(); err != nil {
logger.Errorf("cannot close reader: %s", err)
}
}