diff --git a/app/vmagent/README.md b/app/vmagent/README.md index 71b952103..50a078a5c 100644 --- a/app/vmagent/README.md +++ b/app/vmagent/README.md @@ -143,10 +143,11 @@ The following scrape types in [scrape_config](https://prometheus.io/docs/prometh * if `zone` arg is missing, then `vmagent` uses the zone for the instance where it runs; * if `zone` arg equals to `"*"`, then `vmagent` discovers all the zones for the given project; * `zone` may contain arbitrary number of zones, i.e. `zone: [us-east1-a, us-east1-b]`. +* `consul_sd_configs` - for scraping targets registered in Consul. + See [consul_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config) for details. The following service discovery mechanisms will be added to `vmagent` soon: -* [consul_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config) * [dns_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dns_sd_config) diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index 1109fabec..84b85a74d 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -262,6 +262,7 @@ Currently the following [scrape_config](https://prometheus.io/docs/prometheus/la * [kubernetes_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config) * [ec2_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ec2_sd_config) * [gce_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#gce_sd_config) +* [consul_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config) In the future other `*_sd_config` types will be supported. diff --git a/docs/vmagent.md b/docs/vmagent.md index 71b952103..50a078a5c 100644 --- a/docs/vmagent.md +++ b/docs/vmagent.md @@ -143,10 +143,11 @@ The following scrape types in [scrape_config](https://prometheus.io/docs/prometh * if `zone` arg is missing, then `vmagent` uses the zone for the instance where it runs; * if `zone` arg equals to `"*"`, then `vmagent` discovers all the zones for the given project; * `zone` may contain arbitrary number of zones, i.e. `zone: [us-east1-a, us-east1-b]`. +* `consul_sd_configs` - for scraping targets registered in Consul. + See [consul_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config) for details. The following service discovery mechanisms will be added to `vmagent` soon: -* [consul_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config) * [dns_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dns_sd_config) diff --git a/lib/promscrape/config.go b/lib/promscrape/config.go index f41e164e9..5cfaa4051 100644 --- a/lib/promscrape/config.go +++ b/lib/promscrape/config.go @@ -14,6 +14,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/ec2" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/gce" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kubernetes" @@ -62,6 +63,7 @@ type ScrapeConfig struct { StaticConfigs []StaticConfig `yaml:"static_configs"` FileSDConfigs []FileSDConfig `yaml:"file_sd_configs"` KubernetesSDConfigs []kubernetes.SDConfig `yaml:"kubernetes_sd_configs"` + ConsulSDConfigs []consul.SDConfig `yaml:"consul_sd_configs"` EC2SDConfigs []ec2.SDConfig `yaml:"ec2_sd_configs"` GCESDConfigs []gce.SDConfig `yaml:"gce_sd_configs"` RelabelConfigs []promrelabel.RelabelConfig `yaml:"relabel_configs"` @@ -156,6 +158,19 @@ func (cfg *Config) getKubernetesSDScrapeWork() []ScrapeWork { return dst } +// getConsulSDScrapeWork returns `consul_sd_configs` ScrapeWork from cfg. +func (cfg *Config) getConsulSDScrapeWork() []ScrapeWork { + var dst []ScrapeWork + for i := range cfg.ScrapeConfigs { + sc := &cfg.ScrapeConfigs[i] + for j := range sc.ConsulSDConfigs { + sdc := &sc.ConsulSDConfigs[j] + dst = appendConsulScrapeWork(dst, sdc, cfg.baseDir, sc.swc) + } + } + return dst +} + // getEC2SDScrapeWork returns `ec2_sd_configs` ScrapeWork from cfg. func (cfg *Config) getEC2SDScrapeWork() []ScrapeWork { var dst []ScrapeWork @@ -309,6 +324,15 @@ func appendKubernetesScrapeWork(dst []ScrapeWork, sdc *kubernetes.SDConfig, base return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "kubernetes_sd_config") } +func appendConsulScrapeWork(dst []ScrapeWork, sdc *consul.SDConfig, baseDir string, swc *scrapeWorkConfig) []ScrapeWork { + targetLabels, err := consul.GetLabels(sdc, baseDir) + if err != nil { + logger.Errorf("error when discovering consul nodes for `job_name` %q: %s; skipping it", swc.jobName, err) + return dst + } + return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "consul_sd_config") +} + func appendEC2ScrapeWork(dst []ScrapeWork, sdc *ec2.SDConfig, swc *scrapeWorkConfig) []ScrapeWork { targetLabels, err := ec2.GetLabels(sdc) if err != nil { diff --git a/lib/promscrape/discovery/consul/agent.go b/lib/promscrape/discovery/consul/agent.go new file mode 100644 index 000000000..a1f612efb --- /dev/null +++ b/lib/promscrape/discovery/consul/agent.go @@ -0,0 +1,28 @@ +package consul + +import ( + "encoding/json" + "fmt" +) + +// Agent is Consul agent. +// +// See https://www.consul.io/api/agent.html#read-configuration +type Agent struct { + Config AgentConfig +} + +// AgentConfig is Consul agent config. +// +// See https://www.consul.io/api/agent.html#read-configuration +type AgentConfig struct { + Datacenter string +} + +func parseAgent(data []byte) (*Agent, error) { + var a Agent + if err := json.Unmarshal(data, &a); err != nil { + return nil, fmt.Errorf("cannot unmarshal agent info from %q: %s", data, err) + } + return &a, nil +} diff --git a/lib/promscrape/discovery/consul/agent_test.go b/lib/promscrape/discovery/consul/agent_test.go new file mode 100644 index 000000000..cc917d8d9 --- /dev/null +++ b/lib/promscrape/discovery/consul/agent_test.go @@ -0,0 +1,81 @@ +package consul + +import ( + "reflect" + "testing" +) + +func TestParseAgentFailure(t *testing.T) { + f := func(s string) { + t.Helper() + a, err := parseAgent([]byte(s)) + if err == nil { + t.Fatalf("expecting non-nil error") + } + if a != nil { + t.Fatalf("unexpected non-nil Agent: %v", a) + } + } + f(``) + f(`[1,23]`) +} + +func TestParseAgentSuccess(t *testing.T) { + data := ` +{ + "Config": { + "Datacenter": "dc1", + "NodeName": "foobar", + "NodeID": "9d754d17-d864-b1d3-e758-f3fe25a9874f", + "Server": true, + "Revision": "deadbeef", + "Version": "1.0.0" + }, + "DebugConfig": { + }, + "Coord": { + "Adjustment": 0, + "Error": 1.5, + "Vec": [0,0,0,0,0,0,0,0] + }, + "Member": { + "Name": "foobar", + "Addr": "10.1.10.12", + "Port": 8301, + "Tags": { + "bootstrap": "1", + "dc": "dc1", + "id": "40e4a748-2192-161a-0510-9bf59fe950b5", + "port": "8300", + "role": "consul", + "vsn": "1", + "vsn_max": "1", + "vsn_min": "1" + }, + "Status": 1, + "ProtocolMin": 1, + "ProtocolMax": 2, + "ProtocolCur": 2, + "DelegateMin": 2, + "DelegateMax": 4, + "DelegateCur": 4 + }, + "Meta": { + "instance_type": "i2.xlarge", + "os_version": "ubuntu_16.04" + } +} +` + a, err := parseAgent([]byte(data)) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + aExpected := &Agent{ + Config: AgentConfig{ + Datacenter: "dc1", + }, + } + if !reflect.DeepEqual(a, aExpected) { + t.Fatalf("unexpected Agent parsed;\ngot\n%v\nwant\n%v", a, aExpected) + } +} diff --git a/lib/promscrape/discovery/consul/api.go b/lib/promscrape/discovery/consul/api.go new file mode 100644 index 000000000..8eff2d8d0 --- /dev/null +++ b/lib/promscrape/discovery/consul/api.go @@ -0,0 +1,135 @@ +package consul + +import ( + "fmt" + "io/ioutil" + "net/url" + "os" + "strings" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" +) + +// apiConfig contains config for API server +type apiConfig struct { + client *discoveryutils.Client + tagSeparator string + services []string + tags []string + datacenter string + allowStale bool + nodeMeta map[string]string +} + +var configMap = discoveryutils.NewConfigMap() + +func getAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { + v, err := configMap.Get(sdc, func() (interface{}, error) { return newAPIConfig(sdc, baseDir) }) + if err != nil { + return nil, err + } + return v.(*apiConfig), nil +} + +func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { + token, err := getToken(sdc.Token) + if err != nil { + return nil, err + } + var ba *promauth.BasicAuthConfig + if len(sdc.Username) > 0 { + ba = &promauth.BasicAuthConfig{ + Username: sdc.Username, + Password: sdc.Password, + } + token = "" + } + ac, err := promauth.NewConfig(baseDir, ba, token, "", sdc.TLSConfig) + if err != nil { + return nil, fmt.Errorf("cannot parse auth config: %s", err) + } + apiServer := sdc.Server + if apiServer == "" { + apiServer = "localhost:8500" + } + if !strings.Contains(apiServer, "://") { + scheme := sdc.Scheme + if scheme == "" { + scheme = "http" + } + apiServer = scheme + "://" + apiServer + } + client, err := discoveryutils.NewClient(apiServer, ac) + if err != nil { + return nil, fmt.Errorf("cannot create HTTP client for %q: %s", apiServer, err) + } + tagSeparator := "," + if sdc.TagSeparator != nil { + tagSeparator = *sdc.TagSeparator + } + dc, err := getDatacenter(client, sdc.Datacenter) + if err != nil { + return nil, err + } + cfg := &apiConfig{ + client: client, + + tagSeparator: tagSeparator, + services: sdc.Services, + tags: sdc.Tags, + datacenter: dc, + allowStale: sdc.AllowStale, + nodeMeta: sdc.NodeMeta, + } + return cfg, nil +} + +func getToken(token string) (string, error) { + if token != "" { + return token, nil + } + if tokenFile := os.Getenv("CONSUL_HTTP_TOKEN_FILE"); tokenFile != "" { + data, err := ioutil.ReadFile(tokenFile) + if err != nil { + return "", fmt.Errorf("cannot read consul token file %q; probably, `token` arg is missing in `consul_sd_config`? error: %s", tokenFile, err) + } + return string(data), nil + } + token = os.Getenv("CONSUL_HTTP_TOKEN") + // Allow empty token - it shouls work if authorization is disabled in Consul + return token, nil +} + +func getDatacenter(client *discoveryutils.Client, dc string) (string, error) { + if dc != "" { + return dc, nil + } + // See https://www.consul.io/api/agent.html#read-configuration + data, err := client.GetAPIResponse("/v1/agent/self") + if err != nil { + return "", fmt.Errorf("cannot query consul agent info: %s", err) + } + a, err := parseAgent(data) + if err != nil { + return "", err + } + return a.Config.Datacenter, nil +} + +func getAPIResponse(cfg *apiConfig, path string) ([]byte, error) { + separator := "?" + if strings.Contains(path, "?") { + separator = "&" + } + path += fmt.Sprintf("%sdc=%s", separator, url.QueryEscape(cfg.datacenter)) + if cfg.allowStale { + path += "&stale" + } + if len(cfg.nodeMeta) > 0 { + for k, v := range cfg.nodeMeta { + path += fmt.Sprintf("&node-meta=%s", url.QueryEscape(k+":"+v)) + } + } + return cfg.client.GetAPIResponse(path) +} diff --git a/lib/promscrape/discovery/consul/consul.go b/lib/promscrape/discovery/consul/consul.go new file mode 100644 index 000000000..38225ce8c --- /dev/null +++ b/lib/promscrape/discovery/consul/consul.go @@ -0,0 +1,40 @@ +package consul + +import ( + "fmt" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" +) + +// SDConfig represents service discovery config for Consul. +// +// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config +type SDConfig struct { + Server string `yaml:"server"` + Token string `yaml:"token"` + Datacenter string `yaml:"datacenter"` + Scheme string `yaml:"scheme"` + Username string `yaml:"username"` + Password string `yaml:"password"` + TLSConfig *promauth.TLSConfig `yaml:"tls_config"` + Services []string `yaml:"services"` + Tags []string `yaml:"tags"` + NodeMeta map[string]string `yaml:"node_meta"` + TagSeparator *string `yaml:"tag_separator"` + AllowStale bool `yaml:"allow_stale"` + // RefreshInterval time.Duration `yaml:"refresh_interval"` + // refresh_interval is obtained from `-promscrape.consulSDCheckInterval` command-line option. +} + +// GetLabels returns Consul labels according to sdc. +func GetLabels(sdc *SDConfig, baseDir string) ([]map[string]string, error) { + cfg, err := getAPIConfig(sdc, baseDir) + if err != nil { + return nil, fmt.Errorf("cannot get API config: %s", err) + } + ms, err := getServiceNodesLabels(cfg) + if err != nil { + return nil, fmt.Errorf("error when fetching service nodes data from Consul: %s", err) + } + return ms, nil +} diff --git a/lib/promscrape/discovery/consul/service_node.go b/lib/promscrape/discovery/consul/service_node.go new file mode 100644 index 000000000..83141b363 --- /dev/null +++ b/lib/promscrape/discovery/consul/service_node.go @@ -0,0 +1,245 @@ +package consul + +import ( + "encoding/json" + "fmt" + "net/url" + "strconv" + "strings" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" +) + +// getServiceNodesLabels returns labels for Consul service nodes obtained from the given cfg +func getServiceNodesLabels(cfg *apiConfig) ([]map[string]string, error) { + sns, err := getAllServiceNodes(cfg) + if err != nil { + return nil, err + } + var ms []map[string]string + for _, sn := range sns { + ms = sn.appendTargetLabels(ms, cfg.tagSeparator) + } + return ms, nil +} + +func getAllServiceNodes(cfg *apiConfig) ([]ServiceNode, error) { + // Obtain a list of services + // See https://www.consul.io/api/catalog.html#list-services + data, err := getAPIResponse(cfg, "/v1/catalog/services") + if err != nil { + return nil, fmt.Errorf("cannot obtain services: %s", err) + } + var m map[string][]string + if err := json.Unmarshal(data, &m); err != nil { + return nil, fmt.Errorf("cannot parse services response %q: %s", data, err) + } + serviceNames := make(map[string]bool) + for serviceName, tags := range m { + if !shouldCollectServiceByName(cfg.services, serviceName) { + continue + } + if !shouldCollectServiceByTags(cfg.tags, tags) { + continue + } + serviceNames[serviceName] = true + } + + // Query all the serviceNames in parallel + type response struct { + sns []ServiceNode + err error + } + responsesCh := make(chan response, len(serviceNames)) + for serviceName := range serviceNames { + go func(serviceName string) { + sns, err := getServiceNodes(cfg, serviceName) + responsesCh <- response{ + sns: sns, + err: err, + } + }(serviceName) + } + var sns []ServiceNode + err = nil + for i := 0; i < len(serviceNames); i++ { + resp := <-responsesCh + if resp.err != nil && err == nil { + err = resp.err + } + sns = append(sns, resp.sns...) + } + if err != nil { + return nil, err + } + return sns, nil +} + +func shouldCollectServiceByName(filterServices []string, service string) bool { + if len(filterServices) == 0 { + return true + } + for _, filterService := range filterServices { + if filterService == service { + return true + } + } + return false +} + +func shouldCollectServiceByTags(filterTags, tags []string) bool { + if len(filterTags) == 0 { + return true + } + for _, filterTag := range filterTags { + hasTag := false + for _, tag := range tags { + if tag == filterTag { + hasTag = true + break + } + } + if !hasTag { + return false + } + } + return true +} + +func getServiceNodes(cfg *apiConfig, serviceName string) ([]ServiceNode, error) { + // See https://www.consul.io/api/health.html#list-nodes-for-service + path := fmt.Sprintf("/v1/health/service/%s", serviceName) + var tagsArgs []string + for _, tag := range cfg.tags { + tagsArgs = append(tagsArgs, fmt.Sprintf("tag=%s", url.QueryEscape(tag))) + } + if len(tagsArgs) > 0 { + path += "?" + strings.Join(tagsArgs, "&") + } + data, err := getAPIResponse(cfg, path) + if err != nil { + return nil, fmt.Errorf("cannot obtain instances for serviceName=%q: %s", serviceName, err) + } + return parseServiceNodes(data) +} + +// ServiceNode is Consul service node. +// +// See https://www.consul.io/api/health.html#list-nodes-for-service +type ServiceNode struct { + Service Service + Node Node + Checks []Check +} + +// Service is Consul service. +// +// See https://www.consul.io/api/health.html#list-nodes-for-service +type Service struct { + ID string + Service string + Address string + Port int + Tags []string + Meta map[string]string +} + +// Node is Consul node. +// +// See https://www.consul.io/api/health.html#list-nodes-for-service +type Node struct { + Address string + Datacenter string + Node string + Meta map[string]string + TaggedAddresses map[string]string +} + +// Check is Consul check. +// +// See https://www.consul.io/api/health.html#list-nodes-for-service +type Check struct { + CheckID string + Status string +} + +func parseServiceNodes(data []byte) ([]ServiceNode, error) { + var sns []ServiceNode + if err := json.Unmarshal(data, &sns); err != nil { + return nil, fmt.Errorf("cannot unmarshal ServiceNodes from %q: %s", data, err) + } + return sns, nil +} + +func (sn *ServiceNode) appendTargetLabels(ms []map[string]string, tagSeparator string) []map[string]string { + var addr string + if sn.Service.Address != "" { + addr = discoveryutils.JoinHostPort(sn.Service.Address, sn.Service.Port) + } else { + addr = discoveryutils.JoinHostPort(sn.Node.Address, sn.Service.Port) + } + m := map[string]string{ + "__address__": addr, + "__meta_consul_address": sn.Node.Address, + "__meta_consul_dc": sn.Node.Datacenter, + "__meta_consul_health": aggregatedStatus(sn.Checks), + "__meta_consul_node": sn.Node.Node, + "__meta_consul_service": sn.Service.Service, + "__meta_consul_service_address": sn.Service.Address, + "__meta_consul_service_id": sn.Service.ID, + "__meta_consul_service_port": strconv.Itoa(sn.Service.Port), + } + // We surround the separated list with the separator as well. This way regular expressions + // in relabeling rules don't have to consider tag positions. + m["__meta_consul_tags"] = tagSeparator + strings.Join(sn.Service.Tags, tagSeparator) + tagSeparator + + for k, v := range sn.Node.Meta { + key := discoveryutils.SanitizeLabelName(k) + m["__meta_consul_metadata_"+key] = v + } + for k, v := range sn.Service.Meta { + key := discoveryutils.SanitizeLabelName(k) + m["__meta_consul_service_metadata_"+key] = v + } + for k, v := range sn.Node.TaggedAddresses { + key := discoveryutils.SanitizeLabelName(k) + m["__meta_consul_tagged_address_"+key] = v + } + ms = append(ms, m) + return ms +} + +func aggregatedStatus(checks []Check) string { + // The code has been copy-pasted from HealthChecks.AggregatedStatus in Consul + var passing, warning, critical, maintenance bool + for _, check := range checks { + id := check.CheckID + if id == "_node_maintenance" || strings.HasPrefix(id, "_service_maintenance:") { + maintenance = true + continue + } + + switch check.Status { + case "passing": + passing = true + case "warning": + warning = true + case "critical": + critical = true + default: + return "" + } + } + switch { + case maintenance: + return "maintenance" + case critical: + return "critical" + case warning: + return "warning" + case passing: + return "passing" + default: + return "passing" + } +} diff --git a/lib/promscrape/discovery/consul/service_node_test.go b/lib/promscrape/discovery/consul/service_node_test.go new file mode 100644 index 000000000..0b65a1f20 --- /dev/null +++ b/lib/promscrape/discovery/consul/service_node_test.go @@ -0,0 +1,135 @@ +package consul + +import ( + "reflect" + "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" +) + +func TestParseServiceNodesFailure(t *testing.T) { + f := func(s string) { + t.Helper() + sns, err := parseServiceNodes([]byte(s)) + if err == nil { + t.Fatalf("expecting non-nil error") + } + if sns != nil { + t.Fatalf("unexpected non-nil ServiceNodes: %v", sns) + } + } + f(``) + f(`[1,23]`) + f(`{"items":[{"metadata":1}]}`) +} + +func TestParseServiceNodesSuccess(t *testing.T) { + data := ` +[ + { + "Node": { + "ID": "40e4a748-2192-161a-0510-9bf59fe950b5", + "Node": "foobar", + "Address": "10.1.10.12", + "Datacenter": "dc1", + "TaggedAddresses": { + "lan": "10.1.10.12", + "wan": "10.1.10.12" + }, + "Meta": { + "instance_type": "t2.medium" + } + }, + "Service": { + "ID": "redis", + "Service": "redis", + "Tags": ["primary"], + "Address": "10.1.10.12", + "TaggedAddresses": { + "lan": { + "address": "10.1.10.12", + "port": 8000 + }, + "wan": { + "address": "198.18.1.2", + "port": 80 + } + }, + "Meta": { + "redis_version": "4.0" + }, + "Port": 8000, + "Weights": { + "Passing": 10, + "Warning": 1 + }, + "Namespace": "default" + }, + "Checks": [ + { + "Node": "foobar", + "CheckID": "service:redis", + "Name": "Service 'redis' check", + "Status": "passing", + "Notes": "", + "Output": "", + "ServiceID": "redis", + "ServiceName": "redis", + "ServiceTags": ["primary"], + "Namespace": "default" + }, + { + "Node": "foobar", + "CheckID": "serfHealth", + "Name": "Serf Health Status", + "Status": "passing", + "Notes": "", + "Output": "", + "ServiceID": "", + "ServiceName": "", + "ServiceTags": [], + "Namespace": "default" + } + ] + } +] +` + sns, err := parseServiceNodes([]byte(data)) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if len(sns) != 1 { + t.Fatalf("unexpected length of ServiceNodes; got %d; want %d", len(sns), 1) + } + sn := sns[0] + + // Check sn.appendTargetLabels() + tagSeparator := "," + labelss := sn.appendTargetLabels(nil, tagSeparator) + var sortedLabelss [][]prompbmarshal.Label + for _, labels := range labelss { + sortedLabelss = append(sortedLabelss, discoveryutils.GetSortedLabels(labels)) + } + expectedLabelss := [][]prompbmarshal.Label{ + discoveryutils.GetSortedLabels(map[string]string{ + "__address__": "10.1.10.12:8000", + "__meta_consul_address": "10.1.10.12", + "__meta_consul_dc": "dc1", + "__meta_consul_health": "passing", + "__meta_consul_metadata_instance_type": "t2.medium", + "__meta_consul_node": "foobar", + "__meta_consul_service": "redis", + "__meta_consul_service_address": "10.1.10.12", + "__meta_consul_service_id": "redis", + "__meta_consul_service_metadata_redis_version": "4.0", + "__meta_consul_service_port": "8000", + "__meta_consul_tagged_address_lan": "10.1.10.12", + "__meta_consul_tagged_address_wan": "10.1.10.12", + "__meta_consul_tags": ",primary,", + }), + } + if !reflect.DeepEqual(sortedLabelss, expectedLabelss) { + t.Fatalf("unexpected labels:\ngot\n%v\nwant\n%v", sortedLabelss, expectedLabelss) + } +} diff --git a/lib/promscrape/discovery/kubernetes/api.go b/lib/promscrape/discovery/kubernetes/api.go index 1b254821a..142a9e468 100644 --- a/lib/promscrape/discovery/kubernetes/api.go +++ b/lib/promscrape/discovery/kubernetes/api.go @@ -1,25 +1,17 @@ package kubernetes import ( - "crypto/tls" "fmt" "net" "os" - "strings" - "time" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" - "github.com/VictoriaMetrics/fasthttp" ) // apiConfig contains config for API server type apiConfig struct { - client *fasthttp.HostClient - server string - hostPort string - authConfig *promauth.Config + client *discoveryutils.Client namespaces []string selectors []Selector } @@ -39,67 +31,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { if err != nil { return nil, fmt.Errorf("cannot parse auth config: %s", err) } - hcv, err := newHostClient(sdc.APIServer, ac) - if err != nil { - return nil, fmt.Errorf("cannot create HTTP client for %q: %s", sdc.APIServer, err) - } - cfg := &apiConfig{ - client: hcv.hc, - server: hcv.apiServer, - hostPort: hcv.hostPort, - authConfig: hcv.ac, - namespaces: sdc.Namespaces.Names, - selectors: sdc.Selectors, - } - return cfg, nil -} - -func getAPIResponse(cfg *apiConfig, role, path string) ([]byte, error) { - query := joinSelectors(role, cfg.namespaces, cfg.selectors) - if len(query) > 0 { - path += "?" + query - } - requestURL := cfg.server + path - var u fasthttp.URI - u.Update(requestURL) - var req fasthttp.Request - req.SetRequestURIBytes(u.RequestURI()) - req.SetHost(cfg.hostPort) - req.Header.Set("Accept-Encoding", "gzip") - if cfg.authConfig != nil && cfg.authConfig.Authorization != "" { - req.Header.Set("Authorization", cfg.authConfig.Authorization) - } - var resp fasthttp.Response - // There is no need in calling DoTimeout, since the timeout is already set in hc.ReadTimeout above. - if err := cfg.client.Do(&req, &resp); err != nil { - return nil, fmt.Errorf("cannot fetch %q: %s", requestURL, err) - } - var data []byte - if ce := resp.Header.Peek("Content-Encoding"); string(ce) == "gzip" { - dst, err := fasthttp.AppendGunzipBytes(nil, resp.Body()) - if err != nil { - return nil, fmt.Errorf("cannot ungzip response from %q: %s", requestURL, err) - } - data = dst - } else { - data = append(data[:0], resp.Body()...) - } - statusCode := resp.StatusCode() - if statusCode != fasthttp.StatusOK { - return nil, fmt.Errorf("unexpected status code returned from %q: %d; expecting %d; response body: %q", - requestURL, statusCode, fasthttp.StatusOK, data) - } - return data, nil -} - -type hcValue struct { - hc *fasthttp.HostClient - ac *promauth.Config - apiServer string - hostPort string -} - -func newHostClient(apiServer string, ac *promauth.Config) (*hcValue, error) { + apiServer := sdc.APIServer if len(apiServer) == 0 { // Assume we run at k8s pod. // Discover apiServer and auth config according to k8s docs. @@ -124,36 +56,22 @@ func newHostClient(apiServer string, ac *promauth.Config) (*hcValue, error) { } ac = acNew } - - var u fasthttp.URI - u.Update(apiServer) - hostPort := string(u.Host()) - isTLS := string(u.Scheme()) == "https" - var tlsCfg *tls.Config - if isTLS && ac != nil { - tlsCfg = ac.NewTLSConfig() + client, err := discoveryutils.NewClient(apiServer, ac) + if err != nil { + return nil, fmt.Errorf("cannot create HTTP client for %q: %s", apiServer, err) } - if !strings.Contains(hostPort, ":") { - port := "80" - if isTLS { - port = "443" - } - hostPort = net.JoinHostPort(hostPort, port) + cfg := &apiConfig{ + client: client, + namespaces: sdc.Namespaces.Names, + selectors: sdc.Selectors, } - hc := &fasthttp.HostClient{ - Addr: hostPort, - Name: "vm_promscrape/discovery", - DialDualStack: netutil.TCP6Enabled(), - IsTLS: isTLS, - TLSConfig: tlsCfg, - ReadTimeout: time.Minute, - WriteTimeout: 10 * time.Second, - MaxResponseBodySize: 300 * 1024 * 1024, - } - return &hcValue{ - hc: hc, - ac: ac, - apiServer: apiServer, - hostPort: hostPort, - }, nil + return cfg, nil +} + +func getAPIResponse(cfg *apiConfig, role, path string) ([]byte, error) { + query := joinSelectors(role, cfg.namespaces, cfg.selectors) + if len(query) > 0 { + path += "?" + query + } + return cfg.client.GetAPIResponse(path) } diff --git a/lib/promscrape/discoveryutils/client.go b/lib/promscrape/discoveryutils/client.go index f320f82e6..3bdde100c 100644 --- a/lib/promscrape/discoveryutils/client.go +++ b/lib/promscrape/discoveryutils/client.go @@ -1,8 +1,16 @@ package discoveryutils import ( + "crypto/tls" + "fmt" + "net" "net/http" + "strings" "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" + "github.com/VictoriaMetrics/fasthttp" ) var defaultClient = &http.Client{ @@ -13,3 +21,81 @@ var defaultClient = &http.Client{ func GetHTTPClient() *http.Client { return defaultClient } + +// Client is http client, which talks to the given apiServer. +type Client struct { + hc *fasthttp.HostClient + ac *promauth.Config + apiServer string + hostPort string +} + +// NewClient returns new Client for the given apiServer and the given ac. +func NewClient(apiServer string, ac *promauth.Config) (*Client, error) { + var u fasthttp.URI + u.Update(apiServer) + hostPort := string(u.Host()) + isTLS := string(u.Scheme()) == "https" + var tlsCfg *tls.Config + if isTLS && ac != nil { + tlsCfg = ac.NewTLSConfig() + } + if !strings.Contains(hostPort, ":") { + port := "80" + if isTLS { + port = "443" + } + hostPort = net.JoinHostPort(hostPort, port) + } + hc := &fasthttp.HostClient{ + Addr: hostPort, + Name: "vm_promscrape/discovery", + DialDualStack: netutil.TCP6Enabled(), + IsTLS: isTLS, + TLSConfig: tlsCfg, + ReadTimeout: time.Minute, + WriteTimeout: 10 * time.Second, + MaxResponseBodySize: 300 * 1024 * 1024, + } + return &Client{ + hc: hc, + ac: ac, + apiServer: apiServer, + hostPort: hostPort, + }, nil +} + +// GetAPIResponse returns response for the given absolute path. +func (c *Client) GetAPIResponse(path string) ([]byte, error) { + requestURL := c.apiServer + path + var u fasthttp.URI + u.Update(requestURL) + var req fasthttp.Request + req.SetRequestURIBytes(u.RequestURI()) + req.SetHost(c.hostPort) + req.Header.Set("Accept-Encoding", "gzip") + if c.ac != nil && c.ac.Authorization != "" { + req.Header.Set("Authorization", c.ac.Authorization) + } + var resp fasthttp.Response + // There is no need in calling DoTimeout, since the timeout is already set in c.hc.ReadTimeout above. + if err := c.hc.Do(&req, &resp); err != nil { + return nil, fmt.Errorf("cannot fetch %q: %s", requestURL, err) + } + var data []byte + if ce := resp.Header.Peek("Content-Encoding"); string(ce) == "gzip" { + dst, err := fasthttp.AppendGunzipBytes(nil, resp.Body()) + if err != nil { + return nil, fmt.Errorf("cannot ungzip response from %q: %s", requestURL, err) + } + data = dst + } else { + data = append(data[:0], resp.Body()...) + } + statusCode := resp.StatusCode() + if statusCode != fasthttp.StatusOK { + return nil, fmt.Errorf("unexpected status code returned from %q: %d; expecting %d; response body: %q", + requestURL, statusCode, fasthttp.StatusOK, data) + } + return data, nil +} diff --git a/lib/promscrape/scraper.go b/lib/promscrape/scraper.go index 1853cac44..c93b9d813 100644 --- a/lib/promscrape/scraper.go +++ b/lib/promscrape/scraper.go @@ -23,6 +23,9 @@ var ( kubernetesSDCheckInterval = flag.Duration("promscrape.kubernetesSDCheckInterval", 30*time.Second, "Interval for checking for changes in Kubernetes API server. "+ "This works only if `kubernetes_sd_configs` is configured in '-promscrape.config' file. "+ "See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config for details") + consulSDCheckInterval = flag.Duration("promscrape.consulSDCheckInterval", 30*time.Second, "Interval for checking for changes in consul. "+ + "This works only if `consul_sd_configs` is configured in '-promscrape.config' file. "+ + "See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config for details") ec2SDCheckInterval = flag.Duration("promscrape.ec2SDCheckInterval", time.Minute, "Interval for checking for changes in ec2. "+ "This works only if `ec2_sd_configs` is configured in '-promscrape.config' file. "+ "See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ec2_sd_config for details") @@ -72,6 +75,7 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest) scs.add("static_configs", 0, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getStaticScrapeWork() }) scs.add("file_sd_configs", *fileSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getFileSDScrapeWork(swsPrev) }) scs.add("kubernetes_sd_configs", *kubernetesSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getKubernetesSDScrapeWork() }) + scs.add("consul_sd_configs", *consulSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getConsulSDScrapeWork() }) scs.add("ec2_sd_configs", *ec2SDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getEC2SDScrapeWork() }) scs.add("gce_sd_configs", *gceSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getGCESDScrapeWork() })