mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-11 15:34:56 +00:00
lib/promscrape/discovery/kubernetes: cache target labels
This should reduce CPU usage on repeated SDConfig.GetLabels() calls.
This commit is contained in:
parent
19712fc2bd
commit
815666e6a6
14 changed files with 58 additions and 184 deletions
|
@ -109,6 +109,7 @@ type WatchEvent struct {
|
|||
// object is any Kubernetes object.
|
||||
type object interface {
|
||||
key() string
|
||||
getTargetLabels(aw *apiWatcher) []map[string]string
|
||||
}
|
||||
|
||||
// parseObjectFunc must parse object from the given data.
|
||||
|
@ -159,6 +160,23 @@ func newAPIWatcher(client *http.Client, apiServer, authorization string, namespa
|
|||
}
|
||||
}
|
||||
|
||||
// getLabelsForRole returns all the sets of labels for the given role.
|
||||
func (aw *apiWatcher) getLabelsForRole(role string) []map[string]string {
|
||||
var ms []map[string]string
|
||||
aw.mu.Lock()
|
||||
for _, uw := range aw.watchersByURL {
|
||||
if uw.role != role {
|
||||
continue
|
||||
}
|
||||
uw.mu.Lock()
|
||||
for _, labels := range uw.labelsByKey {
|
||||
ms = append(ms, labels...)
|
||||
}
|
||||
uw.mu.Unlock()
|
||||
}
|
||||
return ms
|
||||
}
|
||||
|
||||
// getObjectByRole returns an object with the given (namespace, name) key and the given role.
|
||||
func (aw *apiWatcher) getObjectByRole(role, namespace, name string) object {
|
||||
if aw == nil {
|
||||
|
@ -184,26 +202,6 @@ func (aw *apiWatcher) getObjectByRole(role, namespace, name string) object {
|
|||
return o
|
||||
}
|
||||
|
||||
// getObjectsByRole returns all the objects for the given role.
|
||||
func (aw *apiWatcher) getObjectsByRole(role string) []object {
|
||||
aw.startWatchersForRole(role)
|
||||
var os []object
|
||||
aw.mu.Lock()
|
||||
for _, uw := range aw.watchersByURL {
|
||||
if uw.role != role {
|
||||
continue
|
||||
}
|
||||
uw.mu.Lock()
|
||||
for _, o := range uw.objectsByKey {
|
||||
os = append(os, o)
|
||||
}
|
||||
uw.mu.Unlock()
|
||||
}
|
||||
aw.lastAccessTime = time.Now()
|
||||
aw.mu.Unlock()
|
||||
return os
|
||||
}
|
||||
|
||||
func (aw *apiWatcher) startWatchersForRole(role string) {
|
||||
parseObject, parseObjectList := getObjectParsersForRole(role)
|
||||
paths := getAPIPaths(role, aw.namespaces, aw.selectors)
|
||||
|
@ -258,11 +256,12 @@ type urlWatcher struct {
|
|||
parseObject parseObjectFunc
|
||||
parseObjectList parseObjectListFunc
|
||||
|
||||
// mu protects objectsByKey
|
||||
// mu protects objectsByKey and labelsByKey
|
||||
mu sync.Mutex
|
||||
|
||||
// objectsByKey contains the latest state for objects obtained from apiURL
|
||||
objectsByKey map[string]object
|
||||
labelsByKey map[string][]map[string]string
|
||||
|
||||
// the parent apiWatcher
|
||||
aw *apiWatcher
|
||||
|
@ -277,6 +276,7 @@ func (aw *apiWatcher) newURLWatcher(role, apiURL string, parseObject parseObject
|
|||
parseObjectList: parseObjectList,
|
||||
|
||||
objectsByKey: make(map[string]object),
|
||||
labelsByKey: make(map[string][]map[string]string),
|
||||
|
||||
aw: aw,
|
||||
}
|
||||
|
@ -394,9 +394,14 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
|
|||
uw.mu.Lock()
|
||||
uw.objectsByKey[key] = o
|
||||
uw.mu.Unlock()
|
||||
labels := o.getTargetLabels(uw.aw)
|
||||
uw.mu.Lock()
|
||||
uw.labelsByKey[key] = labels
|
||||
uw.mu.Unlock()
|
||||
case "DELETED":
|
||||
uw.mu.Lock()
|
||||
delete(uw.objectsByKey, key)
|
||||
delete(uw.labelsByKey, key)
|
||||
uw.mu.Unlock()
|
||||
default:
|
||||
return fmt.Errorf("unexpected WatchEvent type %q for role %q", we.Type, uw.role)
|
||||
|
|
|
@ -7,25 +7,6 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||
)
|
||||
|
||||
// getEndpointsLabels returns labels for k8s endpoints obtained from the given cfg.
|
||||
func getEndpointsLabels(cfg *apiConfig) []map[string]string {
|
||||
epss := getEndpoints(cfg)
|
||||
var ms []map[string]string
|
||||
for _, eps := range epss {
|
||||
ms = eps.appendTargetLabels(ms, cfg.aw)
|
||||
}
|
||||
return ms
|
||||
}
|
||||
|
||||
func getEndpoints(cfg *apiConfig) []*Endpoints {
|
||||
os := cfg.aw.getObjectsByRole("endpoint")
|
||||
epss := make([]*Endpoints, len(os))
|
||||
for i, o := range os {
|
||||
epss[i] = o.(*Endpoints)
|
||||
}
|
||||
return epss
|
||||
}
|
||||
|
||||
func (eps *Endpoints) key() string {
|
||||
return eps.Metadata.key()
|
||||
}
|
||||
|
@ -104,15 +85,16 @@ type EndpointPort struct {
|
|||
Protocol string
|
||||
}
|
||||
|
||||
// appendTargetLabels appends labels for each endpoint in eps to ms and returns the result.
|
||||
// getTargetLabels returns labels for each endpoint in eps.
|
||||
//
|
||||
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#endpoints
|
||||
func (eps *Endpoints) appendTargetLabels(ms []map[string]string, aw *apiWatcher) []map[string]string {
|
||||
func (eps *Endpoints) getTargetLabels(aw *apiWatcher) []map[string]string {
|
||||
var svc *Service
|
||||
if o := aw.getObjectByRole("service", eps.Metadata.Namespace, eps.Metadata.Name); o != nil {
|
||||
svc = o.(*Service)
|
||||
}
|
||||
podPortsSeen := make(map[*Pod][]int)
|
||||
var ms []map[string]string
|
||||
for _, ess := range eps.Subsets {
|
||||
for _, epp := range ess.Ports {
|
||||
ms = appendEndpointLabelsForAddresses(ms, aw, podPortsSeen, eps, ess.Addresses, epp, svc, "true")
|
||||
|
|
|
@ -88,9 +88,7 @@ func TestParseEndpointsListSuccess(t *testing.T) {
|
|||
t.Fatalf("unexpected resource version; got %s; want %s", meta.ResourceVersion, expectedResourceVersion)
|
||||
}
|
||||
|
||||
sortedLabelss := getSortedLabelss(objectsByKey, func(o object) []map[string]string {
|
||||
return o.(*Endpoints).appendTargetLabels(nil, nil)
|
||||
})
|
||||
sortedLabelss := getSortedLabelss(objectsByKey)
|
||||
expectedLabelss := [][]prompbmarshal.Label{
|
||||
discoveryutils.GetSortedLabels(map[string]string{
|
||||
"__address__": "172.17.0.2:8443",
|
||||
|
|
|
@ -8,26 +8,6 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||
)
|
||||
|
||||
// getEndpointSlicesLabels returns labels for k8s endpointSlices obtained from the given cfg.
|
||||
func getEndpointSlicesLabels(cfg *apiConfig) []map[string]string {
|
||||
epss := getEndpointSlices(cfg)
|
||||
var ms []map[string]string
|
||||
for _, eps := range epss {
|
||||
ms = eps.appendTargetLabels(ms, cfg.aw)
|
||||
}
|
||||
return ms
|
||||
}
|
||||
|
||||
// getEndpointSlices retrieves endpointSlice with given apiConfig
|
||||
func getEndpointSlices(cfg *apiConfig) []*EndpointSlice {
|
||||
os := cfg.aw.getObjectsByRole("endpointslices")
|
||||
epss := make([]*EndpointSlice, len(os))
|
||||
for i, o := range os {
|
||||
epss[i] = o.(*EndpointSlice)
|
||||
}
|
||||
return epss
|
||||
}
|
||||
|
||||
func (eps *EndpointSlice) key() string {
|
||||
return eps.Metadata.key()
|
||||
}
|
||||
|
@ -52,14 +32,16 @@ func parseEndpointSlice(data []byte) (object, error) {
|
|||
return &eps, nil
|
||||
}
|
||||
|
||||
// appendTargetLabels injects labels for endPointSlice to slice map
|
||||
// follows TargetRef for enrich labels with pod and service metadata
|
||||
func (eps *EndpointSlice) appendTargetLabels(ms []map[string]string, aw *apiWatcher) []map[string]string {
|
||||
// getTargetLabels returns labels for eps.
|
||||
//
|
||||
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#endpointslices
|
||||
func (eps *EndpointSlice) getTargetLabels(aw *apiWatcher) []map[string]string {
|
||||
var svc *Service
|
||||
if o := aw.getObjectByRole("service", eps.Metadata.Namespace, eps.Metadata.Name); o != nil {
|
||||
svc = o.(*Service)
|
||||
}
|
||||
podPortsSeen := make(map[*Pod][]int)
|
||||
var ms []map[string]string
|
||||
for _, ess := range eps.Endpoints {
|
||||
var p *Pod
|
||||
if o := aw.getObjectByRole("pod", ess.TargetRef.Namespace, ess.TargetRef.Name); o != nil {
|
||||
|
|
|
@ -185,9 +185,7 @@ func TestParseEndpointSliceListSuccess(t *testing.T) {
|
|||
if meta.ResourceVersion != expectedResourceVersion {
|
||||
t.Fatalf("unexpected resource version; got %s; want %s", meta.ResourceVersion, expectedResourceVersion)
|
||||
}
|
||||
sortedLabelss := getSortedLabelss(objectsByKey, func(o object) []map[string]string {
|
||||
return o.(*EndpointSlice).appendTargetLabels(nil, nil)
|
||||
})
|
||||
sortedLabelss := getSortedLabelss(objectsByKey)
|
||||
expectedLabelss := [][]prompbmarshal.Label{
|
||||
discoveryutils.GetSortedLabels(map[string]string{
|
||||
"__address__": "172.18.0.2:6443",
|
||||
|
|
|
@ -5,25 +5,6 @@ import (
|
|||
"fmt"
|
||||
)
|
||||
|
||||
// getIngressesLabels returns labels for k8s ingresses obtained from the given cfg.
|
||||
func getIngressesLabels(cfg *apiConfig) []map[string]string {
|
||||
igs := getIngresses(cfg)
|
||||
var ms []map[string]string
|
||||
for _, ig := range igs {
|
||||
ms = ig.appendTargetLabels(ms)
|
||||
}
|
||||
return ms
|
||||
}
|
||||
|
||||
func getIngresses(cfg *apiConfig) []*Ingress {
|
||||
os := cfg.aw.getObjectsByRole("ingress")
|
||||
igs := make([]*Ingress, len(os))
|
||||
for i, o := range os {
|
||||
igs[i] = o.(*Ingress)
|
||||
}
|
||||
return igs
|
||||
}
|
||||
|
||||
func (ig *Ingress) key() string {
|
||||
return ig.Metadata.key()
|
||||
}
|
||||
|
@ -101,16 +82,17 @@ type HTTPIngressPath struct {
|
|||
Path string
|
||||
}
|
||||
|
||||
// appendTargetLabels appends labels for Ingress ig to ms and returns the result.
|
||||
// getTargetLabels returns labels for ig.
|
||||
//
|
||||
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ingress
|
||||
func (ig *Ingress) appendTargetLabels(ms []map[string]string) []map[string]string {
|
||||
func (ig *Ingress) getTargetLabels(aw *apiWatcher) []map[string]string {
|
||||
tlsHosts := make(map[string]bool)
|
||||
for _, tls := range ig.Spec.TLS {
|
||||
for _, host := range tls.Hosts {
|
||||
tlsHosts[host] = true
|
||||
}
|
||||
}
|
||||
var ms []map[string]string
|
||||
for _, r := range ig.Spec.Rules {
|
||||
paths := getIngressRulePaths(r.HTTP.Paths)
|
||||
scheme := "http"
|
||||
|
|
|
@ -79,9 +79,7 @@ func TestParseIngressListSuccess(t *testing.T) {
|
|||
if meta.ResourceVersion != expectedResourceVersion {
|
||||
t.Fatalf("unexpected resource version; got %s; want %s", meta.ResourceVersion, expectedResourceVersion)
|
||||
}
|
||||
sortedLabelss := getSortedLabelss(objectsByKey, func(o object) []map[string]string {
|
||||
return o.(*Ingress).appendTargetLabels(nil)
|
||||
})
|
||||
sortedLabelss := getSortedLabelss(objectsByKey)
|
||||
expectedLabelss := [][]prompbmarshal.Label{
|
||||
discoveryutils.GetSortedLabels(map[string]string{
|
||||
"__address__": "foobar",
|
||||
|
|
|
@ -44,18 +44,8 @@ func (sdc *SDConfig) GetLabels(baseDir string) ([]map[string]string, error) {
|
|||
return nil, fmt.Errorf("cannot create API config: %w", err)
|
||||
}
|
||||
switch sdc.Role {
|
||||
case "node":
|
||||
return getNodesLabels(cfg), nil
|
||||
case "pod":
|
||||
return getPodsLabels(cfg), nil
|
||||
case "service":
|
||||
return getServicesLabels(cfg), nil
|
||||
case "endpoints":
|
||||
return getEndpointsLabels(cfg), nil
|
||||
case "endpointslices":
|
||||
return getEndpointSlicesLabels(cfg), nil
|
||||
case "ingress":
|
||||
return getIngressesLabels(cfg), nil
|
||||
case "node", "pod", "service", "endpoints", "endpointslices", "ingress":
|
||||
return cfg.aw.getLabelsForRole(sdc.Role), nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unexpected `role`: %q; must be one of `node`, `pod`, `service`, `endpoints`, `endpointslices` or `ingress`; skipping it", sdc.Role)
|
||||
}
|
||||
|
|
|
@ -8,24 +8,6 @@ import (
|
|||
)
|
||||
|
||||
// getNodesLabels returns labels for k8s nodes obtained from the given cfg
|
||||
func getNodesLabels(cfg *apiConfig) []map[string]string {
|
||||
nodes := getNodes(cfg)
|
||||
var ms []map[string]string
|
||||
for _, n := range nodes {
|
||||
ms = n.appendTargetLabels(ms)
|
||||
}
|
||||
return ms
|
||||
}
|
||||
|
||||
func getNodes(cfg *apiConfig) []*Node {
|
||||
os := cfg.aw.getObjectsByRole("node")
|
||||
ns := make([]*Node, len(os))
|
||||
for i, o := range os {
|
||||
ns[i] = o.(*Node)
|
||||
}
|
||||
return ns
|
||||
}
|
||||
|
||||
func (n *Node) key() string {
|
||||
return n.Metadata.key()
|
||||
}
|
||||
|
@ -89,14 +71,14 @@ type NodeDaemonEndpoints struct {
|
|||
KubeletEndpoint DaemonEndpoint
|
||||
}
|
||||
|
||||
// appendTargetLabels appends labels for the given Node n to ms and returns the result.
|
||||
// getTargetLabels returs labels for the given n.
|
||||
//
|
||||
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#node
|
||||
func (n *Node) appendTargetLabels(ms []map[string]string) []map[string]string {
|
||||
func (n *Node) getTargetLabels(aw *apiWatcher) []map[string]string {
|
||||
addr := getNodeAddr(n.Status.Addresses)
|
||||
if len(addr) == 0 {
|
||||
// Skip node without address
|
||||
return ms
|
||||
return nil
|
||||
}
|
||||
addr = discoveryutils.JoinHostPort(addr, n.Status.DaemonEndpoints.KubeletEndpoint.Port)
|
||||
m := map[string]string{
|
||||
|
@ -114,8 +96,7 @@ func (n *Node) appendTargetLabels(ms []map[string]string) []map[string]string {
|
|||
ln := discoveryutils.SanitizeLabelName(a.Type)
|
||||
m["__meta_kubernetes_node_address_"+ln] = a.Address
|
||||
}
|
||||
ms = append(ms, m)
|
||||
return ms
|
||||
return []map[string]string{m}
|
||||
}
|
||||
|
||||
func getNodeAddr(nas []NodeAddress) string {
|
||||
|
|
|
@ -235,9 +235,7 @@ func TestParseNodeListSuccess(t *testing.T) {
|
|||
if meta.ResourceVersion != expectedResourceVersion {
|
||||
t.Fatalf("unexpected resource version; got %s; want %s", meta.ResourceVersion, expectedResourceVersion)
|
||||
}
|
||||
sortedLabelss := getSortedLabelss(objectsByKey, func(o object) []map[string]string {
|
||||
return o.(*Node).appendTargetLabels(nil)
|
||||
})
|
||||
sortedLabelss := getSortedLabelss(objectsByKey)
|
||||
expectedLabelss := [][]prompbmarshal.Label{
|
||||
discoveryutils.GetSortedLabels(map[string]string{
|
||||
"instance": "m01",
|
||||
|
@ -283,10 +281,10 @@ func TestParseNodeListSuccess(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func getSortedLabelss(objectsByKey map[string]object, getLabelss func(o object) []map[string]string) [][]prompbmarshal.Label {
|
||||
func getSortedLabelss(objectsByKey map[string]object) [][]prompbmarshal.Label {
|
||||
var result [][]prompbmarshal.Label
|
||||
for _, o := range objectsByKey {
|
||||
labelss := getLabelss(o)
|
||||
labelss := o.getTargetLabels(nil)
|
||||
for _, labels := range labelss {
|
||||
result = append(result, discoveryutils.GetSortedLabels(labels))
|
||||
}
|
||||
|
|
|
@ -9,25 +9,6 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||
)
|
||||
|
||||
// getPodsLabels returns labels for k8s pods obtained from the given cfg
|
||||
func getPodsLabels(cfg *apiConfig) []map[string]string {
|
||||
pods := getPods(cfg)
|
||||
var ms []map[string]string
|
||||
for _, p := range pods {
|
||||
ms = p.appendTargetLabels(ms)
|
||||
}
|
||||
return ms
|
||||
}
|
||||
|
||||
func getPods(cfg *apiConfig) []*Pod {
|
||||
os := cfg.aw.getObjectsByRole("pod")
|
||||
ps := make([]*Pod, len(os))
|
||||
for i, o := range os {
|
||||
ps[i] = o.(*Pod)
|
||||
}
|
||||
return ps
|
||||
}
|
||||
|
||||
func (p *Pod) key() string {
|
||||
return p.Metadata.key()
|
||||
}
|
||||
|
@ -111,14 +92,15 @@ type PodCondition struct {
|
|||
Status string
|
||||
}
|
||||
|
||||
// appendTargetLabels appends labels for each port of the given Pod p to ms and returns the result.
|
||||
// getTargetLabels returns labels for each port of the given p.
|
||||
//
|
||||
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#pod
|
||||
func (p *Pod) appendTargetLabels(ms []map[string]string) []map[string]string {
|
||||
func (p *Pod) getTargetLabels(aw *apiWatcher) []map[string]string {
|
||||
if len(p.Status.PodIP) == 0 {
|
||||
// Skip pod without IP
|
||||
return ms
|
||||
return nil
|
||||
}
|
||||
var ms []map[string]string
|
||||
ms = appendPodLabels(ms, p, p.Spec.Containers, "false")
|
||||
ms = appendPodLabels(ms, p, p.Spec.InitContainers, "true")
|
||||
return ms
|
||||
|
|
|
@ -236,9 +236,7 @@ func TestParsePodListSuccess(t *testing.T) {
|
|||
if meta.ResourceVersion != expectedResourceVersion {
|
||||
t.Fatalf("unexpected resource version; got %s; want %s", meta.ResourceVersion, expectedResourceVersion)
|
||||
}
|
||||
sortedLabelss := getSortedLabelss(objectsByKey, func(o object) []map[string]string {
|
||||
return o.(*Pod).appendTargetLabels(nil)
|
||||
})
|
||||
sortedLabelss := getSortedLabelss(objectsByKey)
|
||||
expectedLabelss := [][]prompbmarshal.Label{
|
||||
discoveryutils.GetSortedLabels(map[string]string{
|
||||
"__address__": "172.17.0.2:1234",
|
||||
|
|
|
@ -7,25 +7,6 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||
)
|
||||
|
||||
// getServicesLabels returns labels for k8s services obtained from the given cfg
|
||||
func getServicesLabels(cfg *apiConfig) []map[string]string {
|
||||
svcs := getServices(cfg)
|
||||
var ms []map[string]string
|
||||
for _, svc := range svcs {
|
||||
ms = svc.appendTargetLabels(ms)
|
||||
}
|
||||
return ms
|
||||
}
|
||||
|
||||
func getServices(cfg *apiConfig) []*Service {
|
||||
os := cfg.aw.getObjectsByRole("service")
|
||||
svcs := make([]*Service, len(os))
|
||||
for i, o := range os {
|
||||
svcs[i] = o.(*Service)
|
||||
}
|
||||
return svcs
|
||||
}
|
||||
|
||||
func (s *Service) key() string {
|
||||
return s.Metadata.key()
|
||||
}
|
||||
|
@ -85,11 +66,12 @@ type ServicePort struct {
|
|||
Port int
|
||||
}
|
||||
|
||||
// appendTargetLabels appends labels for each port of the given Service s to ms and returns the result.
|
||||
// getTargetLabels returns labels for each port of the given s.
|
||||
//
|
||||
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#service
|
||||
func (s *Service) appendTargetLabels(ms []map[string]string) []map[string]string {
|
||||
func (s *Service) getTargetLabels(aw *apiWatcher) []map[string]string {
|
||||
host := fmt.Sprintf("%s.%s.svc", s.Metadata.Name, s.Metadata.Namespace)
|
||||
var ms []map[string]string
|
||||
for _, sp := range s.Spec.Ports {
|
||||
addr := discoveryutils.JoinHostPort(host, sp.Port)
|
||||
m := map[string]string{
|
||||
|
|
|
@ -97,9 +97,7 @@ func TestParseServiceListSuccess(t *testing.T) {
|
|||
if meta.ResourceVersion != expectedResourceVersion {
|
||||
t.Fatalf("unexpected resource version; got %s; want %s", meta.ResourceVersion, expectedResourceVersion)
|
||||
}
|
||||
sortedLabelss := getSortedLabelss(objectsByKey, func(o object) []map[string]string {
|
||||
return o.(*Service).appendTargetLabels(nil)
|
||||
})
|
||||
sortedLabelss := getSortedLabelss(objectsByKey)
|
||||
expectedLabelss := [][]prompbmarshal.Label{
|
||||
discoveryutils.GetSortedLabels(map[string]string{
|
||||
"__address__": "kube-dns.kube-system.svc:53",
|
||||
|
|
Loading…
Reference in a new issue