{lib/promscrape,app/vmagent}: adds sigv4 support for vmagent remoteWrite (#2458)

* {lib/promscrape,app/vmagent}: adds sigv4 support for vmagent remoteWrite
moves aws related code into separate lib from lib/promscrape
it allows to write data from vmagent to the AWS managed prometheus (cortex)

https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1287

* Apply suggestions from code review

* wip

Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
Nikolay 2022-05-04 19:24:19 +02:00 committed by GitHub
parent f6dcfbcdd6
commit d289ecded1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 586 additions and 490 deletions

View file

@ -10,6 +10,8 @@ import (
"sync" "sync"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/awsapi"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue" "github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
@ -59,6 +61,17 @@ var (
"If multiple args are set, then they are applied independently for the corresponding -remoteWrite.url") "If multiple args are set, then they are applied independently for the corresponding -remoteWrite.url")
oauth2Scopes = flagutil.NewArray("remoteWrite.oauth2.scopes", "Optional OAuth2 scopes to use for -remoteWrite.url. Scopes must be delimited by ';'. "+ oauth2Scopes = flagutil.NewArray("remoteWrite.oauth2.scopes", "Optional OAuth2 scopes to use for -remoteWrite.url. Scopes must be delimited by ';'. "+
"If multiple args are set, then they are applied independently for the corresponding -remoteWrite.url") "If multiple args are set, then they are applied independently for the corresponding -remoteWrite.url")
useSigV4 = flagutil.NewArrayBool("remoteWrite.useSigv4", "Enables SigV4 request signing to use for -remoteWrite.url. "+
"If multiple args are set, then they are applied independently for the corresponding -remoteWrite.url")
awsRegion = flagutil.NewArray("remoteWrite.aws.region", "Optional AWS region to use for -remoteWrite.url if -remoteWrite.useSigv4 is set. "+
"If multiple args are set, then they are applied independently for the corresponding -remoteWrite.url")
awsRoleARN = flagutil.NewArray("remoteWrite.aws.roleARN", "Optional AWS roleARN to use for -remoteWrite.url if -remoteWrite.useSigv4 is set. "+
"If multiple args are set, then they are applied independently for the corresponding -remoteWrite.url")
awsAccessKey = flagutil.NewArray("remoteWrite.aws.accessKey", "Optional AWS AccessKey to use for -remoteWrite.url if -remoteWrite.useSigv4 is set. "+
"If multiple args are set, then they are applied independently for the corresponding -remoteWrite.url")
awsSecretKey = flagutil.NewArray("remoteWrite.aws.secretKey", "Optional AWS SecretKey to use for -remoteWrite.url if -remoteWrite.useSigv4 is set. "+
"If multiple args are set, then they are applied independently for the corresponding -remoteWrite.url")
) )
type client struct { type client struct {
@ -69,6 +82,7 @@ type client struct {
sendBlock func(block []byte) bool sendBlock func(block []byte) bool
authCfg *promauth.Config authCfg *promauth.Config
awsCfg *awsapi.Config
rl rateLimiter rl rateLimiter
@ -89,9 +103,13 @@ type client struct {
func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persistentqueue.FastQueue, concurrency int) *client { func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persistentqueue.FastQueue, concurrency int) *client {
authCfg, err := getAuthConfig(argIdx) authCfg, err := getAuthConfig(argIdx)
if err != nil { if err != nil {
logger.Panicf("FATAL: cannot initialize auth config: %s", err) logger.Panicf("FATAL: cannot initialize auth config for remoteWrite.url=%q: %s", remoteWriteURL, err)
} }
tlsCfg := authCfg.NewTLSConfig() tlsCfg := authCfg.NewTLSConfig()
awsCfg, err := getAWSAPIConfig(argIdx)
if err != nil {
logger.Fatalf("FATAL: cannot initialize AWS Config for remoteWrite.url=%q: %s", remoteWriteURL, err)
}
tr := &http.Transport{ tr := &http.Transport{
DialContext: statDial, DialContext: statDial,
TLSClientConfig: tlsCfg, TLSClientConfig: tlsCfg,
@ -116,6 +134,7 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste
sanitizedURL: sanitizedURL, sanitizedURL: sanitizedURL,
remoteWriteURL: remoteWriteURL, remoteWriteURL: remoteWriteURL,
authCfg: authCfg, authCfg: authCfg,
awsCfg: awsCfg,
fq: fq, fq: fq,
hc: &http.Client{ hc: &http.Client{
Transport: tr, Transport: tr,
@ -205,6 +224,21 @@ func getAuthConfig(argIdx int) (*promauth.Config, error) {
return authCfg, nil return authCfg, nil
} }
func getAWSAPIConfig(argIdx int) (*awsapi.Config, error) {
if !useSigV4.GetOptionalArg(argIdx) {
return nil, nil
}
region := awsRegion.GetOptionalArg(argIdx)
roleARN := awsRoleARN.GetOptionalArg(argIdx)
accessKey := awsAccessKey.GetOptionalArg(argIdx)
secretKey := awsSecretKey.GetOptionalArg(argIdx)
cfg, err := awsapi.NewConfig(region, roleARN, accessKey, secretKey, "")
if err != nil {
return nil, err
}
return cfg, nil
}
func (c *client) runWorker() { func (c *client) runWorker() {
var ok bool var ok bool
var block []byte var block []byte
@ -254,6 +288,10 @@ func (c *client) sendBlockHTTP(block []byte) bool {
retriesCount := 0 retriesCount := 0
c.bytesSent.Add(len(block)) c.bytesSent.Add(len(block))
c.blocksSent.Inc() c.blocksSent.Inc()
sigv4Hash := ""
if c.awsCfg != nil {
sigv4Hash = awsapi.HashHex(block)
}
again: again:
req, err := http.NewRequest("POST", c.remoteWriteURL, bytes.NewBuffer(block)) req, err := http.NewRequest("POST", c.remoteWriteURL, bytes.NewBuffer(block))
@ -268,7 +306,12 @@ again:
if ah := c.authCfg.GetAuthHeader(); ah != "" { if ah := c.authCfg.GetAuthHeader(); ah != "" {
req.Header.Set("Authorization", ah) req.Header.Set("Authorization", ah)
} }
if c.awsCfg != nil {
if err := c.awsCfg.SignRequest(req, "aps", sigv4Hash); err != nil {
// there is no need in retry, request will be rejected by client.Do and retried by code below
logger.Warnf("cannot sign remoteWrite request with AWS sigv4: %s", err)
}
}
startTime := time.Now() startTime := time.Now()
resp, err := c.hc.Do(req) resp, err := c.hc.Do(req)
c.requestDuration.UpdateDuration(startTime) c.requestDuration.UpdateDuration(startTime)

View file

@ -15,6 +15,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
## tip ## tip
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for sending data to remote storage with AWS sigv4 authorization. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1287).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow filtering targets by target url and by target labels with [time series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors) on `http://vmagent:8429/targets` page. This may be useful when `vmagent` scrapes big number of targets. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1796). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow filtering targets by target url and by target labels with [time series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors) on `http://vmagent:8429/targets` page. This may be useful when `vmagent` scrapes big number of targets. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1796).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): reduce `-promscrape.config` reload duration when the config contains big number of jobs (aka [scrape_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config) sections) and only a few of them are changed. Previously all the jobs were restarted. Now only the jobs with changed configs are restarted. This should reduce the probability of data miss because of slow config reload. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2270). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): reduce `-promscrape.config` reload duration when the config contains big number of jobs (aka [scrape_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config) sections) and only a few of them are changed. Previously all the jobs were restarted. Now only the jobs with changed configs are restarted. This should reduce the probability of data miss because of slow config reload. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2270).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): improve service discovery speed for big number of scrape targets. This should help when `vmagent` discovers big number of targets (e.g. thousands) in Kubernetes cluster. The service discovery speed now should scale with the number of CPU cores available to `vmagent`. * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): improve service discovery speed for big number of scrape targets. This should help when `vmagent` discovers big number of targets (e.g. thousands) in Kubernetes cluster. The service discovery speed now should scale with the number of CPU cores available to `vmagent`.

428
lib/awsapi/config.go Normal file
View file

@ -0,0 +1,428 @@
package awsapi
import (
"encoding/json"
"encoding/xml"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"os"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// Config represent aws access configuration.
type Config struct {
client *http.Client
region string
roleARN string
webTokenPath string
filtersQueryString string
ec2Endpoint string
stsEndpoint string
// these keys are needed for obtaining creds.
defaultAccessKey string
defaultSecretKey string
// Real credentials used for accessing EC2 API.
creds *credentials
credsLock sync.Mutex
}
// credentials represent aws api credentials.
type credentials struct {
AccessKeyID string
SecretAccessKey string
Token string
Expiration time.Time
}
// NewConfig returns new AWS Config.
//
// filtersQueryString must contain an optional percent-encoded query string for aws filters.
// See https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeInstances.html for examples.
func NewConfig(region, roleARN, accessKey, secretKey, filtersQueryString string) (*Config, error) {
cfg := &Config{
client: http.DefaultClient,
region: region,
roleARN: roleARN,
filtersQueryString: filtersQueryString,
defaultAccessKey: os.Getenv("AWS_ACCESS_KEY_ID"),
defaultSecretKey: os.Getenv("AWS_SECRET_ACCESS_KEY"),
}
cfg.region = region
if cfg.region == "" {
r, err := getDefaultRegion(cfg.client)
if err != nil {
return nil, fmt.Errorf("cannot determine default AWS region: %w", err)
}
cfg.region = r
}
cfg.ec2Endpoint = buildAPIEndpoint(cfg.ec2Endpoint, cfg.region, "ec2")
cfg.stsEndpoint = buildAPIEndpoint(cfg.stsEndpoint, cfg.region, "sts")
if cfg.roleARN == "" {
cfg.roleARN = os.Getenv("AWS_ROLE_ARN")
}
cfg.webTokenPath = os.Getenv("AWS_WEB_IDENTITY_TOKEN_FILE")
if cfg.webTokenPath != "" && cfg.roleARN == "" {
return nil, fmt.Errorf("roleARN is missing for AWS_WEB_IDENTITY_TOKEN_FILE=%q; set it via env var AWS_ROLE_ARN", cfg.webTokenPath)
}
// explicitly set credentials has priority over env variables
cfg.defaultAccessKey = os.Getenv("AWS_ACCESS_KEY_ID")
cfg.defaultSecretKey = os.Getenv("AWS_SECRET_ACCESS_KEY")
if len(accessKey) > 0 {
cfg.defaultAccessKey = accessKey
}
if len(secretKey) > 0 {
cfg.defaultSecretKey = secretKey
}
cfg.creds = &credentials{
AccessKeyID: cfg.defaultAccessKey,
SecretAccessKey: cfg.defaultSecretKey,
}
return cfg, nil
}
// GetEC2APIResponse performs EC2 API request with ghe given action.
func (cfg *Config) GetEC2APIResponse(action, nextPageToken string) ([]byte, error) {
ac, err := cfg.getFreshAPICredentials()
if err != nil {
return nil, err
}
apiURL := fmt.Sprintf("%s?Action=%s", cfg.ec2Endpoint, url.QueryEscape(action))
if len(cfg.filtersQueryString) > 0 {
apiURL += "&" + cfg.filtersQueryString
}
if len(nextPageToken) > 0 {
apiURL += fmt.Sprintf("&NextToken=%s", url.QueryEscape(nextPageToken))
}
apiURL += "&Version=2013-10-15"
req, err := newSignedGetRequest(apiURL, "ec2", cfg.region, ac)
if err != nil {
return nil, fmt.Errorf("cannot create signed request: %w", err)
}
resp, err := cfg.client.Do(req)
if err != nil {
return nil, fmt.Errorf("cannot perform http request to %q: %w", apiURL, err)
}
return readResponseBody(resp, apiURL)
}
// SignRequest signs request for service access and payloadHash.
func (cfg *Config) SignRequest(req *http.Request, service string, payloadHash string) error {
ac, err := cfg.getFreshAPICredentials()
if err != nil {
return err
}
return signRequestWithTime(req, service, cfg.region, payloadHash, ac, time.Now().UTC())
}
func readResponseBody(resp *http.Response, apiURL string) ([]byte, error) {
data, err := ioutil.ReadAll(resp.Body)
_ = resp.Body.Close()
if err != nil {
return nil, fmt.Errorf("cannot read response from %q: %w", apiURL, err)
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status code for %q; got %d; want %d; response body: %q",
apiURL, resp.StatusCode, http.StatusOK, data)
}
return data, nil
}
func getDefaultRegion(client *http.Client) (string, error) {
envRegion := os.Getenv("AWS_REGION")
if envRegion != "" {
return envRegion, nil
}
data, err := getMetadataByPath(client, "dynamic/instance-identity/document")
if err != nil {
return "", err
}
var id IdentityDocument
if err := json.Unmarshal(data, &id); err != nil {
return "", fmt.Errorf("cannot parse identity document: %w", err)
}
return id.Region, nil
}
// IdentityDocument is identity document returned from AWS metadata server.
//
// See https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instance-identity-documents.html
type IdentityDocument struct {
Region string
}
// getFreshAPICredentials returns fresh EC2 API credentials.
//
// The credentials are refreshed if needed.
func (cfg *Config) getFreshAPICredentials() (*credentials, error) {
cfg.credsLock.Lock()
defer cfg.credsLock.Unlock()
if len(cfg.defaultAccessKey) > 0 && len(cfg.defaultSecretKey) > 0 && len(cfg.roleARN) == 0 {
// There is no need in refreshing statically set api credentials if roleARN isn't set.
return cfg.creds, nil
}
if time.Until(cfg.creds.Expiration) > 10*time.Second {
// credentials aren't expired yet.
return cfg.creds, nil
}
// credentials have been expired. Update them.
ac, err := cfg.getAPICredentials()
if err != nil {
return nil, fmt.Errorf("cannot obtain new EC2 API credentials: %w", err)
}
cfg.creds = ac
return ac, nil
}
// getAPICredentials obtains new EC2 API credentials from instance metadata and role_arn.
func (cfg *Config) getAPICredentials() (*credentials, error) {
acNew := &credentials{
AccessKeyID: cfg.defaultAccessKey,
SecretAccessKey: cfg.defaultSecretKey,
}
if len(cfg.webTokenPath) > 0 {
token, err := ioutil.ReadFile(cfg.webTokenPath)
if err != nil {
return nil, fmt.Errorf("cannot read webToken from path: %q, err: %w", cfg.webTokenPath, err)
}
return cfg.getRoleWebIdentityCredentials(string(token))
}
if ecsMetaURI := os.Getenv("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"); len(ecsMetaURI) > 0 {
path := "http://169.254.170.2" + ecsMetaURI
return getECSRoleCredentialsByPath(cfg.client, path)
}
// we need instance credentials if dont have access keys
if len(acNew.AccessKeyID) == 0 && len(acNew.SecretAccessKey) == 0 {
ac, err := getInstanceRoleCredentials(cfg.client)
if err != nil {
return nil, fmt.Errorf("cannot obtain instance role credentials: %w", err)
}
acNew = ac
}
// read credentials from sts api, if role_arn is defined
if len(cfg.roleARN) > 0 {
ac, err := cfg.getRoleARNCredentials(acNew)
if err != nil {
return nil, fmt.Errorf("cannot get credentials for role_arn %q: %w", cfg.roleARN, err)
}
acNew = ac
}
if len(acNew.AccessKeyID) == 0 {
return nil, fmt.Errorf("missing AWS access_key; it may be set via env var AWS_ACCESS_KEY_ID or use instance iam role")
}
if len(acNew.SecretAccessKey) == 0 {
return nil, fmt.Errorf("missing AWS secret_key; it may be set via env var AWS_SECRET_ACCESS_KEY or use instance iam role")
}
return acNew, nil
}
// getECSRoleCredentialsByPath makes request to ecs metadata service
// and retrieves instances credentails
// https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html
func getECSRoleCredentialsByPath(client *http.Client, path string) (*credentials, error) {
resp, err := client.Get(path)
if err != nil {
return nil, fmt.Errorf("cannot get ECS instance role credentials: %w", err)
}
data, err := readResponseBody(resp, path)
if err != nil {
return nil, err
}
return parseMetadataSecurityCredentials(data)
}
// getInstanceRoleCredentials makes request to local ec2 instance metadata service
// and tries to retrieve credentials from assigned iam role.
//
// See https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html
func getInstanceRoleCredentials(client *http.Client) (*credentials, error) {
instanceRoleName, err := getMetadataByPath(client, "meta-data/iam/security-credentials/")
if err != nil {
return nil, fmt.Errorf("cannot get instanceRoleName: %w", err)
}
data, err := getMetadataByPath(client, "meta-data/iam/security-credentials/"+string(instanceRoleName))
if err != nil {
return nil, fmt.Errorf("cannot get security credentails for instanceRoleName %q: %w", instanceRoleName, err)
}
return parseMetadataSecurityCredentials(data)
}
// parseMetadataSecurityCredentials parses apiCredentials from metadata response to http://169.254.169.254/latest/meta-data/iam/security-credentials/*
//
// See https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html
func parseMetadataSecurityCredentials(data []byte) (*credentials, error) {
var msc MetadataSecurityCredentials
if err := json.Unmarshal(data, &msc); err != nil {
return nil, fmt.Errorf("cannot parse metadata security credentials from %q: %w", data, err)
}
return &credentials{
AccessKeyID: msc.AccessKeyID,
SecretAccessKey: msc.SecretAccessKey,
Token: msc.Token,
Expiration: msc.Expiration,
}, nil
}
// MetadataSecurityCredentials represents credentials obtained from http://169.254.169.254/latest/meta-data/iam/security-credentials/*
//
// See https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html
type MetadataSecurityCredentials struct {
AccessKeyID string `json:"AccessKeyId"`
SecretAccessKey string `json:"SecretAccessKey"`
Token string `json:"Token"`
Expiration time.Time `json:"Expiration"`
}
// getMetadataByPath returns instance metadata by url path
func getMetadataByPath(client *http.Client, apiPath string) ([]byte, error) {
// See https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html
// Obtain session token
sessionTokenURL := "http://169.254.169.254/latest/api/token"
req, err := http.NewRequest("PUT", sessionTokenURL, nil)
if err != nil {
return nil, fmt.Errorf("cannot create request for IMDSv2 session token at url %q: %w", sessionTokenURL, err)
}
req.Header.Set("X-aws-ec2-metadata-token-ttl-seconds", "60")
resp, err := client.Do(req)
if err != nil {
return nil, fmt.Errorf("cannot obtain IMDSv2 session token from %q: %w", sessionTokenURL, err)
}
token, err := readResponseBody(resp, sessionTokenURL)
if err != nil {
return nil, fmt.Errorf("cannot read IMDSv2 session token from %q: %w", sessionTokenURL, err)
}
// Use session token in the request.
apiURL := "http://169.254.169.254/latest/" + apiPath
req, err = http.NewRequest("GET", apiURL, nil)
if err != nil {
return nil, fmt.Errorf("cannot create request to %q: %w", apiURL, err)
}
req.Header.Set("X-aws-ec2-metadata-token", string(token))
resp, err = client.Do(req)
if err != nil {
return nil, fmt.Errorf("cannot obtain response for %q: %w", apiURL, err)
}
return readResponseBody(resp, apiURL)
}
// getRoleWebIdentityCredentials obtains credentials fo the given roleARN with webToken.
// https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRoleWithWebIdentity.html
// aws IRSA for kubernetes.
// https://aws.amazon.com/blogs/opensource/introducing-fine-grained-iam-roles-service-accounts/
func (cfg *Config) getRoleWebIdentityCredentials(token string) (*credentials, error) {
data, err := cfg.getSTSAPIResponse("AssumeRoleWithWebIdentity", func(apiURL string) (*http.Request, error) {
apiURL += fmt.Sprintf("&WebIdentityToken=%s", url.QueryEscape(token))
return http.NewRequest("GET", apiURL, nil)
})
if err != nil {
return nil, err
}
return parseARNCredentials(data, "AssumeRoleWithWebIdentity")
}
// getSTSAPIResponse makes request to aws sts api with the given cfg and returns temporary credentials with expiration time.
//
// See https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html
func (cfg *Config) getSTSAPIResponse(action string, reqBuilder func(apiURL string) (*http.Request, error)) ([]byte, error) {
// See https://docs.aws.amazon.com/AWSEC2/latest/APIReference/Query-Requests.html
apiURL := fmt.Sprintf("%s?Action=%s", cfg.stsEndpoint, action)
apiURL += "&Version=2011-06-15"
apiURL += fmt.Sprintf("&RoleArn=%s", cfg.roleARN)
// we have to provide unique session name for cloudtrail audit
apiURL += "&RoleSessionName=vmagent-ec2-discovery"
req, err := reqBuilder(apiURL)
if err != nil {
return nil, fmt.Errorf("cannot create signed request: %w", err)
}
resp, err := cfg.client.Do(req)
if err != nil {
return nil, fmt.Errorf("cannot perform http request to %q: %w", apiURL, err)
}
return readResponseBody(resp, apiURL)
}
// getRoleARNCredentials obtains credentials fo the given roleARN.
func (cfg *Config) getRoleARNCredentials(creds *credentials) (*credentials, error) {
data, err := cfg.getSTSAPIResponse("AssumeRole", func(apiURL string) (*http.Request, error) {
return newSignedGetRequest(apiURL, "sts", cfg.region, creds)
})
if err != nil {
return nil, err
}
return parseARNCredentials(data, "AssumeRole")
}
// parseARNCredentials parses apiCredentials from AssumeRole response.
//
// See https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html
func parseARNCredentials(data []byte, role string) (*credentials, error) {
var arr AssumeRoleResponse
if err := xml.Unmarshal(data, &arr); err != nil {
return nil, fmt.Errorf("cannot parse AssumeRoleResponse response from %q: %w", data, err)
}
var cred assumeCredentials
switch role {
case "AssumeRole":
cred = arr.AssumeRoleResult.Credentials
case "AssumeRoleWithWebIdentity":
cred = arr.AssumeRoleWithWebIdentityResult.Credentials
default:
logger.Panicf("BUG: unexpected role: %q", role)
}
return &credentials{
AccessKeyID: cred.AccessKeyID,
SecretAccessKey: cred.SecretAccessKey,
Token: cred.SessionToken,
Expiration: cred.Expiration,
}, nil
}
type assumeCredentials struct {
AccessKeyID string `xml:"AccessKeyId"`
SecretAccessKey string `xml:"SecretAccessKey"`
SessionToken string `xml:"SessionToken"`
Expiration time.Time `xml:"Expiration"`
}
// AssumeRoleResponse represents AssumeRole response
//
// See https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html
type AssumeRoleResponse struct {
AssumeRoleResult struct {
Credentials assumeCredentials `xml:"Credentials"`
} `xml:"AssumeRoleResult"`
AssumeRoleWithWebIdentityResult struct {
Credentials assumeCredentials `xml:"Credentials"`
} `xml:"AssumeRoleWithWebIdentityResult"`
}
// buildAPIEndpoint creates endpoint for aws api access
func buildAPIEndpoint(customEndpoint, region, service string) string {
// See https://docs.aws.amazon.com/AWSEC2/latest/APIReference/Query-Requests.html
if len(customEndpoint) == 0 {
return fmt.Sprintf("https://%s.%s.amazonaws.com/", service, region)
}
endpoint := customEndpoint
// endpoint may contain only hostname. Convert it to proper url then.
if !strings.Contains(endpoint, "://") {
endpoint = "https://" + endpoint
}
if !strings.HasSuffix(endpoint, "/") {
endpoint += "/"
}
return endpoint
}

View file

@ -1,4 +1,4 @@
package ec2 package awsapi
import ( import (
"fmt" "fmt"
@ -37,7 +37,7 @@ func TestParseMetadataSecurityCredentialsSuccess(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("unexpected error: %s", err) t.Fatalf("unexpected error: %s", err)
} }
credsExpected := &apiCredentials{ credsExpected := &credentials{
AccessKeyID: "ASIAIOSFODNN7EXAMPLE", AccessKeyID: "ASIAIOSFODNN7EXAMPLE",
SecretAccessKey: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", SecretAccessKey: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
Token: "token", Token: "token",
@ -64,7 +64,7 @@ func TestParseARNCredentialsFailure(t *testing.T) {
} }
func TestParseARNCredentialsSuccess(t *testing.T) { func TestParseARNCredentialsSuccess(t *testing.T) {
f := func(data, role string, credsExpected *apiCredentials) { f := func(data, role string, credsExpected *credentials) {
t.Helper() t.Helper()
creds, err := parseARNCredentials([]byte(data), role) creds, err := parseARNCredentials([]byte(data), role)
if err != nil { if err != nil {
@ -102,7 +102,7 @@ func TestParseARNCredentialsSuccess(t *testing.T) {
</ResponseMetadata> </ResponseMetadata>
</AssumeRoleResponse> </AssumeRoleResponse>
` `
credsExpected := &apiCredentials{ credsExpected := &credentials{
AccessKeyID: "ASIAIOSFODNN7EXAMPLE", AccessKeyID: "ASIAIOSFODNN7EXAMPLE",
SecretAccessKey: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYzEXAMPLEKEY", SecretAccessKey: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYzEXAMPLEKEY",
Token: ` Token: `
@ -134,7 +134,7 @@ func TestParseARNCredentialsSuccess(t *testing.T) {
<RequestId>1214124-7bb0-4673-ad6d-af9e67fc1141</RequestId> <RequestId>1214124-7bb0-4673-ad6d-af9e67fc1141</RequestId>
</ResponseMetadata> </ResponseMetadata>
</AssumeRoleWithWebIdentityResponse>` </AssumeRoleWithWebIdentityResponse>`
credsExpected2 := &apiCredentials{ credsExpected2 := &credentials{
AccessKeyID: "ASIABYASSDASF", AccessKeyID: "ASIABYASSDASF",
SecretAccessKey: "asffasfasf/RvxIQpCid4iRMGm56nnRs2oKgV", SecretAccessKey: "asffasfasf/RvxIQpCid4iRMGm56nnRs2oKgV",
Token: "asfafsassssssssss/MlyKUPOYAiEAq5HgS19Mf8SJ3kIKU3NCztDeZW5EUW4NrPrPyXQ8om0q/AQIjv//////////", Token: "asfafsassssssssss/MlyKUPOYAiEAq5HgS19Mf8SJ3kIKU3NCztDeZW5EUW4NrPrPyXQ8om0q/AQIjv//////////",

View file

@ -1,4 +1,4 @@
package ec2 package awsapi
import ( import (
"crypto/hmac" "crypto/hmac"
@ -6,37 +6,46 @@ import (
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"net/http" "net/http"
"net/url"
"strings" "strings"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
) )
// newSignedRequest signed request for apiURL according to aws signature algorithm. // for get requests there is no need to calculate payload hash each time.
var emptyPayloadHash = hashHex("")
// newSignedGetRequest creates signed http get request for apiURL according to aws signature algorithm.
// //
// See the algorithm at https://docs.aws.amazon.com/general/latest/gr/sigv4-signed-request-examples.html // See the algorithm at https://docs.aws.amazon.com/general/latest/gr/sigv4-signed-request-examples.html
func newSignedRequest(apiURL, service, region string, creds *apiCredentials) (*http.Request, error) { func newSignedGetRequest(apiURL, service, region string, creds *credentials) (*http.Request, error) {
t := time.Now().UTC() return newSignedGetRequestWithTime(apiURL, service, region, creds, time.Now().UTC())
return newSignedRequestWithTime(apiURL, service, region, creds, t)
} }
func newSignedRequestWithTime(apiURL, service, region string, creds *apiCredentials, t time.Time) (*http.Request, error) { func newSignedGetRequestWithTime(apiURL, service, region string, creds *credentials, t time.Time) (*http.Request, error) {
uri, err := url.Parse(apiURL) req, err := http.NewRequest("GET", apiURL, nil)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot parse %q: %w", apiURL, err) return nil, fmt.Errorf("cannot create http request with given apiURL: %s, err: %w", apiURL, err)
}
if err := signRequestWithTime(req, service, region, emptyPayloadHash, creds, t); err != nil {
return nil, err
}
return req, nil
} }
// signRequestWithTime - signs http request with AWS API credentials for given payload
func signRequestWithTime(req *http.Request, service, region, payloadHash string, creds *credentials, t time.Time) error {
uri := req.URL
// Create canonicalRequest // Create canonicalRequest
amzdate := t.Format("20060102T150405Z") amzdate := t.Format("20060102T150405Z")
datestamp := t.Format("20060102") datestamp := t.Format("20060102")
canonicalURL := uri.Path canonicalURL := uri.Path
canonicalQS := uri.Query().Encode() canonicalQS := uri.Query().Encode()
canonicalHeaders := fmt.Sprintf("host:%s\nx-amz-date:%s\n", uri.Host, amzdate) canonicalHeaders := fmt.Sprintf("host:%s\nx-amz-date:%s\n", uri.Host, amzdate)
signedHeaders := "host;x-amz-date" signedHeaders := "host;x-amz-date"
payloadHash := hashHex("")
tmp := []string{ tmp := []string{
"GET", req.Method,
canonicalURL, canonicalURL,
canonicalQS, canonicalQS,
canonicalHeaders, canonicalHeaders,
@ -45,7 +54,6 @@ func newSignedRequestWithTime(apiURL, service, region string, creds *apiCredenti
} }
canonicalRequest := strings.Join(tmp, "\n") canonicalRequest := strings.Join(tmp, "\n")
// Create stringToSign
algorithm := "AWS4-HMAC-SHA256" algorithm := "AWS4-HMAC-SHA256"
credentialScope := fmt.Sprintf("%s/%s/%s/aws4_request", datestamp, region, service) credentialScope := fmt.Sprintf("%s/%s/%s/aws4_request", datestamp, region, service)
tmp = []string{ tmp = []string{
@ -63,16 +71,13 @@ func newSignedRequestWithTime(apiURL, service, region string, creds *apiCredenti
// Calculate autheader // Calculate autheader
authHeader := fmt.Sprintf("%s Credential=%s/%s, SignedHeaders=%s, Signature=%s", algorithm, creds.AccessKeyID, credentialScope, signedHeaders, signature) authHeader := fmt.Sprintf("%s Credential=%s/%s, SignedHeaders=%s, Signature=%s", algorithm, creds.AccessKeyID, credentialScope, signedHeaders, signature)
req, err := http.NewRequest("GET", apiURL, nil)
if err != nil {
return nil, fmt.Errorf("cannot create request from %q: %w", apiURL, err)
}
req.Header.Set("x-amz-date", amzdate) req.Header.Set("x-amz-date", amzdate)
req.Header.Set("Authorization", authHeader) req.Header.Set("Authorization", authHeader)
// special case for token auth
if creds.Token != "" { if creds.Token != "" {
req.Header.Set("X-Amz-Security-Token", creds.Token) req.Header.Set("X-Amz-Security-Token", creds.Token)
} }
return req, nil return nil
} }
func getSignatureKey(key, datestamp, region, service string) string { func getSignatureKey(key, datestamp, region, service string) string {
@ -83,7 +88,12 @@ func getSignatureKey(key, datestamp, region, service string) string {
} }
func hashHex(s string) string { func hashHex(s string) string {
h := sha256.Sum256([]byte(s)) return HashHex([]byte(s))
}
// HashHex hashes given s
func HashHex(s []byte) string {
h := sha256.Sum256(s)
return hex.EncodeToString(h[:]) return hex.EncodeToString(h[:])
} }

View file

@ -1,4 +1,4 @@
package ec2 package awsapi
import ( import (
"testing" "testing"
@ -10,12 +10,12 @@ func TestNewSignedRequest(t *testing.T) {
t.Helper() t.Helper()
service := "ec2" service := "ec2"
region := "us-east-1" region := "us-east-1"
ac := &apiCredentials{ ac := &credentials{
AccessKeyID: "fake-access-key", AccessKeyID: "fake-access-key",
SecretAccessKey: "foobar", SecretAccessKey: "foobar",
} }
ct := time.Unix(0, 0).UTC() ct := time.Unix(0, 0).UTC()
req, err := newSignedRequestWithTime(apiURL, service, region, ac, ct) req, err := newSignedGetRequestWithTime(apiURL, service, region, ac, ct)
if err != nil { if err != nil {
t.Fatalf("error in newSignedRequest: %s", err) t.Fatalf("error in newSignedRequest: %s", err)
} }

View file

@ -1,52 +1,24 @@
package ec2 package ec2
import ( import (
"encoding/json"
"encoding/xml"
"fmt" "fmt"
"io/ioutil"
"net/http"
"net/url" "net/url"
"os"
"strings" "strings"
"sync" "sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/awsapi"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
) )
type apiConfig struct { type apiConfig struct {
region string awsConfig *awsapi.Config
roleARN string
webTokenPath string
filters string
port int port int
ec2Endpoint string
stsEndpoint string
// these keys are needed for obtaining creds.
defaultAccessKey string
defaultSecretKey string
// Real credentials used for accessing EC2 API.
creds *apiCredentials
credsLock sync.Mutex
// A map from AZ name to AZ id. // A map from AZ name to AZ id.
azMap map[string]string azMap map[string]string
azMapLock sync.Mutex azMapLock sync.Mutex
} }
// apiCredentials represents aws api credentials
type apiCredentials struct {
AccessKeyID string
SecretAccessKey string
Token string
Expiration time.Time
}
var configMap = discoveryutils.NewConfigMap() var configMap = discoveryutils.NewConfigMap()
func getAPIConfig(sdc *SDConfig) (*apiConfig, error) { func getAPIConfig(sdc *SDConfig) (*apiConfig, error) {
@ -58,48 +30,19 @@ func getAPIConfig(sdc *SDConfig) (*apiConfig, error) {
} }
func newAPIConfig(sdc *SDConfig) (*apiConfig, error) { func newAPIConfig(sdc *SDConfig) (*apiConfig, error) {
region := sdc.Region fqs := getFiltersQueryString(sdc.Filters)
if len(region) == 0 {
r, err := getDefaultRegion()
if err != nil {
return nil, fmt.Errorf("cannot determine default ec2 region; probably, `region` param in `ec2_sd_configs` is missing; the error: %w", err)
}
region = r
}
filters := getFiltersQueryString(sdc.Filters)
port := 80 port := 80
if sdc.Port != nil { if sdc.Port != nil {
port = *sdc.Port port = *sdc.Port
} }
awsCfg, err := awsapi.NewConfig(sdc.Region, sdc.RoleARN, sdc.AccessKey, sdc.SecretKey.String(), fqs)
if err != nil {
return nil, err
}
cfg := &apiConfig{ cfg := &apiConfig{
region: region, awsConfig: awsCfg,
roleARN: sdc.RoleARN,
filters: filters,
port: port, port: port,
} }
cfg.ec2Endpoint = buildAPIEndpoint(sdc.Endpoint, region, "ec2")
cfg.stsEndpoint = buildAPIEndpoint(sdc.Endpoint, region, "sts")
if cfg.roleARN == "" {
cfg.roleARN = os.Getenv("AWS_ROLE_ARN")
}
cfg.webTokenPath = os.Getenv("AWS_WEB_IDENTITY_TOKEN_FILE")
if cfg.webTokenPath != "" && cfg.roleARN == "" {
return nil, fmt.Errorf("roleARN is missing for AWS_WEB_IDENTITY_TOKEN_FILE=%q, set it either in `ec2_sd_config` or via env var AWS_ROLE_ARN", cfg.webTokenPath)
}
// explicitly set credentials has priority over env variables
cfg.defaultAccessKey = os.Getenv("AWS_ACCESS_KEY_ID")
cfg.defaultSecretKey = os.Getenv("AWS_SECRET_ACCESS_KEY")
if len(sdc.AccessKey) > 0 {
cfg.defaultAccessKey = sdc.AccessKey
}
if sdc.SecretKey != nil {
cfg.defaultSecretKey = sdc.SecretKey.String()
}
cfg.creds = &apiCredentials{
AccessKeyID: cfg.defaultAccessKey,
SecretAccessKey: cfg.defaultSecretKey,
}
return cfg, nil return cfg, nil
} }
@ -114,339 +57,3 @@ func getFiltersQueryString(filters []Filter) string {
} }
return strings.Join(args, "&") return strings.Join(args, "&")
} }
func getDefaultRegion() (string, error) {
envRegion := os.Getenv("AWS_REGION")
if envRegion != "" {
return envRegion, nil
}
data, err := getMetadataByPath("dynamic/instance-identity/document")
if err != nil {
return "", err
}
var id IdentityDocument
if err := json.Unmarshal(data, &id); err != nil {
return "", fmt.Errorf("cannot parse identity document: %w", err)
}
return id.Region, nil
}
// IdentityDocument is identity document returned from AWS metadata server.
//
// See https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instance-identity-documents.html
type IdentityDocument struct {
Region string
}
// getFreshAPICredentials returns fresh EC2 API credentials.
//
// The credentials are refreshed if needed.
func (cfg *apiConfig) getFreshAPICredentials() (*apiCredentials, error) {
cfg.credsLock.Lock()
defer cfg.credsLock.Unlock()
if len(cfg.defaultAccessKey) > 0 && len(cfg.defaultSecretKey) > 0 && len(cfg.roleARN) == 0 {
// There is no need in refreshing statically set api credentials if `role_arn` isn't set.
return cfg.creds, nil
}
if time.Until(cfg.creds.Expiration) > 10*time.Second {
// Credentials aren't expired yet.
return cfg.creds, nil
}
// Credentials have been expired. Update them.
ac, err := getAPICredentials(cfg)
if err != nil {
return nil, err
}
cfg.creds = ac
return ac, nil
}
// getAPICredentials obtains new EC2 API credentials from instance metadata and role_arn.
func getAPICredentials(cfg *apiConfig) (*apiCredentials, error) {
acNew := &apiCredentials{
AccessKeyID: cfg.defaultAccessKey,
SecretAccessKey: cfg.defaultSecretKey,
}
if len(cfg.webTokenPath) > 0 {
token, err := ioutil.ReadFile(cfg.webTokenPath)
if err != nil {
return nil, fmt.Errorf("cannot read webToken from path: %q, err: %w", cfg.webTokenPath, err)
}
return getRoleWebIdentityCredentials(cfg.stsEndpoint, cfg.roleARN, string(token))
}
if ecsMetaURI := os.Getenv("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"); len(ecsMetaURI) > 0 {
path := "http://169.254.170.2" + ecsMetaURI
return getECSRoleCredentialsByPath(path)
}
// we need instance credentials if dont have access keys
if len(acNew.AccessKeyID) == 0 && len(acNew.SecretAccessKey) == 0 {
ac, err := getInstanceRoleCredentials()
if err != nil {
return nil, err
}
acNew = ac
}
// read credentials from sts api, if role_arn is defined
if len(cfg.roleARN) > 0 {
ac, err := getRoleARNCredentials(cfg.region, cfg.stsEndpoint, cfg.roleARN, acNew)
if err != nil {
return nil, fmt.Errorf("cannot get credentials for role_arn %q: %w", cfg.roleARN, err)
}
acNew = ac
}
if len(acNew.AccessKeyID) == 0 {
return nil, fmt.Errorf("missing `access_key`, you can set it with env var AWS_ACCESS_KEY_ID, " +
"directly at `ec2_sd_config` as `access_key` or use instance iam role")
}
if len(acNew.SecretAccessKey) == 0 {
return nil, fmt.Errorf("missing `secret_key`, you can set it with env var AWS_SECRET_ACCESS_KEY," +
"directly at `ec2_sd_config` as `secret_key` or use instance iam role")
}
return acNew, nil
}
// getECSRoleCredentialsByPath makes request to ecs metadata service
// and retrieves instances credentails
// https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html
func getECSRoleCredentialsByPath(path string) (*apiCredentials, error) {
client := discoveryutils.GetHTTPClient()
resp, err := client.Get(path)
if err != nil {
return nil, fmt.Errorf("cannot get ECS instance role credentials: %w", err)
}
data, err := readResponseBody(resp, path)
if err != nil {
return nil, err
}
return parseMetadataSecurityCredentials(data)
}
// getInstanceRoleCredentials makes request to local ec2 instance metadata service
// and tries to retrieve credentials from assigned iam role.
//
// See https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html
func getInstanceRoleCredentials() (*apiCredentials, error) {
instanceRoleName, err := getMetadataByPath("meta-data/iam/security-credentials/")
if err != nil {
return nil, fmt.Errorf("cannot get instanceRoleName: %w", err)
}
data, err := getMetadataByPath("meta-data/iam/security-credentials/" + string(instanceRoleName))
if err != nil {
return nil, fmt.Errorf("cannot get security credentails for instanceRoleName %q: %w", instanceRoleName, err)
}
return parseMetadataSecurityCredentials(data)
}
// parseMetadataSecurityCredentials parses apiCredentials from metadata response to http://169.254.169.254/latest/meta-data/iam/security-credentials/*
//
// See https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html
func parseMetadataSecurityCredentials(data []byte) (*apiCredentials, error) {
var msc MetadataSecurityCredentials
if err := json.Unmarshal(data, &msc); err != nil {
return nil, fmt.Errorf("cannot parse metadata security credentials from %q: %w", data, err)
}
return &apiCredentials{
AccessKeyID: msc.AccessKeyID,
SecretAccessKey: msc.SecretAccessKey,
Token: msc.Token,
Expiration: msc.Expiration,
}, nil
}
// MetadataSecurityCredentials represents credentials obtained from http://169.254.169.254/latest/meta-data/iam/security-credentials/*
//
// See https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html
type MetadataSecurityCredentials struct {
AccessKeyID string `json:"AccessKeyId"`
SecretAccessKey string `json:"SecretAccessKey"`
Token string `json:"Token"`
Expiration time.Time `json:"Expiration"`
}
// getMetadataByPath returns instance metadata by url path
func getMetadataByPath(apiPath string) ([]byte, error) {
// See https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html
client := discoveryutils.GetHTTPClient()
// Obtain session token
sessionTokenURL := "http://169.254.169.254/latest/api/token"
req, err := http.NewRequest("PUT", sessionTokenURL, nil)
if err != nil {
return nil, fmt.Errorf("cannot create request for IMDSv2 session token at url %q: %w", sessionTokenURL, err)
}
req.Header.Set("X-aws-ec2-metadata-token-ttl-seconds", "60")
resp, err := client.Do(req)
if err != nil {
return nil, fmt.Errorf("cannot obtain IMDSv2 session token from %q; probably, `region` is missing in `ec2_sd_config`; error: %w", sessionTokenURL, err)
}
token, err := readResponseBody(resp, sessionTokenURL)
if err != nil {
return nil, fmt.Errorf("cannot read IMDSv2 session token from %q; probably, `region` is missing in `ec2_sd_config`; error: %w", sessionTokenURL, err)
}
// Use session token in the request.
apiURL := "http://169.254.169.254/latest/" + apiPath
req, err = http.NewRequest("GET", apiURL, nil)
if err != nil {
return nil, fmt.Errorf("cannot create request to %q: %w", apiURL, err)
}
req.Header.Set("X-aws-ec2-metadata-token", string(token))
resp, err = client.Do(req)
if err != nil {
return nil, fmt.Errorf("cannot obtain response for %q: %w", apiURL, err)
}
return readResponseBody(resp, apiURL)
}
// getRoleWebIdentityCredentials obtains credentials fo the given roleARN with webToken.
// https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRoleWithWebIdentity.html
// aws IRSA for kubernetes.
// https://aws.amazon.com/blogs/opensource/introducing-fine-grained-iam-roles-service-accounts/
func getRoleWebIdentityCredentials(stsEndpoint, roleARN string, token string) (*apiCredentials, error) {
data, err := getSTSAPIResponse("AssumeRoleWithWebIdentity", stsEndpoint, roleARN, func(apiURL string) (*http.Request, error) {
apiURL += fmt.Sprintf("&WebIdentityToken=%s", url.QueryEscape(token))
return http.NewRequest("GET", apiURL, nil)
})
if err != nil {
return nil, err
}
return parseARNCredentials(data, "AssumeRoleWithWebIdentity")
}
// getRoleARNCredentials obtains credentials fo the given roleARN.
func getRoleARNCredentials(region, stsEndpoint, roleARN string, creds *apiCredentials) (*apiCredentials, error) {
data, err := getSTSAPIResponse("AssumeRole", stsEndpoint, roleARN, func(apiURL string) (*http.Request, error) {
return newSignedRequest(apiURL, "sts", region, creds)
})
if err != nil {
return nil, err
}
return parseARNCredentials(data, "AssumeRole")
}
// parseARNCredentials parses apiCredentials from AssumeRole response.
//
// See https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html
func parseARNCredentials(data []byte, role string) (*apiCredentials, error) {
var arr AssumeRoleResponse
if err := xml.Unmarshal(data, &arr); err != nil {
return nil, fmt.Errorf("cannot parse AssumeRoleResponse response from %q: %w", data, err)
}
var cred assumeCredentials
switch role {
case "AssumeRole":
cred = arr.AssumeRoleResult.Credentials
case "AssumeRoleWithWebIdentity":
cred = arr.AssumeRoleWithWebIdentityResult.Credentials
default:
logger.Panicf("BUG: unexpected role: %q", role)
}
return &apiCredentials{
AccessKeyID: cred.AccessKeyID,
SecretAccessKey: cred.SecretAccessKey,
Token: cred.SessionToken,
Expiration: cred.Expiration,
}, nil
}
type assumeCredentials struct {
AccessKeyID string `xml:"AccessKeyId"`
SecretAccessKey string `xml:"SecretAccessKey"`
SessionToken string `xml:"SessionToken"`
Expiration time.Time `xml:"Expiration"`
}
// AssumeRoleResponse represents AssumeRole response
//
// See https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html
type AssumeRoleResponse struct {
AssumeRoleResult struct {
Credentials assumeCredentials
}
AssumeRoleWithWebIdentityResult struct {
Credentials assumeCredentials
}
}
// buildAPIEndpoint creates endpoint for aws api access
func buildAPIEndpoint(customEndpoint, region, service string) string {
// See https://docs.aws.amazon.com/AWSEC2/latest/APIReference/Query-Requests.html
if len(customEndpoint) == 0 {
return fmt.Sprintf("https://%s.%s.amazonaws.com/", service, region)
}
endpoint := customEndpoint
// endpoint may contain only hostname. Convert it to proper url then.
if !strings.Contains(endpoint, "://") {
endpoint = "https://" + endpoint
}
if !strings.HasSuffix(endpoint, "/") {
endpoint += "/"
}
return endpoint
}
// getSTSAPIResponse makes request to aws sts api with roleARN
// and returns temporary credentials with expiration time
//
// See https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html
func getSTSAPIResponse(action, stsEndpoint, roleARN string, reqBuilder func(apiURL string) (*http.Request, error)) ([]byte, error) {
// See https://docs.aws.amazon.com/AWSEC2/latest/APIReference/Query-Requests.html
apiURL := fmt.Sprintf("%s?Action=%s", stsEndpoint, action)
apiURL += "&Version=2011-06-15"
apiURL += fmt.Sprintf("&RoleArn=%s", roleARN)
// we have to provide unique session name for cloudtrail audit
apiURL += "&RoleSessionName=vmagent-ec2-discovery"
req, err := reqBuilder(apiURL)
if err != nil {
return nil, fmt.Errorf("cannot create signed request: %w", err)
}
resp, err := discoveryutils.GetHTTPClient().Do(req)
if err != nil {
return nil, fmt.Errorf("cannot perform http request to %q: %w", apiURL, err)
}
return readResponseBody(resp, apiURL)
}
// getEC2APIResponse performs EC2 API request with given action.
func getEC2APIResponse(cfg *apiConfig, action, filters, nextPageToken string) ([]byte, error) {
ac, err := cfg.getFreshAPICredentials()
if err != nil {
return nil, fmt.Errorf("cannot obtain fresh credentials for EC2 API: %w", err)
}
apiURL := fmt.Sprintf("%s?Action=%s", cfg.ec2Endpoint, url.QueryEscape(action))
if len(filters) > 0 {
apiURL += "&" + filters
}
if len(nextPageToken) > 0 {
apiURL += fmt.Sprintf("&NextToken=%s", url.QueryEscape(nextPageToken))
}
apiURL += "&Version=2013-10-15"
req, err := newSignedRequest(apiURL, "ec2", cfg.region, ac)
if err != nil {
return nil, fmt.Errorf("cannot create signed request: %w", err)
}
resp, err := discoveryutils.GetHTTPClient().Do(req)
if err != nil {
return nil, fmt.Errorf("cannot perform http request to %q: %w", apiURL, err)
}
return readResponseBody(resp, apiURL)
}
func readResponseBody(resp *http.Response, apiURL string) ([]byte, error) {
data, err := ioutil.ReadAll(resp.Body)
_ = resp.Body.Close()
if err != nil {
return nil, fmt.Errorf("cannot read response from %q: %w", apiURL, err)
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status code for %q; got %d; want %d; response body: %q",
apiURL, resp.StatusCode, http.StatusOK, data)
}
return data, nil
}

View file

@ -0,0 +1,65 @@
package ec2
import (
"encoding/xml"
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
func getAZMap(cfg *apiConfig) map[string]string {
cfg.azMapLock.Lock()
defer cfg.azMapLock.Unlock()
if cfg.azMap != nil {
return cfg.azMap
}
azs, err := getAvailabilityZones(cfg)
cfg.azMap = make(map[string]string, len(azs))
if err != nil {
logger.Warnf("couldn't load availability zones map, so __meta_ec2_availability_zone_id label isn't set: %s", err)
return cfg.azMap
}
for _, az := range azs {
cfg.azMap[az.ZoneName] = az.ZoneID
}
return cfg.azMap
}
func getAvailabilityZones(cfg *apiConfig) ([]AvailabilityZone, error) {
// See https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeAvailabilityZones.html
data, err := cfg.awsConfig.GetEC2APIResponse("DescribeAvailabilityZones", "")
if err != nil {
return nil, fmt.Errorf("cannot obtain availability zones: %w", err)
}
azr, err := parseAvailabilityZonesResponse(data)
if err != nil {
return nil, fmt.Errorf("cannot parse availability zones list: %w", err)
}
return azr.AvailabilityZoneInfo.Items, nil
}
// AvailabilityZonesResponse represents the response for https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeAvailabilityZones.html
type AvailabilityZonesResponse struct {
AvailabilityZoneInfo AvailabilityZoneInfo `xml:"availabilityZoneInfo"`
}
// AvailabilityZoneInfo represents availabilityZoneInfo for https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeAvailabilityZones.html
type AvailabilityZoneInfo struct {
Items []AvailabilityZone `xml:"item"`
}
// AvailabilityZone represents availabilityZone for https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_AvailabilityZone.html
type AvailabilityZone struct {
ZoneName string `xml:"zoneName"`
ZoneID string `xml:"zoneId"`
}
func parseAvailabilityZonesResponse(data []byte) (*AvailabilityZonesResponse, error) {
var v AvailabilityZonesResponse
if err := xml.Unmarshal(data, &v); err != nil {
return nil, fmt.Errorf("cannot unmarshal DescribeAvailabilityZonesResponse from %q: %w", data, err)
}
return &v, nil
}

View file

@ -5,7 +5,6 @@ import (
"fmt" "fmt"
"strings" "strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
) )
@ -30,7 +29,7 @@ func getReservations(cfg *apiConfig) ([]Reservation, error) {
var rs []Reservation var rs []Reservation
pageToken := "" pageToken := ""
for { for {
data, err := getEC2APIResponse(cfg, "DescribeInstances", cfg.filters, pageToken) data, err := cfg.awsConfig.GetEC2APIResponse("DescribeInstances", pageToken)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot obtain instances: %w", err) return nil, fmt.Errorf("cannot obtain instances: %w", err)
} }
@ -133,63 +132,6 @@ func parseInstancesResponse(data []byte) (*InstancesResponse, error) {
return &v, nil return &v, nil
} }
func getAZMap(cfg *apiConfig) map[string]string {
cfg.azMapLock.Lock()
defer cfg.azMapLock.Unlock()
if cfg.azMap != nil {
return cfg.azMap
}
azs, err := getAvailabilityZones(cfg)
cfg.azMap = make(map[string]string, len(azs))
if err != nil {
logger.Warnf("couldn't load availability zones map, so __meta_ec2_availability_zone_id label isn't set: %s", err)
return cfg.azMap
}
for _, az := range azs {
cfg.azMap[az.ZoneName] = az.ZoneID
}
return cfg.azMap
}
func getAvailabilityZones(cfg *apiConfig) ([]AvailabilityZone, error) {
// See https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeAvailabilityZones.html
data, err := getEC2APIResponse(cfg, "DescribeAvailabilityZones", "", "")
if err != nil {
return nil, fmt.Errorf("cannot obtain availability zones: %w", err)
}
azr, err := parseAvailabilityZonesResponse(data)
if err != nil {
return nil, fmt.Errorf("cannot parse availability zones list: %w", err)
}
return azr.AvailabilityZoneInfo.Items, nil
}
// AvailabilityZonesResponse represents the response for https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeAvailabilityZones.html
type AvailabilityZonesResponse struct {
AvailabilityZoneInfo AvailabilityZoneInfo `xml:"availabilityZoneInfo"`
}
// AvailabilityZoneInfo represents availabilityZoneInfo for https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeAvailabilityZones.html
type AvailabilityZoneInfo struct {
Items []AvailabilityZone `xml:"item"`
}
// AvailabilityZone represents availabilityZone for https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_AvailabilityZone.html
type AvailabilityZone struct {
ZoneName string `xml:"zoneName"`
ZoneID string `xml:"zoneId"`
}
func parseAvailabilityZonesResponse(data []byte) (*AvailabilityZonesResponse, error) {
var v AvailabilityZonesResponse
if err := xml.Unmarshal(data, &v); err != nil {
return nil, fmt.Errorf("cannot unmarshal DescribeAvailabilityZonesResponse from %q: %w", data, err)
}
return &v, nil
}
func (inst *Instance) appendTargetLabels(ms []map[string]string, ownerID string, port int, azMap map[string]string) []map[string]string { func (inst *Instance) appendTargetLabels(ms []map[string]string, ownerID string, port int, azMap map[string]string) []map[string]string {
if len(inst.PrivateIPAddress) == 0 { if len(inst.PrivateIPAddress) == 0 {
// Cannot scrape instance without private IP address // Cannot scrape instance without private IP address