package discoveryutils import ( "crypto/tls" "flag" "fmt" "net" "net/http" "strings" "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" "github.com/VictoriaMetrics/fasthttp" ) var ( maxConcurrency = flag.Int("promscrape.discovery.concurrency", 500, "The maximum number of concurrent requests to Prometheus autodiscovery API (Consul, Kubernetes, etc.)") maxWaitTime = flag.Duration("promscrape.discovery.concurrentWaitTime", time.Minute, "The maximum duration for waiting to perform API requests "+ "if more than -promscrape.discovery.concurrency requests are simultaneously performed") ) var defaultClient = &http.Client{ Timeout: 30 * time.Second, } // GetHTTPClient returns default client for http API requests. func GetHTTPClient() *http.Client { return defaultClient } // Client is http client, which talks to the given apiServer. type Client struct { hc *fasthttp.HostClient ac *promauth.Config apiServer string hostPort string } // NewClient returns new Client for the given apiServer and the given ac. func NewClient(apiServer string, ac *promauth.Config) (*Client, error) { var u fasthttp.URI u.Update(apiServer) hostPort := string(u.Host()) isTLS := string(u.Scheme()) == "https" var tlsCfg *tls.Config if isTLS && ac != nil { tlsCfg = ac.NewTLSConfig() } if !strings.Contains(hostPort, ":") { port := "80" if isTLS { port = "443" } hostPort = net.JoinHostPort(hostPort, port) } hc := &fasthttp.HostClient{ Addr: hostPort, Name: "vm_promscrape/discovery", DialDualStack: netutil.TCP6Enabled(), IsTLS: isTLS, TLSConfig: tlsCfg, ReadTimeout: time.Minute, WriteTimeout: 10 * time.Second, MaxResponseBodySize: 300 * 1024 * 1024, MaxConns: *maxConcurrency, } return &Client{ hc: hc, ac: ac, apiServer: apiServer, hostPort: hostPort, }, nil } var ( concurrencyLimitCh chan struct{} concurrencyLimitChOnce sync.Once ) func concurrencyLimitChInit() { concurrencyLimitCh = make(chan struct{}, *maxConcurrency) } // GetAPIResponse returns response for the given absolute path. func (c *Client) GetAPIResponse(path string) ([]byte, error) { // Limit the number of concurrent API requests. concurrencyLimitChOnce.Do(concurrencyLimitChInit) t := timerpool.Get(*maxWaitTime) select { case concurrencyLimitCh <- struct{}{}: timerpool.Put(t) case <-t.C: timerpool.Put(t) return nil, fmt.Errorf("too many outstanding requests to %q; try increasing -promscrape.discovery.concurrentWaitTime=%s or -promscrape.discovery.concurrency=%d", c.apiServer, *maxWaitTime, *maxConcurrency) } defer func() { <-concurrencyLimitCh }() requestURL := c.apiServer + path var u fasthttp.URI u.Update(requestURL) var req fasthttp.Request req.SetRequestURIBytes(u.RequestURI()) req.SetHost(c.hostPort) req.Header.Set("Accept-Encoding", "gzip") if c.ac != nil && c.ac.Authorization != "" { req.Header.Set("Authorization", c.ac.Authorization) } var resp fasthttp.Response // There is no need in calling DoTimeout, since the timeout is already set in c.hc.ReadTimeout above. if err := c.hc.Do(&req, &resp); err != nil { return nil, fmt.Errorf("cannot fetch %q: %s", requestURL, err) } var data []byte if ce := resp.Header.Peek("Content-Encoding"); string(ce) == "gzip" { dst, err := fasthttp.AppendGunzipBytes(nil, resp.Body()) if err != nil { return nil, fmt.Errorf("cannot ungzip response from %q: %s", requestURL, err) } data = dst } else { data = append(data[:0], resp.Body()...) } statusCode := resp.StatusCode() if statusCode != fasthttp.StatusOK { return nil, fmt.Errorf("unexpected status code returned from %q: %d; expecting %d; response body: %q", requestURL, statusCode, fasthttp.StatusOK, data) } return data, nil }