// Source: VictoriaMetrics/lib/promscrape/discovery/yandexcloud/api.go (254 lines, 7.6 KiB, Go)

package yandexcloud
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
// configMap is the store that getAPIConfig queries with an SDConfig key and
// a factory building the matching *apiConfig.
// NOTE(review): presumably it caches the built value per SDConfig - confirm against discoveryutils.
var configMap = discoveryutils.NewConfigMap()
// apiCredentials holds a token used for authorizing API requests
// together with the time when this token expires.
//
// It is decoded from the JSON returned by the instance metadata endpoint
// or filled from an iamToken response.
type apiCredentials struct {
// Token is sent in the "Authorization: Bearer ..." header of API requests.
Token string `json:"Token"`
// Expiration is the time after which the Token must be refreshed.
Expiration time.Time `json:"Expiration"`
}
// yandexPassportOAuth is the JSON request body sent to the Yandex Cloud IAM API
// in order to exchange an OAuth token for an IAM token.
//
// See https://cloud.yandex.com/en-ru/docs/iam/operations/iam-token/create
type yandexPassportOAuth struct {
// YandexPassportOAuthToken is the OAuth token taken from the SD config.
YandexPassportOAuthToken string `json:"yandexPassportOauthToken"`
}
// apiConfig contains the state needed for querying Yandex Cloud API.
type apiConfig struct {
// client is used for all the HTTP requests issued by this package.
client *http.Client
// yandexPassportOAuth is non-nil when an OAuth token is configured.
// When it is nil, credentials are obtained from the instance metadata endpoint.
yandexPassportOAuth *yandexPassportOAuth
// serviceEndpoints maps a service ID to its base url in the form scheme://address.
serviceEndpoints map[string]string
// credsLock protects the refresh of creds
credsLock sync.Mutex
// creds holds the most recently obtained credentials; nil until the first refresh.
creds *apiCredentials
}
// getAPIConfig returns the *apiConfig registered in configMap for the given sdc.
//
// The config is built via newAPIConfig(sdc, baseDir) when configMap asks for it.
func getAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
	build := func() (interface{}, error) {
		return newAPIConfig(sdc, baseDir)
	}
	cached, err := configMap.Get(sdc, build)
	if err != nil {
		return nil, err
	}
	apiCfg := cached.(*apiConfig)
	return apiCfg, nil
}
func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
lib/{promauth,promscrape}: automatically refresh root CA certificates after changes on disk (#5725) * lib/{promauth,promscrape}: automatically refresh root CA certificates after changes on disk Added a custom `http.RoundTripper` implementation which checks for root CA content changes and updates `tls.Config` used by `http.RoundTripper` after detecting CA change. Client certificate changes are not tracked by this implementation since `tls.Config` already supports passing certificate dynamically by overriding `tls.Config.GetClientCertificate`. This change implements dynamic reload of root CA only for streaming client used for scraping. Blocking client (`fasthttp.HostClient`) does not support using custom transport so can't use this implementation. See: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5526 Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * lib/promauth/config: update NewRoundTripper API Update API to allow user to update only parameters required for transport. Add warning log when reloading Root CA failed. Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * lib/promauth/config: fix mutex acquire logic Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * lib/promauth/config: replace RWMutex with regular mutex to simplify the code - remove additional mutex used for getRootCABytes - require callee to use mutex - replace RWMutex with regular mutex Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * lib/promauth/config: refactor - hold the mutex lock to avoid round tripper being re-created twice - move recreation logic into separate func to simplify the code Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> --------- Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> Co-authored-by: Nikolay <nik@victoriametrics.com>
2024-04-03 08:01:43 +00:00
var transport http.RoundTripper = &http.Transport{
MaxIdleConnsPerHost: 100,
}
if sdc.TLSConfig != nil {
opts := &promauth.Options{
BaseDir: baseDir,
TLSConfig: sdc.TLSConfig,
}
ac, err := opts.NewConfig()
lib/promauth: follow-up for e16d3f5639d67ff970975d342aaa276e339e9b0c - Make sure that invalid/missing TLS CA file or TLS client certificate files at vmagent startup don't prevent from processing the corresponding scrape targets after the file becomes correct, without the need to restart vmagent. Previously scrape targets with invalid TLS CA file or TLS client certificate files were permanently dropped after the first attempt to initialize them, and they didn't appear until the next vmagent reload or the next change in other places of the loaded scrape configs. - Make sure that TLS CA is properly re-loaded from file after it changes without the need to restart vmagent. Previously the old TLS CA was used until vmagent restart. - Properly handle errors during http request creation for the second attempt to send data to remote system at vmagent and vmalert. Previously failed request creation could result in nil pointer dereferencing, since the returned request is nil on error. - Add more context to the logged error during AWS sigv4 request signing before sending the data to -remoteWrite.url at vmagent. Previously it could miss details on the source of the request. - Do not create a new HTTP client per second when generating OAuth2 token needed to put in Authorization header of every http request issued by vmagent during service discovery or target scraping. Re-use the HTTP client instead until the corresponding scrape config changes. - Cache error at lib/promauth.Config.GetAuthHeader() in the same way as the auth header is cached, e.g. the error is cached for a second now. This should reduce load on CPU and OAuth2 server when auth header cannot be obtained because of temporary error. - Share tls.Config.GetClientCertificate function among multiple scrape targets with the same tls_config. Cache the loaded certificate and the error for one second. This should significantly reduce CPU load when scraping big number of targets with the same tls_config. 
- Allow loading TLS certificates from HTTP and HTTPs urls by specifying these urls at `tls_config->cert_file` and `tls_config->key_file`. - Improve test coverage at lib/promauth - Skip unreachable or invalid files specified at `scrape_config_files` during vmagent startup, since these files may become valid later. Previously vmagent was exitting in this case. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4959
2023-10-25 21:19:33 +00:00
if err != nil {
return nil, fmt.Errorf("cannot parse TLS config: %w", err)
}
lib/{promauth,promscrape}: automatically refresh root CA certificates after changes on disk (#5725) * lib/{promauth,promscrape}: automatically refresh root CA certificates after changes on disk Added a custom `http.RoundTripper` implementation which checks for root CA content changes and updates `tls.Config` used by `http.RoundTripper` after detecting CA change. Client certificate changes are not tracked by this implementation since `tls.Config` already supports passing certificate dynamically by overriding `tls.Config.GetClientCertificate`. This change implements dynamic reload of root CA only for streaming client used for scraping. Blocking client (`fasthttp.HostClient`) does not support using custom transport so can't use this implementation. See: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5526 Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * lib/promauth/config: update NewRoundTripper API Update API to allow user to update only parameters required for transport. Add warning log when reloading Root CA failed. Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * lib/promauth/config: fix mutex acquire logic Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * lib/promauth/config: replace RWMutex with regular mutex to simplify the code - remove additional mutex used for getRootCABytes - require callee to use mutex - replace RWMutex with regular mutex Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * lib/promauth/config: refactor - hold the mutex lock to avoid round tripper being re-created twice - move recreation logic into separate func to simplify the code Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> --------- Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> Co-authored-by: Nikolay <nik@victoriametrics.com>
2024-04-03 08:01:43 +00:00
transport, err = ac.NewRoundTripper(func(tr *http.Transport) {
tr.MaxIdleConnsPerHost = 100
})
if err != nil {
return nil, fmt.Errorf("cannot initialize TLS config: %w", err)
}
}
cfg := &apiConfig{
client: &http.Client{
Transport: transport,
},
}
apiEndpoint := sdc.APIEndpoint
if apiEndpoint == "" {
apiEndpoint = "https://api.cloud.yandex.net"
}
serviceEndpoints, err := cfg.getServiceEndpoints(apiEndpoint)
if err != nil {
return nil, fmt.Errorf("cannot obtain endpoints for yandex services: %w", err)
}
cfg.serviceEndpoints = serviceEndpoints
if sdc.YandexPassportOAuthToken != nil {
logger.Infof("yandexcloud_sd: using yandex passport OAuth token")
cfg.yandexPassportOAuth = &yandexPassportOAuth{
YandexPassportOAuthToken: sdc.YandexPassportOAuthToken.String(),
}
}
return cfg, nil
}
// getFreshAPICredentials returns credentials, which remain valid for more than 10 seconds.
//
// The cached credentials are returned while they are fresh enough.
// Otherwise new credentials are obtained via getCreds and cached under credsLock.
func (cfg *apiConfig) getFreshAPICredentials() (*apiCredentials, error) {
	cfg.credsLock.Lock()
	defer cfg.credsLock.Unlock()

	if cached := cfg.creds; cached != nil {
		if remaining := time.Until(cached.Expiration); remaining > 10*time.Second {
			// The cached credentials are still valid for long enough.
			return cached, nil
		}
	}

	// The credentials are missing or are about to expire - obtain new ones.
	fresh, err := getCreds(cfg)
	if err != nil {
		return nil, fmt.Errorf("cannot refresh service account api token: %w", err)
	}
	cfg.creds = fresh
	secondsLeft := time.Until(fresh.Expiration).Seconds()
	logger.Infof("yandexcloud_sd: successfully refreshed service account api token; expiration: %.3f seconds", secondsLeft)
	return fresh, nil
}
// getCreds obtains Yandex Cloud IAM credentials according to cfg.
//
// When an OAuth token is configured, it is exchanged for an IAM token.
// Otherwise the credentials of the instance service account are used.
func getCreds(cfg *apiConfig) (*apiCredentials, error) {
	if cfg.yandexPassportOAuth != nil {
		token, err := getIAMToken(cfg)
		if err != nil {
			return nil, fmt.Errorf("cannot get IAM token: %w", err)
		}
		creds := &apiCredentials{
			Token:      token.IAMToken,
			Expiration: token.ExpiresAt,
		}
		return creds, nil
	}
	return getInstanceCreds(cfg)
}
// getInstanceCreds fetches Yandex Cloud IAM credentials of the instance Service Account
// from the instance metadata endpoint.
//
// See https://cloud.yandex.com/en-ru/docs/compute/operations/vm-connect/auth-inside-vm
func getInstanceCreds(cfg *apiConfig) (*apiCredentials, error) {
	const metadataURL = "http://169.254.169.254/latest/meta-data/iam/security-credentials/default"

	resp, err := cfg.client.Get(metadataURL)
	if err != nil {
		return nil, fmt.Errorf("cannot read instance creds from %s: %w", metadataURL, err)
	}
	body, err := readResponseBody(resp, metadataURL)
	if err != nil {
		return nil, err
	}

	creds := new(apiCredentials)
	if err := json.Unmarshal(body, creds); err != nil {
		return nil, fmt.Errorf("cannot parse auth credentials response from %s: %w", metadataURL, err)
	}
	return creds, nil
}
// getIAMToken gets Yandex Cloud IAM token using OAuth.
//
// It sends cfg.yandexPassportOAuth as JSON to the "iam" service endpoint
// and parses the response into iamToken.
//
// An error is returned if the request cannot be performed, if the response
// has unexpected status code or if the response body cannot be parsed.
//
// See https://cloud.yandex.com/en-ru/docs/iam/operations/iam-token/create
func getIAMToken(cfg *apiConfig) (*iamToken, error) {
	iamURL := cfg.serviceEndpoints["iam"] + "/iam/v1/tokens"
	passport, err := json.Marshal(cfg.yandexPassportOAuth)
	if err != nil {
		// Marshaling a struct with a single string field must never fail.
		logger.Panicf("BUG: cannot marshal yandex passport OAuth token: %s", err)
	}
	body := bytes.NewBuffer(passport)
	resp, err := cfg.client.Post(iamURL, "application/json", body)
	if err != nil {
		// Post fails on ordinary runtime conditions such as network errors,
		// so report the failure to the caller instead of panicking.
		return nil, fmt.Errorf("cannot send request to yandex cloud iam api %q: %w", iamURL, err)
	}
	data, err := readResponseBody(resp, iamURL)
	if err != nil {
		return nil, err
	}
	var it iamToken
	if err := json.Unmarshal(data, &it); err != nil {
		return nil, fmt.Errorf("cannot parse iam token: %w; data: %s", err, data)
	}
	return &it, nil
}
// iamToken represents Yandex Cloud IAM token response
//
// See https://cloud.yandex.com/en-ru/docs/iam/operations/iam-token/create
type iamToken struct {
// IAMToken is the token value; it is copied into apiCredentials.Token.
IAMToken string `json:"iamToken"`
// ExpiresAt is the token expiration time; it is copied into apiCredentials.Expiration.
ExpiresAt time.Time `json:"expiresAt"`
}
// getServiceEndpoints queries apiEndpoint + "/endpoints" and returns a map
// from service ID to the service url.
//
// The url of every service is built from the scheme of apiEndpoint
// and the address reported for the service.
//
// See https://cloud.yandex.com/en-ru/docs/api-design-guide/concepts/endpoints
func (cfg *apiConfig) getServiceEndpoints(apiEndpoint string) (map[string]string, error) {
	parsed, err := url.Parse(apiEndpoint)
	if err != nil {
		return nil, fmt.Errorf("cannot parse api_endpoint %q: %w", apiEndpoint, err)
	}
	switch {
	case parsed.Scheme == "":
		return nil, fmt.Errorf("missing scheme in api_endpoint %q", apiEndpoint)
	case parsed.Host == "":
		return nil, fmt.Errorf("missing host in api_endpoint %q", apiEndpoint)
	}

	listURL := apiEndpoint + "/endpoints"
	resp, err := cfg.client.Get(listURL)
	if err != nil {
		return nil, fmt.Errorf("cannot query %q: %w", listURL, err)
	}
	body, err := readResponseBody(resp, listURL)
	if err != nil {
		return nil, err
	}

	var list endpoints
	if err := json.Unmarshal(body, &list); err != nil {
		return nil, fmt.Errorf("cannot parse API endpoints list: %w; data=%s", err, body)
	}

	// Every service is reachable via the same scheme as the API endpoint itself.
	prefix := parsed.Scheme + "://"
	result := make(map[string]string, len(list.Endpoints))
	for i := range list.Endpoints {
		ep := &list.Endpoints[i]
		result[ep.ID] = prefix + ep.Address
	}
	return result, nil
}
// endpoints is the JSON response returned from the "/endpoints" path of the API endpoint.
type endpoints struct {
// Endpoints is the list of the available service endpoints.
Endpoints []endpoint `json:"endpoints"`
}
// endpoint describes a single service endpoint from the endpoints list.
type endpoint struct {
// ID is the service identifier; it is used as the key in apiConfig.serviceEndpoints.
ID string `json:"id"`
// Address is the service address without scheme.
Address string `json:"address"`
}
// getAPIResponse performs an authorized GET request to the given Yandex Cloud apiURL
// and returns the response body.
//
// The request carries a fresh token from cfg in the Authorization header.
func getAPIResponse(apiURL string, cfg *apiConfig) ([]byte, error) {
	creds, err := cfg.getFreshAPICredentials()
	if err != nil {
		return nil, err
	}

	request, reqErr := http.NewRequest(http.MethodGet, apiURL, nil)
	if reqErr != nil {
		logger.Panicf("BUG: cannot create new request for yandex cloud api url %s: %s", apiURL, reqErr)
	}
	bearer := "Bearer " + creds.Token
	request.Header.Set("Authorization", bearer)

	response, doErr := cfg.client.Do(request)
	if doErr != nil {
		return nil, fmt.Errorf("cannot query yandex cloud api url %s: %w", apiURL, doErr)
	}
	return readResponseBody(response, apiURL)
}
// readResponseBody reads the whole body of resp, closes it and returns the read data.
//
// An error is returned if the body cannot be read or if the response
// status code differs from 200 OK. apiURL is used only in error messages.
func readResponseBody(resp *http.Response, apiURL string) ([]byte, error) {
	// The result of Close is intentionally ignored, since the body is already consumed.
	defer func() { _ = resp.Body.Close() }()

	body, readErr := io.ReadAll(resp.Body)
	if readErr != nil {
		return nil, fmt.Errorf("cannot read response from %q: %w", apiURL, readErr)
	}
	if code := resp.StatusCode; code != http.StatusOK {
		return nil, fmt.Errorf("unexpected status code for %q; got %d; want %d; response body: %s",
			apiURL, code, http.StatusOK, body)
	}
	return body, nil
}