lib/promauth: follow-up for e16d3f5639

- Make sure that invalid/missing TLS CA file or TLS client certificate files at vmagent startup
  don't prevent from processing the corresponding scrape targets after the file becomes correct,
  without the need to restart vmagent.
  Previously scrape targets with invalid TLS CA file or TLS client certificate files
  were permanently dropped after the first attempt to initialize them, and they didn't
  appear until the next vmagent reload or the next change in other places of the loaded scrape configs.

- Make sure that TLS CA is properly re-loaded from file after it changes without the need to restart vmagent.
  Previously the old TLS CA was used until vmagent restart.

- Properly handle errors during http request creation for the second attempt to send data to remote system
  at vmagent and vmalert. Previously failed request creation could result in nil pointer dereferencing,
  since the returned request is nil on error.

- Add more context to the logged error during AWS sigv4 request signing before sending the data to -remoteWrite.url at vmagent.
  Previously it could miss details on the source of the request.

- Do not create a new HTTP client per second when generating OAuth2 token needed to put in Authorization header
  of every http request issued by vmagent during service discovery or target scraping.
  Re-use the HTTP client instead until the corresponding scrape config changes.

- Cache error at lib/promauth.Config.GetAuthHeader() in the same way as the auth header is cached,
  e.g. the error is cached for a second now. This should reduce load on CPU and OAuth2 server
  when auth header cannot be obtained because of temporary error.

- Share tls.Config.GetClientCertificate function among multiple scrape targets with the same tls_config.
  Cache the loaded certificate and the error for one second. This should significantly reduce CPU load
  when scraping big number of targets with the same tls_config.

- Allow loading TLS certificates from HTTP and HTTPs urls by specifying these urls at `tls_config->cert_file` and `tls_config->key_file`.

- Improve test coverage at lib/promauth

- Skip unreachable or invalid files specified at `scrape_config_files` during vmagent startup, since these files may become valid later.
  Previously vmagent was exitting in this case.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4959
This commit is contained in:
Aliaksandr Valialkin 2023-10-25 23:19:33 +02:00
parent c22e3e7b1d
commit d5a599badc
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
21 changed files with 1286 additions and 871 deletions

View file

@ -106,12 +106,15 @@ type client struct {
func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persistentqueue.FastQueue, concurrency int) *client { func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persistentqueue.FastQueue, concurrency int) *client {
authCfg, err := getAuthConfig(argIdx) authCfg, err := getAuthConfig(argIdx)
if err != nil { if err != nil {
logger.Panicf("FATAL: cannot initialize auth config for remoteWrite.url=%q: %s", remoteWriteURL, err) logger.Fatalf("cannot initialize auth config for -remoteWrite.url=%q: %s", remoteWriteURL, err)
}
tlsCfg, err := authCfg.NewTLSConfig()
if err != nil {
logger.Fatalf("cannot initialize tls config for -remoteWrite.url=%q: %s", remoteWriteURL, err)
} }
tlsCfg := authCfg.NewTLSConfig()
awsCfg, err := getAWSAPIConfig(argIdx) awsCfg, err := getAWSAPIConfig(argIdx)
if err != nil { if err != nil {
logger.Fatalf("FATAL: cannot initialize AWS Config for remoteWrite.url=%q: %s", remoteWriteURL, err) logger.Fatalf("cannot initialize AWS Config for -remoteWrite.url=%q: %s", remoteWriteURL, err)
} }
tr := &http.Transport{ tr := &http.Transport{
DialContext: statDial, DialContext: statDial,
@ -328,15 +331,25 @@ func (c *client) doRequest(url string, body []byte) (*http.Response, error) {
return nil, err return nil, err
} }
resp, err := c.hc.Do(req) resp, err := c.hc.Do(req)
if err != nil && errors.Is(err, io.EOF) { if err == nil {
// it is likely connection became stale. return resp, nil
// So we do one more attempt in hope request will succeed.
// If not, the error should be handled by the caller as usual.
// This should help with https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4139
req, _ = c.newRequest(url, body)
resp, err = c.hc.Do(req)
} }
return resp, err if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
return nil, err
}
// It is likely connection became stale or timed out during the first request.
// Make another attempt in hope request will succeed.
// If not, the error should be handled by the caller as usual.
// This should help with https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4139
req, err = c.newRequest(url, body)
if err != nil {
return nil, fmt.Errorf("second attempt: %w", err)
}
resp, err = c.hc.Do(req)
if err != nil {
return nil, fmt.Errorf("second attempt: %w", err)
}
return resp, nil
} }
func (c *client) newRequest(url string, body []byte) (*http.Request, error) { func (c *client) newRequest(url string, body []byte) (*http.Request, error) {
@ -362,8 +375,7 @@ func (c *client) newRequest(url string, body []byte) (*http.Request, error) {
if c.awsCfg != nil { if c.awsCfg != nil {
sigv4Hash := awsapi.HashHex(body) sigv4Hash := awsapi.HashHex(body)
if err := c.awsCfg.SignRequest(req, sigv4Hash); err != nil { if err := c.awsCfg.SignRequest(req, sigv4Hash); err != nil {
// there is no need in retry, request will be rejected by client.Do and retried by code below return nil, fmt.Errorf("cannot sign remoteWrite request with AWS sigv4: %w", err)
logger.Warnf("cannot sign remoteWrite request with AWS sigv4: %s", err)
} }
} }
return req, nil return req, nil

View file

@ -142,24 +142,30 @@ func (s *VMStorage) Query(ctx context.Context, query string, ts time.Time) (Resu
return Result{}, nil, err return Result{}, nil, err
} }
resp, err := s.do(ctx, req) resp, err := s.do(ctx, req)
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
// something in the middle between client and datasource might be closing
// the connection. So we do a one more attempt in hope request will succeed.
req, _ = s.newQueryRequest(query, ts)
resp, err = s.do(ctx, req)
}
if err != nil { if err != nil {
return Result{}, req, err if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
// Return unexpected error to the caller.
return Result{}, nil, err
}
// Something in the middle between client and datasource might be closing
// the connection. So we do a one more attempt in hope request will succeed.
req, err = s.newQueryRequest(query, ts)
if err != nil {
return Result{}, nil, fmt.Errorf("second attempt: %w", err)
}
resp, err = s.do(ctx, req)
if err != nil {
return Result{}, nil, fmt.Errorf("second attempt: %w", err)
}
} }
defer func() {
_ = resp.Body.Close()
}()
// Process the received response.
parseFn := parsePrometheusResponse parseFn := parsePrometheusResponse
if s.dataSourceType != datasourcePrometheus { if s.dataSourceType != datasourcePrometheus {
parseFn = parseGraphiteResponse parseFn = parseGraphiteResponse
} }
result, err := parseFn(req, resp) result, err := parseFn(req, resp)
_ = resp.Body.Close()
return result, req, err return result, req, err
} }
@ -177,23 +183,31 @@ func (s *VMStorage) QueryRange(ctx context.Context, query string, start, end tim
return res, fmt.Errorf("end param is missing") return res, fmt.Errorf("end param is missing")
} }
req, err := s.newQueryRangeRequest(query, start, end) req, err := s.newQueryRangeRequest(query, start, end)
if err != nil {
return Result{}, err
}
resp, err := s.do(ctx, req)
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
// something in the middle between client and datasource might be closing
// the connection. So we do a one more attempt in hope request will succeed.
req, _ = s.newQueryRangeRequest(query, start, end)
resp, err = s.do(ctx, req)
}
if err != nil { if err != nil {
return res, err return res, err
} }
defer func() { resp, err := s.do(ctx, req)
_ = resp.Body.Close() if err != nil {
}() if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
return parsePrometheusResponse(req, resp) // Return unexpected error to the caller.
return res, err
}
// Something in the middle between client and datasource might be closing
// the connection. So we do a one more attempt in hope request will succeed.
req, err = s.newQueryRangeRequest(query, start, end)
if err != nil {
return res, fmt.Errorf("second attempt: %w", err)
}
resp, err = s.do(ctx, req)
if err != nil {
return res, fmt.Errorf("second attempt: %w", err)
}
}
// Process the received response.
res, err = parsePrometheusResponse(req, resp)
_ = resp.Body.Close()
return res, err
} }
func (s *VMStorage) do(ctx context.Context, req *http.Request) (*http.Response, error) { func (s *VMStorage) do(ctx context.Context, req *http.Request) (*http.Response, error) {

View file

@ -282,7 +282,9 @@ func (c *Client) send(ctx context.Context, data []byte) error {
if c.authCfg != nil { if c.authCfg != nil {
err = c.authCfg.SetHeaders(req, true) err = c.authCfg.SetHeaders(req, true)
if err != nil { if err != nil {
return &nonRetriableError{err: err} return &nonRetriableError{
err: err,
}
} }
} }
if !*disablePathAppend { if !*disablePathAppend {
@ -306,8 +308,9 @@ func (c *Client) send(ctx context.Context, data []byte) error {
case 4: case 4:
if resp.StatusCode != http.StatusTooManyRequests { if resp.StatusCode != http.StatusTooManyRequests {
// MUST NOT retry write requests on HTTP 4xx responses other than 429 // MUST NOT retry write requests on HTTP 4xx responses other than 429
return &nonRetriableError{fmt.Errorf("unexpected response code %d for %s. Response body %q", return &nonRetriableError{
resp.StatusCode, req.URL.Redacted(), body)} err: fmt.Errorf("unexpected response code %d for %s. Response body %q", resp.StatusCode, req.URL.Redacted(), body),
}
} }
fallthrough fallthrough
default: default:

View file

@ -41,7 +41,9 @@ The sandbox cluster installation is running under the constant load generated by
See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5049). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5049).
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): add `-rule.evalDelay` flag and `eval_delay` attribute for [Groups](https://docs.victoriametrics.com/vmalert.html#groups). The new flag and param can be used to adjust the `time` parameter for rule evaluation requests to match [intentional query delay](https://docs.victoriametrics.com/keyConcepts.html#query-latency) from the datasource. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5155). * FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): add `-rule.evalDelay` flag and `eval_delay` attribute for [Groups](https://docs.victoriametrics.com/vmalert.html#groups). The new flag and param can be used to adjust the `time` parameter for rule evaluation requests to match [intentional query delay](https://docs.victoriametrics.com/keyConcepts.html#query-latency) from the datasource. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5155).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): support data ingestion from [NewRelic infrastructure agent](https://docs.newrelic.com/docs/infrastructure/install-infrastructure-agent). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-newrelic-agent), [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3520) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4712). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): support data ingestion from [NewRelic infrastructure agent](https://docs.newrelic.com/docs/infrastructure/install-infrastructure-agent). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-newrelic-agent), [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3520) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4712).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): skip job with error logs if there is incorrect syntax under `scrape_configs`, previously will exit. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4959) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5153). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): do not exit on startup when [scrape_configs](https://docs.victoriametrics.com/sd_configs.html#scrape_configs) refer to non-existing or invalid files with auth configs, since these files may appear / updated later. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4959) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5153).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow loading TLS certificates from HTTP and HTTPS urls by specifying these urls at `cert_file` and `key_file` options inside `tls_config` and `proxy_tls_config` sections at [http client settings](https://docs.victoriametrics.com/sd_configs.html#http-api-client-options).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): reduce CPU load when big number of targets are scraped over HTTPS with the same custom TLS certificate configured via `tls_config->cert_file` and `tls_config->key_file` at [scrape_config](https://docs.victoriametrics.com/sd_configs.html#scrape_configs).
* FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): add `-filestream.disableFadvise` command-line flag, which can be used for disabling `fadvise` syscall during backup upload to the remote storage. By default `vmbackup` uses `fadvise` syscall in order to prevent from eviction of recently accessed data from the [OS page cache](https://en.wikipedia.org/wiki/Page_cache) when backing up large files. Sometimes the `fadvise` syscall may take significant amounts of CPU when the backup is performed with large value of `-concurrency` command-line flag on systems with big number of CPU cores. In this case it is better to manually disable `fadvise` syscall by passing `-filestream.disableFadvise` command-line flag to `vmbackup`. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5120) for details. * FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): add `-filestream.disableFadvise` command-line flag, which can be used for disabling `fadvise` syscall during backup upload to the remote storage. By default `vmbackup` uses `fadvise` syscall in order to prevent from eviction of recently accessed data from the [OS page cache](https://en.wikipedia.org/wiki/Page_cache) when backing up large files. Sometimes the `fadvise` syscall may take significant amounts of CPU when the backup is performed with large value of `-concurrency` command-line flag on systems with big number of CPU cores. In this case it is better to manually disable `fadvise` syscall by passing `-filestream.disableFadvise` command-line flag to `vmbackup`. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5120) for details.
* FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): add `-deleteAllObjectVersions` command-line flag, which can be used for forcing removal of all object versions in remote object storage. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5121) issue and [these docs](https://docs.victoriametrics.com/vmbackup.html#permanent-deletion-of-objects-in-s3-compatible-storages) for the details. * FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): add `-deleteAllObjectVersions` command-line flag, which can be used for forcing removal of all object versions in remote object storage. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5121) issue and [these docs](https://docs.victoriametrics.com/vmbackup.html#permanent-deletion-of-objects-in-s3-compatible-storages) for the details.
* FEATURE: [Alerting rules for VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#alerts): account for `vmauth` component for alerts `ServiceDown` and `TooManyRestarts`. * FEATURE: [Alerting rules for VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#alerts): account for `vmauth` component for alerts `ServiceDown` and `TooManyRestarts`.
@ -55,7 +57,7 @@ The sandbox cluster installation is running under the constant load generated by
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): strip sensitive information such as auth headers or passwords from datasource, remote-read, remote-write or notifier URLs in log messages or UI. This behavior is by default and is controlled via `-datasource.showURL`, `-remoteRead.showURL`, `remoteWrite.showURL` or `-notifier.showURL` cmd-line flags. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5044). * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): strip sensitive information such as auth headers or passwords from datasource, remote-read, remote-write or notifier URLs in log messages or UI. This behavior is by default and is controlled via `-datasource.showURL`, `-remoteRead.showURL`, `remoteWrite.showURL` or `-notifier.showURL` cmd-line flags. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5044).
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): fix vmalert web UI when running on 32-bit architectures machine. * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): fix vmalert web UI when running on 32-bit architectures machine.
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): don't send requests if there is wrong auth config in `datasource`, `remoteWrite`, `remoteRead` and `notifier` section, previously will send requests without auth header. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5153). * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): do not send requests to configured remote systems when `-datasource.*`, `-remoteWrite.*`, `-remoteRead.*` or `-notifier.*` command-line flags refer files with invalid auth configs. Previously such requests were sent without properly set auth headers. Now the requests are sent only after the files are updated with valid auth configs. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5153).
* BUGFIX: `vmselect`: improve performance and memory usage during query processing on machines with big number of CPU cores. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5087). * BUGFIX: `vmselect`: improve performance and memory usage during query processing on machines with big number of CPU cores. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5087).
* BUGFIX: dashboards: fix vminsert/vmstorage/vmselect metrics filtering when dashboard is used to display data from many sub-clusters with unique job names. Before, only one specific job could have been accounted for component-specific panels, instead of all available jobs for the component. * BUGFIX: dashboards: fix vminsert/vmstorage/vmselect metrics filtering when dashboard is used to display data from many sub-clusters with unique job names. Before, only one specific job could have been accounted for component-specific panels, instead of all available jobs for the component.
* BUGFIX: dashboards/vmalert: apply `desc` sorting in tooltips for vmalert dashboard in order to improve visibility of the outliers on graph. * BUGFIX: dashboards/vmalert: apply `desc` sorting in tooltips for vmalert dashboard in order to improve visibility of the outliers on graph.
@ -63,7 +65,7 @@ The sandbox cluster installation is running under the constant load generated by
* BUGFIX: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): bump hard-coded limit for search query size at `vmstorage` from 1MB to 5MB. The change should be more suitable for real-world scenarios and protect vmstorage from excessive memory usage. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5154) for details * BUGFIX: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): bump hard-coded limit for search query size at `vmstorage` from 1MB to 5MB. The change should be more suitable for real-world scenarios and protect vmstorage from excessive memory usage. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5154) for details
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): fix error when creating an incremental backup with the `-origin` command-line flag. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5144) for details. * BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): fix error when creating an incremental backup with the `-origin` command-line flag. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5144) for details.
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix vmagent ignoring configuration reload for streaming aggregation if it was started with empty streaming aggregation config. Thanks to @aluode99 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5178). * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix vmagent ignoring configuration reload for streaming aggregation if it was started with empty streaming aggregation config. Thanks to @aluode99 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5178).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): don't send requests if there is wrong auth config in `scrape_configs` and `remoteWrite` section, previously will send requests without auth header. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5153). * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): do not scrape targets if the corresponding [scrape_configs](https://docs.victoriametrics.com/sd_configs.html#scrape_configs) refer to files with invalid auth configs. Previously the targets were scraped without properly set auth headers in this case. Now targets are scraped only after the files are updated with valid auth configs. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5153).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly parse `ca`, `cert` and `key` options at `tls_config` section inside [http client settings](https://docs.victoriametrics.com/sd_configs.html#http-api-client-options). Previously string values couldn't be parsed for these options, since the parser was mistakenly expecting a list of `uint8` values instead. * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly parse `ca`, `cert` and `key` options at `tls_config` section inside [http client settings](https://docs.victoriametrics.com/sd_configs.html#http-api-client-options). Previously string values couldn't be parsed for these options, since the parser was mistakenly expecting a list of `uint8` values instead.
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly drop samples if `-streamAggr.dropInput` command-line flag is set and `-remoteWrite.streamAggr.config` contains an empty file. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5207). * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly drop samples if `-streamAggr.dropInput` command-line flag is set and `-remoteWrite.streamAggr.config` contains an empty file. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5207).
* BUGFIX: [vmstorage](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): prevent deleted series to be searchable via `/api/v1/series` API if they were re-ingested with staleness markers. This situation could happen if user deletes the series from the target and from VM, and then vmagent sends stale markers for absent series. Thanks to @ilyatrefilov for the [issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5069) and [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5174). * BUGFIX: [vmstorage](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): prevent deleted series to be searchable via `/api/v1/series` API if they were re-ingested with staleness markers. This situation could happen if user deletes the series from the target and from VM, and then vmagent sends stale markers for absent series. Thanks to @ilyatrefilov for the [issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5069) and [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5174).

View file

@ -1,7 +1,6 @@
package promauth package promauth
import ( import (
"bytes"
"context" "context"
"crypto/tls" "crypto/tls"
"crypto/x509" "crypto/x509"
@ -83,18 +82,6 @@ type TLSConfig struct {
// This can only result in lower security level if improperly set. // This can only result in lower security level if improperly set.
} }
// String returns human-readable representation of tc
func (tc *TLSConfig) String() string {
if tc == nil {
return ""
}
caHash := xxhash.Sum64([]byte(tc.CA))
certHash := xxhash.Sum64([]byte(tc.Cert))
keyHash := xxhash.Sum64([]byte(tc.Key))
return fmt.Sprintf("hash(ca)=%d, ca_file=%q, hash(cert)=%d, cert_file=%q, hash(key)=%d, key_file=%q, server_name=%q, insecure_skip_verify=%v, min_version=%q",
caHash, tc.CAFile, certHash, tc.CertFile, keyHash, tc.KeyFile, tc.ServerName, tc.InsecureSkipVerify, tc.MinVersion)
}
// Authorization represents generic authorization config. // Authorization represents generic authorization config.
// //
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/ // See https://prometheus.io/docs/prometheus/latest/configuration/configuration/
@ -162,12 +149,6 @@ type OAuth2Config struct {
ProxyURL string `yaml:"proxy_url,omitempty"` ProxyURL string `yaml:"proxy_url,omitempty"`
} }
// String returns string representation of o.
func (o *OAuth2Config) String() string {
return fmt.Sprintf("clientID=%q, clientSecret=%q, clientSecretFile=%q, Scopes=%q, tokenURL=%q, endpointParams=%q, tlsConfig={%s}, proxyURL=%q",
o.ClientID, o.ClientSecret, o.ClientSecretFile, o.Scopes, o.TokenURL, o.EndpointParams, o.TLSConfig.String(), o.ProxyURL)
}
func (o *OAuth2Config) validate() error { func (o *OAuth2Config) validate() error {
if o.ClientID == "" { if o.ClientID == "" {
return fmt.Errorf("client_id cannot be empty") return fmt.Errorf("client_id cannot be empty")
@ -188,11 +169,26 @@ type oauth2ConfigInternal struct {
mu sync.Mutex mu sync.Mutex
cfg *clientcredentials.Config cfg *clientcredentials.Config
clientSecretFile string clientSecretFile string
ctx context.Context
tokenSource oauth2.TokenSource // ac contains auth config needed for initializing tls config
ac *Config
proxyURL string
proxyURLFunc func(*http.Request) (*url.URL, error)
ctx context.Context
tokenSource oauth2.TokenSource
}
func (oi *oauth2ConfigInternal) String() string {
return fmt.Sprintf("clientID=%q, clientSecret=%q, clientSecretFile=%q, scopes=%q, endpointParams=%q, tokenURL=%q, proxyURL=%q, tlsConfig={%s}",
oi.cfg.ClientID, oi.cfg.ClientSecret, oi.clientSecretFile, oi.cfg.Scopes, oi.cfg.EndpointParams, oi.cfg.TokenURL, oi.proxyURL, oi.ac.String())
} }
func newOAuth2ConfigInternal(baseDir string, o *OAuth2Config) (*oauth2ConfigInternal, error) { func newOAuth2ConfigInternal(baseDir string, o *OAuth2Config) (*oauth2ConfigInternal, error) {
if err := o.validate(); err != nil {
return nil, err
}
oi := &oauth2ConfigInternal{ oi := &oauth2ConfigInternal{
cfg: &clientcredentials.Config{ cfg: &clientcredentials.Config{
ClientID: o.ClientID, ClientID: o.ClientID,
@ -204,11 +200,8 @@ func newOAuth2ConfigInternal(baseDir string, o *OAuth2Config) (*oauth2ConfigInte
} }
if o.ClientSecretFile != "" { if o.ClientSecretFile != "" {
oi.clientSecretFile = fs.GetFilepath(baseDir, o.ClientSecretFile) oi.clientSecretFile = fs.GetFilepath(baseDir, o.ClientSecretFile)
secret, err := readPasswordFromFile(oi.clientSecretFile) // There is no need in reading oi.clientSecretFile now, since it may be missing right now.
if err != nil { // It is read later before performing oauth2 request to server.
return nil, fmt.Errorf("cannot read OAuth2 secret from %q: %w", oi.clientSecretFile, err)
}
oi.cfg.ClientSecret = secret
} }
opts := &Options{ opts := &Options{
BaseDir: baseDir, BaseDir: baseDir,
@ -216,25 +209,17 @@ func newOAuth2ConfigInternal(baseDir string, o *OAuth2Config) (*oauth2ConfigInte
} }
ac, err := opts.NewConfig() ac, err := opts.NewConfig()
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot initialize TLS config for OAuth2: %w", err) return nil, fmt.Errorf("cannot parse TLS config for OAuth2: %w", err)
} }
tlsCfg := ac.NewTLSConfig() oi.ac = ac
var proxyURLFunc func(*http.Request) (*url.URL, error)
if o.ProxyURL != "" { if o.ProxyURL != "" {
u, err := url.Parse(o.ProxyURL) u, err := url.Parse(o.ProxyURL)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot parse proxy_url=%q: %w", o.ProxyURL, err) return nil, fmt.Errorf("cannot parse proxy_url=%q: %w", o.ProxyURL, err)
} }
proxyURLFunc = http.ProxyURL(u) oi.proxyURL = o.ProxyURL
oi.proxyURLFunc = http.ProxyURL(u)
} }
c := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
Proxy: proxyURLFunc,
},
}
oi.ctx = context.WithValue(context.Background(), oauth2.HTTPClient, c)
oi.tokenSource = oi.cfg.TokenSource(oi.ctx)
return oi, nil return oi, nil
} }
@ -246,10 +231,32 @@ func urlValuesFromMap(m map[string]string) url.Values {
return result return result
} }
func (oi *oauth2ConfigInternal) initTokenSource() error {
tlsCfg, err := oi.ac.NewTLSConfig()
if err != nil {
return fmt.Errorf("cannot initialize TLS config for OAuth2: %w", err)
}
c := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
Proxy: oi.proxyURLFunc,
},
}
oi.ctx = context.WithValue(context.Background(), oauth2.HTTPClient, c)
oi.tokenSource = oi.cfg.TokenSource(oi.ctx)
return nil
}
func (oi *oauth2ConfigInternal) getTokenSource() (oauth2.TokenSource, error) { func (oi *oauth2ConfigInternal) getTokenSource() (oauth2.TokenSource, error) {
oi.mu.Lock() oi.mu.Lock()
defer oi.mu.Unlock() defer oi.mu.Unlock()
if oi.tokenSource == nil {
if err := oi.initTokenSource(); err != nil {
return nil, err
}
}
if oi.clientSecretFile == "" { if oi.clientSecretFile == "" {
return oi.tokenSource, nil return oi.tokenSource, nil
} }
@ -267,23 +274,21 @@ func (oi *oauth2ConfigInternal) getTokenSource() (oauth2.TokenSource, error) {
// Config is auth config. // Config is auth config.
type Config struct { type Config struct {
// Optional TLS config tlsServerName string
TLSRootCA *x509.CertPool tlsInsecureSkipVerify bool
TLSServerName string tlsMinVersion uint16
TLSInsecureSkipVerify bool
TLSMinVersion uint16
getTLSCert func(*tls.CertificateRequestInfo) (*tls.Certificate, error) getTLSRootCACached getTLSRootCAFunc
tlsCertDigest string tlsRootCADigest string
getAuthHeader func() (string, error) getTLSCertCached getTLSCertFunc
authHeaderLock sync.Mutex tlsCertDigest string
authHeader string
authHeaderDeadline uint64
headers []keyValue getAuthHeaderCached getAuthHeaderFunc
authHeaderDigest string
authDigest string headers []keyValue
headersDigest string
} }
type keyValue struct { type keyValue struct {
@ -329,7 +334,7 @@ func (ac *Config) SetHeaders(req *http.Request, setAuthHeader bool) error {
if setAuthHeader { if setAuthHeader {
ah, err := ac.GetAuthHeader() ah, err := ac.GetAuthHeader()
if err != nil { if err != nil {
return fmt.Errorf("failed to set request auth header: %w", err) return fmt.Errorf("failed to obtain Authorization request header: %w", err)
} }
if ah != "" { if ah != "" {
reqHeaders.Set("Authorization", ah) reqHeaders.Set("Authorization", ah)
@ -347,7 +352,7 @@ func (ac *Config) SetFasthttpHeaders(req *fasthttp.Request, setAuthHeader bool)
if setAuthHeader { if setAuthHeader {
ah, err := ac.GetAuthHeader() ah, err := ac.GetAuthHeader()
if err != nil { if err != nil {
return err return fmt.Errorf("failed to obtaine Authorization request header: %w", err)
} }
if ah != "" { if ah != "" {
reqHeaders.Set("Authorization", ah) reqHeaders.Set("Authorization", ah)
@ -358,21 +363,10 @@ func (ac *Config) SetFasthttpHeaders(req *fasthttp.Request, setAuthHeader bool)
// GetAuthHeader returns optional `Authorization: ...` http header. // GetAuthHeader returns optional `Authorization: ...` http header.
func (ac *Config) GetAuthHeader() (string, error) { func (ac *Config) GetAuthHeader() (string, error) {
f := ac.getAuthHeader if f := ac.getAuthHeaderCached; f != nil {
if f == nil { return f()
return "", nil
} }
ac.authHeaderLock.Lock() return "", nil
defer ac.authHeaderLock.Unlock()
if fasttime.UnixTimestamp() > ac.authHeaderDeadline {
var err error
if ac.authHeader, err = f(); err != nil {
return "", err
}
// Cache the authHeader for a second.
ac.authHeaderDeadline = fasttime.UnixTimestamp() + 1
}
return ac.authHeader, nil
} }
// String returns human-readable representation for ac. // String returns human-readable representation for ac.
@ -380,53 +374,103 @@ func (ac *Config) GetAuthHeader() (string, error) {
// It is also used for comparing Config objects for equality. If two Config // It is also used for comparing Config objects for equality. If two Config
// objects have the same string representation, then they are considered equal. // objects have the same string representation, then they are considered equal.
func (ac *Config) String() string { func (ac *Config) String() string {
return fmt.Sprintf("AuthDigest=%s, Headers=%s, TLSRootCA=%s, TLSCertificate=%s, TLSServerName=%s, TLSInsecureSkipVerify=%v, TLSMinVersion=%d", return fmt.Sprintf("AuthHeader=%s, Headers=%s, TLSRootCA=%s, TLSCert=%s, TLSServerName=%s, TLSInsecureSkipVerify=%v, TLSMinVersion=%d",
ac.authDigest, ac.headers, ac.tlsRootCAString(), ac.tlsCertDigest, ac.TLSServerName, ac.TLSInsecureSkipVerify, ac.TLSMinVersion) ac.authHeaderDigest, ac.headersDigest, ac.tlsRootCADigest, ac.tlsCertDigest, ac.tlsServerName, ac.tlsInsecureSkipVerify, ac.tlsMinVersion)
} }
func (ac *Config) tlsRootCAString() string { // getAuthHeaderFunc must return <value> for 'Authorization: <value>' http request header
if ac.TLSRootCA == nil { type getAuthHeaderFunc func() (string, error)
return ""
func newGetAuthHeaderCached(getAuthHeader getAuthHeaderFunc) getAuthHeaderFunc {
if getAuthHeader == nil {
return nil
}
var mu sync.Mutex
var deadline uint64
var ah string
var err error
return func() (string, error) {
// Cahe the auth header and the error for up to a second in order to save CPU time
// on reading and parsing auth headers from files.
// This also reduces load on OAuth2 server when oauth2 config is enabled.
mu.Lock()
defer mu.Unlock()
if fasttime.UnixTimestamp() > deadline {
ah, err = getAuthHeader()
deadline = fasttime.UnixTimestamp() + 1
}
return ah, err
}
}
type getTLSRootCAFunc func() (*x509.CertPool, error)
func newGetTLSRootCACached(getTLSRootCA getTLSRootCAFunc) getTLSRootCAFunc {
if getTLSRootCA == nil {
return nil
}
var mu sync.Mutex
var deadline uint64
var rootCA *x509.CertPool
var err error
return func() (*x509.CertPool, error) {
// Cache the root CA and the error for up to a second in order to save CPU time
// on reading and parsing the root CA from files.
mu.Lock()
defer mu.Unlock()
if fasttime.UnixTimestamp() > deadline {
rootCA, err = getTLSRootCA()
deadline = fasttime.UnixTimestamp() + 1
}
return rootCA, err
}
}
type getTLSCertFunc func(cri *tls.CertificateRequestInfo) (*tls.Certificate, error)
func newGetTLSCertCached(getTLSCert getTLSCertFunc) getTLSCertFunc {
if getTLSCert == nil {
return nil
}
var mu sync.Mutex
var deadline uint64
var cert *tls.Certificate
var err error
return func(cri *tls.CertificateRequestInfo) (*tls.Certificate, error) {
// Cache the certificate and the error for up to a second in order to save CPU time
// on certificate parsing when TLS connections are frequently re-established.
mu.Lock()
defer mu.Unlock()
if fasttime.UnixTimestamp() > deadline {
cert, err = getTLSCert(cri)
deadline = fasttime.UnixTimestamp() + 1
}
return cert, err
} }
data := ac.TLSRootCA.Subjects()
return string(bytes.Join(data, []byte("\n")))
} }
// NewTLSConfig returns new TLS config for the given ac. // NewTLSConfig returns new TLS config for the given ac.
func (ac *Config) NewTLSConfig() *tls.Config { func (ac *Config) NewTLSConfig() (*tls.Config, error) {
tlsCfg := &tls.Config{ tlsCfg := &tls.Config{
ClientSessionCache: tls.NewLRUClientSessionCache(0), ClientSessionCache: tls.NewLRUClientSessionCache(0),
} }
if ac == nil { if ac == nil {
return tlsCfg return tlsCfg, nil
} }
if ac.getTLSCert != nil { tlsCfg.GetClientCertificate = ac.getTLSCertCached
var certLock sync.Mutex if f := ac.getTLSRootCACached; f != nil {
var cert *tls.Certificate rootCA, err := f()
var certDeadline uint64 if err != nil {
tlsCfg.GetClientCertificate = func(cri *tls.CertificateRequestInfo) (*tls.Certificate, error) { return nil, fmt.Errorf("cannot load root CAs: %w", err)
// Cache the certificate for up to a second in order to save CPU time
// on certificate parsing when TLS connection are frequently re-established.
certLock.Lock()
defer certLock.Unlock()
if fasttime.UnixTimestamp() > certDeadline {
c, err := ac.getTLSCert(cri)
if err != nil {
return nil, err
}
cert = c
certDeadline = fasttime.UnixTimestamp() + 1
}
return cert, nil
} }
tlsCfg.RootCAs = rootCA
} }
tlsCfg.RootCAs = ac.TLSRootCA tlsCfg.ServerName = ac.tlsServerName
tlsCfg.ServerName = ac.TLSServerName tlsCfg.InsecureSkipVerify = ac.tlsInsecureSkipVerify
tlsCfg.InsecureSkipVerify = ac.TLSInsecureSkipVerify tlsCfg.MinVersion = ac.tlsMinVersion
tlsCfg.MinVersion = ac.TLSMinVersion
// Do not set tlsCfg.MaxVersion, since this has no sense from security PoV. // Do not set tlsCfg.MaxVersion, since this has no sense from security PoV.
// This can only result in lower security level if improperly set. // This can only result in lower security level if improperly set.
return tlsCfg return tlsCfg, nil
} }
// NewConfig creates auth config for the given hcc. // NewConfig creates auth config for the given hcc.
@ -525,17 +569,13 @@ func (opts *Options) NewConfig() (*Config, error) {
if opts.BearerToken != "" { if opts.BearerToken != "" {
return nil, fmt.Errorf("both `bearer_token`=%q and `bearer_token_file`=%q are set", opts.BearerToken, opts.BearerTokenFile) return nil, fmt.Errorf("both `bearer_token`=%q and `bearer_token_file`=%q are set", opts.BearerToken, opts.BearerTokenFile)
} }
if err := actx.initFromBearerTokenFile(baseDir, opts.BearerTokenFile); err != nil { actx.mustInitFromBearerTokenFile(baseDir, opts.BearerTokenFile)
return nil, err
}
} }
if opts.BearerToken != "" { if opts.BearerToken != "" {
if actx.getAuthHeader != nil { if actx.getAuthHeader != nil {
return nil, fmt.Errorf("cannot simultaneously use `authorization`, `basic_auth` and `bearer_token`") return nil, fmt.Errorf("cannot simultaneously use `authorization`, `basic_auth` and `bearer_token`")
} }
if err := actx.initFromBearerToken(opts.BearerToken); err != nil { actx.mustInitFromBearerToken(opts.BearerToken)
return nil, err
}
} }
if opts.OAuth2 != nil { if opts.OAuth2 != nil {
if actx.getAuthHeader != nil { if actx.getAuthHeader != nil {
@ -555,29 +595,42 @@ func (opts *Options) NewConfig() (*Config, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
hd := xxhash.New()
for _, kv := range headers {
hd.Sum([]byte(kv.key))
hd.Sum([]byte("="))
hd.Sum([]byte(kv.value))
hd.Sum([]byte(","))
}
headersDigest := fmt.Sprintf("digest(headers)=%d", hd.Sum64())
ac := &Config{ ac := &Config{
TLSRootCA: tctx.rootCA, tlsServerName: tctx.serverName,
TLSServerName: tctx.serverName, tlsInsecureSkipVerify: tctx.insecureSkipVerify,
TLSInsecureSkipVerify: tctx.insecureSkipVerify, tlsMinVersion: tctx.minVersion,
TLSMinVersion: tctx.minVersion,
getTLSCert: tctx.getTLSCert, getTLSRootCACached: newGetTLSRootCACached(tctx.getTLSRootCA),
tlsCertDigest: tctx.tlsCertDigest, tlsRootCADigest: tctx.tlsRootCADigest,
getTLSCertCached: newGetTLSCertCached(tctx.getTLSCert),
tlsCertDigest: tctx.tlsCertDigest,
getAuthHeaderCached: newGetAuthHeaderCached(actx.getAuthHeader),
authHeaderDigest: actx.authHeaderDigest,
getAuthHeader: actx.getAuthHeader,
headers: headers, headers: headers,
authDigest: actx.authDigest, headersDigest: headersDigest,
} }
return ac, nil return ac, nil
} }
type authContext struct { type authContext struct {
// getAuthHeader must return <value> for 'Authorization: <value>' http request header // getAuthHeader must return <value> for 'Authorization: <value>' http request header
getAuthHeader func() (string, error) getAuthHeader getAuthHeaderFunc
// authDigest must contain the digest for the used authorization // authHeaderDigest must contain the digest for the used authorization
// The digest must be changed whenever the original config changes. // The digest must be changed whenever the original config changes.
authDigest string authHeaderDigest string
} }
func (actx *authContext) initFromAuthorization(baseDir string, az *Authorization) error { func (actx *authContext) initFromAuthorization(baseDir string, az *Authorization) error {
@ -586,10 +639,11 @@ func (actx *authContext) initFromAuthorization(baseDir string, az *Authorization
azType = az.Type azType = az.Type
} }
if az.CredentialsFile == "" { if az.CredentialsFile == "" {
ah := azType + " " + az.Credentials.String()
actx.getAuthHeader = func() (string, error) { actx.getAuthHeader = func() (string, error) {
return azType + " " + az.Credentials.String(), nil return ah, nil
} }
actx.authDigest = fmt.Sprintf("custom(type=%q, creds=%q)", az.Type, az.Credentials) actx.authHeaderDigest = fmt.Sprintf("custom(type=%q, creds=%q)", az.Type, az.Credentials)
return nil return nil
} }
if az.Credentials != nil { if az.Credentials != nil {
@ -603,7 +657,7 @@ func (actx *authContext) initFromAuthorization(baseDir string, az *Authorization
} }
return azType + " " + token, nil return azType + " " + token, nil
} }
actx.authDigest = fmt.Sprintf("custom(type=%q, credsFile=%q)", az.Type, filePath) actx.authHeaderDigest = fmt.Sprintf("custom(type=%q, credsFile=%q)", az.Type, filePath)
return nil return nil
} }
@ -612,13 +666,14 @@ func (actx *authContext) initFromBasicAuthConfig(baseDir string, ba *BasicAuthCo
return fmt.Errorf("missing `username` in `basic_auth` section") return fmt.Errorf("missing `username` in `basic_auth` section")
} }
if ba.PasswordFile == "" { if ba.PasswordFile == "" {
// See https://en.wikipedia.org/wiki/Basic_access_authentication
token := ba.Username + ":" + ba.Password.String()
token64 := base64.StdEncoding.EncodeToString([]byte(token))
ah := "Basic " + token64
actx.getAuthHeader = func() (string, error) { actx.getAuthHeader = func() (string, error) {
// See https://en.wikipedia.org/wiki/Basic_access_authentication return ah, nil
token := ba.Username + ":" + ba.Password.String()
token64 := base64.StdEncoding.EncodeToString([]byte(token))
return "Basic " + token64, nil
} }
actx.authDigest = fmt.Sprintf("basic(username=%q, password=%q)", ba.Username, ba.Password) actx.authHeaderDigest = fmt.Sprintf("basic(username=%q, password=%q)", ba.Username, ba.Password)
return nil return nil
} }
if ba.Password != nil { if ba.Password != nil {
@ -635,11 +690,11 @@ func (actx *authContext) initFromBasicAuthConfig(baseDir string, ba *BasicAuthCo
token64 := base64.StdEncoding.EncodeToString([]byte(token)) token64 := base64.StdEncoding.EncodeToString([]byte(token))
return "Basic " + token64, nil return "Basic " + token64, nil
} }
actx.authDigest = fmt.Sprintf("basic(username=%q, passwordFile=%q)", ba.Username, filePath) actx.authHeaderDigest = fmt.Sprintf("basic(username=%q, passwordFile=%q)", ba.Username, filePath)
return nil return nil
} }
func (actx *authContext) initFromBearerTokenFile(baseDir string, bearerTokenFile string) error { func (actx *authContext) mustInitFromBearerTokenFile(baseDir string, bearerTokenFile string) {
filePath := fs.GetFilepath(baseDir, bearerTokenFile) filePath := fs.GetFilepath(baseDir, bearerTokenFile)
actx.getAuthHeader = func() (string, error) { actx.getAuthHeader = func() (string, error) {
token, err := readPasswordFromFile(filePath) token, err := readPasswordFromFile(filePath)
@ -648,28 +703,23 @@ func (actx *authContext) initFromBearerTokenFile(baseDir string, bearerTokenFile
} }
return "Bearer " + token, nil return "Bearer " + token, nil
} }
actx.authDigest = fmt.Sprintf("bearer(tokenFile=%q)", filePath) actx.authHeaderDigest = fmt.Sprintf("bearer(tokenFile=%q)", filePath)
return nil
} }
func (actx *authContext) initFromBearerToken(bearerToken string) error { func (actx *authContext) mustInitFromBearerToken(bearerToken string) {
ah := "Bearer " + bearerToken
actx.getAuthHeader = func() (string, error) { actx.getAuthHeader = func() (string, error) {
return "Bearer " + bearerToken, nil return ah, nil
} }
actx.authDigest = fmt.Sprintf("bearer(token=%q)", bearerToken) actx.authHeaderDigest = fmt.Sprintf("bearer(token=%q)", bearerToken)
return nil
} }
func (actx *authContext) initFromOAuth2Config(baseDir string, o *OAuth2Config) error { func (actx *authContext) initFromOAuth2Config(baseDir string, o *OAuth2Config) error {
if err := o.validate(); err != nil { oi, err := newOAuth2ConfigInternal(baseDir, o)
if err != nil {
return err return err
} }
actx.getAuthHeader = func() (string, error) { actx.getAuthHeader = func() (string, error) {
oi, err := newOAuth2ConfigInternal(baseDir, o)
if err != nil {
return "", err
}
ts, err := oi.getTokenSource() ts, err := oi.getTokenSource()
if err != nil { if err != nil {
return "", fmt.Errorf("cannot get OAuth2 tokenSource: %w", err) return "", fmt.Errorf("cannot get OAuth2 tokenSource: %w", err)
@ -680,15 +730,17 @@ func (actx *authContext) initFromOAuth2Config(baseDir string, o *OAuth2Config) e
} }
return t.Type() + " " + t.AccessToken, nil return t.Type() + " " + t.AccessToken, nil
} }
actx.authDigest = fmt.Sprintf("oauth2(%s)", o.String()) actx.authHeaderDigest = fmt.Sprintf("oauth2(%s)", oi.String())
return nil return nil
} }
type tlsContext struct { type tlsContext struct {
getTLSCert func(*tls.CertificateRequestInfo) (*tls.Certificate, error) getTLSCert getTLSCertFunc
tlsCertDigest string tlsCertDigest string
rootCA *x509.CertPool getTLSRootCA getTLSRootCAFunc
tlsRootCADigest string
serverName string serverName string
insecureSkipVerify bool insecureSkipVerify bool
minVersion uint16 minVersion uint16
@ -708,37 +760,60 @@ func (tctx *tlsContext) initFromTLSConfig(baseDir string, tc *TLSConfig) error {
h := xxhash.Sum64([]byte(tc.Key)) ^ xxhash.Sum64([]byte(tc.Cert)) h := xxhash.Sum64([]byte(tc.Key)) ^ xxhash.Sum64([]byte(tc.Cert))
tctx.tlsCertDigest = fmt.Sprintf("digest(key+cert)=%d", h) tctx.tlsCertDigest = fmt.Sprintf("digest(key+cert)=%d", h)
} else if tc.CertFile != "" || tc.KeyFile != "" { } else if tc.CertFile != "" || tc.KeyFile != "" {
certPath := fs.GetFilepath(baseDir, tc.CertFile)
keyPath := fs.GetFilepath(baseDir, tc.KeyFile)
tctx.getTLSCert = func(*tls.CertificateRequestInfo) (*tls.Certificate, error) { tctx.getTLSCert = func(*tls.CertificateRequestInfo) (*tls.Certificate, error) {
// Re-read TLS certificate from disk. This is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1420 // Re-read TLS certificate from disk. This is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1420
certPath := fs.GetFilepath(baseDir, tc.CertFile) certData, err := fs.ReadFileOrHTTP(certPath)
keyPath := fs.GetFilepath(baseDir, tc.KeyFile) if err != nil {
cert, err := tls.LoadX509KeyPair(certPath, keyPath) return nil, fmt.Errorf("cannot read TLS certificate from %q: %w", certPath, err)
}
keyData, err := fs.ReadFileOrHTTP(keyPath)
if err != nil {
return nil, fmt.Errorf("cannot read TLS key from %q: %w", keyPath, err)
}
cert, err := tls.X509KeyPair(certData, keyData)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot load TLS certificate from `cert_file`=%q, `key_file`=%q: %w", tc.CertFile, tc.KeyFile, err) return nil, fmt.Errorf("cannot load TLS certificate from `cert_file`=%q, `key_file`=%q: %w", tc.CertFile, tc.KeyFile, err)
} }
return &cert, nil return &cert, nil
} }
// Check whether the configured TLS cert can be loaded.
if _, err := tctx.getTLSCert(nil); err != nil {
return err
}
tctx.tlsCertDigest = fmt.Sprintf("certFile=%q, keyFile=%q", tc.CertFile, tc.KeyFile) tctx.tlsCertDigest = fmt.Sprintf("certFile=%q, keyFile=%q", tc.CertFile, tc.KeyFile)
} }
if len(tc.CA) != 0 { if len(tc.CA) != 0 {
tctx.rootCA = x509.NewCertPool() rootCA := x509.NewCertPool()
if !tctx.rootCA.AppendCertsFromPEM([]byte(tc.CA)) { if !rootCA.AppendCertsFromPEM([]byte(tc.CA)) {
return fmt.Errorf("cannot parse data from `ca` value") return fmt.Errorf("cannot parse data from `ca` value")
} }
tctx.getTLSRootCA = func() (*x509.CertPool, error) {
return rootCA, nil
}
h := xxhash.Sum64([]byte(tc.CA))
tctx.tlsRootCADigest = fmt.Sprintf("digest(CA)=%d", h)
} else if tc.CAFile != "" { } else if tc.CAFile != "" {
path := fs.GetFilepath(baseDir, tc.CAFile) path := fs.GetFilepath(baseDir, tc.CAFile)
tctx.getTLSRootCA = func() (*x509.CertPool, error) {
data, err := fs.ReadFileOrHTTP(path)
if err != nil {
return nil, fmt.Errorf("cannot read `ca_file`: %w", err)
}
rootCA := x509.NewCertPool()
if !rootCA.AppendCertsFromPEM(data) {
return nil, fmt.Errorf("cannot parse data read from `ca_file` %q", tc.CAFile)
}
return rootCA, nil
}
// The Config.NewTLSConfig() is called only once per each scrape target initialization.
// So, the tlsRootCADigest must contain the hash of CAFile contents additionally to CAFile itself,
// in order to properly reload scrape target configs when CAFile contents changes.
data, err := fs.ReadFileOrHTTP(path) data, err := fs.ReadFileOrHTTP(path)
if err != nil { if err != nil {
return fmt.Errorf("cannot read `ca_file` %q: %w", tc.CAFile, err) // Do not return the error to the caller, since this may result in fatal error.
} // The CAFile contents can become available on the next check of scrape configs.
tctx.rootCA = x509.NewCertPool() data = []byte("read error")
if !tctx.rootCA.AppendCertsFromPEM(data) {
return fmt.Errorf("cannot parse data from `ca_file` %q", tc.CAFile)
} }
h := xxhash.Sum64(data)
tctx.tlsRootCADigest = fmt.Sprintf("caFile=%q, digest(caFile)=%d", tc.CAFile, h)
} }
v, err := netutil.ParseTLSVersion(tc.MinVersion) v, err := netutil.ParseTLSVersion(tc.MinVersion)
if err != nil { if err != nil {

View file

@ -6,175 +6,511 @@ import (
"testing" "testing"
"github.com/VictoriaMetrics/fasthttp" "github.com/VictoriaMetrics/fasthttp"
"gopkg.in/yaml.v2"
) )
func TestNewConfig(t *testing.T) { func TestOptionsNewConfigFailure(t *testing.T) {
tests := []struct { f := func(yamlConfig string) {
name string t.Helper()
opts Options
wantErr bool var hcc HTTPClientConfig
wantErrWhenSetHeader bool if err := yaml.UnmarshalStrict([]byte(yamlConfig), &hcc); err != nil {
expectHeader string t.Fatalf("cannot parse: %s", err)
}{ }
{ cfg, err := hcc.NewConfig("")
name: "OAuth2 config", if err == nil {
opts: Options{ t.Fatalf("expecting non-nil error")
OAuth2: &OAuth2Config{ }
ClientID: "some-id", if cfg != nil {
ClientSecret: NewSecret("some-secret"), t.Fatalf("expecting nil cfg; got %s", cfg.String())
TokenURL: "http://localhost:8511", }
},
},
expectHeader: "Bearer some-token",
},
{
name: "OAuth2 config with file",
opts: Options{
OAuth2: &OAuth2Config{
ClientID: "some-id",
ClientSecretFile: "testdata/test_secretfile.txt",
TokenURL: "http://localhost:8511",
},
},
expectHeader: "Bearer some-token",
},
{
name: "OAuth2 want err",
opts: Options{
OAuth2: &OAuth2Config{
ClientID: "some-id",
ClientSecret: NewSecret("some-secret"),
ClientSecretFile: "testdata/test_secretfile.txt",
TokenURL: "http://localhost:8511",
},
},
wantErr: true,
},
{
name: "OAuth2 with wrong tls",
opts: Options{
OAuth2: &OAuth2Config{
ClientID: "some-id",
ClientSecret: NewSecret("some-secret"),
TokenURL: "http://localhost:8511",
TLSConfig: &TLSConfig{
InsecureSkipVerify: true,
CAFile: "non-existing-file",
},
},
},
wantErrWhenSetHeader: true,
},
{
name: "basic Auth config",
opts: Options{
BasicAuth: &BasicAuthConfig{
Username: "user",
Password: NewSecret("password"),
},
},
expectHeader: "Basic dXNlcjpwYXNzd29yZA==",
},
{
name: "basic Auth config with file",
opts: Options{
BasicAuth: &BasicAuthConfig{
Username: "user",
PasswordFile: "testdata/test_secretfile.txt",
},
},
expectHeader: "Basic dXNlcjpzZWNyZXQtY29udGVudA==",
},
{
name: "basic Auth config with non-existing file",
opts: Options{
BasicAuth: &BasicAuthConfig{
Username: "user",
PasswordFile: "testdata/non-existing-file",
},
},
wantErrWhenSetHeader: true,
},
{
name: "want Authorization",
opts: Options{
Authorization: &Authorization{
Type: "Bearer",
Credentials: NewSecret("Value"),
},
},
expectHeader: "Bearer Value",
},
{
name: "token file",
opts: Options{
BearerTokenFile: "testdata/test_secretfile.txt",
},
expectHeader: "Bearer secret-content",
},
{
name: "token with tls",
opts: Options{
BearerToken: "some-token",
TLSConfig: &TLSConfig{
InsecureSkipVerify: true,
},
},
expectHeader: "Bearer some-token",
},
{
name: "tls with non-existing file",
opts: Options{
BearerToken: "some-token",
TLSConfig: &TLSConfig{
InsecureSkipVerify: true,
CAFile: "non-existing-file",
},
},
wantErr: true,
},
} }
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { // authorization: both credentials and credentials_file are set
if tt.opts.OAuth2 != nil { f(`
r := http.NewServeMux() authorization:
r.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) { credentials: foo-bar
w.Header().Set("Content-Type", "application/json") credentials_file: testdata/test_secretfile.txt
w.Write([]byte(`{"access_token":"some-token","token_type": "Bearer"}`)) `)
})
mock := httptest.NewServer(r) // basic_auth: both authorization and basic_auth are set
tt.opts.OAuth2.TokenURL = mock.URL f(`
} authorization:
got, err := tt.opts.NewConfig() credentials: foo-bar
if (err != nil) != tt.wantErr { basic_auth:
t.Errorf("NewConfig() error = %v, wantErr %v", err, tt.wantErr) username: user
return password: pass
} `)
if got != nil {
req, err := http.NewRequest(http.MethodGet, "http://foo", nil) // basic_auth: missing username
if err != nil { f(`
t.Fatalf("unexpected error in http.NewRequest: %s", err) basic_auth:
} password: pass
err = got.SetHeaders(req, true) `)
if (err != nil) != tt.wantErrWhenSetHeader {
t.Errorf("NewConfig() error = %v, wantErr %v", err, tt.wantErrWhenSetHeader) // basic_auth: password and password_file are set
} f(`
ah := req.Header.Get("Authorization") basic_auth:
if ah != tt.expectHeader { username: user
t.Fatalf("unexpected auth header from net/http request; got %q; want %q", ah, tt.expectHeader) password: pass
} password_file: testdata/test_secretfile.txt
var fhreq fasthttp.Request `)
err = got.SetFasthttpHeaders(&fhreq, true)
if (err != nil) != tt.wantErrWhenSetHeader { // bearer_token: both authorization and bearer_token are set
t.Errorf("NewConfig() error = %v, wantErr %v", err, tt.wantErrWhenSetHeader) f(`
} authorization:
ahb := fhreq.Header.Peek("Authorization") credentials: foo-bar
if string(ahb) != tt.expectHeader { bearer_token: bearer-aaa
t.Fatalf("unexpected auth header from fasthttp request; got %q; want %q", ahb, tt.expectHeader) `)
}
} // bearer_token: both basic_auth and bearer_token are set
}) f(`
bearer_token: bearer-aaa
basic_auth:
username: user
password: pass
`)
// bearer_token_file: both authorization and bearer_token_file are set
f(`
authorization:
credentials: foo-bar
bearer_token_file: testdata/test_secretfile.txt
`)
// bearer_token_file: both basic_auth and bearer_token_file are set
f(`
bearer_token_file: testdata/test_secretfile.txt
basic_auth:
username: user
password: pass
`)
// both bearer_token_file and bearer_token are set
f(`
bearer_token_file: testdata/test_secretfile.txt
bearer_token: foo-bar
`)
// oauth2: both oauth2 and authorization are set
f(`
authorization:
credentials: foo-bar
oauth2:
client_id: some-id
client_secret: some-secret
token_url: http://some-url
`)
// oauth2: both oauth2 and basic_auth are set
f(`
basic_auth:
username: user
password: pass
oauth2:
client_id: some-id
client_secret: some-secret
token_url: http://some-url
`)
// oauth2: both oauth2 and bearer_token are set
f(`
bearer_token: foo-bar
oauth2:
client_id: some-id
client_secret: some-secret
token_url: http://some-url
`)
// oauth2: both oauth2 and bearer_token_file are set
f(`
bearer_token_file: testdata/test_secretfile.txt
oauth2:
client_id: some-id
client_secret: some-secret
token_url: http://some-url
`)
// oauth2: missing client_id
f(`
oauth2:
client_secret: some-secret
token_url: http://some-url
`)
// oauth2: invalid inline tls config
f(`
oauth2:
client_id: some-id
client_secret: some-secret
token_url: http://some-url
tls_config:
key: foobar
cert: baz
`)
// oauth2: invalid ca at tls_config
f(`
oauth2:
client_id: some-id
client_secret: some-secret
token_url: http://some-url
tls_config:
ca: foobar
`)
// oauth2: invalid min_version at tls_config
f(`
oauth2:
client_id: some-id
client_secret: some-secret
token_url: http://some-url
tls_config:
min_version: foobar
`)
// oauth2: invalid proxy_url
f(`
oauth2:
client_id: some-id
client_secret: some-secret
token_url: http://some-url
proxy_url: ":invalid-proxy-url"
`)
// tls_config: invalid ca
f(`
tls_config:
ca: foobar
`)
// invalid headers
f(`
headers:
- foobar
`)
}
func TestOauth2ConfigParseFailure(t *testing.T) {
f := func(yamlConfig string) {
t.Helper()
var cfg OAuth2Config
if err := yaml.UnmarshalStrict([]byte(yamlConfig), &cfg); err == nil {
t.Fatalf("expecting non-nil error")
}
} }
// invalid yaml
f("afdsfds")
// unknown fields
f("foobar: baz")
}
func TestOauth2ConfigValidateFailure(t *testing.T) {
f := func(yamlConfig string) {
t.Helper()
var cfg OAuth2Config
if err := yaml.UnmarshalStrict([]byte(yamlConfig), &cfg); err != nil {
t.Fatalf("cannot unmarshal config: %s", err)
}
if err := cfg.validate(); err == nil {
t.Fatalf("expecting non-nil error")
}
}
// emtpy client_id
f(`
client_secret: some-secret
token_url: http://some-url
`)
// missing client_secret and client_secret_file
f(`
client_id: some-id
token_url: http://some-url/
`)
// client_secret and client_secret_file are set simultaneously
f(`
client_id: some-id
client_secret: some-secret
client_secret_file: testdata/test_secretfile.txt
token_url: http://some-url/
`)
// missing token_url
f(`
client_id: some-id
client_secret: some-secret
`)
}
func TestOauth2ConfigValidateSuccess(t *testing.T) {
f := func(yamlConfig string) {
t.Helper()
var cfg OAuth2Config
if err := yaml.UnmarshalStrict([]byte(yamlConfig), &cfg); err != nil {
t.Fatalf("cannot parse: %s", err)
}
if err := cfg.validate(); err != nil {
t.Fatalf("cannot validate: %s", err)
}
}
f(`
client_id: some-id
client_secret: some-secret
token_url: http://some-url/
proxy_url: http://some-proxy/abc
scopes: [read, write, execute]
endpoint_params:
foo: bar
abc: def
tls_config:
insecure_skip_verify: true
`)
}
func TestConfigGetAuthHeaderFailure(t *testing.T) {
f := func(yamlConfig string) {
t.Helper()
var hcc HTTPClientConfig
if err := yaml.UnmarshalStrict([]byte(yamlConfig), &hcc); err != nil {
t.Fatalf("cannot parse: %s", err)
}
cfg, err := hcc.NewConfig("")
if err != nil {
t.Fatalf("cannot initialize config: %s", err)
}
// Verify that GetAuthHeader() returns error
ah, err := cfg.GetAuthHeader()
if err == nil {
t.Fatalf("expecting non-nil error from GetAuthHeader()")
}
if ah != "" {
t.Fatalf("expecting empty auth header; got %q", ah)
}
// Verify that SetHeaders() returns error
req, err := http.NewRequest(http.MethodGet, "http://foo", nil)
if err != nil {
t.Fatalf("unexpected error in http.NewRequest: %s", err)
}
if err := cfg.SetHeaders(req, true); err == nil {
t.Fatalf("expecting non-nil error from SetHeaders()")
}
// Verify that cfg.SetFasthttpHeaders() returns error
var fhreq fasthttp.Request
if err := cfg.SetFasthttpHeaders(&fhreq, true); err == nil {
t.Fatalf("expecting non-nil error from SetFasthttpHeaders()")
}
// Verify that the tls cert cannot be loaded properly if it exists
if f := cfg.getTLSCertCached; f != nil {
cert, err := f(nil)
if err == nil {
t.Fatalf("expecting non-nil error in getTLSCertCached()")
}
if cert != nil {
t.Fatalf("expecting nil cert from getTLSCertCached()")
}
}
}
// oauth2 with invalid proxy_url
f(`
oauth2:
client_id: some-id
client_secret: some-secret
token_url: http://some-url
proxy_url: invalid-proxy-url
`)
// oauth2 with non-existing client_secret_file
f(`
oauth2:
client_id: some-id
client_secret_file: non-existing-file
token_url: http://some-url
`)
// non-existing root ca file for oauth2
f(`
oauth2:
client_id: some-id
client_secret: some-secret
token_url: http://some-url
tls_config:
ca_file: non-existing-file
`)
// basic auth via non-existing file
f(`
basic_auth:
username: user
password_file: non-existing-file
`)
// bearer token via non-existing file
f(`
bearer_token_file: non-existing-file
`)
// authorization creds via non-existing file
f(`
authorization:
type: foobar
credentials_file: non-existing-file
`)
}
func TestConfigGetAuthHeaderSuccess(t *testing.T) {
f := func(yamlConfig string, ahExpected string) {
t.Helper()
var hcc HTTPClientConfig
if err := yaml.UnmarshalStrict([]byte(yamlConfig), &hcc); err != nil {
t.Fatalf("cannot unmarshal config: %s", err)
}
if hcc.OAuth2 != nil {
if hcc.OAuth2.TokenURL != "replace-with-mock-url" {
t.Fatalf("unexpected token_url: %q; want `replace-with-mock-url`", hcc.OAuth2.TokenURL)
}
r := http.NewServeMux()
r.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.Write([]byte(`{"access_token":"test-oauth2-token","token_type": "Bearer"}`))
})
mock := httptest.NewServer(r)
hcc.OAuth2.TokenURL = mock.URL
}
cfg, err := hcc.NewConfig("")
if err != nil {
t.Fatalf("cannot initialize config: %s", err)
}
// Verify that cfg.String() returns non-empty value
cfgString := cfg.String()
if cfgString == "" {
t.Fatalf("unexpected empty result from Config.String")
}
// Check that GetAuthHeader() returns the correct header
ah, err := cfg.GetAuthHeader()
if err != nil {
t.Fatalf("unexpected auth header; got %q; want %q", ah, ahExpected)
}
// Make sure that cfg.SetHeaders() properly set Authorization header
req, err := http.NewRequest(http.MethodGet, "http://foo", nil)
if err != nil {
t.Fatalf("unexpected error in http.NewRequest: %s", err)
}
if err := cfg.SetHeaders(req, true); err != nil {
t.Fatalf("unexpected error in SetHeaders(): %s", err)
}
ah = req.Header.Get("Authorization")
if ah != ahExpected {
t.Fatalf("unexpected auth header from net/http request; got %q; want %q", ah, ahExpected)
}
// Make sure that cfg.SetFasthttpHeaders() properly set Authorization header
var fhreq fasthttp.Request
if err := cfg.SetFasthttpHeaders(&fhreq, true); err != nil {
t.Fatalf("unexpected error in SetFasthttpHeaders(): %s", err)
}
ahb := fhreq.Header.Peek("Authorization")
if string(ahb) != ahExpected {
t.Fatalf("unexpected auth header from fasthttp request; got %q; want %q", ahb, ahExpected)
}
}
// Zero config
f(``, "")
// no auth config, non-zero tls config
f(`
tls_config:
insecure_skip_verify: true
`, "")
// no auth config, tls_config with non-existing files
f(`
tls_config:
key_file: non-existing-file
cert_file: non-existing-file
`, "")
// no auth config, tls_config with non-existing ca file
f(`
tls_config:
ca_file: non-existing-file
`, "")
// inline oauth2 config
f(`
oauth2:
client_id: some-id
client_secret: some-secret
endpoint_params:
foo: bar
scopes: [foo, bar]
token_url: replace-with-mock-url
`, "Bearer test-oauth2-token")
// oauth2 config with secrets in the file
f(`
oauth2:
client_id: some-id
client_secret_file: testdata/test_secretfile.txt
token_url: replace-with-mock-url
`, "Bearer test-oauth2-token")
// inline basic auth
f(`
basic_auth:
username: user
password: password
`, "Basic dXNlcjpwYXNzd29yZA==")
// basic auth via file
f(`
basic_auth:
username: user
password_file: testdata/test_secretfile.txt
`, "Basic dXNlcjpzZWNyZXQtY29udGVudA==")
// inline authorization config
f(`
authorization:
type: My-Super-Auth
credentials: some-password
`, "My-Super-Auth some-password")
// authorization config via file
f(`
authorization:
type: Foo
credentials_file: testdata/test_secretfile.txt
`, "Foo secret-content")
// inline bearer token
f(`
bearer_token: some-token
`, "Bearer some-token")
// bearer token via file
f(`
bearer_token_file: testdata/test_secretfile.txt
`, "Bearer secret-content")
} }
func TestParseHeadersSuccess(t *testing.T) { func TestParseHeadersSuccess(t *testing.T) {
@ -233,7 +569,9 @@ func TestConfigHeaders(t *testing.T) {
if result != resultExpected { if result != resultExpected {
t.Fatalf("unexpected result from HeadersNoAuthString; got\n%s\nwant\n%s", result, resultExpected) t.Fatalf("unexpected result from HeadersNoAuthString; got\n%s\nwant\n%s", result, resultExpected)
} }
_ = c.SetHeaders(req, false) if err := c.SetHeaders(req, false); err != nil {
t.Fatalf("unexpected error in SetHeaders(): %s", err)
}
for _, h := range headersParsed { for _, h := range headersParsed {
v := req.Header.Get(h.key) v := req.Header.Get(h.key)
if v != h.value { if v != h.value {
@ -241,7 +579,9 @@ func TestConfigHeaders(t *testing.T) {
} }
} }
var fhreq fasthttp.Request var fhreq fasthttp.Request
_ = c.SetFasthttpHeaders(&fhreq, false) if err := c.SetFasthttpHeaders(&fhreq, false); err != nil {
t.Fatalf("unexpected error in SetFasthttpHeaders(): %s", err)
}
for _, h := range headersParsed { for _, h := range headersParsed {
v := fhreq.Header.Peek(h.key) v := fhreq.Header.Peek(h.key)
if string(v) != h.value { if string(v) != h.value {

View file

@ -90,7 +90,11 @@ func newClient(ctx context.Context, sw *ScrapeWork) (*client, error) {
isTLS := string(u.Scheme()) == "https" isTLS := string(u.Scheme()) == "https"
var tlsCfg *tls.Config var tlsCfg *tls.Config
if isTLS { if isTLS {
tlsCfg = sw.AuthConfig.NewTLSConfig() var err error
tlsCfg, err = sw.AuthConfig.NewTLSConfig()
if err != nil {
return nil, fmt.Errorf("cannot initialize tls config: %w", err)
}
} }
setProxyHeaders := func(req *http.Request) error { return nil } setProxyHeaders := func(req *http.Request) error { return nil }
setFasthttpProxyHeaders := func(req *fasthttp.Request) error { return nil } setFasthttpProxyHeaders := func(req *fasthttp.Request) error { return nil }
@ -104,7 +108,11 @@ func newClient(ctx context.Context, sw *ScrapeWork) (*client, error) {
requestURI = sw.ScrapeURL requestURI = sw.ScrapeURL
isTLS = pu.Scheme == "https" isTLS = pu.Scheme == "https"
if isTLS { if isTLS {
tlsCfg = sw.ProxyAuthConfig.NewTLSConfig() var err error
tlsCfg, err = sw.ProxyAuthConfig.NewTLSConfig()
if err != nil {
return nil, fmt.Errorf("cannot initialize proxy tls config: %w", err)
}
} }
proxyURLOrig := proxyURL proxyURLOrig := proxyURL
setProxyHeaders = func(req *http.Request) error { setProxyHeaders = func(req *http.Request) error {
@ -160,7 +168,7 @@ func newClient(ctx context.Context, sw *ScrapeWork) (*client, error) {
} }
} }
return &client{ c := &client{
hc: hc, hc: hc,
ctx: ctx, ctx: ctx,
sc: sc, sc: sc,
@ -168,14 +176,19 @@ func newClient(ctx context.Context, sw *ScrapeWork) (*client, error) {
scrapeTimeoutSecondsStr: fmt.Sprintf("%.3f", sw.ScrapeTimeout.Seconds()), scrapeTimeoutSecondsStr: fmt.Sprintf("%.3f", sw.ScrapeTimeout.Seconds()),
hostPort: hostPort, hostPort: hostPort,
requestURI: requestURI, requestURI: requestURI,
setHeaders: func(req *http.Request) error { return sw.AuthConfig.SetHeaders(req, true) }, setHeaders: func(req *http.Request) error {
setProxyHeaders: setProxyHeaders, return sw.AuthConfig.SetHeaders(req, true)
setFasthttpHeaders: func(req *fasthttp.Request) error { return sw.AuthConfig.SetFasthttpHeaders(req, true) }, },
setProxyHeaders: setProxyHeaders,
setFasthttpHeaders: func(req *fasthttp.Request) error {
return sw.AuthConfig.SetFasthttpHeaders(req, true)
},
setFasthttpProxyHeaders: setFasthttpProxyHeaders, setFasthttpProxyHeaders: setFasthttpProxyHeaders,
denyRedirects: sw.DenyRedirects, denyRedirects: sw.DenyRedirects,
disableCompression: sw.DisableCompression, disableCompression: sw.DisableCompression,
disableKeepAlive: sw.DisableKeepAlive, disableKeepAlive: sw.DisableKeepAlive,
}, nil }
return c, nil
} }
func (c *client) GetStreamReader() (*streamReader, error) { func (c *client) GetStreamReader() (*streamReader, error) {
@ -196,15 +209,13 @@ func (c *client) GetStreamReader() (*streamReader, error) {
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179#issuecomment-813117162 // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179#issuecomment-813117162
req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", c.scrapeTimeoutSecondsStr) req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", c.scrapeTimeoutSecondsStr)
req.Header.Set("User-Agent", scrapeUserAgent) req.Header.Set("User-Agent", scrapeUserAgent)
err = c.setHeaders(req) if err := c.setHeaders(req); err != nil {
if err != nil {
cancel() cancel()
return nil, fmt.Errorf("failed to create request to %q: %w", c.scrapeURL, err) return nil, fmt.Errorf("failed to set request headers for %q: %w", c.scrapeURL, err)
} }
err = c.setProxyHeaders(req) if err := c.setProxyHeaders(req); err != nil {
if err != nil {
cancel() cancel()
return nil, fmt.Errorf("failed to create request to %q: %w", c.scrapeURL, err) return nil, fmt.Errorf("failed to set proxy request headers for %q: %w", c.scrapeURL, err)
} }
scrapeRequests.Inc() scrapeRequests.Inc()
resp, err := c.sc.Do(req) resp, err := c.sc.Do(req)
@ -221,12 +232,13 @@ func (c *client) GetStreamReader() (*streamReader, error) {
c.scrapeURL, resp.StatusCode, http.StatusOK, respBody) c.scrapeURL, resp.StatusCode, http.StatusOK, respBody)
} }
scrapesOK.Inc() scrapesOK.Inc()
return &streamReader{ sr := &streamReader{
r: resp.Body, r: resp.Body,
cancel: cancel, cancel: cancel,
scrapeURL: c.scrapeURL, scrapeURL: c.scrapeURL,
maxBodySize: int64(c.hc.MaxResponseBodySize), maxBodySize: int64(c.hc.MaxResponseBodySize),
}, nil }
return sr, nil
} }
// checks fasthttp status code for redirect as standard http/client does. // checks fasthttp status code for redirect as standard http/client does.
@ -252,13 +264,11 @@ func (c *client) ReadData(dst []byte) ([]byte, error) {
// Set X-Prometheus-Scrape-Timeout-Seconds like Prometheus does, since it is used by some exporters such as PushProx. // Set X-Prometheus-Scrape-Timeout-Seconds like Prometheus does, since it is used by some exporters such as PushProx.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179#issuecomment-813117162 // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179#issuecomment-813117162
req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", c.scrapeTimeoutSecondsStr) req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", c.scrapeTimeoutSecondsStr)
err := c.setFasthttpHeaders(req) if err := c.setFasthttpHeaders(req); err != nil {
if err != nil { return nil, fmt.Errorf("failed to set request headers for %q: %w", c.scrapeURL, err)
return nil, fmt.Errorf("failed to create request to %q: %w", c.scrapeURL, err)
} }
err = c.setFasthttpProxyHeaders(req) if err := c.setFasthttpProxyHeaders(req); err != nil {
if err != nil { return nil, fmt.Errorf("failed to set proxy request headers for %q: %w", c.scrapeURL, err)
return nil, fmt.Errorf("failed to create request to %q: %w", c.scrapeURL, err)
} }
if !*disableCompression && !c.disableCompression { if !*disableCompression && !c.disableCompression {
req.Header.Set("Accept-Encoding", "gzip") req.Header.Set("Accept-Encoding", "gzip")
@ -277,7 +287,7 @@ func (c *client) ReadData(dst []byte) ([]byte, error) {
ctx, cancel := context.WithDeadline(c.ctx, deadline) ctx, cancel := context.WithDeadline(c.ctx, deadline)
defer cancel() defer cancel()
err = doRequestWithPossibleRetry(ctx, c.hc, req, resp) err := doRequestWithPossibleRetry(ctx, c.hc, req, resp)
statusCode := resp.StatusCode() statusCode := resp.StatusCode()
redirectsCount := 0 redirectsCount := 0
for err == nil && isStatusRedirect(statusCode) { for err == nil && isStatusRedirect(statusCode) {

View file

@ -141,9 +141,10 @@ func (cfg *Config) mustStart() {
} }
// mustRestart restarts service discovery routines at cfg if they were changed comparing to prevCfg. // mustRestart restarts service discovery routines at cfg if they were changed comparing to prevCfg.
func (cfg *Config) mustRestart(prevCfg *Config) { //
// It returns true if at least a single scraper has been restarted.
func (cfg *Config) mustRestart(prevCfg *Config) bool {
startTime := time.Now() startTime := time.Now()
logger.Infof("restarting service discovery routines...")
prevScrapeCfgByName := make(map[string]*ScrapeConfig, len(prevCfg.ScrapeConfigs)) prevScrapeCfgByName := make(map[string]*ScrapeConfig, len(prevCfg.ScrapeConfigs))
for _, scPrev := range prevCfg.ScrapeConfigs { for _, scPrev := range prevCfg.ScrapeConfigs {
@ -186,7 +187,12 @@ func (cfg *Config) mustRestart(prevCfg *Config) {
} }
jobNames := cfg.getJobNames() jobNames := cfg.getJobNames()
tsmGlobal.registerJobNames(jobNames) tsmGlobal.registerJobNames(jobNames)
logger.Infof("restarted service discovery routines in %.3f seconds, stopped=%d, started=%d, restarted=%d", time.Since(startTime).Seconds(), stopped, started, restarted) hasChanges := started > 0 || stopped > 0 || restarted > 0
if hasChanges {
logger.Infof("updated %d service discovery routines in %.3f seconds, started=%d, stopped=%d, restarted=%d",
len(cfg.ScrapeConfigs), time.Since(startTime).Seconds(), started, stopped, restarted)
}
return hasChanges
} }
func areEqualGlobalConfigs(a, b *GlobalConfig) bool { func areEqualGlobalConfigs(a, b *GlobalConfig) bool {
@ -198,7 +204,20 @@ func areEqualGlobalConfigs(a, b *GlobalConfig) bool {
func areEqualScrapeConfigs(a, b *ScrapeConfig) bool { func areEqualScrapeConfigs(a, b *ScrapeConfig) bool {
sa := a.marshalJSON() sa := a.marshalJSON()
sb := b.marshalJSON() sb := b.marshalJSON()
return string(sa) == string(sb) if string(sa) != string(sb) {
return false
}
// Compare auth configs for a and b, since they may differ by TLS CA file contents,
// which is missing in the marshaled JSON of a and b,
// but it existis in the string representation of auth configs.
if a.swc.authConfig.String() != b.swc.authConfig.String() {
return false
}
if a.swc.proxyAuthConfig.String() != b.swc.proxyAuthConfig.String() {
return false
}
return true
} }
func (sc *ScrapeConfig) unmarshalJSON(data []byte) error { func (sc *ScrapeConfig) unmarshalJSON(data []byte) error {
@ -400,29 +419,28 @@ func loadStaticConfigs(path string) ([]StaticConfig, error) {
} }
// loadConfig loads Prometheus config from the given path. // loadConfig loads Prometheus config from the given path.
func loadConfig(path string) (*Config, []byte, error) { func loadConfig(path string) (*Config, error) {
data, err := fs.ReadFileOrHTTP(path) data, err := fs.ReadFileOrHTTP(path)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("cannot read Prometheus config from %q: %w", path, err) return nil, fmt.Errorf("cannot read Prometheus config from %q: %w", path, err)
} }
var c Config var c Config
dataNew, err := c.parseData(data, path) if err := c.parseData(data, path); err != nil {
if err != nil { return nil, fmt.Errorf("cannot parse Prometheus config from %q: %w", path, err)
return nil, nil, fmt.Errorf("cannot parse Prometheus config from %q: %w", path, err)
} }
return &c, dataNew, nil return &c, nil
} }
func loadScrapeConfigFiles(baseDir string, scrapeConfigFiles []string) ([]*ScrapeConfig, []byte, error) { func mustLoadScrapeConfigFiles(baseDir string, scrapeConfigFiles []string) []*ScrapeConfig {
var scrapeConfigs []*ScrapeConfig var scrapeConfigs []*ScrapeConfig
var scsData []byte
for _, filePath := range scrapeConfigFiles { for _, filePath := range scrapeConfigFiles {
filePath := fs.GetFilepath(baseDir, filePath) filePath := fs.GetFilepath(baseDir, filePath)
paths := []string{filePath} paths := []string{filePath}
if strings.Contains(filePath, "*") { if strings.Contains(filePath, "*") {
ps, err := filepath.Glob(filePath) ps, err := filepath.Glob(filePath)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("invalid pattern %q: %w", filePath, err) logger.Errorf("skipping pattern %q at `scrape_config_files` because of error: %s", filePath, err)
continue
} }
sort.Strings(ps) sort.Strings(ps)
paths = ps paths = ps
@ -430,22 +448,23 @@ func loadScrapeConfigFiles(baseDir string, scrapeConfigFiles []string) ([]*Scrap
for _, path := range paths { for _, path := range paths {
data, err := fs.ReadFileOrHTTP(path) data, err := fs.ReadFileOrHTTP(path)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("cannot load %q: %w", path, err) logger.Errorf("skipping %q at `scrape_config_files` because of error: %s", path, err)
continue
} }
data, err = envtemplate.ReplaceBytes(data) data, err = envtemplate.ReplaceBytes(data)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("cannot expand environment vars in %q: %w", path, err) logger.Errorf("skipping %q at `scrape_config_files` because of failure to expand environment vars: %s", path, err)
continue
} }
var scs []*ScrapeConfig var scs []*ScrapeConfig
if err = yaml.UnmarshalStrict(data, &scs); err != nil { if err = yaml.UnmarshalStrict(data, &scs); err != nil {
return nil, nil, fmt.Errorf("cannot parse %q: %w", path, err) logger.Errorf("skipping %q at `scrape_config_files` because of failure to parse it: %s", path, err)
continue
} }
scrapeConfigs = append(scrapeConfigs, scs...) scrapeConfigs = append(scrapeConfigs, scs...)
scsData = append(scsData, '\n')
scsData = append(scsData, data...)
} }
} }
return scrapeConfigs, scsData, nil return scrapeConfigs
} }
// IsDryRun returns true if -promscrape.config.dryRun command-line flag is set // IsDryRun returns true if -promscrape.config.dryRun command-line flag is set
@ -453,54 +472,56 @@ func IsDryRun() bool {
return *dryRun return *dryRun
} }
func (cfg *Config) parseData(data []byte, path string) ([]byte, error) { func (cfg *Config) parseData(data []byte, path string) error {
if err := cfg.unmarshal(data, *strictParse); err != nil { if err := cfg.unmarshal(data, *strictParse); err != nil {
return nil, fmt.Errorf("cannot unmarshal data: %w", err) cfg.ScrapeConfigs = nil
return fmt.Errorf("cannot unmarshal data: %w", err)
} }
absPath, err := filepath.Abs(path) absPath, err := filepath.Abs(path)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot obtain abs path for %q: %w", path, err) cfg.ScrapeConfigs = nil
return fmt.Errorf("cannot obtain abs path for %q: %w", path, err)
} }
cfg.baseDir = filepath.Dir(absPath) cfg.baseDir = filepath.Dir(absPath)
// Load cfg.ScrapeConfigFiles into c.ScrapeConfigs // Load cfg.ScrapeConfigFiles into c.ScrapeConfigs
scs, scsData, err := loadScrapeConfigFiles(cfg.baseDir, cfg.ScrapeConfigFiles) scs := mustLoadScrapeConfigFiles(cfg.baseDir, cfg.ScrapeConfigFiles)
if err != nil {
return nil, fmt.Errorf("cannot load `scrape_config_files` from %q: %w", path, err)
}
cfg.ScrapeConfigFiles = nil cfg.ScrapeConfigFiles = nil
cfg.ScrapeConfigs = append(cfg.ScrapeConfigs, scs...) cfg.ScrapeConfigs = append(cfg.ScrapeConfigs, scs...)
dataNew := append(data, scsData...)
// Check that all the scrape configs have unique JobName // Check that all the scrape configs have unique JobName
m := make(map[string]struct{}, len(cfg.ScrapeConfigs)) m := make(map[string]struct{}, len(cfg.ScrapeConfigs))
for _, sc := range cfg.ScrapeConfigs { for _, sc := range cfg.ScrapeConfigs {
jobName := sc.JobName jobName := sc.JobName
if _, ok := m[jobName]; ok { if _, ok := m[jobName]; ok {
return nil, fmt.Errorf("duplicate `job_name` in `scrape_configs` loaded from %q: %q", path, jobName) cfg.ScrapeConfigs = nil
return fmt.Errorf("duplicate `job_name` in `scrape_configs` loaded from %q: %q", path, jobName)
} }
m[jobName] = struct{}{} m[jobName] = struct{}{}
} }
// Initialize cfg.ScrapeConfigs // Initialize cfg.ScrapeConfigs
var validScrapeConfigs []*ScrapeConfig validScrapeConfigs := cfg.ScrapeConfigs[:0]
for i, sc := range cfg.ScrapeConfigs { for _, sc := range cfg.ScrapeConfigs {
// Make a copy of sc in order to remove references to `data` memory. // Make a copy of sc in order to remove references to `data` memory.
// This should prevent from memory leaks on config reload. // This should prevent from memory leaks on config reload.
sc = sc.clone() sc = sc.clone()
cfg.ScrapeConfigs[i] = sc
swc, err := getScrapeWorkConfig(sc, cfg.baseDir, &cfg.Global) swc, err := getScrapeWorkConfig(sc, cfg.baseDir, &cfg.Global)
if err != nil { if err != nil {
// print error and skip invalid scrape config logger.Errorf("skipping `scrape_config` for job_name=%s because of error: %s", sc.JobName, err)
logger.Errorf("cannot parse `scrape_config` for job %q, skip it: %w", sc.JobName, err)
continue continue
} }
sc.swc = swc sc.swc = swc
validScrapeConfigs = append(validScrapeConfigs, sc) validScrapeConfigs = append(validScrapeConfigs, sc)
} }
tailScrapeConfigs := cfg.ScrapeConfigs[len(validScrapeConfigs):]
cfg.ScrapeConfigs = validScrapeConfigs cfg.ScrapeConfigs = validScrapeConfigs
return dataNew, nil for i := range tailScrapeConfigs {
tailScrapeConfigs[i] = nil
}
return nil
} }
func (sc *ScrapeConfig) clone() *ScrapeConfig { func (sc *ScrapeConfig) clone() *ScrapeConfig {

File diff suppressed because it is too large Load diff

View file

@ -86,7 +86,7 @@ func getMXAddrLabels(ctx context.Context, sdc *SDConfig) []*promutils.Labels {
for range sdc.Names { for range sdc.Names {
r := <-ch r := <-ch
if r.err != nil { if r.err != nil {
logger.Errorf("error in MX lookup for %q; skipping it; error: %s", r.name, r.err) logger.Errorf("dns_sd_config: skipping MX lookup for %q because of error: %s", r.name, r.err)
continue continue
} }
for _, mx := range r.mx { for _, mx := range r.mx {
@ -121,7 +121,7 @@ func getSRVAddrLabels(ctx context.Context, sdc *SDConfig) []*promutils.Labels {
for range sdc.Names { for range sdc.Names {
r := <-ch r := <-ch
if r.err != nil { if r.err != nil {
logger.Errorf("error in SRV lookup for %q; skipping it; error: %s", r.name, r.err) logger.Errorf("dns_sd_config: skipping SRV lookup for %q because of error: %s", r.name, r.err)
continue continue
} }
for _, a := range r.as { for _, a := range r.as {

View file

@ -50,7 +50,7 @@ func (sdc *SDConfig) GetLabels(baseDir string) ([]*promutils.Labels, error) {
case "nodes": case "nodes":
return getNodesLabels(cfg) return getNodesLabels(cfg)
default: default:
return nil, fmt.Errorf("unexpected `role`: %q; must be one of `tasks`, `services` or `nodes`; skipping it", sdc.Role) return nil, fmt.Errorf("skipping unexpected role=%q; must be one of `tasks`, `services` or `nodes`", sdc.Role)
} }
} }

View file

@ -72,7 +72,7 @@ func newAPIWatcher(apiServer string, ac *promauth.Config, sdc *SDConfig, swcFunc
if sdc.Namespaces.OwnNamespace { if sdc.Namespaces.OwnNamespace {
namespace, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace") namespace, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
if err != nil { if err != nil {
logger.Fatalf("cannot determine namespace for the current pod according to `own_namespace: true` option in kubernetes_sd_config: %s", err) logger.Panicf("FATAL: cannot determine namespace for the current pod according to `own_namespace: true` option in kubernetes_sd_config: %s", err)
} }
namespaces = []string{string(namespace)} namespaces = []string{string(namespace)}
} }
@ -223,9 +223,13 @@ func newGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string,
if proxyURL != nil { if proxyURL != nil {
proxy = http.ProxyURL(proxyURL) proxy = http.ProxyURL(proxyURL)
} }
tlsConfig, err := ac.NewTLSConfig()
if err != nil {
logger.Panicf("FATAL: cannot initialize tls config: %s", err)
}
client := &http.Client{ client := &http.Client{
Transport: &http.Transport{ Transport: &http.Transport{
TLSClientConfig: ac.NewTLSConfig(), TLSClientConfig: tlsConfig,
Proxy: proxy, Proxy: proxy,
TLSHandshakeTimeout: 10 * time.Second, TLSHandshakeTimeout: 10 * time.Second,
IdleConnTimeout: *apiServerTimeout, IdleConnTimeout: *apiServerTimeout,
@ -239,9 +243,11 @@ func newGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string,
selectors: selectors, selectors: selectors,
attachNodeMetadata: attachNodeMetadata, attachNodeMetadata: attachNodeMetadata,
setHeaders: func(req *http.Request) error { return ac.SetHeaders(req, true) }, setHeaders: func(req *http.Request) error {
client: client, return ac.SetHeaders(req, true)
m: make(map[string]*urlWatcher), },
client: client,
m: make(map[string]*urlWatcher),
} }
} }
@ -418,11 +424,10 @@ func (gw *groupWatcher) doRequest(ctx context.Context, requestURL string) (*http
} }
req, err := http.NewRequestWithContext(ctx, http.MethodGet, requestURL, nil) req, err := http.NewRequestWithContext(ctx, http.MethodGet, requestURL, nil)
if err != nil { if err != nil {
logger.Fatalf("cannot create a request for %q: %s", requestURL, err) logger.Panicf("FATAL: cannot create a request for %q: %s", requestURL, err)
} }
err = gw.setHeaders(req) if err := gw.setHeaders(req); err != nil {
if err != nil { return nil, fmt.Errorf("cannot set request headers: %w", err)
return nil, err
} }
resp, err := gw.client.Do(req) resp, err := gw.client.Do(req)
if err != nil { if err != nil {

View file

@ -92,10 +92,15 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
ac, err := opts.NewConfig() ac, err := opts.NewConfig()
if err != nil { if err != nil {
cfg.client.CloseIdleConnections() cfg.client.CloseIdleConnections()
return nil, err return nil, fmt.Errorf("cannot parse TLS config: %w", err)
}
tlsConfig, err := ac.NewTLSConfig()
if err != nil {
cfg.client.CloseIdleConnections()
return nil, fmt.Errorf("cannot initialize TLS config: %w", err)
} }
cfg.client.Transport = &http.Transport{ cfg.client.Transport = &http.Transport{
TLSClientConfig: ac.NewTLSConfig(), TLSClientConfig: tlsConfig,
MaxIdleConnsPerHost: 100, MaxIdleConnsPerHost: 100,
} }
} }
@ -121,7 +126,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
parsedURL, err := url.Parse(sdcAuth.IdentityEndpoint) parsedURL, err := url.Parse(sdcAuth.IdentityEndpoint)
if err != nil { if err != nil {
cfg.client.CloseIdleConnections() cfg.client.CloseIdleConnections()
return nil, fmt.Errorf("cannot parse identity_endpoint: %s as url, err: %w", sdcAuth.IdentityEndpoint, err) return nil, fmt.Errorf("cannot parse identity_endpoint %s as url: %w", sdcAuth.IdentityEndpoint, err)
} }
cfg.endpoint = parsedURL cfg.endpoint = parsedURL
tokenReq, err := buildAuthRequestBody(&sdcAuth) tokenReq, err := buildAuthRequestBody(&sdcAuth)
@ -144,7 +149,7 @@ func getCreds(cfg *apiConfig) (*apiCredentials, error) {
resp, err := cfg.client.Post(apiURL.String(), "application/json", bytes.NewBuffer(cfg.authTokenReq)) resp, err := cfg.client.Post(apiURL.String(), "application/json", bytes.NewBuffer(cfg.authTokenReq))
if err != nil { if err != nil {
return nil, fmt.Errorf("failed query openstack identity api, url: %s, err: %w", apiURL.String(), err) return nil, fmt.Errorf("failed query openstack identity api at url %s: %w", apiURL.String(), err)
} }
r, err := io.ReadAll(resp.Body) r, err := io.ReadAll(resp.Body)
_ = resp.Body.Close() _ = resp.Body.Close()

View file

@ -51,7 +51,7 @@ func (sdc *SDConfig) GetLabels(baseDir string) ([]*promutils.Labels, error) {
case "instance": case "instance":
return getInstancesLabels(cfg) return getInstancesLabels(cfg)
default: default:
return nil, fmt.Errorf("unexpected `role`: %q; must be one of `instance` or `hypervisor`; skipping it", sdc.Role) return nil, fmt.Errorf("skipping unexpected role=%q; must be one of `instance` or `hypervisor`", sdc.Role)
} }
} }

View file

@ -56,10 +56,14 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
TLSConfig: sdc.TLSConfig, TLSConfig: sdc.TLSConfig,
} }
ac, err := opts.NewConfig() ac, err := opts.NewConfig()
if err != nil {
return nil, fmt.Errorf("cannot parse TLS config: %w", err)
}
tlsConfig, err := ac.NewTLSConfig()
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot initialize TLS config: %w", err) return nil, fmt.Errorf("cannot initialize TLS config: %w", err)
} }
transport.TLSClientConfig = ac.NewTLSConfig() transport.TLSClientConfig = tlsConfig
} }
cfg := &apiConfig{ cfg := &apiConfig{
client: &http.Client{ client: &http.Client{

View file

@ -34,7 +34,7 @@ func (sdc *SDConfig) GetLabels(baseDir string) ([]*promutils.Labels, error) {
case "compute": case "compute":
return getInstancesLabels(cfg) return getInstancesLabels(cfg)
default: default:
return nil, fmt.Errorf("unexpected `service`: %q; only `compute` supported yet; skipping it", sdc.Service) return nil, fmt.Errorf("skipping unexpected service=%q; only `compute` supported for now", sdc.Service)
} }
} }

View file

@ -109,7 +109,11 @@ func NewClient(apiServer string, ac *promauth.Config, proxyURL *proxy.URL, proxy
isTLS := u.Scheme == "https" isTLS := u.Scheme == "https"
var tlsCfg *tls.Config var tlsCfg *tls.Config
if isTLS { if isTLS {
tlsCfg = ac.NewTLSConfig() var err error
tlsCfg, err = ac.NewTLSConfig()
if err != nil {
return nil, fmt.Errorf("cannot initialize tls config: %w", err)
}
} }
var proxyURLFunc func(*http.Request) (*url.URL, error) var proxyURLFunc func(*http.Request) (*url.URL, error)
@ -247,13 +251,11 @@ func (c *Client) getAPIResponseWithParamsAndClientCtx(ctx context.Context, clien
return nil, fmt.Errorf("cannot create request for %q: %w", requestURL, err) return nil, fmt.Errorf("cannot create request for %q: %w", requestURL, err)
} }
err = c.setHTTPHeaders(req) if err := c.setHTTPHeaders(req); err != nil {
if err != nil { return nil, fmt.Errorf("cannot set request headers for %q: %w", requestURL, err)
return nil, fmt.Errorf("cannot set request http header for %q: %w", requestURL, err)
} }
err = c.setHTTPProxyHeaders(req) if err := c.setHTTPProxyHeaders(req); err != nil {
if err != nil { return nil, fmt.Errorf("cannot set request proxy headers for %q: %w", requestURL, err)
return nil, fmt.Errorf("cannot set request http proxy header for %q: %w", requestURL, err)
} }
if modifyRequest != nil { if modifyRequest != nil {
modifyRequest(req) modifyRequest(req)

View file

@ -1,7 +1,6 @@
package promscrape package promscrape
import ( import (
"bytes"
"context" "context"
"flag" "flag"
"fmt" "fmt"
@ -36,8 +35,8 @@ import (
) )
var ( var (
configCheckInterval = flag.Duration("promscrape.configCheckInterval", 0, "Interval for checking for changes in '-promscrape.config' file. "+ configCheckInterval = flag.Duration("promscrape.configCheckInterval", 0, "Interval for checking for changes in -promscrape.config file. "+
"By default, the checking is disabled. Send SIGHUP signal in order to force config check for changes") "By default, the checking is disabled. See how to reload -promscrape.config file at https://docs.victoriametrics.com/vmagent.html#configuration-update")
suppressDuplicateScrapeTargetErrors = flag.Bool("promscrape.suppressDuplicateScrapeTargetErrors", false, "Whether to suppress 'duplicate scrape target' errors; "+ suppressDuplicateScrapeTargetErrors = flag.Bool("promscrape.suppressDuplicateScrapeTargetErrors", false, "Whether to suppress 'duplicate scrape target' errors; "+
"see https://docs.victoriametrics.com/vmagent.html#troubleshooting for details") "see https://docs.victoriametrics.com/vmagent.html#troubleshooting for details")
promscrapeConfigFile = flag.String("promscrape.config", "", "Optional path to Prometheus config file with 'scrape_configs' section containing targets to scrape. "+ promscrapeConfigFile = flag.String("promscrape.config", "", "Optional path to Prometheus config file with 'scrape_configs' section containing targets to scrape. "+
@ -53,7 +52,7 @@ func CheckConfig() error {
if *promscrapeConfigFile == "" { if *promscrapeConfigFile == "" {
return nil return nil
} }
_, _, err := loadConfig(*promscrapeConfigFile) _, err := loadConfig(*promscrapeConfigFile)
return err return err
} }
@ -110,8 +109,8 @@ func runScraper(configFile string, pushData func(at *auth.Token, wr *prompbmarsh
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240 // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
sighupCh := procutil.NewSighupChan() sighupCh := procutil.NewSighupChan()
logger.Infof("reading Prometheus configs from %q", configFile) logger.Infof("reading scrape configs from %q", configFile)
cfg, data, err := loadConfig(configFile) cfg, err := loadConfig(configFile)
if err != nil { if err != nil {
logger.Fatalf("cannot read %q: %s", configFile, err) logger.Fatalf("cannot read %q: %s", configFile, err)
} }
@ -154,41 +153,40 @@ func runScraper(configFile string, pushData func(at *auth.Token, wr *prompbmarsh
select { select {
case <-sighupCh: case <-sighupCh:
logger.Infof("SIGHUP received; reloading Prometheus configs from %q", configFile) logger.Infof("SIGHUP received; reloading Prometheus configs from %q", configFile)
cfgNew, dataNew, err := loadConfig(configFile) cfgNew, err := loadConfig(configFile)
if err != nil { if err != nil {
configReloadErrors.Inc() configReloadErrors.Inc()
configSuccess.Set(0) configSuccess.Set(0)
logger.Errorf("cannot read %q on SIGHUP: %s; continuing with the previous config", configFile, err) logger.Errorf("cannot read %q on SIGHUP: %s; continuing with the previous config", configFile, err)
goto waitForChans goto waitForChans
} }
if bytes.Equal(data, dataNew) { configSuccess.Set(1)
configSuccess.Set(1) if !cfgNew.mustRestart(cfg) {
logger.Infof("nothing changed in %q", configFile) logger.Infof("nothing changed in %q", configFile)
goto waitForChans goto waitForChans
} }
cfgNew.mustRestart(cfg)
cfg = cfgNew cfg = cfgNew
data = dataNew marshaledData = cfg.marshal()
marshaledData = cfgNew.marshal()
configData.Store(&marshaledData) configData.Store(&marshaledData)
configReloads.Inc()
configTimestamp.Set(fasttime.UnixTimestamp())
case <-tickerCh: case <-tickerCh:
cfgNew, dataNew, err := loadConfig(configFile) cfgNew, err := loadConfig(configFile)
if err != nil { if err != nil {
configReloadErrors.Inc() configReloadErrors.Inc()
configSuccess.Set(0) configSuccess.Set(0)
logger.Errorf("cannot read %q: %s; continuing with the previous config", configFile, err) logger.Errorf("cannot read %q: %s; continuing with the previous config", configFile, err)
goto waitForChans goto waitForChans
} }
if bytes.Equal(data, dataNew) { configSuccess.Set(1)
configSuccess.Set(1) if !cfgNew.mustRestart(cfg) {
// Nothing changed since the previous loadConfig
goto waitForChans goto waitForChans
} }
cfgNew.mustRestart(cfg)
cfg = cfgNew cfg = cfgNew
data = dataNew marshaledData = cfg.marshal()
marshaledData = cfgNew.marshal()
configData.Store(&marshaledData) configData.Store(&marshaledData)
configReloads.Inc()
configTimestamp.Set(fasttime.UnixTimestamp())
case <-globalStopCh: case <-globalStopCh:
cfg.mustStop() cfg.mustStop()
logger.Infof("stopping Prometheus scrapers") logger.Infof("stopping Prometheus scrapers")
@ -197,10 +195,6 @@ func runScraper(configFile string, pushData func(at *auth.Token, wr *prompbmarsh
logger.Infof("stopped Prometheus scrapers in %.3f seconds", time.Since(startTime).Seconds()) logger.Infof("stopped Prometheus scrapers in %.3f seconds", time.Since(startTime).Seconds())
return return
} }
logger.Infof("found changes in %q; applying these changes", configFile)
configReloads.Inc()
configSuccess.Set(1)
configTimestamp.Set(fasttime.UnixTimestamp())
} }
} }
@ -407,8 +401,7 @@ func (sg *scraperGroup) update(sws []*ScrapeWork) {
for _, sw := range swsToStart { for _, sw := range swsToStart {
sc, err := newScraper(sw, sg.name, sg.pushData) sc, err := newScraper(sw, sg.name, sg.pushData)
if err != nil { if err != nil {
// print error and skip invalid scraper config logger.Errorf("skipping scraper for url=%s, job=%s because of error: %s", sw.ScrapeURL, sg.name, err)
logger.Errorf("cannot create scraper to %q in job %q, will skip it: %w", sw.ScrapeURL, sg.name, err)
continue continue
} }
sg.activeScrapers.Inc() sg.activeScrapers.Inc()
@ -455,7 +448,7 @@ func newScraper(sw *ScrapeWork, group string, pushData func(at *auth.Token, wr *
} }
c, err := newClient(ctx, sw) c, err := newClient(ctx, sw)
if err != nil { if err != nil {
return &scraper{}, err return nil, err
} }
sc.sw.Config = sw sc.sw.Config = sw
sc.sw.ScrapeGroup = group sc.sw.ScrapeGroup = group

View file

@ -213,7 +213,7 @@ func unmarshalRow(dst []Row, s string, tagsPool []Tag, fieldsPool []Field, noEsc
tagsPool, fieldsPool, err = r.unmarshal(s, tagsPool, fieldsPool, noEscapeChars) tagsPool, fieldsPool, err = r.unmarshal(s, tagsPool, fieldsPool, noEscapeChars)
if err != nil { if err != nil {
dst = dst[:len(dst)-1] dst = dst[:len(dst)-1]
logger.Errorf("cannot unmarshal InfluxDB line %q: %s; skipping it", s, err) logger.Errorf("skipping InfluxDB line %q because of error: %s", s, err)
invalidLines.Inc() invalidLines.Inc()
} }
return dst, tagsPool, fieldsPool return dst, tagsPool, fieldsPool

View file

@ -236,7 +236,7 @@ func unmarshalRow(dst []Row, s string, tu *tagsUnmarshaler) []Row {
r := &dst[len(dst)-1] r := &dst[len(dst)-1]
if err := r.unmarshal(s, tu); err != nil { if err := r.unmarshal(s, tu); err != nil {
dst = dst[:len(dst)-1] dst = dst[:len(dst)-1]
logger.Errorf("cannot unmarshal json line %q: %s; skipping it", s, err) logger.Errorf("skipping json line %q because of error: %s", s, err)
invalidLines.Inc() invalidLines.Inc()
} }
return dst return dst

View file

@ -76,7 +76,7 @@ func (u *URL) String() string {
func (u *URL) SetHeaders(ac *promauth.Config, req *http.Request) error { func (u *URL) SetHeaders(ac *promauth.Config, req *http.Request) error {
ah, err := u.getAuthHeader(ac) ah, err := u.getAuthHeader(ac)
if err != nil { if err != nil {
return err return fmt.Errorf("cannot obtain Proxy-Authorization headers: %w", err)
} }
if ah != "" { if ah != "" {
req.Header.Set("Proxy-Authorization", ah) req.Header.Set("Proxy-Authorization", ah)
@ -88,7 +88,7 @@ func (u *URL) SetHeaders(ac *promauth.Config, req *http.Request) error {
func (u *URL) SetFasthttpHeaders(ac *promauth.Config, req *fasthttp.Request) error { func (u *URL) SetFasthttpHeaders(ac *promauth.Config, req *fasthttp.Request) error {
ah, err := u.getAuthHeader(ac) ah, err := u.getAuthHeader(ac)
if err != nil { if err != nil {
return err return fmt.Errorf("cannot obtain Proxy-Authorization headers: %w", err)
} }
if ah != "" { if ah != "" {
req.Header.Set("Proxy-Authorization", ah) req.Header.Set("Proxy-Authorization", ah)
@ -155,7 +155,11 @@ func (u *URL) NewDialFunc(ac *promauth.Config) (fasthttp.DialFunc, error) {
proxyAddr := addMissingPort(pu.Host, isTLS) proxyAddr := addMissingPort(pu.Host, isTLS)
var tlsCfg *tls.Config var tlsCfg *tls.Config
if isTLS { if isTLS {
tlsCfg = ac.NewTLSConfig() var err error
tlsCfg, err = ac.NewTLSConfig()
if err != nil {
return nil, fmt.Errorf("cannot initialize tls config: %w", err)
}
if !tlsCfg.InsecureSkipVerify && tlsCfg.ServerName == "" { if !tlsCfg.InsecureSkipVerify && tlsCfg.ServerName == "" {
tlsCfg.ServerName = tlsServerName(proxyAddr) tlsCfg.ServerName = tlsServerName(proxyAddr)
} }
@ -173,7 +177,7 @@ func (u *URL) NewDialFunc(ac *promauth.Config) (fasthttp.DialFunc, error) {
} }
authHeader, err := u.getAuthHeader(ac) authHeader, err := u.getAuthHeader(ac)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot get auth header: %w", err) return nil, fmt.Errorf("cannot obtain Proxy-Authorization header: %w", err)
} }
if authHeader != "" { if authHeader != "" {
authHeader = "Proxy-Authorization: " + authHeader + "\r\n" authHeader = "Proxy-Authorization: " + authHeader + "\r\n"