diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go index 56d0f7210..5403f8a9f 100644 --- a/app/vmagent/remotewrite/client.go +++ b/app/vmagent/remotewrite/client.go @@ -323,26 +323,32 @@ func (c *client) runWorker() { } func (c *client) doRequest(url string, body []byte) (*http.Response, error) { - req := c.newRequest(url, body) + req, err := c.newRequest(url, body) + if err != nil { + return nil, err + } resp, err := c.hc.Do(req) if err != nil && errors.Is(err, io.EOF) { // it is likely connection became stale. // So we do one more attempt in hope request will succeed. // If not, the error should be handled by the caller as usual. // This should help with https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4139 - req = c.newRequest(url, body) + req, _ = c.newRequest(url, body) resp, err = c.hc.Do(req) } return resp, err } -func (c *client) newRequest(url string, body []byte) *http.Request { +func (c *client) newRequest(url string, body []byte) (*http.Request, error) { reqBody := bytes.NewBuffer(body) req, err := http.NewRequest(http.MethodPost, url, reqBody) if err != nil { logger.Panicf("BUG: unexpected error from http.NewRequest(%q): %s", url, err) } - c.authCfg.SetHeaders(req, true) + err = c.authCfg.SetHeaders(req, true) + if err != nil { + return nil, err + } h := req.Header h.Set("User-Agent", "vmagent") h.Set("Content-Type", "application/x-protobuf") @@ -360,7 +366,7 @@ func (c *client) newRequest(url string, body []byte) *http.Request { logger.Warnf("cannot sign remoteWrite request with AWS sigv4: %s", err) } } - return req + return req, nil } // sendBlockHTTP sends the given block to c.remoteWriteURL. 
diff --git a/app/vmalert/datasource/init.go b/app/vmalert/datasource/init.go index 2c8922acc..83eb6c21f 100644 --- a/app/vmalert/datasource/init.go +++ b/app/vmalert/datasource/init.go @@ -111,6 +111,10 @@ func Init(extraParams url.Values) (QuerierBuilder, error) { if err != nil { return nil, fmt.Errorf("failed to configure auth: %w", err) } + _, err = authCfg.GetAuthHeader() + if err != nil { + return nil, fmt.Errorf("failed to set request auth header to datasource %q: %s", *addr, err) + } return &VMStorage{ c: &http.Client{Transport: tr}, diff --git a/app/vmalert/datasource/vm.go b/app/vmalert/datasource/vm.go index 5267cbba2..a4c9a9a07 100644 --- a/app/vmalert/datasource/vm.go +++ b/app/vmalert/datasource/vm.go @@ -137,12 +137,15 @@ func NewVMStorage(baseURL string, authCfg *promauth.Config, lookBack time.Durati // Query executes the given query and returns parsed response func (s *VMStorage) Query(ctx context.Context, query string, ts time.Time) (Result, *http.Request, error) { - req := s.newQueryRequest(query, ts) + req, err := s.newQueryRequest(query, ts) + if err != nil { + return Result{}, nil, err + } resp, err := s.do(ctx, req) if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { // something in the middle between client and datasource might be closing // the connection. So we do a one more attempt in hope request will succeed. 
- req = s.newQueryRequest(query, ts) + req, _ = s.newQueryRequest(query, ts) resp, err = s.do(ctx, req) } if err != nil { @@ -173,12 +176,15 @@ func (s *VMStorage) QueryRange(ctx context.Context, query string, start, end tim if end.IsZero() { return res, fmt.Errorf("end param is missing") } - req := s.newQueryRangeRequest(query, start, end) + req, err := s.newQueryRangeRequest(query, start, end) + if err != nil { + return Result{}, err + } resp, err := s.do(ctx, req) if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { // something in the middle between client and datasource might be closing // the connection. So we do a one more attempt in hope request will succeed. - req = s.newQueryRangeRequest(query, start, end) + req, _ = s.newQueryRangeRequest(query, start, end) resp, err = s.do(ctx, req) } if err != nil { @@ -210,14 +216,20 @@ func (s *VMStorage) do(ctx context.Context, req *http.Request) (*http.Response, return resp, nil } -func (s *VMStorage) newQueryRangeRequest(query string, start, end time.Time) *http.Request { - req := s.newRequest() +func (s *VMStorage) newQueryRangeRequest(query string, start, end time.Time) (*http.Request, error) { + req, err := s.newRequest() + if err != nil { + return nil, fmt.Errorf("cannot create query_range request to datasource %q: %s", s.datasourceURL, err) + } s.setPrometheusRangeReqParams(req, query, start, end) - return req + return req, nil } -func (s *VMStorage) newQueryRequest(query string, ts time.Time) *http.Request { - req := s.newRequest() +func (s *VMStorage) newQueryRequest(query string, ts time.Time) (*http.Request, error) { + req, err := s.newRequest() + if err != nil { + return nil, fmt.Errorf("cannot create query request to datasource %q: %s", s.datasourceURL, err) + } switch s.dataSourceType { case "", datasourcePrometheus: s.setPrometheusInstantReqParams(req, query, ts) @@ -226,20 +238,23 @@ func (s *VMStorage) newQueryRequest(query string, ts time.Time) *http.Request { default: 
logger.Panicf("BUG: engine not found: %q", s.dataSourceType) } - return req + return req, nil } -func (s *VMStorage) newRequest() *http.Request { +func (s *VMStorage) newRequest() (*http.Request, error) { req, err := http.NewRequest(http.MethodPost, s.datasourceURL, nil) if err != nil { logger.Panicf("BUG: unexpected error from http.NewRequest(%q): %s", s.datasourceURL, err) } req.Header.Set("Content-Type", "application/json") if s.authCfg != nil { - s.authCfg.SetHeaders(req, true) + err = s.authCfg.SetHeaders(req, true) + if err != nil { + return nil, err + } } for _, h := range s.extraHeaders { req.Header.Set(h.key, h.value) } - return req + return req, nil } diff --git a/app/vmalert/datasource/vm_test.go b/app/vmalert/datasource/vm_test.go index 757b012b7..769797438 100644 --- a/app/vmalert/datasource/vm_test.go +++ b/app/vmalert/datasource/vm_test.go @@ -637,7 +637,10 @@ func TestRequestParams(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - req := tc.vm.newRequest() + req, err := tc.vm.newRequest() + if err != nil { + t.Fatal(err) + } switch tc.vm.dataSourceType { case "", datasourcePrometheus: if tc.queryRange { @@ -732,7 +735,10 @@ func TestHeaders(t *testing.T) { for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { vm := tt.vmFn() - req := vm.newQueryRequest("foo", time.Now()) + req, err := vm.newQueryRequest("foo", time.Now()) + if err != nil { + t.Fatal(err) + } tt.checkFn(t, req) }) } diff --git a/app/vmalert/notifier/alertmanager.go b/app/vmalert/notifier/alertmanager.go index 8d1abbb83..3352bda25 100644 --- a/app/vmalert/notifier/alertmanager.go +++ b/app/vmalert/notifier/alertmanager.go @@ -88,7 +88,10 @@ func (am *AlertManager) send(ctx context.Context, alerts []Alert, headers map[st req = req.WithContext(ctx) if am.authCfg != nil { - am.authCfg.SetHeaders(req, true) + err = am.authCfg.SetHeaders(req, true) + if err != nil { + return err + } } resp, err := am.client.Do(req) if err != nil { diff --git 
a/app/vmalert/remotewrite/client.go b/app/vmalert/remotewrite/client.go index 2fcdcbf16..2b8549de8 100644 --- a/app/vmalert/remotewrite/client.go +++ b/app/vmalert/remotewrite/client.go @@ -280,7 +280,10 @@ func (c *Client) send(ctx context.Context, data []byte) error { req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") if c.authCfg != nil { - c.authCfg.SetHeaders(req, true) + err = c.authCfg.SetHeaders(req, true) + if err != nil { + return &nonRetriableError{err: err} + } } if !*disablePathAppend { req.URL.Path = path.Join(req.URL.Path, "/api/v1/write") diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index b6976d5bd..788befdc9 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -38,6 +38,7 @@ The sandbox cluster installation is running under the constant load generated by This also means that `datasource.queryTimeAlignment` command-line flag becomes deprecated now and will have no effect if configured. If `datasource.queryTimeAlignment` was set to `false` before, then `eval_alignment` has to be set to `false` explicitly under group. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5049). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): support data ingestion from [NewRelic infrastructure agent](https://docs.newrelic.com/docs/infrastructure/install-infrastructure-agent). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-newrelic-agent), [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3520) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4712). +* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): skip scrape jobs with incorrect syntax under `scrape_configs` and log an error for them instead; previously vmagent would exit on such errors.
See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4959) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5153). * FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): add `-filestream.disableFadvise` command-line flag, which can be used for disabling `fadvise` syscall during backup upload to the remote storage. By default `vmbackup` uses `fadvise` syscall in order to prevent from eviction of recently accessed data from the [OS page cache](https://en.wikipedia.org/wiki/Page_cache) when backing up large files. Sometimes the `fadvise` syscall may take significant amounts of CPU when the backup is performed with large value of `-concurrency` command-line flag on systems with big number of CPU cores. In this case it is better to manually disable `fadvise` syscall by passing `-filestream.disableFadvise` command-line flag to `vmbackup`. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5120) for details. * FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): add `-deleteAllObjectVersions` command-line flag, which can be used for forcing removal of all object versions in remote object storage. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5121) issue and [these docs](https://docs.victoriametrics.com/vmbackup.html#permanent-deletion-of-objects-in-s3-compatible-storages) for the details. * FEATURE: [Alerting rules for VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#alerts): account for `vmauth` component for alerts `ServiceDown` and `TooManyRestarts`. @@ -50,6 +51,7 @@ The sandbox cluster installation is running under the constant load generated by * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): strip sensitive information such as auth headers or passwords from datasource, remote-read, remote-write or notifier URLs in log messages or UI. 
This behavior is by default and is controlled via `-datasource.showURL`, `-remoteRead.showURL`, `remoteWrite.showURL` or `-notifier.showURL` cmd-line flags. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5044). * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): fix vmalert web UI when running on 32-bit architectures machine. +* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): don't send requests if the auth config in the `datasource`, `remoteWrite`, `remoteRead` or `notifier` section is invalid; previously requests were sent without the auth header. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5153). * BUGFIX: [vmselect](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): improve performance and memory usage during query processing on machines with big number of CPU cores. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5087) for details. * BUGFIX: dashboards: fix vminsert/vmstorage/vmselect metrics filtering when dashboard is used to display data from many sub-clusters with unique job names. Before, only one specific job could have been accounted for component-specific panels, instead of all available jobs for the component. * BUGFIX: dashboards/vmalert: apply `desc` sorting in tooltips for vmalert dashboard in order to improve visibility of the outliers on graph. @@ -57,7 +59,7 @@ The sandbox cluster installation is running under the constant load generated by * BUGFIX: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): bump hard-coded limit for search query size at `vmstorage` from 1MB to 5MB. The change should be more suitable for real-world scenarios and protect vmstorage from excessive memory usage.
See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5154) for details * BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): fix error when creating an incremental backup with the `-origin` command-line flag. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5144) for details. * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix vmagent ignoring configuration reload for streaming aggregation if it was started with empty streaming aggregation config. Thanks to @aluode99 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5178). - +* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): don't send requests if there is wrong auth config in `scrape_configs` and `remoteWrite` section, previously will send requests without auth header. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5153). ## [v1.94.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.94.0) diff --git a/lib/promauth/config.go b/lib/promauth/config.go index 491fd166e..aac56b2b4 100644 --- a/lib/promauth/config.go +++ b/lib/promauth/config.go @@ -14,7 +14,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/fasthttp" "github.com/cespare/xxhash/v2" @@ -194,9 +193,6 @@ type oauth2ConfigInternal struct { } func newOAuth2ConfigInternal(baseDir string, o *OAuth2Config) (*oauth2ConfigInternal, error) { - if err := o.validate(); err != nil { - return nil, err - } oi := &oauth2ConfigInternal{ cfg: &clientcredentials.Config{ ClientID: o.ClientID, @@ -280,7 +276,7 @@ type Config struct { getTLSCert func(*tls.CertificateRequestInfo) (*tls.Certificate, error) tlsCertDigest string - getAuthHeader func() string + getAuthHeader func() (string, 
error) authHeaderLock sync.Mutex authHeader string authHeaderDeadline uint64 @@ -325,45 +321,58 @@ func (ac *Config) HeadersNoAuthString() string { } // SetHeaders sets the configured ac headers to req. -func (ac *Config) SetHeaders(req *http.Request, setAuthHeader bool) { +func (ac *Config) SetHeaders(req *http.Request, setAuthHeader bool) error { reqHeaders := req.Header for _, h := range ac.headers { reqHeaders.Set(h.key, h.value) } if setAuthHeader { - if ah := ac.GetAuthHeader(); ah != "" { + ah, err := ac.GetAuthHeader() + if err != nil { + return fmt.Errorf("failed to set request auth header: %w", err) + } + if ah != "" { reqHeaders.Set("Authorization", ah) } } + return nil } // SetFasthttpHeaders sets the configured ac headers to req. -func (ac *Config) SetFasthttpHeaders(req *fasthttp.Request, setAuthHeader bool) { +func (ac *Config) SetFasthttpHeaders(req *fasthttp.Request, setAuthHeader bool) error { reqHeaders := &req.Header for _, h := range ac.headers { reqHeaders.Set(h.key, h.value) } if setAuthHeader { - if ah := ac.GetAuthHeader(); ah != "" { + ah, err := ac.GetAuthHeader() + if err != nil { + return err + } + if ah != "" { reqHeaders.Set("Authorization", ah) } } + return nil } // GetAuthHeader returns optional `Authorization: ...` http header. -func (ac *Config) GetAuthHeader() string { +func (ac *Config) GetAuthHeader() (string, error) { f := ac.getAuthHeader if f == nil { - return "" + return "", nil } ac.authHeaderLock.Lock() defer ac.authHeaderLock.Unlock() if fasttime.UnixTimestamp() > ac.authHeaderDeadline { - ac.authHeader = f() + var err error + if ac.authHeader, err = f(); err != nil { + return "", err + } // Cache the authHeader for a second. ac.authHeaderDeadline = fasttime.UnixTimestamp() + 1 } - return ac.authHeader + return ac.authHeader, nil } // String returns human-readable representation for ac. 
@@ -564,7 +573,7 @@ func (opts *Options) NewConfig() (*Config, error) { type authContext struct { // getAuthHeader must return for 'Authorization: ' http request header - getAuthHeader func() string + getAuthHeader func() (string, error) // authDigest must contain the digest for the used authorization // The digest must be changed whenever the original config changes. @@ -577,8 +586,8 @@ func (actx *authContext) initFromAuthorization(baseDir string, az *Authorization azType = az.Type } if az.CredentialsFile == "" { - actx.getAuthHeader = func() string { - return azType + " " + az.Credentials.String() + actx.getAuthHeader = func() (string, error) { + return azType + " " + az.Credentials.String(), nil } actx.authDigest = fmt.Sprintf("custom(type=%q, creds=%q)", az.Type, az.Credentials) return nil @@ -587,13 +596,12 @@ func (actx *authContext) initFromAuthorization(baseDir string, az *Authorization return fmt.Errorf("both `credentials`=%q and `credentials_file`=%q are set", az.Credentials, az.CredentialsFile) } filePath := fs.GetFilepath(baseDir, az.CredentialsFile) - actx.getAuthHeader = func() string { + actx.getAuthHeader = func() (string, error) { token, err := readPasswordFromFile(filePath) if err != nil { - logger.Errorf("cannot read credentials from `credentials_file`=%q: %s", az.CredentialsFile, err) - return "" + return "", fmt.Errorf("cannot read credentials from `credentials_file`=%q: %s", az.CredentialsFile, err) } - return azType + " " + token + return azType + " " + token, nil } actx.authDigest = fmt.Sprintf("custom(type=%q, credsFile=%q)", az.Type, filePath) return nil @@ -604,11 +612,11 @@ func (actx *authContext) initFromBasicAuthConfig(baseDir string, ba *BasicAuthCo return fmt.Errorf("missing `username` in `basic_auth` section") } if ba.PasswordFile == "" { - actx.getAuthHeader = func() string { + actx.getAuthHeader = func() (string, error) { // See https://en.wikipedia.org/wiki/Basic_access_authentication token := ba.Username + ":" + 
ba.Password.String() token64 := base64.StdEncoding.EncodeToString([]byte(token)) - return "Basic " + token64 + return "Basic " + token64, nil } actx.authDigest = fmt.Sprintf("basic(username=%q, password=%q)", ba.Username, ba.Password) return nil @@ -617,16 +625,15 @@ func (actx *authContext) initFromBasicAuthConfig(baseDir string, ba *BasicAuthCo return fmt.Errorf("both `password`=%q and `password_file`=%q are set in `basic_auth` section", ba.Password, ba.PasswordFile) } filePath := fs.GetFilepath(baseDir, ba.PasswordFile) - actx.getAuthHeader = func() string { + actx.getAuthHeader = func() (string, error) { password, err := readPasswordFromFile(filePath) if err != nil { - logger.Errorf("cannot read password from `password_file`=%q set in `basic_auth` section: %s", ba.PasswordFile, err) - return "" + return "", fmt.Errorf("cannot read password from `password_file`=%q set in `basic_auth` section: %s", ba.PasswordFile, err) } // See https://en.wikipedia.org/wiki/Basic_access_authentication token := ba.Username + ":" + password token64 := base64.StdEncoding.EncodeToString([]byte(token)) - return "Basic " + token64 + return "Basic " + token64, nil } actx.authDigest = fmt.Sprintf("basic(username=%q, passwordFile=%q)", ba.Username, filePath) return nil @@ -634,51 +641,53 @@ func (actx *authContext) initFromBasicAuthConfig(baseDir string, ba *BasicAuthCo func (actx *authContext) initFromBearerTokenFile(baseDir string, bearerTokenFile string) error { filePath := fs.GetFilepath(baseDir, bearerTokenFile) - actx.getAuthHeader = func() string { + actx.getAuthHeader = func() (string, error) { token, err := readPasswordFromFile(filePath) if err != nil { - logger.Errorf("cannot read bearer token from `bearer_token_file`=%q: %s", bearerTokenFile, err) - return "" + return "", fmt.Errorf("cannot read bearer token from `bearer_token_file`=%q: %s", bearerTokenFile, err) } - return "Bearer " + token + return "Bearer " + token, nil } actx.authDigest = 
fmt.Sprintf("bearer(tokenFile=%q)", filePath) return nil } func (actx *authContext) initFromBearerToken(bearerToken string) error { - actx.getAuthHeader = func() string { - return "Bearer " + bearerToken + actx.getAuthHeader = func() (string, error) { + return "Bearer " + bearerToken, nil } actx.authDigest = fmt.Sprintf("bearer(token=%q)", bearerToken) return nil } func (actx *authContext) initFromOAuth2Config(baseDir string, o *OAuth2Config) error { - oi, err := newOAuth2ConfigInternal(baseDir, o) - if err != nil { + if err := o.validate(); err != nil { return err } - actx.getAuthHeader = func() string { + + actx.getAuthHeader = func() (string, error) { + oi, err := newOAuth2ConfigInternal(baseDir, o) + if err != nil { + return "", err + } ts, err := oi.getTokenSource() if err != nil { - logger.Errorf("cannot get OAuth2 tokenSource: %s", err) - return "" + return "", fmt.Errorf("cannot get OAuth2 tokenSource: %s", err) } t, err := ts.Token() if err != nil { - logger.Errorf("cannot get OAuth2 token: %s", err) - return "" + return "", fmt.Errorf("cannot get OAuth2 token: %s", err) } - return t.Type() + " " + t.AccessToken + return t.Type() + " " + t.AccessToken, nil } actx.authDigest = fmt.Sprintf("oauth2(%s)", o.String()) return nil } type tlsContext struct { - getTLSCert func(*tls.CertificateRequestInfo) (*tls.Certificate, error) - tlsCertDigest string + getTLSCert func(*tls.CertificateRequestInfo) (*tls.Certificate, error) + tlsCertDigest string + rootCA *x509.CertPool serverName string insecureSkipVerify bool diff --git a/lib/promauth/config_test.go b/lib/promauth/config_test.go index 6bf0f9c4b..2c4c85005 100644 --- a/lib/promauth/config_test.go +++ b/lib/promauth/config_test.go @@ -10,10 +10,11 @@ import ( func TestNewConfig(t *testing.T) { tests := []struct { - name string - opts Options - wantErr bool - expectHeader string + name string + opts Options + wantErr bool + wantErrWhenSetHeader bool + expectHeader string }{ { name: "OAuth2 config", @@ -49,6 +50,21 
@@ func TestNewConfig(t *testing.T) { }, wantErr: true, }, + { + name: "OAuth2 with wrong tls", + opts: Options{ + OAuth2: &OAuth2Config{ + ClientID: "some-id", + ClientSecret: NewSecret("some-secret"), + TokenURL: "http://localhost:8511", + TLSConfig: &TLSConfig{ + InsecureSkipVerify: true, + CAFile: "non-existing-file", + }, + }, + }, + wantErrWhenSetHeader: true, + }, { name: "basic Auth config", opts: Options{ @@ -69,6 +85,16 @@ func TestNewConfig(t *testing.T) { }, expectHeader: "Basic dXNlcjpzZWNyZXQtY29udGVudA==", }, + { + name: "basic Auth config with non-existing file", + opts: Options{ + BasicAuth: &BasicAuthConfig{ + Username: "user", + PasswordFile: "testdata/non-existing-file", + }, + }, + wantErrWhenSetHeader: true, + }, { name: "want Authorization", opts: Options{ @@ -96,6 +122,17 @@ func TestNewConfig(t *testing.T) { }, expectHeader: "Bearer some-token", }, + { + name: "tls with non-existing file", + opts: Options{ + BearerToken: "some-token", + TLSConfig: &TLSConfig{ + InsecureSkipVerify: true, + CAFile: "non-existing-file", + }, + }, + wantErr: true, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -104,7 +141,6 @@ func TestNewConfig(t *testing.T) { r.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) { w.Header().Set("Content-Type", "application/json") w.Write([]byte(`{"access_token":"some-token","token_type": "Bearer"}`)) - }) mock := httptest.NewServer(r) tt.opts.OAuth2.TokenURL = mock.URL @@ -119,13 +155,19 @@ func TestNewConfig(t *testing.T) { if err != nil { t.Fatalf("unexpected error in http.NewRequest: %s", err) } - got.SetHeaders(req, true) + err = got.SetHeaders(req, true) + if (err != nil) != tt.wantErrWhenSetHeader { + t.Errorf("NewConfig() error = %v, wantErr %v", err, tt.wantErrWhenSetHeader) + } ah := req.Header.Get("Authorization") if ah != tt.expectHeader { t.Fatalf("unexpected auth header from net/http request; got %q; want %q", ah, tt.expectHeader) } var fhreq fasthttp.Request - 
got.SetFasthttpHeaders(&fhreq, true) + err = got.SetFasthttpHeaders(&fhreq, true) + if (err != nil) != tt.wantErrWhenSetHeader { + t.Errorf("NewConfig() error = %v, wantErr %v", err, tt.wantErrWhenSetHeader) + } ahb := fhreq.Header.Peek("Authorization") if string(ahb) != tt.expectHeader { t.Fatalf("unexpected auth header from fasthttp request; got %q; want %q", ahb, tt.expectHeader) @@ -191,7 +233,7 @@ func TestConfigHeaders(t *testing.T) { if result != resultExpected { t.Fatalf("unexpected result from HeadersNoAuthString; got\n%s\nwant\n%s", result, resultExpected) } - c.SetHeaders(req, false) + _ = c.SetHeaders(req, false) for _, h := range headersParsed { v := req.Header.Get(h.key) if v != h.value { @@ -199,7 +241,7 @@ func TestConfigHeaders(t *testing.T) { } } var fhreq fasthttp.Request - c.SetFasthttpHeaders(&fhreq, false) + _ = c.SetFasthttpHeaders(&fhreq, false) for _, h := range headersParsed { v := fhreq.Header.Peek(h.key) if string(v) != h.value { diff --git a/lib/promscrape/client.go b/lib/promscrape/client.go index 7ec8c34e6..2f7a7a31b 100644 --- a/lib/promscrape/client.go +++ b/lib/promscrape/client.go @@ -49,10 +49,10 @@ type client struct { scrapeTimeoutSecondsStr string hostPort string requestURI string - setHeaders func(req *http.Request) - setProxyHeaders func(req *http.Request) - setFasthttpHeaders func(req *fasthttp.Request) - setFasthttpProxyHeaders func(req *fasthttp.Request) + setHeaders func(req *http.Request) error + setProxyHeaders func(req *http.Request) error + setFasthttpHeaders func(req *fasthttp.Request) error + setFasthttpProxyHeaders func(req *fasthttp.Request) error denyRedirects bool disableCompression bool disableKeepAlive bool @@ -81,7 +81,7 @@ func concatTwoStrings(x, y string) string { const scrapeUserAgent = "vm_promscrape" -func newClient(ctx context.Context, sw *ScrapeWork) *client { +func newClient(ctx context.Context, sw *ScrapeWork) (*client, error) { var u fasthttp.URI u.Update(sw.ScrapeURL) hostPort := string(u.Host()) 
@@ -92,8 +92,8 @@ func newClient(ctx context.Context, sw *ScrapeWork) *client { if isTLS { tlsCfg = sw.AuthConfig.NewTLSConfig() } - setProxyHeaders := func(req *http.Request) {} - setFasthttpProxyHeaders := func(req *fasthttp.Request) {} + setProxyHeaders := func(req *http.Request) error { return nil } + setFasthttpProxyHeaders := func(req *fasthttp.Request) error { return nil } proxyURL := sw.ProxyURL if !isTLS && proxyURL.IsHTTPOrHTTPS() { // Send full sw.ScrapeURL in requests to a proxy host for non-TLS scrape targets @@ -107,11 +107,11 @@ func newClient(ctx context.Context, sw *ScrapeWork) *client { tlsCfg = sw.ProxyAuthConfig.NewTLSConfig() } proxyURLOrig := proxyURL - setProxyHeaders = func(req *http.Request) { - proxyURLOrig.SetHeaders(sw.ProxyAuthConfig, req) + setProxyHeaders = func(req *http.Request) error { + return proxyURLOrig.SetHeaders(sw.ProxyAuthConfig, req) } - setFasthttpProxyHeaders = func(req *fasthttp.Request) { - proxyURLOrig.SetFasthttpHeaders(sw.ProxyAuthConfig, req) + setFasthttpProxyHeaders = func(req *fasthttp.Request) error { + return proxyURLOrig.SetFasthttpHeaders(sw.ProxyAuthConfig, req) } proxyURL = &proxy.URL{} } @@ -119,7 +119,7 @@ func newClient(ctx context.Context, sw *ScrapeWork) *client { dialAddr = addMissingPort(dialAddr, isTLS) dialFunc, err := newStatDialFunc(proxyURL, sw.ProxyAuthConfig) if err != nil { - logger.Fatalf("cannot create dial func: %s", err) + return nil, fmt.Errorf("cannot create dial func: %s", err) } hc := &fasthttp.HostClient{ Addr: dialAddr, @@ -168,14 +168,14 @@ func newClient(ctx context.Context, sw *ScrapeWork) *client { scrapeTimeoutSecondsStr: fmt.Sprintf("%.3f", sw.ScrapeTimeout.Seconds()), hostPort: hostPort, requestURI: requestURI, - setHeaders: func(req *http.Request) { sw.AuthConfig.SetHeaders(req, true) }, + setHeaders: func(req *http.Request) error { return sw.AuthConfig.SetHeaders(req, true) }, setProxyHeaders: setProxyHeaders, - setFasthttpHeaders: func(req *fasthttp.Request) { 
sw.AuthConfig.SetFasthttpHeaders(req, true) }, + setFasthttpHeaders: func(req *fasthttp.Request) error { return sw.AuthConfig.SetFasthttpHeaders(req, true) }, setFasthttpProxyHeaders: setFasthttpProxyHeaders, denyRedirects: sw.DenyRedirects, disableCompression: sw.DisableCompression, disableKeepAlive: sw.DisableKeepAlive, - } + }, nil } func (c *client) GetStreamReader() (*streamReader, error) { @@ -196,8 +196,16 @@ func (c *client) GetStreamReader() (*streamReader, error) { // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179#issuecomment-813117162 req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", c.scrapeTimeoutSecondsStr) req.Header.Set("User-Agent", scrapeUserAgent) - c.setHeaders(req) - c.setProxyHeaders(req) + err = c.setHeaders(req) + if err != nil { + cancel() + return nil, fmt.Errorf("failed to create request to %q: %s", c.scrapeURL, err) + } + err = c.setProxyHeaders(req) + if err != nil { + cancel() + return nil, fmt.Errorf("failed to create request to %q: %s", c.scrapeURL, err) + } scrapeRequests.Inc() resp, err := c.sc.Do(req) if err != nil { @@ -244,8 +252,14 @@ func (c *client) ReadData(dst []byte) ([]byte, error) { // Set X-Prometheus-Scrape-Timeout-Seconds like Prometheus does, since it is used by some exporters such as PushProx. 
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179#issuecomment-813117162 req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", c.scrapeTimeoutSecondsStr) - c.setFasthttpHeaders(req) - c.setFasthttpProxyHeaders(req) + err := c.setFasthttpHeaders(req) + if err != nil { + return nil, fmt.Errorf("failed to create request to %q: %s", c.scrapeURL, err) + } + err = c.setFasthttpProxyHeaders(req) + if err != nil { + return nil, fmt.Errorf("failed to create request to %q: %s", c.scrapeURL, err) + } if !*disableCompression && !c.disableCompression { req.Header.Set("Accept-Encoding", "gzip") } @@ -263,7 +277,7 @@ func (c *client) ReadData(dst []byte) ([]byte, error) { ctx, cancel := context.WithDeadline(c.ctx, deadline) defer cancel() - err := doRequestWithPossibleRetry(ctx, c.hc, req, resp) + err = doRequestWithPossibleRetry(ctx, c.hc, req, resp) statusCode := resp.StatusCode() redirectsCount := 0 for err == nil && isStatusRedirect(statusCode) { diff --git a/lib/promscrape/config.go b/lib/promscrape/config.go index b598c6a70..131cbc397 100644 --- a/lib/promscrape/config.go +++ b/lib/promscrape/config.go @@ -482,6 +482,7 @@ func (cfg *Config) parseData(data []byte, path string) ([]byte, error) { } // Initialize cfg.ScrapeConfigs + var validScrapeConfigs []*ScrapeConfig for i, sc := range cfg.ScrapeConfigs { // Make a copy of sc in order to remove references to `data` memory. // This should prevent from memory leaks on config reload. 
@@ -490,10 +491,14 @@ swc, err := getScrapeWorkConfig(sc, cfg.baseDir, &cfg.Global) if err != nil { - return nil, fmt.Errorf("cannot parse `scrape_config`: %w", err) + // print error and skip invalid scrape config + logger.Errorf("cannot parse `scrape_config` for job %q, skip it: %s", sc.JobName, err) + continue } sc.swc = swc + validScrapeConfigs = append(validScrapeConfigs, sc) } + cfg.ScrapeConfigs = validScrapeConfigs return dataNew, nil } diff --git a/lib/promscrape/config_test.go b/lib/promscrape/config_test.go index 6548c4849..455ef8cc9 100644 --- a/lib/promscrape/config_test.go +++ b/lib/promscrape/config_test.go @@ -109,7 +109,6 @@ scrape_configs: proxy_headers: - 'My-Auth-Header: top-secret' `) - } func TestNeedSkipScrapeWork(t *testing.T) { @@ -446,27 +445,27 @@ func getStaticScrapeWork(data []byte, path string) ([]*ScrapeWork, error) { return cfg.getStaticScrapeWork(), nil } -func TestGetStaticScrapeWorkFailure(t *testing.T) { - f := func(data string) { +func TestGetStaticScrapeWork(t *testing.T) { + f := func(data string, wantErr bool, validConfigNum int) { t.Helper() sws, err := getStaticScrapeWork([]byte(data), "non-existing-file") - if err == nil { - t.Fatalf("expecting non-nil error") + if err != nil != wantErr { + t.Fatalf("expect err %t", wantErr) } - if sws != nil { - t.Fatalf("expecting nil sws") + if !wantErr && len(sws) != validConfigNum { + t.Fatalf("unexpected config num; got %d, want %d", len(sws), validConfigNum) } } // incorrect yaml - f(`foo bar baz`) + f(`foo bar baz`, true, 0) // Missing job_name f(` scrape_configs: - static_configs: - targets: ["foo"] -`) +`, false, 0) // Duplicate job_name f(` @@ -477,7 +476,7 @@ scrape_configs: - job_name: foo static_configs: targets: ["bar"] -`) +`, true, 1) // Invalid scheme f(` @@ -486,7 +485,7 @@ scrape_configs: scheme: asdf static_configs: - targets: ["foo"] -`) +`, false, 0) // Missing username in `basic_auth` f(` @@ -496,7 +495,7 @@
scrape_configs: password: sss static_configs: - targets: ["a"] -`) +`, false, 0) // Both password and password_file set in `basic_auth` f(` @@ -508,7 +507,7 @@ scrape_configs: password_file: sdfdf static_configs: - targets: ["a"] -`) +`, false, 0) // Invalid password_file set in `basic_auth` f(` @@ -519,7 +518,7 @@ scrape_configs: password_file: ['foobar'] static_configs: - targets: ["a"] -`) +`, true, 0) // Both `bearer_token` and `bearer_token_file` are set f(` @@ -529,7 +528,7 @@ scrape_configs: bearer_token_file: bar static_configs: - targets: ["a"] -`) +`, false, 0) // Both `basic_auth` and `bearer_token` are set f(` @@ -541,7 +540,7 @@ scrape_configs: password: bar static_configs: - targets: ["a"] -`) +`, false, 0) // Both `authorization` and `basic_auth` are set f(` @@ -553,7 +552,7 @@ scrape_configs: username: foobar static_configs: - targets: ["a"] -`) +`, false, 0) // Both `authorization` and `bearer_token` are set f(` @@ -564,7 +563,7 @@ scrape_configs: bearer_token: foo static_configs: - targets: ["a"] -`) +`, false, 0) // Invalid `bearer_token_file` f(` @@ -573,7 +572,7 @@ scrape_configs: bearer_token_file: [foobar] static_configs: - targets: ["a"] -`) +`, true, 0) // non-existing ca_file f(` @@ -583,7 +582,7 @@ scrape_configs: ca_file: non/extising/file static_configs: - targets: ["s"] -`) +`, false, 0) // invalid ca_file f(` @@ -593,7 +592,7 @@ scrape_configs: ca_file: testdata/prometheus.yml static_configs: - targets: ["s"] -`) +`, false, 0) // non-existing cert_file f(` @@ -603,7 +602,7 @@ scrape_configs: cert_file: non/extising/file static_configs: - targets: ["s"] -`) +`, false, 0) // non-existing key_file f(` @@ -613,7 +612,7 @@ scrape_configs: key_file: non/extising/file static_configs: - targets: ["s"] -`) +`, false, 0) // Invalid regex in relabel_configs f(` @@ -625,7 +624,7 @@ scrape_configs: target_label: bar static_configs: - targets: ["s"] -`) +`, false, 0) // Missing target_label for action=replace in relabel_configs f(` @@ -636,7 +635,7 
@@ scrape_configs: source_labels: [foo] static_configs: - targets: ["s"] -`) +`, false, 0) // Missing source_labels for action=keep in relabel_configs f(` @@ -646,7 +645,7 @@ scrape_configs: - action: keep static_configs: - targets: ["s"] -`) +`, false, 0) // Missing source_labels for action=drop in relabel_configs f(` @@ -656,7 +655,7 @@ scrape_configs: - action: drop static_configs: - targets: ["s"] -`) +`, false, 0) // Missing source_labels for action=hashmod in relabel_configs f(` @@ -668,7 +667,7 @@ scrape_configs: modulus: 123 static_configs: - targets: ["s"] -`) +`, false, 0) // Missing target for action=hashmod in relabel_configs f(` @@ -680,7 +679,7 @@ scrape_configs: modulus: 123 static_configs: - targets: ["s"] -`) +`, false, 0) // Missing modulus for action=hashmod in relabel_configs f(` @@ -692,7 +691,7 @@ scrape_configs: target_label: bar static_configs: - targets: ["s"] -`) +`, false, 0) // Invalid action in relabel_configs f(` @@ -702,7 +701,7 @@ scrape_configs: - action: foobar static_configs: - targets: ["s"] -`) +`, false, 0) // Invalid scrape_config_files contents f(` @@ -710,7 +709,7 @@ scrape_config_files: - job_name: aa static_configs: - targets: ["s"] -`) +`, true, 0) } func resetNonEssentialFields(sws []*ScrapeWork) { diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go index b677bd241..ceb1c3a38 100644 --- a/lib/promscrape/discovery/kubernetes/api_watcher.go +++ b/lib/promscrape/discovery/kubernetes/api_watcher.go @@ -211,7 +211,7 @@ type groupWatcher struct { selectors []Selector attachNodeMetadata bool - setHeaders func(req *http.Request) + setHeaders func(req *http.Request) error client *http.Client mu sync.Mutex @@ -239,7 +239,7 @@ func newGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string, selectors: selectors, attachNodeMetadata: attachNodeMetadata, - setHeaders: func(req *http.Request) { ac.SetHeaders(req, true) }, + setHeaders: func(req 
*http.Request) error { return ac.SetHeaders(req, true) }, client: client, m: make(map[string]*urlWatcher), } @@ -420,7 +420,10 @@ func (gw *groupWatcher) doRequest(ctx context.Context, requestURL string) (*http if err != nil { logger.Fatalf("cannot create a request for %q: %s", requestURL, err) } - gw.setHeaders(req) + err = gw.setHeaders(req) + if err != nil { + return nil, err + } resp, err := gw.client.Do(req) if err != nil { return nil, err diff --git a/lib/promscrape/discoveryutils/client.go b/lib/promscrape/discoveryutils/client.go index 161c207af..6e8163d4f 100644 --- a/lib/promscrape/discoveryutils/client.go +++ b/lib/promscrape/discoveryutils/client.go @@ -68,8 +68,8 @@ type Client struct { apiServer string - setHTTPHeaders func(req *http.Request) - setHTTPProxyHeaders func(req *http.Request) + setHTTPHeaders func(req *http.Request) error + setHTTPProxyHeaders func(req *http.Request) error clientCtx context.Context clientCancel context.CancelFunc @@ -140,10 +140,10 @@ func NewClient(apiServer string, ac *promauth.Config, proxyURL *proxy.URL, proxy }, } - setHTTPHeaders := func(req *http.Request) {} + setHTTPHeaders := func(req *http.Request) error { return nil } if ac != nil { - setHTTPHeaders = func(req *http.Request) { - ac.SetHeaders(req, true) + setHTTPHeaders = func(req *http.Request) error { + return ac.SetHeaders(req, true) } } if httpCfg.FollowRedirects != nil && !*httpCfg.FollowRedirects { @@ -153,10 +153,10 @@ func NewClient(apiServer string, ac *promauth.Config, proxyURL *proxy.URL, proxy client.CheckRedirect = checkRedirect blockingClient.CheckRedirect = checkRedirect } - setHTTPProxyHeaders := func(req *http.Request) {} + setHTTPProxyHeaders := func(req *http.Request) error { return nil } if proxyAC != nil { - setHTTPProxyHeaders = func(req *http.Request) { - proxyURL.SetHeaders(proxyAC, req) + setHTTPProxyHeaders = func(req *http.Request) error { + return proxyURL.SetHeaders(proxyAC, req) } } ctx, cancel := 
context.WithCancel(context.Background()) @@ -247,8 +247,14 @@ func (c *Client) getAPIResponseWithParamsAndClientCtx(ctx context.Context, clien return nil, fmt.Errorf("cannot create request for %q: %w", requestURL, err) } - c.setHTTPHeaders(req) - c.setHTTPProxyHeaders(req) + err = c.setHTTPHeaders(req) + if err != nil { + return nil, fmt.Errorf("cannot set request http header for %q: %w", requestURL, err) + } + err = c.setHTTPProxyHeaders(req) + if err != nil { + return nil, fmt.Errorf("cannot set request http proxy header for %q: %w", requestURL, err) + } if modifyRequest != nil { modifyRequest(req) } diff --git a/lib/promscrape/scraper.go b/lib/promscrape/scraper.go index f6bf3dd8d..f13c5654b 100644 --- a/lib/promscrape/scraper.go +++ b/lib/promscrape/scraper.go @@ -405,7 +405,12 @@ func (sg *scraperGroup) update(sws []*ScrapeWork) { // Start new scrapers only after the deleted scrapers are stopped. for _, sw := range swsToStart { - sc := newScraper(sw, sg.name, sg.pushData) + sc, err := newScraper(sw, sg.name, sg.pushData) + if err != nil { + // print error and skip invalid scraper config + logger.Errorf("cannot create scraper to %q in job %q, will skip it: %s", sw.ScrapeURL, sg.name, err) + continue + } sg.activeScrapers.Inc() sg.scrapersStarted.Inc() sg.wg.Add(1) @@ -441,18 +446,21 @@ type scraper struct { stoppedCh chan struct{} } -func newScraper(sw *ScrapeWork, group string, pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)) *scraper { +func newScraper(sw *ScrapeWork, group string, pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)) (*scraper, error) { ctx, cancel := context.WithCancel(context.Background()) sc := &scraper{ ctx: ctx, cancel: cancel, stoppedCh: make(chan struct{}), } - c := newClient(ctx, sw) + c, err := newClient(ctx, sw) + if err != nil { + return nil, err + } sc.sw.Config = sw sc.sw.ScrapeGroup = group sc.sw.ReadData = c.ReadData sc.sw.GetStreamReader = c.GetStreamReader sc.sw.PushData = pushData - return sc + 
return sc, nil } diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index ac100e958..2b684e1b9 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -454,7 +454,6 @@ func (sw *scrapeWork) processScrapedData(scrapeTimestamp, realTimestamp int64, b defer func() { <-processScrapedDataConcurrencyLimitCh }() - endTimestamp := time.Now().UnixNano() / 1e6 duration := float64(endTimestamp-realTimestamp) / 1e3 scrapeDuration.Update(duration) diff --git a/lib/proxy/proxy.go b/lib/proxy/proxy.go index bd0814113..02bd2f3c9 100644 --- a/lib/proxy/proxy.go +++ b/lib/proxy/proxy.go @@ -73,38 +73,48 @@ func (u *URL) String() string { } // SetHeaders sets headers to req according to u and ac configs. -func (u *URL) SetHeaders(ac *promauth.Config, req *http.Request) { - ah := u.getAuthHeader(ac) +func (u *URL) SetHeaders(ac *promauth.Config, req *http.Request) error { + ah, err := u.getAuthHeader(ac) + if err != nil { + return err + } if ah != "" { req.Header.Set("Proxy-Authorization", ah) } - ac.SetHeaders(req, false) + return ac.SetHeaders(req, false) } // SetFasthttpHeaders sets headers to req according to u and ac configs. -func (u *URL) SetFasthttpHeaders(ac *promauth.Config, req *fasthttp.Request) { - ah := u.getAuthHeader(ac) +func (u *URL) SetFasthttpHeaders(ac *promauth.Config, req *fasthttp.Request) error { + ah, err := u.getAuthHeader(ac) + if err != nil { + return err + } if ah != "" { req.Header.Set("Proxy-Authorization", ah) } - ac.SetFasthttpHeaders(req, false) + return ac.SetFasthttpHeaders(req, false) } // getAuthHeader returns Proxy-Authorization auth header for the given u and ac. 
-func (u *URL) getAuthHeader(ac *promauth.Config) string { +func (u *URL) getAuthHeader(ac *promauth.Config) (string, error) { authHeader := "" if ac != nil { - authHeader = ac.GetAuthHeader() + var err error + authHeader, err = ac.GetAuthHeader() + if err != nil { + return "", err + } } if u == nil || u.URL == nil { - return authHeader + return authHeader, nil } pu := u.URL if pu.User != nil && len(pu.User.Username()) > 0 { userPasswordEncoded := base64.StdEncoding.EncodeToString([]byte(pu.User.String())) authHeader = "Basic " + userPasswordEncoded } - return authHeader + return authHeader, nil } // MarshalYAML implements yaml.Marshaler interface. @@ -161,7 +171,10 @@ func (u *URL) NewDialFunc(ac *promauth.Config) (fasthttp.DialFunc, error) { if isTLS { proxyConn = tls.Client(proxyConn, tlsCfg) } - authHeader := u.getAuthHeader(ac) + authHeader, err := u.getAuthHeader(ac) + if err != nil { + return nil, fmt.Errorf("cannot get auth header: %w", err) + } if authHeader != "" { authHeader = "Proxy-Authorization: " + authHeader + "\r\n" authHeader += ac.HeadersNoAuthString()