fix inconsistent behaviors with prometheus when scraping (#5153)

* fix inconsistent behaviors with prometheus when scraping

1. address https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4959: skip jobs with incorrect syntax in `scrape_configs`, logging errors instead of exiting;
2. show error messages on the vmagent /targets UI if there are incorrect auth configs in `scrape_configs`; previously the errors were only logged and scraping proceeded without the auth header;
3. don't send requests if there are wrong auth configs in:
    1. vmagent remoteWrite;
    2. vmalert datasource/remoteRead/remoteWrite/notifier.

* add changelogs

* address review comments

* fix ut
This commit is contained in:
Hui Wang 2023-10-17 17:58:19 +08:00 committed by Aliaksandr Valialkin
parent f00729ee24
commit d7dd7614eb
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
17 changed files with 293 additions and 156 deletions

View file

@ -323,26 +323,32 @@ func (c *client) runWorker() {
} }
func (c *client) doRequest(url string, body []byte) (*http.Response, error) { func (c *client) doRequest(url string, body []byte) (*http.Response, error) {
req := c.newRequest(url, body) req, err := c.newRequest(url, body)
if err != nil {
return nil, err
}
resp, err := c.hc.Do(req) resp, err := c.hc.Do(req)
if err != nil && errors.Is(err, io.EOF) { if err != nil && errors.Is(err, io.EOF) {
// it is likely connection became stale. // it is likely connection became stale.
// So we do one more attempt in hope request will succeed. // So we do one more attempt in hope request will succeed.
// If not, the error should be handled by the caller as usual. // If not, the error should be handled by the caller as usual.
// This should help with https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4139 // This should help with https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4139
req = c.newRequest(url, body) req, _ = c.newRequest(url, body)
resp, err = c.hc.Do(req) resp, err = c.hc.Do(req)
} }
return resp, err return resp, err
} }
func (c *client) newRequest(url string, body []byte) *http.Request { func (c *client) newRequest(url string, body []byte) (*http.Request, error) {
reqBody := bytes.NewBuffer(body) reqBody := bytes.NewBuffer(body)
req, err := http.NewRequest(http.MethodPost, url, reqBody) req, err := http.NewRequest(http.MethodPost, url, reqBody)
if err != nil { if err != nil {
logger.Panicf("BUG: unexpected error from http.NewRequest(%q): %s", url, err) logger.Panicf("BUG: unexpected error from http.NewRequest(%q): %s", url, err)
} }
c.authCfg.SetHeaders(req, true) err = c.authCfg.SetHeaders(req, true)
if err != nil {
return nil, err
}
h := req.Header h := req.Header
h.Set("User-Agent", "vmagent") h.Set("User-Agent", "vmagent")
h.Set("Content-Type", "application/x-protobuf") h.Set("Content-Type", "application/x-protobuf")
@ -360,7 +366,7 @@ func (c *client) newRequest(url string, body []byte) *http.Request {
logger.Warnf("cannot sign remoteWrite request with AWS sigv4: %s", err) logger.Warnf("cannot sign remoteWrite request with AWS sigv4: %s", err)
} }
} }
return req return req, nil
} }
// sendBlockHTTP sends the given block to c.remoteWriteURL. // sendBlockHTTP sends the given block to c.remoteWriteURL.

View file

@ -115,6 +115,10 @@ func Init(extraParams url.Values) (QuerierBuilder, error) {
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to configure auth: %w", err) return nil, fmt.Errorf("failed to configure auth: %w", err)
} }
_, err = authCfg.GetAuthHeader()
if err != nil {
return nil, fmt.Errorf("failed to set request auth header to datasource %q: %s", *addr, err)
}
return &VMStorage{ return &VMStorage{
c: &http.Client{Transport: tr}, c: &http.Client{Transport: tr},

View file

@ -137,12 +137,15 @@ func NewVMStorage(baseURL string, authCfg *promauth.Config, lookBack time.Durati
// Query executes the given query and returns parsed response // Query executes the given query and returns parsed response
func (s *VMStorage) Query(ctx context.Context, query string, ts time.Time) (Result, *http.Request, error) { func (s *VMStorage) Query(ctx context.Context, query string, ts time.Time) (Result, *http.Request, error) {
req := s.newQueryRequest(query, ts) req, err := s.newQueryRequest(query, ts)
if err != nil {
return Result{}, nil, err
}
resp, err := s.do(ctx, req) resp, err := s.do(ctx, req)
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
// something in the middle between client and datasource might be closing // something in the middle between client and datasource might be closing
// the connection. So we do a one more attempt in hope request will succeed. // the connection. So we do a one more attempt in hope request will succeed.
req = s.newQueryRequest(query, ts) req, _ = s.newQueryRequest(query, ts)
resp, err = s.do(ctx, req) resp, err = s.do(ctx, req)
} }
if err != nil { if err != nil {
@ -173,12 +176,15 @@ func (s *VMStorage) QueryRange(ctx context.Context, query string, start, end tim
if end.IsZero() { if end.IsZero() {
return res, fmt.Errorf("end param is missing") return res, fmt.Errorf("end param is missing")
} }
req := s.newQueryRangeRequest(query, start, end) req, err := s.newQueryRangeRequest(query, start, end)
if err != nil {
return Result{}, err
}
resp, err := s.do(ctx, req) resp, err := s.do(ctx, req)
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
// something in the middle between client and datasource might be closing // something in the middle between client and datasource might be closing
// the connection. So we do a one more attempt in hope request will succeed. // the connection. So we do a one more attempt in hope request will succeed.
req = s.newQueryRangeRequest(query, start, end) req, _ = s.newQueryRangeRequest(query, start, end)
resp, err = s.do(ctx, req) resp, err = s.do(ctx, req)
} }
if err != nil { if err != nil {
@ -210,14 +216,20 @@ func (s *VMStorage) do(ctx context.Context, req *http.Request) (*http.Response,
return resp, nil return resp, nil
} }
func (s *VMStorage) newQueryRangeRequest(query string, start, end time.Time) *http.Request { func (s *VMStorage) newQueryRangeRequest(query string, start, end time.Time) (*http.Request, error) {
req := s.newRequest() req, err := s.newRequest()
if err != nil {
return nil, fmt.Errorf("cannot create query_range request to datasource %q: %s", s.datasourceURL, err)
}
s.setPrometheusRangeReqParams(req, query, start, end) s.setPrometheusRangeReqParams(req, query, start, end)
return req return req, nil
} }
func (s *VMStorage) newQueryRequest(query string, ts time.Time) *http.Request { func (s *VMStorage) newQueryRequest(query string, ts time.Time) (*http.Request, error) {
req := s.newRequest() req, err := s.newRequest()
if err != nil {
return nil, fmt.Errorf("cannot create query request to datasource %q: %s", s.datasourceURL, err)
}
switch s.dataSourceType { switch s.dataSourceType {
case "", datasourcePrometheus: case "", datasourcePrometheus:
s.setPrometheusInstantReqParams(req, query, ts) s.setPrometheusInstantReqParams(req, query, ts)
@ -226,20 +238,23 @@ func (s *VMStorage) newQueryRequest(query string, ts time.Time) *http.Request {
default: default:
logger.Panicf("BUG: engine not found: %q", s.dataSourceType) logger.Panicf("BUG: engine not found: %q", s.dataSourceType)
} }
return req return req, nil
} }
func (s *VMStorage) newRequest() *http.Request { func (s *VMStorage) newRequest() (*http.Request, error) {
req, err := http.NewRequest(http.MethodPost, s.datasourceURL, nil) req, err := http.NewRequest(http.MethodPost, s.datasourceURL, nil)
if err != nil { if err != nil {
logger.Panicf("BUG: unexpected error from http.NewRequest(%q): %s", s.datasourceURL, err) logger.Panicf("BUG: unexpected error from http.NewRequest(%q): %s", s.datasourceURL, err)
} }
req.Header.Set("Content-Type", "application/json") req.Header.Set("Content-Type", "application/json")
if s.authCfg != nil { if s.authCfg != nil {
s.authCfg.SetHeaders(req, true) err = s.authCfg.SetHeaders(req, true)
if err != nil {
return nil, err
}
} }
for _, h := range s.extraHeaders { for _, h := range s.extraHeaders {
req.Header.Set(h.key, h.value) req.Header.Set(h.key, h.value)
} }
return req return req, nil
} }

View file

@ -637,7 +637,10 @@ func TestRequestParams(t *testing.T) {
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
req := tc.vm.newRequest() req, err := tc.vm.newRequest()
if err != nil {
t.Fatal(err)
}
switch tc.vm.dataSourceType { switch tc.vm.dataSourceType {
case "", datasourcePrometheus: case "", datasourcePrometheus:
if tc.queryRange { if tc.queryRange {
@ -732,7 +735,10 @@ func TestHeaders(t *testing.T) {
for _, tt := range testCases { for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
vm := tt.vmFn() vm := tt.vmFn()
req := vm.newQueryRequest("foo", time.Now()) req, err := vm.newQueryRequest("foo", time.Now())
if err != nil {
t.Fatal(err)
}
tt.checkFn(t, req) tt.checkFn(t, req)
}) })
} }

View file

@ -88,7 +88,10 @@ func (am *AlertManager) send(ctx context.Context, alerts []Alert, headers map[st
req = req.WithContext(ctx) req = req.WithContext(ctx)
if am.authCfg != nil { if am.authCfg != nil {
am.authCfg.SetHeaders(req, true) err = am.authCfg.SetHeaders(req, true)
if err != nil {
return err
}
} }
resp, err := am.client.Do(req) resp, err := am.client.Do(req)
if err != nil { if err != nil {

View file

@ -280,7 +280,10 @@ func (c *Client) send(ctx context.Context, data []byte) error {
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
if c.authCfg != nil { if c.authCfg != nil {
c.authCfg.SetHeaders(req, true) err = c.authCfg.SetHeaders(req, true)
if err != nil {
return &nonRetriableError{err: err}
}
} }
if !*disablePathAppend { if !*disablePathAppend {
req.URL.Path = path.Join(req.URL.Path, "/api/v1/write") req.URL.Path = path.Join(req.URL.Path, "/api/v1/write")

View file

@ -40,6 +40,7 @@ The sandbox cluster installation is running under the constant load generated by
See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5049). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5049).
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): add `-rule.evalDelay` flag and `eval_delay` attribute for [Groups](https://docs.victoriametrics.com/vmalert.html#groups). The new flag and param can be used to adjust the `time` parameter for rule evaluation requests to match [intentional query delay](https://docs.victoriametrics.com/keyConcepts.html#query-latency) from the datasource. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5155). * FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): add `-rule.evalDelay` flag and `eval_delay` attribute for [Groups](https://docs.victoriametrics.com/vmalert.html#groups). The new flag and param can be used to adjust the `time` parameter for rule evaluation requests to match [intentional query delay](https://docs.victoriametrics.com/keyConcepts.html#query-latency) from the datasource. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5155).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): support data ingestion from [NewRelic infrastructure agent](https://docs.newrelic.com/docs/infrastructure/install-infrastructure-agent). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-newrelic-agent), [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3520) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4712). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): support data ingestion from [NewRelic infrastructure agent](https://docs.newrelic.com/docs/infrastructure/install-infrastructure-agent). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-newrelic-agent), [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3520) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4712).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): skip jobs with incorrect syntax under `scrape_configs`, logging errors instead of exiting; previously vmagent would exit. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4959) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5153).
* FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): add `-filestream.disableFadvise` command-line flag, which can be used for disabling `fadvise` syscall during backup upload to the remote storage. By default `vmbackup` uses `fadvise` syscall in order to prevent from eviction of recently accessed data from the [OS page cache](https://en.wikipedia.org/wiki/Page_cache) when backing up large files. Sometimes the `fadvise` syscall may take significant amounts of CPU when the backup is performed with large value of `-concurrency` command-line flag on systems with big number of CPU cores. In this case it is better to manually disable `fadvise` syscall by passing `-filestream.disableFadvise` command-line flag to `vmbackup`. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5120) for details. * FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): add `-filestream.disableFadvise` command-line flag, which can be used for disabling `fadvise` syscall during backup upload to the remote storage. By default `vmbackup` uses `fadvise` syscall in order to prevent from eviction of recently accessed data from the [OS page cache](https://en.wikipedia.org/wiki/Page_cache) when backing up large files. Sometimes the `fadvise` syscall may take significant amounts of CPU when the backup is performed with large value of `-concurrency` command-line flag on systems with big number of CPU cores. In this case it is better to manually disable `fadvise` syscall by passing `-filestream.disableFadvise` command-line flag to `vmbackup`. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5120) for details.
* FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): add `-deleteAllObjectVersions` command-line flag, which can be used for forcing removal of all object versions in remote object storage. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5121) issue and [these docs](https://docs.victoriametrics.com/vmbackup.html#permanent-deletion-of-objects-in-s3-compatible-storages) for the details. * FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): add `-deleteAllObjectVersions` command-line flag, which can be used for forcing removal of all object versions in remote object storage. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5121) issue and [these docs](https://docs.victoriametrics.com/vmbackup.html#permanent-deletion-of-objects-in-s3-compatible-storages) for the details.
* FEATURE: [Alerting rules for VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#alerts): account for `vmauth` component for alerts `ServiceDown` and `TooManyRestarts`. * FEATURE: [Alerting rules for VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#alerts): account for `vmauth` component for alerts `ServiceDown` and `TooManyRestarts`.
@ -53,6 +54,7 @@ The sandbox cluster installation is running under the constant load generated by
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): strip sensitive information such as auth headers or passwords from datasource, remote-read, remote-write or notifier URLs in log messages or UI. This behavior is by default and is controlled via `-datasource.showURL`, `-remoteRead.showURL`, `remoteWrite.showURL` or `-notifier.showURL` cmd-line flags. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5044). * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): strip sensitive information such as auth headers or passwords from datasource, remote-read, remote-write or notifier URLs in log messages or UI. This behavior is by default and is controlled via `-datasource.showURL`, `-remoteRead.showURL`, `remoteWrite.showURL` or `-notifier.showURL` cmd-line flags. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5044).
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): fix vmalert web UI when running on 32-bit architectures machine. * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): fix vmalert web UI when running on 32-bit architectures machine.
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): don't send requests if there is an incorrect auth config in the `datasource`, `remoteWrite`, `remoteRead` or `notifier` section; previously requests were sent without the auth header. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5153).
* BUGFIX: [vmselect](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): improve performance and memory usage during query processing on machines with big number of CPU cores. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5087) for details. * BUGFIX: [vmselect](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): improve performance and memory usage during query processing on machines with big number of CPU cores. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5087) for details.
* BUGFIX: dashboards: fix vminsert/vmstorage/vmselect metrics filtering when dashboard is used to display data from many sub-clusters with unique job names. Before, only one specific job could have been accounted for component-specific panels, instead of all available jobs for the component. * BUGFIX: dashboards: fix vminsert/vmstorage/vmselect metrics filtering when dashboard is used to display data from many sub-clusters with unique job names. Before, only one specific job could have been accounted for component-specific panels, instead of all available jobs for the component.
* BUGFIX: dashboards/vmalert: apply `desc` sorting in tooltips for vmalert dashboard in order to improve visibility of the outliers on graph. * BUGFIX: dashboards/vmalert: apply `desc` sorting in tooltips for vmalert dashboard in order to improve visibility of the outliers on graph.
@ -60,7 +62,7 @@ The sandbox cluster installation is running under the constant load generated by
* BUGFIX: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): bump hard-coded limit for search query size at `vmstorage` from 1MB to 5MB. The change should be more suitable for real-world scenarios and protect vmstorage from excessive memory usage. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5154) for details * BUGFIX: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): bump hard-coded limit for search query size at `vmstorage` from 1MB to 5MB. The change should be more suitable for real-world scenarios and protect vmstorage from excessive memory usage. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5154) for details
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): fix error when creating an incremental backup with the `-origin` command-line flag. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5144) for details. * BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): fix error when creating an incremental backup with the `-origin` command-line flag. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5144) for details.
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix vmagent ignoring configuration reload for streaming aggregation if it was started with empty streaming aggregation config. Thanks to @aluode99 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5178). * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix vmagent ignoring configuration reload for streaming aggregation if it was started with empty streaming aggregation config. Thanks to @aluode99 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5178).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): don't send requests if there is an incorrect auth config in the `scrape_configs` or `remoteWrite` section; previously requests were sent without the auth header. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5153).
## [v1.94.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.94.0) ## [v1.94.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.94.0)

View file

@ -14,7 +14,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/fasthttp" "github.com/VictoriaMetrics/fasthttp"
"github.com/cespare/xxhash/v2" "github.com/cespare/xxhash/v2"
@ -194,9 +193,6 @@ type oauth2ConfigInternal struct {
} }
func newOAuth2ConfigInternal(baseDir string, o *OAuth2Config) (*oauth2ConfigInternal, error) { func newOAuth2ConfigInternal(baseDir string, o *OAuth2Config) (*oauth2ConfigInternal, error) {
if err := o.validate(); err != nil {
return nil, err
}
oi := &oauth2ConfigInternal{ oi := &oauth2ConfigInternal{
cfg: &clientcredentials.Config{ cfg: &clientcredentials.Config{
ClientID: o.ClientID, ClientID: o.ClientID,
@ -280,7 +276,7 @@ type Config struct {
getTLSCert func(*tls.CertificateRequestInfo) (*tls.Certificate, error) getTLSCert func(*tls.CertificateRequestInfo) (*tls.Certificate, error)
tlsCertDigest string tlsCertDigest string
getAuthHeader func() string getAuthHeader func() (string, error)
authHeaderLock sync.Mutex authHeaderLock sync.Mutex
authHeader string authHeader string
authHeaderDeadline uint64 authHeaderDeadline uint64
@ -325,45 +321,58 @@ func (ac *Config) HeadersNoAuthString() string {
} }
// SetHeaders sets the configured ac headers to req. // SetHeaders sets the configured ac headers to req.
func (ac *Config) SetHeaders(req *http.Request, setAuthHeader bool) { func (ac *Config) SetHeaders(req *http.Request, setAuthHeader bool) error {
reqHeaders := req.Header reqHeaders := req.Header
for _, h := range ac.headers { for _, h := range ac.headers {
reqHeaders.Set(h.key, h.value) reqHeaders.Set(h.key, h.value)
} }
if setAuthHeader { if setAuthHeader {
if ah := ac.GetAuthHeader(); ah != "" { ah, err := ac.GetAuthHeader()
if err != nil {
return fmt.Errorf("failed to set request auth header: %w", err)
}
if ah != "" {
reqHeaders.Set("Authorization", ah) reqHeaders.Set("Authorization", ah)
} }
} }
return nil
} }
// SetFasthttpHeaders sets the configured ac headers to req. // SetFasthttpHeaders sets the configured ac headers to req.
func (ac *Config) SetFasthttpHeaders(req *fasthttp.Request, setAuthHeader bool) { func (ac *Config) SetFasthttpHeaders(req *fasthttp.Request, setAuthHeader bool) error {
reqHeaders := &req.Header reqHeaders := &req.Header
for _, h := range ac.headers { for _, h := range ac.headers {
reqHeaders.Set(h.key, h.value) reqHeaders.Set(h.key, h.value)
} }
if setAuthHeader { if setAuthHeader {
if ah := ac.GetAuthHeader(); ah != "" { ah, err := ac.GetAuthHeader()
if err != nil {
return err
}
if ah != "" {
reqHeaders.Set("Authorization", ah) reqHeaders.Set("Authorization", ah)
} }
} }
return nil
} }
// GetAuthHeader returns optional `Authorization: ...` http header. // GetAuthHeader returns optional `Authorization: ...` http header.
func (ac *Config) GetAuthHeader() string { func (ac *Config) GetAuthHeader() (string, error) {
f := ac.getAuthHeader f := ac.getAuthHeader
if f == nil { if f == nil {
return "" return "", nil
} }
ac.authHeaderLock.Lock() ac.authHeaderLock.Lock()
defer ac.authHeaderLock.Unlock() defer ac.authHeaderLock.Unlock()
if fasttime.UnixTimestamp() > ac.authHeaderDeadline { if fasttime.UnixTimestamp() > ac.authHeaderDeadline {
ac.authHeader = f() var err error
if ac.authHeader, err = f(); err != nil {
return "", err
}
// Cache the authHeader for a second. // Cache the authHeader for a second.
ac.authHeaderDeadline = fasttime.UnixTimestamp() + 1 ac.authHeaderDeadline = fasttime.UnixTimestamp() + 1
} }
return ac.authHeader return ac.authHeader, nil
} }
// String returns human-readable representation for ac. // String returns human-readable representation for ac.
@ -564,7 +573,7 @@ func (opts *Options) NewConfig() (*Config, error) {
type authContext struct { type authContext struct {
// getAuthHeader must return <value> for 'Authorization: <value>' http request header // getAuthHeader must return <value> for 'Authorization: <value>' http request header
getAuthHeader func() string getAuthHeader func() (string, error)
// authDigest must contain the digest for the used authorization // authDigest must contain the digest for the used authorization
// The digest must be changed whenever the original config changes. // The digest must be changed whenever the original config changes.
@ -577,8 +586,8 @@ func (actx *authContext) initFromAuthorization(baseDir string, az *Authorization
azType = az.Type azType = az.Type
} }
if az.CredentialsFile == "" { if az.CredentialsFile == "" {
actx.getAuthHeader = func() string { actx.getAuthHeader = func() (string, error) {
return azType + " " + az.Credentials.String() return azType + " " + az.Credentials.String(), nil
} }
actx.authDigest = fmt.Sprintf("custom(type=%q, creds=%q)", az.Type, az.Credentials) actx.authDigest = fmt.Sprintf("custom(type=%q, creds=%q)", az.Type, az.Credentials)
return nil return nil
@ -587,13 +596,12 @@ func (actx *authContext) initFromAuthorization(baseDir string, az *Authorization
return fmt.Errorf("both `credentials`=%q and `credentials_file`=%q are set", az.Credentials, az.CredentialsFile) return fmt.Errorf("both `credentials`=%q and `credentials_file`=%q are set", az.Credentials, az.CredentialsFile)
} }
filePath := fs.GetFilepath(baseDir, az.CredentialsFile) filePath := fs.GetFilepath(baseDir, az.CredentialsFile)
actx.getAuthHeader = func() string { actx.getAuthHeader = func() (string, error) {
token, err := readPasswordFromFile(filePath) token, err := readPasswordFromFile(filePath)
if err != nil { if err != nil {
logger.Errorf("cannot read credentials from `credentials_file`=%q: %s", az.CredentialsFile, err) return "", fmt.Errorf("cannot read credentials from `credentials_file`=%q: %s", az.CredentialsFile, err)
return ""
} }
return azType + " " + token return azType + " " + token, nil
} }
actx.authDigest = fmt.Sprintf("custom(type=%q, credsFile=%q)", az.Type, filePath) actx.authDigest = fmt.Sprintf("custom(type=%q, credsFile=%q)", az.Type, filePath)
return nil return nil
@ -604,11 +612,11 @@ func (actx *authContext) initFromBasicAuthConfig(baseDir string, ba *BasicAuthCo
return fmt.Errorf("missing `username` in `basic_auth` section") return fmt.Errorf("missing `username` in `basic_auth` section")
} }
if ba.PasswordFile == "" { if ba.PasswordFile == "" {
actx.getAuthHeader = func() string { actx.getAuthHeader = func() (string, error) {
// See https://en.wikipedia.org/wiki/Basic_access_authentication // See https://en.wikipedia.org/wiki/Basic_access_authentication
token := ba.Username + ":" + ba.Password.String() token := ba.Username + ":" + ba.Password.String()
token64 := base64.StdEncoding.EncodeToString([]byte(token)) token64 := base64.StdEncoding.EncodeToString([]byte(token))
return "Basic " + token64 return "Basic " + token64, nil
} }
actx.authDigest = fmt.Sprintf("basic(username=%q, password=%q)", ba.Username, ba.Password) actx.authDigest = fmt.Sprintf("basic(username=%q, password=%q)", ba.Username, ba.Password)
return nil return nil
@ -617,16 +625,15 @@ func (actx *authContext) initFromBasicAuthConfig(baseDir string, ba *BasicAuthCo
return fmt.Errorf("both `password`=%q and `password_file`=%q are set in `basic_auth` section", ba.Password, ba.PasswordFile) return fmt.Errorf("both `password`=%q and `password_file`=%q are set in `basic_auth` section", ba.Password, ba.PasswordFile)
} }
filePath := fs.GetFilepath(baseDir, ba.PasswordFile) filePath := fs.GetFilepath(baseDir, ba.PasswordFile)
actx.getAuthHeader = func() string { actx.getAuthHeader = func() (string, error) {
password, err := readPasswordFromFile(filePath) password, err := readPasswordFromFile(filePath)
if err != nil { if err != nil {
logger.Errorf("cannot read password from `password_file`=%q set in `basic_auth` section: %s", ba.PasswordFile, err) return "", fmt.Errorf("cannot read password from `password_file`=%q set in `basic_auth` section: %s", ba.PasswordFile, err)
return ""
} }
// See https://en.wikipedia.org/wiki/Basic_access_authentication // See https://en.wikipedia.org/wiki/Basic_access_authentication
token := ba.Username + ":" + password token := ba.Username + ":" + password
token64 := base64.StdEncoding.EncodeToString([]byte(token)) token64 := base64.StdEncoding.EncodeToString([]byte(token))
return "Basic " + token64 return "Basic " + token64, nil
} }
actx.authDigest = fmt.Sprintf("basic(username=%q, passwordFile=%q)", ba.Username, filePath) actx.authDigest = fmt.Sprintf("basic(username=%q, passwordFile=%q)", ba.Username, filePath)
return nil return nil
@ -634,43 +641,44 @@ func (actx *authContext) initFromBasicAuthConfig(baseDir string, ba *BasicAuthCo
func (actx *authContext) initFromBearerTokenFile(baseDir string, bearerTokenFile string) error { func (actx *authContext) initFromBearerTokenFile(baseDir string, bearerTokenFile string) error {
filePath := fs.GetFilepath(baseDir, bearerTokenFile) filePath := fs.GetFilepath(baseDir, bearerTokenFile)
actx.getAuthHeader = func() string { actx.getAuthHeader = func() (string, error) {
token, err := readPasswordFromFile(filePath) token, err := readPasswordFromFile(filePath)
if err != nil { if err != nil {
logger.Errorf("cannot read bearer token from `bearer_token_file`=%q: %s", bearerTokenFile, err) return "", fmt.Errorf("cannot read bearer token from `bearer_token_file`=%q: %s", bearerTokenFile, err)
return ""
} }
return "Bearer " + token return "Bearer " + token, nil
} }
actx.authDigest = fmt.Sprintf("bearer(tokenFile=%q)", filePath) actx.authDigest = fmt.Sprintf("bearer(tokenFile=%q)", filePath)
return nil return nil
} }
func (actx *authContext) initFromBearerToken(bearerToken string) error { func (actx *authContext) initFromBearerToken(bearerToken string) error {
actx.getAuthHeader = func() string { actx.getAuthHeader = func() (string, error) {
return "Bearer " + bearerToken return "Bearer " + bearerToken, nil
} }
actx.authDigest = fmt.Sprintf("bearer(token=%q)", bearerToken) actx.authDigest = fmt.Sprintf("bearer(token=%q)", bearerToken)
return nil return nil
} }
func (actx *authContext) initFromOAuth2Config(baseDir string, o *OAuth2Config) error { func (actx *authContext) initFromOAuth2Config(baseDir string, o *OAuth2Config) error {
oi, err := newOAuth2ConfigInternal(baseDir, o) if err := o.validate(); err != nil {
if err != nil {
return err return err
} }
actx.getAuthHeader = func() string {
actx.getAuthHeader = func() (string, error) {
oi, err := newOAuth2ConfigInternal(baseDir, o)
if err != nil {
return "", err
}
ts, err := oi.getTokenSource() ts, err := oi.getTokenSource()
if err != nil { if err != nil {
logger.Errorf("cannot get OAuth2 tokenSource: %s", err) return "", fmt.Errorf("cannot get OAuth2 tokenSource: %s", err)
return ""
} }
t, err := ts.Token() t, err := ts.Token()
if err != nil { if err != nil {
logger.Errorf("cannot get OAuth2 token: %s", err) return "", fmt.Errorf("cannot get OAuth2 token: %s", err)
return ""
} }
return t.Type() + " " + t.AccessToken return t.Type() + " " + t.AccessToken, nil
} }
actx.authDigest = fmt.Sprintf("oauth2(%s)", o.String()) actx.authDigest = fmt.Sprintf("oauth2(%s)", o.String())
return nil return nil
@ -679,6 +687,7 @@ func (actx *authContext) initFromOAuth2Config(baseDir string, o *OAuth2Config) e
type tlsContext struct { type tlsContext struct {
getTLSCert func(*tls.CertificateRequestInfo) (*tls.Certificate, error) getTLSCert func(*tls.CertificateRequestInfo) (*tls.Certificate, error)
tlsCertDigest string tlsCertDigest string
rootCA *x509.CertPool rootCA *x509.CertPool
serverName string serverName string
insecureSkipVerify bool insecureSkipVerify bool

View file

@ -13,6 +13,7 @@ func TestNewConfig(t *testing.T) {
name string name string
opts Options opts Options
wantErr bool wantErr bool
wantErrWhenSetHeader bool
expectHeader string expectHeader string
}{ }{
{ {
@ -49,6 +50,21 @@ func TestNewConfig(t *testing.T) {
}, },
wantErr: true, wantErr: true,
}, },
{
name: "OAuth2 with wrong tls",
opts: Options{
OAuth2: &OAuth2Config{
ClientID: "some-id",
ClientSecret: NewSecret("some-secret"),
TokenURL: "http://localhost:8511",
TLSConfig: &TLSConfig{
InsecureSkipVerify: true,
CAFile: "non-existing-file",
},
},
},
wantErrWhenSetHeader: true,
},
{ {
name: "basic Auth config", name: "basic Auth config",
opts: Options{ opts: Options{
@ -69,6 +85,16 @@ func TestNewConfig(t *testing.T) {
}, },
expectHeader: "Basic dXNlcjpzZWNyZXQtY29udGVudA==", expectHeader: "Basic dXNlcjpzZWNyZXQtY29udGVudA==",
}, },
{
name: "basic Auth config with non-existing file",
opts: Options{
BasicAuth: &BasicAuthConfig{
Username: "user",
PasswordFile: "testdata/non-existing-file",
},
},
wantErrWhenSetHeader: true,
},
{ {
name: "want Authorization", name: "want Authorization",
opts: Options{ opts: Options{
@ -96,6 +122,17 @@ func TestNewConfig(t *testing.T) {
}, },
expectHeader: "Bearer some-token", expectHeader: "Bearer some-token",
}, },
{
name: "tls with non-existing file",
opts: Options{
BearerToken: "some-token",
TLSConfig: &TLSConfig{
InsecureSkipVerify: true,
CAFile: "non-existing-file",
},
},
wantErr: true,
},
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
@ -104,7 +141,6 @@ func TestNewConfig(t *testing.T) {
r.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) { r.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
w.Write([]byte(`{"access_token":"some-token","token_type": "Bearer"}`)) w.Write([]byte(`{"access_token":"some-token","token_type": "Bearer"}`))
}) })
mock := httptest.NewServer(r) mock := httptest.NewServer(r)
tt.opts.OAuth2.TokenURL = mock.URL tt.opts.OAuth2.TokenURL = mock.URL
@ -119,13 +155,19 @@ func TestNewConfig(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("unexpected error in http.NewRequest: %s", err) t.Fatalf("unexpected error in http.NewRequest: %s", err)
} }
got.SetHeaders(req, true) err = got.SetHeaders(req, true)
if (err != nil) != tt.wantErrWhenSetHeader {
t.Errorf("NewConfig() error = %v, wantErr %v", err, tt.wantErrWhenSetHeader)
}
ah := req.Header.Get("Authorization") ah := req.Header.Get("Authorization")
if ah != tt.expectHeader { if ah != tt.expectHeader {
t.Fatalf("unexpected auth header from net/http request; got %q; want %q", ah, tt.expectHeader) t.Fatalf("unexpected auth header from net/http request; got %q; want %q", ah, tt.expectHeader)
} }
var fhreq fasthttp.Request var fhreq fasthttp.Request
got.SetFasthttpHeaders(&fhreq, true) err = got.SetFasthttpHeaders(&fhreq, true)
if (err != nil) != tt.wantErrWhenSetHeader {
t.Errorf("NewConfig() error = %v, wantErr %v", err, tt.wantErrWhenSetHeader)
}
ahb := fhreq.Header.Peek("Authorization") ahb := fhreq.Header.Peek("Authorization")
if string(ahb) != tt.expectHeader { if string(ahb) != tt.expectHeader {
t.Fatalf("unexpected auth header from fasthttp request; got %q; want %q", ahb, tt.expectHeader) t.Fatalf("unexpected auth header from fasthttp request; got %q; want %q", ahb, tt.expectHeader)
@ -191,7 +233,7 @@ func TestConfigHeaders(t *testing.T) {
if result != resultExpected { if result != resultExpected {
t.Fatalf("unexpected result from HeadersNoAuthString; got\n%s\nwant\n%s", result, resultExpected) t.Fatalf("unexpected result from HeadersNoAuthString; got\n%s\nwant\n%s", result, resultExpected)
} }
c.SetHeaders(req, false) _ = c.SetHeaders(req, false)
for _, h := range headersParsed { for _, h := range headersParsed {
v := req.Header.Get(h.key) v := req.Header.Get(h.key)
if v != h.value { if v != h.value {
@ -199,7 +241,7 @@ func TestConfigHeaders(t *testing.T) {
} }
} }
var fhreq fasthttp.Request var fhreq fasthttp.Request
c.SetFasthttpHeaders(&fhreq, false) _ = c.SetFasthttpHeaders(&fhreq, false)
for _, h := range headersParsed { for _, h := range headersParsed {
v := fhreq.Header.Peek(h.key) v := fhreq.Header.Peek(h.key)
if string(v) != h.value { if string(v) != h.value {

View file

@ -49,10 +49,10 @@ type client struct {
scrapeTimeoutSecondsStr string scrapeTimeoutSecondsStr string
hostPort string hostPort string
requestURI string requestURI string
setHeaders func(req *http.Request) setHeaders func(req *http.Request) error
setProxyHeaders func(req *http.Request) setProxyHeaders func(req *http.Request) error
setFasthttpHeaders func(req *fasthttp.Request) setFasthttpHeaders func(req *fasthttp.Request) error
setFasthttpProxyHeaders func(req *fasthttp.Request) setFasthttpProxyHeaders func(req *fasthttp.Request) error
denyRedirects bool denyRedirects bool
disableCompression bool disableCompression bool
disableKeepAlive bool disableKeepAlive bool
@ -81,7 +81,7 @@ func concatTwoStrings(x, y string) string {
const scrapeUserAgent = "vm_promscrape" const scrapeUserAgent = "vm_promscrape"
func newClient(ctx context.Context, sw *ScrapeWork) *client { func newClient(ctx context.Context, sw *ScrapeWork) (*client, error) {
var u fasthttp.URI var u fasthttp.URI
u.Update(sw.ScrapeURL) u.Update(sw.ScrapeURL)
hostPort := string(u.Host()) hostPort := string(u.Host())
@ -92,8 +92,8 @@ func newClient(ctx context.Context, sw *ScrapeWork) *client {
if isTLS { if isTLS {
tlsCfg = sw.AuthConfig.NewTLSConfig() tlsCfg = sw.AuthConfig.NewTLSConfig()
} }
setProxyHeaders := func(req *http.Request) {} setProxyHeaders := func(req *http.Request) error { return nil }
setFasthttpProxyHeaders := func(req *fasthttp.Request) {} setFasthttpProxyHeaders := func(req *fasthttp.Request) error { return nil }
proxyURL := sw.ProxyURL proxyURL := sw.ProxyURL
if !isTLS && proxyURL.IsHTTPOrHTTPS() { if !isTLS && proxyURL.IsHTTPOrHTTPS() {
// Send full sw.ScrapeURL in requests to a proxy host for non-TLS scrape targets // Send full sw.ScrapeURL in requests to a proxy host for non-TLS scrape targets
@ -107,11 +107,11 @@ func newClient(ctx context.Context, sw *ScrapeWork) *client {
tlsCfg = sw.ProxyAuthConfig.NewTLSConfig() tlsCfg = sw.ProxyAuthConfig.NewTLSConfig()
} }
proxyURLOrig := proxyURL proxyURLOrig := proxyURL
setProxyHeaders = func(req *http.Request) { setProxyHeaders = func(req *http.Request) error {
proxyURLOrig.SetHeaders(sw.ProxyAuthConfig, req) return proxyURLOrig.SetHeaders(sw.ProxyAuthConfig, req)
} }
setFasthttpProxyHeaders = func(req *fasthttp.Request) { setFasthttpProxyHeaders = func(req *fasthttp.Request) error {
proxyURLOrig.SetFasthttpHeaders(sw.ProxyAuthConfig, req) return proxyURLOrig.SetFasthttpHeaders(sw.ProxyAuthConfig, req)
} }
proxyURL = &proxy.URL{} proxyURL = &proxy.URL{}
} }
@ -119,7 +119,7 @@ func newClient(ctx context.Context, sw *ScrapeWork) *client {
dialAddr = addMissingPort(dialAddr, isTLS) dialAddr = addMissingPort(dialAddr, isTLS)
dialFunc, err := newStatDialFunc(proxyURL, sw.ProxyAuthConfig) dialFunc, err := newStatDialFunc(proxyURL, sw.ProxyAuthConfig)
if err != nil { if err != nil {
logger.Fatalf("cannot create dial func: %s", err) return nil, fmt.Errorf("cannot create dial func: %s", err)
} }
hc := &fasthttp.HostClient{ hc := &fasthttp.HostClient{
Addr: dialAddr, Addr: dialAddr,
@ -168,14 +168,14 @@ func newClient(ctx context.Context, sw *ScrapeWork) *client {
scrapeTimeoutSecondsStr: fmt.Sprintf("%.3f", sw.ScrapeTimeout.Seconds()), scrapeTimeoutSecondsStr: fmt.Sprintf("%.3f", sw.ScrapeTimeout.Seconds()),
hostPort: hostPort, hostPort: hostPort,
requestURI: requestURI, requestURI: requestURI,
setHeaders: func(req *http.Request) { sw.AuthConfig.SetHeaders(req, true) }, setHeaders: func(req *http.Request) error { return sw.AuthConfig.SetHeaders(req, true) },
setProxyHeaders: setProxyHeaders, setProxyHeaders: setProxyHeaders,
setFasthttpHeaders: func(req *fasthttp.Request) { sw.AuthConfig.SetFasthttpHeaders(req, true) }, setFasthttpHeaders: func(req *fasthttp.Request) error { return sw.AuthConfig.SetFasthttpHeaders(req, true) },
setFasthttpProxyHeaders: setFasthttpProxyHeaders, setFasthttpProxyHeaders: setFasthttpProxyHeaders,
denyRedirects: sw.DenyRedirects, denyRedirects: sw.DenyRedirects,
disableCompression: sw.DisableCompression, disableCompression: sw.DisableCompression,
disableKeepAlive: sw.DisableKeepAlive, disableKeepAlive: sw.DisableKeepAlive,
} }, nil
} }
func (c *client) GetStreamReader() (*streamReader, error) { func (c *client) GetStreamReader() (*streamReader, error) {
@ -196,8 +196,16 @@ func (c *client) GetStreamReader() (*streamReader, error) {
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179#issuecomment-813117162 // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179#issuecomment-813117162
req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", c.scrapeTimeoutSecondsStr) req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", c.scrapeTimeoutSecondsStr)
req.Header.Set("User-Agent", scrapeUserAgent) req.Header.Set("User-Agent", scrapeUserAgent)
c.setHeaders(req) err = c.setHeaders(req)
c.setProxyHeaders(req) if err != nil {
cancel()
return nil, fmt.Errorf("failed to create request to %q: %s", c.scrapeURL, err)
}
err = c.setProxyHeaders(req)
if err != nil {
cancel()
return nil, fmt.Errorf("failed to create request to %q: %s", c.scrapeURL, err)
}
scrapeRequests.Inc() scrapeRequests.Inc()
resp, err := c.sc.Do(req) resp, err := c.sc.Do(req)
if err != nil { if err != nil {
@ -244,8 +252,14 @@ func (c *client) ReadData(dst []byte) ([]byte, error) {
// Set X-Prometheus-Scrape-Timeout-Seconds like Prometheus does, since it is used by some exporters such as PushProx. // Set X-Prometheus-Scrape-Timeout-Seconds like Prometheus does, since it is used by some exporters such as PushProx.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179#issuecomment-813117162 // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179#issuecomment-813117162
req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", c.scrapeTimeoutSecondsStr) req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", c.scrapeTimeoutSecondsStr)
c.setFasthttpHeaders(req) err := c.setFasthttpHeaders(req)
c.setFasthttpProxyHeaders(req) if err != nil {
return nil, fmt.Errorf("failed to create request to %q: %s", c.scrapeURL, err)
}
err = c.setFasthttpProxyHeaders(req)
if err != nil {
return nil, fmt.Errorf("failed to create request to %q: %s", c.scrapeURL, err)
}
if !*disableCompression && !c.disableCompression { if !*disableCompression && !c.disableCompression {
req.Header.Set("Accept-Encoding", "gzip") req.Header.Set("Accept-Encoding", "gzip")
} }
@ -263,7 +277,7 @@ func (c *client) ReadData(dst []byte) ([]byte, error) {
ctx, cancel := context.WithDeadline(c.ctx, deadline) ctx, cancel := context.WithDeadline(c.ctx, deadline)
defer cancel() defer cancel()
err := doRequestWithPossibleRetry(ctx, c.hc, req, resp) err = doRequestWithPossibleRetry(ctx, c.hc, req, resp)
statusCode := resp.StatusCode() statusCode := resp.StatusCode()
redirectsCount := 0 redirectsCount := 0
for err == nil && isStatusRedirect(statusCode) { for err == nil && isStatusRedirect(statusCode) {

View file

@ -482,6 +482,7 @@ func (cfg *Config) parseData(data []byte, path string) ([]byte, error) {
} }
// Initialize cfg.ScrapeConfigs // Initialize cfg.ScrapeConfigs
var validScrapeConfigs []*ScrapeConfig
for i, sc := range cfg.ScrapeConfigs { for i, sc := range cfg.ScrapeConfigs {
// Make a copy of sc in order to remove references to `data` memory. // Make a copy of sc in order to remove references to `data` memory.
// This should prevent from memory leaks on config reload. // This should prevent from memory leaks on config reload.
@ -490,10 +491,14 @@ func (cfg *Config) parseData(data []byte, path string) ([]byte, error) {
swc, err := getScrapeWorkConfig(sc, cfg.baseDir, &cfg.Global) swc, err := getScrapeWorkConfig(sc, cfg.baseDir, &cfg.Global)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot parse `scrape_config`: %w", err) // print error and skip invalid scrape config
logger.Errorf("cannot parse `scrape_config` for job %q, skip it: %w", sc.JobName, err)
continue
} }
sc.swc = swc sc.swc = swc
validScrapeConfigs = append(validScrapeConfigs, sc)
} }
cfg.ScrapeConfigs = validScrapeConfigs
return dataNew, nil return dataNew, nil
} }

View file

@ -109,7 +109,6 @@ scrape_configs:
proxy_headers: proxy_headers:
- 'My-Auth-Header: top-secret' - 'My-Auth-Header: top-secret'
`) `)
} }
func TestNeedSkipScrapeWork(t *testing.T) { func TestNeedSkipScrapeWork(t *testing.T) {
@ -446,27 +445,27 @@ func getStaticScrapeWork(data []byte, path string) ([]*ScrapeWork, error) {
return cfg.getStaticScrapeWork(), nil return cfg.getStaticScrapeWork(), nil
} }
func TestGetStaticScrapeWorkFailure(t *testing.T) { func TestGetStaticScrapeWork(t *testing.T) {
f := func(data string) { f := func(data string, wantErr bool, validConfigNum int) {
t.Helper() t.Helper()
sws, err := getStaticScrapeWork([]byte(data), "non-existing-file") sws, err := getStaticScrapeWork([]byte(data), "non-existing-file")
if err == nil { if err != nil != wantErr {
t.Fatalf("expecting non-nil error") t.Fatalf("expect err %t", wantErr)
} }
if sws != nil { if !wantErr && len(sws) != validConfigNum {
t.Fatalf("expecting nil sws") t.Fatalf("got expected config num, expect %d", validConfigNum)
} }
} }
// incorrect yaml // incorrect yaml
f(`foo bar baz`) f(`foo bar baz`, true, 0)
// Missing job_name // Missing job_name
f(` f(`
scrape_configs: scrape_configs:
- static_configs: - static_configs:
- targets: ["foo"] - targets: ["foo"]
`) `, false, 0)
// Duplicate job_name // Duplicate job_name
f(` f(`
@ -477,7 +476,7 @@ scrape_configs:
- job_name: foo - job_name: foo
static_configs: static_configs:
targets: ["bar"] targets: ["bar"]
`) `, true, 1)
// Invalid scheme // Invalid scheme
f(` f(`
@ -486,7 +485,7 @@ scrape_configs:
scheme: asdf scheme: asdf
static_configs: static_configs:
- targets: ["foo"] - targets: ["foo"]
`) `, false, 0)
// Missing username in `basic_auth` // Missing username in `basic_auth`
f(` f(`
@ -496,7 +495,7 @@ scrape_configs:
password: sss password: sss
static_configs: static_configs:
- targets: ["a"] - targets: ["a"]
`) `, false, 0)
// Both password and password_file set in `basic_auth` // Both password and password_file set in `basic_auth`
f(` f(`
@ -508,7 +507,7 @@ scrape_configs:
password_file: sdfdf password_file: sdfdf
static_configs: static_configs:
- targets: ["a"] - targets: ["a"]
`) `, false, 0)
// Invalid password_file set in `basic_auth` // Invalid password_file set in `basic_auth`
f(` f(`
@ -519,7 +518,7 @@ scrape_configs:
password_file: ['foobar'] password_file: ['foobar']
static_configs: static_configs:
- targets: ["a"] - targets: ["a"]
`) `, true, 0)
// Both `bearer_token` and `bearer_token_file` are set // Both `bearer_token` and `bearer_token_file` are set
f(` f(`
@ -529,7 +528,7 @@ scrape_configs:
bearer_token_file: bar bearer_token_file: bar
static_configs: static_configs:
- targets: ["a"] - targets: ["a"]
`) `, false, 0)
// Both `basic_auth` and `bearer_token` are set // Both `basic_auth` and `bearer_token` are set
f(` f(`
@ -541,7 +540,7 @@ scrape_configs:
password: bar password: bar
static_configs: static_configs:
- targets: ["a"] - targets: ["a"]
`) `, false, 0)
// Both `authorization` and `basic_auth` are set // Both `authorization` and `basic_auth` are set
f(` f(`
@ -553,7 +552,7 @@ scrape_configs:
username: foobar username: foobar
static_configs: static_configs:
- targets: ["a"] - targets: ["a"]
`) `, false, 0)
// Both `authorization` and `bearer_token` are set // Both `authorization` and `bearer_token` are set
f(` f(`
@ -564,7 +563,7 @@ scrape_configs:
bearer_token: foo bearer_token: foo
static_configs: static_configs:
- targets: ["a"] - targets: ["a"]
`) `, false, 0)
// Invalid `bearer_token_file` // Invalid `bearer_token_file`
f(` f(`
@ -573,7 +572,7 @@ scrape_configs:
bearer_token_file: [foobar] bearer_token_file: [foobar]
static_configs: static_configs:
- targets: ["a"] - targets: ["a"]
`) `, true, 0)
// non-existing ca_file // non-existing ca_file
f(` f(`
@ -583,7 +582,7 @@ scrape_configs:
ca_file: non/extising/file ca_file: non/extising/file
static_configs: static_configs:
- targets: ["s"] - targets: ["s"]
`) `, false, 0)
// invalid ca_file // invalid ca_file
f(` f(`
@ -593,7 +592,7 @@ scrape_configs:
ca_file: testdata/prometheus.yml ca_file: testdata/prometheus.yml
static_configs: static_configs:
- targets: ["s"] - targets: ["s"]
`) `, false, 0)
// non-existing cert_file // non-existing cert_file
f(` f(`
@ -603,7 +602,7 @@ scrape_configs:
cert_file: non/extising/file cert_file: non/extising/file
static_configs: static_configs:
- targets: ["s"] - targets: ["s"]
`) `, false, 0)
// non-existing key_file // non-existing key_file
f(` f(`
@ -613,7 +612,7 @@ scrape_configs:
key_file: non/extising/file key_file: non/extising/file
static_configs: static_configs:
- targets: ["s"] - targets: ["s"]
`) `, false, 0)
// Invalid regex in relabel_configs // Invalid regex in relabel_configs
f(` f(`
@ -625,7 +624,7 @@ scrape_configs:
target_label: bar target_label: bar
static_configs: static_configs:
- targets: ["s"] - targets: ["s"]
`) `, false, 0)
// Missing target_label for action=replace in relabel_configs // Missing target_label for action=replace in relabel_configs
f(` f(`
@ -636,7 +635,7 @@ scrape_configs:
source_labels: [foo] source_labels: [foo]
static_configs: static_configs:
- targets: ["s"] - targets: ["s"]
`) `, false, 0)
// Missing source_labels for action=keep in relabel_configs // Missing source_labels for action=keep in relabel_configs
f(` f(`
@ -646,7 +645,7 @@ scrape_configs:
- action: keep - action: keep
static_configs: static_configs:
- targets: ["s"] - targets: ["s"]
`) `, false, 0)
// Missing source_labels for action=drop in relabel_configs // Missing source_labels for action=drop in relabel_configs
f(` f(`
@ -656,7 +655,7 @@ scrape_configs:
- action: drop - action: drop
static_configs: static_configs:
- targets: ["s"] - targets: ["s"]
`) `, false, 0)
// Missing source_labels for action=hashmod in relabel_configs // Missing source_labels for action=hashmod in relabel_configs
f(` f(`
@ -668,7 +667,7 @@ scrape_configs:
modulus: 123 modulus: 123
static_configs: static_configs:
- targets: ["s"] - targets: ["s"]
`) `, false, 0)
// Missing target for action=hashmod in relabel_configs // Missing target for action=hashmod in relabel_configs
f(` f(`
@ -680,7 +679,7 @@ scrape_configs:
modulus: 123 modulus: 123
static_configs: static_configs:
- targets: ["s"] - targets: ["s"]
`) `, false, 0)
// Missing modulus for action=hashmod in relabel_configs // Missing modulus for action=hashmod in relabel_configs
f(` f(`
@ -692,7 +691,7 @@ scrape_configs:
target_label: bar target_label: bar
static_configs: static_configs:
- targets: ["s"] - targets: ["s"]
`) `, false, 0)
// Invalid action in relabel_configs // Invalid action in relabel_configs
f(` f(`
@ -702,7 +701,7 @@ scrape_configs:
- action: foobar - action: foobar
static_configs: static_configs:
- targets: ["s"] - targets: ["s"]
`) `, false, 0)
// Invalid scrape_config_files contents // Invalid scrape_config_files contents
f(` f(`
@ -710,7 +709,7 @@ scrape_config_files:
- job_name: aa - job_name: aa
static_configs: static_configs:
- targets: ["s"] - targets: ["s"]
`) `, true, 0)
} }
func resetNonEssentialFields(sws []*ScrapeWork) { func resetNonEssentialFields(sws []*ScrapeWork) {

View file

@ -211,7 +211,7 @@ type groupWatcher struct {
selectors []Selector selectors []Selector
attachNodeMetadata bool attachNodeMetadata bool
setHeaders func(req *http.Request) setHeaders func(req *http.Request) error
client *http.Client client *http.Client
mu sync.Mutex mu sync.Mutex
@ -239,7 +239,7 @@ func newGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string,
selectors: selectors, selectors: selectors,
attachNodeMetadata: attachNodeMetadata, attachNodeMetadata: attachNodeMetadata,
setHeaders: func(req *http.Request) { ac.SetHeaders(req, true) }, setHeaders: func(req *http.Request) error { return ac.SetHeaders(req, true) },
client: client, client: client,
m: make(map[string]*urlWatcher), m: make(map[string]*urlWatcher),
} }
@ -420,7 +420,10 @@ func (gw *groupWatcher) doRequest(ctx context.Context, requestURL string) (*http
if err != nil { if err != nil {
logger.Fatalf("cannot create a request for %q: %s", requestURL, err) logger.Fatalf("cannot create a request for %q: %s", requestURL, err)
} }
gw.setHeaders(req) err = gw.setHeaders(req)
if err != nil {
return nil, err
}
resp, err := gw.client.Do(req) resp, err := gw.client.Do(req)
if err != nil { if err != nil {
return nil, err return nil, err

View file

@ -68,8 +68,8 @@ type Client struct {
apiServer string apiServer string
setHTTPHeaders func(req *http.Request) setHTTPHeaders func(req *http.Request) error
setHTTPProxyHeaders func(req *http.Request) setHTTPProxyHeaders func(req *http.Request) error
clientCtx context.Context clientCtx context.Context
clientCancel context.CancelFunc clientCancel context.CancelFunc
@ -140,10 +140,10 @@ func NewClient(apiServer string, ac *promauth.Config, proxyURL *proxy.URL, proxy
}, },
} }
setHTTPHeaders := func(req *http.Request) {} setHTTPHeaders := func(req *http.Request) error { return nil }
if ac != nil { if ac != nil {
setHTTPHeaders = func(req *http.Request) { setHTTPHeaders = func(req *http.Request) error {
ac.SetHeaders(req, true) return ac.SetHeaders(req, true)
} }
} }
if httpCfg.FollowRedirects != nil && !*httpCfg.FollowRedirects { if httpCfg.FollowRedirects != nil && !*httpCfg.FollowRedirects {
@ -153,10 +153,10 @@ func NewClient(apiServer string, ac *promauth.Config, proxyURL *proxy.URL, proxy
client.CheckRedirect = checkRedirect client.CheckRedirect = checkRedirect
blockingClient.CheckRedirect = checkRedirect blockingClient.CheckRedirect = checkRedirect
} }
setHTTPProxyHeaders := func(req *http.Request) {} setHTTPProxyHeaders := func(req *http.Request) error { return nil }
if proxyAC != nil { if proxyAC != nil {
setHTTPProxyHeaders = func(req *http.Request) { setHTTPProxyHeaders = func(req *http.Request) error {
proxyURL.SetHeaders(proxyAC, req) return proxyURL.SetHeaders(proxyAC, req)
} }
} }
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@ -247,8 +247,14 @@ func (c *Client) getAPIResponseWithParamsAndClientCtx(ctx context.Context, clien
return nil, fmt.Errorf("cannot create request for %q: %w", requestURL, err) return nil, fmt.Errorf("cannot create request for %q: %w", requestURL, err)
} }
c.setHTTPHeaders(req) err = c.setHTTPHeaders(req)
c.setHTTPProxyHeaders(req) if err != nil {
return nil, fmt.Errorf("cannot set request http header for %q: %w", requestURL, err)
}
err = c.setHTTPProxyHeaders(req)
if err != nil {
return nil, fmt.Errorf("cannot set request http proxy header for %q: %w", requestURL, err)
}
if modifyRequest != nil { if modifyRequest != nil {
modifyRequest(req) modifyRequest(req)
} }

View file

@ -405,7 +405,12 @@ func (sg *scraperGroup) update(sws []*ScrapeWork) {
// Start new scrapers only after the deleted scrapers are stopped. // Start new scrapers only after the deleted scrapers are stopped.
for _, sw := range swsToStart { for _, sw := range swsToStart {
sc := newScraper(sw, sg.name, sg.pushData) sc, err := newScraper(sw, sg.name, sg.pushData)
if err != nil {
// print error and skip invalid scraper config
logger.Errorf("cannot create scraper to %q in job %q, will skip it: %w", sw.ScrapeURL, sg.name, err)
continue
}
sg.activeScrapers.Inc() sg.activeScrapers.Inc()
sg.scrapersStarted.Inc() sg.scrapersStarted.Inc()
sg.wg.Add(1) sg.wg.Add(1)
@ -441,18 +446,21 @@ type scraper struct {
stoppedCh chan struct{} stoppedCh chan struct{}
} }
func newScraper(sw *ScrapeWork, group string, pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)) *scraper { func newScraper(sw *ScrapeWork, group string, pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)) (*scraper, error) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
sc := &scraper{ sc := &scraper{
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
stoppedCh: make(chan struct{}), stoppedCh: make(chan struct{}),
} }
c := newClient(ctx, sw) c, err := newClient(ctx, sw)
if err != nil {
return &scraper{}, err
}
sc.sw.Config = sw sc.sw.Config = sw
sc.sw.ScrapeGroup = group sc.sw.ScrapeGroup = group
sc.sw.ReadData = c.ReadData sc.sw.ReadData = c.ReadData
sc.sw.GetStreamReader = c.GetStreamReader sc.sw.GetStreamReader = c.GetStreamReader
sc.sw.PushData = pushData sc.sw.PushData = pushData
return sc return sc, nil
} }

View file

@ -454,7 +454,6 @@ func (sw *scrapeWork) processScrapedData(scrapeTimestamp, realTimestamp int64, b
defer func() { defer func() {
<-processScrapedDataConcurrencyLimitCh <-processScrapedDataConcurrencyLimitCh
}() }()
endTimestamp := time.Now().UnixNano() / 1e6 endTimestamp := time.Now().UnixNano() / 1e6
duration := float64(endTimestamp-realTimestamp) / 1e3 duration := float64(endTimestamp-realTimestamp) / 1e3
scrapeDuration.Update(duration) scrapeDuration.Update(duration)

View file

@ -73,38 +73,48 @@ func (u *URL) String() string {
} }
// SetHeaders sets headers to req according to u and ac configs. // SetHeaders sets headers to req according to u and ac configs.
func (u *URL) SetHeaders(ac *promauth.Config, req *http.Request) { func (u *URL) SetHeaders(ac *promauth.Config, req *http.Request) error {
ah := u.getAuthHeader(ac) ah, err := u.getAuthHeader(ac)
if err != nil {
return err
}
if ah != "" { if ah != "" {
req.Header.Set("Proxy-Authorization", ah) req.Header.Set("Proxy-Authorization", ah)
} }
ac.SetHeaders(req, false) return ac.SetHeaders(req, false)
} }
// SetFasthttpHeaders sets headers to req according to u and ac configs. // SetFasthttpHeaders sets headers to req according to u and ac configs.
func (u *URL) SetFasthttpHeaders(ac *promauth.Config, req *fasthttp.Request) { func (u *URL) SetFasthttpHeaders(ac *promauth.Config, req *fasthttp.Request) error {
ah := u.getAuthHeader(ac) ah, err := u.getAuthHeader(ac)
if err != nil {
return err
}
if ah != "" { if ah != "" {
req.Header.Set("Proxy-Authorization", ah) req.Header.Set("Proxy-Authorization", ah)
} }
ac.SetFasthttpHeaders(req, false) return ac.SetFasthttpHeaders(req, false)
} }
// getAuthHeader returns Proxy-Authorization auth header for the given u and ac. // getAuthHeader returns Proxy-Authorization auth header for the given u and ac.
func (u *URL) getAuthHeader(ac *promauth.Config) string { func (u *URL) getAuthHeader(ac *promauth.Config) (string, error) {
authHeader := "" authHeader := ""
if ac != nil { if ac != nil {
authHeader = ac.GetAuthHeader() var err error
authHeader, err = ac.GetAuthHeader()
if err != nil {
return "", err
}
} }
if u == nil || u.URL == nil { if u == nil || u.URL == nil {
return authHeader return authHeader, nil
} }
pu := u.URL pu := u.URL
if pu.User != nil && len(pu.User.Username()) > 0 { if pu.User != nil && len(pu.User.Username()) > 0 {
userPasswordEncoded := base64.StdEncoding.EncodeToString([]byte(pu.User.String())) userPasswordEncoded := base64.StdEncoding.EncodeToString([]byte(pu.User.String()))
authHeader = "Basic " + userPasswordEncoded authHeader = "Basic " + userPasswordEncoded
} }
return authHeader return authHeader, nil
} }
// MarshalYAML implements yaml.Marshaler interface. // MarshalYAML implements yaml.Marshaler interface.
@ -161,7 +171,10 @@ func (u *URL) NewDialFunc(ac *promauth.Config) (fasthttp.DialFunc, error) {
if isTLS { if isTLS {
proxyConn = tls.Client(proxyConn, tlsCfg) proxyConn = tls.Client(proxyConn, tlsCfg)
} }
authHeader := u.getAuthHeader(ac) authHeader, err := u.getAuthHeader(ac)
if err != nil {
return nil, fmt.Errorf("cannot get auth header: %s", err)
}
if authHeader != "" { if authHeader != "" {
authHeader = "Proxy-Authorization: " + authHeader + "\r\n" authHeader = "Proxy-Authorization: " + authHeader + "\r\n"
authHeader += ac.HeadersNoAuthString() authHeader += ac.HeadersNoAuthString()