mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vmagent: follow-up for b3b29ba6ac
- Automatically reload changed TLS root CA pointed by -remoteWrite.tlsCAFile command-line flag - Automatically reload changed TLS root CA configured via oauth2.tsl_config.ca_file option at -promscrape.config - Document the change as a feature instead of a bug at docs/CHANGELOG.md - Simplify the code at lib/promauth, which is responsible for reloading changed TLS root CA files. - Simplify the usage of lib/promauth.Config.NewRoundTripper() - now it accepts the base http.Transport instead of a callback, which can change the internal http.Transport. - Reuse the default tls config if lib/promauth.Config doesn't contain tls-specific configs. This should reduce memory usage a bit when tls isn't used for scraping big number of targets. - Do not re-read TLS root CA files on every processed request. Re-read them once per second. This should reduce CPU usage when scraping big number of targets over https. - Do not store cert.pem and key.pem files in TestTLSConfigWithCertificatesFilesUpdate, since they can be loaded from byte slices via crypto/tls.X509KeyPair(). - Remove obsolete comparisons of string representations for authConfig and proxyAuthConfig at areEqualScrapeConfigs(). Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5725 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5526 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2171
This commit is contained in:
parent
b958fb1e76
commit
967d5496cf
10 changed files with 178 additions and 365 deletions
|
@ -113,17 +113,12 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Fatalf("cannot initialize auth config for -remoteWrite.url=%q: %s", remoteWriteURL, err)
|
logger.Fatalf("cannot initialize auth config for -remoteWrite.url=%q: %s", remoteWriteURL, err)
|
||||||
}
|
}
|
||||||
tlsCfg, err := authCfg.NewTLSConfig()
|
|
||||||
if err != nil {
|
|
||||||
logger.Fatalf("cannot initialize tls config for -remoteWrite.url=%q: %s", remoteWriteURL, err)
|
|
||||||
}
|
|
||||||
awsCfg, err := getAWSAPIConfig(argIdx)
|
awsCfg, err := getAWSAPIConfig(argIdx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Fatalf("cannot initialize AWS Config for -remoteWrite.url=%q: %s", remoteWriteURL, err)
|
logger.Fatalf("cannot initialize AWS Config for -remoteWrite.url=%q: %s", remoteWriteURL, err)
|
||||||
}
|
}
|
||||||
tr := &http.Transport{
|
tr := &http.Transport{
|
||||||
DialContext: statDial,
|
DialContext: statDial,
|
||||||
TLSClientConfig: tlsCfg,
|
|
||||||
TLSHandshakeTimeout: tlsHandshakeTimeout.GetOptionalArg(argIdx),
|
TLSHandshakeTimeout: tlsHandshakeTimeout.GetOptionalArg(argIdx),
|
||||||
MaxConnsPerHost: 2 * concurrency,
|
MaxConnsPerHost: 2 * concurrency,
|
||||||
MaxIdleConnsPerHost: 2 * concurrency,
|
MaxIdleConnsPerHost: 2 * concurrency,
|
||||||
|
@ -142,7 +137,7 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste
|
||||||
tr.Proxy = http.ProxyURL(pu)
|
tr.Proxy = http.ProxyURL(pu)
|
||||||
}
|
}
|
||||||
hc := &http.Client{
|
hc := &http.Client{
|
||||||
Transport: tr,
|
Transport: authCfg.NewRoundTripper(tr),
|
||||||
Timeout: sendTimeout.GetOptionalArg(argIdx),
|
Timeout: sendTimeout.GetOptionalArg(argIdx),
|
||||||
}
|
}
|
||||||
c := &client{
|
c := &client{
|
||||||
|
|
|
@ -56,6 +56,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
|
||||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): ability to limit the ingestion rate via `-maxIngestionRate` command-line flag. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5900).
|
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): ability to limit the ingestion rate via `-maxIngestionRate` command-line flag. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5900).
|
||||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): use the provided `-remoteWrite.tlsServerName` as `Host` header in requests to `-remoteWrite.url`. This allows sending data to https remote storage by IP address instead of hostname. Thanks to @minor-fixes for initial idea and [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5802).
|
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): use the provided `-remoteWrite.tlsServerName` as `Host` header in requests to `-remoteWrite.url`. This allows sending data to https remote storage by IP address instead of hostname. Thanks to @minor-fixes for initial idea and [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5802).
|
||||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-remoteWrite.shardByURL.ignoreLabels` command-line flag, which can be used for specifying the ignored list of labels when [sharding by `-remoteWrite.url` is enabled](https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages). Thanks to @edma2 for the idea and [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5938).
|
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-remoteWrite.shardByURL.ignoreLabels` command-line flag, which can be used for specifying the ignored list of labels when [sharding by `-remoteWrite.url` is enabled](https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages). Thanks to @edma2 for the idea and [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5938).
|
||||||
|
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): automatically reload updated root CA certificates from files without the need to restart `vmagent`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5526).
|
||||||
* FEATURE: optimize [`/api/v1/labels`](https://docs.victoriametrics.com/url-examples/#apiv1labels) and [`/api/v1/label/.../values`](https://docs.victoriametrics.com/url-examples/#apiv1labelvalues) when `match[]` filters contains metric name. For example, `/api/v1/label/instance/values?match[]=up` now works much faster than before. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5055).
|
* FEATURE: optimize [`/api/v1/labels`](https://docs.victoriametrics.com/url-examples/#apiv1labels) and [`/api/v1/label/.../values`](https://docs.victoriametrics.com/url-examples/#apiv1labelvalues) when `match[]` filters contains metric name. For example, `/api/v1/label/instance/values?match[]=up` now works much faster than before. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5055).
|
||||||
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): support client-side TLS configuration for [native protocol](https://docs.victoriametrics.com/vmctl/#migrating-data-from-victoriametrics). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5748). Thanks to @khushijain21 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5824).
|
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): support client-side TLS configuration for [native protocol](https://docs.victoriametrics.com/vmctl/#migrating-data-from-victoriametrics). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5748). Thanks to @khushijain21 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5824).
|
||||||
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): support client-side TLS configuration for VictoriaMetrics destination specified via `--vm-*` cmd-line flags used in [InfluxDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-influxdb-1x), [Remote Read protocol](https://docs.victoriametrics.com/vmctl/#migrating-data-by-remote-read-protocol), [OpenTSDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-opentsdb), [Prometheus](https://docs.victoriametrics.com/vmctl/#migrating-data-from-prometheus) and [Promscale](https://docs.victoriametrics.com/vmctl/#migrating-data-from-promscale) migration modes.
|
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): support client-side TLS configuration for VictoriaMetrics destination specified via `--vm-*` cmd-line flags used in [InfluxDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-influxdb-1x), [Remote Read protocol](https://docs.victoriametrics.com/vmctl/#migrating-data-by-remote-read-protocol), [OpenTSDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-opentsdb), [Prometheus](https://docs.victoriametrics.com/vmctl/#migrating-data-from-prometheus) and [Promscale](https://docs.victoriametrics.com/vmctl/#migrating-data-from-promscale) migration modes.
|
||||||
|
@ -65,7 +66,6 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
|
||||||
* FEATURE: [OpenTelemetry](https://docs.victoriametrics.com/#sending-data-via-opentelemetry): add `-opentelemetry.usePrometheusNaming` command-line flag, which can be used for enabling automatic conversion of the ingested metric names and labels into Prometheus-compatible format. See [these docs](https://docs.victoriametrics.com/#sending-data-via-opentelemetry) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6037).
|
* FEATURE: [OpenTelemetry](https://docs.victoriametrics.com/#sending-data-via-opentelemetry): add `-opentelemetry.usePrometheusNaming` command-line flag, which can be used for enabling automatic conversion of the ingested metric names and labels into Prometheus-compatible format. See [these docs](https://docs.victoriametrics.com/#sending-data-via-opentelemetry) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6037).
|
||||||
|
|
||||||
* BUGFIX: prevent from automatic deletion of newly registered time series when it is queried immediately after the addition. The probability of this bug has been increased significantly after [v1.99.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.99.0) because of optimizations related to registering new time series. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5948) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5959) issue.
|
* BUGFIX: prevent from automatic deletion of newly registered time series when it is queried immediately after the addition. The probability of this bug has been increased significantly after [v1.99.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.99.0) because of optimizations related to registering new time series. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5948) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5959) issue.
|
||||||
* BUGFIX: properly reload root CA certificates from files for [service discovery](https://docs.victoriametrics.com/sd_configs/) and [scraping requests in stream parsing mode](https://docs.victoriametrics.com/vmagent/?highlight=stream&highlight=parse#stream-parsing-mode). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5526).
|
|
||||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly set `Host` header in requests to scrape targets if it is specified via [`headers` option](https://docs.victoriametrics.com/sd_configs/#http-api-client-options). Thanks to @fholzer for [the bugreport](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5969) and [the fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5970).
|
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly set `Host` header in requests to scrape targets if it is specified via [`headers` option](https://docs.victoriametrics.com/sd_configs/#http-api-client-options). Thanks to @fholzer for [the bugreport](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5969) and [the fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5970).
|
||||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly set `Host` header in requests to scrape targets when [`server_name` option at `tls_config`](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#tls_config) is set. Previously the `Host` header was set incorrectly to the target hostname in this case.
|
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly set `Host` header in requests to scrape targets when [`server_name` option at `tls_config`](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#tls_config) is set. Previously the `Host` header was set incorrectly to the target hostname in this case.
|
||||||
* BUGFIX: do not drop `match[]` filter at [`/api/v1/series`](https://docs.victoriametrics.com/url-examples/#apiv1series) if `-search.ignoreExtraFiltersAtLabelsAPI` command-line flag is set, since missing `match[]` filter breaks `/api/v1/series` requests.
|
* BUGFIX: do not drop `match[]` filter at [`/api/v1/series`](https://docs.victoriametrics.com/url-examples/#apiv1series) if `-search.ignoreExtraFiltersAtLabelsAPI` command-line flag is set, since missing `match[]` filter breaks `/api/v1/series` requests.
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package promauth
|
package promauth
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"context"
|
"context"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"crypto/x509"
|
"crypto/x509"
|
||||||
|
@ -18,7 +17,6 @@ import (
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fscore"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fscore"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -235,15 +233,10 @@ func urlValuesFromMap(m map[string]string) url.Values {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (oi *oauth2ConfigInternal) initTokenSource() error {
|
func (oi *oauth2ConfigInternal) initTokenSource() error {
|
||||||
tlsCfg, err := oi.ac.NewTLSConfig()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("cannot initialize TLS config for OAuth2: %w", err)
|
|
||||||
}
|
|
||||||
c := &http.Client{
|
c := &http.Client{
|
||||||
Transport: &http.Transport{
|
Transport: oi.ac.NewRoundTripper(&http.Transport{
|
||||||
TLSClientConfig: tlsCfg,
|
Proxy: oi.proxyURLFunc,
|
||||||
Proxy: oi.proxyURLFunc,
|
}),
|
||||||
},
|
|
||||||
}
|
}
|
||||||
oi.ctx = context.WithValue(context.Background(), oauth2.HTTPClient, c)
|
oi.ctx = context.WithValue(context.Background(), oauth2.HTTPClient, c)
|
||||||
oi.tokenSource = oi.cfg.TokenSource(oi.ctx)
|
oi.tokenSource = oi.cfg.TokenSource(oi.ctx)
|
||||||
|
@ -281,8 +274,10 @@ type Config struct {
|
||||||
tlsInsecureSkipVerify bool
|
tlsInsecureSkipVerify bool
|
||||||
tlsMinVersion uint16
|
tlsMinVersion uint16
|
||||||
|
|
||||||
getTLSRootCACached getTLSRootCAFunc
|
getTLSConfigCached getTLSConfigFunc
|
||||||
tlsRootCADigest string
|
|
||||||
|
getTLSRootCA getTLSRootCAFunc
|
||||||
|
tlsRootCADigest string
|
||||||
|
|
||||||
getTLSCertCached getTLSCertFunc
|
getTLSCertCached getTLSCertFunc
|
||||||
tlsCertDigest string
|
tlsCertDigest string
|
||||||
|
@ -292,8 +287,6 @@ type Config struct {
|
||||||
|
|
||||||
headers []keyValue
|
headers []keyValue
|
||||||
headersDigest string
|
headersDigest string
|
||||||
|
|
||||||
tctx *tlsContext
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type keyValue struct {
|
type keyValue struct {
|
||||||
|
@ -376,8 +369,6 @@ func (ac *Config) String() string {
|
||||||
ac.authHeaderDigest, ac.headersDigest, ac.tlsRootCADigest, ac.tlsCertDigest, ac.tlsServerName, ac.tlsInsecureSkipVerify, ac.tlsMinVersion)
|
ac.authHeaderDigest, ac.headersDigest, ac.tlsRootCADigest, ac.tlsCertDigest, ac.tlsServerName, ac.tlsInsecureSkipVerify, ac.tlsMinVersion)
|
||||||
}
|
}
|
||||||
|
|
||||||
const tlsCertsCacheSeconds = 1
|
|
||||||
|
|
||||||
// getAuthHeaderFunc must return <value> for 'Authorization: <value>' http request header
|
// getAuthHeaderFunc must return <value> for 'Authorization: <value>' http request header
|
||||||
type getAuthHeaderFunc func() (string, error)
|
type getAuthHeaderFunc func() (string, error)
|
||||||
|
|
||||||
|
@ -397,7 +388,7 @@ func newGetAuthHeaderCached(getAuthHeader getAuthHeaderFunc) getAuthHeaderFunc {
|
||||||
defer mu.Unlock()
|
defer mu.Unlock()
|
||||||
if fasttime.UnixTimestamp() > deadline {
|
if fasttime.UnixTimestamp() > deadline {
|
||||||
ah, err = getAuthHeader()
|
ah, err = getAuthHeader()
|
||||||
deadline = fasttime.UnixTimestamp() + tlsCertsCacheSeconds
|
deadline = fasttime.UnixTimestamp() + 1
|
||||||
}
|
}
|
||||||
return ah, err
|
return ah, err
|
||||||
}
|
}
|
||||||
|
@ -405,24 +396,22 @@ func newGetAuthHeaderCached(getAuthHeader getAuthHeaderFunc) getAuthHeaderFunc {
|
||||||
|
|
||||||
type getTLSRootCAFunc func() (*x509.CertPool, error)
|
type getTLSRootCAFunc func() (*x509.CertPool, error)
|
||||||
|
|
||||||
func newGetTLSRootCACached(getTLSRootCA getTLSRootCAFunc) getTLSRootCAFunc {
|
type getTLSConfigFunc func() (*tls.Config, error)
|
||||||
if getTLSRootCA == nil {
|
|
||||||
return nil
|
func newGetTLSConfigCached(getTLSConfig getTLSConfigFunc) getTLSConfigFunc {
|
||||||
}
|
|
||||||
var mu sync.Mutex
|
var mu sync.Mutex
|
||||||
var deadline uint64
|
var deadline uint64
|
||||||
var rootCA *x509.CertPool
|
var tlsCfg *tls.Config
|
||||||
var err error
|
var err error
|
||||||
return func() (*x509.CertPool, error) {
|
return func() (*tls.Config, error) {
|
||||||
// Cache the root CA and the error for up to a second in order to save CPU time
|
// Cache the tlsCfg and the error for up to a second in order to save CPU time on getTLSConfig() call.
|
||||||
// on reading and parsing the root CA from files.
|
|
||||||
mu.Lock()
|
mu.Lock()
|
||||||
defer mu.Unlock()
|
defer mu.Unlock()
|
||||||
if fasttime.UnixTimestamp() > deadline {
|
if fasttime.UnixTimestamp() > deadline {
|
||||||
rootCA, err = getTLSRootCA()
|
tlsCfg, err = getTLSConfig()
|
||||||
deadline = fasttime.UnixTimestamp() + tlsCertsCacheSeconds
|
deadline = fasttime.UnixTimestamp() + 1
|
||||||
}
|
}
|
||||||
return rootCA, err
|
return tlsCfg, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -443,52 +432,103 @@ func newGetTLSCertCached(getTLSCert getTLSCertFunc) getTLSCertFunc {
|
||||||
defer mu.Unlock()
|
defer mu.Unlock()
|
||||||
if fasttime.UnixTimestamp() > deadline {
|
if fasttime.UnixTimestamp() > deadline {
|
||||||
cert, err = getTLSCert(cri)
|
cert, err = getTLSCert(cri)
|
||||||
deadline = fasttime.UnixTimestamp() + tlsCertsCacheSeconds
|
deadline = fasttime.UnixTimestamp() + 1
|
||||||
}
|
}
|
||||||
return cert, err
|
return cert, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewTLSConfig returns new TLS config for the given ac.
|
// NewRoundTripper returns new http.RoundTripper for the given ac, which uses the given trBase as base transport.
|
||||||
func (ac *Config) NewTLSConfig() (*tls.Config, error) {
|
//
|
||||||
|
// The caller shouldn't change the trBase, since the returned RoundTripper owns it.
|
||||||
|
func (ac *Config) NewRoundTripper(trBase *http.Transport) http.RoundTripper {
|
||||||
|
rt := &roundTripper{
|
||||||
|
trBase: trBase,
|
||||||
|
}
|
||||||
|
if ac != nil {
|
||||||
|
rt.getTLSConfigCached = ac.getTLSConfigCached
|
||||||
|
}
|
||||||
|
return rt
|
||||||
|
}
|
||||||
|
|
||||||
|
type roundTripper struct {
|
||||||
|
trBase *http.Transport
|
||||||
|
getTLSConfigCached getTLSConfigFunc
|
||||||
|
|
||||||
|
rootCAPrev *x509.CertPool
|
||||||
|
trPrev *http.Transport
|
||||||
|
mu sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// RoundTrip implements http.RoundTripper interface.
|
||||||
|
func (rt *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||||
|
tr, err := rt.getTransport()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("cannot initialize Transport: %w", err)
|
||||||
|
}
|
||||||
|
return tr.RoundTrip(req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rt *roundTripper) getTransport() (*http.Transport, error) {
|
||||||
|
if rt.getTLSConfigCached == nil {
|
||||||
|
return rt.trBase, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
tlsCfg, err := rt.getTLSConfigCached()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("cannot initialize TLS config: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
rt.mu.Lock()
|
||||||
|
defer rt.mu.Unlock()
|
||||||
|
|
||||||
|
if rt.trPrev != nil && tlsCfg.RootCAs.Equal(rt.rootCAPrev) {
|
||||||
|
// Fast path - tlsCfg wasn't changed. Return the previously created transport.
|
||||||
|
return rt.trPrev, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Slow path - tlsCfg has been changed.
|
||||||
|
// Close connections for the previous transport and create new transport for the updated tlsCfg.
|
||||||
|
if rt.trPrev != nil {
|
||||||
|
rt.trPrev.CloseIdleConnections()
|
||||||
|
}
|
||||||
|
|
||||||
|
tr := rt.trBase.Clone()
|
||||||
|
tr.TLSClientConfig = tlsCfg
|
||||||
|
rt.trPrev = tr
|
||||||
|
rt.rootCAPrev = tlsCfg.RootCAs
|
||||||
|
|
||||||
|
return rt.trPrev, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ac *Config) getTLSConfig() (*tls.Config, error) {
|
||||||
|
if ac.getTLSCertCached == nil && ac.tlsServerName == "" && !ac.tlsInsecureSkipVerify && ac.tlsMinVersion == 0 && ac.getTLSRootCA == nil {
|
||||||
|
// Re-use zeroTLSConfig when ac doesn't contain tls-specific configs.
|
||||||
|
// This should reduce memory usage a bit.
|
||||||
|
return zeroTLSConfig, nil
|
||||||
|
}
|
||||||
|
|
||||||
tlsCfg := &tls.Config{
|
tlsCfg := &tls.Config{
|
||||||
ClientSessionCache: tls.NewLRUClientSessionCache(0),
|
ClientSessionCache: tls.NewLRUClientSessionCache(0),
|
||||||
|
GetClientCertificate: ac.getTLSCertCached,
|
||||||
|
ServerName: ac.tlsServerName,
|
||||||
|
InsecureSkipVerify: ac.tlsInsecureSkipVerify,
|
||||||
|
MinVersion: ac.tlsMinVersion,
|
||||||
|
// Do not set MaxVersion, since this has no sense from security PoV.
|
||||||
|
// This can only result in lower security level if improperly configured.
|
||||||
}
|
}
|
||||||
if ac == nil {
|
if f := ac.getTLSRootCA; f != nil {
|
||||||
return tlsCfg, nil
|
|
||||||
}
|
|
||||||
tlsCfg.GetClientCertificate = ac.getTLSCertCached
|
|
||||||
if f := ac.getTLSRootCACached; f != nil {
|
|
||||||
rootCA, err := f()
|
rootCA, err := f()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("cannot load root CAs: %w", err)
|
return nil, fmt.Errorf("cannot load root CAs: %w", err)
|
||||||
}
|
}
|
||||||
tlsCfg.RootCAs = rootCA
|
tlsCfg.RootCAs = rootCA
|
||||||
}
|
}
|
||||||
tlsCfg.ServerName = ac.tlsServerName
|
|
||||||
tlsCfg.InsecureSkipVerify = ac.tlsInsecureSkipVerify
|
|
||||||
tlsCfg.MinVersion = ac.tlsMinVersion
|
|
||||||
// Do not set tlsCfg.MaxVersion, since this has no sense from security PoV.
|
|
||||||
// This can only result in lower security level if improperly set.
|
|
||||||
return tlsCfg, nil
|
return tlsCfg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewRoundTripper returns new http.RoundTripper for the given ac.
|
var zeroTLSConfig = &tls.Config{
|
||||||
func (ac *Config) NewRoundTripper(builder func(*http.Transport)) (http.RoundTripper, error) {
|
ClientSessionCache: tls.NewLRUClientSessionCache(0),
|
||||||
cfg, err := ac.NewTLSConfig()
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to initialize TLS config: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if ac == nil {
|
|
||||||
tr := &http.Transport{
|
|
||||||
TLSClientConfig: cfg,
|
|
||||||
}
|
|
||||||
builder(tr)
|
|
||||||
return tr, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return ac.tctx.NewTLSRoundTripper(cfg, builder)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewConfig creates auth config for the given hcc.
|
// NewConfig creates auth config for the given hcc.
|
||||||
|
@ -627,8 +667,8 @@ func (opts *Options) NewConfig() (*Config, error) {
|
||||||
tlsInsecureSkipVerify: tctx.insecureSkipVerify,
|
tlsInsecureSkipVerify: tctx.insecureSkipVerify,
|
||||||
tlsMinVersion: tctx.minVersion,
|
tlsMinVersion: tctx.minVersion,
|
||||||
|
|
||||||
getTLSRootCACached: newGetTLSRootCACached(tctx.getTLSRootCA),
|
getTLSRootCA: tctx.getTLSRootCA,
|
||||||
tlsRootCADigest: tctx.tlsRootCADigest,
|
tlsRootCADigest: tctx.tlsRootCADigest,
|
||||||
|
|
||||||
getTLSCertCached: newGetTLSCertCached(tctx.getTLSCert),
|
getTLSCertCached: newGetTLSCertCached(tctx.getTLSCert),
|
||||||
tlsCertDigest: tctx.tlsCertDigest,
|
tlsCertDigest: tctx.tlsCertDigest,
|
||||||
|
@ -638,9 +678,8 @@ func (opts *Options) NewConfig() (*Config, error) {
|
||||||
|
|
||||||
headers: headers,
|
headers: headers,
|
||||||
headersDigest: headersDigest,
|
headersDigest: headersDigest,
|
||||||
|
|
||||||
tctx: &tctx,
|
|
||||||
}
|
}
|
||||||
|
ac.getTLSConfigCached = newGetTLSConfigCached(ac.getTLSConfig)
|
||||||
return ac, nil
|
return ac, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -778,7 +817,6 @@ type tlsContext struct {
|
||||||
tlsCertDigest string
|
tlsCertDigest string
|
||||||
|
|
||||||
getTLSRootCA getTLSRootCAFunc
|
getTLSRootCA getTLSRootCAFunc
|
||||||
getRootCAPEM func() ([]byte, error)
|
|
||||||
tlsRootCADigest string
|
tlsRootCADigest string
|
||||||
|
|
||||||
serverName string
|
serverName string
|
||||||
|
@ -802,26 +840,16 @@ func (tctx *tlsContext) initFromTLSConfig(baseDir string, tc *TLSConfig) error {
|
||||||
} else if tc.CertFile != "" || tc.KeyFile != "" {
|
} else if tc.CertFile != "" || tc.KeyFile != "" {
|
||||||
certPath := fscore.GetFilepath(baseDir, tc.CertFile)
|
certPath := fscore.GetFilepath(baseDir, tc.CertFile)
|
||||||
keyPath := fscore.GetFilepath(baseDir, tc.KeyFile)
|
keyPath := fscore.GetFilepath(baseDir, tc.KeyFile)
|
||||||
getCertsPEM := func() ([]byte, []byte, error) {
|
tctx.getTLSCert = func(*tls.CertificateRequestInfo) (*tls.Certificate, error) {
|
||||||
// Re-read TLS certificate from disk. This is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1420
|
// Re-read TLS certificate from disk. This is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1420
|
||||||
certData, err := fscore.ReadFileOrHTTP(certPath)
|
certData, err := fscore.ReadFileOrHTTP(certPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, fmt.Errorf("cannot read TLS certificate from %q: %w", certPath, err)
|
return nil, fmt.Errorf("cannot read TLS certificate from %q: %w", certPath, err)
|
||||||
}
|
}
|
||||||
keyData, err := fscore.ReadFileOrHTTP(keyPath)
|
keyData, err := fscore.ReadFileOrHTTP(keyPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, fmt.Errorf("cannot read TLS key from %q: %w", keyPath, err)
|
return nil, fmt.Errorf("cannot read TLS key from %q: %w", keyPath, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return certData, keyData, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
tctx.getTLSCert = func(*tls.CertificateRequestInfo) (*tls.Certificate, error) {
|
|
||||||
certData, keyData, err := getCertsPEM()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
cert, err := tls.X509KeyPair(certData, keyData)
|
cert, err := tls.X509KeyPair(certData, keyData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("cannot load TLS certificate from `cert_file`=%q, `key_file`=%q: %w", tc.CertFile, tc.KeyFile, err)
|
return nil, fmt.Errorf("cannot load TLS certificate from `cert_file`=%q, `key_file`=%q: %w", tc.CertFile, tc.KeyFile, err)
|
||||||
|
@ -835,48 +863,24 @@ func (tctx *tlsContext) initFromTLSConfig(baseDir string, tc *TLSConfig) error {
|
||||||
if !rootCA.AppendCertsFromPEM([]byte(tc.CA)) {
|
if !rootCA.AppendCertsFromPEM([]byte(tc.CA)) {
|
||||||
return fmt.Errorf("cannot parse data from `ca` value")
|
return fmt.Errorf("cannot parse data from `ca` value")
|
||||||
}
|
}
|
||||||
|
|
||||||
tctx.getTLSRootCA = func() (*x509.CertPool, error) {
|
tctx.getTLSRootCA = func() (*x509.CertPool, error) {
|
||||||
return rootCA, nil
|
return rootCA, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
tctx.getRootCAPEM = func() ([]byte, error) {
|
|
||||||
return []byte(tc.CA), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
h := xxhash.Sum64([]byte(tc.CA))
|
h := xxhash.Sum64([]byte(tc.CA))
|
||||||
tctx.tlsRootCADigest = fmt.Sprintf("digest(CA)=%d", h)
|
tctx.tlsRootCADigest = fmt.Sprintf("digest(CA)=%d", h)
|
||||||
} else if tc.CAFile != "" {
|
} else if tc.CAFile != "" {
|
||||||
path := fscore.GetFilepath(baseDir, tc.CAFile)
|
path := fscore.GetFilepath(baseDir, tc.CAFile)
|
||||||
|
tctx.getTLSRootCA = func() (*x509.CertPool, error) {
|
||||||
getRootCAPEM := func() ([]byte, error) {
|
|
||||||
data, err := fscore.ReadFileOrHTTP(path)
|
data, err := fscore.ReadFileOrHTTP(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("cannot read `ca_file`: %w", err)
|
return nil, fmt.Errorf("cannot read `ca_file`: %w", err)
|
||||||
}
|
}
|
||||||
return data, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
tctx.getTLSRootCA = func() (*x509.CertPool, error) {
|
|
||||||
data, err := getRootCAPEM()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
rootCA := x509.NewCertPool()
|
rootCA := x509.NewCertPool()
|
||||||
if !rootCA.AppendCertsFromPEM(data) {
|
if !rootCA.AppendCertsFromPEM(data) {
|
||||||
return nil, fmt.Errorf("cannot parse data read from `ca_file` %q", tc.CAFile)
|
return nil, fmt.Errorf("cannot parse data read from `ca_file` %q", tc.CAFile)
|
||||||
}
|
}
|
||||||
return rootCA, nil
|
return rootCA, nil
|
||||||
}
|
}
|
||||||
tctx.getRootCAPEM = func() ([]byte, error) {
|
|
||||||
data, err := getRootCAPEM()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return data, nil
|
|
||||||
}
|
|
||||||
// Does not hash file contents, since they may change at any time.
|
|
||||||
// TLSRoundTripper must be used to automatically update the root CA.
|
|
||||||
tctx.tlsRootCADigest = fmt.Sprintf("caFile=%q", tc.CAFile)
|
tctx.tlsRootCADigest = fmt.Sprintf("caFile=%q", tc.CAFile)
|
||||||
}
|
}
|
||||||
v, err := netutil.ParseTLSVersion(tc.MinVersion)
|
v, err := netutil.ParseTLSVersion(tc.MinVersion)
|
||||||
|
@ -886,118 +890,3 @@ func (tctx *tlsContext) initFromTLSConfig(baseDir string, tc *TLSConfig) error {
|
||||||
tctx.minVersion = v
|
tctx.minVersion = v
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewTLSRoundTripper returns new http.RoundTripper which automatically updates
|
|
||||||
// RootCA in tls.Config whenever it changes.
|
|
||||||
func (tctx *tlsContext) NewTLSRoundTripper(cfg *tls.Config, builder func(transport *http.Transport)) (http.RoundTripper, error) {
|
|
||||||
// TLS context is not initialized so use the provided RoundTripper without wrapper
|
|
||||||
if tctx == nil || tctx.getRootCAPEM == nil {
|
|
||||||
tr := &http.Transport{
|
|
||||||
TLSClientConfig: cfg,
|
|
||||||
}
|
|
||||||
builder(tr)
|
|
||||||
return tr, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
var deadline uint64
|
|
||||||
var rootCA []byte
|
|
||||||
var err error
|
|
||||||
getTLSDigestsLocked := func() []byte {
|
|
||||||
if fasttime.UnixTimestamp() > deadline {
|
|
||||||
rootCA, err = tctx.getRootCAPEM()
|
|
||||||
if err != nil {
|
|
||||||
logger.Warnf("cannot load root CA: %s", err)
|
|
||||||
}
|
|
||||||
deadline = fasttime.UnixTimestamp() + tlsCertsCacheSeconds
|
|
||||||
}
|
|
||||||
|
|
||||||
return rootCA
|
|
||||||
}
|
|
||||||
tr := &http.Transport{
|
|
||||||
TLSClientConfig: cfg,
|
|
||||||
}
|
|
||||||
builder(tr)
|
|
||||||
|
|
||||||
return &TLSRoundTripper{
|
|
||||||
builder: builder,
|
|
||||||
getRootCABytesLocked: getTLSDigestsLocked,
|
|
||||||
getTLSRootCA: newGetTLSRootCACached(tctx.getTLSRootCA),
|
|
||||||
config: cfg,
|
|
||||||
tr: tr,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ http.RoundTripper = &TLSRoundTripper{}
|
|
||||||
|
|
||||||
// TLSRoundTripper is an implementation of http.RoundTripper which automatically
|
|
||||||
// updates RootCA in tls.Config whenever it changes.
|
|
||||||
type TLSRoundTripper struct {
|
|
||||||
builder func(*http.Transport)
|
|
||||||
getRootCABytesLocked func() []byte
|
|
||||||
getTLSRootCA func() (*x509.CertPool, error)
|
|
||||||
|
|
||||||
config *tls.Config
|
|
||||||
|
|
||||||
tr http.RoundTripper
|
|
||||||
|
|
||||||
m sync.Mutex
|
|
||||||
rootCABytes []byte
|
|
||||||
}
|
|
||||||
|
|
||||||
// RoundTrip implements http.RoundTripper.RoundTrip.
|
|
||||||
// It automatically updates RootCA in tls.Config whenever it changes.
|
|
||||||
func (t *TLSRoundTripper) RoundTrip(request *http.Request) (*http.Response, error) {
|
|
||||||
t.m.Lock()
|
|
||||||
rootCABytes := t.getRootCABytesLocked()
|
|
||||||
equal := bytes.Equal(rootCABytes, t.rootCABytes)
|
|
||||||
|
|
||||||
if equal {
|
|
||||||
t.m.Unlock()
|
|
||||||
return t.tr.RoundTrip(request)
|
|
||||||
}
|
|
||||||
|
|
||||||
err := t.recreateRoundTripperLocked(rootCABytes)
|
|
||||||
t.m.Unlock()
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("cannot recreate RoundTripper: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return t.tr.RoundTrip(request)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *TLSRoundTripper) recreateRoundTripperLocked(rootCABytes []byte) error {
|
|
||||||
newRootCaPool, err := t.getTLSRootCA()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("cannot load root CAs: %w", err)
|
|
||||||
}
|
|
||||||
newTLSConfig := t.config.Clone()
|
|
||||||
newTLSConfig.RootCAs = newRootCaPool
|
|
||||||
// Reset ClientSessionCache, since it may contain sessions for the old root CA.
|
|
||||||
newTLSConfig.ClientSessionCache = tls.NewLRUClientSessionCache(0)
|
|
||||||
|
|
||||||
tr := &http.Transport{
|
|
||||||
TLSClientConfig: newTLSConfig,
|
|
||||||
}
|
|
||||||
t.builder(tr)
|
|
||||||
|
|
||||||
oldRt := t.tr
|
|
||||||
t.tr = tr
|
|
||||||
t.rootCABytes = rootCABytes
|
|
||||||
t.config = newTLSConfig
|
|
||||||
|
|
||||||
if oldRt != nil {
|
|
||||||
closeIdleConnections(oldRt)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type closeIdler interface {
|
|
||||||
CloseIdleConnections()
|
|
||||||
}
|
|
||||||
|
|
||||||
func closeIdleConnections(t http.RoundTripper) {
|
|
||||||
if ci, ok := t.(closeIdler); ok {
|
|
||||||
ci.CloseIdleConnections()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"crypto/x509"
|
"crypto/x509"
|
||||||
"crypto/x509/pkix"
|
"crypto/x509/pkix"
|
||||||
"encoding/pem"
|
"encoding/pem"
|
||||||
|
"fmt"
|
||||||
"math/big"
|
"math/big"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
@ -607,35 +608,30 @@ func TestConfigHeaders(t *testing.T) {
|
||||||
|
|
||||||
func TestTLSConfigWithCertificatesFilesUpdate(t *testing.T) {
|
func TestTLSConfigWithCertificatesFilesUpdate(t *testing.T) {
|
||||||
// Generate and save a self-signed CA certificate and a certificate signed by the CA
|
// Generate and save a self-signed CA certificate and a certificate signed by the CA
|
||||||
caPEM, certPEM, keyPEM := generateCertificates(t)
|
caPEM, certPEM, keyPEM := mustGenerateCertificates()
|
||||||
_ = os.WriteFile("testdata/ca.pem", caPEM, 0644)
|
_ = os.WriteFile("testdata/ca.pem", caPEM, 0644)
|
||||||
_ = os.WriteFile("testdata/cert.pem", certPEM, 0644)
|
|
||||||
_ = os.WriteFile("testdata/key.pem", keyPEM, 0644)
|
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
for _, p := range []string{
|
_ = os.Remove("testdata/ca.pem")
|
||||||
"testdata/ca.pem",
|
|
||||||
"testdata/cert.pem",
|
|
||||||
"testdata/key.pem",
|
|
||||||
} {
|
|
||||||
_ = os.Remove(p)
|
|
||||||
}
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
cert, err := tls.LoadX509KeyPair("testdata/cert.pem", "testdata/key.pem")
|
cert, err := tls.X509KeyPair(certPEM, keyPEM)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("cannot load generated certificate: %s", err)
|
t.Fatalf("cannot load generated certificate: %s", err)
|
||||||
}
|
}
|
||||||
|
tlsConfig := &tls.Config{
|
||||||
|
Certificates: []tls.Certificate{cert},
|
||||||
|
}
|
||||||
|
|
||||||
tlsConfig := &tls.Config{}
|
s := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||||
tlsConfig.Certificates = []tls.Certificate{cert}
|
|
||||||
|
|
||||||
s := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
w.WriteHeader(http.StatusOK)
|
w.WriteHeader(http.StatusOK)
|
||||||
}))
|
}))
|
||||||
s.TLS = tlsConfig
|
s.TLS = tlsConfig
|
||||||
s.StartTLS()
|
s.StartTLS()
|
||||||
serverURL, _ := url.Parse(s.URL)
|
serverURL, err := url.Parse(s.URL)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error when parsing url=%q: %s", s.URL, err)
|
||||||
|
}
|
||||||
|
|
||||||
opts := Options{
|
opts := Options{
|
||||||
TLSConfig: &TLSConfig{
|
TLSConfig: &TLSConfig{
|
||||||
|
@ -646,13 +642,9 @@ func TestTLSConfigWithCertificatesFilesUpdate(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error when parsing config: %s", err)
|
t.Fatalf("unexpected error when parsing config: %s", err)
|
||||||
}
|
}
|
||||||
tr, err := ac.NewRoundTripper(func(tr *http.Transport) {})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unexpected error when creating roundtripper: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
client := http.Client{
|
client := http.Client{
|
||||||
Transport: tr,
|
Transport: ac.NewRoundTripper(&http.Transport{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := client.Do(&http.Request{
|
resp, err := client.Do(&http.Request{
|
||||||
|
@ -662,17 +654,16 @@ func TestTLSConfigWithCertificatesFilesUpdate(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error when making request: %s", err)
|
t.Fatalf("unexpected error when making request: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
if resp.StatusCode != http.StatusOK {
|
||||||
t.Fatalf("expected status code %d; got %d", http.StatusOK, resp.StatusCode)
|
t.Fatalf("expected status code %d; got %d", http.StatusOK, resp.StatusCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update CA file with new CA and get config
|
// Update CA file with new CA and get config
|
||||||
ca2PEM, _, _ := generateCertificates(t)
|
ca2PEM, _, _ := mustGenerateCertificates()
|
||||||
_ = os.WriteFile("testdata/ca.pem", ca2PEM, 0644)
|
_ = os.WriteFile("testdata/ca.pem", ca2PEM, 0644)
|
||||||
|
|
||||||
// Wait for cert cache expiration
|
// Wait for cert cache expiration
|
||||||
time.Sleep(2 * tlsCertsCacheSeconds * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
|
|
||||||
_, err = client.Do(&http.Request{
|
_, err = client.Do(&http.Request{
|
||||||
Method: http.MethodGet,
|
Method: http.MethodGet,
|
||||||
|
@ -683,7 +674,7 @@ func TestTLSConfigWithCertificatesFilesUpdate(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func generateCertificates(t *testing.T) ([]byte, []byte, []byte) {
|
func mustGenerateCertificates() ([]byte, []byte, []byte) {
|
||||||
// Small key size for faster tests
|
// Small key size for faster tests
|
||||||
const testCertificateBits = 1024
|
const testCertificateBits = 1024
|
||||||
|
|
||||||
|
@ -701,11 +692,11 @@ func generateCertificates(t *testing.T) ([]byte, []byte, []byte) {
|
||||||
}
|
}
|
||||||
caPrivKey, err := rsa.GenerateKey(rand.Reader, testCertificateBits)
|
caPrivKey, err := rsa.GenerateKey(rand.Reader, testCertificateBits)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
panic(fmt.Errorf("cannot generate CA private key: %s", err))
|
||||||
}
|
}
|
||||||
caBytes, err := x509.CreateCertificate(rand.Reader, ca, ca, &caPrivKey.PublicKey, caPrivKey)
|
caBytes, err := x509.CreateCertificate(rand.Reader, ca, ca, &caPrivKey.PublicKey, caPrivKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
panic(fmt.Errorf("cannot create CA certificate: %s", err))
|
||||||
}
|
}
|
||||||
caPEM := pem.EncodeToMemory(&pem.Block{
|
caPEM := pem.EncodeToMemory(&pem.Block{
|
||||||
Type: "CERTIFICATE",
|
Type: "CERTIFICATE",
|
||||||
|
@ -727,11 +718,11 @@ func generateCertificates(t *testing.T) ([]byte, []byte, []byte) {
|
||||||
}
|
}
|
||||||
key, err := rsa.GenerateKey(rand.Reader, testCertificateBits)
|
key, err := rsa.GenerateKey(rand.Reader, testCertificateBits)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
panic(fmt.Errorf("cannot generate certificate private key: %s", err))
|
||||||
}
|
}
|
||||||
certBytes, err := x509.CreateCertificate(rand.Reader, cert, ca, &key.PublicKey, caPrivKey)
|
certBytes, err := x509.CreateCertificate(rand.Reader, cert, ca, &key.PublicKey, caPrivKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
panic(fmt.Errorf("cannot generate certificate: %s", err))
|
||||||
}
|
}
|
||||||
certPEM := pem.EncodeToMemory(&pem.Block{
|
certPEM := pem.EncodeToMemory(&pem.Block{
|
||||||
Type: "CERTIFICATE",
|
Type: "CERTIFICATE",
|
||||||
|
|
|
@ -2,7 +2,6 @@ package promscrape
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/tls"
|
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
@ -43,23 +42,18 @@ type client struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func newClient(ctx context.Context, sw *ScrapeWork) (*client, error) {
|
func newClient(ctx context.Context, sw *ScrapeWork) (*client, error) {
|
||||||
isTLS := strings.HasPrefix(sw.ScrapeURL, "https://")
|
ac := sw.AuthConfig
|
||||||
setHeaders := func(req *http.Request) error {
|
setHeaders := func(req *http.Request) error {
|
||||||
return sw.AuthConfig.SetHeaders(req, true)
|
return sw.AuthConfig.SetHeaders(req, true)
|
||||||
}
|
}
|
||||||
setProxyHeaders := func(_ *http.Request) error {
|
setProxyHeaders := func(_ *http.Request) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
var tlsCfg *tls.Config
|
|
||||||
proxyURL := sw.ProxyURL
|
proxyURL := sw.ProxyURL
|
||||||
if !isTLS && proxyURL.IsHTTPOrHTTPS() {
|
if !strings.HasPrefix(sw.ScrapeURL, "https://") && proxyURL.IsHTTPOrHTTPS() {
|
||||||
pu := proxyURL.GetURL()
|
pu := proxyURL.GetURL()
|
||||||
if pu.Scheme == "https" {
|
if pu.Scheme == "https" {
|
||||||
var err error
|
ac = sw.ProxyAuthConfig
|
||||||
tlsCfg, err = sw.ProxyAuthConfig.NewTLSConfig()
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("cannot initialize proxy tls config: %w", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
setProxyHeaders = func(req *http.Request) error {
|
setProxyHeaders = func(req *http.Request) error {
|
||||||
return proxyURL.SetHeaders(sw.ProxyAuthConfig, req)
|
return proxyURL.SetHeaders(sw.ProxyAuthConfig, req)
|
||||||
|
@ -69,30 +63,19 @@ func newClient(ctx context.Context, sw *ScrapeWork) (*client, error) {
|
||||||
if pu := sw.ProxyURL.GetURL(); pu != nil {
|
if pu := sw.ProxyURL.GetURL(); pu != nil {
|
||||||
proxyURLFunc = http.ProxyURL(pu)
|
proxyURLFunc = http.ProxyURL(pu)
|
||||||
}
|
}
|
||||||
|
|
||||||
rt, err := sw.AuthConfig.NewRoundTripper(func(tr *http.Transport) {
|
|
||||||
if !isTLS && proxyURL.IsHTTPOrHTTPS() {
|
|
||||||
tr.TLSClientConfig = tlsCfg
|
|
||||||
}
|
|
||||||
|
|
||||||
tr.Proxy = proxyURLFunc
|
|
||||||
tr.TLSHandshakeTimeout = 10 * time.Second
|
|
||||||
tr.IdleConnTimeout = 2 * sw.ScrapeInterval
|
|
||||||
tr.DisableCompression = *disableCompression || sw.DisableCompression
|
|
||||||
tr.DisableKeepAlives = *disableKeepAlive || sw.DisableKeepAlive
|
|
||||||
tr.DialContext = statStdDial
|
|
||||||
tr.MaxIdleConnsPerHost = 100
|
|
||||||
tr.MaxResponseHeaderBytes = int64(maxResponseHeadersSize.N)
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("cannot initialize tls config: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
hc := &http.Client{
|
hc := &http.Client{
|
||||||
Transport: rt,
|
Transport: ac.NewRoundTripper(&http.Transport{
|
||||||
Timeout: sw.ScrapeTimeout,
|
Proxy: proxyURLFunc,
|
||||||
|
TLSHandshakeTimeout: 10 * time.Second,
|
||||||
|
IdleConnTimeout: 2 * sw.ScrapeInterval,
|
||||||
|
DisableCompression: *disableCompression || sw.DisableCompression,
|
||||||
|
DisableKeepAlives: *disableKeepAlive || sw.DisableKeepAlive,
|
||||||
|
DialContext: statStdDial,
|
||||||
|
MaxIdleConnsPerHost: 100,
|
||||||
|
MaxResponseHeaderBytes: int64(maxResponseHeadersSize.N),
|
||||||
|
}),
|
||||||
|
Timeout: sw.ScrapeTimeout,
|
||||||
}
|
}
|
||||||
|
|
||||||
if sw.DenyRedirects {
|
if sw.DenyRedirects {
|
||||||
hc.CheckRedirect = func(_ *http.Request, _ []*http.Request) error {
|
hc.CheckRedirect = func(_ *http.Request, _ []*http.Request) error {
|
||||||
return http.ErrUseLastResponse
|
return http.ErrUseLastResponse
|
||||||
|
|
|
@ -211,19 +211,7 @@ func areEqualGlobalConfigs(a, b *GlobalConfig) bool {
|
||||||
func areEqualScrapeConfigs(a, b *ScrapeConfig) bool {
|
func areEqualScrapeConfigs(a, b *ScrapeConfig) bool {
|
||||||
sa := a.marshalJSON()
|
sa := a.marshalJSON()
|
||||||
sb := b.marshalJSON()
|
sb := b.marshalJSON()
|
||||||
if string(sa) != string(sb) {
|
return string(sa) == string(sb)
|
||||||
return false
|
|
||||||
}
|
|
||||||
// Compare auth configs for a and b, since they may differ by TLS CA file contents,
|
|
||||||
// which is missing in the marshaled JSON of a and b,
|
|
||||||
// but it existis in the string representation of auth configs.
|
|
||||||
if a.swc.authConfig.String() != b.swc.authConfig.String() {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if a.swc.proxyAuthConfig.String() != b.swc.proxyAuthConfig.String() {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sc *ScrapeConfig) unmarshalJSON(data []byte) error {
|
func (sc *ScrapeConfig) unmarshalJSON(data []byte) error {
|
||||||
|
|
|
@ -88,10 +88,7 @@ func newAPIWatcher(apiServer string, ac *promauth.Config, sdc *SDConfig, swcFunc
|
||||||
attachNodeMetadata = sdc.AttachMetadata.Node
|
attachNodeMetadata = sdc.AttachMetadata.Node
|
||||||
}
|
}
|
||||||
proxyURL := sdc.ProxyURL.GetURL()
|
proxyURL := sdc.ProxyURL.GetURL()
|
||||||
gw, err := getGroupWatcher(apiServer, ac, namespaces, selectors, attachNodeMetadata, proxyURL)
|
gw := getGroupWatcher(apiServer, ac, namespaces, selectors, attachNodeMetadata, proxyURL)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
role := sdc.role()
|
role := sdc.role()
|
||||||
aw := &apiWatcher{
|
aw := &apiWatcher{
|
||||||
role: role,
|
role: role,
|
||||||
|
@ -246,25 +243,19 @@ type groupWatcher struct {
|
||||||
noAPIWatchers bool
|
noAPIWatchers bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func newGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string, selectors []Selector, attachNodeMetadata bool, proxyURL *url.URL) (*groupWatcher, error) {
|
func newGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string, selectors []Selector, attachNodeMetadata bool, proxyURL *url.URL) *groupWatcher {
|
||||||
var proxy func(*http.Request) (*url.URL, error)
|
var proxy func(*http.Request) (*url.URL, error)
|
||||||
if proxyURL != nil {
|
if proxyURL != nil {
|
||||||
proxy = http.ProxyURL(proxyURL)
|
proxy = http.ProxyURL(proxyURL)
|
||||||
}
|
}
|
||||||
tr, err := ac.NewRoundTripper(func(tr *http.Transport) {
|
|
||||||
tr.Proxy = proxy
|
|
||||||
tr.TLSHandshakeTimeout = 10 * time.Second
|
|
||||||
tr.IdleConnTimeout = *apiServerTimeout
|
|
||||||
tr.MaxIdleConnsPerHost = 100
|
|
||||||
})
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("cannot initialize tls config: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
client := &http.Client{
|
client := &http.Client{
|
||||||
Transport: tr,
|
Transport: ac.NewRoundTripper(&http.Transport{
|
||||||
Timeout: *apiServerTimeout,
|
Proxy: proxy,
|
||||||
|
TLSHandshakeTimeout: 10 * time.Second,
|
||||||
|
IdleConnTimeout: *apiServerTimeout,
|
||||||
|
MaxIdleConnsPerHost: 100,
|
||||||
|
}),
|
||||||
|
Timeout: *apiServerTimeout,
|
||||||
}
|
}
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
gw := &groupWatcher{
|
gw := &groupWatcher{
|
||||||
|
@ -282,10 +273,10 @@ func newGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string,
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
}
|
}
|
||||||
return gw, nil
|
return gw
|
||||||
}
|
}
|
||||||
|
|
||||||
func getGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string, selectors []Selector, attachNodeMetadata bool, proxyURL *url.URL) (*groupWatcher, error) {
|
func getGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string, selectors []Selector, attachNodeMetadata bool, proxyURL *url.URL) *groupWatcher {
|
||||||
proxyURLStr := "<nil>"
|
proxyURLStr := "<nil>"
|
||||||
if proxyURL != nil {
|
if proxyURL != nil {
|
||||||
proxyURLStr = proxyURL.String()
|
proxyURLStr = proxyURL.String()
|
||||||
|
@ -294,17 +285,12 @@ func getGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string,
|
||||||
apiServer, namespaces, selectorsKey(selectors), attachNodeMetadata, proxyURLStr, ac.String())
|
apiServer, namespaces, selectorsKey(selectors), attachNodeMetadata, proxyURLStr, ac.String())
|
||||||
groupWatchersLock.Lock()
|
groupWatchersLock.Lock()
|
||||||
gw := groupWatchers[key]
|
gw := groupWatchers[key]
|
||||||
var err error
|
|
||||||
if gw == nil {
|
if gw == nil {
|
||||||
gw, err = newGroupWatcher(apiServer, ac, namespaces, selectors, attachNodeMetadata, proxyURL)
|
gw = newGroupWatcher(apiServer, ac, namespaces, selectors, attachNodeMetadata, proxyURL)
|
||||||
if err != nil {
|
groupWatchers[key] = gw
|
||||||
err = fmt.Errorf("cannot initialize watcher for key={%s}: %w", key, err)
|
|
||||||
} else {
|
|
||||||
groupWatchers[key] = gw
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
groupWatchersLock.Unlock()
|
groupWatchersLock.Unlock()
|
||||||
return gw, err
|
return gw
|
||||||
}
|
}
|
||||||
|
|
||||||
func selectorsKey(selectors []Selector) string {
|
func selectorsKey(selectors []Selector) string {
|
||||||
|
|
|
@ -94,14 +94,9 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
|
||||||
cfg.client.CloseIdleConnections()
|
cfg.client.CloseIdleConnections()
|
||||||
return nil, fmt.Errorf("cannot parse TLS config: %w", err)
|
return nil, fmt.Errorf("cannot parse TLS config: %w", err)
|
||||||
}
|
}
|
||||||
tr, err := ac.NewRoundTripper(func(tr *http.Transport) {
|
cfg.client.Transport = ac.NewRoundTripper(&http.Transport{
|
||||||
tr.MaxIdleConnsPerHost = 100
|
MaxIdleConnsPerHost: 100,
|
||||||
})
|
})
|
||||||
if err != nil {
|
|
||||||
cfg.client.CloseIdleConnections()
|
|
||||||
return nil, fmt.Errorf("cannot initialize TLS config: %w", err)
|
|
||||||
}
|
|
||||||
cfg.client.Transport = tr
|
|
||||||
}
|
}
|
||||||
// use public compute endpoint by default
|
// use public compute endpoint by default
|
||||||
if len(cfg.availability) == 0 {
|
if len(cfg.availability) == 0 {
|
||||||
|
|
|
@ -47,9 +47,10 @@ func getAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
|
func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
|
||||||
var transport http.RoundTripper = &http.Transport{
|
tr := &http.Transport{
|
||||||
MaxIdleConnsPerHost: 100,
|
MaxIdleConnsPerHost: 100,
|
||||||
}
|
}
|
||||||
|
rt := http.RoundTripper(tr)
|
||||||
if sdc.TLSConfig != nil {
|
if sdc.TLSConfig != nil {
|
||||||
opts := &promauth.Options{
|
opts := &promauth.Options{
|
||||||
BaseDir: baseDir,
|
BaseDir: baseDir,
|
||||||
|
@ -59,16 +60,11 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("cannot parse TLS config: %w", err)
|
return nil, fmt.Errorf("cannot parse TLS config: %w", err)
|
||||||
}
|
}
|
||||||
transport, err = ac.NewRoundTripper(func(tr *http.Transport) {
|
rt = ac.NewRoundTripper(tr)
|
||||||
tr.MaxIdleConnsPerHost = 100
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("cannot initialize TLS config: %w", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
cfg := &apiConfig{
|
cfg := &apiConfig{
|
||||||
client: &http.Client{
|
client: &http.Client{
|
||||||
Transport: transport,
|
Transport: rt,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
apiEndpoint := sdc.APIEndpoint
|
apiEndpoint := sdc.APIEndpoint
|
||||||
|
|
|
@ -111,35 +111,25 @@ func NewClient(apiServer string, ac *promauth.Config, proxyURL *proxy.URL, proxy
|
||||||
proxyURLFunc = http.ProxyURL(pu)
|
proxyURLFunc = http.ProxyURL(pu)
|
||||||
}
|
}
|
||||||
|
|
||||||
tr, err := ac.NewRoundTripper(func(tr *http.Transport) {
|
|
||||||
tr.Proxy = proxyURLFunc
|
|
||||||
tr.TLSHandshakeTimeout = 10 * time.Second
|
|
||||||
tr.MaxIdleConnsPerHost = *maxConcurrency
|
|
||||||
tr.ResponseHeaderTimeout = DefaultClientReadTimeout
|
|
||||||
tr.DialContext = dialFunc
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("cannot initialize tls config: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
blockingTR, err := ac.NewRoundTripper(func(tr *http.Transport) {
|
|
||||||
tr.Proxy = proxyURLFunc
|
|
||||||
tr.TLSHandshakeTimeout = 10 * time.Second
|
|
||||||
tr.MaxIdleConnsPerHost = 1000
|
|
||||||
tr.ResponseHeaderTimeout = BlockingClientReadTimeout
|
|
||||||
tr.DialContext = dialFunc
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("cannot initialize tls config: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
client := &http.Client{
|
client := &http.Client{
|
||||||
Timeout: DefaultClientReadTimeout,
|
Timeout: DefaultClientReadTimeout,
|
||||||
Transport: tr,
|
Transport: ac.NewRoundTripper(&http.Transport{
|
||||||
|
Proxy: proxyURLFunc,
|
||||||
|
TLSHandshakeTimeout: 10 * time.Second,
|
||||||
|
MaxIdleConnsPerHost: *maxConcurrency,
|
||||||
|
ResponseHeaderTimeout: DefaultClientReadTimeout,
|
||||||
|
DialContext: dialFunc,
|
||||||
|
}),
|
||||||
}
|
}
|
||||||
blockingClient := &http.Client{
|
blockingClient := &http.Client{
|
||||||
Timeout: BlockingClientReadTimeout,
|
Timeout: BlockingClientReadTimeout,
|
||||||
Transport: blockingTR,
|
Transport: ac.NewRoundTripper(&http.Transport{
|
||||||
|
Proxy: proxyURLFunc,
|
||||||
|
TLSHandshakeTimeout: 10 * time.Second,
|
||||||
|
MaxIdleConnsPerHost: 1000,
|
||||||
|
ResponseHeaderTimeout: BlockingClientReadTimeout,
|
||||||
|
DialContext: dialFunc,
|
||||||
|
}),
|
||||||
}
|
}
|
||||||
|
|
||||||
setHTTPHeaders := func(_ *http.Request) error { return nil }
|
setHTTPHeaders := func(_ *http.Request) error { return nil }
|
||||||
|
|
Loading…
Reference in a new issue