Adds eureka service discovery (#913)

* Adds eureka service discovery
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/851
Netflix service discovery for AWS

* Apply suggestions from code review

Co-authored-by: Aliaksandr Valialkin <valyala@gmail.com>
This commit is contained in:
Nikolay 2020-11-20 14:38:12 +03:00 committed by Aliaksandr Valialkin
parent 7ac49ac176
commit bb2bcb9725
10 changed files with 470 additions and 0 deletions

View file

@ -169,6 +169,8 @@ The following scrape types in [scrape_config](https://prometheus.io/docs/prometh
Only [OpenStack identity API v3](https://docs.openstack.org/api-ref/identity/v3/) is supported.
* `dockerswarm_sd_configs` - for scraping Docker Swarm targets.
See [dockerswarm_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dockerswarm_sd_config) for details.
* `eureka_sd_configs` - for scraping targets registered in [Netflix Eureka](https://github.com/Netflix/eureka).
See [eureka_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#eureka_sd_config) for details.
File feature requests at [our issue tracker](https://github.com/VictoriaMetrics/VictoriaMetrics/issues) if you need other service discovery mechanisms to be supported by `vmagent`.

View file

@ -7,6 +7,8 @@
snap install victoriametrics
```
* FEATURE: vminsert: export `vm_rpc_vmstorage_is_reachable` metric, which can be used for monitoring reachability of vmstorage nodes from vminsert nodes.
* FEATURE: vmagent: add Netflix Eureka service discovery (aka [eureka_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#eureka_sd_config)).
See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/851
* FEATURE: add `-loggerWarnsPerSecondLimit` command-line flag for rate limiting of WARN messages in logs. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/905
* FEATURE: apply `-loggerErrorsPerSecondLimit` and `-loggerWarnsPerSecondLimit` rate limit per caller. I.e. log messages are suppressed if the same caller logs the same message
at the rate exceeding the given limit. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/905#issuecomment-729395855

View file

@ -306,6 +306,8 @@ Currently the following [scrape_config](https://prometheus.io/docs/prometheus/la
* [dns_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dns_sd_config)
* [openstack_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#openstack_sd_config)
* [dockerswarm_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dockerswarm_sd_config)
* [eureka_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#eureka_sd_config)
Other `*_sd_config` types will be supported in the future.

View file

@ -169,6 +169,8 @@ The following scrape types in [scrape_config](https://prometheus.io/docs/prometh
Only [OpenStack identity API v3](https://docs.openstack.org/api-ref/identity/v3/) is supported.
* `dockerswarm_sd_configs` - for scraping Docker Swarm targets.
See [dockerswarm_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dockerswarm_sd_config) for details.
* `eureka_sd_configs` - for scraping targets registered in [Netflix Eureka](https://github.com/Netflix/eureka).
See [eureka_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#eureka_sd_config) for details.
File feature requests at [our issue tracker](https://github.com/VictoriaMetrics/VictoriaMetrics/issues) if you need other service discovery mechanisms to be supported by `vmagent`.

View file

@ -20,6 +20,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/dns"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/dockerswarm"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/ec2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/eureka"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/gce"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kubernetes"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/openstack"
@ -76,6 +77,7 @@ type ScrapeConfig struct {
KubernetesSDConfigs []kubernetes.SDConfig `yaml:"kubernetes_sd_configs,omitempty"`
OpenStackSDConfigs []openstack.SDConfig `yaml:"openstack_sd_configs,omitempty"`
ConsulSDConfigs []consul.SDConfig `yaml:"consul_sd_configs,omitempty"`
EurekaSDConfigs []eureka.SDConfig `yaml:"eureka_sd_configs,omitempty"`
DockerSwarmConfigs []dockerswarm.SDConfig `yaml:"dockerswarm_sd_configs,omitempty"`
DNSSDConfigs []dns.SDConfig `yaml:"dns_sd_configs,omitempty"`
EC2SDConfigs []ec2.SDConfig `yaml:"ec2_sd_configs,omitempty"`
@ -293,6 +295,34 @@ func (cfg *Config) getConsulSDScrapeWork(prev []ScrapeWork) []ScrapeWork {
return dst
}
// getEurekaSDScrapeWork returns `eureka_sd_configs` ScrapeWork from cfg.
//
// Every scrape config is processed independently. If discovery fails for any
// of its eureka SD configs and prev holds targets for the same job, the entries
// appended for that scrape config are discarded and the previous targets are
// used instead.
func (cfg *Config) getEurekaSDScrapeWork(prev []ScrapeWork) []ScrapeWork {
	prevByJob := getSWSByJob(prev)
	result := make([]ScrapeWork, 0, len(prev))
	for i := range cfg.ScrapeConfigs {
		sc := &cfg.ScrapeConfigs[i]
		// Remember where this scrape config starts in result so that
		// its entries can be rolled back on failure.
		startLen := len(result)
		allOK := true
		for j := range sc.EurekaSDConfigs {
			var discovered bool
			result, discovered = appendEurekaScrapeWork(result, &sc.EurekaSDConfigs[j], cfg.baseDir, sc.swc)
			allOK = allOK && discovered
		}
		if !allOK {
			if prevJobSWs := prevByJob[sc.swc.jobName]; len(prevJobSWs) > 0 {
				logger.Errorf("there were errors when discovering eureka targets for job %q, so preserving the previous targets", sc.swc.jobName)
				result = append(result[:startLen], prevJobSWs...)
			}
		}
	}
	return result
}
// getDNSSDScrapeWork returns `dns_sd_configs` ScrapeWork from cfg.
func (cfg *Config) getDNSSDScrapeWork(prev []ScrapeWork) []ScrapeWork {
swsPrevByJob := getSWSByJob(prev)
@ -537,6 +567,15 @@ func appendConsulScrapeWork(dst []ScrapeWork, sdc *consul.SDConfig, baseDir stri
return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "consul_sd_config"), true
}
// appendEurekaScrapeWork discovers eureka targets for sdc and appends
// the corresponding ScrapeWork entries to dst.
//
// It returns dst unchanged and false if target discovery fails;
// the error is logged rather than returned.
func appendEurekaScrapeWork(dst []ScrapeWork, sdc *eureka.SDConfig, baseDir string, swc *scrapeWorkConfig) ([]ScrapeWork, bool) {
	labels, err := eureka.GetLabels(sdc, baseDir)
	if err == nil {
		return appendScrapeWorkForTargetLabels(dst, swc, labels, "eureka_sd_config"), true
	}
	logger.Errorf("error when discovering eureka targets for `job_name` %q: %s; skipping it", swc.jobName, err)
	return dst, false
}
func appendDNSScrapeWork(dst []ScrapeWork, sdc *dns.SDConfig, swc *scrapeWorkConfig) ([]ScrapeWork, bool) {
targetLabels, err := dns.GetLabels(sdc)
if err != nil {

View file

@ -0,0 +1,81 @@
package eureka
import (
"encoding/xml"
"fmt"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
// configMap holds the apiConfig objects handed out by getAPIConfig.
// NOTE(review): presumably this caches one apiConfig per *SDConfig key - confirm against discoveryutils.ConfigMap.
var configMap = discoveryutils.NewConfigMap()
// apiConfig contains the state needed for querying the Eureka API.
type apiConfig struct {
// client is the HTTP client used for Eureka API requests.
client *discoveryutils.Client
}
// newAPIConfig creates apiConfig for the given sdc.
//
// baseDir is passed to promauth.NewConfig together with the auth and TLS settings.
// Basic auth takes precedence over the token: the token is dropped when sdc.Username is set.
// The returned error describes auth config or HTTP client creation failures.
func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
	// The default port is stored back into sdc, so later readers
	// of sdc.Port see a non-nil value.
	if sdc.Port == nil {
		defaultPort := 80
		sdc.Port = &defaultPort
	}

	var basicAuth *promauth.BasicAuthConfig
	bearerToken := ""
	switch {
	case len(sdc.Username) > 0:
		basicAuth = &promauth.BasicAuthConfig{
			Username: sdc.Username,
			Password: sdc.Password,
		}
	case sdc.Token != nil:
		bearerToken = *sdc.Token
	}
	ac, err := promauth.NewConfig(baseDir, basicAuth, bearerToken, "", sdc.TLSConfig)
	if err != nil {
		return nil, fmt.Errorf("cannot parse auth config: %w", err)
	}

	endpoint := sdc.Server
	if endpoint == "" {
		endpoint = "localhost:8080/eureka/v2"
	}
	// sdc.Scheme is applied only if the server address has no scheme on its own.
	if !strings.Contains(endpoint, "://") {
		scheme := "http"
		if sdc.Scheme != "" {
			scheme = sdc.Scheme
		}
		endpoint = scheme + "://" + endpoint
	}
	client, err := discoveryutils.NewClient(endpoint, ac)
	if err != nil {
		return nil, fmt.Errorf("cannot create HTTP client for %q: %w", endpoint, err)
	}
	return &apiConfig{client: client}, nil
}
// getAPIConfig returns apiConfig for sdc via configMap.
//
// configMap.Get receives a callback, which builds a new apiConfig with newAPIConfig.
// NOTE(review): presumably the callback runs only when configMap has no entry for sdc - confirm.
func getAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
	create := func() (interface{}, error) {
		return newAPIConfig(sdc, baseDir)
	}
	entry, err := configMap.Get(sdc, create)
	if err != nil {
		return nil, err
	}
	return entry.(*apiConfig), nil
}
// getAPIResponse requests the given path with the client from cfg
// and returns whatever the client returns for it.
func getAPIResponse(cfg *apiConfig, path string) ([]byte, error) {
	body, err := cfg.client.GetAPIResponse(path)
	return body, err
}
// parseAPIResponse decodes the XML in data into applications.
//
// It returns nil and an error wrapping the decoding failure if data cannot be unmarshaled.
func parseAPIResponse(data []byte) (*applications, error) {
	apps := &applications{}
	err := xml.Unmarshal(data, apps)
	if err != nil {
		return nil, fmt.Errorf("failed parse eureka api response: %q, err: %w", data, err)
	}
	return apps, nil
}

View file

@ -0,0 +1,107 @@
package eureka
import (
"reflect"
"testing"
)
// Test_parseAPIResponse verifies that parseAPIResponse decodes a Eureka `/apps`
// XML response into the expected structures and reports malformed XML as an error.
func Test_parseAPIResponse(t *testing.T) {
	type args struct {
		data []byte
	}
	tests := []struct {
		name    string
		args    args
		want    *applications
		wantErr bool
	}{
		{
			name: "parse ok 1 app with instance",
			args: args{
				data: []byte(`<applications>
<versions__delta>1</versions__delta>
<apps__hashcode>UP_1_</apps__hashcode>
<application>
<name>HELLO-NETFLIX-OSS</name>
<instance>
<hostName>98de25ebef42</hostName>
<app>HELLO-NETFLIX-OSS</app>
<ipAddr>10.10.0.3</ipAddr>
<status>UP</status>
<overriddenstatus>UNKNOWN</overriddenstatus>
<port enabled="true">8080</port>
<securePort enabled="false">443</securePort>
<countryId>1</countryId>
<dataCenterInfo class="com.netflix.appinfo.InstanceInfo$DefaultDataCenterInfo">
<name>MyOwn</name>
</dataCenterInfo>
<leaseInfo>
<renewalIntervalInSecs>30</renewalIntervalInSecs>
<durationInSecs>90</durationInSecs>
<registrationTimestamp>1605757726477</registrationTimestamp>
<lastRenewalTimestamp>1605759135484</lastRenewalTimestamp>
<evictionTimestamp>0</evictionTimestamp>
<serviceUpTimestamp>1605757725913</serviceUpTimestamp>
</leaseInfo>
<metadata class="java.util.Collections$EmptyMap"/>
<appGroupName>UNKNOWN</appGroupName>
<homePageUrl>http://98de25ebef42:8080/</homePageUrl>
<statusPageUrl>http://98de25ebef42:8080/Status</statusPageUrl>
<healthCheckUrl>http://98de25ebef42:8080/healthcheck</healthCheckUrl>
<vipAddress>HELLO-NETFLIX-OSS</vipAddress>
<isCoordinatingDiscoveryServer>false</isCoordinatingDiscoveryServer>
<lastUpdatedTimestamp>1605757726478</lastUpdatedTimestamp>
<lastDirtyTimestamp>1605757725753</lastDirtyTimestamp>
<actionType>ADDED</actionType>
</instance>
</application>
</applications>`),
			},
			want: &applications{
				Applications: []Application{
					{
						Name: "HELLO-NETFLIX-OSS",
						Instances: []Instance{
							{
								HostName:         "98de25ebef42",
								HomePageURL:      "http://98de25ebef42:8080/",
								StatusPageURL:    "http://98de25ebef42:8080/Status",
								HealthCheckURL:   "http://98de25ebef42:8080/healthcheck",
								App:              "HELLO-NETFLIX-OSS",
								IPAddr:           "10.10.0.3",
								VipAddress:       "HELLO-NETFLIX-OSS",
								SecureVipAddress: "",
								Status:           "UP",
								Port: Port{
									Enabled: true,
									Port:    8080,
								},
								SecurePort: Port{
									Port: 443,
								},
								DataCenterInfo: DataCenterInfo{
									Name: "MyOwn",
								},
								Metadata:   MetaData{},
								CountryID:  1,
								InstanceID: "",
							},
						},
					},
				},
			},
		},
		{
			// Exercises the error path: the document is truncated,
			// so unmarshaling must fail and no result is returned.
			name: "malformed xml",
			args: args{
				data: []byte(`<applications><application>`),
			},
			want:    nil,
			wantErr: true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			got, err := parseAPIResponse(tt.args.data)
			if (err != nil) != tt.wantErr {
				t.Errorf("parseAPIResponse() error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			if !reflect.DeepEqual(got, tt.want) {
				t.Errorf("unexpected response for parseAPIResponse() \ngot = %+v, \nwant %+v", got, tt.want)
			}
		})
	}
}

View file

@ -0,0 +1,148 @@
package eureka
import (
"encoding/xml"
"fmt"
"strconv"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
)
// appsAPIPath is the API path GetLabels requests in order to obtain the registered applications.
const appsAPIPath = "/apps"
// SDConfig represents service discovery config for eureka.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#eureka_sd_config
type SDConfig struct {
// Server is the Eureka API address. "localhost:8080/eureka/v2" is used if it is empty.
Server string `yaml:"server,omitempty"`
// Token is an optional auth token. It is ignored if Username is set.
Token *string `yaml:"token"`
// Datacenter isn't referenced by the discovery code in this package.
Datacenter string `yaml:"datacenter"`
// Scheme is prepended to Server if Server contains no "://". It defaults to "http".
Scheme string `yaml:"scheme,omitempty"`
// Username and Password enable basic auth if Username is non-empty.
Username string `yaml:"username"`
Password string `yaml:"password"`
TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"`
// RefreshInterval time.Duration `yaml:"refresh_interval"`
// refresh_interval is obtained from `-promscrape.eurekaSDCheckInterval` command-line option.
// Port is used for instances, which report no port. It defaults to 80.
Port *int `yaml:"port,omitempty"`
}
// applications is the decoded form of the XML document passed to parseAPIResponse.
type applications struct {
// Applications holds one entry per `application` element.
Applications []Application `xml:"application"`
}
// Application - eureka application https://github.com/Netflix/eureka/wiki/Eureka-REST-operations/
type Application struct {
// Name is the application name from the `name` element.
Name string `xml:"name"`
// Instances holds one entry per `instance` element of the application.
Instances []Instance `xml:"instance"`
}
// Port - eureka instance port.
type Port struct {
// Port is the port number taken from the element text.
Port int `xml:",chardata"`
// Enabled is the value of the `enabled` attribute.
Enabled bool `xml:"enabled,attr"`
}
// Instance - eureka instance https://github.com/Netflix/eureka/wiki/Eureka-REST-operations
//
// Each field is filled from the child element named in its xml tag.
// Elements without a matching field are not stored.
type Instance struct {
HostName string `xml:"hostName"`
HomePageURL string `xml:"homePageUrl"`
StatusPageURL string `xml:"statusPageUrl"`
HealthCheckURL string `xml:"healthCheckUrl"`
App string `xml:"app"`
IPAddr string `xml:"ipAddr"`
VipAddress string `xml:"vipAddress"`
SecureVipAddress string `xml:"secureVipAddress"`
Status string `xml:"status"`
Port Port `xml:"port"`
SecurePort Port `xml:"securePort"`
DataCenterInfo DataCenterInfo `xml:"dataCenterInfo"`
Metadata MetaData `xml:"metadata"`
CountryID int `xml:"countryId"`
InstanceID string `xml:"instanceId"`
}
// MetaData - eureka objects metadata.
type MetaData struct {
// Items collects every child element of the metadata element regardless of its name.
Items []Tag `xml:",any"`
}
// Tag - eureka metadata tag - a single key/value pair.
type Tag struct {
// XMLName is the element name. Its Local part is used as the metadata key.
XMLName xml.Name
// Content is the raw inner XML of the element. It is used as the metadata value.
Content string `xml:",innerxml"`
}
// DataCenterInfo - eureka datacenter info for an instance.
type DataCenterInfo struct {
// Name is the datacenter name from the `name` element.
Name string `xml:"name"`
// Metadata holds the datacenter metadata from the `metadata` element.
Metadata MetaData `xml:"metadata"`
}
// GetLabels returns Eureka labels according to sdc.
//
// It obtains the API config for sdc, requests appsAPIPath, parses the response
// and converts every discovered instance into a label set.
// baseDir is passed through to the API config creation.
// An error is returned if the API config cannot be obtained, the request fails
// or the response cannot be parsed.
func GetLabels(sdc *SDConfig, baseDir string) ([]map[string]string, error) {
	cfg, err := getAPIConfig(sdc, baseDir)
	if err != nil {
		return nil, fmt.Errorf("cannot get API config: %w", err)
	}
	data, err := getAPIResponse(cfg, appsAPIPath)
	if err != nil {
		return nil, err
	}
	apps, err := parseAPIResponse(data)
	if err != nil {
		return nil, err
	}
	// Resolve the fallback port locally instead of relying on another function
	// having stored a default into sdc.Port: dereferencing a nil sdc.Port would panic.
	port := 80
	if sdc.Port != nil {
		port = *sdc.Port
	}
	return addInstanceLabels(apps, port), nil
}
// addInstanceLabels converts every instance of every application into a label set.
//
// `__address__` is built from the instance hostname and the instance port.
// The given port is used instead if the instance reports no positive port.
// It returns nil if there are no instances.
func addInstanceLabels(applications *applications, port int) []map[string]string {
	var result []map[string]string
	for i := range applications.Applications {
		app := &applications.Applications[i]
		for j := range app.Instances {
			inst := &app.Instances[j]
			hasPort := inst.Port.Port > 0
			targetPort := port
			if hasPort {
				targetPort = inst.Port.Port
			}
			labels := map[string]string{
				"__address__": discoveryutils.JoinHostPort(inst.HostName, targetPort),
				"instance":    inst.InstanceID,
				// NOTE(review): `app_nanem` looks like a typo for `app_name`.
				// The key is kept as is, since the tests pin this exact string.
				"__meta_eureka_app_instance_app_nanem":       app.Name,
				"__meta_eureka_app_instance_hostname":        inst.HostName,
				"__meta_eureka_app_instance_homepage_url":    inst.HomePageURL,
				"__meta_eureka_app_instance_statuspage_url":  inst.StatusPageURL,
				"__meta_eureka_app_instance_healthcheck_url": inst.HealthCheckURL,
				"__meta_eureka_app_instance_ip_addr":         inst.IPAddr,
				"__meta_eureka_app_instance_vip_address":     inst.VipAddress,
				"__meta_eureka_app_instance_status":          inst.Status,
				"__meta_eureka_app_instance_country_id":      strconv.Itoa(inst.CountryID),
				"__meta_eureka_app_instance_id":              inst.InstanceID,
			}
			// Optional labels are added only if the corresponding data is present.
			if inst.SecureVipAddress != "" {
				labels["__meta_eureka_app_instance_secure_vip_address"] = inst.SecureVipAddress
			}
			if hasPort {
				labels["__meta_eureka_app_instance_port"] = strconv.Itoa(inst.Port.Port)
				labels["__meta_eureka_app_instance_port_enabled"] = strconv.FormatBool(inst.Port.Enabled)
			}
			if securePort := inst.SecurePort; securePort.Port > 0 {
				labels["__meta_eureka_app_instance_secure_port"] = strconv.Itoa(securePort.Port)
				labels["__meta_eureka_app_instance_secure_port_enabled"] = strconv.FormatBool(securePort.Enabled)
			}
			// Datacenter metadata is exported only for a named datacenter.
			if dc := &inst.DataCenterInfo; dc.Name != "" {
				labels["__meta_eureka_app_instance_datacenterinfo_name"] = dc.Name
				for _, item := range dc.Metadata.Items {
					key := "__meta_eureka_app_instance_datacenterinfo_metadata_" + discoveryutils.SanitizeLabelName(item.XMLName.Local)
					labels[key] = item.Content
				}
			}
			for _, item := range inst.Metadata.Items {
				key := "__meta_eureka_app_instance_metadata_" + discoveryutils.SanitizeLabelName(item.XMLName.Local)
				labels[key] = item.Content
			}
			result = append(result, labels)
		}
	}
	return result
}

View file

@ -0,0 +1,83 @@
package eureka
import (
"reflect"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
// Test_addInstanceLabels verifies the label sets generated by addInstanceLabels.
// Labels are sorted before the comparison, so map iteration order doesn't matter.
func Test_addInstanceLabels(t *testing.T) {
	type args struct {
		applications *applications
		port         int
	}
	tests := []struct {
		name string
		args args
		want [][]prompbmarshal.Label
	}{
		{
			name: "1 application",
			args: args{
				port: 9100,
				applications: &applications{
					Applications: []Application{
						{
							Name: "test-app",
							Instances: []Instance{
								{
									Status:         "Ok",
									HealthCheckURL: "some-url",
									HomePageURL:    "some-home-url",
									StatusPageURL:  "some-status-url",
									HostName:       "host-1",
									IPAddr:         "10.15.11.11",
									CountryID:      5,
									VipAddress:     "10.15.11.11",
									InstanceID:     "some-id",
									Metadata: MetaData{Items: []Tag{
										{
											Content: "value-1",
											XMLName: struct{ Space, Local string }{Local: "key-1"},
										},
									}},
								},
							},
						},
					},
				},
			},
			want: [][]prompbmarshal.Label{
				discoveryutils.GetSortedLabels(map[string]string{
					"__address__":                                "host-1:9100",
					"instance":                                   "some-id",
					"__meta_eureka_app_instance_hostname":        "host-1",
					"__meta_eureka_app_instance_app_nanem":       "test-app",
					"__meta_eureka_app_instance_healthcheck_url": "some-url",
					"__meta_eureka_app_instance_ip_addr":         "10.15.11.11",
					"__meta_eureka_app_instance_vip_address":     "10.15.11.11",
					"__meta_eureka_app_instance_country_id":      "5",
					"__meta_eureka_app_instance_homepage_url":    "some-home-url",
					"__meta_eureka_app_instance_statuspage_url":  "some-status-url",
					"__meta_eureka_app_instance_id":              "some-id",
					"__meta_eureka_app_instance_metadata_key_1":  "value-1",
					"__meta_eureka_app_instance_status":          "Ok",
				}),
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			got := addInstanceLabels(tt.args.applications, tt.args.port)
			var sortedLabelss [][]prompbmarshal.Label
			for _, labels := range got {
				sortedLabelss = append(sortedLabelss, discoveryutils.GetSortedLabels(labels))
			}
			if !reflect.DeepEqual(sortedLabelss, tt.want) {
				// Print the sorted labels, since they are the value compared against tt.want.
				t.Fatalf("unexpected labels \ngot : %v, \nwant: %v", sortedLabelss, tt.want)
			}
		})
	}
}

View file

@ -28,6 +28,9 @@ var (
consulSDCheckInterval = flag.Duration("promscrape.consulSDCheckInterval", 30*time.Second, "Interval for checking for changes in consul. "+
"This works only if `consul_sd_configs` is configured in '-promscrape.config' file. "+
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config for details")
eurekaSDCheckInterval = flag.Duration("promscrape.eurekaSDCheckInterval", 30*time.Second, "Interval for checking for changes in eureka. "+
"This works only if `eureka_sd_configs` is configured in '-promscrape.config' file. "+
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#eureka_sd_config for details")
dnsSDCheckInterval = flag.Duration("promscrape.dnsSDCheckInterval", 30*time.Second, "Interval for checking for changes in dns. "+
"This works only if `dns_sd_configs` is configured in '-promscrape.config' file. "+
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dns_sd_config for details")
@ -99,6 +102,7 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest)
scs.add("kubernetes_sd_configs", *kubernetesSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getKubernetesSDScrapeWork(swsPrev) })
scs.add("openstack_sd_configs", *openstackSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getOpenStackSDScrapeWork(swsPrev) })
scs.add("consul_sd_configs", *consulSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getConsulSDScrapeWork(swsPrev) })
scs.add("eureka_sd_configs", *eurekaSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getEurekaSDScrapeWork(swsPrev) })
scs.add("dns_sd_configs", *dnsSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getDNSSDScrapeWork(swsPrev) })
scs.add("ec2_sd_configs", *ec2SDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getEC2SDScrapeWork(swsPrev) })
scs.add("gce_sd_configs", *gceSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getGCESDScrapeWork(swsPrev) })