lib/{promauth,promscrape}: automatically refresh root CA certificates after changes on disk (#5725)

* lib/{promauth,promscrape}: automatically refresh root CA certificates after changes on disk

Added a custom `http.RoundTripper` implementation which checks for root CA content changes and updates `tls.Config` used by `http.RoundTripper` after detecting CA change.

Client certificate changes are not tracked by this implementation since `tls.Config` already supports passing certificate dynamically by overriding `tls.Config.GetClientCertificate`.

This change implements dynamic reload of root CA only for streaming client used for scraping. Blocking client (`fasthttp.HostClient`) does not support using custom transport so can't use this implementation.

See: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5526

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* lib/promauth/config: update NewRoundTripper API

Update API to allow user to update only parameters required for transport.

Add warning log when reloading Root CA failed.

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* lib/promauth/config: fix mutex acquire logic

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* lib/promauth/config: replace RWMutex with regular mutex to simplify the code

- remove additional mutex used for getRootCABytes - require callee to use mutex
- replace RWMutex with regular mutex

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* lib/promauth/config: refactor

- hold the mutex lock to avoid round tripper being re-created twice
- move recreation logic into separate func to simplify the code

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

---------

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
Co-authored-by: Nikolay <nik@victoriametrics.com>
This commit is contained in:
Zakhar Bessarab 2024-04-03 12:01:43 +04:00 committed by GitHub
parent 6910e72c99
commit b3b29ba6ac
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 410 additions and 89 deletions

View file

@ -64,6 +64,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* FEATURE: [OpenTelemetry](https://docs.victoriametrics.com/#sending-data-via-opentelemetry): add `-opentelemetry.usePrometheusNaming` command-line flag, which can be used for enabling automatic conversion of the ingested metric names and labels into Prometheus-compatible format. See [these docs](https://docs.victoriametrics.com/#sending-data-via-opentelemetry) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6037). * FEATURE: [OpenTelemetry](https://docs.victoriametrics.com/#sending-data-via-opentelemetry): add `-opentelemetry.usePrometheusNaming` command-line flag, which can be used for enabling automatic conversion of the ingested metric names and labels into Prometheus-compatible format. See [these docs](https://docs.victoriametrics.com/#sending-data-via-opentelemetry) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6037).
* BUGFIX: prevent from automatic deletion of newly registered time series when it is queried immediately after the addition. The probability of this bug has been increased significantly after [v1.99.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.99.0) because of optimizations related to registering new time series. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5948) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5959) issue. * BUGFIX: prevent from automatic deletion of newly registered time series when it is queried immediately after the addition. The probability of this bug has been increased significantly after [v1.99.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.99.0) because of optimizations related to registering new time series. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5948) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5959) issue.
* BUGFIX: properly reload root CA certificates from files for [service discovery](https://docs.victoriametrics.com/sd_configs/) and [scraping requests in stream parsing mode](https://docs.victoriametrics.com/vmagent/?highlight=stream&highlight=parse#stream-parsing-mode). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5526).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly set `Host` header in requests to scrape targets if it is specified via [`headers` option](https://docs.victoriametrics.com/sd_configs/#http-api-client-options). Thanks to @fholzer for [the bugreport](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5969) and [the fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5970). * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly set `Host` header in requests to scrape targets if it is specified via [`headers` option](https://docs.victoriametrics.com/sd_configs/#http-api-client-options). Thanks to @fholzer for [the bugreport](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5969) and [the fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5970).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly set `Host` header in requests to scrape targets when [`server_name` option at `tls_config`](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#tls_config) is set. Previously the `Host` header was set incorrectly to the target hostname in this case. * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly set `Host` header in requests to scrape targets when [`server_name` option at `tls_config`](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#tls_config) is set. Previously the `Host` header was set incorrectly to the target hostname in this case.
* BUGFIX: do not drop `match[]` filter at [`/api/v1/series`](https://docs.victoriametrics.com/url-examples/#apiv1series) if `-search.ignoreExtraFiltersAtLabelsAPI` command-line flag is set, since missing `match[]` filter breaks `/api/v1/series` requests. * BUGFIX: do not drop `match[]` filter at [`/api/v1/series`](https://docs.victoriametrics.com/url-examples/#apiv1series) if `-search.ignoreExtraFiltersAtLabelsAPI` command-line flag is set, since missing `match[]` filter breaks `/api/v1/series` requests.

View file

@ -1,6 +1,7 @@
package promauth package promauth
import ( import (
"bytes"
"context" "context"
"crypto/tls" "crypto/tls"
"crypto/x509" "crypto/x509"
@ -11,12 +12,14 @@ import (
"strings" "strings"
"sync" "sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fscore"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/cespare/xxhash/v2" "github.com/cespare/xxhash/v2"
"golang.org/x/oauth2" "golang.org/x/oauth2"
"golang.org/x/oauth2/clientcredentials" "golang.org/x/oauth2/clientcredentials"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fscore"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
) )
// Secret represents a string secret such as password or auth token. // Secret represents a string secret such as password or auth token.
@ -289,6 +292,8 @@ type Config struct {
headers []keyValue headers []keyValue
headersDigest string headersDigest string
tctx *tlsContext
} }
type keyValue struct { type keyValue struct {
@ -371,6 +376,8 @@ func (ac *Config) String() string {
ac.authHeaderDigest, ac.headersDigest, ac.tlsRootCADigest, ac.tlsCertDigest, ac.tlsServerName, ac.tlsInsecureSkipVerify, ac.tlsMinVersion) ac.authHeaderDigest, ac.headersDigest, ac.tlsRootCADigest, ac.tlsCertDigest, ac.tlsServerName, ac.tlsInsecureSkipVerify, ac.tlsMinVersion)
} }
const tlsCertsCacheSeconds = 1
// getAuthHeaderFunc must return <value> for 'Authorization: <value>' http request header // getAuthHeaderFunc must return <value> for 'Authorization: <value>' http request header
type getAuthHeaderFunc func() (string, error) type getAuthHeaderFunc func() (string, error)
@ -390,7 +397,7 @@ func newGetAuthHeaderCached(getAuthHeader getAuthHeaderFunc) getAuthHeaderFunc {
defer mu.Unlock() defer mu.Unlock()
if fasttime.UnixTimestamp() > deadline { if fasttime.UnixTimestamp() > deadline {
ah, err = getAuthHeader() ah, err = getAuthHeader()
deadline = fasttime.UnixTimestamp() + 1 deadline = fasttime.UnixTimestamp() + tlsCertsCacheSeconds
} }
return ah, err return ah, err
} }
@ -413,7 +420,7 @@ func newGetTLSRootCACached(getTLSRootCA getTLSRootCAFunc) getTLSRootCAFunc {
defer mu.Unlock() defer mu.Unlock()
if fasttime.UnixTimestamp() > deadline { if fasttime.UnixTimestamp() > deadline {
rootCA, err = getTLSRootCA() rootCA, err = getTLSRootCA()
deadline = fasttime.UnixTimestamp() + 1 deadline = fasttime.UnixTimestamp() + tlsCertsCacheSeconds
} }
return rootCA, err return rootCA, err
} }
@ -436,7 +443,7 @@ func newGetTLSCertCached(getTLSCert getTLSCertFunc) getTLSCertFunc {
defer mu.Unlock() defer mu.Unlock()
if fasttime.UnixTimestamp() > deadline { if fasttime.UnixTimestamp() > deadline {
cert, err = getTLSCert(cri) cert, err = getTLSCert(cri)
deadline = fasttime.UnixTimestamp() + 1 deadline = fasttime.UnixTimestamp() + tlsCertsCacheSeconds
} }
return cert, err return cert, err
} }
@ -466,6 +473,24 @@ func (ac *Config) NewTLSConfig() (*tls.Config, error) {
return tlsCfg, nil return tlsCfg, nil
} }
// NewRoundTripper returns new http.RoundTripper for the given ac.
func (ac *Config) NewRoundTripper(builder func(*http.Transport)) (http.RoundTripper, error) {
cfg, err := ac.NewTLSConfig()
if err != nil {
return nil, fmt.Errorf("failed to initialize TLS config: %w", err)
}
if ac == nil {
tr := &http.Transport{
TLSClientConfig: cfg,
}
builder(tr)
return tr, nil
}
return ac.tctx.NewTLSRoundTripper(cfg, builder)
}
// NewConfig creates auth config for the given hcc. // NewConfig creates auth config for the given hcc.
func (hcc *HTTPClientConfig) NewConfig(baseDir string) (*Config, error) { func (hcc *HTTPClientConfig) NewConfig(baseDir string) (*Config, error) {
opts := &Options{ opts := &Options{
@ -613,6 +638,8 @@ func (opts *Options) NewConfig() (*Config, error) {
headers: headers, headers: headers,
headersDigest: headersDigest, headersDigest: headersDigest,
tctx: &tctx,
} }
return ac, nil return ac, nil
} }
@ -751,6 +778,7 @@ type tlsContext struct {
tlsCertDigest string tlsCertDigest string
getTLSRootCA getTLSRootCAFunc getTLSRootCA getTLSRootCAFunc
getRootCAPEM func() ([]byte, error)
tlsRootCADigest string tlsRootCADigest string
serverName string serverName string
@ -774,16 +802,26 @@ func (tctx *tlsContext) initFromTLSConfig(baseDir string, tc *TLSConfig) error {
} else if tc.CertFile != "" || tc.KeyFile != "" { } else if tc.CertFile != "" || tc.KeyFile != "" {
certPath := fscore.GetFilepath(baseDir, tc.CertFile) certPath := fscore.GetFilepath(baseDir, tc.CertFile)
keyPath := fscore.GetFilepath(baseDir, tc.KeyFile) keyPath := fscore.GetFilepath(baseDir, tc.KeyFile)
tctx.getTLSCert = func(*tls.CertificateRequestInfo) (*tls.Certificate, error) { getCertsPEM := func() ([]byte, []byte, error) {
// Re-read TLS certificate from disk. This is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1420 // Re-read TLS certificate from disk. This is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1420
certData, err := fscore.ReadFileOrHTTP(certPath) certData, err := fscore.ReadFileOrHTTP(certPath)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot read TLS certificate from %q: %w", certPath, err) return nil, nil, fmt.Errorf("cannot read TLS certificate from %q: %w", certPath, err)
} }
keyData, err := fscore.ReadFileOrHTTP(keyPath) keyData, err := fscore.ReadFileOrHTTP(keyPath)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot read TLS key from %q: %w", keyPath, err) return nil, nil, fmt.Errorf("cannot read TLS key from %q: %w", keyPath, err)
} }
return certData, keyData, nil
}
tctx.getTLSCert = func(*tls.CertificateRequestInfo) (*tls.Certificate, error) {
certData, keyData, err := getCertsPEM()
if err != nil {
return nil, err
}
cert, err := tls.X509KeyPair(certData, keyData) cert, err := tls.X509KeyPair(certData, keyData)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot load TLS certificate from `cert_file`=%q, `key_file`=%q: %w", tc.CertFile, tc.KeyFile, err) return nil, fmt.Errorf("cannot load TLS certificate from `cert_file`=%q, `key_file`=%q: %w", tc.CertFile, tc.KeyFile, err)
@ -797,35 +835,49 @@ func (tctx *tlsContext) initFromTLSConfig(baseDir string, tc *TLSConfig) error {
if !rootCA.AppendCertsFromPEM([]byte(tc.CA)) { if !rootCA.AppendCertsFromPEM([]byte(tc.CA)) {
return fmt.Errorf("cannot parse data from `ca` value") return fmt.Errorf("cannot parse data from `ca` value")
} }
tctx.getTLSRootCA = func() (*x509.CertPool, error) { tctx.getTLSRootCA = func() (*x509.CertPool, error) {
return rootCA, nil return rootCA, nil
} }
tctx.getRootCAPEM = func() ([]byte, error) {
return []byte(tc.CA), nil
}
h := xxhash.Sum64([]byte(tc.CA)) h := xxhash.Sum64([]byte(tc.CA))
tctx.tlsRootCADigest = fmt.Sprintf("digest(CA)=%d", h) tctx.tlsRootCADigest = fmt.Sprintf("digest(CA)=%d", h)
} else if tc.CAFile != "" { } else if tc.CAFile != "" {
path := fscore.GetFilepath(baseDir, tc.CAFile) path := fscore.GetFilepath(baseDir, tc.CAFile)
tctx.getTLSRootCA = func() (*x509.CertPool, error) {
getRootCAPEM := func() ([]byte, error) {
data, err := fscore.ReadFileOrHTTP(path) data, err := fscore.ReadFileOrHTTP(path)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot read `ca_file`: %w", err) return nil, fmt.Errorf("cannot read `ca_file`: %w", err)
} }
return data, nil
}
tctx.getTLSRootCA = func() (*x509.CertPool, error) {
data, err := getRootCAPEM()
if err != nil {
return nil, err
}
rootCA := x509.NewCertPool() rootCA := x509.NewCertPool()
if !rootCA.AppendCertsFromPEM(data) { if !rootCA.AppendCertsFromPEM(data) {
return nil, fmt.Errorf("cannot parse data read from `ca_file` %q", tc.CAFile) return nil, fmt.Errorf("cannot parse data read from `ca_file` %q", tc.CAFile)
} }
return rootCA, nil return rootCA, nil
} }
// The Config.NewTLSConfig() is called only once per each scrape target initialization. tctx.getRootCAPEM = func() ([]byte, error) {
// So, the tlsRootCADigest must contain the hash of CAFile contents additionally to CAFile itself, data, err := getRootCAPEM()
// in order to properly reload scrape target configs when CAFile contents changes.
data, err := fscore.ReadFileOrHTTP(path)
if err != nil { if err != nil {
// Do not return the error to the caller, since this may result in fatal error. return nil, err
// The CAFile contents can become available on the next check of scrape configs.
data = []byte("read error")
} }
h := xxhash.Sum64(data) return data, nil
tctx.tlsRootCADigest = fmt.Sprintf("caFile=%q, digest(caFile)=%d", tc.CAFile, h) }
// Does not hash file contents, since they may change at any time.
// TLSRoundTripper must be used to automatically update the root CA.
tctx.tlsRootCADigest = fmt.Sprintf("caFile=%q", tc.CAFile)
} }
v, err := netutil.ParseTLSVersion(tc.MinVersion) v, err := netutil.ParseTLSVersion(tc.MinVersion)
if err != nil { if err != nil {
@ -834,3 +886,118 @@ func (tctx *tlsContext) initFromTLSConfig(baseDir string, tc *TLSConfig) error {
tctx.minVersion = v tctx.minVersion = v
return nil return nil
} }
// NewTLSRoundTripper returns new http.RoundTripper which automatically updates
// RootCA in tls.Config whenever it changes.
func (tctx *tlsContext) NewTLSRoundTripper(cfg *tls.Config, builder func(transport *http.Transport)) (http.RoundTripper, error) {
// TLS context is not initialized so use the provided RoundTripper without wrapper
if tctx == nil || tctx.getRootCAPEM == nil {
tr := &http.Transport{
TLSClientConfig: cfg,
}
builder(tr)
return tr, nil
}
var deadline uint64
var rootCA []byte
var err error
getTLSDigestsLocked := func() []byte {
if fasttime.UnixTimestamp() > deadline {
rootCA, err = tctx.getRootCAPEM()
if err != nil {
logger.Warnf("cannot load root CA: %s", err)
}
deadline = fasttime.UnixTimestamp() + tlsCertsCacheSeconds
}
return rootCA
}
tr := &http.Transport{
TLSClientConfig: cfg,
}
builder(tr)
return &TLSRoundTripper{
builder: builder,
getRootCABytesLocked: getTLSDigestsLocked,
getTLSRootCA: newGetTLSRootCACached(tctx.getTLSRootCA),
config: cfg,
tr: tr,
}, nil
}
var _ http.RoundTripper = &TLSRoundTripper{}
// TLSRoundTripper is an implementation of http.RoundTripper which automatically
// updates RootCA in tls.Config whenever it changes.
type TLSRoundTripper struct {
builder func(*http.Transport)
getRootCABytesLocked func() []byte
getTLSRootCA func() (*x509.CertPool, error)
config *tls.Config
tr http.RoundTripper
m sync.Mutex
rootCABytes []byte
}
// RoundTrip implements http.RoundTripper.RoundTrip.
// It automatically updates RootCA in tls.Config whenever it changes.
func (t *TLSRoundTripper) RoundTrip(request *http.Request) (*http.Response, error) {
t.m.Lock()
rootCABytes := t.getRootCABytesLocked()
equal := bytes.Equal(rootCABytes, t.rootCABytes)
if equal {
t.m.Unlock()
return t.tr.RoundTrip(request)
}
err := t.recreateRoundTripperLocked(rootCABytes)
t.m.Unlock()
if err != nil {
return nil, fmt.Errorf("cannot recreate RoundTripper: %w", err)
}
return t.tr.RoundTrip(request)
}
func (t *TLSRoundTripper) recreateRoundTripperLocked(rootCABytes []byte) error {
newRootCaPool, err := t.getTLSRootCA()
if err != nil {
return fmt.Errorf("cannot load root CAs: %w", err)
}
newTLSConfig := t.config.Clone()
newTLSConfig.RootCAs = newRootCaPool
// Reset ClientSessionCache, since it may contain sessions for the old root CA.
newTLSConfig.ClientSessionCache = tls.NewLRUClientSessionCache(0)
tr := &http.Transport{
TLSClientConfig: newTLSConfig,
}
t.builder(tr)
oldRt := t.tr
t.tr = tr
t.rootCABytes = rootCABytes
t.config = newTLSConfig
if oldRt != nil {
closeIdleConnections(oldRt)
}
return nil
}
type closeIdler interface {
CloseIdleConnections()
}
func closeIdleConnections(t http.RoundTripper) {
if ci, ok := t.(closeIdler); ok {
ci.CloseIdleConnections()
}
}

View file

@ -1,9 +1,20 @@
package promauth package promauth
import ( import (
"crypto/rand"
"crypto/rsa"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"math/big"
"net"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"net/url"
"os"
"testing" "testing"
"time"
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"
) )
@ -593,3 +604,143 @@ func TestConfigHeaders(t *testing.T) {
f([]string{"foo: bar"}, "Foo: bar\r\n") f([]string{"foo: bar"}, "Foo: bar\r\n")
f([]string{"Foo-Bar: Baz s:sdf", "A:b", "X-Forwarded-For: A-B:c"}, "Foo-Bar: Baz s:sdf\r\nA: b\r\nX-Forwarded-For: A-B:c\r\n") f([]string{"Foo-Bar: Baz s:sdf", "A:b", "X-Forwarded-For: A-B:c"}, "Foo-Bar: Baz s:sdf\r\nA: b\r\nX-Forwarded-For: A-B:c\r\n")
} }
func TestTLSConfigWithCertificatesFilesUpdate(t *testing.T) {
// Generate and save a self-signed CA certificate and a certificate signed by the CA
caPEM, certPEM, keyPEM := generateCertificates(t)
_ = os.WriteFile("testdata/ca.pem", caPEM, 0644)
_ = os.WriteFile("testdata/cert.pem", certPEM, 0644)
_ = os.WriteFile("testdata/key.pem", keyPEM, 0644)
defer func() {
for _, p := range []string{
"testdata/ca.pem",
"testdata/cert.pem",
"testdata/key.pem",
} {
_ = os.Remove(p)
}
}()
cert, err := tls.LoadX509KeyPair("testdata/cert.pem", "testdata/key.pem")
if err != nil {
t.Fatalf("cannot load generated certificate: %s", err)
}
tlsConfig := &tls.Config{}
tlsConfig.Certificates = []tls.Certificate{cert}
s := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
s.TLS = tlsConfig
s.StartTLS()
serverURL, _ := url.Parse(s.URL)
opts := Options{
TLSConfig: &TLSConfig{
CAFile: "testdata/ca.pem",
},
}
ac, err := opts.NewConfig()
if err != nil {
t.Fatalf("unexpected error when parsing config: %s", err)
}
tr, err := ac.NewRoundTripper(func(tr *http.Transport) {})
if err != nil {
t.Fatalf("unexpected error when creating roundtripper: %s", err)
}
client := http.Client{
Transport: tr,
}
resp, err := client.Do(&http.Request{
Method: http.MethodGet,
URL: serverURL,
})
if err != nil {
t.Fatalf("unexpected error when making request: %s", err)
}
if resp.StatusCode != http.StatusOK {
t.Fatalf("expected status code %d; got %d", http.StatusOK, resp.StatusCode)
}
// Update CA file with new CA and get config
ca2PEM, _, _ := generateCertificates(t)
_ = os.WriteFile("testdata/ca.pem", ca2PEM, 0644)
// Wait for cert cache expiration
time.Sleep(2 * tlsCertsCacheSeconds * time.Second)
_, err = client.Do(&http.Request{
Method: http.MethodGet,
URL: serverURL,
})
if err == nil {
t.Fatal("expected TLS verification error, got nil")
}
}
func generateCertificates(t *testing.T) ([]byte, []byte, []byte) {
// Small key size for faster tests
const testCertificateBits = 1024
ca := &x509.Certificate{
SerialNumber: big.NewInt(2024),
Subject: pkix.Name{
Organization: []string{"Test CA"},
},
NotBefore: time.Now(),
NotAfter: time.Now().AddDate(10, 0, 0),
IsCA: true,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth},
KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
BasicConstraintsValid: true,
}
caPrivKey, err := rsa.GenerateKey(rand.Reader, testCertificateBits)
if err != nil {
t.Fatal(err)
}
caBytes, err := x509.CreateCertificate(rand.Reader, ca, ca, &caPrivKey.PublicKey, caPrivKey)
if err != nil {
t.Fatal(err)
}
caPEM := pem.EncodeToMemory(&pem.Block{
Type: "CERTIFICATE",
Bytes: caBytes,
})
cert := &x509.Certificate{
SerialNumber: big.NewInt(2020),
Subject: pkix.Name{
Organization: []string{"Test Cert"},
},
IPAddresses: []net.IP{net.ParseIP("127.0.0.1")},
NotBefore: time.Now(),
NotAfter: time.Now().AddDate(10, 0, 0),
IsCA: false,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth},
KeyUsage: x509.KeyUsageDigitalSignature,
BasicConstraintsValid: true,
}
key, err := rsa.GenerateKey(rand.Reader, testCertificateBits)
if err != nil {
t.Fatal(err)
}
certBytes, err := x509.CreateCertificate(rand.Reader, cert, ca, &key.PublicKey, caPrivKey)
if err != nil {
t.Fatal(err)
}
certPEM := pem.EncodeToMemory(&pem.Block{
Type: "CERTIFICATE",
Bytes: certBytes,
})
keyPEM := pem.EncodeToMemory(&pem.Block{
Type: "RSA PRIVATE KEY",
Bytes: x509.MarshalPKCS1PrivateKey(key),
})
return caPEM, certPEM, keyPEM
}

View file

@ -11,9 +11,10 @@ import (
"strings" "strings"
"time" "time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/metrics"
) )
var ( var (
@ -43,20 +44,13 @@ type client struct {
func newClient(ctx context.Context, sw *ScrapeWork) (*client, error) { func newClient(ctx context.Context, sw *ScrapeWork) (*client, error) {
isTLS := strings.HasPrefix(sw.ScrapeURL, "https://") isTLS := strings.HasPrefix(sw.ScrapeURL, "https://")
var tlsCfg *tls.Config
if isTLS {
var err error
tlsCfg, err = sw.AuthConfig.NewTLSConfig()
if err != nil {
return nil, fmt.Errorf("cannot initialize tls config: %w", err)
}
}
setHeaders := func(req *http.Request) error { setHeaders := func(req *http.Request) error {
return sw.AuthConfig.SetHeaders(req, true) return sw.AuthConfig.SetHeaders(req, true)
} }
setProxyHeaders := func(_ *http.Request) error { setProxyHeaders := func(_ *http.Request) error {
return nil return nil
} }
var tlsCfg *tls.Config
proxyURL := sw.ProxyURL proxyURL := sw.ProxyURL
if !isTLS && proxyURL.IsHTTPOrHTTPS() { if !isTLS && proxyURL.IsHTTPOrHTTPS() {
pu := proxyURL.GetURL() pu := proxyURL.GetURL()
@ -75,20 +69,29 @@ func newClient(ctx context.Context, sw *ScrapeWork) (*client, error) {
if pu := sw.ProxyURL.GetURL(); pu != nil { if pu := sw.ProxyURL.GetURL(); pu != nil {
proxyURLFunc = http.ProxyURL(pu) proxyURLFunc = http.ProxyURL(pu)
} }
hc := &http.Client{
Transport: &http.Transport{ rt, err := sw.AuthConfig.NewRoundTripper(func(tr *http.Transport) {
TLSClientConfig: tlsCfg, if !isTLS && proxyURL.IsHTTPOrHTTPS() {
Proxy: proxyURLFunc, tr.TLSClientConfig = tlsCfg
TLSHandshakeTimeout: 10 * time.Second,
IdleConnTimeout: 2 * sw.ScrapeInterval,
DisableCompression: *disableCompression || sw.DisableCompression,
DisableKeepAlives: *disableKeepAlive || sw.DisableKeepAlive,
DialContext: statStdDial,
MaxIdleConnsPerHost: 100,
MaxResponseHeaderBytes: int64(maxResponseHeadersSize.N),
},
Timeout: sw.ScrapeTimeout,
} }
tr.Proxy = proxyURLFunc
tr.TLSHandshakeTimeout = 10 * time.Second
tr.IdleConnTimeout = 2 * sw.ScrapeInterval
tr.DisableCompression = *disableCompression || sw.DisableCompression
tr.DisableKeepAlives = *disableKeepAlive || sw.DisableKeepAlive
tr.DialContext = statStdDial
tr.MaxIdleConnsPerHost = 100
tr.MaxResponseHeaderBytes = int64(maxResponseHeadersSize.N)
})
if err != nil {
return nil, fmt.Errorf("cannot initialize tls config: %w", err)
}
hc := &http.Client{
Transport: rt,
}
if sw.DenyRedirects { if sw.DenyRedirects {
hc.CheckRedirect = func(_ *http.Request, _ []*http.Request) error { hc.CheckRedirect = func(_ *http.Request, _ []*http.Request) error {
return http.ErrUseLastResponse return http.ErrUseLastResponse

View file

@ -251,18 +251,19 @@ func newGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string,
if proxyURL != nil { if proxyURL != nil {
proxy = http.ProxyURL(proxyURL) proxy = http.ProxyURL(proxyURL)
} }
tlsConfig, err := ac.NewTLSConfig() tr, err := ac.NewRoundTripper(func(tr *http.Transport) {
tr.Proxy = proxy
tr.TLSHandshakeTimeout = 10 * time.Second
tr.IdleConnTimeout = *apiServerTimeout
tr.MaxIdleConnsPerHost = 100
})
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot initialize tls config: %w", err) return nil, fmt.Errorf("cannot initialize tls config: %w", err)
} }
client := &http.Client{ client := &http.Client{
Transport: &http.Transport{ Transport: tr,
TLSClientConfig: tlsConfig,
Proxy: proxy,
TLSHandshakeTimeout: 10 * time.Second,
IdleConnTimeout: *apiServerTimeout,
MaxIdleConnsPerHost: 100,
},
Timeout: *apiServerTimeout, Timeout: *apiServerTimeout,
} }
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())

View file

@ -94,15 +94,14 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
cfg.client.CloseIdleConnections() cfg.client.CloseIdleConnections()
return nil, fmt.Errorf("cannot parse TLS config: %w", err) return nil, fmt.Errorf("cannot parse TLS config: %w", err)
} }
tlsConfig, err := ac.NewTLSConfig() tr, err := ac.NewRoundTripper(func(tr *http.Transport) {
tr.MaxIdleConnsPerHost = 100
})
if err != nil { if err != nil {
cfg.client.CloseIdleConnections() cfg.client.CloseIdleConnections()
return nil, fmt.Errorf("cannot initialize TLS config: %w", err) return nil, fmt.Errorf("cannot initialize TLS config: %w", err)
} }
cfg.client.Transport = &http.Transport{ cfg.client.Transport = tr
TLSClientConfig: tlsConfig,
MaxIdleConnsPerHost: 100,
}
} }
// use public compute endpoint by default // use public compute endpoint by default
if len(cfg.availability) == 0 { if len(cfg.availability) == 0 {

View file

@ -47,7 +47,7 @@ func getAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
} }
func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
transport := &http.Transport{ var transport http.RoundTripper = &http.Transport{
MaxIdleConnsPerHost: 100, MaxIdleConnsPerHost: 100,
} }
if sdc.TLSConfig != nil { if sdc.TLSConfig != nil {
@ -59,11 +59,12 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot parse TLS config: %w", err) return nil, fmt.Errorf("cannot parse TLS config: %w", err)
} }
tlsConfig, err := ac.NewTLSConfig() transport, err = ac.NewRoundTripper(func(tr *http.Transport) {
tr.MaxIdleConnsPerHost = 100
})
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot initialize TLS config: %w", err) return nil, fmt.Errorf("cannot initialize TLS config: %w", err)
} }
transport.TLSClientConfig = tlsConfig
} }
cfg := &apiConfig{ cfg := &apiConfig{
client: &http.Client{ client: &http.Client{

View file

@ -2,7 +2,6 @@ package discoveryutils
import ( import (
"context" "context"
"crypto/tls"
"errors" "errors"
"flag" "flag"
"fmt" "fmt"
@ -14,10 +13,11 @@ import (
"sync" "sync"
"time" "time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy" "github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/metrics"
) )
var ( var (
@ -106,42 +106,40 @@ func NewClient(apiServer string, ac *promauth.Config, proxyURL *proxy.URL, proxy
} }
} }
isTLS := u.Scheme == "https"
var tlsCfg *tls.Config
if isTLS {
var err error
tlsCfg, err = ac.NewTLSConfig()
if err != nil {
return nil, fmt.Errorf("cannot initialize tls config: %w", err)
}
}
var proxyURLFunc func(*http.Request) (*url.URL, error) var proxyURLFunc func(*http.Request) (*url.URL, error)
if pu := proxyURL.GetURL(); pu != nil { if pu := proxyURL.GetURL(); pu != nil {
proxyURLFunc = http.ProxyURL(pu) proxyURLFunc = http.ProxyURL(pu)
} }
tr, err := ac.NewRoundTripper(func(tr *http.Transport) {
tr.Proxy = proxyURLFunc
tr.TLSHandshakeTimeout = 10 * time.Second
tr.MaxIdleConnsPerHost = *maxConcurrency
tr.ResponseHeaderTimeout = DefaultClientReadTimeout
tr.DialContext = dialFunc
})
if err != nil {
return nil, fmt.Errorf("cannot initialize tls config: %w", err)
}
blockingTR, err := ac.NewRoundTripper(func(tr *http.Transport) {
tr.Proxy = proxyURLFunc
tr.TLSHandshakeTimeout = 10 * time.Second
tr.MaxIdleConnsPerHost = 1000
tr.ResponseHeaderTimeout = BlockingClientReadTimeout
tr.DialContext = dialFunc
})
if err != nil {
return nil, fmt.Errorf("cannot initialize tls config: %w", err)
}
client := &http.Client{ client := &http.Client{
Timeout: DefaultClientReadTimeout, Timeout: DefaultClientReadTimeout,
Transport: &http.Transport{ Transport: tr,
TLSClientConfig: tlsCfg,
Proxy: proxyURLFunc,
TLSHandshakeTimeout: 10 * time.Second,
MaxIdleConnsPerHost: *maxConcurrency,
ResponseHeaderTimeout: DefaultClientReadTimeout,
DialContext: dialFunc,
},
} }
blockingClient := &http.Client{ blockingClient := &http.Client{
Timeout: BlockingClientReadTimeout, Timeout: BlockingClientReadTimeout,
Transport: &http.Transport{ Transport: blockingTR,
TLSClientConfig: tlsCfg,
Proxy: proxyURLFunc,
TLSHandshakeTimeout: 10 * time.Second,
MaxIdleConnsPerHost: 1000,
ResponseHeaderTimeout: BlockingClientReadTimeout,
DialContext: dialFunc,
},
} }
setHTTPHeaders := func(_ *http.Request) error { return nil } setHTTPHeaders := func(_ *http.Request) error { return nil }