mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
parent
2eb27ddb22
commit
56b84140a9
19 changed files with 870 additions and 69 deletions
|
@ -1351,6 +1351,8 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
|
|||
Wait time used by Consul service discovery. Default value is used if not set
|
||||
-promscrape.consulSDCheckInterval duration
|
||||
Interval for checking for changes in Consul. This works only if consul_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#consul_sd_configs for details (default 30s)
|
||||
-promscrape.consulagentSDCheckInterval duration
|
||||
Interval for checking for changes in Consul Agent. This works only if consulagent_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#consulagent_sd_configs for details (default 30s)
|
||||
-promscrape.digitaloceanSDCheckInterval duration
|
||||
Interval for checking for changes in digital ocean. This works only if digitalocean_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#digitalocean_sd_configs for details (default 1m0s)
|
||||
-promscrape.disableCompression
|
||||
|
|
|
@ -30,6 +30,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
|||
* FEATURE: [vmbackupmanager](https://docs.victoriametrics.com/vmbackupmanager.html): add `created_at` field to the output of `/api/v1/backups` API and `vmbackupmanager backup list` command. See this [doc](https://docs.victoriametrics.com/vmbackupmanager.html#api-methods) for data format details.
|
||||
* FEATURE: deprecate `-bigMergeConcurrency` command-line flag, since improper configuration for this flag frequently led to uncontrolled growth of unmerged parts, which, in turn, could lead to queries slowdown and increased CPU usage. The concurrency for [background merges](https://docs.victoriametrics.com/#storage) can be controlled via `-smallMergeConcurrency` command-line flag, though it isn't recommended to do in general case.
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): support new filtering options `filter` and `node_filter` for [consul service discovery](https://docs.victoriametrics.com/sd_configs.html#consul_sd_configs). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4183) for details.
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): support new [consulagent service discovery](https://docs.victoriametrics.com/sd_configs.html#consulagent_sd_configs). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3953) for details.
|
||||
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add support for the different time formats for `--vm-native-filter-time-start` and `--vm-native-filter-time-end` flags if the native binary protocol is used for migration. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4091).
|
||||
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): integrate WITH template playground. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3811).
|
||||
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add a comparison of data from the previous day with data from the current day to the `Cardinality Explorer`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3967).
|
||||
|
|
|
@ -18,6 +18,7 @@ aliases:
|
|||
|
||||
* `azure_sd_configs` is for scraping the targets registered in [Azure Cloud](https://azure.microsoft.com/en-us/). See [these docs](#azure_sd_configs).
|
||||
* `consul_sd_configs` is for discovering and scraping targets registered in [Consul](https://www.consul.io/). See [these docs](#consul_sd_configs).
|
||||
* `consulagent_sd_configs` is for discovering and scraping targets registered in [Consul Agent](https://developer.hashicorp.com/consul/api-docs/agent/service). See [these docs](#consulagent_sd_configs).
|
||||
* `digitalocean_sd_configs` is for discovering and scraping targerts registered in [DigitalOcean](https://www.digitalocean.com/). See [these docs](#digitalocean_sd_configs).
|
||||
* `dns_sd_configs` is for discovering and scraping targets from [DNS](https://it.wikipedia.org/wiki/Domain_Name_System) records (SRV, A and AAAA). See [these docs](#dns_sd_configs).
|
||||
* `docker_sd_configs` is for discovering and scraping [Docker](https://www.docker.com/) targets. See [these docs](#docker_sd_configs).
|
||||
|
@ -203,6 +204,89 @@ The following meta labels are available on discovered targets during [relabeling
|
|||
|
||||
The list of discovered Consul targets is refreshed at the interval, which can be configured via `-promscrape.consulSDCheckInterval` command-line flag.
|
||||
|
||||
If you have performance issues with consul_sd_configs on a large cluster, then consider using [consulagent_sd_configs](#consulagent_sd_configs) instead.
|
||||
|
||||
## consulagent_sd_configs
|
||||
|
||||
Consul Agent SD configuration allows retrieving scrape targets from [Consul's Agent API](https://developer.hashicorp.com/consul/api-docs/agent/service).
|
||||
When using the Agent API, each running vmagent will only get services registered in the local Consul Agent running on the same node when discovering new targets.
|
||||
It's suitable for huge clusters for which using the [Catalog API](https://developer.hashicorp.com/consul/api-docs/catalog#list-services) would be too slow or resource intensive,
|
||||
in other cases we recommend to use [consul_sd_configs](#consul_sd_configs).
|
||||
|
||||
Configuration example:
|
||||
|
||||
```yaml
|
||||
scrape_configs:
|
||||
- job_name: consul
|
||||
consulagent_sd_configs:
|
||||
|
||||
# server is an optional Consul agent to connect to. By default localhost:8500 is used
|
||||
- server: "localhost:8500"
|
||||
|
||||
# token is an optional Consul API token.
|
||||
# If the token isn't specified, then it is read from a file pointed by CONSUL_HTTP_TOKEN_FILE
|
||||
# environment var or from the CONSUL_HTTP_TOKEN environment var.
|
||||
# token: "..."
|
||||
|
||||
# datacenter is an optional Consul API datacenter.
|
||||
# If the datacenter isn't specified, then it is read from Consul server.
|
||||
# See https://www.consul.io/api-docs/agent#read-configuration
|
||||
# datacenter: "..."
|
||||
|
||||
# namespace is an optional Consul namespace.
|
||||
# See https://developer.hashicorp.com/consul/docs/enterprise/namespaces
|
||||
# If the namespace isn't specified, then it is read from CONSUL_NAMESPACE environment var.
|
||||
# namespace: "..."
|
||||
|
||||
# scheme is an optional scheme (http or https) to use for connecting to Consul server.
|
||||
# By default http scheme is used.
|
||||
# scheme: "..."
|
||||
|
||||
# services is an optional list of services for which targets are retrieved.
|
||||
# If omitted, all services are scraped.
|
||||
# See https://www.consul.io/api-docs/catalog#list-nodes-for-service .
|
||||
# services: ["...", "..."]
|
||||
|
||||
# tag_separator is an optional string by which Consul tags are joined into the __meta_consul_tags label.
|
||||
# By default "," is used as a tag separator.
|
||||
# Individual tags are also available via __meta_consul_tag_<tagname> labels - see below.
|
||||
# tag_separator: "..."
|
||||
|
||||
# filter is optional filter for service nodes discovery request.
|
||||
# Replaces tags and node_metadata options.
|
||||
# consul supports it since 1.14 version
|
||||
# list of supported filters https://developer.hashicorp.com/consul/api-docs/catalog#filtering-1
|
||||
# syntax examples https://developer.hashicorp.com/consul/api-docs/features/filtering
|
||||
# filter: "..."
|
||||
|
||||
# Additional HTTP API client options can be specified here.
|
||||
# See https://docs.victoriametrics.com/sd_configs.html#http-api-client-options
|
||||
```
|
||||
|
||||
Each discovered target has an [`__address__`](https://docs.victoriametrics.com/relabeling.html#how-to-modify-scrape-urls-in-targets) label set
|
||||
to `<service_or_node_addr>:<service_port>`, where `<service_or_node_addr>` is the service address. If the service address is empty,
|
||||
then the node address is used instead. The `<service_port>` is the service port.
|
||||
|
||||
The following meta labels are available on discovered targets during [relabeling](https://docs.victoriametrics.com/vmagent.html#relabeling):
|
||||
|
||||
* `__meta_consulagent_address`: the address of the target
|
||||
* `__meta_consulagent_dc`: the datacenter name for the target
|
||||
* `__meta_consulagent_health`: the health status of the service
|
||||
* `__meta_consulagent_metadata_<key>`: each node metadata key value of the target
|
||||
* `__meta_consulagent_namespace`: namespace of the service - see [namespace docs](https://developer.hashicorp.com/consul/docs/enterprise/namespaces)
|
||||
* `__meta_consulagent_node`: the node name defined for the target
|
||||
* `__meta_consulagent_service_address`: the service address of the target
|
||||
* `__meta_consulagent_service_id`: the service ID of the target
|
||||
* `__meta_consulagent_service_metadata_<key>`: each service metadata key value of the target
|
||||
* `__meta_consulagent_service_port`: the service port of the target
|
||||
* `__meta_consulagent_service`: the name of the service the target belongs to
|
||||
* `__meta_consulagent_tagged_address_<key>`: each node tagged address key value of the target
|
||||
* `__meta_consulagent_tag_<tagname>`: the value for the given <tagname> tag of the target
|
||||
* `__meta_consulagent_tagpresent_<tagname>`: "true" for every <tagname> tag of the target
|
||||
* `__meta_consulagent_tags`: the list of tags of the target joined by the `tag_separator`
|
||||
|
||||
The list of discovered Consul Agent targets is refreshed at the interval, which can be configured via `-promscrape.consulagentSDCheckInterval` command-line flag.
|
||||
|
||||
## digitalocean_sd_configs
|
||||
|
||||
DigitalOcean SD configuration allows retrieving scrape targets from [DigitalOcean's Droplets API](https://docs.digitalocean.com/reference/api/api-reference/#tag/Droplets).
|
||||
|
|
|
@ -1355,6 +1355,8 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
|
|||
Wait time used by Consul service discovery. Default value is used if not set
|
||||
-promscrape.consulSDCheckInterval duration
|
||||
Interval for checking for changes in Consul. This works only if consul_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#consul_sd_configs for details (default 30s)
|
||||
-promscrape.consulagentSDCheckInterval duration
|
||||
Interval for checking for changes in Consul Agent. This works only if consulagent_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#consulagent_sd_configs for details (default 30s)
|
||||
-promscrape.digitaloceanSDCheckInterval duration
|
||||
Interval for checking for changes in digital ocean. This works only if digitalocean_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#digitalocean_sd_configs for details (default 1m0s)
|
||||
-promscrape.disableCompression
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/azure"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consulagent"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/digitalocean"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/dns"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/docker"
|
||||
|
@ -255,6 +256,7 @@ type ScrapeConfig struct {
|
|||
|
||||
AzureSDConfigs []azure.SDConfig `yaml:"azure_sd_configs,omitempty"`
|
||||
ConsulSDConfigs []consul.SDConfig `yaml:"consul_sd_configs,omitempty"`
|
||||
ConsulAgentSDConfigs []consulagent.SDConfig `yaml:"consulagent_sd_configs,omitempty"`
|
||||
DigitaloceanSDConfigs []digitalocean.SDConfig `yaml:"digitalocean_sd_configs,omitempty"`
|
||||
DNSSDConfigs []dns.SDConfig `yaml:"dns_sd_configs,omitempty"`
|
||||
DockerSDConfigs []docker.SDConfig `yaml:"docker_sd_configs,omitempty"`
|
||||
|
@ -307,6 +309,9 @@ func (sc *ScrapeConfig) mustStop() {
|
|||
for i := range sc.ConsulSDConfigs {
|
||||
sc.ConsulSDConfigs[i].MustStop()
|
||||
}
|
||||
for i := range sc.ConsulAgentSDConfigs {
|
||||
sc.ConsulAgentSDConfigs[i].MustStop()
|
||||
}
|
||||
for i := range sc.DigitaloceanSDConfigs {
|
||||
sc.DigitaloceanSDConfigs[i].MustStop()
|
||||
}
|
||||
|
@ -547,6 +552,33 @@ func (cfg *Config) getConsulSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
|
|||
return dst
|
||||
}
|
||||
|
||||
// getConsulAgentSDScrapeWork returns `consulagent_sd_configs` ScrapeWork from cfg.
|
||||
func (cfg *Config) getConsulAgentSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
|
||||
swsPrevByJob := getSWSByJob(prev)
|
||||
dst := make([]*ScrapeWork, 0, len(prev))
|
||||
for _, sc := range cfg.ScrapeConfigs {
|
||||
dstLen := len(dst)
|
||||
ok := true
|
||||
for j := range sc.ConsulAgentSDConfigs {
|
||||
sdc := &sc.ConsulAgentSDConfigs[j]
|
||||
var okLocal bool
|
||||
dst, okLocal = appendSDScrapeWork(dst, sdc, cfg.baseDir, sc.swc, "consulagent_sd_config")
|
||||
if ok {
|
||||
ok = okLocal
|
||||
}
|
||||
}
|
||||
if ok {
|
||||
continue
|
||||
}
|
||||
swsPrev := swsPrevByJob[sc.swc.jobName]
|
||||
if len(swsPrev) > 0 {
|
||||
logger.Errorf("there were errors when discovering consulagent targets for job %q, so preserving the previous targets", sc.swc.jobName)
|
||||
dst = append(dst[:dstLen], swsPrev...)
|
||||
}
|
||||
}
|
||||
return dst
|
||||
}
|
||||
|
||||
// getDigitalOceanDScrapeWork returns `digitalocean_sd_configs` ScrapeWork from cfg.
|
||||
func (cfg *Config) getDigitalOceanDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
|
||||
swsPrevByJob := getSWSByJob(prev)
|
||||
|
|
|
@ -10,6 +10,8 @@ import (
|
|||
// See https://www.consul.io/api/agent.html#read-configuration
|
||||
type Agent struct {
|
||||
Config AgentConfig
|
||||
Member AgentMember
|
||||
Meta map[string]string
|
||||
}
|
||||
|
||||
// AgentConfig is Consul agent config.
|
||||
|
@ -17,9 +19,18 @@ type Agent struct {
|
|||
// See https://www.consul.io/api/agent.html#read-configuration
|
||||
type AgentConfig struct {
|
||||
Datacenter string
|
||||
NodeName string
|
||||
}
|
||||
|
||||
func parseAgent(data []byte) (*Agent, error) {
|
||||
// AgentMember is Consul agent member info.
|
||||
//
|
||||
// See https://www.consul.io/api/agent.html#read-configuration
|
||||
type AgentMember struct {
|
||||
Addr string
|
||||
}
|
||||
|
||||
// ParseAgent parses Consul agent information from bytes.
|
||||
func ParseAgent(data []byte) (*Agent, error) {
|
||||
var a Agent
|
||||
if err := json.Unmarshal(data, &a); err != nil {
|
||||
return nil, fmt.Errorf("cannot unmarshal agent info from %q: %w", data, err)
|
||||
|
|
|
@ -8,7 +8,7 @@ import (
|
|||
func TestParseAgentFailure(t *testing.T) {
|
||||
f := func(s string) {
|
||||
t.Helper()
|
||||
a, err := parseAgent([]byte(s))
|
||||
a, err := ParseAgent([]byte(s))
|
||||
if err == nil {
|
||||
t.Fatalf("expecting non-nil error")
|
||||
}
|
||||
|
@ -66,13 +66,21 @@ func TestParseAgentSuccess(t *testing.T) {
|
|||
}
|
||||
}
|
||||
`
|
||||
a, err := parseAgent([]byte(data))
|
||||
a, err := ParseAgent([]byte(data))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
aExpected := &Agent{
|
||||
Config: AgentConfig{
|
||||
Datacenter: "dc1",
|
||||
NodeName: "foobar",
|
||||
},
|
||||
Member: AgentMember{
|
||||
Addr: "10.1.10.12",
|
||||
},
|
||||
Meta: map[string]string{
|
||||
"instance_type": "i2.xlarge",
|
||||
"os_version": "ubuntu_16.04",
|
||||
},
|
||||
}
|
||||
if !reflect.DeepEqual(a, aExpected) {
|
||||
|
|
|
@ -39,7 +39,7 @@ func getAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
|
|||
|
||||
func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
|
||||
hcc := sdc.HTTPClientConfig
|
||||
token, err := getToken(sdc.Token)
|
||||
token, err := GetToken(sdc.Token)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -107,7 +107,8 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
|
|||
return cfg, nil
|
||||
}
|
||||
|
||||
func getToken(token *promauth.Secret) (string, error) {
|
||||
// GetToken returns Consul token.
|
||||
func GetToken(token *promauth.Secret) (string, error) {
|
||||
if token != nil {
|
||||
return token.String(), nil
|
||||
}
|
||||
|
@ -123,20 +124,29 @@ func getToken(token *promauth.Secret) (string, error) {
|
|||
return t, nil
|
||||
}
|
||||
|
||||
// GetAgentInfo returns information about current consul agent.
|
||||
func GetAgentInfo(client *discoveryutils.Client) (*Agent, error) {
|
||||
// See https://www.consul.io/api/agent.html#read-configuration
|
||||
data, err := client.GetAPIResponse("/v1/agent/self")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot query consul agent info: %w", err)
|
||||
}
|
||||
a, err := ParseAgent(data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return a, nil
|
||||
}
|
||||
|
||||
func getDatacenter(client *discoveryutils.Client, dc string) (string, error) {
|
||||
if dc != "" {
|
||||
return dc, nil
|
||||
}
|
||||
// See https://www.consul.io/api/agent.html#read-configuration
|
||||
data, err := client.GetAPIResponse("/v1/agent/self")
|
||||
agent, err := GetAgentInfo(client)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("cannot query consul agent info: %w", err)
|
||||
}
|
||||
a, err := parseAgent(data)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return a.Config.Datacenter, nil
|
||||
return agent.Config.Datacenter, nil
|
||||
}
|
||||
|
||||
// maxWaitTime is duration for consul blocking request.
|
||||
|
|
|
@ -35,14 +35,24 @@ type ServiceNode struct {
|
|||
//
|
||||
// See https://www.consul.io/api/health.html#list-nodes-for-service
|
||||
type Service struct {
|
||||
ID string
|
||||
Service string
|
||||
Address string
|
||||
Namespace string
|
||||
Partition string
|
||||
Port int
|
||||
Tags []string
|
||||
Meta map[string]string
|
||||
ID string
|
||||
Service string
|
||||
Address string
|
||||
Namespace string
|
||||
Partition string
|
||||
Port int
|
||||
Tags []string
|
||||
Meta map[string]string
|
||||
TaggedAddresses map[string]ServiceTaggedAddress
|
||||
Datacenter string
|
||||
}
|
||||
|
||||
// ServiceTaggedAddress is Consul service.
|
||||
//
|
||||
// See https://www.consul.io/api/health.html#list-nodes-for-service
|
||||
type ServiceTaggedAddress struct {
|
||||
Address string
|
||||
Port int
|
||||
}
|
||||
|
||||
// Node is Consul node.
|
||||
|
@ -64,7 +74,8 @@ type Check struct {
|
|||
Status string
|
||||
}
|
||||
|
||||
func parseServiceNodes(data []byte) ([]ServiceNode, error) {
|
||||
// ParseServiceNodes return parsed slice of ServiceNode by data.
|
||||
func ParseServiceNodes(data []byte) ([]ServiceNode, error) {
|
||||
var sns []ServiceNode
|
||||
if err := json.Unmarshal(data, &sns); err != nil {
|
||||
return nil, fmt.Errorf("cannot unmarshal ServiceNodes from %q: %w", data, err)
|
||||
|
@ -83,7 +94,7 @@ func (sn *ServiceNode) appendTargetLabels(ms []*promutils.Labels, serviceName, t
|
|||
m.Add("__address__", addr)
|
||||
m.Add("__meta_consul_address", sn.Node.Address)
|
||||
m.Add("__meta_consul_dc", sn.Node.Datacenter)
|
||||
m.Add("__meta_consul_health", aggregatedStatus(sn.Checks))
|
||||
m.Add("__meta_consul_health", AggregatedStatus(sn.Checks))
|
||||
m.Add("__meta_consul_namespace", sn.Service.Namespace)
|
||||
m.Add("__meta_consul_partition", sn.Service.Partition)
|
||||
m.Add("__meta_consul_node", sn.Node.Node)
|
||||
|
@ -91,27 +102,8 @@ func (sn *ServiceNode) appendTargetLabels(ms []*promutils.Labels, serviceName, t
|
|||
m.Add("__meta_consul_service_address", sn.Service.Address)
|
||||
m.Add("__meta_consul_service_id", sn.Service.ID)
|
||||
m.Add("__meta_consul_service_port", strconv.Itoa(sn.Service.Port))
|
||||
// We surround the separated list with the separator as well. This way regular expressions
|
||||
// in relabeling rules don't have to consider tag positions.
|
||||
m.Add("__meta_consul_tags", tagSeparator+strings.Join(sn.Service.Tags, tagSeparator)+tagSeparator)
|
||||
|
||||
// Expose individual tags via __meta_consul_tag_* labels, so users could move all the tags
|
||||
// into the discovered scrape target with the following relabeling rule in the way similar to kubernetes_sd_configs:
|
||||
//
|
||||
// - action: labelmap
|
||||
// regex: __meta_consul_tag_(.+)
|
||||
//
|
||||
// This solves https://stackoverflow.com/questions/44339461/relabeling-in-prometheus
|
||||
for _, tag := range sn.Service.Tags {
|
||||
k := tag
|
||||
v := ""
|
||||
if n := strings.IndexByte(tag, '='); n >= 0 {
|
||||
k = tag[:n]
|
||||
v = tag[n+1:]
|
||||
}
|
||||
m.Add(discoveryutils.SanitizeLabelName("__meta_consul_tag_"+k), v)
|
||||
m.Add(discoveryutils.SanitizeLabelName("__meta_consul_tagpresent_"+k), "true")
|
||||
}
|
||||
discoveryutils.AddTagsToLabels(m, sn.Service.Tags, "__meta_consul_", tagSeparator)
|
||||
|
||||
for k, v := range sn.Node.Meta {
|
||||
m.Add(discoveryutils.SanitizeLabelName("__meta_consul_metadata_"+k), v)
|
||||
|
@ -126,7 +118,8 @@ func (sn *ServiceNode) appendTargetLabels(ms []*promutils.Labels, serviceName, t
|
|||
return ms
|
||||
}
|
||||
|
||||
func aggregatedStatus(checks []Check) string {
|
||||
// AggregatedStatus returns aggregated status of service node checks.
|
||||
func AggregatedStatus(checks []Check) string {
|
||||
// The code has been copy-pasted from HealthChecks.AggregatedStatus in Consul
|
||||
var passing, warning, critical, maintenance bool
|
||||
for _, check := range checks {
|
||||
|
|
|
@ -10,7 +10,7 @@ import (
|
|||
func TestParseServiceNodesFailure(t *testing.T) {
|
||||
f := func(s string) {
|
||||
t.Helper()
|
||||
sns, err := parseServiceNodes([]byte(s))
|
||||
sns, err := ParseServiceNodes([]byte(s))
|
||||
if err == nil {
|
||||
t.Fatalf("expecting non-nil error")
|
||||
}
|
||||
|
@ -95,7 +95,7 @@ func TestParseServiceNodesSuccess(t *testing.T) {
|
|||
}
|
||||
]
|
||||
`
|
||||
sns, err := parseServiceNodes([]byte(data))
|
||||
sns, err := ParseServiceNodes([]byte(data))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
|
|
|
@ -235,7 +235,7 @@ func (cw *consulWatcher) getBlockingServiceNames(index int64) ([]string, int64,
|
|||
}
|
||||
serviceNames := make([]string, 0, len(m))
|
||||
for serviceName, tags := range m {
|
||||
if !shouldCollectServiceByName(cw.watchServices, serviceName) {
|
||||
if !ShouldCollectServiceByName(cw.watchServices, serviceName) {
|
||||
continue
|
||||
}
|
||||
if !shouldCollectServiceByTags(cw.watchTags, tags) {
|
||||
|
@ -265,7 +265,7 @@ func (sw *serviceWatcher) watchForServiceNodesUpdates(cw *consulWatcher, initWG
|
|||
// Nothing changed.
|
||||
return
|
||||
}
|
||||
sns, err := parseServiceNodes(data)
|
||||
sns, err := ParseServiceNodes(data)
|
||||
if err != nil {
|
||||
logger.Errorf("cannot parse Consul serviceNodes response for serviceName=%q from %q: %s", sw.serviceName, apiServer, err)
|
||||
return
|
||||
|
@ -307,7 +307,8 @@ func (cw *consulWatcher) getServiceNodesSnapshot() map[string][]ServiceNode {
|
|||
return sns
|
||||
}
|
||||
|
||||
func shouldCollectServiceByName(filterServices []string, serviceName string) bool {
|
||||
// ShouldCollectServiceByName returns true if the given serviceName must be collected (present in filterServices).
|
||||
func ShouldCollectServiceByName(filterServices []string, serviceName string) bool {
|
||||
if len(filterServices) == 0 {
|
||||
return true
|
||||
}
|
||||
|
|
107
lib/promscrape/discovery/consulagent/api.go
Normal file
107
lib/promscrape/discovery/consulagent/api.go
Normal file
|
@ -0,0 +1,107 @@
|
|||
package consulagent
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||
)
|
||||
|
||||
// apiConfig contains config for API server.
|
||||
type apiConfig struct {
|
||||
tagSeparator string
|
||||
consulWatcher *consulAgentWatcher
|
||||
agent *consul.Agent
|
||||
}
|
||||
|
||||
func (ac *apiConfig) mustStop() {
|
||||
ac.consulWatcher.mustStop()
|
||||
}
|
||||
|
||||
var configMap = discoveryutils.NewConfigMap()
|
||||
|
||||
func getAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
|
||||
v, err := configMap.Get(sdc, func() (interface{}, error) { return newAPIConfig(sdc, baseDir) })
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return v.(*apiConfig), nil
|
||||
}
|
||||
|
||||
func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
|
||||
hcc := sdc.HTTPClientConfig
|
||||
token, err := consul.GetToken(sdc.Token)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if token != "" {
|
||||
if hcc.BearerToken != nil {
|
||||
return nil, fmt.Errorf("cannot set both token and bearer_token configs")
|
||||
}
|
||||
hcc.BearerToken = promauth.NewSecret(token)
|
||||
}
|
||||
if len(sdc.Username) > 0 {
|
||||
if hcc.BasicAuth != nil {
|
||||
return nil, fmt.Errorf("cannot set both username and basic_auth configs")
|
||||
}
|
||||
hcc.BasicAuth = &promauth.BasicAuthConfig{
|
||||
Username: sdc.Username,
|
||||
Password: sdc.Password,
|
||||
}
|
||||
}
|
||||
ac, err := hcc.NewConfig(baseDir)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse auth config: %w", err)
|
||||
}
|
||||
apiServer := sdc.Server
|
||||
if apiServer == "" {
|
||||
apiServer = "localhost:8500"
|
||||
}
|
||||
if !strings.Contains(apiServer, "://") {
|
||||
scheme := sdc.Scheme
|
||||
if scheme == "" {
|
||||
scheme = "http"
|
||||
if hcc.TLSConfig != nil {
|
||||
scheme = "https"
|
||||
}
|
||||
}
|
||||
apiServer = scheme + "://" + apiServer
|
||||
}
|
||||
proxyAC, err := sdc.ProxyClientConfig.NewConfig(baseDir)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse proxy auth config: %w", err)
|
||||
}
|
||||
client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL, proxyAC)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot create HTTP client for %q: %w", apiServer, err)
|
||||
}
|
||||
tagSeparator := ","
|
||||
if sdc.TagSeparator != nil {
|
||||
tagSeparator = *sdc.TagSeparator
|
||||
}
|
||||
agent, err := consul.GetAgentInfo(client)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot obtain consul datacenter: %w", err)
|
||||
}
|
||||
dc := sdc.Datacenter
|
||||
if dc == "" {
|
||||
dc = agent.Config.Datacenter
|
||||
}
|
||||
|
||||
namespace := sdc.Namespace
|
||||
// default namespace can be detected from env var.
|
||||
if namespace == "" {
|
||||
namespace = os.Getenv("CONSUL_NAMESPACE")
|
||||
}
|
||||
|
||||
cw := newConsulAgentWatcher(client, sdc, dc, namespace)
|
||||
cfg := &apiConfig{
|
||||
tagSeparator: tagSeparator,
|
||||
consulWatcher: cw,
|
||||
agent: agent,
|
||||
}
|
||||
return cfg, nil
|
||||
}
|
57
lib/promscrape/discovery/consulagent/consulagent.go
Normal file
57
lib/promscrape/discovery/consulagent/consulagent.go
Normal file
|
@ -0,0 +1,57 @@
|
|||
package consulagent
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
|
||||
)
|
||||
|
||||
// SDConfig represents service discovery config for Consul Agent.
|
||||
//
|
||||
// See https://grafana.com/docs/loki/latest/clients/promtail/configuration/#consulagent_sd_config
|
||||
type SDConfig struct {
|
||||
Server string `yaml:"server,omitempty"`
|
||||
Token *promauth.Secret `yaml:"token"`
|
||||
Datacenter string `yaml:"datacenter"`
|
||||
|
||||
// Namespace only supported at enterprise consul.
|
||||
// https://www.consul.io/docs/enterprise/namespaces
|
||||
Namespace string `yaml:"namespace,omitempty"`
|
||||
|
||||
Scheme string `yaml:"scheme,omitempty"`
|
||||
Username string `yaml:"username"`
|
||||
Password *promauth.Secret `yaml:"password"`
|
||||
HTTPClientConfig promauth.HTTPClientConfig `yaml:",inline"`
|
||||
ProxyURL *proxy.URL `yaml:"proxy_url,omitempty"`
|
||||
ProxyClientConfig promauth.ProxyClientConfig `yaml:",inline"`
|
||||
Services []string `yaml:"services,omitempty"`
|
||||
TagSeparator *string `yaml:"tag_separator,omitempty"`
|
||||
// See https://developer.hashicorp.com/consul/api-docs/features/filtering
|
||||
// list of supported filters https://developer.hashicorp.com/consul/api-docs/catalog#filtering-1
|
||||
Filter string `yaml:"filter,omitempty"`
|
||||
|
||||
// RefreshInterval time.Duration `yaml:"refresh_interval"`
|
||||
// refresh_interval is obtained from `-promscrape.consulagentSDCheckInterval` command-line option.
|
||||
}
|
||||
|
||||
// GetLabels returns Consul labels according to sdc.
|
||||
func (sdc *SDConfig) GetLabels(baseDir string) ([]*promutils.Labels, error) {
|
||||
cfg, err := getAPIConfig(sdc, baseDir)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot get API config: %w", err)
|
||||
}
|
||||
ms := getServiceNodesLabels(cfg)
|
||||
return ms, nil
|
||||
}
|
||||
|
||||
// MustStop stops further usage for sdc.
|
||||
func (sdc *SDConfig) MustStop() {
|
||||
v := configMap.Delete(sdc)
|
||||
if v != nil {
|
||||
// v can be nil if GetLabels wasn't called yet.
|
||||
cfg := v.(*apiConfig)
|
||||
cfg.mustStop()
|
||||
}
|
||||
}
|
65
lib/promscrape/discovery/consulagent/service_node.go
Normal file
65
lib/promscrape/discovery/consulagent/service_node.go
Normal file
|
@ -0,0 +1,65 @@
|
|||
package consulagent
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
||||
)
|
||||
|
||||
// getServiceNodesLabels returns labels for Consul service nodes with given cfg.
|
||||
func getServiceNodesLabels(cfg *apiConfig) []*promutils.Labels {
|
||||
sns := cfg.consulWatcher.getServiceNodesSnapshot()
|
||||
var ms []*promutils.Labels
|
||||
for svc, sn := range sns {
|
||||
for i := range sn {
|
||||
ms = appendTargetLabels(sn[i], ms, svc, cfg.tagSeparator, cfg.agent)
|
||||
}
|
||||
}
|
||||
return ms
|
||||
}
|
||||
|
||||
func appendTargetLabels(sn consul.ServiceNode, ms []*promutils.Labels, serviceName, tagSeparator string, agent *consul.Agent) []*promutils.Labels {
|
||||
const metaPrefix = "__meta_consulagent_"
|
||||
var addr string
|
||||
|
||||
// If the service address is not empty it should be used instead of the node address
|
||||
// since the service may be registered remotely through a different node.
|
||||
if sn.Service.Address != "" {
|
||||
addr = discoveryutils.JoinHostPort(sn.Service.Address, sn.Service.Port)
|
||||
} else {
|
||||
addr = discoveryutils.JoinHostPort(agent.Member.Addr, sn.Service.Port)
|
||||
}
|
||||
|
||||
m := promutils.NewLabels(16)
|
||||
m.Add("__address__", addr)
|
||||
m.Add(metaPrefix+"address", agent.Member.Addr)
|
||||
m.Add(metaPrefix+"dc", agent.Config.Datacenter)
|
||||
m.Add(metaPrefix+"health", consul.AggregatedStatus(sn.Checks))
|
||||
m.Add(metaPrefix+"namespace", sn.Service.Namespace)
|
||||
m.Add(metaPrefix+"node", agent.Config.NodeName)
|
||||
m.Add(metaPrefix+"service", serviceName)
|
||||
m.Add(metaPrefix+"service_address", sn.Service.Address)
|
||||
m.Add(metaPrefix+"service_id", sn.Service.ID)
|
||||
m.Add(metaPrefix+"service_port", strconv.Itoa(sn.Service.Port))
|
||||
|
||||
discoveryutils.AddTagsToLabels(m, sn.Service.Tags, metaPrefix, tagSeparator)
|
||||
|
||||
for k, v := range agent.Meta {
|
||||
m.Add(discoveryutils.SanitizeLabelName(metaPrefix+"metadata_"+k), v)
|
||||
}
|
||||
for k, v := range sn.Service.Meta {
|
||||
m.Add(discoveryutils.SanitizeLabelName(metaPrefix+"service_metadata_"+k), v)
|
||||
}
|
||||
for k, v := range sn.Node.TaggedAddresses {
|
||||
m.Add(discoveryutils.SanitizeLabelName(metaPrefix+"tagged_address_"+k), v)
|
||||
}
|
||||
for k, v := range sn.Service.TaggedAddresses {
|
||||
address := fmt.Sprintf("%s:%d", v.Address, v.Port)
|
||||
m.Add(discoveryutils.SanitizeLabelName(metaPrefix+"tagged_address_"+k), address)
|
||||
}
|
||||
ms = append(ms, m)
|
||||
return ms
|
||||
}
|
141
lib/promscrape/discovery/consulagent/service_node_test.go
Normal file
141
lib/promscrape/discovery/consulagent/service_node_test.go
Normal file
|
@ -0,0 +1,141 @@
|
|||
package consulagent
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
||||
)
|
||||
|
||||
func TestParseServiceNodesFailure(t *testing.T) {
|
||||
f := func(s string) {
|
||||
t.Helper()
|
||||
sns, err := consul.ParseServiceNodes([]byte(s))
|
||||
if err == nil {
|
||||
t.Fatalf("expecting non-nil error")
|
||||
}
|
||||
if sns != nil {
|
||||
t.Fatalf("unexpected non-nil ServiceNodes: %v", sns)
|
||||
}
|
||||
}
|
||||
f(``)
|
||||
f(`[1,23]`)
|
||||
f(`{"items":[{"metadata":1}]}`)
|
||||
}
|
||||
|
||||
func TestParseServiceNodesSuccess(t *testing.T) {
|
||||
data := `
|
||||
[
|
||||
{
|
||||
"Service": {
|
||||
"ID": "redis",
|
||||
"Service": "redis",
|
||||
"Tags": ["primary","foo=bar"],
|
||||
"Address": "10.1.10.12",
|
||||
"TaggedAddresses": {
|
||||
"lan": {
|
||||
"address": "10.1.10.12",
|
||||
"port": 8000
|
||||
},
|
||||
"wan": {
|
||||
"address": "198.18.1.2",
|
||||
"port": 80
|
||||
}
|
||||
},
|
||||
"Meta": {
|
||||
"redis_version": "4.0"
|
||||
},
|
||||
"Port": 8000,
|
||||
"Weights": {
|
||||
"Passing": 10,
|
||||
"Warning": 1
|
||||
},
|
||||
"Namespace": "ns-dev",
|
||||
"Partition": "part-foobar"
|
||||
},
|
||||
"Checks": [
|
||||
{
|
||||
"Node": "foobar",
|
||||
"CheckID": "service:redis",
|
||||
"Name": "Service 'redis' check",
|
||||
"Status": "passing",
|
||||
"Notes": "",
|
||||
"Output": "",
|
||||
"ServiceID": "redis",
|
||||
"ServiceName": "redis",
|
||||
"ServiceTags": ["primary"],
|
||||
"Namespace": "default"
|
||||
},
|
||||
{
|
||||
"Node": "foobar",
|
||||
"CheckID": "serfHealth",
|
||||
"Name": "Serf Health Status",
|
||||
"Status": "passing",
|
||||
"Notes": "",
|
||||
"Output": "",
|
||||
"ServiceID": "",
|
||||
"ServiceName": "",
|
||||
"ServiceTags": [],
|
||||
"Namespace": "default"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
`
|
||||
sns, err := consul.ParseServiceNodes([]byte(data))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
if len(sns) != 1 {
|
||||
t.Fatalf("unexpected length of ServiceNodes; got %d; want %d", len(sns), 1)
|
||||
}
|
||||
sn := sns[0]
|
||||
|
||||
agentData := `
|
||||
{
|
||||
"Member": {
|
||||
"Addr": "10.1.10.12"
|
||||
},
|
||||
"Config": {
|
||||
"Datacenter": "dc1",
|
||||
"NodeName": "foobar"
|
||||
},
|
||||
"Meta": {
|
||||
"instance_type": "t2.medium"
|
||||
}
|
||||
}
|
||||
`
|
||||
agent, err := consul.ParseAgent([]byte(agentData))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
|
||||
// Check sn.appendTargetLabels()
|
||||
tagSeparator := ","
|
||||
labelss := appendTargetLabels(sn, nil, "redis", tagSeparator, agent)
|
||||
expectedLabelss := []*promutils.Labels{
|
||||
promutils.NewLabelsFromMap(map[string]string{
|
||||
"__address__": "10.1.10.12:8000",
|
||||
"__meta_consulagent_address": "10.1.10.12",
|
||||
"__meta_consulagent_dc": "dc1",
|
||||
"__meta_consulagent_health": "passing",
|
||||
"__meta_consulagent_metadata_instance_type": "t2.medium",
|
||||
"__meta_consulagent_namespace": "ns-dev",
|
||||
"__meta_consulagent_node": "foobar",
|
||||
"__meta_consulagent_service": "redis",
|
||||
"__meta_consulagent_service_address": "10.1.10.12",
|
||||
"__meta_consulagent_service_id": "redis",
|
||||
"__meta_consulagent_service_metadata_redis_version": "4.0",
|
||||
"__meta_consulagent_service_port": "8000",
|
||||
"__meta_consulagent_tagged_address_lan": "10.1.10.12:8000",
|
||||
"__meta_consulagent_tagged_address_wan": "198.18.1.2:80",
|
||||
"__meta_consulagent_tag_foo": "bar",
|
||||
"__meta_consulagent_tag_primary": "",
|
||||
"__meta_consulagent_tagpresent_foo": "true",
|
||||
"__meta_consulagent_tagpresent_primary": "true",
|
||||
"__meta_consulagent_tags": ",primary,foo=bar,",
|
||||
}),
|
||||
}
|
||||
discoveryutils.TestEqualLabelss(t, labelss, expectedLabelss)
|
||||
}
|
280
lib/promscrape/discovery/consulagent/watch.go
Normal file
280
lib/promscrape/discovery/consulagent/watch.go
Normal file
|
@ -0,0 +1,280 @@
|
|||
package consulagent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
// SDCheckInterval is check interval for Consul Agent service discovery.
|
||||
var SDCheckInterval = flag.Duration("promscrape.consulagentSDCheckInterval", 30*time.Second, "Interval for checking for changes in Consul Agent. "+
|
||||
"This works only if consulagent_sd_configs is configured in '-promscrape.config' file. "+
|
||||
"See https://docs.victoriametrics.com/sd_configs.html#consulagent_sd_configs for details")
|
||||
|
||||
// consulAgentWatcher is a watcher for consul api, updates services map in background with long-polling.
|
||||
type consulAgentWatcher struct {
|
||||
client *discoveryutils.Client
|
||||
|
||||
servicesQueryArgs string
|
||||
|
||||
watchServices []string
|
||||
watchDatacenter string
|
||||
|
||||
// servicesLock protects services
|
||||
servicesLock sync.Mutex
|
||||
services map[string]*serviceWatcher
|
||||
|
||||
stoppedCh chan struct{}
|
||||
}
|
||||
|
||||
type serviceWatcher struct {
|
||||
serviceName string
|
||||
serviceNodes []consul.ServiceNode
|
||||
|
||||
stoppedCh chan struct{}
|
||||
|
||||
requestCtx context.Context
|
||||
requestCancel context.CancelFunc
|
||||
}
|
||||
|
||||
// newConsulAgentWatcher creates new watcher and starts background service discovery for Consul.
|
||||
func newConsulAgentWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter, namespace string) *consulAgentWatcher {
|
||||
var qv = url.Values{}
|
||||
if namespace != "" {
|
||||
qv.Set("ns", namespace)
|
||||
}
|
||||
if len(sdc.Filter) > 0 {
|
||||
qv.Set("filter", sdc.Filter)
|
||||
}
|
||||
|
||||
cw := &consulAgentWatcher{
|
||||
client: client,
|
||||
servicesQueryArgs: "?" + qv.Encode(),
|
||||
watchServices: sdc.Services,
|
||||
watchDatacenter: datacenter,
|
||||
services: make(map[string]*serviceWatcher),
|
||||
stoppedCh: make(chan struct{}),
|
||||
}
|
||||
initCh := make(chan struct{})
|
||||
go func() {
|
||||
cw.watchForServicesUpdates(initCh)
|
||||
close(cw.stoppedCh)
|
||||
}()
|
||||
// wait for initialization to complete
|
||||
<-initCh
|
||||
return cw
|
||||
}
|
||||
|
||||
func (cw *consulAgentWatcher) mustStop() {
|
||||
cw.client.Stop()
|
||||
<-cw.stoppedCh
|
||||
}
|
||||
|
||||
func (cw *consulAgentWatcher) updateServices(serviceNames []string) {
|
||||
var initWG sync.WaitGroup
|
||||
|
||||
// Start watchers for new services.
|
||||
cw.servicesLock.Lock()
|
||||
for _, serviceName := range serviceNames {
|
||||
if _, ok := cw.services[serviceName]; ok {
|
||||
// The watcher for serviceName already exists.
|
||||
continue
|
||||
}
|
||||
ctx, cancel := context.WithCancel(cw.client.Context())
|
||||
sw := &serviceWatcher{
|
||||
serviceName: serviceName,
|
||||
stoppedCh: make(chan struct{}),
|
||||
requestCtx: ctx,
|
||||
requestCancel: cancel,
|
||||
}
|
||||
cw.services[serviceName] = sw
|
||||
serviceWatchersCreated.Inc()
|
||||
initWG.Add(1)
|
||||
go func() {
|
||||
serviceWatchersCount.Inc()
|
||||
sw.watchForServiceNodesUpdates(cw, &initWG)
|
||||
serviceWatchersCount.Dec()
|
||||
close(sw.stoppedCh)
|
||||
}()
|
||||
}
|
||||
|
||||
// Stop watchers for removed services.
|
||||
newServiceNamesMap := make(map[string]struct{}, len(serviceNames))
|
||||
for _, serviceName := range serviceNames {
|
||||
newServiceNamesMap[serviceName] = struct{}{}
|
||||
}
|
||||
var swsStopped []*serviceWatcher
|
||||
for serviceName, sw := range cw.services {
|
||||
if _, ok := newServiceNamesMap[serviceName]; ok {
|
||||
continue
|
||||
}
|
||||
sw.requestCancel()
|
||||
delete(cw.services, serviceName)
|
||||
swsStopped = append(swsStopped, sw)
|
||||
}
|
||||
cw.servicesLock.Unlock()
|
||||
|
||||
// Wait until deleted service watchers are stopped.
|
||||
for _, sw := range swsStopped {
|
||||
<-sw.stoppedCh
|
||||
serviceWatchersStopped.Inc()
|
||||
}
|
||||
|
||||
// Wait until added service watchers are initialized.
|
||||
initWG.Wait()
|
||||
}
|
||||
|
||||
// watchForServicesUpdates watches for new services and updates it in cw.
|
||||
//
|
||||
// watchForServicesUpdates closes the initCh once the initialization is complete and first discovery iteration is done.
|
||||
func (cw *consulAgentWatcher) watchForServicesUpdates(initCh chan struct{}) {
|
||||
apiServer := cw.client.APIServer()
|
||||
f := func() {
|
||||
serviceNames, err := cw.getServiceNames()
|
||||
if err != nil {
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
logger.Errorf("cannot obtain Consul Agent services from %q: %s", apiServer, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
cw.updateServices(serviceNames)
|
||||
}
|
||||
|
||||
logger.Infof("started Consul Agent service watcher for %q", apiServer)
|
||||
f()
|
||||
|
||||
// send signal that initialization is complete
|
||||
close(initCh)
|
||||
|
||||
ticker := time.NewTicker(getCheckInterval())
|
||||
defer ticker.Stop()
|
||||
stopCh := cw.client.Context().Done()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
f()
|
||||
case <-stopCh:
|
||||
logger.Infof("stopping Consul Agent service watchers for %q", apiServer)
|
||||
startTime := time.Now()
|
||||
var swsStopped []*serviceWatcher
|
||||
|
||||
cw.servicesLock.Lock()
|
||||
for _, sw := range cw.services {
|
||||
sw.requestCancel()
|
||||
swsStopped = append(swsStopped, sw)
|
||||
}
|
||||
cw.servicesLock.Unlock()
|
||||
|
||||
for _, sw := range swsStopped {
|
||||
<-sw.stoppedCh
|
||||
serviceWatchersStopped.Inc()
|
||||
}
|
||||
logger.Infof("stopped Consul Agent service watcher for %q in %.3f seconds", apiServer, time.Since(startTime).Seconds())
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
serviceWatchersCreated = metrics.NewCounter("vm_promscrape_discovery_consulagent_service_watchers_created_total")
|
||||
serviceWatchersStopped = metrics.NewCounter("vm_promscrape_discovery_consulagent_service_watchers_stopped_total")
|
||||
serviceWatchersCount = metrics.NewCounter("vm_promscrape_discovery_consulagent_service_watchers")
|
||||
)
|
||||
|
||||
// getServiceNames obtains serviceNames via request to Consul Agent.
|
||||
//
|
||||
// It returns an empty serviceNames list if response contains the same index.
|
||||
func (cw *consulAgentWatcher) getServiceNames() ([]string, error) {
|
||||
path := "/v1/agent/services" + cw.servicesQueryArgs
|
||||
data, err := cw.client.GetAPIResponse(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot perform Consul Agent API request at %q: %w", path, err)
|
||||
}
|
||||
|
||||
var m map[string]consul.Service
|
||||
if err := json.Unmarshal(data, &m); err != nil {
|
||||
return nil, fmt.Errorf("cannot parse response from %q: %w; data=%q", path, err, data)
|
||||
}
|
||||
serviceNames := make([]string, 0, len(m))
|
||||
for serviceName, service := range m {
|
||||
if service.Datacenter != cw.watchDatacenter {
|
||||
continue
|
||||
}
|
||||
if !consul.ShouldCollectServiceByName(cw.watchServices, serviceName) {
|
||||
continue
|
||||
}
|
||||
serviceNames = append(serviceNames, serviceName)
|
||||
}
|
||||
return serviceNames, nil
|
||||
}
|
||||
|
||||
// watchForServiceNodesUpdates watches for Consul serviceNode changes for the given serviceName.
|
||||
//
|
||||
// watchForServiceNodesUpdates calls initWG.Done() once the initialization is complete and the first discovery iteration is done.
|
||||
func (sw *serviceWatcher) watchForServiceNodesUpdates(cw *consulAgentWatcher, initWG *sync.WaitGroup) {
|
||||
apiServer := cw.client.APIServer()
|
||||
path := "/v1/agent/health/service/name/" + sw.serviceName
|
||||
f := func() {
|
||||
data, err := cw.client.GetAPIResponseWithParamsCtx(sw.requestCtx, path, nil, nil)
|
||||
if err != nil {
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
logger.Errorf("cannot obtain Consul Agent serviceNodes for serviceName=%q from %q: %s", sw.serviceName, apiServer, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
sns, err := consul.ParseServiceNodes(data)
|
||||
if err != nil {
|
||||
logger.Errorf("cannot parse Consul Agent serviceNodes response for serviceName=%q from %q: %s", sw.serviceName, apiServer, err)
|
||||
return
|
||||
}
|
||||
|
||||
cw.servicesLock.Lock()
|
||||
sw.serviceNodes = sns
|
||||
cw.servicesLock.Unlock()
|
||||
}
|
||||
|
||||
f()
|
||||
// Notify caller that initialization is complete
|
||||
initWG.Done()
|
||||
|
||||
ticker := time.NewTicker(getCheckInterval())
|
||||
defer ticker.Stop()
|
||||
stopCh := sw.requestCtx.Done()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
f()
|
||||
case <-stopCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// getServiceNodesSnapshot returns a snapshot of discovered ServiceNodes.
|
||||
func (cw *consulAgentWatcher) getServiceNodesSnapshot() map[string][]consul.ServiceNode {
|
||||
cw.servicesLock.Lock()
|
||||
sns := make(map[string][]consul.ServiceNode, len(cw.services))
|
||||
for svc, sw := range cw.services {
|
||||
sns[svc] = sw.serviceNodes
|
||||
}
|
||||
cw.servicesLock.Unlock()
|
||||
return sns
|
||||
}
|
||||
|
||||
func getCheckInterval() time.Duration {
|
||||
d := *SDCheckInterval
|
||||
if d <= time.Second {
|
||||
return time.Second
|
||||
}
|
||||
return d
|
||||
}
|
|
@ -4,7 +4,6 @@ import (
|
|||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
||||
|
@ -69,27 +68,8 @@ func (svc *Service) appendTargetLabels(ms []*promutils.Labels, tagSeparator stri
|
|||
m.Add("__meta_nomad_service_id", svc.ID)
|
||||
m.Add("__meta_nomad_service_job_id", svc.JobID)
|
||||
m.Add("__meta_nomad_service_port", strconv.Itoa(svc.Port))
|
||||
// We surround the separated list with the separator as well. This way regular expressions
|
||||
// in relabeling rules don't have to consider tag positions.
|
||||
m.Add("__meta_nomad_tags", tagSeparator+strings.Join(svc.Tags, tagSeparator)+tagSeparator)
|
||||
|
||||
// Expose individual tags via __meta_nomad_tag_* labels, so users could move all the tags
|
||||
// into the discovered scrape target with the following relabeling rule in the way similar to kubernetes_sd_configs:
|
||||
//
|
||||
// - action: labelmap
|
||||
// regex: __meta_nomad_tag_(.+)
|
||||
//
|
||||
// This solves https://stackoverflow.com/questions/44339461/relabeling-in-prometheus
|
||||
for _, tag := range svc.Tags {
|
||||
k := tag
|
||||
v := ""
|
||||
if n := strings.IndexByte(tag, '='); n >= 0 {
|
||||
k = tag[:n]
|
||||
v = tag[n+1:]
|
||||
}
|
||||
m.Add(discoveryutils.SanitizeLabelName("__meta_nomad_tag_"+k), v)
|
||||
m.Add(discoveryutils.SanitizeLabelName("__meta_nomad_tagpresent_"+k), "true")
|
||||
}
|
||||
discoveryutils.AddTagsToLabels(m, svc.Tags, "__meta_nomad_", tagSeparator)
|
||||
|
||||
ms = append(ms, m)
|
||||
return ms
|
||||
|
|
|
@ -62,3 +62,28 @@ func TestEqualLabelss(t *testing.T, got, want []*promutils.Labels) {
|
|||
t.Fatalf("unexpected labels:\ngot\n%v\nwant\n%v", gotCopy, want)
|
||||
}
|
||||
}
|
||||
|
||||
// AddTagsToLabels adds <prefix>_tags (separated with tagSeparator) to labels
|
||||
// and exposes individual tags via <prefix>_tag_* labels, so users could move all the tags
|
||||
// into the discovered scrape target with the following relabeling rule in the way similar to kubernetes_sd_configs:
|
||||
//
|
||||
// - action: labelmap
|
||||
// regex: <prefix>_tag_(.+)
|
||||
//
|
||||
// This solves https://stackoverflow.com/questions/44339461/relabeling-in-prometheus
|
||||
func AddTagsToLabels(m *promutils.Labels, tags []string, prefix, tagSeparator string) {
|
||||
// We surround the separated list with the separator as well. This way regular expressions
|
||||
// in relabeling rules don't have to consider tag positions.
|
||||
m.Add(prefix+"tags", tagSeparator+strings.Join(tags, tagSeparator)+tagSeparator)
|
||||
|
||||
for _, tag := range tags {
|
||||
k := tag
|
||||
v := ""
|
||||
if n := strings.IndexByte(tag, '='); n >= 0 {
|
||||
k = tag[:n]
|
||||
v = tag[n+1:]
|
||||
}
|
||||
m.Add(SanitizeLabelName(prefix+"tag_"+k), v)
|
||||
m.Add(SanitizeLabelName(prefix+"tagpresent_"+k), "true")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/azure"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consulagent"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/digitalocean"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/dns"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/docker"
|
||||
|
@ -125,6 +126,7 @@ func runScraper(configFile string, pushData func(at *auth.Token, wr *prompbmarsh
|
|||
scs := newScrapeConfigs(pushData, globalStopCh)
|
||||
scs.add("azure_sd_configs", *azure.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getAzureSDScrapeWork(swsPrev) })
|
||||
scs.add("consul_sd_configs", *consul.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getConsulSDScrapeWork(swsPrev) })
|
||||
scs.add("consulagent_sd_configs", *consulagent.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getConsulAgentSDScrapeWork(swsPrev) })
|
||||
scs.add("digitalocean_sd_configs", *digitalocean.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getDigitalOceanDScrapeWork(swsPrev) })
|
||||
scs.add("dns_sd_configs", *dns.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getDNSSDScrapeWork(swsPrev) })
|
||||
scs.add("docker_sd_configs", *docker.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getDockerSDScrapeWork(swsPrev) })
|
||||
|
|
Loading…
Reference in a new issue