2020-02-23 11:35:47 +00:00
package promscrape
import (
2020-11-01 21:12:13 +00:00
"context"
2020-02-23 11:35:47 +00:00
"crypto/tls"
"flag"
"fmt"
2020-11-01 21:12:13 +00:00
"io"
"io/ioutil"
"net/http"
2020-12-24 08:56:10 +00:00
"net/url"
2020-02-23 11:35:47 +00:00
"strings"
"time"
2020-11-26 16:08:39 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2020-08-16 14:05:52 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
2020-11-01 21:12:13 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2021-04-03 21:40:08 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
2020-04-29 13:20:23 +00:00
"github.com/VictoriaMetrics/fasthttp"
2020-02-23 11:35:47 +00:00
"github.com/VictoriaMetrics/metrics"
)
var (
2020-08-16 14:05:52 +00:00
maxScrapeSize = flagutil . NewBytes ( "promscrape.maxScrapeSize" , 16 * 1024 * 1024 , "The maximum size of scrape response in bytes to process from Prometheus targets. " +
2020-02-23 11:35:47 +00:00
"Bigger responses are rejected" )
2020-07-02 11:19:11 +00:00
disableCompression = flag . Bool ( "promscrape.disableCompression" , false , "Whether to disable sending 'Accept-Encoding: gzip' request headers to all the scrape targets. " +
"This may reduce CPU usage on scrape targets at the cost of higher network bandwidth utilization. " +
"It is possible to set 'disable_compression: true' individually per each 'scrape_config' section in '-promscrape.config' for fine grained control" )
disableKeepAlive = flag . Bool ( "promscrape.disableKeepAlive" , false , "Whether to disable HTTP keep-alive connections when scraping all the targets. " +
"This may be useful when targets has no support for HTTP keep-alive connection. " +
2021-03-15 19:59:25 +00:00
"It is possible to set 'disable_keepalive: true' individually per each 'scrape_config' section in '-promscrape.config' for fine grained control. " +
2020-07-02 11:19:11 +00:00
"Note that disabling HTTP keep-alive may increase load on both vmagent and scrape targets" )
2020-11-01 21:12:13 +00:00
streamParse = flag . Bool ( "promscrape.streamParse" , false , "Whether to enable stream parsing for metrics obtained from scrape targets. This may be useful " +
"for reducing memory usage when millions of metrics are exposed per each scrape target. " +
2021-03-15 19:59:25 +00:00
"It is posible to set 'stream_parse: true' individually per each 'scrape_config' section in '-promscrape.config' for fine grained control" )
2020-02-23 11:35:47 +00:00
)
type client struct {
2020-11-01 21:12:13 +00:00
// hc is the default client optimized for common case of scraping targets with moderate number of metrics.
2020-02-23 11:35:47 +00:00
hc * fasthttp . HostClient
2020-11-01 21:12:13 +00:00
// sc (aka `stream client`) is used instead of hc if ScrapeWork.ParseStream is set.
// It may be useful for scraping targets with millions of metrics per target.
sc * http . Client
2021-04-05 09:15:07 +00:00
scrapeURL string
scrapeTimeoutSecondsStr string
host string
requestURI string
2021-05-14 17:00:05 +00:00
getAuthHeader func ( ) string
getProxyAuthHeader func ( ) string
2021-04-05 09:15:07 +00:00
denyRedirects bool
disableCompression bool
disableKeepAlive bool
2020-02-23 11:35:47 +00:00
}
func newClient ( sw * ScrapeWork ) * client {
var u fasthttp . URI
u . Update ( sw . ScrapeURL )
host := string ( u . Host ( ) )
requestURI := string ( u . RequestURI ( ) )
isTLS := string ( u . Scheme ( ) ) == "https"
var tlsCfg * tls . Config
2021-03-09 16:54:09 +00:00
if isTLS {
2020-04-13 09:59:05 +00:00
tlsCfg = sw . AuthConfig . NewTLSConfig ( )
2020-02-23 11:35:47 +00:00
}
2021-05-14 17:00:05 +00:00
getProxyAuthHeader := func ( ) string { return "" }
2021-04-03 21:40:08 +00:00
proxyURL := sw . ProxyURL
if ! isTLS && proxyURL . IsHTTPOrHTTPS ( ) {
// Send full sw.ScrapeURL in requests to a proxy host for non-TLS scrape targets
// like net/http package from Go does.
// See https://en.wikipedia.org/wiki/Proxy_server#Web_proxy_servers
pu := proxyURL . URL ( )
host = pu . Host
requestURI = sw . ScrapeURL
isTLS = pu . Scheme == "https"
if isTLS {
tlsCfg = sw . ProxyAuthConfig . NewTLSConfig ( )
}
2021-05-14 17:00:05 +00:00
getProxyAuthHeader = func ( ) string {
return proxyURL . GetAuthHeader ( sw . ProxyAuthConfig )
}
2021-04-03 21:40:08 +00:00
proxyURL = proxy . URL { }
}
2020-02-23 11:35:47 +00:00
if ! strings . Contains ( host , ":" ) {
if ! isTLS {
host += ":80"
} else {
host += ":443"
}
}
2021-04-03 21:40:08 +00:00
dialFunc , err := newStatDialFunc ( proxyURL , sw . ProxyAuthConfig )
2020-12-24 08:56:10 +00:00
if err != nil {
logger . Fatalf ( "cannot create dial func: %s" , err )
}
2020-02-23 11:35:47 +00:00
hc := & fasthttp . HostClient {
2020-04-29 13:20:23 +00:00
Addr : host ,
Name : "vm_promscrape" ,
2020-12-24 08:56:10 +00:00
Dial : dialFunc ,
2020-04-29 13:20:23 +00:00
IsTLS : isTLS ,
TLSConfig : tlsCfg ,
MaxIdleConnDuration : 2 * sw . ScrapeInterval ,
ReadTimeout : sw . ScrapeTimeout ,
WriteTimeout : 10 * time . Second ,
2020-08-16 14:05:52 +00:00
MaxResponseBodySize : maxScrapeSize . N ,
2020-04-29 13:20:23 +00:00
MaxIdempotentRequestAttempts : 1 ,
2020-02-23 11:35:47 +00:00
}
2020-11-01 21:12:13 +00:00
var sc * http . Client
if * streamParse || sw . StreamParse {
2021-04-03 21:40:08 +00:00
var proxyURLFunc func ( * http . Request ) ( * url . URL , error )
2020-12-24 08:56:10 +00:00
if proxyURL := sw . ProxyURL . URL ( ) ; proxyURL != nil {
2021-04-03 21:40:08 +00:00
proxyURLFunc = http . ProxyURL ( proxyURL )
2020-12-24 08:56:10 +00:00
}
2020-11-01 21:12:13 +00:00
sc = & http . Client {
Transport : & http . Transport {
TLSClientConfig : tlsCfg ,
2021-04-03 21:40:08 +00:00
Proxy : proxyURLFunc ,
2020-11-01 21:12:13 +00:00
TLSHandshakeTimeout : 10 * time . Second ,
IdleConnTimeout : 2 * sw . ScrapeInterval ,
DisableCompression : * disableCompression || sw . DisableCompression ,
DisableKeepAlives : * disableKeepAlive || sw . DisableKeepAlive ,
DialContext : statStdDial ,
2021-05-14 15:10:19 +00:00
MaxIdleConnsPerHost : 100 ,
2021-04-23 18:53:32 +00:00
// Set timeout for receiving the first response byte,
// since the duration for reading the full response can be much bigger because of stream parsing.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1017#issuecomment-767235047
ResponseHeaderTimeout : sw . ScrapeTimeout ,
2020-11-01 21:12:13 +00:00
} ,
2021-04-23 18:53:32 +00:00
// Set 10x bigger timeout than the sw.ScrapeTimeout, since the duration for reading the full response
// can be much bigger because of stream parsing.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1017#issuecomment-767235047
Timeout : 10 * sw . ScrapeTimeout ,
2020-11-01 21:12:13 +00:00
}
2021-04-02 16:56:38 +00:00
if sw . DenyRedirects {
sc . CheckRedirect = func ( req * http . Request , via [ ] * http . Request ) error {
return http . ErrUseLastResponse
}
}
2020-11-01 21:12:13 +00:00
}
2020-02-23 11:35:47 +00:00
return & client {
2021-04-05 09:15:07 +00:00
hc : hc ,
sc : sc ,
scrapeURL : sw . ScrapeURL ,
scrapeTimeoutSecondsStr : fmt . Sprintf ( "%.3f" , sw . ScrapeTimeout . Seconds ( ) ) ,
host : host ,
requestURI : requestURI ,
2021-05-14 17:00:05 +00:00
getAuthHeader : sw . AuthConfig . GetAuthHeader ,
getProxyAuthHeader : getProxyAuthHeader ,
2021-04-05 09:15:07 +00:00
denyRedirects : sw . DenyRedirects ,
disableCompression : sw . DisableCompression ,
disableKeepAlive : sw . DisableKeepAlive ,
2020-02-23 11:35:47 +00:00
}
}
2020-11-01 21:12:13 +00:00
func ( c * client ) GetStreamReader ( ) ( * streamReader , error ) {
deadline := time . Now ( ) . Add ( c . hc . ReadTimeout )
ctx , cancel := context . WithDeadline ( context . Background ( ) , deadline )
req , err := http . NewRequestWithContext ( ctx , "GET" , c . scrapeURL , nil )
if err != nil {
cancel ( )
return nil , fmt . Errorf ( "cannot create request for %q: %w" , c . scrapeURL , err )
}
// The following `Accept` header has been copied from Prometheus sources.
// See https://github.com/prometheus/prometheus/blob/f9d21f10ecd2a343a381044f131ea4e46381ce09/scrape/scrape.go#L532 .
// This is needed as a workaround for scraping stupid Java-based servers such as Spring Boot.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/608 for details.
// Do not bloat the `Accept` header with OpenMetrics shit, since it looks like dead standard now.
req . Header . Set ( "Accept" , "text/plain;version=0.0.4;q=1,*/*;q=0.1" )
2021-04-05 09:15:07 +00:00
// Set X-Prometheus-Scrape-Timeout-Seconds like Prometheus does, since it is used by some exporters such as PushProx.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179#issuecomment-813117162
req . Header . Set ( "X-Prometheus-Scrape-Timeout-Seconds" , c . scrapeTimeoutSecondsStr )
2021-05-14 17:00:05 +00:00
if ah := c . getAuthHeader ( ) ; ah != "" {
req . Header . Set ( "Authorization" , ah )
2020-11-01 21:12:13 +00:00
}
2021-05-14 17:00:05 +00:00
if ah := c . getProxyAuthHeader ( ) ; ah != "" {
req . Header . Set ( "Proxy-Authorization" , ah )
2021-04-03 21:40:08 +00:00
}
2020-11-01 21:12:13 +00:00
resp , err := c . sc . Do ( req )
if err != nil {
cancel ( )
return nil , fmt . Errorf ( "cannot scrape %q: %w" , c . scrapeURL , err )
}
if resp . StatusCode != http . StatusOK {
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_promscrape_scrapes_total { status_code="%d"} ` , resp . StatusCode ) ) . Inc ( )
respBody , _ := ioutil . ReadAll ( resp . Body )
_ = resp . Body . Close ( )
cancel ( )
return nil , fmt . Errorf ( "unexpected status code returned when scraping %q: %d; expecting %d; response body: %q" ,
c . scrapeURL , resp . StatusCode , http . StatusOK , respBody )
}
scrapesOK . Inc ( )
return & streamReader {
r : resp . Body ,
cancel : cancel ,
} , nil
}
2020-02-23 11:35:47 +00:00
func ( c * client ) ReadData ( dst [ ] byte ) ( [ ] byte , error ) {
2020-07-03 18:33:17 +00:00
deadline := time . Now ( ) . Add ( c . hc . ReadTimeout )
2020-02-23 11:35:47 +00:00
req := fasthttp . AcquireRequest ( )
req . SetRequestURI ( c . requestURI )
2021-04-03 22:18:24 +00:00
req . Header . SetHost ( c . host )
2020-07-08 16:47:08 +00:00
// The following `Accept` header has been copied from Prometheus sources.
// See https://github.com/prometheus/prometheus/blob/f9d21f10ecd2a343a381044f131ea4e46381ce09/scrape/scrape.go#L532 .
// This is needed as a workaround for scraping stupid Java-based servers such as Spring Boot.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/608 for details.
// Do not bloat the `Accept` header with OpenMetrics shit, since it looks like dead standard now.
req . Header . Set ( "Accept" , "text/plain;version=0.0.4;q=1,*/*;q=0.1" )
2021-04-05 09:15:07 +00:00
// Set X-Prometheus-Scrape-Timeout-Seconds like Prometheus does, since it is used by some exporters such as PushProx.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179#issuecomment-813117162
req . Header . Set ( "X-Prometheus-Scrape-Timeout-Seconds" , c . scrapeTimeoutSecondsStr )
2021-05-14 17:00:05 +00:00
if ah := c . getAuthHeader ( ) ; ah != "" {
req . Header . Set ( "Authorization" , ah )
2021-04-03 21:40:08 +00:00
}
2021-05-14 17:00:05 +00:00
if ah := c . getProxyAuthHeader ( ) ; ah != "" {
req . Header . Set ( "Proxy-Authorization" , ah )
2021-04-03 21:40:08 +00:00
}
2020-11-01 21:12:13 +00:00
if ! * disableCompression && ! c . disableCompression {
2020-02-23 11:35:47 +00:00
req . Header . Set ( "Accept-Encoding" , "gzip" )
}
2020-07-02 11:19:11 +00:00
if * disableKeepAlive || c . disableKeepAlive {
2020-06-30 23:19:58 +00:00
req . SetConnectionClose ( )
}
2020-02-23 11:35:47 +00:00
resp := fasthttp . AcquireResponse ( )
2020-09-18 09:31:16 +00:00
swapResponseBodies := len ( dst ) == 0
if swapResponseBodies {
// An optimization: write response directly to dst.
// This should reduce memory uage when scraping big targets.
dst = resp . SwapBody ( dst )
}
2020-07-03 18:33:17 +00:00
err := doRequestWithPossibleRetry ( c . hc , req , resp , deadline )
2020-04-27 23:13:02 +00:00
statusCode := resp . StatusCode ( )
2020-07-03 15:36:21 +00:00
if err == nil && ( statusCode == fasthttp . StatusMovedPermanently || statusCode == fasthttp . StatusFound ) {
2021-04-02 16:56:38 +00:00
if c . denyRedirects {
err = fmt . Errorf ( "cannot follow redirects if `follow_redirects: false` is set" )
} else {
// Allow a single redirect.
// It is expected that the redirect is made on the same host.
// Otherwise it won't work.
if location := resp . Header . Peek ( "Location" ) ; len ( location ) > 0 {
req . URI ( ) . UpdateBytes ( location )
err = c . hc . DoDeadline ( req , resp , deadline )
statusCode = resp . StatusCode ( )
}
2020-04-27 23:13:02 +00:00
}
}
2020-11-01 23:09:37 +00:00
if swapResponseBodies {
dst = resp . SwapBody ( dst )
}
2020-02-23 11:35:47 +00:00
fasthttp . ReleaseRequest ( req )
if err != nil {
fasthttp . ReleaseResponse ( resp )
if err == fasthttp . ErrTimeout {
scrapesTimedout . Inc ( )
2020-06-30 19:58:18 +00:00
return dst , fmt . Errorf ( "error when scraping %q with timeout %s: %w" , c . scrapeURL , c . hc . ReadTimeout , err )
2020-02-23 11:35:47 +00:00
}
2020-05-24 11:41:08 +00:00
if err == fasthttp . ErrBodyTooLarge {
return dst , fmt . Errorf ( "the response from %q exceeds -promscrape.maxScrapeSize=%d; " +
2020-08-16 14:05:52 +00:00
"either reduce the response size for the target or increase -promscrape.maxScrapeSize" , c . scrapeURL , maxScrapeSize . N )
2020-05-24 11:41:08 +00:00
}
2020-06-30 19:58:18 +00:00
return dst , fmt . Errorf ( "error when scraping %q: %w" , c . scrapeURL , err )
2020-02-23 11:35:47 +00:00
}
if ce := resp . Header . Peek ( "Content-Encoding" ) ; string ( ce ) == "gzip" {
var err error
2020-09-18 09:31:16 +00:00
if swapResponseBodies {
2020-11-26 16:08:39 +00:00
zb := gunzipBufPool . Get ( )
zb . B , err = fasthttp . AppendGunzipBytes ( zb . B [ : 0 ] , dst )
dst = append ( dst [ : 0 ] , zb . B ... )
gunzipBufPool . Put ( zb )
2020-09-18 09:31:16 +00:00
} else {
2020-11-26 16:08:39 +00:00
dst , err = fasthttp . AppendGunzipBytes ( dst , resp . Body ( ) )
2020-09-18 09:31:16 +00:00
}
2020-02-23 11:35:47 +00:00
if err != nil {
fasthttp . ReleaseResponse ( resp )
scrapesGunzipFailed . Inc ( )
2020-06-30 19:58:18 +00:00
return dst , fmt . Errorf ( "cannot ungzip response from %q: %w" , c . scrapeURL , err )
2020-02-23 11:35:47 +00:00
}
scrapesGunzipped . Inc ( )
2020-09-18 09:31:16 +00:00
} else if ! swapResponseBodies {
2020-02-23 11:35:47 +00:00
dst = append ( dst , resp . Body ( ) ... )
}
2020-11-26 11:25:00 +00:00
fasthttp . ReleaseResponse ( resp )
2020-02-23 11:35:47 +00:00
if statusCode != fasthttp . StatusOK {
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_promscrape_scrapes_total { status_code="%d"} ` , statusCode ) ) . Inc ( )
return dst , fmt . Errorf ( "unexpected status code returned when scraping %q: %d; expecting %d; response body: %q" ,
2020-10-29 14:17:52 +00:00
c . scrapeURL , statusCode , fasthttp . StatusOK , dst )
2020-02-23 11:35:47 +00:00
}
scrapesOK . Inc ( )
return dst , nil
}
2020-11-26 16:08:39 +00:00
var gunzipBufPool bytesutil . ByteBufferPool
2020-02-23 11:35:47 +00:00
var (
scrapesTimedout = metrics . NewCounter ( ` vm_promscrape_scrapes_timed_out_total ` )
scrapesOK = metrics . NewCounter ( ` vm_promscrape_scrapes_total { status_code="200"} ` )
scrapesGunzipped = metrics . NewCounter ( ` vm_promscrape_scrapes_gunziped_total ` )
scrapesGunzipFailed = metrics . NewCounter ( ` vm_promscrape_scrapes_gunzip_failed_total ` )
2021-02-01 18:02:51 +00:00
scrapeRetries = metrics . NewCounter ( ` vm_promscrape_scrape_retries_total ` )
2020-02-23 11:35:47 +00:00
)
2020-04-16 20:24:33 +00:00
2020-07-03 18:33:17 +00:00
func doRequestWithPossibleRetry ( hc * fasthttp . HostClient , req * fasthttp . Request , resp * fasthttp . Response , deadline time . Time ) error {
2021-05-13 07:38:43 +00:00
sleepTime := time . Second
2020-08-13 19:31:42 +00:00
for {
// Use DoDeadline instead of Do even if hc.ReadTimeout is already set in order to guarantee the given deadline
// across multiple retries.
err := hc . DoDeadline ( req , resp , deadline )
if err == nil {
return nil
}
2021-01-22 11:22:20 +00:00
if err != fasthttp . ErrConnectionClosed && ! strings . Contains ( err . Error ( ) , "broken pipe" ) {
2020-08-13 19:31:42 +00:00
return err
}
// Retry request if the server closes the keep-alive connection unless deadline exceeds.
2021-05-13 07:38:43 +00:00
maxSleepTime := time . Until ( deadline )
if sleepTime > maxSleepTime {
2020-08-13 19:31:42 +00:00
return fmt . Errorf ( "the server closes all the connection attempts: %w" , err )
}
2021-05-13 07:38:43 +00:00
sleepTime += sleepTime
if sleepTime > maxSleepTime {
2021-05-13 07:41:40 +00:00
sleepTime = maxSleepTime
2021-05-13 07:38:43 +00:00
}
time . Sleep ( sleepTime )
2021-02-01 18:02:51 +00:00
scrapeRetries . Inc ( )
2020-06-23 09:25:02 +00:00
}
2020-04-16 20:24:33 +00:00
}
2020-11-01 21:12:13 +00:00
type streamReader struct {
r io . ReadCloser
cancel context . CancelFunc
bytesRead int64
}
func ( sr * streamReader ) Read ( p [ ] byte ) ( int , error ) {
n , err := sr . r . Read ( p )
sr . bytesRead += int64 ( n )
return n , err
}
func ( sr * streamReader ) MustClose ( ) {
sr . cancel ( )
if err := sr . r . Close ( ) ; err != nil {
logger . Errorf ( "cannot close reader: %s" , err )
}
}