feature: [Marathon SD] Add support for Marathon service discovery

This commit is contained in:
Jiekun 2024-10-14 15:26:00 +08:00
parent 09a458f931
commit cc2f904fce
No known key found for this signature in database
GPG key ID: 4674A8E5B0AAF6CE
8 changed files with 510 additions and 4 deletions

View file

@ -12,6 +12,10 @@ import (
"strings"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/cespare/xxhash/v2"
"gopkg.in/yaml.v2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
@ -35,6 +39,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/http"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kubernetes"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kuma"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/marathon"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/nomad"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/openstack"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/ovhcloud"
@ -42,9 +47,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/yandexcloud"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
"github.com/VictoriaMetrics/metrics"
"github.com/cespare/xxhash/v2"
"gopkg.in/yaml.v2"
)
var (
@ -311,6 +313,7 @@ type ScrapeConfig struct {
HTTPSDConfigs []http.SDConfig `yaml:"http_sd_configs,omitempty"`
KubernetesSDConfigs []kubernetes.SDConfig `yaml:"kubernetes_sd_configs,omitempty"`
KumaSDConfigs []kuma.SDConfig `yaml:"kuma_sd_configs,omitempty"`
MarathonSDConfigs []marathon.SDConfig `yaml:"marathon_sd_configs,omitempty"`
NomadSDConfigs []nomad.SDConfig `yaml:"nomad_sd_configs,omitempty"`
OpenStackSDConfigs []openstack.SDConfig `yaml:"openstack_sd_configs,omitempty"`
OVHCloudSDConfigs []ovhcloud.SDConfig `yaml:"ovhcloud_sd_configs,omitempty"`
@ -742,6 +745,16 @@ func (cfg *Config) getKumaSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
return cfg.getScrapeWorkGeneric(visitConfigs, "kuma_sd_config", prev)
}
// getMarathonSDScrapeWork builds ScrapeWork for every `marathon_sd_configs`
// entry found in cfg, passing prev through to getScrapeWorkGeneric.
func (cfg *Config) getMarathonSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
	walk := func(sc *ScrapeConfig, visit func(sdc targetLabelsGetter)) {
		sdcs := sc.MarathonSDConfigs
		for idx := 0; idx < len(sdcs); idx++ {
			// Pass the address of the slice element itself, not of a copy.
			visit(&sdcs[idx])
		}
	}
	return cfg.getScrapeWorkGeneric(walk, "marathon_sd_config", prev)
}
// getNomadSDScrapeWork returns `nomad_sd_configs` ScrapeWork from cfg.
func (cfg *Config) getNomadSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
visitConfigs := func(sc *ScrapeConfig, visitor func(sdc targetLabelsGetter)) {

View file

@ -0,0 +1,47 @@
package marathon
import (
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
// apiConfig contains the state needed for querying the Marathon API.
//
// cs holds the discovery clients; newAPIConfig creates one client
// per entry of SDConfig.Servers.
type apiConfig struct {
cs []*discoveryutils.Client
}
// getAPIConfig returns the apiConfig registered in configMap for sdc.
//
// The constructor callback handed to configMap.Get builds the config
// via newAPIConfig, resolving file paths relative to baseDir.
func getAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
	create := func() (any, error) {
		return newAPIConfig(sdc, baseDir)
	}
	cached, err := configMap.Get(sdc, create)
	if err != nil {
		return nil, err
	}
	ac := cached.(*apiConfig)
	return ac, nil
}
// newAPIConfig creates an apiConfig for sdc.
//
// It builds the auth and proxy auth configs (file paths are resolved relative
// to baseDir) and then creates one discovery client per entry in sdc.Servers.
//
// An error is returned if sdc.Servers is empty, if the auth configs cannot be
// parsed or if a client cannot be created for any of the servers.
func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
	// GetAppsList picks a random client from the list, so an empty list
	// must be rejected here instead of panicking on the first refresh.
	if len(sdc.Servers) == 0 {
		return nil, fmt.Errorf("missing `servers` in `marathon_sd_configs`; at least one Marathon server must be specified")
	}

	ac, err := sdc.HTTPClientConfig.NewConfig(baseDir)
	if err != nil {
		return nil, fmt.Errorf("cannot parse auth config: %w", err)
	}
	proxyAC, err := sdc.ProxyClientConfig.NewConfig(baseDir)
	if err != nil {
		return nil, fmt.Errorf("cannot parse proxy auth config: %w", err)
	}

	cs := make([]*discoveryutils.Client, 0, len(sdc.Servers))
	for _, server := range sdc.Servers {
		c, err := discoveryutils.NewClient(server, ac, sdc.ProxyURL, proxyAC, &sdc.HTTPClientConfig)
		if err != nil {
			return nil, fmt.Errorf("cannot create client for %q: %w", server, err)
		}
		cs = append(cs, c)
	}

	return &apiConfig{cs: cs}, nil
}

View file

@ -0,0 +1,98 @@
package marathon
import (
"reflect"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
// TestGetAppsList_Success verifies that GetAppsList decodes a well-formed
// Marathon `/v2/apps` response served by the mock server into an AppList.
func TestGetAppsList_Success(t *testing.T) {
	s := newMockMarathonServer(func() []byte {
		return []byte(`{
"apps": [
{
"id": "/myapp",
"cmd": "env && sleep 60",
"args": null,
"user": null,
"env": {
"LD_LIBRARY_PATH": "/usr/local/lib/myLib"
},
"instances": 3,
"cpus": 0.1,
"mem": 5,
"disk": 0,
"executor": "",
"constraints": [
[
"hostname",
"UNIQUE",
""
]
],
"uris": [
"https://raw.github.com/mesosphere/marathon/master/README.md"
],
"ports": [
10013,
10015
],
"portDefinitions": [
{
"labels": {"pdl1":"pdl1", "pdl2":"pdl2"},
"port": 1999
}
],
"requirePorts": false,
"backoffSeconds": 1,
"backoffFactor": 1.15,
"maxLaunchDelaySeconds": 3600,
"container": null,
"healthChecks": [],
"dependencies": [],
"upgradeStrategy": {
"minimumHealthCapacity": 1,
"maximumOverCapacity": 1
},
"labels": {},
"acceptedResourceRoles": null,
"version": "2015-09-25T15:13:48.343Z",
"versionInfo": {
"lastScalingAt": "2015-09-25T15:13:48.343Z",
"lastConfigChangeAt": "2015-09-25T15:13:48.343Z"
},
"tasksStaged": 0,
"tasksRunning": 0,
"tasksHealthy": 0,
"tasksUnhealthy": 0,
"deployments": [
{
"id": "9538079c-3898-4e32-aa31-799bf9097f74"
}
]
}
]
}`)
	})
	// Release the listener and connections of the mock server when the test ends.
	defer s.Close()

	// Prepare a discovery HTTP client that talks to the mock server.
	client, err := discoveryutils.NewClient(s.URL, nil, nil, nil, &promauth.HTTPClientConfig{})
	if err != nil {
		t.Fatalf("unexpected error when creating http client: %s", err)
	}

	ac := &apiConfig{
		cs: []*discoveryutils.Client{client},
	}

	apps, err := GetAppsList(ac)
	if err != nil {
		t.Fatalf("unexpected error in GetAppsList(): %s", err)
	}

	// Only the fields declared on the app struct are decoded:
	//   - "labels": {} yields an empty non-nil map;
	//   - "container": null leaves the zero value in place;
	//   - "tasks" and "networks" are missing from the payload and stay nil.
	// GetAppsList returns a pointer, so the expectation must be a pointer too,
	// otherwise reflect.DeepEqual reports a mismatch because of differing types.
	expect := &AppList{
		Apps: []app{
			{
				ID:     "/myapp",
				Labels: map[string]string{},
				PortDefinitions: []portDefinition{
					{
						Labels: map[string]string{"pdl1": "pdl1", "pdl2": "pdl2"},
						Port:   1999,
					},
				},
			},
		},
	}
	if !reflect.DeepEqual(apps, expect) {
		t.Fatalf("unexpected result, got: %+v, expect: %+v", apps, expect)
	}
}

View file

@ -0,0 +1,95 @@
package marathon
import (
"encoding/json"
"fmt"
"math/rand"
)
// AppList is the list of Marathon apps decoded from the "apps" key
// of a Marathon API response.
type AppList struct {
Apps []app `json:"apps"`
}
// app describes a service running on Marathon.
//
// Only the JSON fields listed here are decoded; other keys of the API response are ignored.
type app struct {
ID string `json:"id"`
Tasks []task `json:"tasks"`
RunningTasks int `json:"tasksRunning"`
Labels map[string]string `json:"labels"`
Container container `json:"container"`
PortDefinitions []portDefinition `json:"portDefinitions"`
Networks []network `json:"networks"`
RequirePorts bool `json:"requirePorts"`
}
// task describes one instance of a service running on Marathon.
//
// targetEndpoint uses Host, or the first entry of IPAddresses for apps
// in a container network, as the host part of the target address.
type task struct {
ID string `json:"id"`
Host string `json:"host"`
Ports []uint32 `json:"ports"`
IPAddresses []ipAddress `json:"ipAddresses"`
}
// ipAddress describes the address and the protocol the container's network interface is bound to.
type ipAddress struct {
Address string `json:"ipAddress"`
Proto string `json:"protocol"`
}
// container describes the runtime an app is running in.
//
// Port mappings may be present either here or inside Docker;
// getAppLabels checks PortMappings first and Docker.PortMappings second.
type container struct {
Docker dockerContainer `json:"docker"`
PortMappings []portMapping `json:"portMappings"`
}
// dockerContainer describes a container which uses the docker runtime.
type dockerContainer struct {
Image string `json:"image"`
PortMappings []portMapping `json:"portMappings"`
}
// portMapping describes the ports a process binds to inside the container
// together with the corresponding host and service ports.
type portMapping struct {
Labels map[string]string `json:"labels"`
ContainerPort uint32 `json:"containerPort"`
HostPort uint32 `json:"hostPort"`
ServicePort uint32 `json:"servicePort"`
}
// portDefinition describes the load balancer port that should be used for accessing the service.
type portDefinition struct {
Labels map[string]string `json:"labels"`
Port uint32 `json:"port"`
}
// network describes the name and the mode of a network the container is attached to.
//
// isContainerNet compares Mode with "container".
type network struct {
Name string `json:"name"`
Mode string `json:"mode"`
}
// isContainerNet reports whether the first network of the app has mode "container".
//
// It returns false for apps without networks.
func (a app) isContainerNet() bool {
	if len(a.Networks) == 0 {
		return false
	}
	return a.Networks[0].Mode == "container"
}
// GetAppsList fetches the list of apps with their tasks embedded
// from a randomly chosen Marathon server configured in ac.
//
// It returns an error if ac has no clients, if the API request fails
// or if the response cannot be decoded into AppList.
func GetAppsList(ac *apiConfig) (*AppList, error) {
	// rand.Intn panics on a non-positive argument, so guard against an empty client list.
	if ac == nil || len(ac.cs) == 0 {
		return nil, fmt.Errorf("cannot query Marathon API: no servers are configured")
	}
	c := ac.cs[rand.Intn(len(ac.cs))]

	// /v2/apps API: get the list of running applications.
	// https://mesosphere.github.io/marathon/api-console/index.html
	path := "/v2/apps/?embed=apps.tasks"
	resp, err := c.GetAPIResponse(path)
	if err != nil {
		return nil, fmt.Errorf("cannot get Marathon response from %s: %w", path, err)
	}

	var apps AppList
	if err := json.Unmarshal(resp, &apps); err != nil {
		return nil, fmt.Errorf("cannot unmarshal AppList obtained from %q: %w; response=%q", path, err, resp)
	}
	return &apps, nil
}

View file

@ -0,0 +1,179 @@
package marathon
import (
"flag"
"fmt"
"net"
"strconv"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
)
// SDCheckInterval defines the interval for refreshing targets discovered via marathon_sd_configs.
//
// It is set via the -promscrape.marathonSDCheckInterval command-line flag and defaults to 30 seconds.
var SDCheckInterval = flag.Duration("promscrape.marathonSDCheckInterval", 30*time.Second, "Interval for checking for changes in Marathon REST API. "+
"This works only if marathon_sd_configs is configured in '-promscrape.config' file. "+
"See https://docs.victoriametrics.com/sd_configs/#marathon_sd_configs for details")
// SDConfig is the configuration for Marathon service discovery.
//
// Servers lists the Marathon servers to query; newAPIConfig creates one client per entry.
// The remaining fields configure HTTP client auth and the optional proxy used by these clients.
type SDConfig struct {
Servers []string `yaml:"servers"`
HTTPClientConfig promauth.HTTPClientConfig `yaml:",inline"`
ProxyURL *proxy.URL `yaml:"proxy_url,omitempty"`
ProxyClientConfig promauth.ProxyClientConfig `yaml:",inline"`
}
// configMap holds the apiConfig created for every SDConfig in use.
// Entries are keyed by the *SDConfig pointer; see getAPIConfig and SDConfig.MustStop.
var configMap = discoveryutils.NewConfigMap()
// GetLabels returns Marathon labels according to sdc.
//
// It obtains the API config for sdc (file paths are resolved relative to baseDir),
// fetches the list of apps from Marathon and converts it into labels.
func (sdc *SDConfig) GetLabels(baseDir string) ([]*promutils.Labels, error) {
	apiCfg, cfgErr := getAPIConfig(sdc, baseDir)
	if cfgErr != nil {
		return nil, fmt.Errorf("cannot get API config: %w", cfgErr)
	}

	appList, listErr := GetAppsList(apiCfg)
	if listErr != nil {
		return nil, listErr
	}

	targets := getAppsLabels(appList)
	return targets, nil
}
// MustStop stops further usage for sdc.
//
// It removes the apiConfig registered for sdc from configMap and stops
// all the clients it owns, so they are not leaked after a config reload.
func (sdc *SDConfig) MustStop() {
	v := configMap.Delete(sdc)
	if v == nil {
		// No API config has been created for sdc, so there is nothing to release.
		return
	}
	ac, ok := v.(*apiConfig)
	if !ok || ac == nil {
		return
	}
	for _, c := range ac.cs {
		c.Stop()
	}
}
// getAppsLabels converts the given Marathon apps into labels for scrape targets.
//
// Every (task, port) pair of every app results in a distinct set of labels.
func getAppsLabels(apps *AppList) []*promutils.Labels {
	ms := make([]*promutils.Labels, 0, len(apps.Apps))
	for i := range apps.Apps {
		ms = append(ms, getAppLabels(&apps.Apps[i])...)
	}
	return ms
}

// getAppLabels returns labels for all the scrape targets of the given app.
//
// A target is generated for every port of every task, so each returned set
// of labels carries exactly one `__address__`, `__meta_marathon_task` and
// `__meta_marathon_port_index` label next to the app-level labels.
// Apps without tasks or without ports produce no targets.
func getAppLabels(app *app) []*promutils.Labels {
	var ports []uint32
	var labels []map[string]string
	var prefix string

	containerNet := app.isContainerNet()

	switch {
	case len(app.Container.PortMappings) != 0:
		// In Marathon 1.5.x the "container.docker.portMappings" object was moved
		// to "container.portMappings".
		ports, labels = extractPortMapping(app.Container.PortMappings, containerNet)
		prefix = "__meta_marathon_port_mapping_label_"

	case len(app.Container.Docker.PortMappings) != 0:
		// Prior to Marathon 1.5 the port mappings could be found at the path
		// "container.docker.portMappings".
		ports, labels = extractPortMapping(app.Container.Docker.PortMappings, containerNet)
		prefix = "__meta_marathon_port_mapping_label_"

	case len(app.PortDefinitions) != 0:
		// PortDefinitions deprecates the "ports" array and can be used to specify
		// a list of ports with metadata in case a mapping is not required.
		ports = make([]uint32, len(app.PortDefinitions))
		labels = make([]map[string]string, len(app.PortDefinitions))
		for i := range app.PortDefinitions {
			labels[i] = app.PortDefinitions[i].Labels
			// When requirePorts is false, this port becomes the 'servicePort', not the listen port.
			// In this case, the port needs to be taken from the task instead of the app.
			if app.RequirePorts {
				ports[i] = app.PortDefinitions[i].Port
			}
		}
		// Meta labels must start with `__meta_` like the rest of discovered labels.
		prefix = "__meta_marathon_port_definition_label_"
	}

	ms := make([]*promutils.Labels, 0, len(app.Tasks))

	// Gather info about the app's 'tasks'. Each instance (container) is considered a task
	// and can be reachable at one or more host:port endpoints.
	for i := range app.Tasks {
		t := &app.Tasks[i]

		// There are no labels to gather if only Ports is defined. (eg. with host networking)
		// Ports can only be gathered from the Task (not from the app) and are guaranteed
		// to be the same across all tasks. If we haven't gathered any ports by now,
		// use the task's ports as the port list.
		if len(ports) == 0 && len(t.Ports) != 0 {
			ports = t.Ports
		}

		// Iterate over the ports we gathered using one of the methods above.
		for j, port := range ports {
			// A zero port here means that either the portMapping has a zero port defined,
			// or there is a portDefinition with requirePorts set to false. This means the port
			// is auto-generated by Mesos and needs to be looked up in the task.
			if port == 0 && len(t.Ports) == len(ports) {
				port = t.Ports[j]
			}

			// Each port represents a possible scrape target with its own set of labels.
			m := promutils.NewLabels(5 + len(app.Labels))
			m.Add("__address__", targetEndpoint(t, port, containerNet))
			m.Add("__meta_marathon_app", app.ID)
			m.Add("__meta_marathon_image", app.Container.Docker.Image)
			m.Add("__meta_marathon_task", t.ID)
			m.Add("__meta_marathon_port_index", strconv.Itoa(j))

			// Gather all port labels and set them on the current target, skip if the port has no Marathon labels.
			// This will happen in the host networking case with only `ports` defined, where
			// it is inefficient to allocate a list of possibly hundreds of empty label maps per host port.
			if j < len(labels) {
				for ln, lv := range labels[j] {
					m.Add(prefix+discoveryutils.SanitizeLabelName(ln), lv)
				}
			}

			for ln, lv := range app.Labels {
				m.Add("__meta_marathon_app_label_"+discoveryutils.SanitizeLabelName(ln), lv)
			}

			ms = append(ms, m)
		}
	}

	return ms
}
// targetEndpoint returns the target address for the given task and port in host:port format.
//
// The host is the task's first IP address if containerNet is set and the task
// has at least one IP address. Otherwise the task's Host is used.
func targetEndpoint(task *task, port uint32, containerNet bool) string {
	host := task.Host
	if containerNet && len(task.IPAddresses) > 0 {
		// Tasks in a container network are reachable at their own IP address.
		host = task.IPAddresses[0].Address
	}
	portStr := strconv.Itoa(int(port))
	return net.JoinHostPort(host, portStr)
}
// extractPortMapping returns the list of ports and the list of labels for the given portMappings.
//
// Both returned lists have the same length as portMappings and share its order.
func extractPortMapping(portMappings []portMapping, containerNet bool) ([]uint32, []map[string]string) {
	n := len(portMappings)
	ports := make([]uint32, 0, n)
	labels := make([]map[string]string, 0, n)
	for idx := range portMappings {
		pm := &portMappings[idx]

		// Connect to the allocated host port for the container by default.
		// Note that this host port is likely set to 0 in the app definition, which means it is
		// automatically generated and needs to be extracted from the task's 'ports' array at a later stage.
		port := pm.HostPort
		if containerNet {
			// If the app is in a container network, connect directly to the container port.
			port = pm.ContainerPort
		}

		ports = append(ports, port)
		labels = append(labels, pm.Labels)
	}
	return ports, labels
}

View file

@ -0,0 +1,48 @@
package marathon
import (
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
)
func TestGetAppLabels(t *testing.T) {
a := app{
ID: "test-service",
Tasks: []task{
{
ID: "test-task-1",
Host: "mesos-slave1",
},
},
RunningTasks: 1,
Labels: map[string]string{"prometheus": "yes"},
Container: container{
Docker: dockerContainer{
Image: "repo/image:tag",
},
PortMappings: []portMapping{
{
Labels: map[string]string{"prometheus": "yes"},
HostPort: 31000,
},
},
},
}
expect := []*promutils.Labels{
promutils.NewLabelsFromMap(map[string]string{
"__meta_marathon_app": "test-service",
"__meta_marathon_image": "repo/image:tag",
"__address__": "mesos-slave1:31000",
"__meta_marathon_task": "test-task-1",
"__meta_marathon_port_index": "0",
"__meta_marathon_port_mapping_label_prometheus": "yes",
"__meta_marathon_app_label_prometheus": "yes",
}),
}
labelList := getAppsLabels(&AppList{Apps: []app{a}})
discoveryutils.TestEqualLabelss(t, labelList, expect)
}

View file

@ -0,0 +1,23 @@
package marathon
import (
"net/http"
"net/http/httptest"
)
// mockMarathonServer is a test HTTP server that answers every request
// with the payload produced by responseFunc.
type mockMarathonServer struct {
	*httptest.Server
	responseFunc func() []byte
}

// newMockMarathonServer starts a test HTTP server that serves the output of responseFunc.
//
// The caller is expected to close the returned server when it is no longer needed.
func newMockMarathonServer(responseFunc func() []byte) *mockMarathonServer {
	srv := &mockMarathonServer{responseFunc: responseFunc}
	srv.Server = httptest.NewServer(http.HandlerFunc(srv.handler))
	return srv
}

// handler writes the payload returned by responseFunc to w; the request itself is ignored.
func (s *mockMarathonServer) handler(w http.ResponseWriter, _ *http.Request) {
	// The write result is dropped: nothing can be done about it inside the handler.
	_, _ = w.Write(s.responseFunc())
}

View file

@ -9,6 +9,8 @@ import (
"sync/atomic"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -28,13 +30,13 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/http"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kubernetes"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kuma"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/marathon"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/nomad"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/openstack"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/ovhcloud"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/vultr"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/yandexcloud"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
"github.com/VictoriaMetrics/metrics"
)
var (
@ -140,6 +142,7 @@ func runScraper(configFile string, pushData func(at *auth.Token, wr *prompbmarsh
scs.add("http_sd_configs", *http.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getHTTPDScrapeWork(swsPrev) })
scs.add("kubernetes_sd_configs", *kubernetes.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getKubernetesSDScrapeWork(swsPrev) })
scs.add("kuma_sd_configs", *kuma.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getKumaSDScrapeWork(swsPrev) })
scs.add("marathon_sd_configs", *marathon.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getMarathonSDScrapeWork(swsPrev) })
scs.add("nomad_sd_configs", *nomad.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getNomadSDScrapeWork(swsPrev) })
scs.add("openstack_sd_configs", *openstack.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getOpenStackSDScrapeWork(swsPrev) })
scs.add("ovhcloud_sd_configs", *ovhcloud.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getOVHCloudSDScrapeWork(swsPrev) })