mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/promscrape/discovery/kuma: follow-up for 317fef95f9
- Do not generate __meta_server label, since it is unavailable in Prometheus. - Add a link to https://docs.victoriametrics.com/sd_configs.html#kuma_sd_configs to docs/CHANGELOG.md, so users could click it and read the docs without the need to search the corresponding docs. - Remove kumaTarget struct, since it is easier generating labels for discovered targets directly from the response returned by Kuma. This simplifies the code. - Store the generated labels for discovered targets inside atomic.Value. This allows reading them from concurrent goroutines without the need to use mutex. - Use synchronouse requests to Kuma instead of long polling, since there is a little sense in the long polling when the Kuma server may return 304 Not Modified response every -promscrape.kumaSDCheckInterval. - Remove -promscrape.kuma.waitTime command-line flag, since it is no longer needed when long polling isn't used. - Set default value for -promscrape.kumaSDCheckInterval to 30s in order to be consistent with Prometheus. - Remove unnecessary indirections for string literals, which are used only once, in order to improve code readability. - Remove unused fields from discoveryRequest and discoveryResponse. - Update tests. - Document why fetch_timeout and refresh_interval options are missing in kuma_sd_config. - Add docs to discoveryutils.RequestCallback and discoveryutils.ResponseCallback, since these are public types. Side notes: it is weird that Prometheus implementation for kuma_sd_configs sets `instance` label, since usually this label is set by the Prometheus itself to __address__ after the relabeling phase. See https://www.robustperception.io/life-of-a-label/ Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3389 See https://github.com/prometheus/prometheus/issues/7919 and https://github.com/prometheus/prometheus/pull/8844 as a reference implementation in Prometheus
This commit is contained in:
parent
eb08579452
commit
9fbd45a22f
13 changed files with 298 additions and 471 deletions
|
@ -2339,9 +2339,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
|
|||
-promscrape.kubernetesSDCheckInterval duration
|
||||
Interval for checking for changes in Kubernetes API server. This works only if kubernetes_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#kubernetes_sd_configs for details (default 30s)
|
||||
-promscrape.kumaSDCheckInterval duration
|
||||
Interval for checking for changes in Kuma Service Mesh API service discovery. This works only if kuma_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#kuma_sd_configs for details (default 1m0s)
|
||||
-promscrape.kuma.waitTime duration
|
||||
Wait time used by Kuma service discovery. Default value is used if not set
|
||||
Interval for checking for changes in Kuma Service Mesh API service discovery. This works only if kuma_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#kuma_sd_configs for details (default 30s)
|
||||
-promscrape.maxDroppedTargets int
|
||||
The maximum number of droppedTargets to show at /api/v1/targets page. Increase this value if your setup drops more scrape targets during relabeling and you need investigating labels for all the dropped targets. Note that the increased number of tracked dropped targets may result in increased memory usage (default 1000)
|
||||
-promscrape.maxResponseHeadersSize size
|
||||
|
|
|
@ -1372,9 +1372,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
|
|||
-promscrape.kubernetesSDCheckInterval duration
|
||||
Interval for checking for changes in Kubernetes API server. This works only if kubernetes_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#kubernetes_sd_configs for details (default 30s)
|
||||
-promscrape.kumaSDCheckInterval duration
|
||||
Interval for checking for changes in Kuma Service Mesh API service discovery. This works only if kuma_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#kuma_sd_configs for details (default 1m0s)
|
||||
-promscrape.kuma.waitTime duration
|
||||
Wait time used by Kuma service discovery. Default value is used if not set
|
||||
Interval for checking for changes in Kuma Service Mesh API service discovery. This works only if kuma_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#kuma_sd_configs for details (default 30s)
|
||||
-promscrape.maxDroppedTargets int
|
||||
The maximum number of droppedTargets to show at /api/v1/targets page. Increase this value if your setup drops more scrape targets during relabeling and you need investigating labels for all the dropped targets. Note that the increased number of tracked dropped targets may result in increased memory usage (default 1000)
|
||||
-promscrape.maxResponseHeadersSize size
|
||||
|
|
|
@ -19,7 +19,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
|||
* SECURITY: upgrade Go builder from Go1.20.0 to Go1.20.1. See [the list of issues addressed in Go1.20.1](https://github.com/golang/go/issues?q=milestone%3AGo1.20.1+label%3ACherryPickApproved).
|
||||
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [VictoriaMetrics remote write protocol](https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol). This protocol allows saving egress network bandwidth costs when sending data from `vmagent` to VictoriaMetrics located in another datacenter or availability zone. This also allows reducing disk IO under high load when `vmagent` starts queuing the collected data to disk when the remote storage is temporarily unavailable or cannot keep up with the data ingestion rate. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1225).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `kuma_sd_config` for [Kuma](http://kuma.io/) Control Plane targets discovery. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3389).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [Kuma](http://kuma.io/) Control Plane targets discovery aka [kuma_sd_configs](https://docs.victoriametrics.com/sd_configs.html#kuma_sd_configs). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3389).
|
||||
* FEATURE: [vmgateway](https://docs.victoriametrics.com/vmgateway.html): add the ability to verify JWT signature via [JWKS endpoint](https://auth0.com/docs/secure/tokens/json-web-tokens/json-web-key-sets). See [these docs](https://docs.victoriametrics.com/vmgateway.html#using-jwks-endpoint-for-jwt-signature-verification).
|
||||
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): add the ability to limit the number of concurrent requests on a per-user basis via `-maxConcurrentPerUserRequests` command-line flag and via `max_concurrent_requests` config option. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3346) and [these docs](https://docs.victoriametrics.com/vmauth.html#concurrency-limiting).
|
||||
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): automatically retry failing `GET` requests on all [the configured backends](https://docs.victoriametrics.com/vmauth.html#load-balancing). Previously the backend error has been immediately returned to the client without retrying the request on the remaining backends.
|
||||
|
|
|
@ -2339,6 +2339,8 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
|
|||
How frequently to reload the full state from Kubernetes API server (default 30m0s)
|
||||
-promscrape.kubernetesSDCheckInterval duration
|
||||
Interval for checking for changes in Kubernetes API server. This works only if kubernetes_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#kubernetes_sd_configs for details (default 30s)
|
||||
-promscrape.kumaSDCheckInterval duration
|
||||
Interval for checking for changes in Kuma Service Mesh API service discovery. This works only if kuma_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#kuma_sd_configs for details (default 30s)
|
||||
-promscrape.maxDroppedTargets int
|
||||
The maximum number of droppedTargets to show at /api/v1/targets page. Increase this value if your setup drops more scrape targets during relabeling and you need investigating labels for all the dropped targets. Note that the increased number of tracked dropped targets may result in increased memory usage (default 1000)
|
||||
-promscrape.maxResponseHeadersSize size
|
||||
|
|
|
@ -109,6 +109,7 @@ Case studies:
|
|||
* [Brandwatch](https://docs.victoriametrics.com/CaseStudies.html#brandwatch)
|
||||
* [CERN](https://docs.victoriametrics.com/CaseStudies.html#cern)
|
||||
* [COLOPL](https://docs.victoriametrics.com/CaseStudies.html#colopl)
|
||||
* [Dig Security](https://docs.victoriametrics.com/CaseStudies.html#dig-security)
|
||||
* [Fly.io](https://docs.victoriametrics.com/CaseStudies.html#flyio)
|
||||
* [German Research Center for Artificial Intelligence](https://docs.victoriametrics.com/CaseStudies.html#german-research-center-for-artificial-intelligence)
|
||||
* [Grammarly](https://docs.victoriametrics.com/CaseStudies.html#grammarly)
|
||||
|
@ -2341,6 +2342,8 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
|
|||
How frequently to reload the full state from Kubernetes API server (default 30m0s)
|
||||
-promscrape.kubernetesSDCheckInterval duration
|
||||
Interval for checking for changes in Kubernetes API server. This works only if kubernetes_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#kubernetes_sd_configs for details (default 30s)
|
||||
-promscrape.kumaSDCheckInterval duration
|
||||
Interval for checking for changes in Kuma Service Mesh API service discovery. This works only if kuma_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#kuma_sd_configs for details (default 30s)
|
||||
-promscrape.maxDroppedTargets int
|
||||
The maximum number of droppedTargets to show at /api/v1/targets page. Increase this value if your setup drops more scrape targets during relabeling and you need investigating labels for all the dropped targets. Note that the increased number of tracked dropped targets may result in increased memory usage (default 1000)
|
||||
-promscrape.maxResponseHeadersSize size
|
||||
|
|
|
@ -974,12 +974,10 @@ to one of the targets returned by the http service.
|
|||
|
||||
The following meta labels are available on discovered targets during [relabeling](https://docs.victoriametrics.com/vmagent.html#relabeling):
|
||||
|
||||
* `__meta_server`: the URL of Kuma Control Plane's MADS xDS server from which the target was extracted
|
||||
* `__meta_kuma_mesh`: the name of the mesh
|
||||
* `__meta_kuma_service`: the name of the service associated with the proxy
|
||||
* `__meta_kuma_dataplane`: the name of the proxy
|
||||
* `__meta_kuma_service`: the name of the service associated with the proxy
|
||||
* `__meta_kuma_label_<label_name>`: each label of target given from Kuma Control Plane
|
||||
* `__metrics_path__`: the path by which the service metrics are scraped
|
||||
|
||||
## nomad_sd_configs
|
||||
|
||||
|
|
|
@ -1373,12 +1373,10 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
|
|||
Interval for checking for changes in http endpoint service discovery. This works only if http_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#http_sd_configs for details (default 1m0s)
|
||||
-promscrape.kubernetes.apiServerTimeout duration
|
||||
How frequently to reload the full state from Kubernetes API server (default 30m0s)
|
||||
-promscrape.kumaSDCheckInterval duration
|
||||
Interval for checking for changes in Kuma Service Mesh API service discovery. This works only if kuma_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#kuma_sd_configs for details (default 1m0s)
|
||||
-promscrape.kuma.waitTime duration
|
||||
Wait time used by Kuma service discovery. Default value is used if not set
|
||||
-promscrape.kubernetesSDCheckInterval duration
|
||||
Interval for checking for changes in Kubernetes API server. This works only if kubernetes_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#kubernetes_sd_configs for details (default 30s)
|
||||
-promscrape.kumaSDCheckInterval duration
|
||||
Interval for checking for changes in Kuma Service Mesh API service discovery. This works only if kuma_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#kuma_sd_configs for details (default 30s)
|
||||
-promscrape.maxDroppedTargets int
|
||||
The maximum number of droppedTargets to show at /api/v1/targets page. Increase this value if your setup drops more scrape targets during relabeling and you need investigating labels for all the dropped targets. Note that the increased number of tracked dropped targets may result in increased memory usage (default 1000)
|
||||
-promscrape.maxResponseHeadersSize size
|
||||
|
|
|
@ -4,30 +4,34 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"path"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
var configMap = discoveryutils.NewConfigMap()
|
||||
|
||||
type apiConfig struct {
|
||||
client *discoveryutils.Client
|
||||
path string
|
||||
client *discoveryutils.Client
|
||||
apiPath string
|
||||
|
||||
// labels contains the latest discovered labels.
|
||||
labels atomic.Value
|
||||
|
||||
cancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
|
||||
cancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
mu sync.Mutex // protects targets
|
||||
targets []kumaTarget
|
||||
latestVersion string
|
||||
latestNonce string
|
||||
|
||||
|
@ -35,16 +39,6 @@ type apiConfig struct {
|
|||
parseErrors *metrics.Counter
|
||||
}
|
||||
|
||||
const (
|
||||
discoveryNode = "victoria-metrics"
|
||||
xdsApiVersion = "v3"
|
||||
xdsRequestType = "discovery"
|
||||
xdsResourceType = "monitoringassignments"
|
||||
xdsResourceTypeUrl = "type.googleapis.com/kuma.observability.v1.MonitoringAssignment"
|
||||
)
|
||||
|
||||
var waitTime = flag.Duration("promscrape.kuma.waitTime", 0, "Wait time used by Kuma service discovery. Default value is used if not set")
|
||||
|
||||
func getAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
|
||||
v, err := configMap.Get(sdc, func() (interface{}, error) { return newAPIConfig(sdc, baseDir) })
|
||||
if err != nil {
|
||||
|
@ -54,16 +48,14 @@ func getAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
|
|||
}
|
||||
|
||||
func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
|
||||
apiServer, apiPath, err := getAPIServerPath(sdc.Server)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse server %q: %w", sdc.Server, err)
|
||||
}
|
||||
ac, err := sdc.HTTPClientConfig.NewConfig(baseDir)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse auth config: %w", err)
|
||||
}
|
||||
parsedURL, err := url.Parse(sdc.Server)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse kuma_sd server URL: %w", err)
|
||||
}
|
||||
apiServer := fmt.Sprintf("%s://%s", parsedURL.Scheme, parsedURL.Host)
|
||||
|
||||
proxyAC, err := sdc.ProxyClientConfig.NewConfig(baseDir)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse proxy auth config: %w", err)
|
||||
|
@ -73,135 +65,205 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
|
|||
return nil, fmt.Errorf("cannot create HTTP client for %q: %w", apiServer, err)
|
||||
}
|
||||
|
||||
apiPath := path.Join(
|
||||
parsedURL.RequestURI(),
|
||||
xdsApiVersion,
|
||||
xdsRequestType+":"+xdsResourceType,
|
||||
)
|
||||
|
||||
cfg := &apiConfig{
|
||||
client: client,
|
||||
path: apiPath,
|
||||
client: client,
|
||||
apiPath: apiPath,
|
||||
|
||||
fetchErrors: metrics.GetOrCreateCounter(fmt.Sprintf(`promscrape_discovery_kuma_errors_total{type="fetch",url=%q}`, sdc.Server)),
|
||||
parseErrors: metrics.GetOrCreateCounter(fmt.Sprintf(`promscrape_discovery_kuma_errors_total{type="parse",url=%q}`, sdc.Server)),
|
||||
}
|
||||
|
||||
// initialize targets synchronously and start updating them in background
|
||||
cfg.startWatcher()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cfg.cancel = cancel
|
||||
|
||||
// Initialize targets synchronously and then start updating them in background.
|
||||
// The synchronous targets' update is needed for returning non-empty list of targets
|
||||
// just after the initialization.
|
||||
if err := cfg.updateTargetsLabels(ctx); err != nil {
|
||||
return nil, fmt.Errorf("cannot discover Kuma targets: %w", err)
|
||||
}
|
||||
cfg.wg.Add(1)
|
||||
go func() {
|
||||
defer cfg.wg.Done()
|
||||
cfg.runTargetsWatcher(ctx)
|
||||
}()
|
||||
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
func (cfg *apiConfig) startWatcher() func() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cfg.cancel = cancel
|
||||
|
||||
// blocking initial targets update
|
||||
if err := cfg.updateTargets(ctx); err != nil {
|
||||
logger.Errorf("there were errors when discovering kuma targets, so preserving the previous targets. error: %v", err)
|
||||
func getAPIServerPath(serverURL string) (string, string, error) {
|
||||
if serverURL == "" {
|
||||
return "", "", fmt.Errorf("missing servier url")
|
||||
}
|
||||
|
||||
// start updating targets with a long polling in background
|
||||
cfg.wg.Add(1)
|
||||
go func() {
|
||||
ticker := time.NewTicker(*SDCheckInterval)
|
||||
defer cfg.wg.Done()
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
// we are constantly waiting for targets updates in long polling requests
|
||||
err := cfg.updateTargets(ctx)
|
||||
if err != nil {
|
||||
logger.Errorf("there were errors when discovering kuma targets, so preserving the previous targets. error: %v", err)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return cancel
|
||||
if !strings.Contains(serverURL, "://") {
|
||||
serverURL = "http://" + serverURL
|
||||
}
|
||||
psu, err := url.Parse(serverURL)
|
||||
if err != nil {
|
||||
return "", "", fmt.Errorf("cannot parse server url=%q: %w", serverURL, err)
|
||||
}
|
||||
apiServer := fmt.Sprintf("%s://%s", psu.Scheme, psu.Host)
|
||||
apiPath := psu.Path
|
||||
if !strings.HasSuffix(apiPath, "/") {
|
||||
apiPath += "/"
|
||||
}
|
||||
apiPath += "v3/discovery:monitoringassignments"
|
||||
if psu.RawQuery != "" {
|
||||
apiPath += "?" + psu.RawQuery
|
||||
}
|
||||
return apiServer, apiPath, nil
|
||||
}
|
||||
|
||||
func (cfg *apiConfig) stopWatcher() {
|
||||
func (cfg *apiConfig) runTargetsWatcher(ctx context.Context) {
|
||||
ticker := time.NewTicker(*SDCheckInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
doneCh := ctx.Done()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if err := cfg.updateTargetsLabels(ctx); err != nil {
|
||||
logger.Errorf("there was an error when discovering Kuma targets, so preserving the previous targets; error: %s", err)
|
||||
}
|
||||
case <-doneCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (cfg *apiConfig) mustStop() {
|
||||
cfg.client.Stop()
|
||||
cfg.cancel()
|
||||
cfg.wg.Wait()
|
||||
}
|
||||
|
||||
func (cfg *apiConfig) getTargets() ([]kumaTarget, error) {
|
||||
cfg.mu.Lock()
|
||||
defer cfg.mu.Unlock()
|
||||
return cfg.targets, nil
|
||||
}
|
||||
|
||||
func (cfg *apiConfig) updateTargets(ctx context.Context) error {
|
||||
requestBody, err := json.Marshal(discoveryRequest{
|
||||
VersionInfo: cfg.latestVersion,
|
||||
Node: discoveryRequestNode{Id: discoveryNode},
|
||||
TypeUrl: xdsResourceTypeUrl,
|
||||
func (cfg *apiConfig) updateTargetsLabels(ctx context.Context) error {
|
||||
dReq := &discoveryRequest{
|
||||
VersionInfo: cfg.latestVersion,
|
||||
Node: discoveryRequestNode{
|
||||
ID: "vmagent",
|
||||
},
|
||||
TypeURL: "type.googleapis.com/kuma.observability.v1.MonitoringAssignment",
|
||||
ResponseNonce: cfg.latestNonce,
|
||||
})
|
||||
}
|
||||
requestBody, err := json.Marshal(dReq)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot marshal request body for kuma_sd api: %w", err)
|
||||
logger.Panicf("BUG: cannot marshal Kuma discovery request: %s", err)
|
||||
}
|
||||
|
||||
var statusCode int
|
||||
data, err := cfg.client.GetBlockingAPIResponseWithParamsCtx(
|
||||
ctx,
|
||||
cfg.path,
|
||||
func(request *http.Request) {
|
||||
request.Method = http.MethodPost
|
||||
request.Body = io.NopCloser(bytes.NewReader(requestBody))
|
||||
|
||||
// set max duration for long polling request
|
||||
query := request.URL.Query()
|
||||
query.Add("fetch-timeout", cfg.getWaitTime().String())
|
||||
request.URL.RawQuery = query.Encode()
|
||||
|
||||
request.Header.Set("Accept", "application/json")
|
||||
request.Header.Set("Content-Type", "application/json")
|
||||
},
|
||||
func(response *http.Response) {
|
||||
statusCode = response.StatusCode
|
||||
},
|
||||
)
|
||||
|
||||
if statusCode == http.StatusNotModified {
|
||||
return nil
|
||||
updateRequestFunc := func(req *http.Request) {
|
||||
req.Method = "POST"
|
||||
req.Body = io.NopCloser(bytes.NewReader(requestBody))
|
||||
req.Header.Set("Accept", "application/json")
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
}
|
||||
notModified := false
|
||||
inspectResponseFunc := func(resp *http.Response) {
|
||||
if resp.StatusCode == http.StatusNotModified {
|
||||
// Override status code, so GetBlockingAPIResponseWithParamsCtx() returns nil error.
|
||||
resp.StatusCode = http.StatusOK
|
||||
notModified = true
|
||||
}
|
||||
}
|
||||
data, err := cfg.client.GetBlockingAPIResponseWithParamsCtx(ctx, cfg.apiPath, updateRequestFunc, inspectResponseFunc)
|
||||
if err != nil {
|
||||
cfg.fetchErrors.Inc()
|
||||
return fmt.Errorf("cannot read kuma_sd api response: %w", err)
|
||||
return fmt.Errorf("error when reading Kuma discovery response: %w", err)
|
||||
}
|
||||
if notModified {
|
||||
// The targets weren't modified, so nothing to update.
|
||||
return nil
|
||||
}
|
||||
|
||||
response, err := parseDiscoveryResponse(data)
|
||||
// Parse response
|
||||
labels, versionInfo, nonce, err := parseTargetsLabels(data)
|
||||
if err != nil {
|
||||
cfg.parseErrors.Inc()
|
||||
return fmt.Errorf("cannot parse kuma_sd api response: %w", err)
|
||||
return fmt.Errorf("cannot parse Kuma discovery response received from %q: %w", cfg.client.APIServer(), err)
|
||||
}
|
||||
|
||||
cfg.mu.Lock()
|
||||
defer cfg.mu.Unlock()
|
||||
cfg.targets = parseKumaTargets(response)
|
||||
cfg.latestVersion = response.VersionInfo
|
||||
cfg.latestNonce = response.Nonce
|
||||
cfg.labels.Store(&labels)
|
||||
cfg.latestVersion = versionInfo
|
||||
cfg.latestNonce = nonce
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cfg *apiConfig) getWaitTime() time.Duration {
|
||||
d := discoveryutils.BlockingClientReadTimeout
|
||||
// Reduce wait time to avoid timeouts (request execution time should be less than the read timeout)
|
||||
d -= d / 8
|
||||
if *waitTime > time.Second && *waitTime < d {
|
||||
d = *waitTime
|
||||
func parseTargetsLabels(data []byte) ([]*promutils.Labels, string, string, error) {
|
||||
var dResp discoveryResponse
|
||||
if err := json.Unmarshal(data, &dResp); err != nil {
|
||||
return nil, "", "", err
|
||||
}
|
||||
return d
|
||||
return dResp.getTargetsLabels(), dResp.VersionInfo, dResp.Nonce, nil
|
||||
}
|
||||
|
||||
func (cfg *apiConfig) mustStop() {
|
||||
cfg.stopWatcher()
|
||||
cfg.client.Stop()
|
||||
func (dr *discoveryResponse) getTargetsLabels() []*promutils.Labels {
|
||||
var ms []*promutils.Labels
|
||||
for _, r := range dr.Resources {
|
||||
for _, t := range r.Targets {
|
||||
m := promutils.NewLabels(8 + len(r.Labels) + len(t.Labels))
|
||||
|
||||
m.Add("instance", t.Name)
|
||||
m.Add("__address__", t.Address)
|
||||
m.Add("__scheme__", t.Scheme)
|
||||
m.Add("__metrics_path__", t.MetricsPath)
|
||||
m.Add("__meta_kuma_dataplane", t.Name)
|
||||
m.Add("__meta_kuma_mesh", r.Mesh)
|
||||
m.Add("__meta_kuma_service", r.Service)
|
||||
|
||||
addLabels(m, r.Labels)
|
||||
addLabels(m, t.Labels)
|
||||
// Remove possible duplicate labels after addLabels() calls above
|
||||
m.RemoveDuplicates()
|
||||
|
||||
ms = append(ms, m)
|
||||
}
|
||||
}
|
||||
return ms
|
||||
}
|
||||
|
||||
func addLabels(dst *promutils.Labels, src map[string]string) {
|
||||
bb := bbPool.Get()
|
||||
b := bb.B
|
||||
for k, v := range src {
|
||||
b = append(b[:0], "__meta_kuma_label_"...)
|
||||
b = append(b, discoveryutils.SanitizeLabelName(k)...)
|
||||
labelName := bytesutil.InternBytes(b)
|
||||
dst.Add(labelName, v)
|
||||
}
|
||||
bb.B = b
|
||||
bbPool.Put(bb)
|
||||
}
|
||||
|
||||
var bbPool bytesutil.ByteBufferPool
|
||||
|
||||
// discoveryRequest represent xDS-requests for Kuma Service Mesh
|
||||
// https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/discovery/v3/discovery.proto#envoy-v3-api-msg-service-discovery-v3-discoveryrequest
|
||||
type discoveryRequest struct {
|
||||
VersionInfo string `json:"version_info"`
|
||||
Node discoveryRequestNode `json:"node"`
|
||||
ResourceNames []string `json:"resource_names"`
|
||||
TypeURL string `json:"type_url"`
|
||||
ResponseNonce string `json:"response_nonce"`
|
||||
}
|
||||
|
||||
type discoveryRequestNode struct {
|
||||
ID string `json:"id"`
|
||||
}
|
||||
|
||||
// discoveryResponse represent xDS-requests for Kuma Service Mesh
|
||||
// https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/discovery/v3/discovery.proto#envoy-v3-api-msg-service-discovery-v3-discoveryresponse
|
||||
type discoveryResponse struct {
|
||||
VersionInfo string `json:"version_info"`
|
||||
Resources []struct {
|
||||
Mesh string `json:"mesh"`
|
||||
Service string `json:"service"`
|
||||
Targets []struct {
|
||||
Name string `json:"name"`
|
||||
Scheme string `json:"scheme"`
|
||||
Address string `json:"address"`
|
||||
MetricsPath string `json:"metrics_path"`
|
||||
Labels map[string]string `json:"labels"`
|
||||
} `json:"targets"`
|
||||
Labels map[string]string `json:"labels"`
|
||||
} `json:"resources"`
|
||||
Nonce string `json:"nonce"`
|
||||
}
|
||||
|
|
|
@ -1,80 +1,61 @@
|
|||
package kuma
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
||||
)
|
||||
|
||||
func Test_buildAPIPath(t *testing.T) {
|
||||
type args struct {
|
||||
server string
|
||||
func TestGetAPIServerPathSuccess(t *testing.T) {
|
||||
f := func(server, expectedAPIServer, expectedAPIPath string) {
|
||||
t.Helper()
|
||||
apiServer, apiPath, err := getAPIServerPath(server)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
if apiServer != expectedAPIServer {
|
||||
t.Fatalf("unexpected API server; got %q; want %q", apiServer, expectedAPIServer)
|
||||
}
|
||||
if apiPath != expectedAPIPath {
|
||||
t.Fatalf("unexpected API path; got %q; want %q", apiPath, expectedAPIPath)
|
||||
}
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
want string
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "get api path ok",
|
||||
args: args{server: "http://localhost:5676"},
|
||||
want: "/v3/discovery:monitoringassignments",
|
||||
},
|
||||
{
|
||||
name: "get api path incorrect server URL",
|
||||
args: args{server: ":"},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "get api path incorrect server URL err",
|
||||
args: args{server: "api"},
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
sdConf := &SDConfig{
|
||||
Server: tt.args.server,
|
||||
HTTPClientConfig: promauth.HTTPClientConfig{},
|
||||
ProxyClientConfig: promauth.ProxyClientConfig{},
|
||||
}
|
||||
apiConf, err := getAPIConfig(sdConf, ".")
|
||||
|
||||
if tt.wantErr {
|
||||
if err == nil {
|
||||
t.Errorf("buildAPIPath() error = %v, wantErr %v", err, tt.wantErr)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if apiConf.path != tt.want {
|
||||
t.Errorf("buildAPIPath() got = %v, want = %v", apiConf.path, tt.want)
|
||||
}
|
||||
|
||||
sdConf.MustStop()
|
||||
})
|
||||
}
|
||||
|
||||
// url without path
|
||||
f("http://localhost:5676", "http://localhost:5676", "/v3/discovery:monitoringassignments")
|
||||
// url with path
|
||||
f("http://localhost:5676/", "http://localhost:5676", "/v3/discovery:monitoringassignments")
|
||||
f("https://foo.bar:1234/a/b", "https://foo.bar:1234", "/a/b/v3/discovery:monitoringassignments")
|
||||
// url with query args
|
||||
f("https://foo.bar:1234/a/b?c=d&arg2=value2", "https://foo.bar:1234", "/a/b/v3/discovery:monitoringassignments?c=d&arg2=value2")
|
||||
// missing scheme
|
||||
f("foo.bar", "http://foo.bar", "/v3/discovery:monitoringassignments")
|
||||
f("foo.bar:1234/a/b", "http://foo.bar:1234", "/a/b/v3/discovery:monitoringassignments")
|
||||
f("foo.bar:1234/a/b?c=d&arg2=value2", "http://foo.bar:1234", "/a/b/v3/discovery:monitoringassignments?c=d&arg2=value2")
|
||||
}
|
||||
|
||||
func Test_parseAPIResponse(t *testing.T) {
|
||||
type args struct {
|
||||
data []byte
|
||||
func TestGetAPIConfigFailure(t *testing.T) {
|
||||
f := func(server string) {
|
||||
t.Helper()
|
||||
sdc := &SDConfig{
|
||||
Server: server,
|
||||
}
|
||||
cfg, err := getAPIConfig(sdc, ".")
|
||||
if err == nil {
|
||||
t.Fatalf("expecting non-nil error")
|
||||
}
|
||||
if cfg != nil {
|
||||
t.Fatalf("expecting nil cfg; got %v", cfg)
|
||||
}
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
want []kumaTarget
|
||||
wantErr bool
|
||||
}{
|
||||
// empty server url
|
||||
f("")
|
||||
// invalid server url
|
||||
f(":")
|
||||
}
|
||||
|
||||
{
|
||||
name: "parse ok",
|
||||
args: args{
|
||||
data: []byte(`{
|
||||
func TestParseTargetsLabels(t *testing.T) {
|
||||
data := `{
|
||||
"version_info":"5dc9a5dd-2091-4426-a886-dfdc24fc99d7",
|
||||
"resources":[
|
||||
{
|
||||
|
@ -100,77 +81,55 @@ func Test_parseAPIResponse(t *testing.T) {
|
|||
"targets":[
|
||||
{
|
||||
"name":"app",
|
||||
"scheme":"http",
|
||||
"scheme":"https",
|
||||
"address":"127.0.0.1:5671",
|
||||
"metrics_path":"/metrics",
|
||||
"metrics_path":"/metrics/abc",
|
||||
"labels":{ "kuma_io_protocol":"http" }
|
||||
}
|
||||
]
|
||||
}
|
||||
],
|
||||
"type_url":"type.googleapis.com/kuma.observability.v1.MonitoringAssignment"
|
||||
}`),
|
||||
},
|
||||
want: []kumaTarget{
|
||||
{
|
||||
Mesh: "default",
|
||||
Service: "redis",
|
||||
DataPlane: "redis",
|
||||
Instance: "redis",
|
||||
Scheme: "http",
|
||||
Address: "127.0.0.1:5670",
|
||||
MetricsPath: "/metrics",
|
||||
Labels: map[string]string{"kuma_io_protocol": "tcp", "test": "test1"},
|
||||
},
|
||||
{
|
||||
Mesh: "default",
|
||||
Service: "app",
|
||||
DataPlane: "app",
|
||||
Instance: "app",
|
||||
Scheme: "http",
|
||||
Address: "127.0.0.1:5671",
|
||||
MetricsPath: "/metrics",
|
||||
Labels: map[string]string{"kuma_io_protocol": "http", "test": "test2"},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "parse err",
|
||||
args: args{data: []byte(`[]`)},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "api version err",
|
||||
args: args{
|
||||
data: []byte(`{
|
||||
"resources":[
|
||||
{
|
||||
"@type":"type.googleapis.com/kuma.observability.v2.MonitoringAssignment",
|
||||
"mesh":"default",
|
||||
"service":"redis",
|
||||
"targets":[]
|
||||
}
|
||||
],
|
||||
"type_url":"type.googleapis.com/kuma.observability.v2.MonitoringAssignment"
|
||||
}`),
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
"type_url":"type.googleapis.com/kuma.observability.v1.MonitoringAssignment",
|
||||
"nonce": "foobar"
|
||||
}`
|
||||
labelss, versionInfo, nonce, err := parseTargetsLabels([]byte(data))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
resp, err := parseDiscoveryResponse(tt.args.data)
|
||||
if tt.wantErr {
|
||||
if err == nil {
|
||||
t.Errorf("parseDiscoveryResponse() error = %v, wantErr %v", err, tt.wantErr)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
got := parseKumaTargets(resp)
|
||||
if !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("parseDiscoveryResponse() got = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
expectedLabelss := []*promutils.Labels{
|
||||
promutils.NewLabelsFromMap(map[string]string{
|
||||
"__address__": "127.0.0.1:5670",
|
||||
"__meta_kuma_dataplane": "redis",
|
||||
"__meta_kuma_label_kuma_io_protocol": "tcp",
|
||||
"__meta_kuma_label_test": "test1",
|
||||
"__meta_kuma_mesh": "default",
|
||||
"__meta_kuma_service": "redis",
|
||||
"__metrics_path__": "/metrics",
|
||||
"__scheme__": "http",
|
||||
"instance": "redis",
|
||||
}),
|
||||
promutils.NewLabelsFromMap(map[string]string{
|
||||
"__address__": "127.0.0.1:5671",
|
||||
"__meta_kuma_dataplane": "app",
|
||||
"__meta_kuma_label_kuma_io_protocol": "http",
|
||||
"__meta_kuma_label_test": "test2",
|
||||
"__meta_kuma_mesh": "default",
|
||||
"__meta_kuma_service": "app",
|
||||
"__metrics_path__": "/metrics/abc",
|
||||
"__scheme__": "https",
|
||||
"instance": "app",
|
||||
}),
|
||||
}
|
||||
discoveryutils.TestEqualLabelss(t, labelss, expectedLabelss)
|
||||
|
||||
expectedVersionInfo := "5dc9a5dd-2091-4426-a886-dfdc24fc99d7"
|
||||
if versionInfo != expectedVersionInfo {
|
||||
t.Fatalf("unexpected versionInfo; got %q; want %q", versionInfo, expectedVersionInfo)
|
||||
}
|
||||
|
||||
expectedNonce := "foobar"
|
||||
if nonce != expectedNonce {
|
||||
t.Fatalf("unexpected nonce; got %q; want %q", nonce, expectedNonce)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,7 +11,7 @@ import (
|
|||
)
|
||||
|
||||
// SDCheckInterval defines interval for targets refresh.
|
||||
var SDCheckInterval = flag.Duration("promscrape.kumaSDCheckInterval", time.Minute, "Interval for checking for changes in kuma service discovery. "+
|
||||
var SDCheckInterval = flag.Duration("promscrape.kumaSDCheckInterval", 30*time.Second, "Interval for checking for changes in kuma service discovery. "+
|
||||
"This works only if kuma_sd_configs is configured in '-promscrape.config' file. "+
|
||||
"See https://docs.victoriametrics.com/sd_configs.html#kuma_sd_configs for details")
|
||||
|
||||
|
@ -19,22 +19,17 @@ var SDCheckInterval = flag.Duration("promscrape.kumaSDCheckInterval", time.Minut
|
|||
//
|
||||
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kuma_sd_config
|
||||
type SDConfig struct {
|
||||
Server string `yaml:"server"`
|
||||
Server string `yaml:"server"`
|
||||
|
||||
HTTPClientConfig promauth.HTTPClientConfig `yaml:",inline"`
|
||||
ProxyURL *proxy.URL `yaml:"proxy_url,omitempty"`
|
||||
ProxyClientConfig promauth.ProxyClientConfig `yaml:",inline"`
|
||||
}
|
||||
|
||||
type kumaTarget struct {
|
||||
Mesh string `json:"mesh"`
|
||||
ControlPlane string `json:"controlplane"`
|
||||
Service string `json:"service"`
|
||||
DataPlane string `json:"dataplane"`
|
||||
Instance string `json:"instance"`
|
||||
Scheme string `json:"scheme"`
|
||||
Address string `json:"address"`
|
||||
MetricsPath string `json:"metrics_path"`
|
||||
Labels map[string]string `json:"labels"`
|
||||
// fetch_timeout isn't used, so it isn't defined.
|
||||
// FetchTimeout time.Duration `yaml:"fetch_timeout,omitempty"`
|
||||
|
||||
// refresh_interval is obtained from `-promscrape.kumaSDCheckInterval` command-line option.
|
||||
// RefreshInterval time.Duration `yaml:"refresh_interval,omitempty"`
|
||||
}
|
||||
|
||||
// GetLabels returns kuma service discovery labels according to sdc.
|
||||
|
@ -43,11 +38,9 @@ func (sdc *SDConfig) GetLabels(baseDir string) ([]*promutils.Labels, error) {
|
|||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot get API config for kuma_sd: %w", err)
|
||||
}
|
||||
targets, err := cfg.getTargets()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return kumaTargetsToLabels(targets, sdc.Server), nil
|
||||
v := cfg.labels.Load()
|
||||
pLabels := v.(*[]*promutils.Labels)
|
||||
return *pLabels, nil
|
||||
}
|
||||
|
||||
// MustStop stops further usage for sdc.
|
||||
|
@ -59,55 +52,3 @@ func (sdc *SDConfig) MustStop() {
|
|||
cfg.mustStop()
|
||||
}
|
||||
}
|
||||
|
||||
func kumaTargetsToLabels(src []kumaTarget, sourceURL string) []*promutils.Labels {
|
||||
ms := make([]*promutils.Labels, 0, len(src))
|
||||
for _, target := range src {
|
||||
m := promutils.NewLabels(8 + len(target.Labels))
|
||||
|
||||
m.Add("instance", target.Instance)
|
||||
m.Add("__address__", target.Address)
|
||||
m.Add("__scheme__", target.Scheme)
|
||||
m.Add("__metrics_path__", target.MetricsPath)
|
||||
m.Add("__meta_server", sourceURL)
|
||||
m.Add("__meta_kuma_mesh", target.Mesh)
|
||||
m.Add("__meta_kuma_service", target.Service)
|
||||
m.Add("__meta_kuma_dataplane", target.DataPlane)
|
||||
for k, v := range target.Labels {
|
||||
m.Add("__meta_kuma_label_"+k, v)
|
||||
}
|
||||
|
||||
m.RemoveDuplicates()
|
||||
ms = append(ms, m)
|
||||
}
|
||||
return ms
|
||||
}
|
||||
|
||||
func parseKumaTargets(response discoveryResponse) []kumaTarget {
|
||||
result := make([]kumaTarget, 0, len(response.Resources))
|
||||
|
||||
for _, resource := range response.Resources {
|
||||
for _, target := range resource.Targets {
|
||||
labels := make(map[string]string)
|
||||
for label, value := range resource.Labels {
|
||||
labels[label] = value
|
||||
}
|
||||
for label, value := range target.Labels {
|
||||
labels[label] = value
|
||||
}
|
||||
result = append(result, kumaTarget{
|
||||
Mesh: resource.Mesh,
|
||||
ControlPlane: response.ControlPlane.Identifier,
|
||||
Service: resource.Service,
|
||||
DataPlane: target.Name,
|
||||
Instance: target.Name,
|
||||
Scheme: target.Scheme,
|
||||
Address: target.Address,
|
||||
MetricsPath: target.MetricsPath,
|
||||
Labels: labels,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
|
|
@ -1,79 +0,0 @@
|
|||
package kuma
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
||||
)
|
||||
|
||||
func Test_kumaTargetsToLabels(t *testing.T) {
|
||||
type args struct {
|
||||
src []kumaTarget
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
want []*promutils.Labels
|
||||
}{
|
||||
{
|
||||
name: "convert to labels ok",
|
||||
args: args{
|
||||
src: []kumaTarget{
|
||||
{
|
||||
Mesh: "default",
|
||||
Service: "redis",
|
||||
DataPlane: "redis",
|
||||
Instance: "redis",
|
||||
Scheme: "http",
|
||||
Address: "127.0.0.1:5670",
|
||||
MetricsPath: "/metrics",
|
||||
Labels: map[string]string{"kuma_io_protocol": "tcp", "kuma_io_service": "redis"},
|
||||
},
|
||||
{
|
||||
Mesh: "default",
|
||||
Service: "app",
|
||||
DataPlane: "app",
|
||||
Instance: "app",
|
||||
Scheme: "http",
|
||||
Address: "127.0.0.1:5671",
|
||||
MetricsPath: "/vm/metrics",
|
||||
Labels: map[string]string{"kuma_io_protocol": "http", "kuma_io_service": "app"},
|
||||
},
|
||||
},
|
||||
},
|
||||
want: []*promutils.Labels{
|
||||
promutils.NewLabelsFromMap(map[string]string{
|
||||
"instance": "redis",
|
||||
"__address__": "127.0.0.1:5670",
|
||||
"__scheme__": "http",
|
||||
"__metrics_path__": "/metrics",
|
||||
"__meta_server": "http://localhost:5676",
|
||||
"__meta_kuma_mesh": "default",
|
||||
"__meta_kuma_service": "redis",
|
||||
"__meta_kuma_dataplane": "redis",
|
||||
"__meta_kuma_label_kuma_io_protocol": "tcp",
|
||||
"__meta_kuma_label_kuma_io_service": "redis",
|
||||
}),
|
||||
promutils.NewLabelsFromMap(map[string]string{
|
||||
"instance": "app",
|
||||
"__address__": "127.0.0.1:5671",
|
||||
"__scheme__": "http",
|
||||
"__metrics_path__": "/vm/metrics",
|
||||
"__meta_server": "http://localhost:5676",
|
||||
"__meta_kuma_mesh": "default",
|
||||
"__meta_kuma_service": "app",
|
||||
"__meta_kuma_dataplane": "app",
|
||||
"__meta_kuma_label_kuma_io_protocol": "http",
|
||||
"__meta_kuma_label_kuma_io_service": "app",
|
||||
}),
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got := kumaTargetsToLabels(tt.args.src, "http://localhost:5676")
|
||||
discoveryutils.TestEqualLabelss(t, got, tt.want)
|
||||
})
|
||||
}
|
||||
}
|
|
@ -1,56 +0,0 @@
|
|||
package kuma
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// discoveryRequest represent xDS-requests for Kuma Service Mesh
|
||||
// https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/discovery/v3/discovery.proto#envoy-v3-api-msg-service-discovery-v3-discoveryrequest
|
||||
type discoveryRequest struct {
|
||||
VersionInfo string `json:"version_info,omitempty"`
|
||||
Node discoveryRequestNode `json:"node,omitempty"`
|
||||
ResourceNames []string `json:"resource_names,omitempty"`
|
||||
TypeUrl string `json:"type_url,omitempty"`
|
||||
ResponseNonce string `json:"response_nonce,omitempty"`
|
||||
}
|
||||
|
||||
type discoveryRequestNode struct {
|
||||
Id string `json:"id,omitempty"`
|
||||
}
|
||||
|
||||
// discoveryResponse represent xDS-requests for Kuma Service Mesh
|
||||
// https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/discovery/v3/discovery.proto#envoy-v3-api-msg-service-discovery-v3-discoveryresponse
|
||||
type discoveryResponse struct {
|
||||
VersionInfo string `json:"version_info,omitempty"`
|
||||
Resources []struct {
|
||||
Mesh string `json:"mesh,omitempty"`
|
||||
Service string `json:"service,omitempty"`
|
||||
Targets []struct {
|
||||
Name string `json:"name,omitempty"`
|
||||
Scheme string `json:"scheme,omitempty"`
|
||||
Address string `json:"address,omitempty"`
|
||||
MetricsPath string `json:"metrics_path,omitempty"`
|
||||
Labels map[string]string `json:"labels,omitempty"`
|
||||
} `json:"targets,omitempty"`
|
||||
Labels map[string]string `json:"labels,omitempty"`
|
||||
} `json:"resources,omitempty"`
|
||||
TypeUrl string `json:"type_url,omitempty"`
|
||||
Nonce string `json:"nonce,omitempty"`
|
||||
ControlPlane struct {
|
||||
Identifier string `json:"identifier,omitempty"`
|
||||
} `json:"control_plane,omitempty"`
|
||||
}
|
||||
|
||||
func parseDiscoveryResponse(data []byte) (discoveryResponse, error) {
|
||||
response := discoveryResponse{}
|
||||
err := json.Unmarshal(data, &response)
|
||||
if err != nil {
|
||||
return discoveryResponse{}, fmt.Errorf("cannot parse kuma_sd api response, err: %w", err)
|
||||
}
|
||||
if response.TypeUrl != xdsResourceTypeUrl {
|
||||
return discoveryResponse{}, fmt.Errorf("unexpected type_url in kuma_sd api response, expected: %s, got: %s", xdsResourceTypeUrl, response.TypeUrl)
|
||||
}
|
||||
|
||||
return response, nil
|
||||
}
|
|
@ -42,8 +42,11 @@ const (
|
|||
DefaultClientReadTimeout = time.Minute
|
||||
)
|
||||
|
||||
type RequestCallback func(request *http.Request)
|
||||
type ResponseCallback func(response *http.Response)
|
||||
// RequestCallback is called on the request before sending the request to the server.
|
||||
type RequestCallback func(req *http.Request)
|
||||
|
||||
// ResponseCallback is called on the response before validating and returning the response to the caller.
|
||||
type ResponseCallback func(resp *http.Response)
|
||||
|
||||
func concurrencyLimitChInit() {
|
||||
concurrencyLimitCh = make(chan struct{}, *maxConcurrency)
|
||||
|
|
Loading…
Reference in a new issue