VictoriaMetrics/lib/promscrape/discovery/kubernetes/api_watcher.go
Aliaksandr Valialkin ab4f090c63 lib/promscrape/discovery/kubernetes: reduce memory usage further when big number of scrape jobs are configured for the same kubernetes_sd_config role
Serialize reloading per-role objects, so they don't occupy too much memory when objects for many scrape jobs are simultaneously refreshed.
Do not reload per-role objects if they were already refreshed by concurrent goroutines. This should reduce load on Kubernetes API server
when big number of scrape jobs are configured for the same Kubernetes role.

This is a follow-up for 17b87725ed

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1113
2021-03-07 19:51:03 +02:00

625 lines
16 KiB
Go

package kubernetes
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"reflect"
"strconv"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/metrics"
)
// WatchEvent is a watch event returned from API server endpoints if `watch=1` query arg is set.
//
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
type WatchEvent struct {
Type string
Object json.RawMessage
}
// object is any Kubernetes object.
type object interface {
key() string
getTargetLabels(aw *apiWatcher) []map[string]string
}
// parseObjectFunc must parse object from the given data.
type parseObjectFunc func(data []byte) (object, error)
// parseObjectListFunc must parse objectList from the given data.
type parseObjectListFunc func(data []byte) (map[string]object, ListMeta, error)
// apiWatcher is used for watching for Kuberntes object changes and caching their latest states.
type apiWatcher struct {
// The client used for watching for object changes
client *http.Client
// Kubenetes API server address in the form http://api-server
apiServer string
// The contents for `Authorization` HTTP request header
authorization string
// Namespaces to watch
namespaces []string
// Selectors to apply during watch
selectors []Selector
// Constructor for creating ScrapeWork objects from labels.
swcFunc ScrapeWorkConstructorFunc
// mu protects watchersByURL
mu sync.Mutex
// a map of watchers keyed by request urls
watchersByURL map[string]*urlWatcher
stopFunc func()
stopCtx context.Context
wg sync.WaitGroup
}
func (aw *apiWatcher) mustStop() {
aw.stopFunc()
aw.wg.Wait()
}
func newAPIWatcher(client *http.Client, apiServer, authorization string, namespaces []string, selectors []Selector, swcFunc ScrapeWorkConstructorFunc) *apiWatcher {
stopCtx, stopFunc := context.WithCancel(context.Background())
return &apiWatcher{
apiServer: apiServer,
authorization: authorization,
client: client,
namespaces: namespaces,
selectors: selectors,
swcFunc: swcFunc,
watchersByURL: make(map[string]*urlWatcher),
stopFunc: stopFunc,
stopCtx: stopCtx,
}
}
// getScrapeWorkObjectsForRole returns all the ScrapeWork objects for the given role.
func (aw *apiWatcher) getScrapeWorkObjectsForRole(role string) []interface{} {
aw.startWatchersForRole(role)
var swos []interface{}
aw.mu.Lock()
for _, uw := range aw.watchersByURL {
if uw.role != role {
continue
}
uw.mu.Lock()
for _, swosLocal := range uw.swosByKey {
swos = append(swos, swosLocal...)
}
uw.mu.Unlock()
}
aw.mu.Unlock()
return swos
}
// getObjectByRole returns an object with the given (namespace, name) key and the given role.
func (aw *apiWatcher) getObjectByRole(role, namespace, name string) object {
if aw == nil {
return nil
}
key := namespace + "/" + name
aw.startWatchersForRole(role)
var o object
aw.mu.Lock()
for _, uw := range aw.watchersByURL {
if uw.role != role {
continue
}
o = uw.objectsByKey.get(key)
if o != nil {
break
}
}
aw.mu.Unlock()
return o
}
func (aw *apiWatcher) startWatchersForRole(role string) {
parseObject, parseObjectList := getObjectParsersForRole(role)
paths := getAPIPaths(role, aw.namespaces, aw.selectors)
for _, path := range paths {
apiURL := aw.apiServer + path
aw.startWatcherForURL(role, apiURL, parseObject, parseObjectList)
}
}
func (aw *apiWatcher) startWatcherForURL(role, apiURL string, parseObject parseObjectFunc, parseObjectList parseObjectListFunc) {
aw.mu.Lock()
if aw.watchersByURL[apiURL] != nil {
// Watcher for the given path already exists.
aw.mu.Unlock()
return
}
uw := aw.newURLWatcher(role, apiURL, parseObject, parseObjectList)
aw.watchersByURL[apiURL] = uw
aw.mu.Unlock()
uw.watchersCount.Inc()
uw.watchersCreated.Inc()
uw.reloadObjects()
aw.wg.Add(1)
go func() {
defer aw.wg.Done()
logger.Infof("started watcher for %q", apiURL)
uw.watchForUpdates()
logger.Infof("stopped watcher for %q", apiURL)
uw.objectsByKey.decRef()
aw.mu.Lock()
delete(aw.watchersByURL, apiURL)
aw.mu.Unlock()
uw.watchersCount.Dec()
uw.watchersStopped.Inc()
}()
}
// needStop returns true if aw must be stopped.
func (aw *apiWatcher) needStop() bool {
select {
case <-aw.stopCtx.Done():
return true
default:
return false
}
}
// doRequest performs http request to the given requestURL.
func (aw *apiWatcher) doRequest(requestURL string) (*http.Response, error) {
req, err := http.NewRequestWithContext(aw.stopCtx, "GET", requestURL, nil)
if err != nil {
logger.Fatalf("cannot create a request for %q: %s", requestURL, err)
}
if aw.authorization != "" {
req.Header.Set("Authorization", aw.authorization)
}
return aw.client.Do(req)
}
// urlWatcher watches for an apiURL and updates object states in objectsByKey.
type urlWatcher struct {
role string
apiURL string
parseObject parseObjectFunc
parseObjectList parseObjectListFunc
// objectsByKey contains the latest state for objects obtained from apiURL
objectsByKey *objectsMap
// mu protects swosByKey and resourceVersion
mu sync.Mutex
swosByKey map[string][]interface{}
resourceVersion string
// the parent apiWatcher
aw *apiWatcher
watchersCount *metrics.Counter
watchersCreated *metrics.Counter
watchersStopped *metrics.Counter
}
func (aw *apiWatcher) newURLWatcher(role, apiURL string, parseObject parseObjectFunc, parseObjectList parseObjectListFunc) *urlWatcher {
return &urlWatcher{
role: role,
apiURL: apiURL,
parseObject: parseObject,
parseObjectList: parseObjectList,
objectsByKey: sharedObjectsGlobal.getByAPIURL(role, apiURL),
swosByKey: make(map[string][]interface{}),
aw: aw,
watchersCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers{role=%q}`, role)),
watchersCreated: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers_created_total{role=%q}`, role)),
watchersStopped: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers_stopped_total{role=%q}`, role)),
}
}
// Limit the concurrency for per-role objects reloading to 1.
//
// This should reduce memory usage when big number of watchers simultaneously receive an update for objects of the same role.
var reloadObjectsLocksByRole = map[string]*sync.Mutex{
"node": {},
"pod": {},
"service": {},
"endpoints": {},
"endpointslices": {},
"ingress": {},
}
func (uw *urlWatcher) resetResourceVersion() {
uw.mu.Lock()
uw.resourceVersion = ""
uw.mu.Unlock()
}
// reloadObjects reloads objects to the latest state and returns resourceVersion for the latest state.
func (uw *urlWatcher) reloadObjects() string {
lock := reloadObjectsLocksByRole[uw.role]
lock.Lock()
defer lock.Unlock()
uw.mu.Lock()
resourceVersion := uw.resourceVersion
uw.mu.Unlock()
if resourceVersion != "" {
// Fast path - objects have been already reloaded by concurrent goroutines.
return resourceVersion
}
aw := uw.aw
requestURL := uw.apiURL
resp, err := aw.doRequest(requestURL)
if err != nil {
if !aw.needStop() {
logger.Errorf("error when performing a request to %q: %s", requestURL, err)
}
return ""
}
body, _ := ioutil.ReadAll(resp.Body)
_ = resp.Body.Close()
if resp.StatusCode != http.StatusOK {
logger.Errorf("unexpected status code for request to %q: %d; want %d; response: %q", requestURL, resp.StatusCode, http.StatusOK, body)
return ""
}
objectsByKey, metadata, err := uw.parseObjectList(body)
if err != nil {
if !aw.needStop() {
logger.Errorf("cannot parse response from %q: %s", requestURL, err)
}
return ""
}
uw.objectsByKey.reload(objectsByKey)
swosByKey := make(map[string][]interface{})
for k, o := range objectsByKey {
labels := o.getTargetLabels(aw)
swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels)
if len(swos) > 0 {
swosByKey[k] = swos
}
}
uw.mu.Lock()
uw.swosByKey = swosByKey
uw.resourceVersion = metadata.ResourceVersion
uw.mu.Unlock()
return metadata.ResourceVersion
}
func getScrapeWorkObjectsForLabels(swcFunc ScrapeWorkConstructorFunc, labelss []map[string]string) []interface{} {
swos := make([]interface{}, 0, len(labelss))
for _, labels := range labelss {
swo := swcFunc(labels)
// The reflect check is needed because of https://mangatmodi.medium.com/go-check-nil-interface-the-right-way-d142776edef1
if swo != nil && !reflect.ValueOf(swo).IsNil() {
swos = append(swos, swo)
}
}
return swos
}
// watchForUpdates watches for object updates starting from uw.resourceVersion and updates the corresponding objects to the latest state.
//
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
func (uw *urlWatcher) watchForUpdates() {
aw := uw.aw
backoffDelay := time.Second
maxBackoffDelay := 30 * time.Second
backoffSleep := func() {
time.Sleep(backoffDelay)
backoffDelay *= 2
if backoffDelay > maxBackoffDelay {
backoffDelay = maxBackoffDelay
}
}
apiURL := uw.apiURL
delimiter := "?"
if strings.Contains(apiURL, "?") {
delimiter = "&"
}
timeoutSeconds := time.Duration(0.9 * float64(aw.client.Timeout)).Seconds()
apiURL += delimiter + "watch=1&timeoutSeconds=" + strconv.Itoa(int(timeoutSeconds))
for {
if aw.needStop() {
return
}
resourceVersion := uw.reloadObjects()
requestURL := apiURL
if resourceVersion != "" {
requestURL += "&resourceVersion=" + url.QueryEscape(resourceVersion)
}
resp, err := aw.doRequest(requestURL)
if err != nil {
if aw.needStop() {
return
}
logger.Errorf("error when performing a request to %q: %s", requestURL, err)
backoffSleep()
uw.resetResourceVersion()
continue
}
if resp.StatusCode != http.StatusOK {
body, _ := ioutil.ReadAll(resp.Body)
_ = resp.Body.Close()
logger.Errorf("unexpected status code for request to %q: %d; want %d; response: %q", requestURL, resp.StatusCode, http.StatusOK, body)
if resp.StatusCode == 410 {
// There is no need for sleep on 410 error. See https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses
backoffDelay = time.Second
} else {
backoffSleep()
}
uw.resetResourceVersion()
continue
}
backoffDelay = time.Second
err = uw.readObjectUpdateStream(resp.Body)
_ = resp.Body.Close()
if err != nil {
if aw.needStop() {
return
}
if !errors.Is(err, io.EOF) {
logger.Errorf("error when reading WatchEvent stream from %q: %s", requestURL, err)
}
backoffSleep()
uw.resetResourceVersion()
continue
}
}
}
// readObjectUpdateStream reads Kuberntes watch events from r and updates locally cached objects according to the received events.
func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
aw := uw.aw
d := json.NewDecoder(r)
var we WatchEvent
for {
if err := d.Decode(&we); err != nil {
return err
}
o, err := uw.parseObject(we.Object)
if err != nil {
return err
}
key := o.key()
switch we.Type {
case "ADDED", "MODIFIED":
uw.objectsByKey.update(key, o)
labels := o.getTargetLabels(aw)
swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels)
uw.mu.Lock()
if len(swos) > 0 {
uw.swosByKey[key] = swos
} else {
delete(uw.swosByKey, key)
}
uw.mu.Unlock()
case "DELETED":
uw.objectsByKey.remove(key)
uw.mu.Lock()
delete(uw.swosByKey, key)
uw.mu.Unlock()
default:
return fmt.Errorf("unexpected WatchEvent type %q for role %q", we.Type, uw.role)
}
}
}
func getAPIPaths(role string, namespaces []string, selectors []Selector) []string {
objectName := getObjectNameByRole(role)
if objectName == "nodes" || len(namespaces) == 0 {
query := joinSelectors(role, selectors)
path := getAPIPath(objectName, "", query)
return []string{path}
}
query := joinSelectors(role, selectors)
paths := make([]string, len(namespaces))
for i, namespace := range namespaces {
paths[i] = getAPIPath(objectName, namespace, query)
}
return paths
}
func getAPIPath(objectName, namespace, query string) string {
suffix := objectName
if namespace != "" {
suffix = "namespaces/" + namespace + "/" + objectName
}
if len(query) > 0 {
suffix += "?" + query
}
if objectName == "ingresses" {
return "/apis/networking.k8s.io/v1beta1/" + suffix
}
if objectName == "endpointslices" {
return "/apis/discovery.k8s.io/v1beta1/" + suffix
}
return "/api/v1/" + suffix
}
func joinSelectors(role string, selectors []Selector) string {
var labelSelectors, fieldSelectors []string
for _, s := range selectors {
if s.Role != role {
continue
}
if s.Label != "" {
labelSelectors = append(labelSelectors, s.Label)
}
if s.Field != "" {
fieldSelectors = append(fieldSelectors, s.Field)
}
}
var args []string
if len(labelSelectors) > 0 {
args = append(args, "labelSelector="+url.QueryEscape(strings.Join(labelSelectors, ",")))
}
if len(fieldSelectors) > 0 {
args = append(args, "fieldSelector="+url.QueryEscape(strings.Join(fieldSelectors, ",")))
}
return strings.Join(args, "&")
}
func getObjectNameByRole(role string) string {
switch role {
case "node":
return "nodes"
case "pod":
return "pods"
case "service":
return "services"
case "endpoints":
return "endpoints"
case "endpointslices":
return "endpointslices"
case "ingress":
return "ingresses"
default:
logger.Panicf("BUG: unknonw role=%q", role)
return ""
}
}
func getObjectParsersForRole(role string) (parseObjectFunc, parseObjectListFunc) {
switch role {
case "node":
return parseNode, parseNodeList
case "pod":
return parsePod, parsePodList
case "service":
return parseService, parseServiceList
case "endpoints":
return parseEndpoints, parseEndpointsList
case "endpointslices":
return parseEndpointSlice, parseEndpointSliceList
case "ingress":
return parseIngress, parseIngressList
default:
logger.Panicf("BUG: unsupported role=%q", role)
return nil, nil
}
}
type objectsMap struct {
mu sync.Mutex
refCount int
m map[string]object
objectsAdded *metrics.Counter
objectsRemoved *metrics.Counter
objectsCount *metrics.Counter
}
func (om *objectsMap) incRef() {
om.mu.Lock()
om.refCount++
om.mu.Unlock()
}
func (om *objectsMap) decRef() {
om.mu.Lock()
om.refCount--
if om.refCount < 0 {
logger.Panicf("BUG: refCount cannot be smaller than 0; got %d", om.refCount)
}
if om.refCount == 0 {
// Free up memory occupied by om.m
om.objectsRemoved.Add(len(om.m))
om.objectsCount.Add(-len(om.m))
om.m = make(map[string]object)
}
om.mu.Unlock()
}
func (om *objectsMap) reload(m map[string]object) {
om.mu.Lock()
om.objectsAdded.Add(len(m))
om.objectsRemoved.Add(len(om.m))
om.objectsCount.Add(len(m) - len(om.m))
for k := range om.m {
delete(om.m, k)
}
for k, o := range m {
om.m[k] = o
}
om.mu.Unlock()
}
func (om *objectsMap) update(key string, o object) {
om.mu.Lock()
if om.m[key] == nil {
om.objectsAdded.Inc()
om.objectsCount.Inc()
}
om.m[key] = o
om.mu.Unlock()
}
func (om *objectsMap) remove(key string) {
om.mu.Lock()
if om.m[key] != nil {
om.objectsRemoved.Inc()
om.objectsCount.Dec()
delete(om.m, key)
}
om.mu.Unlock()
}
func (om *objectsMap) get(key string) object {
om.mu.Lock()
o, ok := om.m[key]
om.mu.Unlock()
if !ok {
return nil
}
return o
}
type sharedObjects struct {
mu sync.Mutex
oms map[string]*objectsMap
}
func (so *sharedObjects) getByAPIURL(role, apiURL string) *objectsMap {
so.mu.Lock()
om := so.oms[apiURL]
if om == nil {
om = &objectsMap{
m: make(map[string]object),
objectsCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects{role=%q}`, role)),
objectsAdded: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_added_total{role=%q}`, role)),
objectsRemoved: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_removed_total{role=%q}`, role)),
}
so.oms[apiURL] = om
}
so.mu.Unlock()
om.incRef()
return om
}
var sharedObjectsGlobal = &sharedObjects{
oms: make(map[string]*objectsMap),
}