VictoriaMetrics/lib/promscrape/discovery/kubernetes/api_watcher.go
Aliaksandr Valialkin e08287f017 lib/promscrape: reload auth tokens from files every second
Previously auth tokens were loaded at startup and couldn't be updated without vmagent restart.
Now there is no need in vmagent restart.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1297
2021-05-14 20:03:35 +03:00

759 lines
22 KiB
Go

package kubernetes
import (
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"reflect"
"strconv"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/metrics"
)
var apiServerTimeout = flag.Duration("promscrape.kubernetes.apiServerTimeout", 30*time.Minute, "How frequently to reload the full state from Kuberntes API server")
// WatchEvent is a watch event returned from API server endpoints if `watch=1` query arg is set.
//
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
type WatchEvent struct {
Type string
Object json.RawMessage
}
// object is any Kubernetes object.
type object interface {
key() string
// getTargetLabels must be called under gw.mu lock.
getTargetLabels(gw *groupWatcher) []map[string]string
}
// parseObjectFunc must parse object from the given data.
type parseObjectFunc func(data []byte) (object, error)
// parseObjectListFunc must parse objectList from the given r.
type parseObjectListFunc func(r io.Reader) (map[string]object, ListMeta, error)
// apiWatcher is used for watching for Kuberntes object changes and caching their latest states.
type apiWatcher struct {
role string
// Constructor for creating ScrapeWork objects from labels
swcFunc ScrapeWorkConstructorFunc
gw *groupWatcher
// swos contains per-namepsace maps of ScrapeWork objects for the given apiWatcher
swosByNamespace map[string]map[string][]interface{}
swosByNamespaceLock sync.Mutex
swosCount *metrics.Counter
}
func newAPIWatcher(apiServer string, ac *promauth.Config, sdc *SDConfig, swcFunc ScrapeWorkConstructorFunc) *apiWatcher {
namespaces := sdc.Namespaces.Names
selectors := sdc.Selectors
proxyURL := sdc.ProxyURL.URL()
gw := getGroupWatcher(apiServer, ac, namespaces, selectors, proxyURL)
return &apiWatcher{
role: sdc.Role,
swcFunc: swcFunc,
gw: gw,
swosByNamespace: make(map[string]map[string][]interface{}),
swosCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_scrape_works{role=%q}`, sdc.Role)),
}
}
func (aw *apiWatcher) mustStart() {
aw.gw.startWatchersForRole(aw.role, aw)
}
func (aw *apiWatcher) mustStop() {
aw.gw.unsubscribeAPIWatcher(aw)
aw.swosByNamespaceLock.Lock()
aw.swosByNamespace = make(map[string]map[string][]interface{})
aw.swosByNamespaceLock.Unlock()
}
func (aw *apiWatcher) reloadScrapeWorks(namespace string, swosByKey map[string][]interface{}) {
aw.swosByNamespaceLock.Lock()
aw.swosCount.Add(len(swosByKey) - len(aw.swosByNamespace[namespace]))
aw.swosByNamespace[namespace] = swosByKey
aw.swosByNamespaceLock.Unlock()
}
func (aw *apiWatcher) setScrapeWorks(namespace, key string, labels []map[string]string) {
swos := aw.getScrapeWorkObjectsForLabels(labels)
aw.swosByNamespaceLock.Lock()
swosByKey := aw.swosByNamespace[namespace]
if swosByKey == nil {
swosByKey = make(map[string][]interface{})
aw.swosByNamespace[namespace] = swosByKey
}
if len(swos) > 0 {
aw.swosCount.Add(len(swos) - len(swosByKey[key]))
swosByKey[key] = swos
} else {
aw.swosCount.Add(-len(swosByKey[key]))
delete(swosByKey, key)
}
aw.swosByNamespaceLock.Unlock()
}
func (aw *apiWatcher) removeScrapeWorks(namespace, key string) {
aw.swosByNamespaceLock.Lock()
swosByKey := aw.swosByNamespace[namespace]
if len(swosByKey) > 0 {
aw.swosCount.Add(-len(swosByKey[key]))
delete(swosByKey, key)
}
aw.swosByNamespaceLock.Unlock()
}
func (aw *apiWatcher) getScrapeWorkObjectsForLabels(labelss []map[string]string) []interface{} {
swos := make([]interface{}, 0, len(labelss))
for _, labels := range labelss {
swo := aw.swcFunc(labels)
// The reflect check is needed because of https://mangatmodi.medium.com/go-check-nil-interface-the-right-way-d142776edef1
if swo != nil && !reflect.ValueOf(swo).IsNil() {
swos = append(swos, swo)
}
}
return swos
}
// getScrapeWorkObjects returns all the ScrapeWork objects for the given aw.
func (aw *apiWatcher) getScrapeWorkObjects() []interface{} {
aw.gw.registerPendingAPIWatchers()
aw.swosByNamespaceLock.Lock()
defer aw.swosByNamespaceLock.Unlock()
size := 0
for _, swosByKey := range aw.swosByNamespace {
for _, swosLocal := range swosByKey {
size += len(swosLocal)
}
}
swos := make([]interface{}, 0, size)
for _, swosByKey := range aw.swosByNamespace {
for _, swosLocal := range swosByKey {
swos = append(swos, swosLocal...)
}
}
return swos
}
// groupWatcher watches for Kubernetes objects on the given apiServer with the given namespaces,
// selectors using the given client.
type groupWatcher struct {
apiServer string
namespaces []string
selectors []Selector
getAuthHeader func() string
client *http.Client
mu sync.Mutex
m map[string]*urlWatcher
}
func newGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string, selectors []Selector, proxyURL *url.URL) *groupWatcher {
var proxy func(*http.Request) (*url.URL, error)
if proxyURL != nil {
proxy = http.ProxyURL(proxyURL)
}
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: ac.NewTLSConfig(),
Proxy: proxy,
TLSHandshakeTimeout: 10 * time.Second,
IdleConnTimeout: *apiServerTimeout,
MaxIdleConnsPerHost: 100,
},
Timeout: *apiServerTimeout,
}
return &groupWatcher{
apiServer: apiServer,
getAuthHeader: ac.GetAuthHeader,
namespaces: namespaces,
selectors: selectors,
client: client,
m: make(map[string]*urlWatcher),
}
}
func getGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string, selectors []Selector, proxyURL *url.URL) *groupWatcher {
proxyURLStr := "<nil>"
if proxyURL != nil {
proxyURLStr = proxyURL.String()
}
key := fmt.Sprintf("apiServer=%s, namespaces=%s, selectors=%s, proxyURL=%s, authConfig=%s",
apiServer, namespaces, selectorsKey(selectors), proxyURLStr, ac.String())
groupWatchersLock.Lock()
gw := groupWatchers[key]
if gw == nil {
gw = newGroupWatcher(apiServer, ac, namespaces, selectors, proxyURL)
groupWatchers[key] = gw
}
groupWatchersLock.Unlock()
return gw
}
func selectorsKey(selectors []Selector) string {
var sb strings.Builder
for _, s := range selectors {
fmt.Fprintf(&sb, "{role=%q, label=%q, field=%q}", s.Role, s.Label, s.Field)
}
return sb.String()
}
var (
groupWatchersLock sync.Mutex
groupWatchers = make(map[string]*groupWatcher)
_ = metrics.NewGauge(`vm_promscrape_discovery_kubernetes_group_watchers`, func() float64 {
groupWatchersLock.Lock()
n := len(groupWatchers)
groupWatchersLock.Unlock()
return float64(n)
})
)
func (gw *groupWatcher) getObjectByRoleLocked(role, namespace, name string) object {
if gw == nil {
// this is needed for testing
return nil
}
key := namespace + "/" + name
for _, uw := range gw.m {
if uw.role != role {
// Role mismatch
continue
}
if uw.namespace != "" && uw.namespace != namespace {
// Namespace mismatch
continue
}
if o := uw.objectsByKey[key]; o != nil {
return o
}
}
return nil
}
func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) {
if role == "endpoints" || role == "endpointslices" {
// endpoints and endpointslices watchers query pod and service objects. So start watchers for these roles as well.
gw.startWatchersForRole("pod", nil)
gw.startWatchersForRole("service", nil)
}
paths, namespaces := getAPIPathsWithNamespaces(role, gw.namespaces, gw.selectors)
for i, path := range paths {
apiURL := gw.apiServer + path
gw.mu.Lock()
uw := gw.m[apiURL]
needStart := uw == nil
if needStart {
uw = newURLWatcher(role, namespaces[i], apiURL, gw)
gw.m[apiURL] = uw
}
if aw != nil {
uw.subscribeAPIWatcherLocked(aw)
}
gw.mu.Unlock()
if needStart {
uw.reloadObjects()
go uw.watchForUpdates()
if role == "endpoints" || role == "endpointslices" {
// Refresh endpoints and enpointslices targets in background, since they depend on other object types such as pod and service.
// This should fix https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240 .
go func() {
for {
time.Sleep(5 * time.Second)
gw.mu.Lock()
uw.reloadScrapeWorksForAPIWatchersLocked(uw.aws)
gw.mu.Unlock()
}
}()
}
}
}
}
// doRequest performs http request to the given requestURL.
func (gw *groupWatcher) doRequest(requestURL string) (*http.Response, error) {
req, err := http.NewRequest("GET", requestURL, nil)
if err != nil {
logger.Fatalf("cannot create a request for %q: %s", requestURL, err)
}
if ah := gw.getAuthHeader(); ah != "" {
req.Header.Set("Authorization", ah)
}
return gw.client.Do(req)
}
func (gw *groupWatcher) registerPendingAPIWatchers() {
gw.mu.Lock()
defer gw.mu.Unlock()
for _, uw := range gw.m {
uw.registerPendingAPIWatchersLocked()
}
}
func (gw *groupWatcher) unsubscribeAPIWatcher(aw *apiWatcher) {
gw.mu.Lock()
defer gw.mu.Unlock()
for _, uw := range gw.m {
uw.unsubscribeAPIWatcherLocked(aw)
}
}
// urlWatcher watches for an apiURL and updates object states in objectsByKey.
//
// urlWatcher fields must be accessed under gw.mu lock.
type urlWatcher struct {
role string
namespace string
apiURL string
gw *groupWatcher
parseObject parseObjectFunc
parseObjectList parseObjectListFunc
// awsPending contains pending apiWatcher objects, which are registered in a batch.
// Batch registering saves CPU time needed for registering big number of Kubernetes objects
// shared among big number of scrape jobs, since per-object labels are generated only once
// for all the scrape jobs (each scrape job is associated with a single apiWatcher).
// See reloadScrapeWorksForAPIWatchersLocked for details.
awsPending map[*apiWatcher]struct{}
// aws contains registered apiWatcher objects
aws map[*apiWatcher]struct{}
// objectsByKey contains the latest state for objects obtained from apiURL
objectsByKey map[string]object
resourceVersion string
objectsCount *metrics.Counter
objectsAdded *metrics.Counter
objectsRemoved *metrics.Counter
objectsUpdated *metrics.Counter
staleResourceVersions *metrics.Counter
}
func newURLWatcher(role, namespace, apiURL string, gw *groupWatcher) *urlWatcher {
parseObject, parseObjectList := getObjectParsersForRole(role)
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers{role=%q}`, role)).Inc()
uw := &urlWatcher{
role: role,
namespace: namespace,
apiURL: apiURL,
gw: gw,
parseObject: parseObject,
parseObjectList: parseObjectList,
awsPending: make(map[*apiWatcher]struct{}),
aws: make(map[*apiWatcher]struct{}),
objectsByKey: make(map[string]object),
objectsCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects{role=%q}`, role)),
objectsAdded: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_added_total{role=%q}`, role)),
objectsRemoved: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_removed_total{role=%q}`, role)),
objectsUpdated: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_updated_total{role=%q}`, role)),
staleResourceVersions: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_stale_resource_versions_total{role=%q}`, role)),
}
logger.Infof("started %s watcher for %q", uw.role, uw.apiURL)
return uw
}
func (uw *urlWatcher) subscribeAPIWatcherLocked(aw *apiWatcher) {
if _, ok := uw.aws[aw]; !ok {
if _, ok := uw.awsPending[aw]; !ok {
uw.awsPending[aw] = struct{}{}
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Inc()
}
}
}
func (uw *urlWatcher) registerPendingAPIWatchersLocked() {
if len(uw.awsPending) == 0 {
return
}
awsPending := make([]*apiWatcher, 0, len(uw.awsPending))
for aw := range uw.awsPending {
awsPending = append(awsPending, aw)
uw.aws[aw] = struct{}{}
}
uw.reloadScrapeWorksForAPIWatchersLocked(uw.awsPending)
uw.awsPending = make(map[*apiWatcher]struct{})
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="working"}`, uw.role)).Add(len(awsPending))
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Add(-len(awsPending))
}
func (uw *urlWatcher) unsubscribeAPIWatcherLocked(aw *apiWatcher) {
if _, ok := uw.awsPending[aw]; ok {
delete(uw.awsPending, aw)
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Dec()
}
if _, ok := uw.aws[aw]; ok {
delete(uw.aws, aw)
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="working"}`, uw.role)).Dec()
}
}
func (uw *urlWatcher) setResourceVersion(resourceVersion string) {
uw.gw.mu.Lock()
uw.resourceVersion = resourceVersion
uw.gw.mu.Unlock()
}
// reloadObjects reloads objects to the latest state and returns resourceVersion for the latest state.
func (uw *urlWatcher) reloadObjects() string {
uw.gw.mu.Lock()
resourceVersion := uw.resourceVersion
uw.gw.mu.Unlock()
if resourceVersion != "" {
// Fast path - there is no need in reloading the objects.
return resourceVersion
}
requestURL := uw.apiURL
resp, err := uw.gw.doRequest(requestURL)
if err != nil {
logger.Errorf("cannot perform request to %q: %s", requestURL, err)
return ""
}
if resp.StatusCode != http.StatusOK {
body, _ := ioutil.ReadAll(resp.Body)
_ = resp.Body.Close()
logger.Errorf("unexpected status code for request to %q: %d; want %d; response: %q", requestURL, resp.StatusCode, http.StatusOK, body)
return ""
}
objectsByKey, metadata, err := uw.parseObjectList(resp.Body)
_ = resp.Body.Close()
if err != nil {
logger.Errorf("cannot parse objects from %q: %s", requestURL, err)
return ""
}
uw.gw.mu.Lock()
var updated, removed, added int
for key := range uw.objectsByKey {
if o, ok := objectsByKey[key]; ok {
uw.objectsByKey[key] = o
updated++
} else {
delete(uw.objectsByKey, key)
removed++
}
}
for key, o := range objectsByKey {
if _, ok := uw.objectsByKey[key]; !ok {
uw.objectsByKey[key] = o
added++
}
}
uw.objectsUpdated.Add(updated)
uw.objectsRemoved.Add(removed)
uw.objectsAdded.Add(added)
uw.objectsCount.Add(added - removed)
uw.reloadScrapeWorksForAPIWatchersLocked(uw.aws)
uw.resourceVersion = metadata.ResourceVersion
uw.gw.mu.Unlock()
logger.Infof("reloaded %d objects from %q", len(objectsByKey), requestURL)
return uw.resourceVersion
}
func (uw *urlWatcher) reloadScrapeWorksForAPIWatchersLocked(awsMap map[*apiWatcher]struct{}) {
if len(awsMap) == 0 {
return
}
aws := make([]*apiWatcher, 0, len(awsMap))
for aw := range awsMap {
aws = append(aws, aw)
}
swosByKey := make([]map[string][]interface{}, len(aws))
for i := range aws {
swosByKey[i] = make(map[string][]interface{})
}
for key, o := range uw.objectsByKey {
labels := o.getTargetLabels(uw.gw)
for i, aw := range aws {
swos := aw.getScrapeWorkObjectsForLabels(labels)
if len(swos) > 0 {
swosByKey[i][key] = swos
}
}
}
for i, aw := range aws {
aw.reloadScrapeWorks(uw.namespace, swosByKey[i])
}
}
// watchForUpdates watches for object updates starting from uw.resourceVersion and updates the corresponding objects to the latest state.
//
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
func (uw *urlWatcher) watchForUpdates() {
backoffDelay := time.Second
maxBackoffDelay := 30 * time.Second
backoffSleep := func() {
time.Sleep(backoffDelay)
backoffDelay *= 2
if backoffDelay > maxBackoffDelay {
backoffDelay = maxBackoffDelay
}
}
apiURL := uw.apiURL
delimiter := "?"
if strings.Contains(apiURL, "?") {
delimiter = "&"
}
timeoutSeconds := time.Duration(0.9 * float64(uw.gw.client.Timeout)).Seconds()
apiURL += delimiter + "watch=1&allowWatchBookmarks=true&timeoutSeconds=" + strconv.Itoa(int(timeoutSeconds))
for {
resourceVersion := uw.reloadObjects()
if resourceVersion == "" {
backoffSleep()
continue
}
requestURL := apiURL + "&resourceVersion=" + url.QueryEscape(resourceVersion)
resp, err := uw.gw.doRequest(requestURL)
if err != nil {
logger.Errorf("cannot perform request to %q: %s", requestURL, err)
backoffSleep()
continue
}
if resp.StatusCode != http.StatusOK {
if resp.StatusCode == 410 {
// There is no need for sleep on 410 error. See https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses
backoffDelay = time.Second
uw.staleResourceVersions.Inc()
uw.setResourceVersion("")
} else {
body, _ := ioutil.ReadAll(resp.Body)
_ = resp.Body.Close()
logger.Errorf("unexpected status code for request to %q: %d; want %d; response: %q", requestURL, resp.StatusCode, http.StatusOK, body)
backoffSleep()
}
continue
}
backoffDelay = time.Second
err = uw.readObjectUpdateStream(resp.Body)
_ = resp.Body.Close()
if err != nil {
if !errors.Is(err, io.EOF) {
logger.Errorf("error when reading WatchEvent stream from %q: %s", requestURL, err)
}
backoffSleep()
continue
}
}
}
// readObjectUpdateStream reads Kuberntes watch events from r and updates locally cached objects according to the received events.
func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
d := json.NewDecoder(r)
var we WatchEvent
for {
if err := d.Decode(&we); err != nil {
return err
}
switch we.Type {
case "ADDED", "MODIFIED":
o, err := uw.parseObject(we.Object)
if err != nil {
return err
}
key := o.key()
uw.gw.mu.Lock()
if _, ok := uw.objectsByKey[key]; !ok {
uw.objectsCount.Inc()
uw.objectsAdded.Inc()
} else {
uw.objectsUpdated.Inc()
}
uw.objectsByKey[key] = o
if len(uw.aws) > 0 {
labels := o.getTargetLabels(uw.gw)
for aw := range uw.aws {
aw.setScrapeWorks(uw.namespace, key, labels)
}
}
uw.gw.mu.Unlock()
case "DELETED":
o, err := uw.parseObject(we.Object)
if err != nil {
return err
}
key := o.key()
uw.gw.mu.Lock()
if _, ok := uw.objectsByKey[key]; ok {
uw.objectsCount.Dec()
uw.objectsRemoved.Inc()
delete(uw.objectsByKey, key)
}
for aw := range uw.aws {
aw.removeScrapeWorks(uw.namespace, key)
}
uw.gw.mu.Unlock()
case "BOOKMARK":
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks
bm, err := parseBookmark(we.Object)
if err != nil {
return fmt.Errorf("cannot parse bookmark from %q: %w", we.Object, err)
}
uw.setResourceVersion(bm.Metadata.ResourceVersion)
case "ERROR":
em, err := parseError(we.Object)
if err != nil {
return fmt.Errorf("cannot parse error message from %q: %w", we.Object, err)
}
if em.Code == 410 {
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses
uw.staleResourceVersions.Inc()
uw.setResourceVersion("")
return nil
}
return fmt.Errorf("unexpected error message: %q", we.Object)
default:
return fmt.Errorf("unexpected WatchEvent type %q: %q", we.Type, we.Object)
}
}
}
// Bookmark is a bookmark message from Kubernetes Watch API.
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks
type Bookmark struct {
Metadata struct {
ResourceVersion string
}
}
func parseBookmark(data []byte) (*Bookmark, error) {
var bm Bookmark
if err := json.Unmarshal(data, &bm); err != nil {
return nil, err
}
return &bm, nil
}
// Error is an error message from Kubernetes Watch API.
type Error struct {
Code int
}
func parseError(data []byte) (*Error, error) {
var em Error
if err := json.Unmarshal(data, &em); err != nil {
return nil, err
}
return &em, nil
}
func getAPIPathsWithNamespaces(role string, namespaces []string, selectors []Selector) ([]string, []string) {
objectType := getObjectTypeByRole(role)
if objectType == "nodes" || len(namespaces) == 0 {
query := joinSelectors(role, selectors)
path := getAPIPath(objectType, "", query)
return []string{path}, []string{""}
}
query := joinSelectors(role, selectors)
paths := make([]string, len(namespaces))
for i, namespace := range namespaces {
paths[i] = getAPIPath(objectType, namespace, query)
}
return paths, namespaces
}
func getAPIPath(objectType, namespace, query string) string {
suffix := objectType
if namespace != "" {
suffix = "namespaces/" + namespace + "/" + objectType
}
if len(query) > 0 {
suffix += "?" + query
}
if objectType == "ingresses" {
return "/apis/networking.k8s.io/v1beta1/" + suffix
}
if objectType == "endpointslices" {
return "/apis/discovery.k8s.io/v1beta1/" + suffix
}
return "/api/v1/" + suffix
}
func joinSelectors(role string, selectors []Selector) string {
var labelSelectors, fieldSelectors []string
for _, s := range selectors {
if s.Role != role {
continue
}
if s.Label != "" {
labelSelectors = append(labelSelectors, s.Label)
}
if s.Field != "" {
fieldSelectors = append(fieldSelectors, s.Field)
}
}
var args []string
if len(labelSelectors) > 0 {
args = append(args, "labelSelector="+url.QueryEscape(strings.Join(labelSelectors, ",")))
}
if len(fieldSelectors) > 0 {
args = append(args, "fieldSelector="+url.QueryEscape(strings.Join(fieldSelectors, ",")))
}
return strings.Join(args, "&")
}
func getObjectTypeByRole(role string) string {
switch role {
case "node":
return "nodes"
case "pod":
return "pods"
case "service":
return "services"
case "endpoints":
return "endpoints"
case "endpointslices":
return "endpointslices"
case "ingress":
return "ingresses"
default:
logger.Panicf("BUG: unknonw role=%q", role)
return ""
}
}
func getObjectParsersForRole(role string) (parseObjectFunc, parseObjectListFunc) {
switch role {
case "node":
return parseNode, parseNodeList
case "pod":
return parsePod, parsePodList
case "service":
return parseService, parseServiceList
case "endpoints":
return parseEndpoints, parseEndpointsList
case "endpointslices":
return parseEndpointSlice, parseEndpointSliceList
case "ingress":
return parseIngress, parseIngressList
default:
logger.Panicf("BUG: unsupported role=%q", role)
return nil, nil
}
}