lib/promscrape/discovery/kubernetes: cache target labels

This should reduce CPU usage on repeated SDConfig.GetLabels() calls.
This commit is contained in:
Aliaksandr Valialkin 2021-02-26 20:21:27 +02:00
parent 22822feea3
commit efcdf613c2
14 changed files with 58 additions and 184 deletions

View file

@ -109,6 +109,7 @@ type WatchEvent struct {
// object is any Kubernetes object.
type object interface {
key() string
getTargetLabels(aw *apiWatcher) []map[string]string
}
// parseObjectFunc must parse object from the given data.
@ -159,6 +160,23 @@ func newAPIWatcher(client *http.Client, apiServer, authorization string, namespa
}
}
// getLabelsForRole returns all the sets of labels for the given role.
func (aw *apiWatcher) getLabelsForRole(role string) []map[string]string {
var ms []map[string]string
aw.mu.Lock()
for _, uw := range aw.watchersByURL {
if uw.role != role {
continue
}
uw.mu.Lock()
for _, labels := range uw.labelsByKey {
ms = append(ms, labels...)
}
uw.mu.Unlock()
}
return ms
}
// getObjectByRole returns an object with the given (namespace, name) key and the given role.
func (aw *apiWatcher) getObjectByRole(role, namespace, name string) object {
if aw == nil {
@ -184,26 +202,6 @@ func (aw *apiWatcher) getObjectByRole(role, namespace, name string) object {
return o
}
// getObjectsByRole returns all the objects for the given role.
func (aw *apiWatcher) getObjectsByRole(role string) []object {
aw.startWatchersForRole(role)
var os []object
aw.mu.Lock()
for _, uw := range aw.watchersByURL {
if uw.role != role {
continue
}
uw.mu.Lock()
for _, o := range uw.objectsByKey {
os = append(os, o)
}
uw.mu.Unlock()
}
aw.lastAccessTime = time.Now()
aw.mu.Unlock()
return os
}
func (aw *apiWatcher) startWatchersForRole(role string) {
parseObject, parseObjectList := getObjectParsersForRole(role)
paths := getAPIPaths(role, aw.namespaces, aw.selectors)
@ -258,11 +256,12 @@ type urlWatcher struct {
parseObject parseObjectFunc
parseObjectList parseObjectListFunc
// mu protects objectsByKey
// mu protects objectsByKey and labelsByKey
mu sync.Mutex
// objectsByKey contains the latest state for objects obtained from apiURL
objectsByKey map[string]object
labelsByKey map[string][]map[string]string
// the parent apiWatcher
aw *apiWatcher
@ -277,6 +276,7 @@ func (aw *apiWatcher) newURLWatcher(role, apiURL string, parseObject parseObject
parseObjectList: parseObjectList,
objectsByKey: make(map[string]object),
labelsByKey: make(map[string][]map[string]string),
aw: aw,
}
@ -394,9 +394,14 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
uw.mu.Lock()
uw.objectsByKey[key] = o
uw.mu.Unlock()
labels := o.getTargetLabels(uw.aw)
uw.mu.Lock()
uw.labelsByKey[key] = labels
uw.mu.Unlock()
case "DELETED":
uw.mu.Lock()
delete(uw.objectsByKey, key)
delete(uw.labelsByKey, key)
uw.mu.Unlock()
default:
return fmt.Errorf("unexpected WatchEvent type %q for role %q", we.Type, uw.role)

View file

@ -7,25 +7,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
// getEndpointsLabels returns labels for k8s endpoints obtained from the given cfg.
func getEndpointsLabels(cfg *apiConfig) []map[string]string {
epss := getEndpoints(cfg)
var ms []map[string]string
for _, eps := range epss {
ms = eps.appendTargetLabels(ms, cfg.aw)
}
return ms
}
func getEndpoints(cfg *apiConfig) []*Endpoints {
os := cfg.aw.getObjectsByRole("endpoint")
epss := make([]*Endpoints, len(os))
for i, o := range os {
epss[i] = o.(*Endpoints)
}
return epss
}
func (eps *Endpoints) key() string {
return eps.Metadata.key()
}
@ -104,15 +85,16 @@ type EndpointPort struct {
Protocol string
}
// appendTargetLabels appends labels for each endpoint in eps to ms and returns the result.
// getTargetLabels returns labels for each endpoint in eps.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#endpoints
func (eps *Endpoints) appendTargetLabels(ms []map[string]string, aw *apiWatcher) []map[string]string {
func (eps *Endpoints) getTargetLabels(aw *apiWatcher) []map[string]string {
var svc *Service
if o := aw.getObjectByRole("service", eps.Metadata.Namespace, eps.Metadata.Name); o != nil {
svc = o.(*Service)
}
podPortsSeen := make(map[*Pod][]int)
var ms []map[string]string
for _, ess := range eps.Subsets {
for _, epp := range ess.Ports {
ms = appendEndpointLabelsForAddresses(ms, aw, podPortsSeen, eps, ess.Addresses, epp, svc, "true")

View file

@ -88,9 +88,7 @@ func TestParseEndpointsListSuccess(t *testing.T) {
t.Fatalf("unexpected resource version; got %s; want %s", meta.ResourceVersion, expectedResourceVersion)
}
sortedLabelss := getSortedLabelss(objectsByKey, func(o object) []map[string]string {
return o.(*Endpoints).appendTargetLabels(nil, nil)
})
sortedLabelss := getSortedLabelss(objectsByKey)
expectedLabelss := [][]prompbmarshal.Label{
discoveryutils.GetSortedLabels(map[string]string{
"__address__": "172.17.0.2:8443",

View file

@ -8,26 +8,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
// getEndpointSlicesLabels returns labels for k8s endpointSlices obtained from the given cfg.
func getEndpointSlicesLabels(cfg *apiConfig) []map[string]string {
epss := getEndpointSlices(cfg)
var ms []map[string]string
for _, eps := range epss {
ms = eps.appendTargetLabels(ms, cfg.aw)
}
return ms
}
// getEndpointSlices retrieves endpointSlice with given apiConfig
func getEndpointSlices(cfg *apiConfig) []*EndpointSlice {
os := cfg.aw.getObjectsByRole("endpointslices")
epss := make([]*EndpointSlice, len(os))
for i, o := range os {
epss[i] = o.(*EndpointSlice)
}
return epss
}
func (eps *EndpointSlice) key() string {
return eps.Metadata.key()
}
@ -52,14 +32,16 @@ func parseEndpointSlice(data []byte) (object, error) {
return &eps, nil
}
// appendTargetLabels injects labels for endPointSlice to slice map
// follows TargetRef for enrich labels with pod and service metadata
func (eps *EndpointSlice) appendTargetLabels(ms []map[string]string, aw *apiWatcher) []map[string]string {
// getTargetLabels returns labels for eps.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#endpointslices
func (eps *EndpointSlice) getTargetLabels(aw *apiWatcher) []map[string]string {
var svc *Service
if o := aw.getObjectByRole("service", eps.Metadata.Namespace, eps.Metadata.Name); o != nil {
svc = o.(*Service)
}
podPortsSeen := make(map[*Pod][]int)
var ms []map[string]string
for _, ess := range eps.Endpoints {
var p *Pod
if o := aw.getObjectByRole("pod", ess.TargetRef.Namespace, ess.TargetRef.Name); o != nil {

View file

@ -185,9 +185,7 @@ func TestParseEndpointSliceListSuccess(t *testing.T) {
if meta.ResourceVersion != expectedResourceVersion {
t.Fatalf("unexpected resource version; got %s; want %s", meta.ResourceVersion, expectedResourceVersion)
}
sortedLabelss := getSortedLabelss(objectsByKey, func(o object) []map[string]string {
return o.(*EndpointSlice).appendTargetLabels(nil, nil)
})
sortedLabelss := getSortedLabelss(objectsByKey)
expectedLabelss := [][]prompbmarshal.Label{
discoveryutils.GetSortedLabels(map[string]string{
"__address__": "172.18.0.2:6443",

View file

@ -5,25 +5,6 @@ import (
"fmt"
)
// getIngressesLabels returns labels for k8s ingresses obtained from the given cfg.
func getIngressesLabels(cfg *apiConfig) []map[string]string {
igs := getIngresses(cfg)
var ms []map[string]string
for _, ig := range igs {
ms = ig.appendTargetLabels(ms)
}
return ms
}
func getIngresses(cfg *apiConfig) []*Ingress {
os := cfg.aw.getObjectsByRole("ingress")
igs := make([]*Ingress, len(os))
for i, o := range os {
igs[i] = o.(*Ingress)
}
return igs
}
func (ig *Ingress) key() string {
return ig.Metadata.key()
}
@ -101,16 +82,17 @@ type HTTPIngressPath struct {
Path string
}
// appendTargetLabels appends labels for Ingress ig to ms and returns the result.
// getTargetLabels returns labels for ig.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ingress
func (ig *Ingress) appendTargetLabels(ms []map[string]string) []map[string]string {
func (ig *Ingress) getTargetLabels(aw *apiWatcher) []map[string]string {
tlsHosts := make(map[string]bool)
for _, tls := range ig.Spec.TLS {
for _, host := range tls.Hosts {
tlsHosts[host] = true
}
}
var ms []map[string]string
for _, r := range ig.Spec.Rules {
paths := getIngressRulePaths(r.HTTP.Paths)
scheme := "http"

View file

@ -79,9 +79,7 @@ func TestParseIngressListSuccess(t *testing.T) {
if meta.ResourceVersion != expectedResourceVersion {
t.Fatalf("unexpected resource version; got %s; want %s", meta.ResourceVersion, expectedResourceVersion)
}
sortedLabelss := getSortedLabelss(objectsByKey, func(o object) []map[string]string {
return o.(*Ingress).appendTargetLabels(nil)
})
sortedLabelss := getSortedLabelss(objectsByKey)
expectedLabelss := [][]prompbmarshal.Label{
discoveryutils.GetSortedLabels(map[string]string{
"__address__": "foobar",

View file

@ -44,18 +44,8 @@ func (sdc *SDConfig) GetLabels(baseDir string) ([]map[string]string, error) {
return nil, fmt.Errorf("cannot create API config: %w", err)
}
switch sdc.Role {
case "node":
return getNodesLabels(cfg), nil
case "pod":
return getPodsLabels(cfg), nil
case "service":
return getServicesLabels(cfg), nil
case "endpoints":
return getEndpointsLabels(cfg), nil
case "endpointslices":
return getEndpointSlicesLabels(cfg), nil
case "ingress":
return getIngressesLabels(cfg), nil
case "node", "pod", "service", "endpoints", "endpointslices", "ingress":
return cfg.aw.getLabelsForRole(sdc.Role), nil
default:
return nil, fmt.Errorf("unexpected `role`: %q; must be one of `node`, `pod`, `service`, `endpoints`, `endpointslices` or `ingress`; skipping it", sdc.Role)
}

View file

@ -8,24 +8,6 @@ import (
)
// getNodesLabels returns labels for k8s nodes obtained from the given cfg
func getNodesLabels(cfg *apiConfig) []map[string]string {
nodes := getNodes(cfg)
var ms []map[string]string
for _, n := range nodes {
ms = n.appendTargetLabels(ms)
}
return ms
}
func getNodes(cfg *apiConfig) []*Node {
os := cfg.aw.getObjectsByRole("node")
ns := make([]*Node, len(os))
for i, o := range os {
ns[i] = o.(*Node)
}
return ns
}
func (n *Node) key() string {
return n.Metadata.key()
}
@ -89,14 +71,14 @@ type NodeDaemonEndpoints struct {
KubeletEndpoint DaemonEndpoint
}
// appendTargetLabels appends labels for the given Node n to ms and returns the result.
// getTargetLabels returs labels for the given n.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#node
func (n *Node) appendTargetLabels(ms []map[string]string) []map[string]string {
func (n *Node) getTargetLabels(aw *apiWatcher) []map[string]string {
addr := getNodeAddr(n.Status.Addresses)
if len(addr) == 0 {
// Skip node without address
return ms
return nil
}
addr = discoveryutils.JoinHostPort(addr, n.Status.DaemonEndpoints.KubeletEndpoint.Port)
m := map[string]string{
@ -114,8 +96,7 @@ func (n *Node) appendTargetLabels(ms []map[string]string) []map[string]string {
ln := discoveryutils.SanitizeLabelName(a.Type)
m["__meta_kubernetes_node_address_"+ln] = a.Address
}
ms = append(ms, m)
return ms
return []map[string]string{m}
}
func getNodeAddr(nas []NodeAddress) string {

View file

@ -235,9 +235,7 @@ func TestParseNodeListSuccess(t *testing.T) {
if meta.ResourceVersion != expectedResourceVersion {
t.Fatalf("unexpected resource version; got %s; want %s", meta.ResourceVersion, expectedResourceVersion)
}
sortedLabelss := getSortedLabelss(objectsByKey, func(o object) []map[string]string {
return o.(*Node).appendTargetLabels(nil)
})
sortedLabelss := getSortedLabelss(objectsByKey)
expectedLabelss := [][]prompbmarshal.Label{
discoveryutils.GetSortedLabels(map[string]string{
"instance": "m01",
@ -283,10 +281,10 @@ func TestParseNodeListSuccess(t *testing.T) {
}
}
func getSortedLabelss(objectsByKey map[string]object, getLabelss func(o object) []map[string]string) [][]prompbmarshal.Label {
func getSortedLabelss(objectsByKey map[string]object) [][]prompbmarshal.Label {
var result [][]prompbmarshal.Label
for _, o := range objectsByKey {
labelss := getLabelss(o)
labelss := o.getTargetLabels(nil)
for _, labels := range labelss {
result = append(result, discoveryutils.GetSortedLabels(labels))
}

View file

@ -9,25 +9,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
// getPodsLabels returns labels for k8s pods obtained from the given cfg
func getPodsLabels(cfg *apiConfig) []map[string]string {
pods := getPods(cfg)
var ms []map[string]string
for _, p := range pods {
ms = p.appendTargetLabels(ms)
}
return ms
}
func getPods(cfg *apiConfig) []*Pod {
os := cfg.aw.getObjectsByRole("pod")
ps := make([]*Pod, len(os))
for i, o := range os {
ps[i] = o.(*Pod)
}
return ps
}
func (p *Pod) key() string {
return p.Metadata.key()
}
@ -111,14 +92,15 @@ type PodCondition struct {
Status string
}
// appendTargetLabels appends labels for each port of the given Pod p to ms and returns the result.
// getTargetLabels returns labels for each port of the given p.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#pod
func (p *Pod) appendTargetLabels(ms []map[string]string) []map[string]string {
func (p *Pod) getTargetLabels(aw *apiWatcher) []map[string]string {
if len(p.Status.PodIP) == 0 {
// Skip pod without IP
return ms
return nil
}
var ms []map[string]string
ms = appendPodLabels(ms, p, p.Spec.Containers, "false")
ms = appendPodLabels(ms, p, p.Spec.InitContainers, "true")
return ms

View file

@ -236,9 +236,7 @@ func TestParsePodListSuccess(t *testing.T) {
if meta.ResourceVersion != expectedResourceVersion {
t.Fatalf("unexpected resource version; got %s; want %s", meta.ResourceVersion, expectedResourceVersion)
}
sortedLabelss := getSortedLabelss(objectsByKey, func(o object) []map[string]string {
return o.(*Pod).appendTargetLabels(nil)
})
sortedLabelss := getSortedLabelss(objectsByKey)
expectedLabelss := [][]prompbmarshal.Label{
discoveryutils.GetSortedLabels(map[string]string{
"__address__": "172.17.0.2:1234",

View file

@ -7,25 +7,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
// getServicesLabels returns labels for k8s services obtained from the given cfg
func getServicesLabels(cfg *apiConfig) []map[string]string {
svcs := getServices(cfg)
var ms []map[string]string
for _, svc := range svcs {
ms = svc.appendTargetLabels(ms)
}
return ms
}
func getServices(cfg *apiConfig) []*Service {
os := cfg.aw.getObjectsByRole("service")
svcs := make([]*Service, len(os))
for i, o := range os {
svcs[i] = o.(*Service)
}
return svcs
}
func (s *Service) key() string {
return s.Metadata.key()
}
@ -85,11 +66,12 @@ type ServicePort struct {
Port int
}
// appendTargetLabels appends labels for each port of the given Service s to ms and returns the result.
// getTargetLabels returns labels for each port of the given s.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#service
func (s *Service) appendTargetLabels(ms []map[string]string) []map[string]string {
func (s *Service) getTargetLabels(aw *apiWatcher) []map[string]string {
host := fmt.Sprintf("%s.%s.svc", s.Metadata.Name, s.Metadata.Namespace)
var ms []map[string]string
for _, sp := range s.Spec.Ports {
addr := discoveryutils.JoinHostPort(host, sp.Port)
m := map[string]string{

View file

@ -97,9 +97,7 @@ func TestParseServiceListSuccess(t *testing.T) {
if meta.ResourceVersion != expectedResourceVersion {
t.Fatalf("unexpected resource version; got %s; want %s", meta.ResourceVersion, expectedResourceVersion)
}
sortedLabelss := getSortedLabelss(objectsByKey, func(o object) []map[string]string {
return o.(*Service).appendTargetLabels(nil)
})
sortedLabelss := getSortedLabelss(objectsByKey)
expectedLabelss := [][]prompbmarshal.Label{
discoveryutils.GetSortedLabels(map[string]string{
"__address__": "kube-dns.kube-system.svc:53",