lib/promscrape: move KubernetesSDConfig to lib/promscrape/discovery/kubernetes

This commit is contained in:
Aliaksandr Valialkin 2020-04-23 11:34:04 +03:00
parent 1187494c8f
commit e220f3eeb6
10 changed files with 109 additions and 94 deletions

View file

@ -59,7 +59,7 @@ type ScrapeConfig struct {
TLSConfig *promauth.TLSConfig `yaml:"tls_config"` TLSConfig *promauth.TLSConfig `yaml:"tls_config"`
StaticConfigs []StaticConfig `yaml:"static_configs"` StaticConfigs []StaticConfig `yaml:"static_configs"`
FileSDConfigs []FileSDConfig `yaml:"file_sd_configs"` FileSDConfigs []FileSDConfig `yaml:"file_sd_configs"`
KubernetesSDConfigs []KubernetesSDConfig `yaml:"kubernetes_sd_configs"` KubernetesSDConfigs []kubernetes.SDConfig `yaml:"kubernetes_sd_configs"`
RelabelConfigs []promrelabel.RelabelConfig `yaml:"relabel_configs"` RelabelConfigs []promrelabel.RelabelConfig `yaml:"relabel_configs"`
MetricRelabelConfigs []promrelabel.RelabelConfig `yaml:"metric_relabel_configs"` MetricRelabelConfigs []promrelabel.RelabelConfig `yaml:"metric_relabel_configs"`
SampleLimit int `yaml:"sample_limit"` SampleLimit int `yaml:"sample_limit"`
@ -76,25 +76,6 @@ type FileSDConfig struct {
// `refresh_interval` is ignored. See `-prometheus.fileSDCheckInterval` // `refresh_interval` is ignored. See `-prometheus.fileSDCheckInterval`
} }
// KubernetesSDConfig represents kubernetes-based service discovery config.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config
type KubernetesSDConfig struct {
APIServer string `yaml:"api_server"`
Role string `yaml:"role"`
BasicAuth *promauth.BasicAuthConfig `yaml:"basic_auth"`
BearerToken string `yaml:"bearer_token"`
BearerTokenFile string `yaml:"bearer_token_file"`
TLSConfig *promauth.TLSConfig `yaml:"tls_config"`
Namespaces KubernetesNamespaces `yaml:"namespaces"`
Selectors []kubernetes.Selector `yaml:"selectors"`
}
// KubernetesNamespaces represents namespaces for KubernetesSDConfig
type KubernetesNamespaces struct {
Names []string `yaml:"names"`
}
// StaticConfig represents essential parts for `static_config` section of Prometheus config. // StaticConfig represents essential parts for `static_config` section of Prometheus config.
// //
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#static_config // See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#static_config
@ -178,8 +159,9 @@ func (cfg *Config) fileSDConfigsCount() int {
func (cfg *Config) getKubernetesSDScrapeWork() []ScrapeWork { func (cfg *Config) getKubernetesSDScrapeWork() []ScrapeWork {
var dst []ScrapeWork var dst []ScrapeWork
for _, sc := range cfg.ScrapeConfigs { for _, sc := range cfg.ScrapeConfigs {
for _, sdc := range sc.KubernetesSDConfigs { for i := range sc.KubernetesSDConfigs {
dst = sdc.appendScrapeWork(dst, cfg.baseDir, sc.swc) sdc := &sc.KubernetesSDConfigs[i]
dst = appendKubernetesScrapeWork(dst, sdc, cfg.baseDir, sc.swc)
} }
} }
return dst return dst
@ -299,34 +281,24 @@ type scrapeWorkConfig struct {
sampleLimit int sampleLimit int
} }
func (sdc *KubernetesSDConfig) appendScrapeWork(dst []ScrapeWork, baseDir string, swc *scrapeWorkConfig) []ScrapeWork { func appendKubernetesScrapeWork(dst []ScrapeWork, sdc *kubernetes.SDConfig, baseDir string, swc *scrapeWorkConfig) []ScrapeWork {
ac, err := promauth.NewConfig(baseDir, sdc.BasicAuth, sdc.BearerToken, sdc.BearerTokenFile, sdc.TLSConfig) ac, err := promauth.NewConfig(baseDir, sdc.BasicAuth, sdc.BearerToken, sdc.BearerTokenFile, sdc.TLSConfig)
if err != nil { if err != nil {
logger.Errorf("cannot parse auth config for `kubernetes_sd_config` for `job_name` %q: %s; skipping it", swc.jobName, err) logger.Errorf("cannot parse auth config for `kubernetes_sd_config` for `job_name` %q: %s; skipping it", swc.jobName, err)
return dst return dst
} }
cfg := &kubernetes.APIConfig{ targetLabels, err := kubernetes.GetLabels(ac, sdc)
Server: sdc.APIServer,
AuthConfig: ac,
Namespaces: sdc.Namespaces.Names,
Selectors: sdc.Selectors,
}
targetLabels, err := kubernetes.GetLabels(cfg, sdc.Role)
if err != nil { if err != nil {
logger.Errorf("error when discovering kubernetes nodes for `job_name` %q: %s; skipping it", swc.jobName, err) logger.Errorf("error when discovering kubernetes nodes for `job_name` %q: %s; skipping it", swc.jobName, err)
return dst return dst
} }
return appendKubernetesScrapeWork(dst, swc, targetLabels, sdc.Role)
}
func appendKubernetesScrapeWork(dst []ScrapeWork, swc *scrapeWorkConfig, targetLabels []map[string]string, role string) []ScrapeWork {
for _, metaLabels := range targetLabels { for _, metaLabels := range targetLabels {
target := metaLabels["__address__"] target := metaLabels["__address__"]
var err error var err error
dst, err = appendScrapeWork(dst, swc, target, nil, metaLabels) dst, err = appendScrapeWork(dst, swc, target, nil, metaLabels)
if err != nil { if err != nil {
logger.Errorf("error when parsing `kubernetes_sd_config` target %q with role %q for `job_name` %q: %s; skipping it", logger.Errorf("error when parsing `kubernetes_sd_config` target %q with role %q for `job_name` %q: %s; skipping it",
target, role, swc.jobName, err) target, sdc.Role, swc.jobName, err)
continue continue
} }
} }

View file

@ -14,7 +14,15 @@ import (
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
) )
func getAPIResponse(cfg *APIConfig, role, path string) ([]byte, error) { // apiConfig contains config for API server
type apiConfig struct {
Server string
AuthConfig *promauth.Config
Namespaces []string
Selectors []Selector
}
func getAPIResponse(cfg *apiConfig, role, path string) ([]byte, error) {
hcv, err := getHostClient(cfg.Server, cfg.AuthConfig) hcv, err := getHostClient(cfg.Server, cfg.AuthConfig)
if err != nil { if err != nil {
return nil, err return nil, err
@ -122,7 +130,8 @@ func newHostClient(apiServer string, ac *promauth.Config) (*hcValue, error) {
// Assume we run at k8s pod. // Assume we run at k8s pod.
// Discover apiServer and auth config according to k8s docs. // Discover apiServer and auth config according to k8s docs.
// See https://kubernetes.io/docs/reference/access-authn-authz/service-accounts-admin/#service-account-admission-controller // See https://kubernetes.io/docs/reference/access-authn-authz/service-accounts-admin/#service-account-admission-controller
host, port := os.Getenv("KUBERNETES_SERVICE_HOST"), os.Getenv("KUBERNETES_SERVICE_PORT") host := os.Getenv("KUBERNETES_SERVICE_HOST")
port := os.Getenv("KUBERNETES_SERVICE_PORT")
if len(host) == 0 { if len(host) == 0 {
return nil, fmt.Errorf("cannot find KUBERNETES_SERVICE_HOST env var; it must be defined when running in k8s; " + return nil, fmt.Errorf("cannot find KUBERNETES_SERVICE_HOST env var; it must be defined when running in k8s; " +
"probably, `kubernetes_sd_config->api_server` is missing in Prometheus configs?") "probably, `kubernetes_sd_config->api_server` is missing in Prometheus configs?")

View file

@ -3,15 +3,12 @@ package kubernetes
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"net"
"net/url" "net/url"
"regexp"
"sort" "sort"
"strconv"
"strings" "strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
) )
// ObjectMeta represents ObjectMeta from k8s API. // ObjectMeta represents ObjectMeta from k8s API.
@ -28,29 +25,17 @@ type ObjectMeta struct {
func (om *ObjectMeta) registerLabelsAndAnnotations(prefix string, m map[string]string) { func (om *ObjectMeta) registerLabelsAndAnnotations(prefix string, m map[string]string) {
for _, lb := range om.Labels { for _, lb := range om.Labels {
ln := sanitizeLabelName(lb.Name) ln := discoveryutils.SanitizeLabelName(lb.Name)
m[fmt.Sprintf("%s_label_%s", prefix, ln)] = lb.Value m[fmt.Sprintf("%s_label_%s", prefix, ln)] = lb.Value
m[fmt.Sprintf("%s_labelpresent_%s", prefix, ln)] = "true" m[fmt.Sprintf("%s_labelpresent_%s", prefix, ln)] = "true"
} }
for _, a := range om.Annotations { for _, a := range om.Annotations {
an := sanitizeLabelName(a.Name) an := discoveryutils.SanitizeLabelName(a.Name)
m[fmt.Sprintf("%s_annotation_%s", prefix, an)] = a.Value m[fmt.Sprintf("%s_annotation_%s", prefix, an)] = a.Value
m[fmt.Sprintf("%s_annotationpresent_%s", prefix, an)] = "true" m[fmt.Sprintf("%s_annotationpresent_%s", prefix, an)] = "true"
} }
} }
// sanitizeLabelName replaces anything that doesn't match
// client_label.LabelNameRE with an underscore.
//
// This has been copied from Prometheus sources at util/strutil/strconv.go
func sanitizeLabelName(name string) string {
return invalidLabelCharRE.ReplaceAllString(name, "_")
}
var (
invalidLabelCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
)
// SortedLabels represents sorted labels. // SortedLabels represents sorted labels.
type SortedLabels []prompbmarshal.Label type SortedLabels []prompbmarshal.Label
@ -94,29 +79,6 @@ type DaemonEndpoint struct {
Port int Port int
} }
func joinHostPort(host string, port int) string {
portStr := strconv.Itoa(port)
return net.JoinHostPort(host, portStr)
}
// APIConfig contains config for API server
type APIConfig struct {
Server string
AuthConfig *promauth.Config
Namespaces []string
Selectors []Selector
}
// Selector represents kubernetes selector.
//
// See https://kubernetes.io/docs/concepts/overview/working-with-objects/field-selectors/
// and https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/
type Selector struct {
Role string `yaml:"role"`
Label string `yaml:"label"`
Field string `yaml:"field"`
}
func joinSelectors(role string, namespaces []string, selectors []Selector) string { func joinSelectors(role string, namespaces []string, selectors []Selector) string {
var labelSelectors, fieldSelectors []string var labelSelectors, fieldSelectors []string
for _, ns := range namespaces { for _, ns := range namespaces {

View file

@ -3,10 +3,12 @@ package kubernetes
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
) )
// getEndpointsLabels returns labels for k8s endpoints obtained from the given cfg. // getEndpointsLabels returns labels for k8s endpoints obtained from the given cfg.
func getEndpointsLabels(cfg *APIConfig) ([]map[string]string, error) { func getEndpointsLabels(cfg *apiConfig) ([]map[string]string, error) {
data, err := getAPIResponse(cfg, "endpoints", "/api/v1/endpoints") data, err := getAPIResponse(cfg, "endpoints", "/api/v1/endpoints")
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot obtain endpoints data from API server: %s", err) return nil, fmt.Errorf("cannot obtain endpoints data from API server: %s", err)
@ -120,7 +122,7 @@ func (eps *Endpoints) appendTargetLabels(ms []map[string]string, pods []Pod, svc
if portSeen(cp.ContainerPort, ports) { if portSeen(cp.ContainerPort, ports) {
continue continue
} }
addr := joinHostPort(p.Status.PodIP, cp.ContainerPort) addr := discoveryutils.JoinHostPort(p.Status.PodIP, cp.ContainerPort)
m := map[string]string{ m := map[string]string{
"__address__": addr, "__address__": addr,
} }
@ -165,7 +167,7 @@ func getEndpointLabelsForAddressAndPort(podPortsSeen map[*Pod][]int, eps *Endpoi
} }
func getEndpointLabels(om ObjectMeta, ea EndpointAddress, epp EndpointPort, ready string) map[string]string { func getEndpointLabels(om ObjectMeta, ea EndpointAddress, epp EndpointPort, ready string) map[string]string {
addr := joinHostPort(ea.IP, epp.Port) addr := discoveryutils.JoinHostPort(ea.IP, epp.Port)
m := map[string]string{ m := map[string]string{
"__address__": addr, "__address__": addr,
"__meta_kubernetes_namespace": om.Namespace, "__meta_kubernetes_namespace": om.Namespace,

View file

@ -6,7 +6,7 @@ import (
) )
// getIngressesLabels returns labels for k8s ingresses obtained from the given cfg. // getIngressesLabels returns labels for k8s ingresses obtained from the given cfg.
func getIngressesLabels(cfg *APIConfig) ([]map[string]string, error) { func getIngressesLabels(cfg *apiConfig) ([]map[string]string, error) {
data, err := getAPIResponse(cfg, "ingress", "/apis/extensions/v1beta1/ingresses") data, err := getAPIResponse(cfg, "ingress", "/apis/extensions/v1beta1/ingresses")
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot obtain ingresses data from API server: %s", err) return nil, fmt.Errorf("cannot obtain ingresses data from API server: %s", err)

View file

@ -2,11 +2,19 @@ package kubernetes
import ( import (
"fmt" "fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
) )
// GetLabels returns labels for the given k8s role and the given cfg. // GetLabels returns labels for the given k8s role and the given cfg.
func GetLabels(cfg *APIConfig, role string) ([]map[string]string, error) { func GetLabels(ac *promauth.Config, sdc *SDConfig) ([]map[string]string, error) {
switch role { cfg := &apiConfig{
Server: sdc.APIServer,
AuthConfig: ac,
Namespaces: sdc.Namespaces.Names,
Selectors: sdc.Selectors,
}
switch sdc.Role {
case "node": case "node":
return getNodesLabels(cfg) return getNodesLabels(cfg)
case "service": case "service":
@ -18,6 +26,35 @@ func GetLabels(cfg *APIConfig, role string) ([]map[string]string, error) {
case "ingress": case "ingress":
return getIngressesLabels(cfg) return getIngressesLabels(cfg)
default: default:
return nil, fmt.Errorf("unexpected `role`: %q; must be one of `node`, `service`, `pod`, `endpoints` or `ingress`; skipping it", role) return nil, fmt.Errorf("unexpected `role`: %q; must be one of `node`, `service`, `pod`, `endpoints` or `ingress`; skipping it", sdc.Role)
} }
} }
// SDConfig represents kubernetes-based service discovery config.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config
type SDConfig struct {
APIServer string `yaml:"api_server"`
Role string `yaml:"role"`
BasicAuth *promauth.BasicAuthConfig `yaml:"basic_auth"`
BearerToken string `yaml:"bearer_token"`
BearerTokenFile string `yaml:"bearer_token_file"`
TLSConfig *promauth.TLSConfig `yaml:"tls_config"`
Namespaces Namespaces `yaml:"namespaces"`
Selectors []Selector `yaml:"selectors"`
}
// Namespaces represents namespaces for SDConfig
type Namespaces struct {
Names []string `yaml:"names"`
}
// Selector represents kubernetes selector.
//
// See https://kubernetes.io/docs/concepts/overview/working-with-objects/field-selectors/
// and https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/
type Selector struct {
Role string `yaml:"role"`
Label string `yaml:"label"`
Field string `yaml:"field"`
}

View file

@ -3,10 +3,12 @@ package kubernetes
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
) )
// getNodesLabels returns labels for k8s nodes obtained from the given cfg. // getNodesLabels returns labels for k8s nodes obtained from the given cfg.
func getNodesLabels(cfg *APIConfig) ([]map[string]string, error) { func getNodesLabels(cfg *apiConfig) ([]map[string]string, error) {
data, err := getAPIResponse(cfg, "node", "/api/v1/nodes") data, err := getAPIResponse(cfg, "node", "/api/v1/nodes")
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot obtain nodes data from API server: %s", err) return nil, fmt.Errorf("cannot obtain nodes data from API server: %s", err)
@ -79,7 +81,7 @@ func (n *Node) appendTargetLabels(ms []map[string]string) []map[string]string {
// Skip node without address // Skip node without address
return ms return ms
} }
addr = joinHostPort(addr, n.Status.DaemonEndpoints.KubeletEndpoint.Port) addr = discoveryutils.JoinHostPort(addr, n.Status.DaemonEndpoints.KubeletEndpoint.Port)
m := map[string]string{ m := map[string]string{
"__address__": addr, "__address__": addr,
"instance": n.Metadata.Name, "instance": n.Metadata.Name,
@ -92,7 +94,7 @@ func (n *Node) appendTargetLabels(ms []map[string]string) []map[string]string {
continue continue
} }
addrTypesUsed[a.Type] = true addrTypesUsed[a.Type] = true
ln := sanitizeLabelName(a.Type) ln := discoveryutils.SanitizeLabelName(a.Type)
m[fmt.Sprintf("__meta_kubernetes_node_address_%s", ln)] = a.Address m[fmt.Sprintf("__meta_kubernetes_node_address_%s", ln)] = a.Address
} }
ms = append(ms, m) ms = append(ms, m)

View file

@ -5,10 +5,12 @@ import (
"fmt" "fmt"
"strconv" "strconv"
"strings" "strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
) )
// getPodsLabels returns labels for k8s pods obtained from the given cfg // getPodsLabels returns labels for k8s pods obtained from the given cfg
func getPodsLabels(cfg *APIConfig) ([]map[string]string, error) { func getPodsLabels(cfg *apiConfig) ([]map[string]string, error) {
pods, err := getPods(cfg) pods, err := getPods(cfg)
if err != nil { if err != nil {
return nil, err return nil, err
@ -20,7 +22,7 @@ func getPodsLabels(cfg *APIConfig) ([]map[string]string, error) {
return ms, nil return ms, nil
} }
func getPods(cfg *APIConfig) ([]Pod, error) { func getPods(cfg *apiConfig) ([]Pod, error) {
data, err := getAPIResponse(cfg, "pod", "/api/v1/pods") data, err := getAPIResponse(cfg, "pod", "/api/v1/pods")
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot obtain pods data from API server: %s", err) return nil, fmt.Errorf("cannot obtain pods data from API server: %s", err)
@ -129,7 +131,7 @@ func appendPodLabels(ms []map[string]string, p *Pod, cs []Container, isInit stri
func getPodLabels(p *Pod, c Container, cp *ContainerPort, isInit string) map[string]string { func getPodLabels(p *Pod, c Container, cp *ContainerPort, isInit string) map[string]string {
addr := p.Status.PodIP addr := p.Status.PodIP
if cp != nil { if cp != nil {
addr = joinHostPort(addr, cp.ContainerPort) addr = discoveryutils.JoinHostPort(addr, cp.ContainerPort)
} }
m := map[string]string{ m := map[string]string{
"__address__": addr, "__address__": addr,

View file

@ -3,10 +3,12 @@ package kubernetes
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
) )
// getServicesLabels returns labels for k8s services obtained from the given cfg. // getServicesLabels returns labels for k8s services obtained from the given cfg.
func getServicesLabels(cfg *APIConfig) ([]map[string]string, error) { func getServicesLabels(cfg *apiConfig) ([]map[string]string, error) {
svcs, err := getServices(cfg) svcs, err := getServices(cfg)
if err != nil { if err != nil {
return nil, err return nil, err
@ -18,7 +20,7 @@ func getServicesLabels(cfg *APIConfig) ([]map[string]string, error) {
return ms, nil return ms, nil
} }
func getServices(cfg *APIConfig) ([]Service, error) { func getServices(cfg *apiConfig) ([]Service, error) {
data, err := getAPIResponse(cfg, "service", "/api/v1/services") data, err := getAPIResponse(cfg, "service", "/api/v1/services")
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot obtain services data from API server: %s", err) return nil, fmt.Errorf("cannot obtain services data from API server: %s", err)
@ -79,7 +81,7 @@ func parseServiceList(data []byte) (*ServiceList, error) {
func (s *Service) appendTargetLabels(ms []map[string]string) []map[string]string { func (s *Service) appendTargetLabels(ms []map[string]string) []map[string]string {
host := fmt.Sprintf("%s.%s.svc", s.Metadata.Name, s.Metadata.Namespace) host := fmt.Sprintf("%s.%s.svc", s.Metadata.Name, s.Metadata.Namespace)
for _, sp := range s.Spec.Ports { for _, sp := range s.Spec.Ports {
addr := joinHostPort(host, sp.Port) addr := discoveryutils.JoinHostPort(host, sp.Port)
m := map[string]string{ m := map[string]string{
"__address__": addr, "__address__": addr,
"__meta_kubernetes_service_port_name": sp.Name, "__meta_kubernetes_service_port_name": sp.Name,

View file

@ -0,0 +1,27 @@
package discoveryutils
import (
"net"
"regexp"
"strconv"
)
// SanitizeLabelName replaces anything that doesn't match
// client_label.LabelNameRE with an underscore.
//
// This has been copied from Prometheus sources at util/strutil/strconv.go
func SanitizeLabelName(name string) string {
return invalidLabelCharRE.ReplaceAllString(name, "_")
}
var (
invalidLabelCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
)
// JoinHostPort returns host:port.
//
// Host may be dns name, ipv4 or ipv6 address.
func JoinHostPort(host string, port int) string {
portStr := strconv.Itoa(port)
return net.JoinHostPort(host, portStr)
}