mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/promscrape: add Prometheus-compatible service discovery for Nomad (#3549)
Add nomad_sd_config support for service discovery
This commit is contained in:
parent
51d956225d
commit
8f42c5a024
13 changed files with 1330 additions and 0 deletions
|
@ -1338,6 +1338,10 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
|
|||
-promscrape.minResponseSizeForStreamParse size
|
||||
The minimum target response size for automatic switching to stream parsing mode, which can reduce memory usage. See https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode
|
||||
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 1000000)
|
||||
-promscrape.nomad.waitTime duration
|
||||
Wait time used by Nomad service discovery. Default value is used if not set
|
||||
-promscrape.nomadSDCheckInterval duration
|
||||
Interval for checking for changes in Nomad. This works only if nomad_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#nomad_sd_configs for details (default 30s)
|
||||
-promscrape.noStaleMarkers
|
||||
Whether to disable sending Prometheus stale markers for metrics when scrape target disappears. This option may reduce memory usage if stale markers aren't needed for your setup. This option also disables populating the scrape_series_added metric. See https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series
|
||||
-promscrape.openstackSDCheckInterval duration
|
||||
|
|
|
@ -31,6 +31,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
|||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix a panic during target discovery when `vmagent` runs with `-promscrape.dropOriginalLabels` command-line flag. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3580). The bug has been introduced in [v1.85.0](https://docs.victoriametrics.com/CHANGELOG.html#v1850).
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): [dockerswarm_sd_configs](https://docs.victoriametrics.com/sd_configs.html#dockerswarm_sd_configs): properly encode `filters` field. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3579).
|
||||
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for Prometheus compatible service discovery for Nomad services. These targets are discovered via [nomad_sd_configs](https://docs.victoriametrics.com/sd_configs.html#nomad_sd_configs).
|
||||
|
||||
## [v1.85.3](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.85.3)
|
||||
|
||||
|
|
|
@ -2333,6 +2333,10 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
|
|||
-promscrape.minResponseSizeForStreamParse size
|
||||
The minimum target response size for automatic switching to stream parsing mode, which can reduce memory usage. See https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode
|
||||
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 1000000)
|
||||
-promscrape.nomad.waitTime duration
|
||||
Wait time used by Nomad service discovery. Default value is used if not set
|
||||
-promscrape.nomadSDCheckInterval duration
|
||||
Interval for checking for changes in Nomad. This works only if nomad_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#nomad_sd_configs for details (default 30s)
|
||||
-promscrape.noStaleMarkers
|
||||
Whether to disable sending Prometheus stale markers for metrics when scrape target disappears. This option may reduce memory usage if stale markers aren't needed for your setup. This option also disables populating the scrape_series_added metric. See https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series
|
||||
-promscrape.openstackSDCheckInterval duration
|
||||
|
|
|
@ -20,6 +20,7 @@ sort: 24
|
|||
* `gce_sd_configs` is for discovering and scraping [Google Compute Engine](https://cloud.google.com/compute) targets. See [these docs](#gce_sd_configs).
|
||||
* `http_sd_configs` is for discovering and scraping targerts provided by external http-based service discovery. See [these docs](#http_sd_configs).
|
||||
* `kubernetes_sd_configs` is for discovering and scraping [Kubernetes](https://kubernetes.io/) targets. See [these docs](#kubernetes_sd_configs).
|
||||
* `nomad_sd_configs` is for discovering and scraping targets registered in [Nomad](https://www.nomadproject.io/). See [these docs](#nomad_sd_configs).
|
||||
* `openstack_sd_configs` is for discovering and scraping OpenStack targets. See [these docs](#openstack_sd_configs).
|
||||
* `static_configs` is for scraping statically defined targets. See [these docs](#static_configs).
|
||||
* `yandexcloud_sd_configs` is for discoverying and scraping [Yandex Cloud](https://cloud.yandex.com/en/) targets. See [these docs](#yandexcloud_sd_configs).
|
||||
|
@ -869,6 +870,73 @@ One of the following `role` types can be configured to discover targets:
|
|||
* `__meta_kubernetes_ingress_scheme`: Protocol scheme of ingress, https if TLS config is set. Defaults to http.
|
||||
* `__meta_kubernetes_ingress_path`: Path from ingress spec. Defaults to `/`.
|
||||
|
||||
## nomad_sd_configs
|
||||
|
||||
Nomad SD configuration allows retrieving scrape targets from [Nomad Services](https://www.hashicorp.com/blog/nomad-service-discovery).
|
||||
|
||||
Configuration example:
|
||||
|
||||
```yaml
|
||||
scrape_configs:
|
||||
- job_name: nomad
|
||||
nomad_sd_configs:
|
||||
|
||||
# server is an optional Nomad server to connect to. By default localhost:4646 is used
|
||||
- server: "localhost:4646"
|
||||
|
||||
# token is an optional Nomad API token.
|
||||
# If the token isn't specified, then it is read from the NOMAD_TOKEN environment var.
|
||||
# token: "..."
|
||||
|
||||
# datacenter is an optional Nomad API datacenter.
|
||||
# If the datacenter isn't specified, then it is read from Nomad server.
|
||||
# See https://developer.hashicorp.com/nomad/api-docs/agent#query-self
|
||||
# datacenter: "..."
|
||||
|
||||
# namespace is an optional Nomad namespace.
|
||||
# If the namespace isn't specified, then it is read from NOMAD_NAMESPACE environment var.
|
||||
# namespace: "..."
|
||||
|
||||
# scheme is an optional scheme (http or https) to use for connecting to Nomad server.
|
||||
# By default http scheme is used.
|
||||
# scheme: "..."
|
||||
|
||||
# services is an optional list of services for which targets are retrieved.
|
||||
# If omitted, all services are scraped.
|
||||
# See https://developer.hashicorp.com/nomad/api-docs/services#list-services .
|
||||
# services: ["...", "..."]
|
||||
|
||||
# tags is an optional list of tags used to filter nodes for a given service.
|
||||
# Services must contain all tags in the list.
|
||||
# tags: ["...", "..."]
|
||||
|
||||
# tag_separate is an optional string by which Nomad tags are joined into the __meta_nomad_tags label.
|
||||
# By default "," is used as a tag separator.
|
||||
# Individual tags are also available via __meta_nomad_tag_<tagname> labels - see below.
|
||||
# tag_separator: "..."
|
||||
|
||||
# allow_stale is an optional config, which allows stale Nomad results.
|
||||
# See https://developer.hashicorp.com/nomad/api-docs#consistency-modes
|
||||
# Reduce load on Nomad if set to true. By default is is set to true.
|
||||
# allow_stale: ...
|
||||
|
||||
# Additional HTTP API client options can be specified here.
|
||||
# See https://docs.victoriametrics.com/sd_configs.html#http-api-client-options
|
||||
```
|
||||
|
||||
The following meta labels are available on discovered targets during [relabeling](https://docs.victoriametrics.com/vmagent.html#relabeling):
|
||||
|
||||
* `__meta_nomad_address`: the address of the target
|
||||
* `__meta_nomad_dc`: the datacenter name for the target
|
||||
* `__meta_nomad_namespace`: namespace of the service
|
||||
* `__meta_nomad_node_id`: the node ID defined for the target
|
||||
* `__meta_nomad_service`: the name of the service the target belongs to
|
||||
* `__meta_nomad_service_address`: the service address of the target
|
||||
* `__meta_nomad_service_id`: the service ID of the target
|
||||
* `__meta_nomad_service_port`: the service port of the target
|
||||
* `__meta_nomad_tag_<tagname>`: the value for the given <tagname> tag of the target
|
||||
* `__meta_nomad_tagpresent_<tagname>`: "true" for every <tagname> tag of the target
|
||||
* `__meta_nomad_tags`: the list of tags of the target joined by the tag_separator
|
||||
|
||||
## openstack_sd_configs
|
||||
|
||||
|
|
|
@ -30,6 +30,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/gce"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/http"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kubernetes"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/nomad"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/openstack"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/yandexcloud"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
||||
|
@ -262,6 +263,7 @@ type ScrapeConfig struct {
|
|||
GCESDConfigs []gce.SDConfig `yaml:"gce_sd_configs,omitempty"`
|
||||
HTTPSDConfigs []http.SDConfig `yaml:"http_sd_configs,omitempty"`
|
||||
KubernetesSDConfigs []kubernetes.SDConfig `yaml:"kubernetes_sd_configs,omitempty"`
|
||||
NomadSDConfigs []nomad.SDConfig `yaml:"nomad_sd_configs,omitempty"`
|
||||
OpenStackSDConfigs []openstack.SDConfig `yaml:"openstack_sd_configs,omitempty"`
|
||||
StaticConfigs []StaticConfig `yaml:"static_configs,omitempty"`
|
||||
YandexCloudSDConfigs []yandexcloud.SDConfig `yaml:"yandexcloud_sd_configs,omitempty"`
|
||||
|
@ -329,6 +331,9 @@ func (sc *ScrapeConfig) mustStop() {
|
|||
for i := range sc.KubernetesSDConfigs {
|
||||
sc.KubernetesSDConfigs[i].MustStop()
|
||||
}
|
||||
for i := range sc.NomadSDConfigs {
|
||||
sc.NomadSDConfigs[i].MustStop()
|
||||
}
|
||||
for i := range sc.OpenStackSDConfigs {
|
||||
sc.OpenStackSDConfigs[i].MustStop()
|
||||
}
|
||||
|
@ -806,6 +811,33 @@ func (cfg *Config) getKubernetesSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
|
|||
return dst
|
||||
}
|
||||
|
||||
// getNomadSDScrapeWork returns `nomad_sd_configs` ScrapeWork from cfg.
|
||||
func (cfg *Config) getNomadSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
|
||||
swsPrevByJob := getSWSByJob(prev)
|
||||
dst := make([]*ScrapeWork, 0, len(prev))
|
||||
for _, sc := range cfg.ScrapeConfigs {
|
||||
dstLen := len(dst)
|
||||
ok := true
|
||||
for j := range sc.NomadSDConfigs {
|
||||
sdc := &sc.NomadSDConfigs[j]
|
||||
var okLocal bool
|
||||
dst, okLocal = appendSDScrapeWork(dst, sdc, cfg.baseDir, sc.swc, "nomad_sd_config")
|
||||
if ok {
|
||||
ok = okLocal
|
||||
}
|
||||
}
|
||||
if ok {
|
||||
continue
|
||||
}
|
||||
swsPrev := swsPrevByJob[sc.swc.jobName]
|
||||
if len(swsPrev) > 0 {
|
||||
logger.Errorf("there were errors when discovering nomad targets for job %q, so preserving the previous targets", sc.swc.jobName)
|
||||
dst = append(dst[:dstLen], swsPrev...)
|
||||
}
|
||||
}
|
||||
return dst
|
||||
}
|
||||
|
||||
// getOpenStackSDScrapeWork returns `openstack_sd_configs` ScrapeWork from cfg.
|
||||
func (cfg *Config) getOpenStackSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
|
||||
swsPrevByJob := getSWSByJob(prev)
|
||||
|
|
28
lib/promscrape/discovery/nomad/agent.go
Normal file
28
lib/promscrape/discovery/nomad/agent.go
Normal file
|
@ -0,0 +1,28 @@
|
|||
package nomad
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// Agent is Nomad agent.
|
||||
//
|
||||
// See https://developer.hashicorp.com/nomad/api-docs/agent
|
||||
type Agent struct {
|
||||
Config AgentConfig
|
||||
}
|
||||
|
||||
// AgentConfig is Nomad agent config.
|
||||
//
|
||||
// See https://developer.hashicorp.com/nomad/api-docs/agent
|
||||
type AgentConfig struct {
|
||||
Datacenter string
|
||||
}
|
||||
|
||||
func parseAgent(data []byte) (*Agent, error) {
|
||||
var a Agent
|
||||
if err := json.Unmarshal(data, &a); err != nil {
|
||||
return nil, fmt.Errorf("cannot unmarshal agent info from %q: %w", data, err)
|
||||
}
|
||||
return &a, nil
|
||||
}
|
477
lib/promscrape/discovery/nomad/agent_test.go
Normal file
477
lib/promscrape/discovery/nomad/agent_test.go
Normal file
|
@ -0,0 +1,477 @@
|
|||
package nomad
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestParseAgentFailure(t *testing.T) {
|
||||
f := func(s string) {
|
||||
t.Helper()
|
||||
a, err := parseAgent([]byte(s))
|
||||
if err == nil {
|
||||
t.Fatalf("expecting non-nil error")
|
||||
}
|
||||
if a != nil {
|
||||
t.Fatalf("unexpected non-nil Agent: %v", a)
|
||||
}
|
||||
}
|
||||
f(``)
|
||||
f(`[1,23]`)
|
||||
}
|
||||
|
||||
func TestParseAgentSuccess(t *testing.T) {
|
||||
data := `
|
||||
{
|
||||
"config": {
|
||||
"ACL": {
|
||||
"Enabled": false,
|
||||
"PolicyTTL": 30000000000,
|
||||
"ReplicationToken": "",
|
||||
"RoleTTL": 30000000000,
|
||||
"TokenMaxExpirationTTL": 0,
|
||||
"TokenMinExpirationTTL": 0,
|
||||
"TokenTTL": 30000000000
|
||||
},
|
||||
"Addresses": {
|
||||
"HTTP": "0.0.0.0",
|
||||
"RPC": "0.0.0.0",
|
||||
"Serf": "0.0.0.0"
|
||||
},
|
||||
"AdvertiseAddrs": {
|
||||
"HTTP": "192.168.29.76:4646",
|
||||
"RPC": "192.168.29.76:4647",
|
||||
"Serf": "192.168.29.76:4648"
|
||||
},
|
||||
"Audit": {
|
||||
"Enabled": null,
|
||||
"Filters": null,
|
||||
"Sinks": null
|
||||
},
|
||||
"Autopilot": {
|
||||
"CleanupDeadServers": null,
|
||||
"DisableUpgradeMigration": null,
|
||||
"EnableCustomUpgrades": null,
|
||||
"EnableRedundancyZones": null,
|
||||
"LastContactThreshold": 200000000,
|
||||
"MaxTrailingLogs": 250,
|
||||
"MinQuorum": 0,
|
||||
"ServerStabilizationTime": 10000000000
|
||||
},
|
||||
"BindAddr": "0.0.0.0",
|
||||
"Client": {
|
||||
"AllocDir": "",
|
||||
"Artifact": {
|
||||
"GCSTimeout": "30m",
|
||||
"GitTimeout": "30m",
|
||||
"HTTPMaxSize": "100GB",
|
||||
"HTTPReadTimeout": "30m",
|
||||
"HgTimeout": "30m",
|
||||
"S3Timeout": "30m"
|
||||
},
|
||||
"BindWildcardDefaultHostNetwork": true,
|
||||
"BridgeNetworkName": "",
|
||||
"BridgeNetworkSubnet": "",
|
||||
"CNIConfigDir": "/opt/cni/config",
|
||||
"CNIPath": "/opt/cni/bin",
|
||||
"CgroupParent": "",
|
||||
"ChrootEnv": {
|
||||
"/run/resolvconf": "/run/resolvconf",
|
||||
"/sbin": "/sbin",
|
||||
"/usr": "/usr",
|
||||
"/bin/": "/bin/",
|
||||
"/etc/": "/etc/",
|
||||
"/lib": "/lib",
|
||||
"/lib32": "/lib32",
|
||||
"/lib64": "/lib64"
|
||||
},
|
||||
"ClientMaxPort": 14512,
|
||||
"ClientMinPort": 14000,
|
||||
"CpuCompute": 0,
|
||||
"DisableRemoteExec": false,
|
||||
"Enabled": true,
|
||||
"GCDiskUsageThreshold": 80,
|
||||
"GCInodeUsageThreshold": 70,
|
||||
"GCInterval": 60000000000,
|
||||
"GCMaxAllocs": 50,
|
||||
"GCParallelDestroys": 2,
|
||||
"HostNetworks": null,
|
||||
"HostVolumes": null,
|
||||
"MaxDynamicPort": 32000,
|
||||
"MaxKillTimeout": "30s",
|
||||
"MemoryMB": 0,
|
||||
"Meta": {
|
||||
"stack": "zerodha",
|
||||
"env": "dev"
|
||||
},
|
||||
"MinDynamicPort": 20000,
|
||||
"NetworkInterface": "",
|
||||
"NetworkSpeed": 0,
|
||||
"NoHostUUID": true,
|
||||
"NodeClass": "",
|
||||
"NomadServiceDiscovery": true,
|
||||
"Options": {},
|
||||
"ReserveableCores": "",
|
||||
"Reserved": {
|
||||
"CPU": 0,
|
||||
"Cores": "2",
|
||||
"DiskMB": 1024,
|
||||
"MemoryMB": 1024,
|
||||
"ReservedPorts": "22"
|
||||
},
|
||||
"ServerJoin": {
|
||||
"RetryInterval": 30000000000,
|
||||
"RetryJoin": [],
|
||||
"RetryMaxAttempts": 0,
|
||||
"StartJoin": null
|
||||
},
|
||||
"Servers": null,
|
||||
"StateDir": "",
|
||||
"TemplateConfig": {
|
||||
"BlockQueryWaitTime": null,
|
||||
"BlockQueryWaitTimeHCL": "",
|
||||
"ConsulRetry": null,
|
||||
"DisableSandbox": false,
|
||||
"FunctionBlacklist": null,
|
||||
"FunctionDenylist": [
|
||||
"plugin",
|
||||
"writeToFile"
|
||||
],
|
||||
"MaxStale": null,
|
||||
"MaxStaleHCL": "",
|
||||
"NomadRetry": null,
|
||||
"VaultRetry": null
|
||||
}
|
||||
},
|
||||
"Consul": {
|
||||
"Addr": "127.0.0.1:8500",
|
||||
"AllowUnauthenticated": true,
|
||||
"Auth": "",
|
||||
"AutoAdvertise": true,
|
||||
"CAFile": "",
|
||||
"CertFile": "",
|
||||
"ChecksUseAdvertise": false,
|
||||
"ClientAutoJoin": true,
|
||||
"ClientHTTPCheckName": "Nomad Client HTTP Check",
|
||||
"ClientServiceName": "nomad-client",
|
||||
"EnableSSL": false,
|
||||
"GRPCAddr": "",
|
||||
"KeyFile": "",
|
||||
"Namespace": "",
|
||||
"ServerAutoJoin": true,
|
||||
"ServerHTTPCheckName": "Nomad Server HTTP Check",
|
||||
"ServerRPCCheckName": "Nomad Server RPC Check",
|
||||
"ServerSerfCheckName": "Nomad Server Serf Check",
|
||||
"ServerServiceName": "nomad",
|
||||
"ShareSSL": null,
|
||||
"Tags": null,
|
||||
"Timeout": 5000000000,
|
||||
"Token": "",
|
||||
"VerifySSL": true
|
||||
},
|
||||
"DataDir": "/opt/nomad/data",
|
||||
"Datacenter": "dc1",
|
||||
"DevMode": false,
|
||||
"DisableAnonymousSignature": false,
|
||||
"DisableUpdateCheck": false,
|
||||
"EnableDebug": false,
|
||||
"EnableSyslog": false,
|
||||
"Files": [
|
||||
"nomad.hcl"
|
||||
],
|
||||
"HTTPAPIResponseHeaders": {},
|
||||
"LeaveOnInt": false,
|
||||
"LeaveOnTerm": false,
|
||||
"Limits": {
|
||||
"HTTPMaxConnsPerClient": 100,
|
||||
"HTTPSHandshakeTimeout": "5s",
|
||||
"RPCHandshakeTimeout": "5s",
|
||||
"RPCMaxConnsPerClient": 100
|
||||
},
|
||||
"LogFile": "",
|
||||
"LogJson": false,
|
||||
"LogLevel": "INFO",
|
||||
"LogRotateBytes": 0,
|
||||
"LogRotateDuration": "",
|
||||
"LogRotateMaxFiles": 0,
|
||||
"NodeName": "",
|
||||
"PluginDir": "/opt/nomad/data/plugins",
|
||||
"Plugins": [
|
||||
{
|
||||
"Args": null,
|
||||
"Config": {
|
||||
"allow_privileged": true,
|
||||
"volumes": [
|
||||
{
|
||||
"enabled": true
|
||||
}
|
||||
],
|
||||
"extra_labels": [
|
||||
"job_name",
|
||||
"job_id",
|
||||
"task_group_name",
|
||||
"task_name",
|
||||
"namespace",
|
||||
"node_name",
|
||||
"node_id"
|
||||
]
|
||||
},
|
||||
"Name": "docker"
|
||||
},
|
||||
{
|
||||
"Args": null,
|
||||
"Config": {
|
||||
"enabled": true,
|
||||
"no_cgroups": true
|
||||
},
|
||||
"Name": "raw_exec"
|
||||
}
|
||||
],
|
||||
"Ports": {
|
||||
"HTTP": 4646,
|
||||
"RPC": 4647,
|
||||
"Serf": 4648
|
||||
},
|
||||
"Region": "global",
|
||||
"Sentinel": {
|
||||
"Imports": null
|
||||
},
|
||||
"Server": {
|
||||
"ACLTokenGCThreshold": "",
|
||||
"AuthoritativeRegion": "",
|
||||
"BootstrapExpect": 1,
|
||||
"CSIPluginGCThreshold": "",
|
||||
"CSIVolumeClaimGCThreshold": "",
|
||||
"DataDir": "",
|
||||
"DefaultSchedulerConfig": null,
|
||||
"DeploymentGCThreshold": "",
|
||||
"DeploymentQueryRateLimit": 0,
|
||||
"EnableEventBroker": true,
|
||||
"Enabled": true,
|
||||
"EnabledSchedulers": null,
|
||||
"EvalGCThreshold": "",
|
||||
"EventBufferSize": 100,
|
||||
"FailoverHeartbeatTTL": 0,
|
||||
"HeartbeatGrace": 0,
|
||||
"JobGCInterval": "",
|
||||
"JobGCThreshold": "",
|
||||
"LicenseEnv": "",
|
||||
"LicensePath": "",
|
||||
"MaxHeartbeatsPerSecond": 0,
|
||||
"MinHeartbeatTTL": 0,
|
||||
"NodeGCThreshold": "",
|
||||
"NonVotingServer": false,
|
||||
"NumSchedulers": null,
|
||||
"PlanRejectionTracker": {
|
||||
"Enabled": false,
|
||||
"NodeThreshold": 100,
|
||||
"NodeWindow": 300000000000
|
||||
},
|
||||
"RaftBoltConfig": null,
|
||||
"RaftMultiplier": null,
|
||||
"RaftProtocol": 3,
|
||||
"RedundancyZone": "",
|
||||
"RejoinAfterLeave": false,
|
||||
"RetryInterval": 0,
|
||||
"RetryJoin": [],
|
||||
"RetryMaxAttempts": 0,
|
||||
"RootKeyGCInterval": "",
|
||||
"RootKeyGCThreshold": "",
|
||||
"RootKeyRotationThreshold": "",
|
||||
"Search": {
|
||||
"FuzzyEnabled": true,
|
||||
"LimitQuery": 20,
|
||||
"LimitResults": 100,
|
||||
"MinTermLength": 2
|
||||
},
|
||||
"ServerJoin": {
|
||||
"RetryInterval": 30000000000,
|
||||
"RetryJoin": [],
|
||||
"RetryMaxAttempts": 0,
|
||||
"StartJoin": null
|
||||
},
|
||||
"StartJoin": [],
|
||||
"UpgradeVersion": ""
|
||||
},
|
||||
"SyslogFacility": "LOCAL0",
|
||||
"TLSConfig": {
|
||||
"CAFile": "",
|
||||
"CertFile": "",
|
||||
"Checksum": "",
|
||||
"EnableHTTP": false,
|
||||
"EnableRPC": false,
|
||||
"KeyFile": "",
|
||||
"KeyLoader": {},
|
||||
"RPCUpgradeMode": false,
|
||||
"TLSCipherSuites": "",
|
||||
"TLSMinVersion": "",
|
||||
"TLSPreferServerCipherSuites": false,
|
||||
"VerifyHTTPSClient": false,
|
||||
"VerifyServerHostname": false
|
||||
},
|
||||
"Telemetry": {
|
||||
"CirconusAPIApp": "",
|
||||
"CirconusAPIToken": "",
|
||||
"CirconusAPIURL": "",
|
||||
"CirconusBrokerID": "",
|
||||
"CirconusBrokerSelectTag": "",
|
||||
"CirconusCheckDisplayName": "",
|
||||
"CirconusCheckForceMetricActivation": "",
|
||||
"CirconusCheckID": "",
|
||||
"CirconusCheckInstanceID": "",
|
||||
"CirconusCheckSearchTag": "",
|
||||
"CirconusCheckSubmissionURL": "",
|
||||
"CirconusCheckTags": "",
|
||||
"CirconusSubmissionInterval": "",
|
||||
"CollectionInterval": "1s",
|
||||
"DataDogAddr": "",
|
||||
"DataDogTags": null,
|
||||
"DisableDispatchedJobSummaryMetrics": false,
|
||||
"DisableHostname": false,
|
||||
"FilterDefault": null,
|
||||
"PrefixFilter": null,
|
||||
"PrometheusMetrics": false,
|
||||
"PublishAllocationMetrics": false,
|
||||
"PublishNodeMetrics": false,
|
||||
"StatsdAddr": "",
|
||||
"StatsiteAddr": "",
|
||||
"UseNodeName": false
|
||||
},
|
||||
"UI": {
|
||||
"Consul": {
|
||||
"BaseUIURL": ""
|
||||
},
|
||||
"Enabled": true,
|
||||
"Vault": {
|
||||
"BaseUIURL": ""
|
||||
}
|
||||
},
|
||||
"Vault": {
|
||||
"Addr": "https://vault.service.consul:8200",
|
||||
"AllowUnauthenticated": true,
|
||||
"ConnectionRetryIntv": 30000000000,
|
||||
"Enabled": null,
|
||||
"Namespace": "",
|
||||
"Role": "",
|
||||
"TLSCaFile": "",
|
||||
"TLSCaPath": "",
|
||||
"TLSCertFile": "",
|
||||
"TLSKeyFile": "",
|
||||
"TLSServerName": "",
|
||||
"TLSSkipVerify": null,
|
||||
"TaskTokenTTL": "",
|
||||
"Token": ""
|
||||
},
|
||||
"Version": {
|
||||
"Revision": "f464aca721d222ae9c1f3df643b3c3aaa20e2da7",
|
||||
"Version": "1.4.3",
|
||||
"VersionMetadata": "",
|
||||
"VersionPrerelease": ""
|
||||
}
|
||||
},
|
||||
"member": {
|
||||
"Addr": "192.168.29.76",
|
||||
"DelegateCur": 4,
|
||||
"DelegateMax": 5,
|
||||
"DelegateMin": 2,
|
||||
"Name": "pop-os.global",
|
||||
"Port": 4648,
|
||||
"ProtocolCur": 2,
|
||||
"ProtocolMax": 5,
|
||||
"ProtocolMin": 1,
|
||||
"Status": "alive",
|
||||
"Tags": {
|
||||
"rpc_addr": "192.168.29.76",
|
||||
"bootstrap": "1",
|
||||
"expect": "1",
|
||||
"role": "nomad",
|
||||
"region": "global",
|
||||
"build": "1.4.3",
|
||||
"revision": "f464aca721d222ae9c1f3df643b3c3aaa20e2da7",
|
||||
"port": "4647",
|
||||
"vsn": "1",
|
||||
"dc": "dc1",
|
||||
"raft_vsn": "3",
|
||||
"id": "d78cdda9-7e35-48d0-a0e0-36041f6df0ec"
|
||||
}
|
||||
},
|
||||
"stats": {
|
||||
"nomad": {
|
||||
"leader": "true",
|
||||
"leader_addr": "192.168.29.76:4647",
|
||||
"bootstrap": "true",
|
||||
"known_regions": "1",
|
||||
"server": "true"
|
||||
},
|
||||
"raft": {
|
||||
"term": "3",
|
||||
"latest_configuration_index": "0",
|
||||
"applied_index": "384",
|
||||
"protocol_version_min": "0",
|
||||
"protocol_version_max": "3",
|
||||
"snapshot_version_min": "0",
|
||||
"num_peers": "0",
|
||||
"state": "Leader",
|
||||
"last_log_term": "3",
|
||||
"commit_index": "384",
|
||||
"last_contact": "0",
|
||||
"last_snapshot_index": "0",
|
||||
"protocol_version": "3",
|
||||
"snapshot_version_max": "1",
|
||||
"latest_configuration": "[{Suffrage:Voter ID:d78cdda9-7e35-48d0-a0e0-36041f6df0ec Address:172.20.10.3:4647}]",
|
||||
"last_log_index": "384",
|
||||
"fsm_pending": "0",
|
||||
"last_snapshot_term": "0"
|
||||
},
|
||||
"serf": {
|
||||
"coordinate_resets": "0",
|
||||
"health_score": "0",
|
||||
"member_time": "1",
|
||||
"query_time": "1",
|
||||
"intent_queue": "0",
|
||||
"event_queue": "0",
|
||||
"encrypted": "false",
|
||||
"members": "1",
|
||||
"failed": "0",
|
||||
"left": "0",
|
||||
"event_time": "1",
|
||||
"query_queue": "0"
|
||||
},
|
||||
"runtime": {
|
||||
"arch": "amd64",
|
||||
"version": "go1.19.3",
|
||||
"max_procs": "8",
|
||||
"goroutines": "294",
|
||||
"cpu_count": "8",
|
||||
"kernel.name": "linux"
|
||||
},
|
||||
"vault": {
|
||||
"tracked_for_revoked": "0",
|
||||
"token_ttl": "0s",
|
||||
"token_expire_time": "",
|
||||
"token_last_renewal_time": "",
|
||||
"token_next_renewal_time": ""
|
||||
},
|
||||
"client": {
|
||||
"node_id": "9e02c85b-db59-45f1-ddee-40d0317bd33d",
|
||||
"known_servers": "192.168.29.76:4647",
|
||||
"num_allocations": "3",
|
||||
"last_heartbeat": "13.254411467s",
|
||||
"heartbeat_ttl": "19.86001456s"
|
||||
}
|
||||
}
|
||||
}
|
||||
`
|
||||
a, err := parseAgent([]byte(data))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
aExpected := &Agent{
|
||||
Config: AgentConfig{
|
||||
Datacenter: "dc1",
|
||||
},
|
||||
}
|
||||
if !reflect.DeepEqual(a, aExpected) {
|
||||
t.Fatalf("unexpected Agent parsed;\ngot\n%v\nwant\n%v", a, aExpected)
|
||||
}
|
||||
}
|
181
lib/promscrape/discovery/nomad/api.go
Normal file
181
lib/promscrape/discovery/nomad/api.go
Normal file
|
@ -0,0 +1,181 @@
|
|||
package nomad
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||
"github.com/VictoriaMetrics/fasthttp"
|
||||
)
|
||||
|
||||
var waitTime = flag.Duration("promscrape.nomad.waitTime", 0, "Wait time used by Nomad service discovery. Default value is used if not set")
|
||||
|
||||
// apiConfig contains config for API server.
|
||||
type apiConfig struct {
|
||||
tagSeparator string
|
||||
nomadWatcher *nomadWatcher
|
||||
}
|
||||
|
||||
func (ac *apiConfig) mustStop() {
|
||||
ac.nomadWatcher.mustStop()
|
||||
}
|
||||
|
||||
var configMap = discoveryutils.NewConfigMap()
|
||||
|
||||
func getAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
|
||||
v, err := configMap.Get(sdc, func() (interface{}, error) { return newAPIConfig(sdc, baseDir) })
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return v.(*apiConfig), nil
|
||||
}
|
||||
|
||||
func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
|
||||
hcc := sdc.HTTPClientConfig
|
||||
token, err := getToken(sdc.Token)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if token != "" {
|
||||
if hcc.BearerToken != nil {
|
||||
return nil, fmt.Errorf("cannot set both token and bearer_token configs")
|
||||
}
|
||||
hcc.BearerToken = promauth.NewSecret(token)
|
||||
}
|
||||
if len(sdc.Username) > 0 {
|
||||
if hcc.BasicAuth != nil {
|
||||
return nil, fmt.Errorf("cannot set both username and basic_auth configs")
|
||||
}
|
||||
hcc.BasicAuth = &promauth.BasicAuthConfig{
|
||||
Username: sdc.Username,
|
||||
Password: sdc.Password,
|
||||
}
|
||||
}
|
||||
ac, err := hcc.NewConfig(baseDir)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse auth config: %w", err)
|
||||
}
|
||||
apiServer := sdc.Server
|
||||
if apiServer == "" {
|
||||
apiServer = "localhost:4646"
|
||||
}
|
||||
if !strings.Contains(apiServer, "://") {
|
||||
scheme := sdc.Scheme
|
||||
if scheme == "" {
|
||||
scheme = "http"
|
||||
}
|
||||
apiServer = scheme + "://" + apiServer
|
||||
}
|
||||
proxyAC, err := sdc.ProxyClientConfig.NewConfig(baseDir)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse proxy auth config: %w", err)
|
||||
}
|
||||
client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL, proxyAC)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot create HTTP client for %q: %w", apiServer, err)
|
||||
}
|
||||
tagSeparator := ","
|
||||
if sdc.TagSeparator != nil {
|
||||
tagSeparator = *sdc.TagSeparator
|
||||
}
|
||||
dc, err := getDatacenter(client, sdc.Datacenter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
namespace := sdc.Namespace
|
||||
// default namespace can be detected from env var.
|
||||
if namespace == "" {
|
||||
namespace = os.Getenv("NOMAD_NAMESPACE")
|
||||
}
|
||||
|
||||
nw := newNomadWatcher(client, sdc, dc, namespace)
|
||||
cfg := &apiConfig{
|
||||
tagSeparator: tagSeparator,
|
||||
nomadWatcher: nw,
|
||||
}
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
func getToken(token *promauth.Secret) (string, error) {
|
||||
if token != nil {
|
||||
return token.String(), nil
|
||||
}
|
||||
t := os.Getenv("NOMAD_TOKEN")
|
||||
// Allow empty token - it should work if ACL is disabled in Nomad.
|
||||
return t, nil
|
||||
}
|
||||
|
||||
func getDatacenter(client *discoveryutils.Client, dc string) (string, error) {
|
||||
if dc != "" {
|
||||
return dc, nil
|
||||
}
|
||||
// See https://developer.hashicorp.com/nomad/api-docs/agent#query-self
|
||||
data, err := client.GetAPIResponse("/v1/agent/self")
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("cannot query nomad agent info: %w", err)
|
||||
}
|
||||
a, err := parseAgent(data)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return a.Config.Datacenter, nil
|
||||
}
|
||||
|
||||
// maxWaitTime is duration for Nomad blocking request.
|
||||
func maxWaitTime() time.Duration {
|
||||
d := discoveryutils.BlockingClientReadTimeout
|
||||
// Nomad adds random delay up to wait/16, so reduce the timeout in order to keep it below BlockingClientReadTimeout.
|
||||
// See https://developer.hashicorp.com/nomad/api-docs#blocking-queries
|
||||
d -= d / 16
|
||||
// The timeout cannot exceed 10 minuntes. See https://developer.hashicorp.com/nomad/api-docs#blocking-queries
|
||||
|
||||
if d > 10*time.Minute {
|
||||
d = 10 * time.Minute
|
||||
}
|
||||
if *waitTime > time.Second && *waitTime < d {
|
||||
d = *waitTime
|
||||
}
|
||||
return d
|
||||
}
|
||||
|
||||
// getBlockingAPIResponse perfoms blocking request to Nomad via client and returns response.
|
||||
// See https://developer.hashicorp.com/nomad/api-docs#blocking-queries .
|
||||
func getBlockingAPIResponse(client *discoveryutils.Client, path string, index int64) ([]byte, int64, error) {
|
||||
path += "&index=" + strconv.FormatInt(index, 10)
|
||||
path += "&wait=" + fmt.Sprintf("%ds", int(maxWaitTime().Seconds()))
|
||||
getMeta := func(resp *fasthttp.Response) {
|
||||
ind := resp.Header.Peek("X-Nomad-Index")
|
||||
if len(ind) == 0 {
|
||||
logger.Errorf("cannot find X-Nomad-Index header in response from %q", path)
|
||||
return
|
||||
}
|
||||
newIndex, err := strconv.ParseInt(string(ind), 10, 64)
|
||||
if err != nil {
|
||||
logger.Errorf("cannot parse X-Nomad-Index header value in response from %q: %s", path, err)
|
||||
return
|
||||
}
|
||||
// Properly handle the returned newIndex according to https://developer.hashicorp.com/nomad/api-docs#blocking-queries.
|
||||
// Index implementation details are the same for Consul and Nomad: https://developer.hashicorp.com/consul/api-docs/features/blocking#implementation-details
|
||||
if newIndex < 1 {
|
||||
index = 1
|
||||
return
|
||||
}
|
||||
if index > newIndex {
|
||||
index = 0
|
||||
return
|
||||
}
|
||||
index = newIndex
|
||||
}
|
||||
data, err := client.GetBlockingAPIResponse(path, getMeta)
|
||||
if err != nil {
|
||||
return nil, index, fmt.Errorf("cannot perform blocking Nomad API request at %q: %w", path, err)
|
||||
}
|
||||
return data, index, nil
|
||||
}
|
52
lib/promscrape/discovery/nomad/nomad.go
Normal file
52
lib/promscrape/discovery/nomad/nomad.go
Normal file
|
@ -0,0 +1,52 @@
|
|||
package nomad
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
|
||||
)
|
||||
|
||||
// SDConfig represents service discovery config for Nomad.
|
||||
//
|
||||
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#nomad_sd_config
|
||||
type SDConfig struct {
|
||||
Server string `yaml:"server,omitempty"`
|
||||
Token *promauth.Secret `yaml:"token"`
|
||||
Datacenter string `yaml:"datacenter"`
|
||||
Namespace string `yaml:"namespace,omitempty"`
|
||||
// RefreshInterval time.Duration `yaml:"refresh_interval"`
|
||||
// refresh_interval is obtained from `-promscrape.nomadSDCheckInterval` command-line option.
|
||||
Region string `yaml:"region,omitempty"`
|
||||
Scheme string `yaml:"scheme,omitempty"`
|
||||
Username string `yaml:"username"`
|
||||
Password *promauth.Secret `yaml:"password"`
|
||||
HTTPClientConfig promauth.HTTPClientConfig `yaml:",inline"`
|
||||
ProxyURL *proxy.URL `yaml:"proxy_url,omitempty"`
|
||||
ProxyClientConfig promauth.ProxyClientConfig `yaml:",inline"`
|
||||
Services []string `yaml:"services,omitempty"`
|
||||
Tags []string `yaml:"tags,omitempty"`
|
||||
AllowStale *bool `yaml:"allow_stale,omitempty"`
|
||||
TagSeparator *string `yaml:"tag_separator,omitempty"`
|
||||
}
|
||||
|
||||
// GetLabels returns Nomad labels according to sdc.
|
||||
func (sdc *SDConfig) GetLabels(baseDir string) ([]*promutils.Labels, error) {
|
||||
cfg, err := getAPIConfig(sdc, baseDir)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot get API config: %w", err)
|
||||
}
|
||||
ms := getServiceLabels(cfg)
|
||||
return ms, nil
|
||||
}
|
||||
|
||||
// MustStop stops further usage for sdc.
|
||||
func (sdc *SDConfig) MustStop() {
|
||||
v := configMap.Delete(sdc)
|
||||
if v != nil {
|
||||
// v can be nil if GetLabels wasn't called yet.
|
||||
cfg := v.(*apiConfig)
|
||||
cfg.mustStop()
|
||||
}
|
||||
}
|
92
lib/promscrape/discovery/nomad/service.go
Normal file
92
lib/promscrape/discovery/nomad/service.go
Normal file
|
@ -0,0 +1,92 @@
|
|||
package nomad
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
||||
)
|
||||
|
||||
// getServiceLabels returns labels for Nomad services with given cfg.
|
||||
func getServiceLabels(cfg *apiConfig) []*promutils.Labels {
|
||||
svcs := cfg.nomadWatcher.getServiceSnapshot()
|
||||
var ms []*promutils.Labels
|
||||
for _, s := range svcs {
|
||||
for i := range s {
|
||||
ms = s[i].appendTargetLabels(ms, cfg.tagSeparator)
|
||||
}
|
||||
}
|
||||
return ms
|
||||
}
|
||||
|
||||
type ServiceList struct {
|
||||
Namespace string `json:"Namespace"`
|
||||
Services []struct {
|
||||
ServiceName string `json:"ServiceName"`
|
||||
Tags []string `json:"Tags"`
|
||||
} `json:"Services"`
|
||||
}
|
||||
|
||||
// Service is Nomad service.
|
||||
// See https://developer.hashicorp.com/nomad/api-docs/services#list-services
|
||||
type Service struct {
|
||||
ID string `json:"ID"`
|
||||
ServiceName string `json:"ServiceName"`
|
||||
Namespace string `json:"Namespace"`
|
||||
NodeID string `json:"NodeID"`
|
||||
Datacenter string `json:"Datacenter"`
|
||||
JobID string `json:"JobID"`
|
||||
AllocID string `json:"AllocID"`
|
||||
Tags []string `json:"Tags"`
|
||||
Address string `json:"Address"`
|
||||
Port int `json:"Port"`
|
||||
}
|
||||
|
||||
func parseServices(data []byte) ([]Service, error) {
|
||||
var sns []Service
|
||||
if err := json.Unmarshal(data, &sns); err != nil {
|
||||
return nil, fmt.Errorf("cannot unmarshal Services from %q: %w", data, err)
|
||||
}
|
||||
return sns, nil
|
||||
}
|
||||
|
||||
func (svc *Service) appendTargetLabels(ms []*promutils.Labels, tagSeparator string) []*promutils.Labels {
|
||||
addr := discoveryutils.JoinHostPort(svc.Address, svc.Port)
|
||||
m := promutils.NewLabels(16)
|
||||
m.Add("__address__", addr)
|
||||
m.Add("__meta_nomad_dc", svc.Datacenter)
|
||||
m.Add("__meta_nomad_address", svc.Address)
|
||||
m.Add("__meta_nomad_namespace", svc.Namespace)
|
||||
m.Add("__meta_nomad_node_id", svc.NodeID)
|
||||
m.Add("__meta_nomad_service", svc.ServiceName)
|
||||
m.Add("__meta_nomad_service_address", svc.Address)
|
||||
m.Add("__meta_nomad_service_id", svc.ID)
|
||||
m.Add("__meta_nomad_service_port", strconv.Itoa(svc.Port))
|
||||
// We surround the separated list with the separator as well. This way regular expressions
|
||||
// in relabeling rules don't have to consider tag positions.
|
||||
m.Add("__meta_nomad_tags", tagSeparator+strings.Join(svc.Tags, tagSeparator)+tagSeparator)
|
||||
|
||||
// Expose individual tags via __meta_nomad_tag_* labels, so users could move all the tags
|
||||
// into the discovered scrape target with the following relabeling rule in the way similar to kubernetes_sd_configs:
|
||||
//
|
||||
// - action: labelmap
|
||||
// regex: __meta_nomad_tag_(.+)
|
||||
//
|
||||
// This solves https://stackoverflow.com/questions/44339461/relabeling-in-prometheus
|
||||
for _, tag := range svc.Tags {
|
||||
k := tag
|
||||
v := ""
|
||||
if n := strings.IndexByte(tag, '='); n >= 0 {
|
||||
k = tag[:n]
|
||||
v = tag[n+1:]
|
||||
}
|
||||
m.Add(discoveryutils.SanitizeLabelName("__meta_nomad_tag_"+k), v)
|
||||
m.Add(discoveryutils.SanitizeLabelName("__meta_nomad_tagpresent_"+k), "true")
|
||||
}
|
||||
|
||||
ms = append(ms, m)
|
||||
return ms
|
||||
}
|
79
lib/promscrape/discovery/nomad/service_test.go
Normal file
79
lib/promscrape/discovery/nomad/service_test.go
Normal file
|
@ -0,0 +1,79 @@
|
|||
package nomad
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
||||
)
|
||||
|
||||
func TestParseServicesFailure(t *testing.T) {
|
||||
f := func(s string) {
|
||||
t.Helper()
|
||||
sns, err := parseServices([]byte(s))
|
||||
if err == nil {
|
||||
t.Fatalf("expecting non-nil error")
|
||||
}
|
||||
if sns != nil {
|
||||
t.Fatalf("unexpected non-nil ServiceNodes: %v", sns)
|
||||
}
|
||||
}
|
||||
f(``)
|
||||
f(`[1,23]`)
|
||||
f(`{"items":[{"metadata":1}]}`)
|
||||
}
|
||||
|
||||
func TestParseServiceNodesSuccess(t *testing.T) {
|
||||
data := `
|
||||
[
|
||||
{
|
||||
"ID": "_nomad-task-1a321d90-79b5-681f-e6fa-8a43c8ec6b69-web-doggo-web-http",
|
||||
"ServiceName": "doggo-web",
|
||||
"Namespace": "default",
|
||||
"NodeID": "9e02c85b-db59-45f1-ddee-40d0317bd33d",
|
||||
"Datacenter": "dc1",
|
||||
"JobID": "doggo",
|
||||
"AllocID": "1a321d90-79b5-681f-e6fa-8a43c8ec6b69",
|
||||
"Tags": [
|
||||
"doggo",
|
||||
"web"
|
||||
],
|
||||
"Address": "192.168.29.76",
|
||||
"Port": 23761,
|
||||
"CreateIndex": 402,
|
||||
"ModifyIndex": 402
|
||||
}
|
||||
]
|
||||
`
|
||||
sns, err := parseServices([]byte(data))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
if len(sns) != 1 {
|
||||
t.Fatalf("unexpected length of ServiceNodes; got %d; want %d", len(sns), 1)
|
||||
}
|
||||
sn := sns[0]
|
||||
|
||||
// Check sn.appendTargetLabels()
|
||||
tagSeparator := ","
|
||||
labelss := sn.appendTargetLabels(nil, tagSeparator)
|
||||
expectedLabelss := []*promutils.Labels{
|
||||
promutils.NewLabelsFromMap(map[string]string{
|
||||
"__address__": "192.168.29.76:23761",
|
||||
"__meta_nomad_dc": "dc1",
|
||||
"__meta_nomad_node_id": "9e02c85b-db59-45f1-ddee-40d0317bd33d",
|
||||
"__meta_nomad_address": "192.168.29.76",
|
||||
"__meta_nomad_namespace": "default",
|
||||
"__meta_nomad_service": "doggo-web",
|
||||
"__meta_nomad_service_address": "192.168.29.76",
|
||||
"__meta_nomad_service_id": "_nomad-task-1a321d90-79b5-681f-e6fa-8a43c8ec6b69-web-doggo-web-http",
|
||||
"__meta_nomad_service_port": "23761",
|
||||
"__meta_nomad_tag_doggo": "",
|
||||
"__meta_nomad_tag_web": "",
|
||||
"__meta_nomad_tagpresent_doggo": "true",
|
||||
"__meta_nomad_tagpresent_web": "true",
|
||||
"__meta_nomad_tags": ",doggo,web,",
|
||||
}),
|
||||
}
|
||||
discoveryutils.TestEqualLabelss(t, labelss, expectedLabelss)
|
||||
}
|
310
lib/promscrape/discovery/nomad/watch.go
Normal file
310
lib/promscrape/discovery/nomad/watch.go
Normal file
|
@ -0,0 +1,310 @@
|
|||
package nomad
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
// SDCheckInterval is check interval for Nomad service discovery.
|
||||
var SDCheckInterval = flag.Duration("promscrape.nomadSDCheckInterval", 30*time.Second, "Interval for checking for changes in Nomad. "+
|
||||
"This works only if nomad_sd_configs is configured in '-promscrape.config' file. "+
|
||||
"See https://docs.victoriametrics.com/sd_configs.html#nomad_sd_configs for details")
|
||||
|
||||
// nomadWatcher is a watcher for nomad api, updates services map in background with long-polling.
|
||||
type nomadWatcher struct {
|
||||
client *discoveryutils.Client
|
||||
|
||||
serviceNamesQueryArgs string
|
||||
watchServices []string
|
||||
watchTags []string
|
||||
|
||||
// servicesLock protects services
|
||||
servicesLock sync.Mutex
|
||||
services map[string]*serviceWatcher
|
||||
|
||||
wg sync.WaitGroup
|
||||
stopCh chan struct{}
|
||||
}
|
||||
|
||||
type serviceWatcher struct {
|
||||
serviceName string
|
||||
services []Service
|
||||
stopCh chan struct{}
|
||||
}
|
||||
|
||||
// newNomadWatcher creates new watcher and starts background service discovery for Nomad.
|
||||
func newNomadWatcher(client *discoveryutils.Client, sdc *SDConfig, datacenter, namespace string) *nomadWatcher {
|
||||
baseQueryArgs := "?dc=" + url.QueryEscape(datacenter)
|
||||
if sdc.AllowStale == nil || *sdc.AllowStale {
|
||||
baseQueryArgs += "&stale"
|
||||
}
|
||||
if namespace != "" {
|
||||
baseQueryArgs += "&namespace=" + url.QueryEscape(namespace)
|
||||
}
|
||||
serviceNodesQueryArgs := baseQueryArgs
|
||||
|
||||
cw := &nomadWatcher{
|
||||
client: client,
|
||||
serviceNamesQueryArgs: serviceNodesQueryArgs,
|
||||
watchServices: sdc.Services,
|
||||
watchTags: sdc.Tags,
|
||||
services: make(map[string]*serviceWatcher),
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
initCh := make(chan struct{})
|
||||
go cw.watchForServicesUpdates(initCh)
|
||||
// wait for initialization to complete
|
||||
<-initCh
|
||||
return cw
|
||||
}
|
||||
|
||||
func (cw *nomadWatcher) mustStop() {
|
||||
close(cw.stopCh)
|
||||
// Do not wait for the watcher to stop, since it may take
|
||||
// up to discoveryutils.BlockingClientReadTimeout to complete.
|
||||
// TODO: add ability to cancel blocking requests.
|
||||
}
|
||||
|
||||
func (cw *nomadWatcher) updateServices(serviceNames []string) {
|
||||
var initWG sync.WaitGroup
|
||||
// Start watchers for new services.
|
||||
cw.servicesLock.Lock()
|
||||
for _, serviceName := range serviceNames {
|
||||
if _, ok := cw.services[serviceName]; ok {
|
||||
// The watcher for serviceName already exists.
|
||||
continue
|
||||
}
|
||||
sw := &serviceWatcher{
|
||||
serviceName: serviceName,
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
cw.services[serviceName] = sw
|
||||
cw.wg.Add(1)
|
||||
serviceWatchersCreated.Inc()
|
||||
initWG.Add(1)
|
||||
go func() {
|
||||
serviceWatchersCount.Inc()
|
||||
sw.watchForServiceAddressUpdates(cw, &initWG)
|
||||
serviceWatchersCount.Dec()
|
||||
cw.wg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
// Stop watchers for removed services.
|
||||
newServiceNamesMap := make(map[string]struct{}, len(serviceNames))
|
||||
for _, serviceName := range serviceNames {
|
||||
newServiceNamesMap[serviceName] = struct{}{}
|
||||
}
|
||||
for serviceName, sw := range cw.services {
|
||||
if _, ok := newServiceNamesMap[serviceName]; ok {
|
||||
continue
|
||||
}
|
||||
close(sw.stopCh)
|
||||
delete(cw.services, serviceName)
|
||||
serviceWatchersStopped.Inc()
|
||||
|
||||
// Do not wait for the watcher goroutine to exit, since this may take for up to maxWaitTime
|
||||
// if it is blocked in Nomad API request.
|
||||
}
|
||||
cw.servicesLock.Unlock()
|
||||
|
||||
// Wait for initialization to complete.
|
||||
initWG.Wait()
|
||||
}
|
||||
|
||||
// watchForServicesUpdates watches for new services and updates it in cw.
|
||||
//
|
||||
// watchForServicesUpdates closes the initCh once the initialization is complete and first discovery iteration is done.
|
||||
func (cw *nomadWatcher) watchForServicesUpdates(initCh chan struct{}) {
|
||||
index := int64(0)
|
||||
clientAddr := cw.client.Addr()
|
||||
f := func() {
|
||||
serviceNames, newIndex, err := cw.getBlockingServiceNames(index)
|
||||
if err != nil {
|
||||
logger.Errorf("cannot obtain Nomad serviceNames from %q: %s", clientAddr, err)
|
||||
return
|
||||
}
|
||||
if index == newIndex {
|
||||
// Nothing changed.
|
||||
return
|
||||
}
|
||||
cw.updateServices(serviceNames)
|
||||
index = newIndex
|
||||
}
|
||||
|
||||
logger.Infof("started Nomad service watcher for %q", clientAddr)
|
||||
f()
|
||||
|
||||
// send signal that initialization is complete
|
||||
close(initCh)
|
||||
|
||||
checkInterval := getCheckInterval()
|
||||
ticker := time.NewTicker(checkInterval / 2)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
f()
|
||||
case <-cw.stopCh:
|
||||
logger.Infof("stopping Nomad service watchers for %q", clientAddr)
|
||||
startTime := time.Now()
|
||||
cw.servicesLock.Lock()
|
||||
for _, sw := range cw.services {
|
||||
close(sw.stopCh)
|
||||
}
|
||||
cw.servicesLock.Unlock()
|
||||
cw.wg.Wait()
|
||||
logger.Infof("stopped Nomad service watcher for %q in %.3f seconds", clientAddr, time.Since(startTime).Seconds())
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
serviceWatchersCreated = metrics.NewCounter("vm_promscrape_discovery_nomad_service_watchers_created_total")
|
||||
serviceWatchersStopped = metrics.NewCounter("vm_promscrape_discovery_nomad_service_watchers_stopped_total")
|
||||
serviceWatchersCount = metrics.NewCounter("vm_promscrape_discovery_nomad_service_watchers")
|
||||
)
|
||||
|
||||
// getBlockingServiceNames obtains service names via blocking request to Nomad.
|
||||
//
|
||||
// It returns an empty serviceNames list if response contains the same index.
|
||||
func (cw *nomadWatcher) getBlockingServiceNames(index int64) ([]string, int64, error) {
|
||||
path := "/v1/services" + cw.serviceNamesQueryArgs
|
||||
data, newIndex, err := getBlockingAPIResponse(cw.client, path, index)
|
||||
if err != nil {
|
||||
return nil, index, err
|
||||
}
|
||||
if index == newIndex {
|
||||
// Nothing changed - return an empty serviceNames list.
|
||||
return nil, index, nil
|
||||
}
|
||||
|
||||
var svcs []ServiceList
|
||||
if err := json.Unmarshal(data, &svcs); err != nil {
|
||||
return nil, index, fmt.Errorf("cannot parse response from %q: %w; data=%q", path, err, data)
|
||||
}
|
||||
|
||||
serviceNames := make([]string, 0, len(svcs))
|
||||
for _, svc := range svcs {
|
||||
for _, s := range svc.Services {
|
||||
if !shouldCollectServiceByName(cw.watchServices, s.ServiceName) {
|
||||
continue
|
||||
}
|
||||
if !shouldCollectServiceByTags(cw.watchTags, s.Tags) {
|
||||
continue
|
||||
}
|
||||
serviceNames = append(serviceNames, s.ServiceName)
|
||||
}
|
||||
}
|
||||
|
||||
return serviceNames, newIndex, nil
|
||||
}
|
||||
|
||||
// getServiceSnapshot returns a snapshot of discovered Services.
|
||||
func (cw *nomadWatcher) getServiceSnapshot() map[string][]Service {
|
||||
cw.servicesLock.Lock()
|
||||
sns := make(map[string][]Service, len(cw.services))
|
||||
for svc, sw := range cw.services {
|
||||
sns[svc] = sw.services
|
||||
}
|
||||
cw.servicesLock.Unlock()
|
||||
return sns
|
||||
}
|
||||
|
||||
// watchForServiceNodesUpdates watches for Nomad serviceNode changes for the given serviceName.
|
||||
//
|
||||
// watchForServiceNodesUpdates calls initWG.Done() once the initialization is complete and the first discovery iteration is done.
|
||||
func (sw *serviceWatcher) watchForServiceAddressUpdates(nw *nomadWatcher, initWG *sync.WaitGroup) {
|
||||
clientAddr := nw.client.Addr()
|
||||
index := int64(0)
|
||||
// TODO: Maybe use a different query arg.
|
||||
path := "/v1/service/" + sw.serviceName + nw.serviceNamesQueryArgs
|
||||
f := func() {
|
||||
data, newIndex, err := getBlockingAPIResponse(nw.client, path, index)
|
||||
if err != nil {
|
||||
logger.Errorf("cannot obtain Nomad services for serviceName=%q from %q: %s", sw.serviceName, clientAddr, err)
|
||||
return
|
||||
}
|
||||
if index == newIndex {
|
||||
// Nothing changed.
|
||||
return
|
||||
}
|
||||
sns, err := parseServices(data)
|
||||
if err != nil {
|
||||
logger.Errorf("cannot parse Nomad services response for serviceName=%q from %q: %s", sw.serviceName, clientAddr, err)
|
||||
return
|
||||
}
|
||||
|
||||
nw.servicesLock.Lock()
|
||||
sw.services = sns
|
||||
nw.servicesLock.Unlock()
|
||||
|
||||
index = newIndex
|
||||
}
|
||||
|
||||
f()
|
||||
// Notify caller that initialization is complete
|
||||
initWG.Done()
|
||||
|
||||
checkInterval := getCheckInterval()
|
||||
ticker := time.NewTicker(checkInterval / 2)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
f()
|
||||
case <-sw.stopCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func shouldCollectServiceByName(filterServices []string, serviceName string) bool {
|
||||
if len(filterServices) == 0 {
|
||||
return true
|
||||
}
|
||||
for _, filterService := range filterServices {
|
||||
// Use case-insensitive comparison for service names according to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1422
|
||||
if strings.EqualFold(filterService, serviceName) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func shouldCollectServiceByTags(filterTags, tags []string) bool {
|
||||
if len(filterTags) == 0 {
|
||||
return true
|
||||
}
|
||||
for _, filterTag := range filterTags {
|
||||
hasTag := false
|
||||
for _, tag := range tags {
|
||||
if tag == filterTag {
|
||||
hasTag = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !hasTag {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func getCheckInterval() time.Duration {
|
||||
d := *SDCheckInterval
|
||||
if d <= time.Second {
|
||||
return time.Second
|
||||
}
|
||||
return d
|
||||
}
|
|
@ -25,6 +25,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/gce"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/http"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kubernetes"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/nomad"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/openstack"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/yandexcloud"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
||||
|
@ -130,6 +131,7 @@ func runScraper(configFile string, pushData func(at *auth.Token, wr *prompbmarsh
|
|||
scs.add("gce_sd_configs", *gce.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getGCESDScrapeWork(swsPrev) })
|
||||
scs.add("http_sd_configs", *http.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getHTTPDScrapeWork(swsPrev) })
|
||||
scs.add("kubernetes_sd_configs", *kubernetes.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getKubernetesSDScrapeWork(swsPrev) })
|
||||
scs.add("nomad_sd_configs", *nomad.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getNomadSDScrapeWork(swsPrev) })
|
||||
scs.add("openstack_sd_configs", *openstack.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getOpenStackSDScrapeWork(swsPrev) })
|
||||
scs.add("yandexcloud_sd_configs", *yandexcloud.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getYandexCloudSDScrapeWork(swsPrev) })
|
||||
scs.add("static_configs", 0, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getStaticScrapeWork() })
|
||||
|
|
Loading…
Reference in a new issue