From 6e5ac32fbadb40092cebb09c115e349a40aec867 Mon Sep 17 00:00:00 2001 From: Igor Tiunov <igortiunov@gmail.com> Date: Thu, 4 Aug 2022 20:44:16 +0300 Subject: [PATCH] YC service discovery (#2923) * YC service discovery https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1386 * Fixed linter suggestions * fixed golint errors --- README.md | 2 + app/vmagent/README.md | 2 + docs/README.md | 1 + docs/sd_configs.md | 46 +++ docs/vmagent.md | 3 + lib/promscrape/config.go | 29 ++ lib/promscrape/discovery/yandexcloud/api.go | 272 ++++++++++++++++++ .../discovery/yandexcloud/instance.go | 81 ++++++ .../discovery/yandexcloud/instance_test.go | 182 ++++++++++++ .../discovery/yandexcloud/resources.go | 179 ++++++++++++ .../discovery/yandexcloud/yandexcloud.go | 177 ++++++++++++ lib/promscrape/scraper.go | 2 + 12 files changed, 976 insertions(+) create mode 100644 docs/sd_configs.md create mode 100644 lib/promscrape/discovery/yandexcloud/api.go create mode 100644 lib/promscrape/discovery/yandexcloud/instance.go create mode 100644 lib/promscrape/discovery/yandexcloud/instance_test.go create mode 100644 lib/promscrape/discovery/yandexcloud/resources.go create mode 100644 lib/promscrape/discovery/yandexcloud/yandexcloud.go diff --git a/README.md b/README.md index f7833e1d0b..0cfef66c36 100644 --- a/README.md +++ b/README.md @@ -2117,6 +2117,8 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li Whether to suppress scrape errors logging. The last error for each target is always available at '/targets' page even if scrape errors logging is suppressed. See also -promscrape.suppressScrapeErrorsDelay -promscrape.suppressScrapeErrorsDelay duration The delay for suppressing repeated scrape errors logging per each scrape targets. This may be used for reducing the number of log lines related to scrape errors. See also -promscrape.suppressScrapeErrors + -promscrape.yandexcloudSDCheckInterval duration + Interval for checking for changes in Yandex Cloud. 
This works only if yandexcloud_sd_configs is configured in '-promscrape.config' file. (default 30s) -pushmetrics.extraLabel array Optional labels to add to metrics pushed to -pushmetrics.url . For example, -pushmetrics.extraLabel='instance="foo"' adds instance="foo" label to all the metrics pushed to -pushmetrics.url Supports an array of values separated by comma or specified via multiple flags. diff --git a/app/vmagent/README.md b/app/vmagent/README.md index 25297e4279..08901dbdba 100644 --- a/app/vmagent/README.md +++ b/app/vmagent/README.md @@ -1079,6 +1079,8 @@ See the docs at https://docs.victoriametrics.com/vmagent.html . Whether to suppress scrape errors logging. The last error for each target is always available at '/targets' page even if scrape errors logging is suppressed. See also -promscrape.suppressScrapeErrorsDelay -promscrape.suppressScrapeErrorsDelay duration The delay for suppressing repeated scrape errors logging per each scrape targets. This may be used for reducing the number of log lines related to scrape errors. See also -promscrape.suppressScrapeErrors + -promscrape.yandexcloudSDCheckInterval duration + Interval for checking for changes in Yandex Cloud. This works only if yandexcloud_sd_configs is configured in '-promscrape.config' file. (default 30s) -pushmetrics.extraLabel array Optional labels to add to metrics pushed to -pushmetrics.url . For example, -pushmetrics.extraLabel='instance="foo"' adds instance="foo" label to all the metrics pushed to -pushmetrics.url Supports an array of values separated by comma or specified via multiple flags. 
diff --git a/docs/README.md b/docs/README.md index f7833e1d0b..16edc70a3c 100644 --- a/docs/README.md +++ b/docs/README.md @@ -326,6 +326,7 @@ VictoriaMetrics can be used as drop-in replacement for Prometheus for scraping t * [eureka_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#eureka_sd_config) * [digitalocean_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#digitalocean_sd_config) * [http_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#http_sd_config) +* [yandexcloud_sd_config](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/docs/sd_configs.md#yandex_cloud_service_discovery_config) File a [feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues) if you need support for other `*_sd_config` types. diff --git a/docs/sd_configs.md b/docs/sd_configs.md new file mode 100644 index 0000000000..b420658a3f --- /dev/null +++ b/docs/sd_configs.md @@ -0,0 +1,46 @@ +## Yandex Cloud Service Discovery Configs + +Yandex Cloud SD configurations allow retrieving scrape targets from accessible folders. + +Only compute instances currently supported and the following meta labels are available on targets during relabeling: + +* `__meta_yandexcloud_instance_name`: the name of instance +* `__meta_yandexcloud_instance_id`: the id of instance +* `__meta_yandexcloud_instance_fqdn`: generated FQDN for instance +* `__meta_yandexcloud_instance_status`: the status of instance +* `__meta_yandexcloud_instance_platform_id`: instance platform ID (i.e. 
"standard-v3") +* `__meta_yandexcloud_instance_resources_cores`: instance vCPU cores +* `__meta_yandexcloud_instance_resources_core_fraction`: instance core fraction +* `__meta_yandexcloud_instance_resources_memory`: instance memory +* `__meta_yandexcloud_folder_id`: instance folder ID +* `__meta_yandexcloud_instance_label_<label name>`: each label from instance +* `__meta_yandexcloud_instance_private_ip_<interface index>`: private IP of <interface index> network interface +* `__meta_yandexcloud_instance_public_ip_<interface index>`: public (NAT) IP of <interface index> network interface +* `__meta_yandexcloud_instance_private_dns_<record number>`: if configured DNS records for private IP +* `__meta_yandexcloud_instance_public_dns_<record number>`: if configured DNS records for public IP + +Yandex Cloud SD supports both user [OAuth token](https://cloud.yandex.com/en-ru/docs/iam/concepts/authorization/oauth-token) and [instance service account](https://cloud.yandex.com/en-ru/docs/compute/operations/vm-connect/auth-inside-vm) if OAuth is omitted.
+ +```yaml +--- +global: + scrape_interval: 10s + +scrape_configs: + - job_name: YC_with_oauth + yandexcloud_sd_configs: + - service: "compute" + yandex_passport_oauth_token: "AQAAAAAsfasah<...>7E10SaotuL0" + relabel_configs: + - source_labels: [__meta_yandexcloud_instance_public_ip_0] + target_label: __address__ + replacement: "$1:9100" + + - job_name: YC_with_Instance_service_account + yandexcloud_sd_configs: + - service: "compute" + relabel_configs: + - source_labels: [__meta_yandexcloud_instance_private_ip_0] + target_label: __address__ + replacement: "$1:9100" +``` \ No newline at end of file diff --git a/docs/vmagent.md b/docs/vmagent.md index 72e16b1e11..462b4f23de 100644 --- a/docs/vmagent.md +++ b/docs/vmagent.md @@ -180,6 +180,7 @@ The following scrape types in [scrape_config](https://prometheus.io/docs/prometh * `eureka_sd_configs` is for discovering and scraping targets registered in [Netflix Eureka](https://github.com/Netflix/eureka). See [eureka_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#eureka_sd_config) for details. * `digitalocean_sd_configs` is for discovering and scraping targerts registered in [DigitalOcean](https://www.digitalocean.com/). See [digitalocean_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#digitalocean_sd_config) for details. * `http_sd_configs` is for discovering and scraping targerts provided by external http-based service discovery. See [http_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#http_sd_config) for details. +* `yandexcloud_sd_configs` is for discovering and scraping targets registered in [Yandex Cloud](https://cloud.yandex.ru/). See [yandexcloud_sd_configs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/docs/sd_configs.md#yandex_cloud_service_discovery_configs) for details. Note that `vmagent` doesn't support `refresh_interval` option for these scrape configs. 
Use the corresponding `-promscrape.*CheckInterval` command-line flag instead. For example, `-promscrape.consulSDCheckInterval=60s` sets `refresh_interval` for all the `consul_sd_configs` @@ -1083,6 +1084,8 @@ See the docs at https://docs.victoriametrics.com/vmagent.html . Whether to suppress scrape errors logging. The last error for each target is always available at '/targets' page even if scrape errors logging is suppressed. See also -promscrape.suppressScrapeErrorsDelay -promscrape.suppressScrapeErrorsDelay duration The delay for suppressing repeated scrape errors logging per each scrape targets. This may be used for reducing the number of log lines related to scrape errors. See also -promscrape.suppressScrapeErrors + -promscrape.yandexcloudSDCheckInterval duration + Interval for checking for changes in Yandex Cloud. This works only if yandexcloud_sd_configs is configured in '-promscrape.config' file. (default 30s) -pushmetrics.extraLabel array Optional labels to add to metrics pushed to -pushmetrics.url . For example, -pushmetrics.extraLabel='instance="foo"' adds instance="foo" label to all the metrics pushed to -pushmetrics.url Supports an array of values separated by comma or specified via multiple flags. 
diff --git a/lib/promscrape/config.go b/lib/promscrape/config.go index 3a9df2fc16..efedc29f3a 100644 --- a/lib/promscrape/config.go +++ b/lib/promscrape/config.go @@ -33,6 +33,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/http" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kubernetes" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/openstack" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/yandexcloud" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy" "github.com/VictoriaMetrics/metrics" @@ -261,6 +262,7 @@ type ScrapeConfig struct { KubernetesSDConfigs []kubernetes.SDConfig `yaml:"kubernetes_sd_configs,omitempty"` OpenStackSDConfigs []openstack.SDConfig `yaml:"openstack_sd_configs,omitempty"` StaticConfigs []StaticConfig `yaml:"static_configs,omitempty"` + YandexCloudSDConfigs []yandexcloud.SDConfig `yaml:"yandexcloud_sd_configs,omitempty"` // These options are supported only by lib/promscrape. RelabelDebug bool `yaml:"relabel_debug,omitempty"` @@ -824,6 +826,33 @@ func (cfg *Config) getOpenStackSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork { return dst } +// getYandexCloudSDScrapeWork returns `yandexcloud_sd_configs` ScrapeWork from cfg. 
+func (cfg *Config) getYandexCloudSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork { + swsPrevByJob := getSWSByJob(prev) + dst := make([]*ScrapeWork, 0, len(prev)) + for _, sc := range cfg.ScrapeConfigs { + dstLen := len(dst) + ok := true + for j := range sc.YandexCloudSDConfigs { + sdc := &sc.YandexCloudSDConfigs[j] + var okLocal bool + dst, okLocal = appendSDScrapeWork(dst, sdc, cfg.baseDir, sc.swc, "yandexcloud_sd_config") + if ok { + ok = okLocal + } + } + if ok { + continue + } + swsPrev := swsPrevByJob[sc.swc.jobName] + if len(swsPrev) > 0 { + logger.Errorf("there were errors when discovering yandexcloud targets for job %q, so preserving the previous targets", sc.swc.jobName) + dst = append(dst[:dstLen], swsPrev...) + } + } + return dst +} + // getStaticScrapeWork returns `static_configs` ScrapeWork from from cfg. func (cfg *Config) getStaticScrapeWork() []*ScrapeWork { var dst []*ScrapeWork diff --git a/lib/promscrape/discovery/yandexcloud/api.go b/lib/promscrape/discovery/yandexcloud/api.go new file mode 100644 index 0000000000..f84ec61fc7 --- /dev/null +++ b/lib/promscrape/discovery/yandexcloud/api.go @@ -0,0 +1,272 @@ +package yandexcloud + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "path" + "sync" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" +) + +const ( + defaultInstanceCredsEndpoint = "http://169.254.169.254/latest/meta-data/iam/security-credentials/default" + defaultAPIEndpoint = "https://api.cloud.yandex.net" + defaultAPIVersion = "v1" +) + +var configMap = discoveryutils.NewConfigMap() + +type apiCredentials struct { + Token string `json:"Token"` + Expiration time.Time `json:"Expiration"` +} + +// yandexPassportOAuth is a struct for Yandex Cloud IAM token request +// https://cloud.yandex.com/en-ru/docs/iam/operations/iam-token/create +type 
yandexPassportOAuth struct { + YandexPassportOAuthToken string `json:"yandexPassportOauthToken"` +} + +// iamToken Yandex Cloud IAM token response +// https://cloud.yandex.com/en-ru/docs/iam/operations/iam-token/create +type iamToken struct { + IAMToken string `json:"iamToken"` + ExpiresAt time.Time `json:"expiresAt"` +} + +type apiConfig struct { + client *http.Client + tokenLock sync.Mutex + creds *apiCredentials + yandexPassportOAuth *yandexPassportOAuth + serviceEndpoints map[string]*url.URL +} + +func getAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { + v, err := configMap.Get(sdc, func() (interface{}, error) { return newAPIConfig(sdc, baseDir) }) + if err != nil { + return nil, err + } + return v.(*apiConfig), nil +} + +func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { + cfg := &apiConfig{ + client: &http.Client{ + Transport: &http.Transport{ + MaxIdleConnsPerHost: 100, + }, + }, + } + if sdc.TLSConfig != nil { + opts := &promauth.Options{ + BaseDir: baseDir, + TLSConfig: sdc.TLSConfig, + } + ac, err := opts.NewConfig() + if err != nil { + return nil, err + } + cfg.client.Transport = &http.Transport{ + TLSClientConfig: ac.NewTLSConfig(), + MaxIdleConnsPerHost: 100, + } + } + + if err := cfg.getEndpoints(sdc.APIEndpoint); err != nil { + return nil, err + } + + if sdc.YandexPassportOAuthToken != nil { + logger.Infof("Using yandex passport OAuth token") + + cfg.yandexPassportOAuth = &yandexPassportOAuth{ + YandexPassportOAuthToken: sdc.YandexPassportOAuthToken.String(), + } + } + + return cfg, nil +} + +// getFreshAPICredentials checks token lifetime and update if needed +func (cfg *apiConfig) getFreshAPICredentials() (*apiCredentials, error) { + cfg.tokenLock.Lock() + defer cfg.tokenLock.Unlock() + + if cfg.creds != nil && time.Until(cfg.creds.Expiration) > 10*time.Second { + // Credentials aren't expired yet. 
+ return cfg.creds, nil + } + + newCreds, err := getCreds(cfg) + if err != nil { + return nil, fmt.Errorf("cannot refresh service account api token: %w", err) + } + cfg.creds = newCreds + + logger.Infof("successfully refreshed service account api token; expiration: %.3f seconds", time.Until(newCreds.Expiration).Seconds()) + + return newCreds, nil +} + +// getCreds get Yandex Cloud IAM token based on configuration +func getCreds(cfg *apiConfig) (*apiCredentials, error) { + if cfg.yandexPassportOAuth == nil { + return getInstanceCreds(cfg) + } + + it, err := getIAMToken(cfg) + if err != nil { + return nil, err + } + + return &apiCredentials{ + Token: it.IAMToken, + Expiration: it.ExpiresAt, + }, nil +} + +// getInstanceCreds gets Yandex Cloud IAM token using instance Service Account +// https://cloud.yandex.com/en-ru/docs/compute/operations/vm-connect/auth-inside-vm +func getInstanceCreds(cfg *apiConfig) (*apiCredentials, error) { + resp, err := cfg.client.Get(defaultInstanceCredsEndpoint) + if err != nil { + return nil, fmt.Errorf("failed query security credentials api, url: %s, err: %w", defaultInstanceCredsEndpoint, err) + } + r, err := ioutil.ReadAll(resp.Body) + _ = resp.Body.Close() + if err != nil { + return nil, fmt.Errorf("cannot read response from %q: %w", defaultInstanceCredsEndpoint, err) + } + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("auth failed, bad status code: %d, want: 200", resp.StatusCode) + } + + var ac apiCredentials + if err := json.Unmarshal(r, &ac); err != nil { + return nil, fmt.Errorf("cannot parse auth credentials response: %w", err) + } + + return &ac, nil +} + +// getIAMToken gets Yandex Cloud IAM token using OAuth: +// https://cloud.yandex.com/en-ru/docs/iam/operations/iam-token/create +func getIAMToken(cfg *apiConfig) (*iamToken, error) { + iamURL := *cfg.serviceEndpoints["iam"] + iamURL.Path = path.Join(iamURL.Path, "iam", defaultAPIVersion, "tokens") + + passport, err := json.Marshal(cfg.yandexPassportOAuth) + 
if err != nil { + return nil, fmt.Errorf("failed marshall yandex passport OAuth token, err: %w", err) + } + + resp, err := cfg.client.Post(iamURL.String(), "application/json", bytes.NewBuffer(passport)) + if err != nil { + return nil, fmt.Errorf("failed query yandex cloud iam api, url: %s, err: %w", iamURL.String(), err) + } + + r, err := ioutil.ReadAll(resp.Body) + _ = resp.Body.Close() + if err != nil { + return nil, fmt.Errorf("cannot read response from %q: %w", iamURL.String(), err) + } + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("auth failed, bad status code: %d, want: 200", resp.StatusCode) + } + + it := iamToken{} + if err := json.Unmarshal(r, &it); err != nil { + return nil, fmt.Errorf("cannot parse auth credentials response: %w", err) + } + + return &it, nil +} + +// getEndpoints makes services endpoints map: +// https://cloud.yandex.com/en-ru/docs/api-design-guide/concepts/endpoints +func (cfg *apiConfig) getEndpoints(apiEndpoint string) error { + if apiEndpoint == "" { + apiEndpoint = defaultAPIEndpoint + } + + apiEndpointURL, err := url.Parse(apiEndpoint) + if err != nil { + return fmt.Errorf("cannot parse api_endpoint: %s as url, err: %w", apiEndpoint, err) + } + + apiEndpointURL.Path = path.Join(apiEndpointURL.Path, "endpoints") + + resp, err := cfg.client.Get(apiEndpointURL.String()) + if err != nil { + return fmt.Errorf("failed query endpoints, url: %s, err: %w", apiEndpointURL.String(), err) + } + r, err := ioutil.ReadAll(resp.Body) + _ = resp.Body.Close() + if err != nil { + return fmt.Errorf("cannot read response from %q: %w", apiEndpointURL.String(), err) + } + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("auth failed, bad status code: %d, want: 200", resp.StatusCode) + } + + endpoints, err := parseEndpoints(r) + if err != nil { + return err + } + + cfg.serviceEndpoints = make(map[string]*url.URL, len(endpoints.Endpoints)) + for _, endpoint := range endpoints.Endpoints { + cfg.serviceEndpoints[endpoint.ID] = 
&url.URL{ + Scheme: apiEndpointURL.Scheme, + Host: endpoint.Address, + } + } + + return nil +} + +// readResponseBody reads body from http.Response. +func readResponseBody(resp *http.Response, apiURL string) ([]byte, error) { + data, err := ioutil.ReadAll(resp.Body) + _ = resp.Body.Close() + if err != nil { + return nil, fmt.Errorf("cannot read response from %q: %w", apiURL, err) + } + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status code for %q; got %d; want %d; response body: %q", + apiURL, resp.StatusCode, http.StatusOK, data) + } + + return data, nil +} + +// getAPIResponse calls Yandex Cloud apiURL and returns response body. +func getAPIResponse(apiURL string, cfg *apiConfig) ([]byte, error) { + creds, err := cfg.getFreshAPICredentials() + if err != nil { + return nil, err + } + + req, err := http.NewRequest("GET", apiURL, nil) + if err != nil { + return nil, fmt.Errorf("cannot create new request for yandex cloud api url %s: %w", apiURL, err) + } + + req.Header.Set("Authorization", "Bearer "+creds.Token) + resp, err := cfg.client.Do(req) + if err != nil { + return nil, fmt.Errorf("cannot query yandex cloud api url %s: %w", apiURL, err) + } + + return readResponseBody(resp, apiURL) +} diff --git a/lib/promscrape/discovery/yandexcloud/instance.go b/lib/promscrape/discovery/yandexcloud/instance.go new file mode 100644 index 0000000000..c3ca4541c4 --- /dev/null +++ b/lib/promscrape/discovery/yandexcloud/instance.go @@ -0,0 +1,81 @@ +package yandexcloud + +import ( + "fmt" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" +) + +func getInstancesLabels(cfg *apiConfig) ([]map[string]string, error) { + organizations, err := cfg.getOrganizations() + if err != nil { + return nil, err + } + clouds, err := cfg.getClouds(organizations) + if err != nil { + return nil, err + } + folders, err := cfg.getFolders(clouds) + if err != nil { + return nil, err + } 
+ + var instances []instance + for _, fld := range folders { + inst, err := cfg.getInstances(fld.ID) + if err != nil { + return nil, err + } + + instances = append(instances, inst...) + } + + logger.Infof("Collected %d instances", len(instances)) + + return addInstanceLabels(instances), nil +} + +func addInstanceLabels(instances []instance) []map[string]string { + var ms []map[string]string + for _, server := range instances { + m := map[string]string{ + "__address__": server.FQDN, + "__meta_yandexcloud_instance_name": server.Name, + "__meta_yandexcloud_instance_fqdn": server.FQDN, + "__meta_yandexcloud_instance_id": server.ID, + "__meta_yandexcloud_instance_status": server.Status, + "__meta_yandexcloud_instance_platform_id": server.PlatformID, + "__meta_yandexcloud_instance_resources_cores": server.Resources.Cores, + "__meta_yandexcloud_instance_resources_core_fraction": server.Resources.CoreFraction, + "__meta_yandexcloud_instance_resources_memory": server.Resources.Memory, + "__meta_yandexcloud_folder_id": server.FolderID, + } + for k, v := range server.Labels { + m["__meta_yandexcloud_instance_label_"+discoveryutils.SanitizeLabelName(k)] = v + } + + for _, netInterface := range server.NetworkInterfaces { + privateIPLabel := fmt.Sprintf("__meta_yandexcloud_instance_private_ip_%s", netInterface.Index) + m[privateIPLabel] = netInterface.PrimaryV4Address.Address + if len(netInterface.PrimaryV4Address.OneToOneNat.Address) > 0 { + publicIPLabel := fmt.Sprintf("__meta_yandexcloud_instance_public_ip_%s", netInterface.Index) + m[publicIPLabel] = netInterface.PrimaryV4Address.OneToOneNat.Address + } + + for j, dnsRecord := range netInterface.PrimaryV4Address.DNSRecords { + dnsRecordLabel := fmt.Sprintf("__meta_yandexcloud_instance_private_dns_%d", j) + m[dnsRecordLabel] = dnsRecord.FQDN + } + + for j, dnsRecord := range netInterface.PrimaryV4Address.OneToOneNat.DNSRecords { + dnsRecordLabel := fmt.Sprintf("__meta_yandexcloud_instance_public_dns_%d", j) + 
m[dnsRecordLabel] = dnsRecord.FQDN + } + } + + ms = append(ms, m) + } + + return ms +} diff --git a/lib/promscrape/discovery/yandexcloud/instance_test.go b/lib/promscrape/discovery/yandexcloud/instance_test.go new file mode 100644 index 0000000000..c0468ddc21 --- /dev/null +++ b/lib/promscrape/discovery/yandexcloud/instance_test.go @@ -0,0 +1,182 @@ +package yandexcloud + +import ( + "reflect" + "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" +) + +func Test_addInstanceLabels(t *testing.T) { + type args struct { + instances []instance + } + tests := []struct { + name string + args args + want [][]prompbmarshal.Label + }{ + { + name: "empty_response", + args: args{}, + }, + { + name: "one_server", + args: args{ + instances: []instance{ + { + Name: "server-1", + ID: "test", + FQDN: "server-1.ru-central1.internal", + FolderID: "test", + Status: "RUNNING", + PlatformID: "s2.micro", + Resources: resources{ + Cores: "2", + CoreFraction: "20", + Memory: "4", + }, + NetworkInterfaces: []networkInterface{ + { + Index: "0", + PrimaryV4Address: primaryV4Address{ + Address: "192.168.1.1", + }, + }, + }, + }, + }, + }, + want: [][]prompbmarshal.Label{ + discoveryutils.GetSortedLabels(map[string]string{ + "__address__": "server-1.ru-central1.internal", + "__meta_yandexcloud_instance_name": "server-1", + "__meta_yandexcloud_instance_fqdn": "server-1.ru-central1.internal", + "__meta_yandexcloud_instance_id": "test", + "__meta_yandexcloud_instance_status": "RUNNING", + "__meta_yandexcloud_instance_platform_id": "s2.micro", + "__meta_yandexcloud_instance_resources_cores": "2", + "__meta_yandexcloud_instance_resources_core_fraction": "20", + "__meta_yandexcloud_instance_resources_memory": "4", + "__meta_yandexcloud_folder_id": "test", + "__meta_yandexcloud_instance_private_ip_0": "192.168.1.1", + }), + }, + }, + { + name: "with_public_ip", + args: args{ + instances: []instance{ + 
{ + Name: "server-1", + ID: "test", + FQDN: "server-1.ru-central1.internal", + FolderID: "test", + Status: "RUNNING", + PlatformID: "s2.micro", + Resources: resources{ + Cores: "2", + CoreFraction: "20", + Memory: "4", + }, + NetworkInterfaces: []networkInterface{ + { + Index: "0", + PrimaryV4Address: primaryV4Address{ + Address: "192.168.1.1", + OneToOneNat: oneToOneNat{ + Address: "1.1.1.1", + }, + }, + }, + }, + }, + }, + }, + want: [][]prompbmarshal.Label{ + discoveryutils.GetSortedLabels(map[string]string{ + "__address__": "server-1.ru-central1.internal", + "__meta_yandexcloud_instance_fqdn": "server-1.ru-central1.internal", + "__meta_yandexcloud_instance_name": "server-1", + "__meta_yandexcloud_instance_id": "test", + "__meta_yandexcloud_instance_status": "RUNNING", + "__meta_yandexcloud_instance_platform_id": "s2.micro", + "__meta_yandexcloud_instance_resources_cores": "2", + "__meta_yandexcloud_instance_resources_core_fraction": "20", + "__meta_yandexcloud_instance_resources_memory": "4", + "__meta_yandexcloud_folder_id": "test", + "__meta_yandexcloud_instance_private_ip_0": "192.168.1.1", + "__meta_yandexcloud_instance_public_ip_0": "1.1.1.1", + }), + }, + }, + { + name: "with_dns_record", + args: args{ + instances: []instance{ + { + Name: "server-1", + ID: "test", + FQDN: "server-1.ru-central1.internal", + FolderID: "test", + Status: "RUNNING", + PlatformID: "s2.micro", + Resources: resources{ + Cores: "2", + CoreFraction: "20", + Memory: "4", + }, + NetworkInterfaces: []networkInterface{ + { + Index: "0", + PrimaryV4Address: primaryV4Address{ + Address: "192.168.1.1", + OneToOneNat: oneToOneNat{ + Address: "1.1.1.1", + DNSRecords: []dnsRecord{ + {FQDN: "server-1.example.com"}, + }, + }, + DNSRecords: []dnsRecord{ + {FQDN: "server-1.example.local"}, + }, + }, + }, + }, + }, + }, + }, + want: [][]prompbmarshal.Label{ + discoveryutils.GetSortedLabels(map[string]string{ + "__address__": "server-1.ru-central1.internal", + "__meta_yandexcloud_instance_name": 
"server-1", + "__meta_yandexcloud_instance_fqdn": "server-1.ru-central1.internal", + "__meta_yandexcloud_instance_id": "test", + "__meta_yandexcloud_instance_status": "RUNNING", + "__meta_yandexcloud_instance_platform_id": "s2.micro", + "__meta_yandexcloud_instance_resources_cores": "2", + "__meta_yandexcloud_instance_resources_core_fraction": "20", + "__meta_yandexcloud_instance_resources_memory": "4", + "__meta_yandexcloud_folder_id": "test", + "__meta_yandexcloud_instance_private_ip_0": "192.168.1.1", + "__meta_yandexcloud_instance_public_ip_0": "1.1.1.1", + "__meta_yandexcloud_instance_private_dns_0": "server-1.example.local", + "__meta_yandexcloud_instance_public_dns_0": "server-1.example.com", + }), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := addInstanceLabels(tt.args.instances) + var sortedLabelss [][]prompbmarshal.Label + for _, labels := range got { + sortedLabelss = append(sortedLabelss, discoveryutils.GetSortedLabels(labels)) + } + if !reflect.DeepEqual(sortedLabelss, tt.want) { + t.Errorf("addInstanceLabels() = \n got: %v,\nwant: %v", sortedLabelss, tt.want) + } + }) + } +} diff --git a/lib/promscrape/discovery/yandexcloud/resources.go b/lib/promscrape/discovery/yandexcloud/resources.go new file mode 100644 index 0000000000..d65f663d2c --- /dev/null +++ b/lib/promscrape/discovery/yandexcloud/resources.go @@ -0,0 +1,179 @@ +package yandexcloud + +import ( + "encoding/json" + "errors" + "fmt" + "time" +) + +type endpoint struct { + ID string `json:"id"` + Address string `json:"address"` +} + +type endpoints struct { + Endpoints []endpoint `json:"endpoints"` +} + +// See https://cloud.yandex.com/en-ru/docs/api-design-guide/concepts/endpoints +func parseEndpoints(data []byte) (*endpoints, error) { + var endpointsResponse endpoints + if err := json.Unmarshal(data, &endpointsResponse); err != nil { + return nil, fmt.Errorf("cannot parse endpoints list: %w", err) + } + + if endpointsResponse.Endpoints == nil { + 
return nil, errors.New("yandex cloud API endpoints list is empty") + } + + return &endpointsResponse, nil +} + +type organization struct { + Name string `json:"name"` + ID string `json:"id"` + Labels map[string]string `json:"labels"` + Title string `json:"title"` + Description string `json:"description"` + CreatedAt time.Time `json:"createdAt"` +} + +type organizationsPage struct { + Organizations []organization `json:"organizations"` + NextPageToken string `json:"nextPageToken"` +} + +// See https://cloud.yandex.com/en-ru/docs/organization/api-ref/Organization/list +func parseOrganizationsPage(data []byte) (*organizationsPage, error) { + var page organizationsPage + if err := json.Unmarshal(data, &page); err != nil { + return nil, fmt.Errorf("cannot parse organizations page: %w", err) + } + + if page.Organizations == nil { + page.Organizations = make([]organization, 0) + } + + return &page, nil +} + +type cloud struct { + Name string `json:"name"` + ID string `json:"id"` + Labels map[string]string `json:"labels"` + OrganizationID string `json:"organizationId"` + Description string `json:"description"` + CreatedAt time.Time `json:"createdAt"` +} + +type cloudsPage struct { + Clouds []cloud `json:"clouds"` + NextPageToken string `json:"nextPageToken"` +} + +// See https://cloud.yandex.com/en-ru/docs/resource-manager/api-ref/Cloud/list +func parseCloudsPage(data []byte) (*cloudsPage, error) { + var page cloudsPage + if err := json.Unmarshal(data, &page); err != nil { + return nil, fmt.Errorf("cannot parse clouds page: %w", err) + } + + if page.Clouds == nil { + page.Clouds = make([]cloud, 0) + } + + return &page, nil +} + +type folder struct { + Name string `json:"name"` + ID string `json:"id"` + CloudID string `json:"cloudId"` + Description string `json:"description"` + Status string `json:"status"` + Labels map[string]string `json:"labels"` + CreatedAt time.Time `json:"createdAt"` +} + +type foldersPage struct { + Folders []folder `json:"folders"` + NextPageToken 
string `json:"nextPageToken"`
+}
+
+// parseFoldersPage decodes a JSON folders page from data and replaces a missing folders list with an empty slice. See https://cloud.yandex.com/en-ru/docs/resource-manager/api-ref/Folder/list
+func parseFoldersPage(data []byte) (*foldersPage, error) {
+	var page foldersPage
+	if err := json.Unmarshal(data, &page); err != nil {
+		return nil, fmt.Errorf("cannot parse folders page: %w", err)
+	}
+
+	if page.Folders == nil {
+		page.Folders = make([]folder, 0)
+	}
+
+	return &page, nil
+}
+
+type dnsRecord struct {
+	FQDN      string `json:"fqdn"`
+	DNSZoneID string `json:"dnsZoneId"`
+	TTL       string `json:"ttl"`
+	PTR       bool   `json:"ptr"`
+}
+
+type oneToOneNat struct {
+	Address    string      `json:"address"`
+	IPVersion  string      `json:"ipVersion"`
+	DNSRecords []dnsRecord `json:"dnsRecords"`
+}
+
+type primaryV4Address struct {
+	Address     string      `json:"address"`
+	OneToOneNat oneToOneNat `json:"oneToOneNat"`
+	DNSRecords  []dnsRecord `json:"dnsRecords"`
+}
+
+type networkInterface struct {
+	Index            string           `json:"index"`
+	MacAddress       string           `json:"macAddress"`
+	SubnetID         string           `json:"subnetId"`
+	PrimaryV4Address primaryV4Address `json:"primaryV4Address"`
+}
+
+type resources struct {
+	Cores        string `json:"cores"`
+	CoreFraction string `json:"coreFraction"`
+	Memory       string `json:"memory"`
+}
+
+type instance struct {
+	ID                string             `json:"id"`
+	Name              string             `json:"name"`
+	FQDN              string             `json:"fqdn"`
+	Status            string             `json:"status"`
+	FolderID          string             `json:"folderId"`
+	PlatformID        string             `json:"platformId"`
+	Resources         resources          `json:"resources"`
+	NetworkInterfaces []networkInterface `json:"networkInterfaces"`
+	Labels            map[string]string  `json:"labels,omitempty"`
+}
+
+type instancesPage struct {
+	Instances     []instance `json:"instances"`
+	NextPageToken string     `json:"nextPageToken"`
+}
+
+// parseInstancesPage decodes a JSON instances page from data and replaces a missing instances list with an empty slice. See https://cloud.yandex.com/en-ru/docs/compute/api-ref/Instance/list
+func parseInstancesPage(data []byte) (*instancesPage, error) {
+	var page instancesPage
+	if err := json.Unmarshal(data, &page); err != nil {
+		return nil, fmt.Errorf("cannot parse instances page: %w", err)
+	}
+
+	if page.Instances == nil {
+		page.Instances = make([]instance, 0)
+	}
+
+	return &page, nil
+}
diff --git a/lib/promscrape/discovery/yandexcloud/yandexcloud.go b/lib/promscrape/discovery/yandexcloud/yandexcloud.go
new file mode 100644
index 0000000000..6542cd24fe
--- /dev/null
+++ b/lib/promscrape/discovery/yandexcloud/yandexcloud.go
@@ -0,0 +1,177 @@
+package yandexcloud
+
+import (
+	"flag"
+	"fmt"
+	"path"
+	"time"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
+)
+
+// SDCheckInterval defines the interval for refreshing targets.
+var SDCheckInterval = flag.Duration("promscrape.yandexcloudSDCheckInterval", 30*time.Second, "Interval for checking for changes in Yandex Cloud API. "+
+	"This works only if yandexcloud_sd_configs is configured in '-promscrape.config' file.")
+
+// SDConfig is the configuration for Yandex Cloud service discovery.
+type SDConfig struct {
+	Service                  string              `yaml:"service"`
+	YandexPassportOAuthToken *promauth.Secret    `yaml:"yandex_passport_oauth_token,omitempty"`
+	APIEndpoint              string              `yaml:"api_endpoint,omitempty"`
+	TLSConfig                *promauth.TLSConfig `yaml:"tls_config,omitempty"`
+}
+
+// GetLabels returns labels for Yandex Cloud according to the service discovery config.
+func (sdc *SDConfig) GetLabels(baseDir string) ([]map[string]string, error) {
+	cfg, err := getAPIConfig(sdc, baseDir)
+	if err != nil {
+		return nil, fmt.Errorf("cannot get API config: %w", err)
+	}
+	switch sdc.Service {
+	case "compute":
+		return getInstancesLabels(cfg)
+	default:
+		return nil, fmt.Errorf("unexpected `service`: %q; only `compute` supported yet; skipping it", sdc.Service)
+	}
+}
+
+// getInstances returns all the compute instances from the folder with the given folderID, following nextPageToken until the last page.
+func (cfg *apiConfig) getInstances(folderID string) ([]instance, error) {
+	computeURL := *cfg.serviceEndpoints["compute"]
+	computeURL.Path = path.Join(computeURL.Path, "compute", defaultAPIVersion, "instances")
+	q := computeURL.Query()
+	q.Set("folderId", folderID)
+	computeURL.RawQuery = q.Encode()
+	nextLink := computeURL.String()
+
+	instances := make([]instance, 0)
+	for {
+		resp, err := getAPIResponse(nextLink, cfg)
+		if err != nil {
+			return nil, err
+		}
+		instancesPage, err := parseInstancesPage(resp)
+		if err != nil {
+			return nil, err
+		}
+		instances = append(instances, instancesPage.Instances...)
+		if len(instancesPage.NextPageToken) == 0 {
+			return instances, nil
+		}
+
+		q.Set("pageToken", instancesPage.NextPageToken)
+		computeURL.RawQuery = q.Encode()
+		nextLink = computeURL.String()
+	}
+}
+
+// getFolders returns all the folders of the given clouds, following nextPageToken until the last page of every cloud.
+func (cfg *apiConfig) getFolders(clouds []cloud) ([]folder, error) {
+	rmURL := *cfg.serviceEndpoints["resource-manager"]
+	rmURL.Path = path.Join(rmURL.Path, "resource-manager", defaultAPIVersion, "folders")
+
+	folders := make([]folder, 0)
+	for _, cl := range clouds {
+		// Build the query from the untouched base URL for every cloud, so the pageToken left over
+		// from the previous cloud's pagination doesn't leak into the first request for this cloud.
+		cloudURL := rmURL
+		q := cloudURL.Query()
+		q.Set("cloudId", cl.ID)
+		cloudURL.RawQuery = q.Encode()
+		nextLink := cloudURL.String()
+		for {
+			resp, err := getAPIResponse(nextLink, cfg)
+			if err != nil {
+				return nil, err
+			}
+			foldersPage, err := parseFoldersPage(resp)
+			if err != nil {
+				return nil, err
+			}
+			folders = append(folders, foldersPage.Folders...)
+			if len(foldersPage.NextPageToken) == 0 {
+				break
+			}
+
+			q.Set("pageToken", foldersPage.NextPageToken)
+			cloudURL.RawQuery = q.Encode()
+			nextLink = cloudURL.String()
+		}
+	}
+
+	return folders, nil
+}
+
+// getClouds returns all the clouds of the given organizations, following nextPageToken until the last page.
+// If organizations is empty, then clouds are listed once without the organizationId filter.
+func (cfg *apiConfig) getClouds(organizations []organization) ([]cloud, error) {
+	rmURL := *cfg.serviceEndpoints["resource-manager"]
+	rmURL.Path = path.Join(rmURL.Path, "resource-manager", defaultAPIVersion, "clouds")
+
+	if len(organizations) == 0 {
+		organizations = append(organizations, organization{
+			ID: "",
+		})
+	}
+
+	clouds := make([]cloud, 0)
+	for _, org := range organizations {
+		// Build the query from the untouched base URL for every organization, so the pageToken left over
+		// from the previous organization's pagination doesn't leak into the first request for this one.
+		orgURL := rmURL
+		q := orgURL.Query()
+		if org.ID != "" {
+			q.Set("organizationId", org.ID)
+			orgURL.RawQuery = q.Encode()
+		}
+		nextLink := orgURL.String()
+		for {
+			resp, err := getAPIResponse(nextLink, cfg)
+			if err != nil {
+				return nil, err
+			}
+			cloudsPage, err := parseCloudsPage(resp)
+			if err != nil {
+				return nil, err
+			}
+			clouds = append(clouds, cloudsPage.Clouds...)
+			if len(cloudsPage.NextPageToken) == 0 {
+				break
+			}
+
+			q.Set("pageToken", cloudsPage.NextPageToken)
+			orgURL.RawQuery = q.Encode()
+			nextLink = orgURL.String()
+		}
+	}
+
+	return clouds, nil
+}
+
+// getOrganizations returns all the organizations listed by the organization-manager endpoint, following nextPageToken until the last page.
+func (cfg *apiConfig) getOrganizations() ([]organization, error) {
+	omURL := *cfg.serviceEndpoints["organization-manager"]
+	omURL.Path = path.Join(omURL.Path, "organization-manager", defaultAPIVersion, "organizations")
+	q := omURL.Query()
+	nextLink := omURL.String()
+
+	organizations := make([]organization, 0)
+	for {
+		resp, err := getAPIResponse(nextLink, cfg)
+		if err != nil {
+			return nil, err
+		}
+		organizationsPage, err := parseOrganizationsPage(resp)
+		if err != nil {
+			return nil, err
+		}
+		organizations = append(organizations, organizationsPage.Organizations...)
+		if len(organizationsPage.NextPageToken) == 0 {
+			return organizations, nil
+		}
+
+		q.Set("pageToken", organizationsPage.NextPageToken)
+		omURL.RawQuery = q.Encode()
+		nextLink = omURL.String()
+	}
+}
diff --git a/lib/promscrape/scraper.go b/lib/promscrape/scraper.go
index d72edad5d1..eb60d63a04 100644
--- a/lib/promscrape/scraper.go
+++ b/lib/promscrape/scraper.go
@@ -24,6 +24,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/http"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kubernetes"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/openstack"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/yandexcloud"
 	"github.com/VictoriaMetrics/metrics"
 )
 
@@ -124,6 +125,7 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest)
 	scs.add("http_sd_configs", *http.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getHTTPDScrapeWork(swsPrev) })
 	scs.add("kubernetes_sd_configs", *kubernetes.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getKubernetesSDScrapeWork(swsPrev) })
 	scs.add("openstack_sd_configs", *openstack.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getOpenStackSDScrapeWork(swsPrev) })
+	scs.add("yandexcloud_sd_configs", *yandexcloud.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getYandexCloudSDScrapeWork(swsPrev) })
 	scs.add("static_configs", 0, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getStaticScrapeWork() })
 
 	var tickerCh <-chan time.Time