add kuma_sd_config for Kuma Control Plane targets discovery (#3389) (#3840)

This commit is contained in:
Alexander Marshalov 2023-02-22 13:59:56 +01:00 committed by Aliaksandr Valialkin
parent 88d2a6c3a3
commit 173643a771
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
12 changed files with 725 additions and 6 deletions

View file

@ -1371,6 +1371,10 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
How frequently to reload the full state from Kubernetes API server (default 30m0s)
-promscrape.kubernetesSDCheckInterval duration
Interval for checking for changes in Kubernetes API server. This works only if kubernetes_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#kubernetes_sd_configs for details (default 30s)
-promscrape.kumaSDCheckInterval duration
Interval for checking for changes in Kuma Service Mesh API service discovery. This works only if kuma_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#kuma_sd_configs for details (default 1m0s)
-promscrape.kuma.waitTime duration
Wait time used by Kuma service discovery. Default value is used if not set
-promscrape.maxDroppedTargets int
The maximum number of droppedTargets to show at /api/v1/targets page. Increase this value if your setup drops more scrape targets during relabeling and you need investigating labels for all the dropped targets. Note that the increased number of tracked dropped targets may result in increased memory usage (default 1000)
-promscrape.maxResponseHeadersSize size

View file

@ -32,6 +32,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add `range_trim_zscore(z, q)` function for dropping outliers located farther than `z*range_stddev(q)` from `range_avg(q)`. This should help removing outliers during query time at [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3759).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): show `median` instead of `avg` in graph tooltip and line legend, since `median` is more tolerant against spikes. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3706).
* FEATURE: allow setting zero value for `-search.latencyOffset` command-line flag. This may be needed in [some cases](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2061#issuecomment-1299109836). Previously the minimum supported value for `-search.latencyOffset` command-line flag was `1s`.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `kuma_sd_config` for [Kuma](http://kuma.io/) Control Plane targets discovery. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3389).
* BUGFIX: prevent from possible data ingestion slowdown and query performance slowdown during [background merges of big parts](https://docs.victoriametrics.com/#storage) on systems with small number of CPU cores (1 or 2 CPU cores). The issue has been introduced in [v1.85.0](https://docs.victoriametrics.com/CHANGELOG.html#v1850) when implementing [this feature](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3337). See also [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3790).
* BUGFIX: properly parse timestamps in milliseconds when [ingesting data via OpenTSDB telnet put protocol](https://docs.victoriametrics.com/#sending-data-via-telnet-put-protocol). Previously timestamps in milliseconds were mistakenly multiplied by 1000. Thanks to @Droxenator for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3810).

View file

@ -20,6 +20,7 @@ sort: 24
* `gce_sd_configs` is for discovering and scraping [Google Compute Engine](https://cloud.google.com/compute) targets. See [these docs](#gce_sd_configs).
* `http_sd_configs` is for discovering and scraping targerts provided by external http-based service discovery. See [these docs](#http_sd_configs).
* `kubernetes_sd_configs` is for discovering and scraping [Kubernetes](https://kubernetes.io/) targets. See [these docs](#kubernetes_sd_configs).
* `kuma_sd_configs` is for discovering and scraping [Kuma](https://kuma.io) targets. See [these docs](#kuma_sd_configs).
* `nomad_sd_configs` is for discovering and scraping targets registered in [HashiCorp Nomad](https://www.nomadproject.io/). See [these docs](#nomad_sd_configs).
* `openstack_sd_configs` is for discovering and scraping OpenStack targets. See [these docs](#openstack_sd_configs).
* `static_configs` is for scraping statically defined targets. See [these docs](#static_configs).
@ -944,6 +945,42 @@ One of the following `role` types can be configured to discover targets:
* `__meta_kubernetes_ingress_scheme`: Protocol scheme of ingress, https if TLS config is set. Defaults to http.
* `__meta_kubernetes_ingress_path`: Path from ingress spec. Defaults to `/`.
## kuma_sd_configs
Kuma service discovery config allows to fetch targets from the specified control plane `server` of [Kuma Service Mesh](https://kuma.io).
It discovers "monitoring assignments" based on Kuma Dataplane Proxies,
via the [MADS (Monitoring Assignment Discovery Service)](https://kuma.io/docs/2.1.x/policies/traffic-metrics/#traffic-metrics)
[xDS RESP API](http://envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol).
Configuration example:
```yaml
scrape_configs:
- job_name: kuma
kuma_sd_configs:
# server must contain the URL of Kuma Control Plane's MADS xDS server.
- server: "http://localhost:5676"
# Additional HTTP API client options can be specified here.
# See https://docs.victoriametrics.com/sd_configs.html#http-api-client-options
```
The `server` is queried periodically with the interval specified in `-promscrape.kumaSDCheckInterval` command-line flag.
Discovery errors are tracked in `promscrape_discovery_kuma_errors_total` metric.
Each discovered target has an [`__address__`](https://docs.victoriametrics.com/relabeling.html#how-to-modify-scrape-urls-in-targets) label set
to one of the targets returned by the http service.
The following meta labels are available on discovered targets during [relabeling](https://docs.victoriametrics.com/vmagent.html#relabeling):
* `__meta_server`: the URL of Kuma Control Plane's MADS xDS server from which the target was extracted
* `__meta_kuma_mesh`: the name of the mesh
* `__meta_kuma_service`: the name of the service associated with the proxy
* `__meta_kuma_dataplane`: the name of the proxy
* `__meta_kuma_label_<label_name>`: each label of target given from Kuma Control Plane
* `__metrics_path__`: the path by which the service metrics are scraped
## nomad_sd_configs
Nomad SD configuration allows retrieving scrape targets from [HashiCorp Nomad Services](https://www.hashicorp.com/blog/nomad-service-discovery).

View file

@ -1373,6 +1373,10 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
Interval for checking for changes in http endpoint service discovery. This works only if http_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#http_sd_configs for details (default 1m0s)
-promscrape.kubernetes.apiServerTimeout duration
How frequently to reload the full state from Kubernetes API server (default 30m0s)
-promscrape.kumaSDCheckInterval duration
Interval for checking for changes in Kuma Service Mesh API service discovery. This works only if kuma_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#kuma_sd_configs for details (default 1m0s)
-promscrape.kuma.waitTime duration
Wait time used by Kuma service discovery. Default value is used if not set
-promscrape.kubernetesSDCheckInterval duration
Interval for checking for changes in Kubernetes API server. This works only if kubernetes_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#kubernetes_sd_configs for details (default 30s)
-promscrape.maxDroppedTargets int

View file

@ -30,6 +30,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/gce"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/http"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kubernetes"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kuma"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/nomad"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/openstack"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/yandexcloud"
@ -264,6 +265,7 @@ type ScrapeConfig struct {
GCESDConfigs []gce.SDConfig `yaml:"gce_sd_configs,omitempty"`
HTTPSDConfigs []http.SDConfig `yaml:"http_sd_configs,omitempty"`
KubernetesSDConfigs []kubernetes.SDConfig `yaml:"kubernetes_sd_configs,omitempty"`
KumaSDConfigs []kuma.SDConfig `yaml:"kuma_sd_configs,omitempty"`
NomadSDConfigs []nomad.SDConfig `yaml:"nomad_sd_configs,omitempty"`
OpenStackSDConfigs []openstack.SDConfig `yaml:"openstack_sd_configs,omitempty"`
StaticConfigs []StaticConfig `yaml:"static_configs,omitempty"`
@ -332,6 +334,9 @@ func (sc *ScrapeConfig) mustStop() {
for i := range sc.KubernetesSDConfigs {
sc.KubernetesSDConfigs[i].MustStop()
}
for i := range sc.KumaSDConfigs {
sc.KumaSDConfigs[i].MustStop()
}
for i := range sc.NomadSDConfigs {
sc.NomadSDConfigs[i].MustStop()
}
@ -812,6 +817,33 @@ func (cfg *Config) getKubernetesSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
return dst
}
// getKumaSDScrapeWork returns `kuma_sd_configs` ScrapeWork from cfg.
func (cfg *Config) getKumaSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
swsPrevByJob := getSWSByJob(prev)
dst := make([]*ScrapeWork, 0, len(prev))
for _, sc := range cfg.ScrapeConfigs {
dstLen := len(dst)
ok := true
for j := range sc.KumaSDConfigs {
sdc := &sc.KumaSDConfigs[j]
var okLocal bool
dst, okLocal = appendSDScrapeWork(dst, sdc, cfg.baseDir, sc.swc, "kuma_sd_config")
if ok {
ok = okLocal
}
}
if ok {
continue
}
swsPrev := swsPrevByJob[sc.swc.jobName]
if len(swsPrev) > 0 {
logger.Errorf("there were errors when discovering kuma targets for job %q, so preserving the previous targets", sc.swc.jobName)
dst = append(dst[:dstLen], swsPrev...)
}
}
return dst
}
// getNomadSDScrapeWork returns `nomad_sd_configs` ScrapeWork from cfg.
func (cfg *Config) getNomadSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
swsPrevByJob := getSWSByJob(prev)
@ -893,7 +925,7 @@ func (cfg *Config) getYandexCloudSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork
return dst
}
// getStaticScrapeWork returns `static_configs` ScrapeWork from from cfg.
// getStaticScrapeWork returns `static_configs` ScrapeWork from cfg.
func (cfg *Config) getStaticScrapeWork() []*ScrapeWork {
var dst []*ScrapeWork
for _, sc := range cfg.ScrapeConfigs {

View file

@ -0,0 +1,207 @@
package kuma
import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"io"
"net/http"
"net/url"
"path"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
"github.com/VictoriaMetrics/metrics"
)
var configMap = discoveryutils.NewConfigMap()
type apiConfig struct {
client *discoveryutils.Client
path string
cancel context.CancelFunc
wg sync.WaitGroup
mu sync.Mutex // protects targets
targets []kumaTarget
latestVersion string
latestNonce string
fetchErrors *metrics.Counter
parseErrors *metrics.Counter
}
const (
discoveryNode = "victoria-metrics"
xdsApiVersion = "v3"
xdsRequestType = "discovery"
xdsResourceType = "monitoringassignments"
xdsResourceTypeUrl = "type.googleapis.com/kuma.observability.v1.MonitoringAssignment"
)
var waitTime = flag.Duration("promscrape.kuma.waitTime", 0, "Wait time used by Kuma service discovery. Default value is used if not set")
func getAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
v, err := configMap.Get(sdc, func() (interface{}, error) { return newAPIConfig(sdc, baseDir) })
if err != nil {
return nil, err
}
return v.(*apiConfig), nil
}
func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
ac, err := sdc.HTTPClientConfig.NewConfig(baseDir)
if err != nil {
return nil, fmt.Errorf("cannot parse auth config: %w", err)
}
parsedURL, err := url.Parse(sdc.Server)
if err != nil {
return nil, fmt.Errorf("cannot parse kuma_sd server URL: %w", err)
}
apiServer := fmt.Sprintf("%s://%s", parsedURL.Scheme, parsedURL.Host)
proxyAC, err := sdc.ProxyClientConfig.NewConfig(baseDir)
if err != nil {
return nil, fmt.Errorf("cannot parse proxy auth config: %w", err)
}
client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL, proxyAC)
if err != nil {
return nil, fmt.Errorf("cannot create HTTP client for %q: %w", apiServer, err)
}
apiPath := path.Join(
parsedURL.RequestURI(),
xdsApiVersion,
xdsRequestType+":"+xdsResourceType,
)
cfg := &apiConfig{
client: client,
path: apiPath,
fetchErrors: metrics.GetOrCreateCounter(fmt.Sprintf(`promscrape_discovery_kuma_errors_total{type="fetch",url=%q}`, sdc.Server)),
parseErrors: metrics.GetOrCreateCounter(fmt.Sprintf(`promscrape_discovery_kuma_errors_total{type="parse",url=%q}`, sdc.Server)),
}
// initialize targets synchronously and start updating them in background
cfg.startWatcher()
return cfg, nil
}
func (cfg *apiConfig) startWatcher() func() {
ctx, cancel := context.WithCancel(context.Background())
cfg.cancel = cancel
// blocking initial targets update
if err := cfg.updateTargets(ctx); err != nil {
logger.Errorf("there were errors when discovering kuma targets, so preserving the previous targets. error: %v", err)
}
// start updating targets with a long polling in background
cfg.wg.Add(1)
go func() {
ticker := time.NewTicker(*SDCheckInterval)
defer cfg.wg.Done()
defer ticker.Stop()
for {
select {
case <-ticker.C:
// we are constantly waiting for targets updates in long polling requests
err := cfg.updateTargets(ctx)
if err != nil {
logger.Errorf("there were errors when discovering kuma targets, so preserving the previous targets. error: %v", err)
}
case <-ctx.Done():
return
}
}
}()
return cancel
}
func (cfg *apiConfig) stopWatcher() {
cfg.cancel()
cfg.wg.Wait()
}
func (cfg *apiConfig) getTargets() ([]kumaTarget, error) {
cfg.mu.Lock()
defer cfg.mu.Unlock()
return cfg.targets, nil
}
func (cfg *apiConfig) updateTargets(ctx context.Context) error {
requestBody, err := json.Marshal(discoveryRequest{
VersionInfo: cfg.latestVersion,
Node: discoveryRequestNode{Id: discoveryNode},
TypeUrl: xdsResourceTypeUrl,
ResponseNonce: cfg.latestNonce,
})
if err != nil {
return fmt.Errorf("cannot marshal request body for kuma_sd api: %w", err)
}
var statusCode int
data, err := cfg.client.GetBlockingAPIResponseWithParamsCtx(
ctx,
cfg.path,
func(request *http.Request) {
request.Method = http.MethodPost
request.Body = io.NopCloser(bytes.NewReader(requestBody))
// set max duration for long polling request
query := request.URL.Query()
query.Add("fetch-timeout", cfg.getWaitTime().String())
request.URL.RawQuery = query.Encode()
request.Header.Set("Accept", "application/json")
request.Header.Set("Content-Type", "application/json")
},
func(response *http.Response) {
statusCode = response.StatusCode
},
)
if statusCode == http.StatusNotModified {
return nil
}
if err != nil {
cfg.fetchErrors.Inc()
return fmt.Errorf("cannot read kuma_sd api response: %w", err)
}
response, err := parseDiscoveryResponse(data)
if err != nil {
cfg.parseErrors.Inc()
return fmt.Errorf("cannot parse kuma_sd api response: %w", err)
}
cfg.mu.Lock()
defer cfg.mu.Unlock()
cfg.targets = parseKumaTargets(response)
cfg.latestVersion = response.VersionInfo
cfg.latestNonce = response.Nonce
return nil
}
func (cfg *apiConfig) getWaitTime() time.Duration {
d := discoveryutils.BlockingClientReadTimeout
// Reduce wait time to avoid timeouts (request execution time should be less than the read timeout)
d -= d / 8
if *waitTime > time.Second && *waitTime < d {
d = *waitTime
}
return d
}
func (cfg *apiConfig) mustStop() {
cfg.stopWatcher()
cfg.client.Stop()
}

View file

@ -0,0 +1,176 @@
package kuma
import (
"reflect"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
)
func Test_buildAPIPath(t *testing.T) {
type args struct {
server string
}
tests := []struct {
name string
args args
want string
wantErr bool
}{
{
name: "get api path ok",
args: args{server: "http://localhost:5676"},
want: "/v3/discovery:monitoringassignments",
},
{
name: "get api path incorrect server URL",
args: args{server: ":"},
wantErr: true,
},
{
name: "get api path incorrect server URL err",
args: args{server: "api"},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sdConf := &SDConfig{
Server: tt.args.server,
HTTPClientConfig: promauth.HTTPClientConfig{},
ProxyClientConfig: promauth.ProxyClientConfig{},
}
apiConf, err := getAPIConfig(sdConf, ".")
if tt.wantErr {
if err == nil {
t.Errorf("buildAPIPath() error = %v, wantErr %v", err, tt.wantErr)
}
return
}
if apiConf.path != tt.want {
t.Errorf("buildAPIPath() got = %v, want = %v", apiConf.path, tt.want)
}
sdConf.MustStop()
})
}
}
func Test_parseAPIResponse(t *testing.T) {
type args struct {
data []byte
}
tests := []struct {
name string
args args
want []kumaTarget
wantErr bool
}{
{
name: "parse ok",
args: args{
data: []byte(`{
"version_info":"5dc9a5dd-2091-4426-a886-dfdc24fc99d7",
"resources":[
{
"@type":"type.googleapis.com/kuma.observability.v1.MonitoringAssignment",
"mesh":"default",
"service":"redis",
"labels":{ "test":"test1" },
"targets":[
{
"name":"redis",
"scheme":"http",
"address":"127.0.0.1:5670",
"metrics_path":"/metrics",
"labels":{ "kuma_io_protocol":"tcp" }
}
]
},
{
"@type":"type.googleapis.com/kuma.observability.v1.MonitoringAssignment",
"mesh":"default",
"service":"app",
"labels":{ "test":"test2" },
"targets":[
{
"name":"app",
"scheme":"http",
"address":"127.0.0.1:5671",
"metrics_path":"/metrics",
"labels":{ "kuma_io_protocol":"http" }
}
]
}
],
"type_url":"type.googleapis.com/kuma.observability.v1.MonitoringAssignment"
}`),
},
want: []kumaTarget{
{
Mesh: "default",
Service: "redis",
DataPlane: "redis",
Instance: "redis",
Scheme: "http",
Address: "127.0.0.1:5670",
MetricsPath: "/metrics",
Labels: map[string]string{"kuma_io_protocol": "tcp", "test": "test1"},
},
{
Mesh: "default",
Service: "app",
DataPlane: "app",
Instance: "app",
Scheme: "http",
Address: "127.0.0.1:5671",
MetricsPath: "/metrics",
Labels: map[string]string{"kuma_io_protocol": "http", "test": "test2"},
},
},
},
{
name: "parse err",
args: args{data: []byte(`[]`)},
wantErr: true,
},
{
name: "api version err",
args: args{
data: []byte(`{
"resources":[
{
"@type":"type.googleapis.com/kuma.observability.v2.MonitoringAssignment",
"mesh":"default",
"service":"redis",
"targets":[]
}
],
"type_url":"type.googleapis.com/kuma.observability.v2.MonitoringAssignment"
}`),
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
resp, err := parseDiscoveryResponse(tt.args.data)
if tt.wantErr {
if err == nil {
t.Errorf("parseDiscoveryResponse() error = %v, wantErr %v", err, tt.wantErr)
}
return
}
got := parseKumaTargets(resp)
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("parseDiscoveryResponse() got = %v, want %v", got, tt.want)
}
})
}
}

View file

@ -0,0 +1,113 @@
package kuma
import (
"flag"
"fmt"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
)
// SDCheckInterval defines interval for targets refresh.
var SDCheckInterval = flag.Duration("promscrape.kumaSDCheckInterval", time.Minute, "Interval for checking for changes in kuma service discovery. "+
"This works only if kuma_sd_configs is configured in '-promscrape.config' file. "+
"See https://docs.victoriametrics.com/sd_configs.html#kuma_sd_configs for details")
// SDConfig represents service discovery config for Kuma Service Mesh.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kuma_sd_config
type SDConfig struct {
Server string `yaml:"server"`
HTTPClientConfig promauth.HTTPClientConfig `yaml:",inline"`
ProxyURL *proxy.URL `yaml:"proxy_url,omitempty"`
ProxyClientConfig promauth.ProxyClientConfig `yaml:",inline"`
}
type kumaTarget struct {
Mesh string `json:"mesh"`
ControlPlane string `json:"controlplane"`
Service string `json:"service"`
DataPlane string `json:"dataplane"`
Instance string `json:"instance"`
Scheme string `json:"scheme"`
Address string `json:"address"`
MetricsPath string `json:"metrics_path"`
Labels map[string]string `json:"labels"`
}
// GetLabels returns kuma service discovery labels according to sdc.
func (sdc *SDConfig) GetLabels(baseDir string) ([]*promutils.Labels, error) {
cfg, err := getAPIConfig(sdc, baseDir)
if err != nil {
return nil, fmt.Errorf("cannot get API config for kuma_sd: %w", err)
}
targets, err := cfg.getTargets()
if err != nil {
return nil, err
}
return kumaTargetsToLabels(targets, sdc.Server), nil
}
// MustStop stops further usage for sdc.
func (sdc *SDConfig) MustStop() {
v := configMap.Delete(sdc)
if v != nil {
// v can be nil if GetLabels wasn't called yet.
cfg := v.(*apiConfig)
cfg.mustStop()
}
}
func kumaTargetsToLabels(src []kumaTarget, sourceURL string) []*promutils.Labels {
ms := make([]*promutils.Labels, 0, len(src))
for _, target := range src {
m := promutils.NewLabels(8 + len(target.Labels))
m.Add("instance", target.Instance)
m.Add("__address__", target.Address)
m.Add("__scheme__", target.Scheme)
m.Add("__metrics_path__", target.MetricsPath)
m.Add("__meta_server", sourceURL)
m.Add("__meta_kuma_mesh", target.Mesh)
m.Add("__meta_kuma_service", target.Service)
m.Add("__meta_kuma_dataplane", target.DataPlane)
for k, v := range target.Labels {
m.Add("__meta_kuma_label_"+k, v)
}
m.RemoveDuplicates()
ms = append(ms, m)
}
return ms
}
func parseKumaTargets(response discoveryResponse) []kumaTarget {
result := make([]kumaTarget, 0, len(response.Resources))
for _, resource := range response.Resources {
for _, target := range resource.Targets {
labels := make(map[string]string)
for label, value := range resource.Labels {
labels[label] = value
}
for label, value := range target.Labels {
labels[label] = value
}
result = append(result, kumaTarget{
Mesh: resource.Mesh,
ControlPlane: response.ControlPlane.Identifier,
Service: resource.Service,
DataPlane: target.Name,
Instance: target.Name,
Scheme: target.Scheme,
Address: target.Address,
MetricsPath: target.MetricsPath,
Labels: labels,
})
}
}
return result
}

View file

@ -0,0 +1,79 @@
package kuma
import (
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
)
func Test_kumaTargetsToLabels(t *testing.T) {
type args struct {
src []kumaTarget
}
tests := []struct {
name string
args args
want []*promutils.Labels
}{
{
name: "convert to labels ok",
args: args{
src: []kumaTarget{
{
Mesh: "default",
Service: "redis",
DataPlane: "redis",
Instance: "redis",
Scheme: "http",
Address: "127.0.0.1:5670",
MetricsPath: "/metrics",
Labels: map[string]string{"kuma_io_protocol": "tcp", "kuma_io_service": "redis"},
},
{
Mesh: "default",
Service: "app",
DataPlane: "app",
Instance: "app",
Scheme: "http",
Address: "127.0.0.1:5671",
MetricsPath: "/vm/metrics",
Labels: map[string]string{"kuma_io_protocol": "http", "kuma_io_service": "app"},
},
},
},
want: []*promutils.Labels{
promutils.NewLabelsFromMap(map[string]string{
"instance": "redis",
"__address__": "127.0.0.1:5670",
"__scheme__": "http",
"__metrics_path__": "/metrics",
"__meta_server": "http://localhost:5676",
"__meta_kuma_mesh": "default",
"__meta_kuma_service": "redis",
"__meta_kuma_dataplane": "redis",
"__meta_kuma_label_kuma_io_protocol": "tcp",
"__meta_kuma_label_kuma_io_service": "redis",
}),
promutils.NewLabelsFromMap(map[string]string{
"instance": "app",
"__address__": "127.0.0.1:5671",
"__scheme__": "http",
"__metrics_path__": "/vm/metrics",
"__meta_server": "http://localhost:5676",
"__meta_kuma_mesh": "default",
"__meta_kuma_service": "app",
"__meta_kuma_dataplane": "app",
"__meta_kuma_label_kuma_io_protocol": "http",
"__meta_kuma_label_kuma_io_service": "app",
}),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := kumaTargetsToLabels(tt.args.src, "http://localhost:5676")
discoveryutils.TestEqualLabelss(t, got, tt.want)
})
}
}

View file

@ -0,0 +1,56 @@
package kuma
import (
"encoding/json"
"fmt"
)
// discoveryRequest represent xDS-requests for Kuma Service Mesh
// https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/discovery/v3/discovery.proto#envoy-v3-api-msg-service-discovery-v3-discoveryrequest
type discoveryRequest struct {
VersionInfo string `json:"version_info,omitempty"`
Node discoveryRequestNode `json:"node,omitempty"`
ResourceNames []string `json:"resource_names,omitempty"`
TypeUrl string `json:"type_url,omitempty"`
ResponseNonce string `json:"response_nonce,omitempty"`
}
type discoveryRequestNode struct {
Id string `json:"id,omitempty"`
}
// discoveryResponse represent xDS-requests for Kuma Service Mesh
// https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/discovery/v3/discovery.proto#envoy-v3-api-msg-service-discovery-v3-discoveryresponse
type discoveryResponse struct {
VersionInfo string `json:"version_info,omitempty"`
Resources []struct {
Mesh string `json:"mesh,omitempty"`
Service string `json:"service,omitempty"`
Targets []struct {
Name string `json:"name,omitempty"`
Scheme string `json:"scheme,omitempty"`
Address string `json:"address,omitempty"`
MetricsPath string `json:"metrics_path,omitempty"`
Labels map[string]string `json:"labels,omitempty"`
} `json:"targets,omitempty"`
Labels map[string]string `json:"labels,omitempty"`
} `json:"resources,omitempty"`
TypeUrl string `json:"type_url,omitempty"`
Nonce string `json:"nonce,omitempty"`
ControlPlane struct {
Identifier string `json:"identifier,omitempty"`
} `json:"control_plane,omitempty"`
}
func parseDiscoveryResponse(data []byte) (discoveryResponse, error) {
response := discoveryResponse{}
err := json.Unmarshal(data, &response)
if err != nil {
return discoveryResponse{}, fmt.Errorf("cannot parse kuma_sd api response, err: %w", err)
}
if response.TypeUrl != xdsResourceTypeUrl {
return discoveryResponse{}, fmt.Errorf("unexpected type_url in kuma_sd api response, expected: %s, got: %s", xdsResourceTypeUrl, response.TypeUrl)
}
return response, nil
}

View file

@ -42,6 +42,9 @@ const (
DefaultClientReadTimeout = time.Minute
)
type RequestCallback func(request *http.Request)
type ResponseCallback func(response *http.Response)
func concurrencyLimitChInit() {
concurrencyLimitCh = make(chan struct{}, *maxConcurrency)
}
@ -165,7 +168,7 @@ func (c *Client) Context() context.Context {
}
// GetAPIResponseWithReqParams returns response for given absolute path with optional callback for request.
func (c *Client) GetAPIResponseWithReqParams(path string, modifyRequest func(request *http.Request)) ([]byte, error) {
func (c *Client) GetAPIResponseWithReqParams(path string, modifyRequest RequestCallback) ([]byte, error) {
return c.getAPIResponse(path, modifyRequest)
}
@ -175,7 +178,7 @@ func (c *Client) GetAPIResponse(path string) ([]byte, error) {
}
// GetAPIResponse returns response for the given absolute path with optional callback for request.
func (c *Client) getAPIResponse(path string, modifyRequest func(request *http.Request)) ([]byte, error) {
func (c *Client) getAPIResponse(path string, modifyRequest RequestCallback) ([]byte, error) {
// Limit the number of concurrent API requests.
concurrencyLimitChOnce.Do(concurrencyLimitChInit)
t := timerpool.Get(*maxWaitTime)
@ -194,17 +197,22 @@ func (c *Client) getAPIResponse(path string, modifyRequest func(request *http.Re
}
// GetBlockingAPIResponse returns response for given absolute path with blocking client and optional callback for api response,
func (c *Client) GetBlockingAPIResponse(path string, inspectResponse func(resp *http.Response)) ([]byte, error) {
func (c *Client) GetBlockingAPIResponse(path string, inspectResponse ResponseCallback) ([]byte, error) {
return c.getAPIResponseWithParamsAndClientCtx(c.clientCtx, c.blockingClient, path, nil, inspectResponse)
}
// GetBlockingAPIResponseCtx returns response for given absolute path with blocking client and optional callback for api response,
func (c *Client) GetBlockingAPIResponseCtx(ctx context.Context, path string, inspectResponse func(resp *http.Response)) ([]byte, error) {
func (c *Client) GetBlockingAPIResponseCtx(ctx context.Context, path string, inspectResponse ResponseCallback) ([]byte, error) {
return c.getAPIResponseWithParamsAndClientCtx(ctx, c.blockingClient, path, nil, inspectResponse)
}
// GetBlockingAPIResponseWithParamsCtx returns response for given absolute path with blocking client and optional callback for api response,
func (c *Client) GetBlockingAPIResponseWithParamsCtx(ctx context.Context, path string, modifyRequest RequestCallback, inspectResponse ResponseCallback) ([]byte, error) {
return c.getAPIResponseWithParamsAndClientCtx(ctx, c.blockingClient, path, modifyRequest, inspectResponse)
}
// getAPIResponseWithParamsAndClient returns response for the given absolute path with optional callback for request and for response.
func (c *Client) getAPIResponseWithParamsAndClientCtx(ctx context.Context, client *HTTPClient, path string, modifyRequest func(req *http.Request), inspectResponse func(resp *http.Response)) ([]byte, error) {
func (c *Client) getAPIResponseWithParamsAndClientCtx(ctx context.Context, client *HTTPClient, path string, modifyRequest RequestCallback, inspectResponse ResponseCallback) ([]byte, error) {
requestURL := c.apiServer + path
u, err := url.Parse(requestURL)
if err != nil {

View file

@ -26,6 +26,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/gce"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/http"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kubernetes"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kuma"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/nomad"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/openstack"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/yandexcloud"
@ -132,6 +133,7 @@ func runScraper(configFile string, pushData func(at *auth.Token, wr *prompbmarsh
scs.add("gce_sd_configs", *gce.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getGCESDScrapeWork(swsPrev) })
scs.add("http_sd_configs", *http.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getHTTPDScrapeWork(swsPrev) })
scs.add("kubernetes_sd_configs", *kubernetes.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getKubernetesSDScrapeWork(swsPrev) })
scs.add("kuma_sd_configs", *kuma.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getKumaSDScrapeWork(swsPrev) })
scs.add("nomad_sd_configs", *nomad.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getNomadSDScrapeWork(swsPrev) })
scs.add("openstack_sd_configs", *openstack.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getOpenStackSDScrapeWork(swsPrev) })
scs.add("yandexcloud_sd_configs", *yandexcloud.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getYandexCloudSDScrapeWork(swsPrev) })