2020-02-23 11:35:47 +00:00
package promscrape
import (
2020-11-01 21:12:13 +00:00
"context"
2020-02-23 11:35:47 +00:00
"crypto/tls"
"flag"
"fmt"
2020-11-01 21:12:13 +00:00
"io"
"io/ioutil"
"net/http"
2020-12-24 08:56:10 +00:00
"net/url"
2020-02-23 11:35:47 +00:00
"strings"
"time"
2020-11-26 16:08:39 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2020-08-16 14:05:52 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
2020-11-01 21:12:13 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2020-04-29 13:20:23 +00:00
"github.com/VictoriaMetrics/fasthttp"
2020-02-23 11:35:47 +00:00
"github.com/VictoriaMetrics/metrics"
)
var (
2020-08-16 14:05:52 +00:00
maxScrapeSize = flagutil . NewBytes ( "promscrape.maxScrapeSize" , 16 * 1024 * 1024 , "The maximum size of scrape response in bytes to process from Prometheus targets. " +
2020-02-23 11:35:47 +00:00
"Bigger responses are rejected" )
2020-07-02 11:19:11 +00:00
disableCompression = flag . Bool ( "promscrape.disableCompression" , false , "Whether to disable sending 'Accept-Encoding: gzip' request headers to all the scrape targets. " +
"This may reduce CPU usage on scrape targets at the cost of higher network bandwidth utilization. " +
"It is possible to set 'disable_compression: true' individually per each 'scrape_config' section in '-promscrape.config' for fine grained control" )
disableKeepAlive = flag . Bool ( "promscrape.disableKeepAlive" , false , "Whether to disable HTTP keep-alive connections when scraping all the targets. " +
"This may be useful when targets has no support for HTTP keep-alive connection. " +
"It is possible to set `disable_keepalive: true` individually per each 'scrape_config` section in '-promscrape.config' for fine grained control. " +
"Note that disabling HTTP keep-alive may increase load on both vmagent and scrape targets" )
2020-11-01 21:12:13 +00:00
streamParse = flag . Bool ( "promscrape.streamParse" , false , "Whether to enable stream parsing for metrics obtained from scrape targets. This may be useful " +
"for reducing memory usage when millions of metrics are exposed per each scrape target. " +
"It is posible to set `stream_parse: true` individually per each `scrape_config` section in `-promscrape.config` for fine grained control" )
2020-02-23 11:35:47 +00:00
)
type client struct {
2020-11-01 21:12:13 +00:00
// hc is the default client optimized for common case of scraping targets with moderate number of metrics.
2020-02-23 11:35:47 +00:00
hc * fasthttp . HostClient
2020-11-01 21:12:13 +00:00
// sc (aka `stream client`) is used instead of hc if ScrapeWork.ParseStream is set.
// It may be useful for scraping targets with millions of metrics per target.
sc * http . Client
2020-07-02 11:19:11 +00:00
scrapeURL string
host string
requestURI string
authHeader string
disableCompression bool
disableKeepAlive bool
2020-02-23 11:35:47 +00:00
}
func newClient ( sw * ScrapeWork ) * client {
var u fasthttp . URI
u . Update ( sw . ScrapeURL )
host := string ( u . Host ( ) )
requestURI := string ( u . RequestURI ( ) )
isTLS := string ( u . Scheme ( ) ) == "https"
var tlsCfg * tls . Config
if isTLS {
2020-04-13 09:59:05 +00:00
tlsCfg = sw . AuthConfig . NewTLSConfig ( )
2020-02-23 11:35:47 +00:00
}
if ! strings . Contains ( host , ":" ) {
if ! isTLS {
host += ":80"
} else {
host += ":443"
}
}
2020-12-24 08:56:10 +00:00
dialFunc , err := newStatDialFunc ( sw . ProxyURL , tlsCfg )
if err != nil {
logger . Fatalf ( "cannot create dial func: %s" , err )
}
2020-02-23 11:35:47 +00:00
hc := & fasthttp . HostClient {
2020-04-29 13:20:23 +00:00
Addr : host ,
Name : "vm_promscrape" ,
2020-12-24 08:56:10 +00:00
Dial : dialFunc ,
2020-04-29 13:20:23 +00:00
IsTLS : isTLS ,
TLSConfig : tlsCfg ,
MaxIdleConnDuration : 2 * sw . ScrapeInterval ,
ReadTimeout : sw . ScrapeTimeout ,
WriteTimeout : 10 * time . Second ,
2020-08-16 14:05:52 +00:00
MaxResponseBodySize : maxScrapeSize . N ,
2020-04-29 13:20:23 +00:00
MaxIdempotentRequestAttempts : 1 ,
2020-02-23 11:35:47 +00:00
}
2020-11-01 21:12:13 +00:00
var sc * http . Client
if * streamParse || sw . StreamParse {
2020-12-24 08:56:10 +00:00
var proxy func ( * http . Request ) ( * url . URL , error )
if proxyURL := sw . ProxyURL . URL ( ) ; proxyURL != nil {
proxy = http . ProxyURL ( proxyURL )
}
2020-11-01 21:12:13 +00:00
sc = & http . Client {
Transport : & http . Transport {
TLSClientConfig : tlsCfg ,
2020-12-24 08:56:10 +00:00
Proxy : proxy ,
2020-11-01 21:12:13 +00:00
TLSHandshakeTimeout : 10 * time . Second ,
IdleConnTimeout : 2 * sw . ScrapeInterval ,
DisableCompression : * disableCompression || sw . DisableCompression ,
DisableKeepAlives : * disableKeepAlive || sw . DisableKeepAlive ,
DialContext : statStdDial ,
} ,
Timeout : sw . ScrapeTimeout ,
}
}
2020-02-23 11:35:47 +00:00
return & client {
2020-12-24 08:52:37 +00:00
hc : hc ,
sc : sc ,
2020-07-02 11:19:11 +00:00
scrapeURL : sw . ScrapeURL ,
host : host ,
requestURI : requestURI ,
authHeader : sw . AuthConfig . Authorization ,
disableCompression : sw . DisableCompression ,
disableKeepAlive : sw . DisableKeepAlive ,
2020-02-23 11:35:47 +00:00
}
}
2020-11-01 21:12:13 +00:00
// GetStreamReader sends a GET request to the scrape url via the stream client
// and returns a reader for the response body.
//
// The request is bound to a deadline of hc.ReadTimeout from now.
// On success the caller must call MustClose on the returned reader in order to release
// the response body and the request context.
//
// An error is returned if the request cannot be created or performed,
// or if the target responds with a status code other than 200.
func (c *client) GetStreamReader() (*streamReader, error) {
	deadline := time.Now().Add(c.hc.ReadTimeout)
	ctx, cancel := context.WithDeadline(context.Background(), deadline)
	req, err := http.NewRequestWithContext(ctx, "GET", c.scrapeURL, nil)
	if err != nil {
		cancel()
		return nil, fmt.Errorf("cannot create request for %q: %w", c.scrapeURL, err)
	}
	// The following `Accept` header has been copied from Prometheus sources.
	// See https://github.com/prometheus/prometheus/blob/f9d21f10ecd2a343a381044f131ea4e46381ce09/scrape/scrape.go#L532 .
	// It works around content negotiation issues on some servers such as Spring Boot.
	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/608 for details.
	// OpenMetrics content types are intentionally left out of the header.
	req.Header.Set("Accept", "text/plain;version=0.0.4;q=1,*/*;q=0.1")
	if c.authHeader != "" {
		req.Header.Set("Authorization", c.authHeader)
	}
	resp, err := c.sc.Do(req)
	if err != nil {
		cancel()
		return nil, fmt.Errorf("cannot scrape %q: %w", c.scrapeURL, err)
	}
	if resp.StatusCode != http.StatusOK {
		metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_scrapes_total{status_code="%d"}`, resp.StatusCode)).Inc()
		// Cap the error body at -promscrape.maxScrapeSize, like the fasthttp-based client does
		// for every response, so a misbehaving target cannot make us buffer an unbounded body.
		// The read error is ignored on purpose: the body is used only for the error message.
		respBody, _ := ioutil.ReadAll(io.LimitReader(resp.Body, int64(maxScrapeSize.N)))
		_ = resp.Body.Close()
		cancel()
		return nil, fmt.Errorf("unexpected status code returned when scraping %q: %d; expecting %d; response body: %q",
			c.scrapeURL, resp.StatusCode, http.StatusOK, respBody)
	}
	scrapesOK.Inc()
	// cancel is handed over to the reader; it is called from streamReader.MustClose.
	return &streamReader{
		r:      resp.Body,
		cancel: cancel,
	}, nil
}
2020-02-23 11:35:47 +00:00
func ( c * client ) ReadData ( dst [ ] byte ) ( [ ] byte , error ) {
2020-07-03 18:33:17 +00:00
deadline := time . Now ( ) . Add ( c . hc . ReadTimeout )
2020-02-23 11:35:47 +00:00
req := fasthttp . AcquireRequest ( )
req . SetRequestURI ( c . requestURI )
req . SetHost ( c . host )
2020-07-08 16:47:08 +00:00
// The following `Accept` header has been copied from Prometheus sources.
// See https://github.com/prometheus/prometheus/blob/f9d21f10ecd2a343a381044f131ea4e46381ce09/scrape/scrape.go#L532 .
// This is needed as a workaround for scraping stupid Java-based servers such as Spring Boot.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/608 for details.
// Do not bloat the `Accept` header with OpenMetrics shit, since it looks like dead standard now.
req . Header . Set ( "Accept" , "text/plain;version=0.0.4;q=1,*/*;q=0.1" )
2020-11-01 21:12:13 +00:00
if ! * disableCompression && ! c . disableCompression {
2020-02-23 11:35:47 +00:00
req . Header . Set ( "Accept-Encoding" , "gzip" )
}
2020-07-02 11:19:11 +00:00
if * disableKeepAlive || c . disableKeepAlive {
2020-06-30 23:19:58 +00:00
req . SetConnectionClose ( )
}
2020-02-23 11:35:47 +00:00
if c . authHeader != "" {
req . Header . Set ( "Authorization" , c . authHeader )
}
resp := fasthttp . AcquireResponse ( )
2020-09-18 09:31:16 +00:00
swapResponseBodies := len ( dst ) == 0
if swapResponseBodies {
// An optimization: write response directly to dst.
// This should reduce memory uage when scraping big targets.
dst = resp . SwapBody ( dst )
}
2020-07-03 18:33:17 +00:00
err := doRequestWithPossibleRetry ( c . hc , req , resp , deadline )
2020-04-27 23:13:02 +00:00
statusCode := resp . StatusCode ( )
2020-07-03 15:36:21 +00:00
if err == nil && ( statusCode == fasthttp . StatusMovedPermanently || statusCode == fasthttp . StatusFound ) {
2020-04-27 23:13:02 +00:00
// Allow a single redirect.
// It is expected that the redirect is made on the same host.
// Otherwise it won't work.
if location := resp . Header . Peek ( "Location" ) ; len ( location ) > 0 {
req . URI ( ) . UpdateBytes ( location )
2020-07-03 18:33:17 +00:00
err = c . hc . DoDeadline ( req , resp , deadline )
2020-04-27 23:13:02 +00:00
statusCode = resp . StatusCode ( )
}
}
2020-11-01 23:09:37 +00:00
if swapResponseBodies {
dst = resp . SwapBody ( dst )
}
2020-02-23 11:35:47 +00:00
fasthttp . ReleaseRequest ( req )
if err != nil {
fasthttp . ReleaseResponse ( resp )
if err == fasthttp . ErrTimeout {
scrapesTimedout . Inc ( )
2020-06-30 19:58:18 +00:00
return dst , fmt . Errorf ( "error when scraping %q with timeout %s: %w" , c . scrapeURL , c . hc . ReadTimeout , err )
2020-02-23 11:35:47 +00:00
}
2020-05-24 11:41:08 +00:00
if err == fasthttp . ErrBodyTooLarge {
return dst , fmt . Errorf ( "the response from %q exceeds -promscrape.maxScrapeSize=%d; " +
2020-08-16 14:05:52 +00:00
"either reduce the response size for the target or increase -promscrape.maxScrapeSize" , c . scrapeURL , maxScrapeSize . N )
2020-05-24 11:41:08 +00:00
}
2020-06-30 19:58:18 +00:00
return dst , fmt . Errorf ( "error when scraping %q: %w" , c . scrapeURL , err )
2020-02-23 11:35:47 +00:00
}
if ce := resp . Header . Peek ( "Content-Encoding" ) ; string ( ce ) == "gzip" {
var err error
2020-09-18 09:31:16 +00:00
if swapResponseBodies {
2020-11-26 16:08:39 +00:00
zb := gunzipBufPool . Get ( )
zb . B , err = fasthttp . AppendGunzipBytes ( zb . B [ : 0 ] , dst )
dst = append ( dst [ : 0 ] , zb . B ... )
gunzipBufPool . Put ( zb )
2020-09-18 09:31:16 +00:00
} else {
2020-11-26 16:08:39 +00:00
dst , err = fasthttp . AppendGunzipBytes ( dst , resp . Body ( ) )
2020-09-18 09:31:16 +00:00
}
2020-02-23 11:35:47 +00:00
if err != nil {
fasthttp . ReleaseResponse ( resp )
scrapesGunzipFailed . Inc ( )
2020-06-30 19:58:18 +00:00
return dst , fmt . Errorf ( "cannot ungzip response from %q: %w" , c . scrapeURL , err )
2020-02-23 11:35:47 +00:00
}
scrapesGunzipped . Inc ( )
2020-09-18 09:31:16 +00:00
} else if ! swapResponseBodies {
2020-02-23 11:35:47 +00:00
dst = append ( dst , resp . Body ( ) ... )
}
2020-11-26 11:25:00 +00:00
fasthttp . ReleaseResponse ( resp )
2020-02-23 11:35:47 +00:00
if statusCode != fasthttp . StatusOK {
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_promscrape_scrapes_total { status_code="%d"} ` , statusCode ) ) . Inc ( )
return dst , fmt . Errorf ( "unexpected status code returned when scraping %q: %d; expecting %d; response body: %q" ,
2020-10-29 14:17:52 +00:00
c . scrapeURL , statusCode , fasthttp . StatusOK , dst )
2020-02-23 11:35:47 +00:00
}
scrapesOK . Inc ( )
return dst , nil
}
2020-11-26 16:08:39 +00:00
// gunzipBufPool holds scratch buffers used by ReadData for unpacking a gzipped response
// when the compressed body has been read directly into the destination buffer.
var gunzipBufPool bytesutil . ByteBufferPool
2020-02-23 11:35:47 +00:00
// Counters for scrape results.
//
// Metric names must not contain whitespace: names padded with spaces are not valid
// metric names.
var (
	// scrapesTimedout counts scrapes failed with fasthttp.ErrTimeout.
	scrapesTimedout = metrics.NewCounter(`vm_promscrape_scrapes_timed_out_total`)
	// scrapesOK counts scrapes which returned 200 status code.
	// Other status codes are counted under the same metric name with the corresponding `status_code` label.
	scrapesOK = metrics.NewCounter(`vm_promscrape_scrapes_total{status_code="200"}`)
	// scrapesGunzipped counts successfully unpacked gzipped responses.
	// The metric name is kept as is, since it is a part of the exposed metrics.
	scrapesGunzipped = metrics.NewCounter(`vm_promscrape_scrapes_gunziped_total`)
	// scrapesGunzipFailed counts gzipped responses which couldn't be unpacked.
	scrapesGunzipFailed = metrics.NewCounter(`vm_promscrape_scrapes_gunzip_failed_total`)
)
2020-04-16 20:24:33 +00:00
2020-07-03 18:33:17 +00:00
func doRequestWithPossibleRetry ( hc * fasthttp . HostClient , req * fasthttp . Request , resp * fasthttp . Response , deadline time . Time ) error {
2020-08-13 19:31:42 +00:00
for {
// Use DoDeadline instead of Do even if hc.ReadTimeout is already set in order to guarantee the given deadline
// across multiple retries.
err := hc . DoDeadline ( req , resp , deadline )
if err == nil {
return nil
}
if err != fasthttp . ErrConnectionClosed {
return err
}
// Retry request if the server closes the keep-alive connection unless deadline exceeds.
if time . Since ( deadline ) >= 0 {
return fmt . Errorf ( "the server closes all the connection attempts: %w" , err )
}
2020-06-23 09:25:02 +00:00
}
2020-04-16 20:24:33 +00:00
}
2020-11-01 21:12:13 +00:00
// streamReader reads the response body of a stream scrape
// and keeps track of the number of bytes read so far.
type streamReader struct {
	// r is the underlying response body.
	r io.ReadCloser
	// cancel releases the context of the request which produced r.
	cancel context.CancelFunc
	// bytesRead is the total number of bytes returned by Read calls.
	bytesRead int64
}

// Read reads data from the underlying reader into p and accounts
// the number of bytes read in sr.bytesRead.
func (sr *streamReader) Read(p []byte) (int, error) {
	count, readErr := sr.r.Read(p)
	sr.bytesRead += int64(count)
	return count, readErr
}
// MustClose cancels the request context and then closes the underlying reader.
// A failure to close the reader is logged instead of being returned.
func (sr *streamReader) MustClose() {
	sr.cancel()
	err := sr.r.Close()
	if err == nil {
		return
	}
	logger.Errorf("cannot close reader: %s", err)
}