mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
Add improvements to ec2_sd_discovery (#775)
* Add improvements to ec2 discovery https://github.com/VictoriaMetrics/VictoriaMetrics/issues/771 role_arn support with aws sts instance iam_role support refreshing temporary tokens * Apply suggestions from code review Co-authored-by: Roman Khavronenko <hagen1778@gmail.com> * changed implementation, removed tests, clean up code * moved endpoint builder into getEC2APIResponse Co-authored-by: Roman Khavronenko <hagen1778@gmail.com>
This commit is contained in:
parent
1e1a27d803
commit
312fead9a2
6 changed files with 244 additions and 55 deletions
|
@ -135,7 +135,7 @@ The following scrape types in [scrape_config](https://prometheus.io/docs/prometh
|
|||
See [kubernetes_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config) for details.
|
||||
* `ec2_sd_configs` - for scraping targets in Amazon EC2.
|
||||
See [ec2_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ec2_sd_config) for details.
|
||||
`vmagent` doesn't support `role_arn` config param yet.
|
||||
`vmagent` doesn't support `profile` config param and aws credentials file yet.
|
||||
* `gce_sd_configs` - for scraping targets in Google Compute Engine (GCE).
|
||||
See [gce_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#gce_sd_config) for details.
|
||||
`vmagent` provides the following additional functionality for `gce_sd_config`:
|
||||
|
|
|
@ -2,29 +2,64 @@ package ec2
|
|||
|
||||
import (
|
||||
"encoding/json"
|
||||
"encoding/xml"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||
)
|
||||
|
||||
const (
|
||||
awsAccessKeyEnv = "AWS_ACCESS_KEY_ID"
|
||||
awsSecretKeyEnv = "AWS_SECRET_ACCESS_KEY"
|
||||
)
|
||||
|
||||
type apiConfig struct {
|
||||
endpoint string
|
||||
region string
|
||||
accessKey string
|
||||
secretKey string
|
||||
filters string
|
||||
port int
|
||||
endpoint string
|
||||
region string
|
||||
roleARN string
|
||||
filters string
|
||||
port int
|
||||
// this keys needed,
|
||||
// when we are using temporary credentials
|
||||
defaultAccessKey string
|
||||
defaultSecretKey string
|
||||
apiCredentials atomic.Value
|
||||
}
|
||||
|
||||
// apiCredentials represents aws api credentials
|
||||
type apiCredentials struct {
|
||||
AccessKeyID string `json:"AccessKeyId"`
|
||||
SecretAccessKey string `json:"SecretAccessKey"`
|
||||
Token string `json:"Token"`
|
||||
Expiration time.Time `json:"Expiration"`
|
||||
}
|
||||
|
||||
// stsCredentialsResponse - represents aws sts AssumeRole response
|
||||
// https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html
|
||||
type stsCredentialsResponse struct {
|
||||
AssumeRoleResult struct {
|
||||
Credentials struct {
|
||||
AccessKeyID string `xml:"AccessKeyId"`
|
||||
SecretAccessKey string `xml:"SecretAccessKey"`
|
||||
SessionToken string `xml:"SessionToken"`
|
||||
Expiration time.Time `xml:"Expiration"`
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var configMap = discoveryutils.NewConfigMap()
|
||||
|
||||
func getAPIConfig(sdc *SDConfig) (*apiConfig, error) {
|
||||
v, err := configMap.Get(sdc, func() (interface{}, error) { return newAPIConfig(sdc) })
|
||||
v, err := configMap.Get(sdc, func() (interface{}, error) {
|
||||
return newAPIConfig(sdc)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -40,33 +75,42 @@ func newAPIConfig(sdc *SDConfig) (*apiConfig, error) {
|
|||
}
|
||||
region = r
|
||||
}
|
||||
accessKey := sdc.AccessKey
|
||||
if len(accessKey) == 0 {
|
||||
accessKey = os.Getenv("AWS_ACCESS_KEY_ID")
|
||||
if len(accessKey) == 0 {
|
||||
return nil, fmt.Errorf("missing `access_key` in AWS_ACCESS_KEY_ID env var; probably, `access_key` must be set in `ec2_sd_config`?")
|
||||
}
|
||||
}
|
||||
secretKey := sdc.SecretKey
|
||||
if len(secretKey) == 0 {
|
||||
secretKey = os.Getenv("AWS_SECRET_ACCESS_KEY")
|
||||
if len(secretKey) == 0 {
|
||||
return nil, fmt.Errorf("miising `secret_key` in AWS_SECRET_ACCESS_KEY env var; probably, `secret_key` must be set in `ec2_sd_config`?")
|
||||
}
|
||||
}
|
||||
filters := getFiltersQueryString(sdc.Filters)
|
||||
port := 80
|
||||
if sdc.Port != nil {
|
||||
port = *sdc.Port
|
||||
}
|
||||
return &apiConfig{
|
||||
endpoint: sdc.Endpoint,
|
||||
region: region,
|
||||
accessKey: accessKey,
|
||||
secretKey: secretKey,
|
||||
filters: filters,
|
||||
port: port,
|
||||
}, nil
|
||||
cfg := &apiConfig{
|
||||
endpoint: sdc.Endpoint,
|
||||
region: region,
|
||||
roleARN: sdc.RoleARN,
|
||||
filters: filters,
|
||||
port: port,
|
||||
}
|
||||
// explicitly set credentials has priority over env variables
|
||||
cfg.defaultAccessKey = os.Getenv(awsAccessKeyEnv)
|
||||
cfg.defaultSecretKey = os.Getenv(awsSecretKeyEnv)
|
||||
if len(sdc.AccessKey) > 0 {
|
||||
cfg.defaultAccessKey = sdc.AccessKey
|
||||
}
|
||||
if len(sdc.SecretKey) > 0 {
|
||||
cfg.defaultSecretKey = sdc.SecretKey
|
||||
}
|
||||
|
||||
// fast return if credentials are set and there is no roleARN
|
||||
if len(cfg.defaultAccessKey) > 0 && len(cfg.defaultSecretKey) > 0 && len(sdc.RoleARN) == 0 {
|
||||
cfg.apiCredentials.Store(&apiCredentials{
|
||||
AccessKeyID: cfg.defaultAccessKey,
|
||||
SecretAccessKey: cfg.defaultSecretKey,
|
||||
})
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
if err := cfg.refreshAPICredentials(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
func getFiltersQueryString(filters []Filter) string {
|
||||
|
@ -93,6 +137,89 @@ func getDefaultRegion() (string, error) {
|
|||
return id.Region, nil
|
||||
}
|
||||
|
||||
// refreshAPICredentials updates expired tokens.
|
||||
func (cfg *apiConfig) refreshAPICredentials() error {
|
||||
|
||||
newAPICredentials := &apiCredentials{
|
||||
AccessKeyID: cfg.defaultAccessKey,
|
||||
SecretAccessKey: cfg.defaultSecretKey,
|
||||
}
|
||||
|
||||
// we need instance credentials
|
||||
// if dont have key and secret
|
||||
if len(newAPICredentials.AccessKeyID) == 0 && len(newAPICredentials.SecretAccessKey) == 0 {
|
||||
|
||||
ac, err := getInstanceRoleCredentials()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
newAPICredentials.Token = ac.Token
|
||||
newAPICredentials.SecretAccessKey = ac.SecretAccessKey
|
||||
newAPICredentials.AccessKeyID = ac.AccessKeyID
|
||||
newAPICredentials.Expiration = ac.Expiration
|
||||
|
||||
}
|
||||
|
||||
// read credentials from sts api, if role_arn is defined
|
||||
if cfg.roleARN != "" {
|
||||
|
||||
ac, err := getRoleARNCredentials(cfg.region, cfg.endpoint, cfg.roleARN, newAPICredentials)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot get credentials for role_arn: %s: %w", cfg.roleARN, err)
|
||||
}
|
||||
if newAPICredentials.Expiration.IsZero() || ac.Expiration.Before(newAPICredentials.Expiration) {
|
||||
newAPICredentials.Expiration = ac.Expiration
|
||||
}
|
||||
newAPICredentials.AccessKeyID = ac.AccessKeyID
|
||||
newAPICredentials.SecretAccessKey = ac.SecretAccessKey
|
||||
newAPICredentials.Token = ac.Token
|
||||
}
|
||||
|
||||
if len(newAPICredentials.AccessKeyID) == 0 {
|
||||
return fmt.Errorf("missing `access_key`, you can set it with %s env var, "+
|
||||
"directly at `ec2_sd_config` as `access_key` or use instance iam role", awsAccessKeyEnv)
|
||||
}
|
||||
if len(newAPICredentials.SecretAccessKey) == 0 {
|
||||
return fmt.Errorf("missing `secret_key`, you can set it with %s env var,"+
|
||||
"directly at `ec2_sd_config` as `secret_key` or use instance iam role", awsSecretKeyEnv)
|
||||
}
|
||||
|
||||
cfg.apiCredentials.Store(newAPICredentials)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// credentialsExpired - checks if tokens refresh is needed
|
||||
func (cfg *apiConfig) credentialsExpired() bool {
|
||||
ac := cfg.credentials()
|
||||
return !ac.Expiration.IsZero() && time.Since(ac.Expiration) > -5*time.Second
|
||||
}
|
||||
|
||||
// credentials - returns current api credentials
|
||||
func (cfg *apiConfig) credentials() *apiCredentials {
|
||||
return cfg.apiCredentials.Load().(*apiCredentials)
|
||||
}
|
||||
|
||||
// getInstanceRoleCredentials makes request to local ec2 instance metadata service
|
||||
// and tries to retrieve credentials from assigned iam role.
|
||||
// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html
|
||||
func getInstanceRoleCredentials() (*apiCredentials, error) {
|
||||
instanceRoleName, err := getMetadataByPath("meta-data/iam/security-credentials/")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot get instanceRoleName: %w", err)
|
||||
}
|
||||
|
||||
credentials, err := getMetadataByPath("meta-data/iam/security-credentials/" + string(instanceRoleName))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot get instanceCredentials with role %s, error: %w", instanceRoleName, err)
|
||||
}
|
||||
ac := &apiCredentials{}
|
||||
if err := json.Unmarshal(credentials, ac); err != nil {
|
||||
return nil, fmt.Errorf("cannot parse instance metadata role response : %w", err)
|
||||
}
|
||||
return ac, nil
|
||||
}
|
||||
|
||||
// IdentityDocument is identity document returned from AWS metadata server.
|
||||
//
|
||||
// See https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instance-identity-documents.html
|
||||
|
@ -100,11 +227,10 @@ type IdentityDocument struct {
|
|||
Region string
|
||||
}
|
||||
|
||||
// getMetadataByPath returns instance metadata by url path
|
||||
func getMetadataByPath(apiPath string) ([]byte, error) {
|
||||
// See https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html
|
||||
|
||||
client := discoveryutils.GetHTTPClient()
|
||||
|
||||
// Obtain session token
|
||||
sessionTokenURL := "http://169.254.169.254/latest/api/token"
|
||||
req, err := http.NewRequest("PUT", sessionTokenURL, nil)
|
||||
|
@ -112,7 +238,7 @@ func getMetadataByPath(apiPath string) ([]byte, error) {
|
|||
return nil, fmt.Errorf("cannot create request for IMDSv2 session token at url %q: %w", sessionTokenURL, err)
|
||||
}
|
||||
req.Header.Set("X-aws-ec2-metadata-token-ttl-seconds", "60")
|
||||
resp, err := client.Do(req)
|
||||
resp, err := discoveryutils.GetHTTPClient().Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot obtain IMDSv2 session token from %q; probably, `region` is missing in `ec2_sd_config`; error: %w", sessionTokenURL, err)
|
||||
}
|
||||
|
@ -128,26 +254,83 @@ func getMetadataByPath(apiPath string) ([]byte, error) {
|
|||
return nil, fmt.Errorf("cannot create request to %q: %w", apiURL, err)
|
||||
}
|
||||
req.Header.Set("X-aws-ec2-metadata-token", string(token))
|
||||
resp, err = client.Do(req)
|
||||
resp, err = discoveryutils.GetHTTPClient().Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot obtain response for %q: %w", apiURL, err)
|
||||
}
|
||||
return readResponseBody(resp, apiURL)
|
||||
}
|
||||
|
||||
func getAPIResponse(cfg *apiConfig, action, nextPageToken string) ([]byte, error) {
|
||||
// getRoleARNCredentials - retrieves credentials from aws sts with role_arn
|
||||
func getRoleARNCredentials(region, endpoint, roleARN string, credentials *apiCredentials) (*apiCredentials, error) {
|
||||
|
||||
data, err := getSTSAPIResponse(region, endpoint, roleARN, credentials)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
scr := &stsCredentialsResponse{}
|
||||
if err := xml.Unmarshal(data, scr); err != nil {
|
||||
return nil, fmt.Errorf("cannot parse sts api response: %w", err)
|
||||
}
|
||||
return &apiCredentials{
|
||||
AccessKeyID: scr.AssumeRoleResult.Credentials.AccessKeyID,
|
||||
SecretAccessKey: scr.AssumeRoleResult.Credentials.SecretAccessKey,
|
||||
Token: scr.AssumeRoleResult.Credentials.SessionToken,
|
||||
Expiration: scr.AssumeRoleResult.Credentials.Expiration,
|
||||
}, nil
|
||||
|
||||
}
|
||||
|
||||
// buildAPIEndpoint - creates endpoint for aws api access
|
||||
func buildAPIEndpoint(region, cfgEndpoint, service string) string {
|
||||
// See https://docs.aws.amazon.com/AWSEC2/latest/APIReference/Query-Requests.html
|
||||
endpoint := fmt.Sprintf("https://ec2.%s.amazonaws.com/", cfg.region)
|
||||
if len(cfg.endpoint) > 0 {
|
||||
endpoint = cfg.endpoint
|
||||
apiEndpoint := fmt.Sprintf("https://%s.%s.amazonaws.com/", service, region)
|
||||
if len(cfgEndpoint) > 0 {
|
||||
apiEndpoint = cfgEndpoint
|
||||
// endpoint may contain only hostname. Convert it to proper url then.
|
||||
if !strings.Contains(endpoint, "://") {
|
||||
endpoint = "https://" + endpoint
|
||||
if !strings.Contains(apiEndpoint, "://") {
|
||||
apiEndpoint = "https://" + apiEndpoint
|
||||
}
|
||||
if !strings.HasSuffix(endpoint, "/") {
|
||||
endpoint += "/"
|
||||
if !strings.HasSuffix(apiEndpoint, "/") {
|
||||
apiEndpoint += "/"
|
||||
}
|
||||
}
|
||||
return apiEndpoint
|
||||
}
|
||||
|
||||
// getSTSAPIResponse makes request to aws sts api with role_arn
|
||||
// and returns temporary credentials with expiration time
|
||||
// https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html
|
||||
func getSTSAPIResponse(region, endpoint, roleARN string, credentials *apiCredentials) ([]byte, error) {
|
||||
endpoint = buildAPIEndpoint(region, endpoint, "sts")
|
||||
// See https://docs.aws.amazon.com/AWSEC2/latest/APIReference/Query-Requests.html
|
||||
apiURL := fmt.Sprintf("%s?Action=%s", endpoint, "AssumeRole")
|
||||
apiURL += "&Version=2011-06-15"
|
||||
apiURL += fmt.Sprintf("&RoleArn=%s", roleARN)
|
||||
// we have to provide unique session name for cloudtrail audit
|
||||
apiURL += "&RoleSessionName=vmagent-ec2-discovery"
|
||||
req, err := newSignedRequest(apiURL, "sts", region, credentials)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot create signed request: %w", err)
|
||||
}
|
||||
resp, err := discoveryutils.GetHTTPClient().Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot perform http request to %q: %w", apiURL, err)
|
||||
}
|
||||
return readResponseBody(resp, apiURL)
|
||||
|
||||
}
|
||||
|
||||
// getEC2APIResponse lists ec2 instances with given action
|
||||
func getEC2APIResponse(cfg *apiConfig, action, nextPageToken string) ([]byte, error) {
|
||||
endpoint := buildAPIEndpoint(cfg.region, cfg.endpoint, "ec2")
|
||||
// refresh api credentials if needed
|
||||
if cfg.credentialsExpired() {
|
||||
if err := cfg.refreshAPICredentials(); err != nil {
|
||||
return nil, fmt.Errorf("failed to update expired aws credentials: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
apiURL := fmt.Sprintf("%s?Action=%s", endpoint, url.QueryEscape(action))
|
||||
if len(cfg.filters) > 0 {
|
||||
apiURL += "&" + cfg.filters
|
||||
|
@ -156,7 +339,7 @@ func getAPIResponse(cfg *apiConfig, action, nextPageToken string) ([]byte, error
|
|||
apiURL += fmt.Sprintf("&NextToken=%s", url.QueryEscape(nextPageToken))
|
||||
}
|
||||
apiURL += "&Version=2013-10-15"
|
||||
req, err := newSignedRequest(apiURL, "ec2", cfg.region, cfg.accessKey, cfg.secretKey)
|
||||
req, err := newSignedRequest(apiURL, "ec2", cfg.region, cfg.credentials())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot create signed request: %w", err)
|
||||
}
|
||||
|
|
|
@ -12,9 +12,9 @@ type SDConfig struct {
|
|||
Endpoint string `yaml:"endpoint"`
|
||||
AccessKey string `yaml:"access_key"`
|
||||
SecretKey string `yaml:"secret_key"`
|
||||
Profile string `yaml:"profile"`
|
||||
// TODO: add support for RoleARN
|
||||
// RoleARN string `yaml:"role_arn"`
|
||||
// TODO add support for Profile, not working atm
|
||||
Profile string `yaml:"profile"`
|
||||
RoleARN string `yaml:"role_arn"`
|
||||
// RefreshInterval time.Duration `yaml:"refresh_interval"`
|
||||
// refresh_interval is obtained from `-promscrape.ec2SDCheckInterval` command-line option.
|
||||
Port *int `yaml:"port"`
|
||||
|
|
|
@ -29,7 +29,7 @@ func getReservations(cfg *apiConfig) ([]Reservation, error) {
|
|||
var rs []Reservation
|
||||
pageToken := ""
|
||||
for {
|
||||
data, err := getAPIResponse(cfg, action, pageToken)
|
||||
data, err := getEC2APIResponse(cfg, action, pageToken)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot obtain instances: %w", err)
|
||||
}
|
||||
|
|
|
@ -16,12 +16,12 @@ import (
|
|||
// newSignedRequest signed request for apiURL according to aws signature algorithm.
|
||||
//
|
||||
// See the algorithm at https://docs.aws.amazon.com/general/latest/gr/sigv4-signed-request-examples.html
|
||||
func newSignedRequest(apiURL, service, region, accessKey, secretKey string) (*http.Request, error) {
|
||||
func newSignedRequest(apiURL, service, region string, credentials *apiCredentials) (*http.Request, error) {
|
||||
t := time.Now().UTC()
|
||||
return newSignedRequestWithTime(apiURL, service, region, accessKey, secretKey, t)
|
||||
return newSignedRequestWithTime(apiURL, service, region, credentials, t)
|
||||
}
|
||||
|
||||
func newSignedRequestWithTime(apiURL, service, region, accessKey, secretKey string, t time.Time) (*http.Request, error) {
|
||||
func newSignedRequestWithTime(apiURL, service, region string, credentials *apiCredentials, t time.Time) (*http.Request, error) {
|
||||
uri, err := url.Parse(apiURL)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse %q: %w", apiURL, err)
|
||||
|
@ -57,11 +57,11 @@ func newSignedRequestWithTime(apiURL, service, region, accessKey, secretKey stri
|
|||
stringToSign := strings.Join(tmp, "\n")
|
||||
|
||||
// Calculate the signature
|
||||
signingKey := getSignatureKey(secretKey, datestamp, region, service)
|
||||
signingKey := getSignatureKey(credentials.SecretAccessKey, datestamp, region, service)
|
||||
signature := hmacHex(signingKey, stringToSign)
|
||||
|
||||
// Calculate autheader
|
||||
authHeader := fmt.Sprintf("%s Credential=%s/%s, SignedHeaders=%s, Signature=%s", algorithm, accessKey, credentialScope, signedHeaders, signature)
|
||||
authHeader := fmt.Sprintf("%s Credential=%s/%s, SignedHeaders=%s, Signature=%s", algorithm, credentials.AccessKeyID, credentialScope, signedHeaders, signature)
|
||||
|
||||
req, err := http.NewRequest("GET", apiURL, nil)
|
||||
if err != nil {
|
||||
|
@ -69,6 +69,10 @@ func newSignedRequestWithTime(apiURL, service, region, accessKey, secretKey stri
|
|||
}
|
||||
req.Header.Set("x-amz-date", amzdate)
|
||||
req.Header.Set("Authorization", authHeader)
|
||||
if credentials.Token != "" {
|
||||
req.Header.Set("X-Amz-Security-Token", credentials.Token)
|
||||
}
|
||||
|
||||
return req, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -10,10 +10,12 @@ func TestNewSignedRequest(t *testing.T) {
|
|||
t.Helper()
|
||||
service := "ec2"
|
||||
region := "us-east-1"
|
||||
accessKey := "fake-access-key"
|
||||
secretKey := "foobar"
|
||||
ac := &apiCredentials{
|
||||
AccessKeyID: "fake-access-key",
|
||||
SecretAccessKey: "foobar",
|
||||
}
|
||||
ct := time.Unix(0, 0).UTC()
|
||||
req, err := newSignedRequestWithTime(apiURL, service, region, accessKey, secretKey, ct)
|
||||
req, err := newSignedRequestWithTime(apiURL, service, region, ac, ct)
|
||||
if err != nil {
|
||||
t.Fatalf("error in newSignedRequest: %s", err)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue