VictoriaMetrics/lib/promscrape/client.go
Nikolay 08cbbf8134
lib/promscrape: fixes proxy autorization (#6783)
* Adds custom dial func for HTTP-Connect and socks5 proxy tunnels.
  Standard golang http.transport exposes GetProxyConnectHeader function,
  but it doesn't allow to use separate tls config for proxy.
  It also not possible to enforce HTTP-Connect with standard http lib.
* For http scrape targets, by default http.Transport.Proxy function must
  be used. Since it has special case with full uri forward.
* Adds proxy.URL json methods that allow to properly copy internal
fields, like User/Password.
It should fix bug with proxy_url. When credentials specified at URL was
ignored.
* Adds tests for scrape client proxy requests

related issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6771
2024-08-19 22:50:39 +02:00

184 lines
7.5 KiB
Go

package promscrape
import (
"context"
"flag"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
)
var (
maxResponseHeadersSize = flagutil.NewBytes("promscrape.maxResponseHeadersSize", 4096, "The maximum size of http response headers from Prometheus scrape targets")
disableCompression = flag.Bool("promscrape.disableCompression", false, "Whether to disable sending 'Accept-Encoding: gzip' request headers to all the scrape targets. "+
"This may reduce CPU usage on scrape targets at the cost of higher network bandwidth utilization. "+
"It is possible to set 'disable_compression: true' individually per each 'scrape_config' section in '-promscrape.config' for fine-grained control")
disableKeepAlive = flag.Bool("promscrape.disableKeepAlive", false, "Whether to disable HTTP keep-alive connections when scraping all the targets. "+
"This may be useful when targets has no support for HTTP keep-alive connection. "+
"It is possible to set 'disable_keepalive: true' individually per each 'scrape_config' section in '-promscrape.config' for fine-grained control. "+
"Note that disabling HTTP keep-alive may increase load on both vmagent and scrape targets")
streamParse = flag.Bool("promscrape.streamParse", false, "Whether to enable stream parsing for metrics obtained from scrape targets. This may be useful "+
"for reducing memory usage when millions of metrics are exposed per each scrape target. "+
"It is possible to set 'stream_parse: true' individually per each 'scrape_config' section in '-promscrape.config' for fine-grained control")
)
type client struct {
c *http.Client
ctx context.Context
scrapeURL string
scrapeTimeoutSecondsStr string
setHeaders func(req *http.Request) error
setProxyHeaders func(req *http.Request) error
maxScrapeSize int64
}
func newClient(ctx context.Context, sw *ScrapeWork) (*client, error) {
ac := sw.AuthConfig
setHeaders := func(req *http.Request) error {
return sw.AuthConfig.SetHeaders(req, true)
}
setProxyHeaders := func(_ *http.Request) error {
return nil
}
dialFunc := netutil.NewStatDialFunc("vm_promscrape")
proxyURL := sw.ProxyURL
var proxyURLFunc func(*http.Request) (*url.URL, error)
if proxyURL != nil {
// case for direct http proxy connection.
// must be used for http based scrape targets
// since standard golang http.transport has special case for it
if strings.HasPrefix(sw.ScrapeURL, "http://") {
if proxyURL.URL.Scheme == "https" {
ac = sw.ProxyAuthConfig
}
proxyURLFunc = http.ProxyURL(proxyURL.URL)
setProxyHeaders = func(req *http.Request) error {
return proxyURL.SetHeaders(sw.ProxyAuthConfig, req)
}
} else {
// HTTP-Connect or socks5 proxy tunnel
// it makes possible to use separate tls configurations
// for proxy and backend connections
proxyDial, err := proxyURL.NewDialFunc(sw.ProxyAuthConfig)
if err != nil {
return nil, fmt.Errorf("cannot create dialer for proxy_url=%q connection: %w", proxyURL, err)
}
dialFunc = netutil.NewStatDialFuncWithDial("vm_promscrape", proxyDial)
}
}
hc := &http.Client{
Transport: ac.NewRoundTripper(&http.Transport{
Proxy: proxyURLFunc,
TLSHandshakeTimeout: 10 * time.Second,
IdleConnTimeout: 2 * sw.ScrapeInterval,
DisableCompression: *disableCompression || sw.DisableCompression,
DisableKeepAlives: *disableKeepAlive || sw.DisableKeepAlive,
DialContext: dialFunc,
MaxIdleConnsPerHost: 100,
MaxResponseHeaderBytes: int64(maxResponseHeadersSize.N),
}),
Timeout: sw.ScrapeTimeout,
}
if sw.DenyRedirects {
hc.CheckRedirect = func(_ *http.Request, _ []*http.Request) error {
return http.ErrUseLastResponse
}
}
c := &client{
c: hc,
ctx: ctx,
scrapeURL: sw.ScrapeURL,
scrapeTimeoutSecondsStr: fmt.Sprintf("%.3f", sw.ScrapeTimeout.Seconds()),
setHeaders: setHeaders,
setProxyHeaders: setProxyHeaders,
maxScrapeSize: sw.MaxScrapeSize,
}
return c, nil
}
func (c *client) ReadData(dst *bytesutil.ByteBuffer) error {
deadline := time.Now().Add(c.c.Timeout)
ctx, cancel := context.WithDeadline(c.ctx, deadline)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.scrapeURL, nil)
if err != nil {
cancel()
return fmt.Errorf("cannot create request for %q: %w", c.scrapeURL, err)
}
// The following `Accept` header has been copied from Prometheus sources.
// See https://github.com/prometheus/prometheus/blob/f9d21f10ecd2a343a381044f131ea4e46381ce09/scrape/scrape.go#L532 .
// This is needed as a workaround for scraping stupid Java-based servers such as Spring Boot.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/608 for details.
// Do not bloat the `Accept` header with OpenMetrics shit, since it looks like dead standard now.
req.Header.Set("Accept", "text/plain;version=0.0.4;q=1,*/*;q=0.1")
// Set X-Prometheus-Scrape-Timeout-Seconds like Prometheus does, since it is used by some exporters such as PushProx.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179#issuecomment-813117162
req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", c.scrapeTimeoutSecondsStr)
req.Header.Set("User-Agent", "vm_promscrape")
if err := c.setHeaders(req); err != nil {
cancel()
return fmt.Errorf("failed to set request headers for %q: %w", c.scrapeURL, err)
}
if err := c.setProxyHeaders(req); err != nil {
cancel()
return fmt.Errorf("failed to set proxy request headers for %q: %w", c.scrapeURL, err)
}
scrapeRequests.Inc()
resp, err := c.c.Do(req)
if err != nil {
cancel()
if ue, ok := err.(*url.Error); ok && ue.Timeout() {
scrapesTimedout.Inc()
}
return fmt.Errorf("cannot perform request to %q: %w", c.scrapeURL, err)
}
if resp.StatusCode != http.StatusOK {
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_scrapes_total{status_code="%d"}`, resp.StatusCode)).Inc()
respBody, _ := io.ReadAll(resp.Body)
_ = resp.Body.Close()
cancel()
return fmt.Errorf("unexpected status code returned when scraping %q: %d; expecting %d; response body: %q",
c.scrapeURL, resp.StatusCode, http.StatusOK, respBody)
}
scrapesOK.Inc()
// Read the data from resp.Body
r := &io.LimitedReader{
R: resp.Body,
N: c.maxScrapeSize,
}
_, err = dst.ReadFrom(r)
_ = resp.Body.Close()
cancel()
if err != nil {
if ue, ok := err.(*url.Error); ok && ue.Timeout() {
scrapesTimedout.Inc()
}
return fmt.Errorf("cannot read data from %s: %w", c.scrapeURL, err)
}
if int64(len(dst.B)) >= c.maxScrapeSize {
maxScrapeSizeExceeded.Inc()
return fmt.Errorf("the response from %q exceeds -promscrape.maxScrapeSize or max_scrape_size in the scrape config (%d bytes). "+
"Possible solutions are: reduce the response size for the target, increase -promscrape.maxScrapeSize command-line flag, "+
"increase max_scrape_size value in scrape config for the given target", c.scrapeURL, maxScrapeSize.N)
}
return nil
}
var (
maxScrapeSizeExceeded = metrics.NewCounter(`vm_promscrape_max_scrape_size_exceeded_errors_total`)
scrapesTimedout = metrics.NewCounter(`vm_promscrape_scrapes_timed_out_total`)
scrapesOK = metrics.NewCounter(`vm_promscrape_scrapes_total{status_code="200"}`)
scrapeRequests = metrics.NewCounter(`vm_promscrape_scrape_requests_total`)
)