fix inconsistent behaviors with prometheus when scraping (#5153)

* fix inconsistent behaviors with prometheus when scraping

1. address https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4959. skip job with wrong syntax in `scrape_configs` with error logs instead of exiting;
2. show error messages on vmagent /targets ui if there are wrong auth configs in `scrape_configs`, previously will print error logs and do scrape without auth header;
3. don't send requests if there are wrong auth configs in:
    1. vmagent remoteWrite;
    2. vmalert datasource/remoteRead/remoteWrite/notifier.

* add changelogs

* address review comments

* fix ut
This commit is contained in:
Hui Wang 2023-10-17 17:58:19 +08:00 committed by GitHub
parent 3e2f09541e
commit e16d3f5639
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 293 additions and 156 deletions

View file

@ -323,26 +323,32 @@ func (c *client) runWorker() {
}
func (c *client) doRequest(url string, body []byte) (*http.Response, error) {
req := c.newRequest(url, body)
req, err := c.newRequest(url, body)
if err != nil {
return nil, err
}
resp, err := c.hc.Do(req)
if err != nil && errors.Is(err, io.EOF) {
// it is likely connection became stale.
// So we do one more attempt in hope request will succeed.
// If not, the error should be handled by the caller as usual.
// This should help with https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4139
req = c.newRequest(url, body)
req, _ = c.newRequest(url, body)
resp, err = c.hc.Do(req)
}
return resp, err
}
func (c *client) newRequest(url string, body []byte) *http.Request {
func (c *client) newRequest(url string, body []byte) (*http.Request, error) {
reqBody := bytes.NewBuffer(body)
req, err := http.NewRequest(http.MethodPost, url, reqBody)
if err != nil {
logger.Panicf("BUG: unexpected error from http.NewRequest(%q): %s", url, err)
}
c.authCfg.SetHeaders(req, true)
err = c.authCfg.SetHeaders(req, true)
if err != nil {
return nil, err
}
h := req.Header
h.Set("User-Agent", "vmagent")
h.Set("Content-Type", "application/x-protobuf")
@ -360,7 +366,7 @@ func (c *client) newRequest(url string, body []byte) *http.Request {
logger.Warnf("cannot sign remoteWrite request with AWS sigv4: %s", err)
}
}
return req
return req, nil
}
// sendBlockHTTP sends the given block to c.remoteWriteURL.

View file

@ -111,6 +111,10 @@ func Init(extraParams url.Values) (QuerierBuilder, error) {
if err != nil {
return nil, fmt.Errorf("failed to configure auth: %w", err)
}
_, err = authCfg.GetAuthHeader()
if err != nil {
return nil, fmt.Errorf("failed to set request auth header to datasource %q: %s", *addr, err)
}
return &VMStorage{
c: &http.Client{Transport: tr},

View file

@ -137,12 +137,15 @@ func NewVMStorage(baseURL string, authCfg *promauth.Config, lookBack time.Durati
// Query executes the given query and returns parsed response
func (s *VMStorage) Query(ctx context.Context, query string, ts time.Time) (Result, *http.Request, error) {
req := s.newQueryRequest(query, ts)
req, err := s.newQueryRequest(query, ts)
if err != nil {
return Result{}, nil, err
}
resp, err := s.do(ctx, req)
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
// something in the middle between client and datasource might be closing
// the connection. So we do a one more attempt in hope request will succeed.
req = s.newQueryRequest(query, ts)
req, _ = s.newQueryRequest(query, ts)
resp, err = s.do(ctx, req)
}
if err != nil {
@ -173,12 +176,15 @@ func (s *VMStorage) QueryRange(ctx context.Context, query string, start, end tim
if end.IsZero() {
return res, fmt.Errorf("end param is missing")
}
req := s.newQueryRangeRequest(query, start, end)
req, err := s.newQueryRangeRequest(query, start, end)
if err != nil {
return Result{}, err
}
resp, err := s.do(ctx, req)
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
// something in the middle between client and datasource might be closing
// the connection. So we do a one more attempt in hope request will succeed.
req = s.newQueryRangeRequest(query, start, end)
req, _ = s.newQueryRangeRequest(query, start, end)
resp, err = s.do(ctx, req)
}
if err != nil {
@ -210,14 +216,20 @@ func (s *VMStorage) do(ctx context.Context, req *http.Request) (*http.Response,
return resp, nil
}
func (s *VMStorage) newQueryRangeRequest(query string, start, end time.Time) *http.Request {
req := s.newRequest()
func (s *VMStorage) newQueryRangeRequest(query string, start, end time.Time) (*http.Request, error) {
req, err := s.newRequest()
if err != nil {
return nil, fmt.Errorf("cannot create query_range request to datasource %q: %s", s.datasourceURL, err)
}
s.setPrometheusRangeReqParams(req, query, start, end)
return req
return req, nil
}
func (s *VMStorage) newQueryRequest(query string, ts time.Time) *http.Request {
req := s.newRequest()
func (s *VMStorage) newQueryRequest(query string, ts time.Time) (*http.Request, error) {
req, err := s.newRequest()
if err != nil {
return nil, fmt.Errorf("cannot create query request to datasource %q: %s", s.datasourceURL, err)
}
switch s.dataSourceType {
case "", datasourcePrometheus:
s.setPrometheusInstantReqParams(req, query, ts)
@ -226,20 +238,23 @@ func (s *VMStorage) newQueryRequest(query string, ts time.Time) *http.Request {
default:
logger.Panicf("BUG: engine not found: %q", s.dataSourceType)
}
return req
return req, nil
}
func (s *VMStorage) newRequest() *http.Request {
func (s *VMStorage) newRequest() (*http.Request, error) {
req, err := http.NewRequest(http.MethodPost, s.datasourceURL, nil)
if err != nil {
logger.Panicf("BUG: unexpected error from http.NewRequest(%q): %s", s.datasourceURL, err)
}
req.Header.Set("Content-Type", "application/json")
if s.authCfg != nil {
s.authCfg.SetHeaders(req, true)
err = s.authCfg.SetHeaders(req, true)
if err != nil {
return nil, err
}
}
for _, h := range s.extraHeaders {
req.Header.Set(h.key, h.value)
}
return req
return req, nil
}

View file

@ -637,7 +637,10 @@ func TestRequestParams(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
req := tc.vm.newRequest()
req, err := tc.vm.newRequest()
if err != nil {
t.Fatal(err)
}
switch tc.vm.dataSourceType {
case "", datasourcePrometheus:
if tc.queryRange {
@ -732,7 +735,10 @@ func TestHeaders(t *testing.T) {
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
vm := tt.vmFn()
req := vm.newQueryRequest("foo", time.Now())
req, err := vm.newQueryRequest("foo", time.Now())
if err != nil {
t.Fatal(err)
}
tt.checkFn(t, req)
})
}

View file

@ -88,7 +88,10 @@ func (am *AlertManager) send(ctx context.Context, alerts []Alert, headers map[st
req = req.WithContext(ctx)
if am.authCfg != nil {
am.authCfg.SetHeaders(req, true)
err = am.authCfg.SetHeaders(req, true)
if err != nil {
return err
}
}
resp, err := am.client.Do(req)
if err != nil {

View file

@ -280,7 +280,10 @@ func (c *Client) send(ctx context.Context, data []byte) error {
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
if c.authCfg != nil {
c.authCfg.SetHeaders(req, true)
err = c.authCfg.SetHeaders(req, true)
if err != nil {
return &nonRetriableError{err: err}
}
}
if !*disablePathAppend {
req.URL.Path = path.Join(req.URL.Path, "/api/v1/write")

View file

@ -38,6 +38,7 @@ The sandbox cluster installation is running under the constant load generated by
This also means that `datasource.queryTimeAlignment` command-line flag becomes deprecated now and will have no effect if configured. If `datasource.queryTimeAlignment` was set to `false` before, then `eval_alignment` has to be set to `false` explicitly under group.
See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5049).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): support data ingestion from [NewRelic infrastructure agent](https://docs.newrelic.com/docs/infrastructure/install-infrastructure-agent). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-newrelic-agent), [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3520) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4712).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): skip job with error logs if there is incorrect syntax under `scrape_configs`, previously will exit. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4959) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5153).
* FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): add `-filestream.disableFadvise` command-line flag, which can be used for disabling `fadvise` syscall during backup upload to the remote storage. By default `vmbackup` uses `fadvise` syscall in order to prevent from eviction of recently accessed data from the [OS page cache](https://en.wikipedia.org/wiki/Page_cache) when backing up large files. Sometimes the `fadvise` syscall may take significant amounts of CPU when the backup is performed with large value of `-concurrency` command-line flag on systems with big number of CPU cores. In this case it is better to manually disable `fadvise` syscall by passing `-filestream.disableFadvise` command-line flag to `vmbackup`. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5120) for details.
* FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): add `-deleteAllObjectVersions` command-line flag, which can be used for forcing removal of all object versions in remote object storage. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5121) issue and [these docs](https://docs.victoriametrics.com/vmbackup.html#permanent-deletion-of-objects-in-s3-compatible-storages) for the details.
* FEATURE: [Alerting rules for VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#alerts): account for `vmauth` component for alerts `ServiceDown` and `TooManyRestarts`.
@ -50,6 +51,7 @@ The sandbox cluster installation is running under the constant load generated by
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): strip sensitive information such as auth headers or passwords from datasource, remote-read, remote-write or notifier URLs in log messages or UI. This behavior is by default and is controlled via `-datasource.showURL`, `-remoteRead.showURL`, `remoteWrite.showURL` or `-notifier.showURL` cmd-line flags. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5044).
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): fix vmalert web UI when running on 32-bit architectures machine.
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): don't send requests if there is wrong auth config in `datasource`, `remoteWrite`, `remoteRead` and `notifier` section, previously will send requests without auth header. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5153).
* BUGFIX: [vmselect](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): improve performance and memory usage during query processing on machines with big number of CPU cores. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5087) for details.
* BUGFIX: dashboards: fix vminsert/vmstorage/vmselect metrics filtering when dashboard is used to display data from many sub-clusters with unique job names. Before, only one specific job could have been accounted for component-specific panels, instead of all available jobs for the component.
* BUGFIX: dashboards/vmalert: apply `desc` sorting in tooltips for vmalert dashboard in order to improve visibility of the outliers on graph.
@ -57,7 +59,7 @@ The sandbox cluster installation is running under the constant load generated by
* BUGFIX: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): bump hard-coded limit for search query size at `vmstorage` from 1MB to 5MB. The change should be more suitable for real-world scenarios and protect vmstorage from excessive memory usage. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5154) for details
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): fix error when creating an incremental backup with the `-origin` command-line flag. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5144) for details.
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix vmagent ignoring configuration reload for streaming aggregation if it was started with empty streaming aggregation config. Thanks to @aluode99 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5178).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): don't send requests if there is wrong auth config in `scrape_configs` and `remoteWrite` section, previously will send requests without auth header. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5153).
## [v1.94.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.94.0)

View file

@ -14,7 +14,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/fasthttp"
"github.com/cespare/xxhash/v2"
@ -194,9 +193,6 @@ type oauth2ConfigInternal struct {
}
func newOAuth2ConfigInternal(baseDir string, o *OAuth2Config) (*oauth2ConfigInternal, error) {
if err := o.validate(); err != nil {
return nil, err
}
oi := &oauth2ConfigInternal{
cfg: &clientcredentials.Config{
ClientID: o.ClientID,
@ -280,7 +276,7 @@ type Config struct {
getTLSCert func(*tls.CertificateRequestInfo) (*tls.Certificate, error)
tlsCertDigest string
getAuthHeader func() string
getAuthHeader func() (string, error)
authHeaderLock sync.Mutex
authHeader string
authHeaderDeadline uint64
@ -325,45 +321,58 @@ func (ac *Config) HeadersNoAuthString() string {
}
// SetHeaders sets the configured ac headers to req.
func (ac *Config) SetHeaders(req *http.Request, setAuthHeader bool) {
func (ac *Config) SetHeaders(req *http.Request, setAuthHeader bool) error {
reqHeaders := req.Header
for _, h := range ac.headers {
reqHeaders.Set(h.key, h.value)
}
if setAuthHeader {
if ah := ac.GetAuthHeader(); ah != "" {
ah, err := ac.GetAuthHeader()
if err != nil {
return fmt.Errorf("failed to set request auth header: %w", err)
}
if ah != "" {
reqHeaders.Set("Authorization", ah)
}
}
return nil
}
// SetFasthttpHeaders sets the configured ac headers to req.
func (ac *Config) SetFasthttpHeaders(req *fasthttp.Request, setAuthHeader bool) {
func (ac *Config) SetFasthttpHeaders(req *fasthttp.Request, setAuthHeader bool) error {
reqHeaders := &req.Header
for _, h := range ac.headers {
reqHeaders.Set(h.key, h.value)
}
if setAuthHeader {
if ah := ac.GetAuthHeader(); ah != "" {
ah, err := ac.GetAuthHeader()
if err != nil {
return err
}
if ah != "" {
reqHeaders.Set("Authorization", ah)
}
}
return nil
}
// GetAuthHeader returns optional `Authorization: ...` http header.
func (ac *Config) GetAuthHeader() string {
func (ac *Config) GetAuthHeader() (string, error) {
f := ac.getAuthHeader
if f == nil {
return ""
return "", nil
}
ac.authHeaderLock.Lock()
defer ac.authHeaderLock.Unlock()
if fasttime.UnixTimestamp() > ac.authHeaderDeadline {
ac.authHeader = f()
var err error
if ac.authHeader, err = f(); err != nil {
return "", err
}
// Cache the authHeader for a second.
ac.authHeaderDeadline = fasttime.UnixTimestamp() + 1
}
return ac.authHeader
return ac.authHeader, nil
}
// String returns human-readable representation for ac.
@ -564,7 +573,7 @@ func (opts *Options) NewConfig() (*Config, error) {
type authContext struct {
// getAuthHeader must return <value> for 'Authorization: <value>' http request header
getAuthHeader func() string
getAuthHeader func() (string, error)
// authDigest must contain the digest for the used authorization
// The digest must be changed whenever the original config changes.
@ -577,8 +586,8 @@ func (actx *authContext) initFromAuthorization(baseDir string, az *Authorization
azType = az.Type
}
if az.CredentialsFile == "" {
actx.getAuthHeader = func() string {
return azType + " " + az.Credentials.String()
actx.getAuthHeader = func() (string, error) {
return azType + " " + az.Credentials.String(), nil
}
actx.authDigest = fmt.Sprintf("custom(type=%q, creds=%q)", az.Type, az.Credentials)
return nil
@ -587,13 +596,12 @@ func (actx *authContext) initFromAuthorization(baseDir string, az *Authorization
return fmt.Errorf("both `credentials`=%q and `credentials_file`=%q are set", az.Credentials, az.CredentialsFile)
}
filePath := fs.GetFilepath(baseDir, az.CredentialsFile)
actx.getAuthHeader = func() string {
actx.getAuthHeader = func() (string, error) {
token, err := readPasswordFromFile(filePath)
if err != nil {
logger.Errorf("cannot read credentials from `credentials_file`=%q: %s", az.CredentialsFile, err)
return ""
return "", fmt.Errorf("cannot read credentials from `credentials_file`=%q: %s", az.CredentialsFile, err)
}
return azType + " " + token
return azType + " " + token, nil
}
actx.authDigest = fmt.Sprintf("custom(type=%q, credsFile=%q)", az.Type, filePath)
return nil
@ -604,11 +612,11 @@ func (actx *authContext) initFromBasicAuthConfig(baseDir string, ba *BasicAuthCo
return fmt.Errorf("missing `username` in `basic_auth` section")
}
if ba.PasswordFile == "" {
actx.getAuthHeader = func() string {
actx.getAuthHeader = func() (string, error) {
// See https://en.wikipedia.org/wiki/Basic_access_authentication
token := ba.Username + ":" + ba.Password.String()
token64 := base64.StdEncoding.EncodeToString([]byte(token))
return "Basic " + token64
return "Basic " + token64, nil
}
actx.authDigest = fmt.Sprintf("basic(username=%q, password=%q)", ba.Username, ba.Password)
return nil
@ -617,16 +625,15 @@ func (actx *authContext) initFromBasicAuthConfig(baseDir string, ba *BasicAuthCo
return fmt.Errorf("both `password`=%q and `password_file`=%q are set in `basic_auth` section", ba.Password, ba.PasswordFile)
}
filePath := fs.GetFilepath(baseDir, ba.PasswordFile)
actx.getAuthHeader = func() string {
actx.getAuthHeader = func() (string, error) {
password, err := readPasswordFromFile(filePath)
if err != nil {
logger.Errorf("cannot read password from `password_file`=%q set in `basic_auth` section: %s", ba.PasswordFile, err)
return ""
return "", fmt.Errorf("cannot read password from `password_file`=%q set in `basic_auth` section: %s", ba.PasswordFile, err)
}
// See https://en.wikipedia.org/wiki/Basic_access_authentication
token := ba.Username + ":" + password
token64 := base64.StdEncoding.EncodeToString([]byte(token))
return "Basic " + token64
return "Basic " + token64, nil
}
actx.authDigest = fmt.Sprintf("basic(username=%q, passwordFile=%q)", ba.Username, filePath)
return nil
@ -634,43 +641,44 @@ func (actx *authContext) initFromBasicAuthConfig(baseDir string, ba *BasicAuthCo
func (actx *authContext) initFromBearerTokenFile(baseDir string, bearerTokenFile string) error {
filePath := fs.GetFilepath(baseDir, bearerTokenFile)
actx.getAuthHeader = func() string {
actx.getAuthHeader = func() (string, error) {
token, err := readPasswordFromFile(filePath)
if err != nil {
logger.Errorf("cannot read bearer token from `bearer_token_file`=%q: %s", bearerTokenFile, err)
return ""
return "", fmt.Errorf("cannot read bearer token from `bearer_token_file`=%q: %s", bearerTokenFile, err)
}
return "Bearer " + token
return "Bearer " + token, nil
}
actx.authDigest = fmt.Sprintf("bearer(tokenFile=%q)", filePath)
return nil
}
func (actx *authContext) initFromBearerToken(bearerToken string) error {
actx.getAuthHeader = func() string {
return "Bearer " + bearerToken
actx.getAuthHeader = func() (string, error) {
return "Bearer " + bearerToken, nil
}
actx.authDigest = fmt.Sprintf("bearer(token=%q)", bearerToken)
return nil
}
func (actx *authContext) initFromOAuth2Config(baseDir string, o *OAuth2Config) error {
oi, err := newOAuth2ConfigInternal(baseDir, o)
if err != nil {
if err := o.validate(); err != nil {
return err
}
actx.getAuthHeader = func() string {
actx.getAuthHeader = func() (string, error) {
oi, err := newOAuth2ConfigInternal(baseDir, o)
if err != nil {
return "", err
}
ts, err := oi.getTokenSource()
if err != nil {
logger.Errorf("cannot get OAuth2 tokenSource: %s", err)
return ""
return "", fmt.Errorf("cannot get OAuth2 tokenSource: %s", err)
}
t, err := ts.Token()
if err != nil {
logger.Errorf("cannot get OAuth2 token: %s", err)
return ""
return "", fmt.Errorf("cannot get OAuth2 token: %s", err)
}
return t.Type() + " " + t.AccessToken
return t.Type() + " " + t.AccessToken, nil
}
actx.authDigest = fmt.Sprintf("oauth2(%s)", o.String())
return nil
@ -679,6 +687,7 @@ func (actx *authContext) initFromOAuth2Config(baseDir string, o *OAuth2Config) e
type tlsContext struct {
getTLSCert func(*tls.CertificateRequestInfo) (*tls.Certificate, error)
tlsCertDigest string
rootCA *x509.CertPool
serverName string
insecureSkipVerify bool

View file

@ -13,6 +13,7 @@ func TestNewConfig(t *testing.T) {
name string
opts Options
wantErr bool
wantErrWhenSetHeader bool
expectHeader string
}{
{
@ -49,6 +50,21 @@ func TestNewConfig(t *testing.T) {
},
wantErr: true,
},
{
name: "OAuth2 with wrong tls",
opts: Options{
OAuth2: &OAuth2Config{
ClientID: "some-id",
ClientSecret: NewSecret("some-secret"),
TokenURL: "http://localhost:8511",
TLSConfig: &TLSConfig{
InsecureSkipVerify: true,
CAFile: "non-existing-file",
},
},
},
wantErrWhenSetHeader: true,
},
{
name: "basic Auth config",
opts: Options{
@ -69,6 +85,16 @@ func TestNewConfig(t *testing.T) {
},
expectHeader: "Basic dXNlcjpzZWNyZXQtY29udGVudA==",
},
{
name: "basic Auth config with non-existing file",
opts: Options{
BasicAuth: &BasicAuthConfig{
Username: "user",
PasswordFile: "testdata/non-existing-file",
},
},
wantErrWhenSetHeader: true,
},
{
name: "want Authorization",
opts: Options{
@ -96,6 +122,17 @@ func TestNewConfig(t *testing.T) {
},
expectHeader: "Bearer some-token",
},
{
name: "tls with non-existing file",
opts: Options{
BearerToken: "some-token",
TLSConfig: &TLSConfig{
InsecureSkipVerify: true,
CAFile: "non-existing-file",
},
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@ -104,7 +141,6 @@ func TestNewConfig(t *testing.T) {
r.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.Write([]byte(`{"access_token":"some-token","token_type": "Bearer"}`))
})
mock := httptest.NewServer(r)
tt.opts.OAuth2.TokenURL = mock.URL
@ -119,13 +155,19 @@ func TestNewConfig(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error in http.NewRequest: %s", err)
}
got.SetHeaders(req, true)
err = got.SetHeaders(req, true)
if (err != nil) != tt.wantErrWhenSetHeader {
t.Errorf("NewConfig() error = %v, wantErr %v", err, tt.wantErrWhenSetHeader)
}
ah := req.Header.Get("Authorization")
if ah != tt.expectHeader {
t.Fatalf("unexpected auth header from net/http request; got %q; want %q", ah, tt.expectHeader)
}
var fhreq fasthttp.Request
got.SetFasthttpHeaders(&fhreq, true)
err = got.SetFasthttpHeaders(&fhreq, true)
if (err != nil) != tt.wantErrWhenSetHeader {
t.Errorf("NewConfig() error = %v, wantErr %v", err, tt.wantErrWhenSetHeader)
}
ahb := fhreq.Header.Peek("Authorization")
if string(ahb) != tt.expectHeader {
t.Fatalf("unexpected auth header from fasthttp request; got %q; want %q", ahb, tt.expectHeader)
@ -191,7 +233,7 @@ func TestConfigHeaders(t *testing.T) {
if result != resultExpected {
t.Fatalf("unexpected result from HeadersNoAuthString; got\n%s\nwant\n%s", result, resultExpected)
}
c.SetHeaders(req, false)
_ = c.SetHeaders(req, false)
for _, h := range headersParsed {
v := req.Header.Get(h.key)
if v != h.value {
@ -199,7 +241,7 @@ func TestConfigHeaders(t *testing.T) {
}
}
var fhreq fasthttp.Request
c.SetFasthttpHeaders(&fhreq, false)
_ = c.SetFasthttpHeaders(&fhreq, false)
for _, h := range headersParsed {
v := fhreq.Header.Peek(h.key)
if string(v) != h.value {

View file

@ -49,10 +49,10 @@ type client struct {
scrapeTimeoutSecondsStr string
hostPort string
requestURI string
setHeaders func(req *http.Request)
setProxyHeaders func(req *http.Request)
setFasthttpHeaders func(req *fasthttp.Request)
setFasthttpProxyHeaders func(req *fasthttp.Request)
setHeaders func(req *http.Request) error
setProxyHeaders func(req *http.Request) error
setFasthttpHeaders func(req *fasthttp.Request) error
setFasthttpProxyHeaders func(req *fasthttp.Request) error
denyRedirects bool
disableCompression bool
disableKeepAlive bool
@ -81,7 +81,7 @@ func concatTwoStrings(x, y string) string {
const scrapeUserAgent = "vm_promscrape"
func newClient(ctx context.Context, sw *ScrapeWork) *client {
func newClient(ctx context.Context, sw *ScrapeWork) (*client, error) {
var u fasthttp.URI
u.Update(sw.ScrapeURL)
hostPort := string(u.Host())
@ -92,8 +92,8 @@ func newClient(ctx context.Context, sw *ScrapeWork) *client {
if isTLS {
tlsCfg = sw.AuthConfig.NewTLSConfig()
}
setProxyHeaders := func(req *http.Request) {}
setFasthttpProxyHeaders := func(req *fasthttp.Request) {}
setProxyHeaders := func(req *http.Request) error { return nil }
setFasthttpProxyHeaders := func(req *fasthttp.Request) error { return nil }
proxyURL := sw.ProxyURL
if !isTLS && proxyURL.IsHTTPOrHTTPS() {
// Send full sw.ScrapeURL in requests to a proxy host for non-TLS scrape targets
@ -107,11 +107,11 @@ func newClient(ctx context.Context, sw *ScrapeWork) *client {
tlsCfg = sw.ProxyAuthConfig.NewTLSConfig()
}
proxyURLOrig := proxyURL
setProxyHeaders = func(req *http.Request) {
proxyURLOrig.SetHeaders(sw.ProxyAuthConfig, req)
setProxyHeaders = func(req *http.Request) error {
return proxyURLOrig.SetHeaders(sw.ProxyAuthConfig, req)
}
setFasthttpProxyHeaders = func(req *fasthttp.Request) {
proxyURLOrig.SetFasthttpHeaders(sw.ProxyAuthConfig, req)
setFasthttpProxyHeaders = func(req *fasthttp.Request) error {
return proxyURLOrig.SetFasthttpHeaders(sw.ProxyAuthConfig, req)
}
proxyURL = &proxy.URL{}
}
@ -119,7 +119,7 @@ func newClient(ctx context.Context, sw *ScrapeWork) *client {
dialAddr = addMissingPort(dialAddr, isTLS)
dialFunc, err := newStatDialFunc(proxyURL, sw.ProxyAuthConfig)
if err != nil {
logger.Fatalf("cannot create dial func: %s", err)
return nil, fmt.Errorf("cannot create dial func: %s", err)
}
hc := &fasthttp.HostClient{
Addr: dialAddr,
@ -168,14 +168,14 @@ func newClient(ctx context.Context, sw *ScrapeWork) *client {
scrapeTimeoutSecondsStr: fmt.Sprintf("%.3f", sw.ScrapeTimeout.Seconds()),
hostPort: hostPort,
requestURI: requestURI,
setHeaders: func(req *http.Request) { sw.AuthConfig.SetHeaders(req, true) },
setHeaders: func(req *http.Request) error { return sw.AuthConfig.SetHeaders(req, true) },
setProxyHeaders: setProxyHeaders,
setFasthttpHeaders: func(req *fasthttp.Request) { sw.AuthConfig.SetFasthttpHeaders(req, true) },
setFasthttpHeaders: func(req *fasthttp.Request) error { return sw.AuthConfig.SetFasthttpHeaders(req, true) },
setFasthttpProxyHeaders: setFasthttpProxyHeaders,
denyRedirects: sw.DenyRedirects,
disableCompression: sw.DisableCompression,
disableKeepAlive: sw.DisableKeepAlive,
}
}, nil
}
func (c *client) GetStreamReader() (*streamReader, error) {
@ -196,8 +196,16 @@ func (c *client) GetStreamReader() (*streamReader, error) {
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179#issuecomment-813117162
req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", c.scrapeTimeoutSecondsStr)
req.Header.Set("User-Agent", scrapeUserAgent)
c.setHeaders(req)
c.setProxyHeaders(req)
err = c.setHeaders(req)
if err != nil {
cancel()
return nil, fmt.Errorf("failed to create request to %q: %s", c.scrapeURL, err)
}
err = c.setProxyHeaders(req)
if err != nil {
cancel()
return nil, fmt.Errorf("failed to create request to %q: %s", c.scrapeURL, err)
}
scrapeRequests.Inc()
resp, err := c.sc.Do(req)
if err != nil {
@ -244,8 +252,14 @@ func (c *client) ReadData(dst []byte) ([]byte, error) {
// Set X-Prometheus-Scrape-Timeout-Seconds like Prometheus does, since it is used by some exporters such as PushProx.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179#issuecomment-813117162
req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", c.scrapeTimeoutSecondsStr)
c.setFasthttpHeaders(req)
c.setFasthttpProxyHeaders(req)
err := c.setFasthttpHeaders(req)
if err != nil {
return nil, fmt.Errorf("failed to create request to %q: %s", c.scrapeURL, err)
}
err = c.setFasthttpProxyHeaders(req)
if err != nil {
return nil, fmt.Errorf("failed to create request to %q: %s", c.scrapeURL, err)
}
if !*disableCompression && !c.disableCompression {
req.Header.Set("Accept-Encoding", "gzip")
}
@ -263,7 +277,7 @@ func (c *client) ReadData(dst []byte) ([]byte, error) {
ctx, cancel := context.WithDeadline(c.ctx, deadline)
defer cancel()
err := doRequestWithPossibleRetry(ctx, c.hc, req, resp)
err = doRequestWithPossibleRetry(ctx, c.hc, req, resp)
statusCode := resp.StatusCode()
redirectsCount := 0
for err == nil && isStatusRedirect(statusCode) {

View file

@ -482,6 +482,7 @@ func (cfg *Config) parseData(data []byte, path string) ([]byte, error) {
}
// Initialize cfg.ScrapeConfigs
var validScrapeConfigs []*ScrapeConfig
for i, sc := range cfg.ScrapeConfigs {
// Make a copy of sc in order to remove references to `data` memory.
// This should prevent from memory leaks on config reload.
@ -490,10 +491,14 @@ func (cfg *Config) parseData(data []byte, path string) ([]byte, error) {
swc, err := getScrapeWorkConfig(sc, cfg.baseDir, &cfg.Global)
if err != nil {
return nil, fmt.Errorf("cannot parse `scrape_config`: %w", err)
// print error and skip invalid scrape config
logger.Errorf("cannot parse `scrape_config` for job %q, skip it: %w", sc.JobName, err)
continue
}
sc.swc = swc
validScrapeConfigs = append(validScrapeConfigs, sc)
}
cfg.ScrapeConfigs = validScrapeConfigs
return dataNew, nil
}

View file

@ -109,7 +109,6 @@ scrape_configs:
proxy_headers:
- 'My-Auth-Header: top-secret'
`)
}
func TestNeedSkipScrapeWork(t *testing.T) {
@ -446,27 +445,27 @@ func getStaticScrapeWork(data []byte, path string) ([]*ScrapeWork, error) {
return cfg.getStaticScrapeWork(), nil
}
func TestGetStaticScrapeWorkFailure(t *testing.T) {
f := func(data string) {
func TestGetStaticScrapeWork(t *testing.T) {
f := func(data string, wantErr bool, validConfigNum int) {
t.Helper()
sws, err := getStaticScrapeWork([]byte(data), "non-existing-file")
if err == nil {
t.Fatalf("expecting non-nil error")
if err != nil != wantErr {
t.Fatalf("expect err %t", wantErr)
}
if sws != nil {
t.Fatalf("expecting nil sws")
if !wantErr && len(sws) != validConfigNum {
t.Fatalf("got expected config num, expect %d", validConfigNum)
}
}
// incorrect yaml
f(`foo bar baz`)
f(`foo bar baz`, true, 0)
// Missing job_name
f(`
scrape_configs:
- static_configs:
- targets: ["foo"]
`)
`, false, 0)
// Duplicate job_name
f(`
@ -477,7 +476,7 @@ scrape_configs:
- job_name: foo
static_configs:
targets: ["bar"]
`)
`, true, 1)
// Invalid scheme
f(`
@ -486,7 +485,7 @@ scrape_configs:
scheme: asdf
static_configs:
- targets: ["foo"]
`)
`, false, 0)
// Missing username in `basic_auth`
f(`
@ -496,7 +495,7 @@ scrape_configs:
password: sss
static_configs:
- targets: ["a"]
`)
`, false, 0)
// Both password and password_file set in `basic_auth`
f(`
@ -508,7 +507,7 @@ scrape_configs:
password_file: sdfdf
static_configs:
- targets: ["a"]
`)
`, false, 0)
// Invalid password_file set in `basic_auth`
f(`
@ -519,7 +518,7 @@ scrape_configs:
password_file: ['foobar']
static_configs:
- targets: ["a"]
`)
`, true, 0)
// Both `bearer_token` and `bearer_token_file` are set
f(`
@ -529,7 +528,7 @@ scrape_configs:
bearer_token_file: bar
static_configs:
- targets: ["a"]
`)
`, false, 0)
// Both `basic_auth` and `bearer_token` are set
f(`
@ -541,7 +540,7 @@ scrape_configs:
password: bar
static_configs:
- targets: ["a"]
`)
`, false, 0)
// Both `authorization` and `basic_auth` are set
f(`
@ -553,7 +552,7 @@ scrape_configs:
username: foobar
static_configs:
- targets: ["a"]
`)
`, false, 0)
// Both `authorization` and `bearer_token` are set
f(`
@ -564,7 +563,7 @@ scrape_configs:
bearer_token: foo
static_configs:
- targets: ["a"]
`)
`, false, 0)
// Invalid `bearer_token_file`
f(`
@ -573,7 +572,7 @@ scrape_configs:
bearer_token_file: [foobar]
static_configs:
- targets: ["a"]
`)
`, true, 0)
// non-existing ca_file
f(`
@ -583,7 +582,7 @@ scrape_configs:
ca_file: non/extising/file
static_configs:
- targets: ["s"]
`)
`, false, 0)
// invalid ca_file
f(`
@ -593,7 +592,7 @@ scrape_configs:
ca_file: testdata/prometheus.yml
static_configs:
- targets: ["s"]
`)
`, false, 0)
// non-existing cert_file
f(`
@ -603,7 +602,7 @@ scrape_configs:
cert_file: non/extising/file
static_configs:
- targets: ["s"]
`)
`, false, 0)
// non-existing key_file
f(`
@ -613,7 +612,7 @@ scrape_configs:
key_file: non/extising/file
static_configs:
- targets: ["s"]
`)
`, false, 0)
// Invalid regex in relabel_configs
f(`
@ -625,7 +624,7 @@ scrape_configs:
target_label: bar
static_configs:
- targets: ["s"]
`)
`, false, 0)
// Missing target_label for action=replace in relabel_configs
f(`
@ -636,7 +635,7 @@ scrape_configs:
source_labels: [foo]
static_configs:
- targets: ["s"]
`)
`, false, 0)
// Missing source_labels for action=keep in relabel_configs
f(`
@ -646,7 +645,7 @@ scrape_configs:
- action: keep
static_configs:
- targets: ["s"]
`)
`, false, 0)
// Missing source_labels for action=drop in relabel_configs
f(`
@ -656,7 +655,7 @@ scrape_configs:
- action: drop
static_configs:
- targets: ["s"]
`)
`, false, 0)
// Missing source_labels for action=hashmod in relabel_configs
f(`
@ -668,7 +667,7 @@ scrape_configs:
modulus: 123
static_configs:
- targets: ["s"]
`)
`, false, 0)
// Missing target for action=hashmod in relabel_configs
f(`
@ -680,7 +679,7 @@ scrape_configs:
modulus: 123
static_configs:
- targets: ["s"]
`)
`, false, 0)
// Missing modulus for action=hashmod in relabel_configs
f(`
@ -692,7 +691,7 @@ scrape_configs:
target_label: bar
static_configs:
- targets: ["s"]
`)
`, false, 0)
// Invalid action in relabel_configs
f(`
@ -702,7 +701,7 @@ scrape_configs:
- action: foobar
static_configs:
- targets: ["s"]
`)
`, false, 0)
// Invalid scrape_config_files contents
f(`
@ -710,7 +709,7 @@ scrape_config_files:
- job_name: aa
static_configs:
- targets: ["s"]
`)
`, true, 0)
}
func resetNonEssentialFields(sws []*ScrapeWork) {

View file

@ -211,7 +211,7 @@ type groupWatcher struct {
selectors []Selector
attachNodeMetadata bool
setHeaders func(req *http.Request)
setHeaders func(req *http.Request) error
client *http.Client
mu sync.Mutex
@ -239,7 +239,7 @@ func newGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string,
selectors: selectors,
attachNodeMetadata: attachNodeMetadata,
setHeaders: func(req *http.Request) { ac.SetHeaders(req, true) },
setHeaders: func(req *http.Request) error { return ac.SetHeaders(req, true) },
client: client,
m: make(map[string]*urlWatcher),
}
@ -420,7 +420,10 @@ func (gw *groupWatcher) doRequest(ctx context.Context, requestURL string) (*http
if err != nil {
logger.Fatalf("cannot create a request for %q: %s", requestURL, err)
}
gw.setHeaders(req)
err = gw.setHeaders(req)
if err != nil {
return nil, err
}
resp, err := gw.client.Do(req)
if err != nil {
return nil, err

View file

@ -68,8 +68,8 @@ type Client struct {
apiServer string
setHTTPHeaders func(req *http.Request)
setHTTPProxyHeaders func(req *http.Request)
setHTTPHeaders func(req *http.Request) error
setHTTPProxyHeaders func(req *http.Request) error
clientCtx context.Context
clientCancel context.CancelFunc
@ -140,10 +140,10 @@ func NewClient(apiServer string, ac *promauth.Config, proxyURL *proxy.URL, proxy
},
}
setHTTPHeaders := func(req *http.Request) {}
setHTTPHeaders := func(req *http.Request) error { return nil }
if ac != nil {
setHTTPHeaders = func(req *http.Request) {
ac.SetHeaders(req, true)
setHTTPHeaders = func(req *http.Request) error {
return ac.SetHeaders(req, true)
}
}
if httpCfg.FollowRedirects != nil && !*httpCfg.FollowRedirects {
@ -153,10 +153,10 @@ func NewClient(apiServer string, ac *promauth.Config, proxyURL *proxy.URL, proxy
client.CheckRedirect = checkRedirect
blockingClient.CheckRedirect = checkRedirect
}
setHTTPProxyHeaders := func(req *http.Request) {}
setHTTPProxyHeaders := func(req *http.Request) error { return nil }
if proxyAC != nil {
setHTTPProxyHeaders = func(req *http.Request) {
proxyURL.SetHeaders(proxyAC, req)
setHTTPProxyHeaders = func(req *http.Request) error {
return proxyURL.SetHeaders(proxyAC, req)
}
}
ctx, cancel := context.WithCancel(context.Background())
@ -247,8 +247,14 @@ func (c *Client) getAPIResponseWithParamsAndClientCtx(ctx context.Context, clien
return nil, fmt.Errorf("cannot create request for %q: %w", requestURL, err)
}
c.setHTTPHeaders(req)
c.setHTTPProxyHeaders(req)
err = c.setHTTPHeaders(req)
if err != nil {
return nil, fmt.Errorf("cannot set request http header for %q: %w", requestURL, err)
}
err = c.setHTTPProxyHeaders(req)
if err != nil {
return nil, fmt.Errorf("cannot set request http proxy header for %q: %w", requestURL, err)
}
if modifyRequest != nil {
modifyRequest(req)
}

View file

@ -405,7 +405,12 @@ func (sg *scraperGroup) update(sws []*ScrapeWork) {
// Start new scrapers only after the deleted scrapers are stopped.
for _, sw := range swsToStart {
sc := newScraper(sw, sg.name, sg.pushData)
sc, err := newScraper(sw, sg.name, sg.pushData)
if err != nil {
// print error and skip invalid scraper config
logger.Errorf("cannot create scraper to %q in job %q, will skip it: %w", sw.ScrapeURL, sg.name, err)
continue
}
sg.activeScrapers.Inc()
sg.scrapersStarted.Inc()
sg.wg.Add(1)
@ -441,18 +446,21 @@ type scraper struct {
stoppedCh chan struct{}
}
func newScraper(sw *ScrapeWork, group string, pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)) *scraper {
func newScraper(sw *ScrapeWork, group string, pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)) (*scraper, error) {
ctx, cancel := context.WithCancel(context.Background())
sc := &scraper{
ctx: ctx,
cancel: cancel,
stoppedCh: make(chan struct{}),
}
c := newClient(ctx, sw)
c, err := newClient(ctx, sw)
if err != nil {
return &scraper{}, err
}
sc.sw.Config = sw
sc.sw.ScrapeGroup = group
sc.sw.ReadData = c.ReadData
sc.sw.GetStreamReader = c.GetStreamReader
sc.sw.PushData = pushData
return sc
return sc, nil
}

View file

@ -454,7 +454,6 @@ func (sw *scrapeWork) processScrapedData(scrapeTimestamp, realTimestamp int64, b
defer func() {
<-processScrapedDataConcurrencyLimitCh
}()
endTimestamp := time.Now().UnixNano() / 1e6
duration := float64(endTimestamp-realTimestamp) / 1e3
scrapeDuration.Update(duration)

View file

@ -73,38 +73,48 @@ func (u *URL) String() string {
}
// SetHeaders sets headers to req according to u and ac configs.
func (u *URL) SetHeaders(ac *promauth.Config, req *http.Request) {
ah := u.getAuthHeader(ac)
func (u *URL) SetHeaders(ac *promauth.Config, req *http.Request) error {
ah, err := u.getAuthHeader(ac)
if err != nil {
return err
}
if ah != "" {
req.Header.Set("Proxy-Authorization", ah)
}
ac.SetHeaders(req, false)
return ac.SetHeaders(req, false)
}
// SetFasthttpHeaders sets headers to req according to u and ac configs.
func (u *URL) SetFasthttpHeaders(ac *promauth.Config, req *fasthttp.Request) {
ah := u.getAuthHeader(ac)
func (u *URL) SetFasthttpHeaders(ac *promauth.Config, req *fasthttp.Request) error {
ah, err := u.getAuthHeader(ac)
if err != nil {
return err
}
if ah != "" {
req.Header.Set("Proxy-Authorization", ah)
}
ac.SetFasthttpHeaders(req, false)
return ac.SetFasthttpHeaders(req, false)
}
// getAuthHeader returns Proxy-Authorization auth header for the given u and ac.
func (u *URL) getAuthHeader(ac *promauth.Config) string {
func (u *URL) getAuthHeader(ac *promauth.Config) (string, error) {
authHeader := ""
if ac != nil {
authHeader = ac.GetAuthHeader()
var err error
authHeader, err = ac.GetAuthHeader()
if err != nil {
return "", err
}
}
if u == nil || u.URL == nil {
return authHeader
return authHeader, nil
}
pu := u.URL
if pu.User != nil && len(pu.User.Username()) > 0 {
userPasswordEncoded := base64.StdEncoding.EncodeToString([]byte(pu.User.String()))
authHeader = "Basic " + userPasswordEncoded
}
return authHeader
return authHeader, nil
}
// MarshalYAML implements yaml.Marshaler interface.
@ -161,7 +171,10 @@ func (u *URL) NewDialFunc(ac *promauth.Config) (fasthttp.DialFunc, error) {
if isTLS {
proxyConn = tls.Client(proxyConn, tlsCfg)
}
authHeader := u.getAuthHeader(ac)
authHeader, err := u.getAuthHeader(ac)
if err != nil {
return nil, fmt.Errorf("cannot get auth header: %s", err)
}
if authHeader != "" {
authHeader = "Proxy-Authorization: " + authHeader + "\r\n"
authHeader += ac.HeadersNoAuthString()