lib/promscrape: initial implementation for gce_sd_configs aga Prometheus-compatible service discovery for Google Compute Engine

This commit is contained in:
Aliaksandr Valialkin 2020-04-24 17:50:21 +03:00
parent cf68c5f66a
commit 069690e3bd
18 changed files with 654 additions and 77 deletions

View file

@ -133,11 +133,12 @@ The following scrape types in [scrape_config](https://prometheus.io/docs/prometh
See [these docs](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#file_sd_config) for details.
* `kubernetes_sd_configs` - for scraping targets in Kubernetes (k8s).
See [kubernetes_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config) for details.
* `gce_sd_configs` - for scraping targets in Google Compute Engine (GCE).
See [gce_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#gce_sd_config) for details.
The following service discovery mechanisms will be added to `vmagent` soon:
* [ec2_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ec2_sd_config)
* [gce_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#gce_sd_config)
* [consul_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config)
* [dns_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dns_sd_config)

View file

@ -260,6 +260,7 @@ Currently the following [scrape_config](https://prometheus.io/docs/prometheus/la
* [static_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#static_config)
* [file_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#file_sd_config)
* [kubernetes_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config)
* [gce_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#gce_sd_config)
In the future other `*_sd_config` types will be supported.

View file

@ -133,11 +133,12 @@ The following scrape types in [scrape_config](https://prometheus.io/docs/prometh
See [these docs](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#file_sd_config) for details.
* `kubernetes_sd_configs` - for scraping targets in Kubernetes (k8s).
See [kubernetes_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config) for details.
* `gce_sd_configs` - for scraping targets in Google Compute Engine (GCE).
See [gce_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#gce_sd_config) for details.
The following service discovery mechanisms will be added to `vmagent` soon:
* [ec2_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ec2_sd_config)
* [gce_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#gce_sd_config)
* [consul_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config)
* [dns_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dns_sd_config)

1
go.mod
View file

@ -18,6 +18,7 @@ require (
github.com/valyala/histogram v1.0.1
github.com/valyala/quicktemplate v1.4.1
golang.org/x/net v0.0.0-20200421231249-e086a090c8fd // indirect
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/sys v0.0.0-20200420163511-1957bb5e6d1f
golang.org/x/tools v0.0.0-20200423205358-59e73619c742 // indirect
google.golang.org/api v0.22.0

View file

@ -14,6 +14,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/gce"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kubernetes"
"gopkg.in/yaml.v2"
)
@ -60,6 +61,7 @@ type ScrapeConfig struct {
StaticConfigs []StaticConfig `yaml:"static_configs"`
FileSDConfigs []FileSDConfig `yaml:"file_sd_configs"`
KubernetesSDConfigs []kubernetes.SDConfig `yaml:"kubernetes_sd_configs"`
GCESDConfigs []gce.SDConfig `yaml:"gce_sd_configs"`
RelabelConfigs []promrelabel.RelabelConfig `yaml:"relabel_configs"`
MetricRelabelConfigs []promrelabel.RelabelConfig `yaml:"metric_relabel_configs"`
SampleLimit int `yaml:"sample_limit"`
@ -147,6 +149,14 @@ func (cfg *Config) kubernetesSDConfigsCount() int {
return n
}
func (cfg *Config) gceSDConfigsCount() int {
n := 0
for i := range cfg.ScrapeConfigs {
n += len(cfg.ScrapeConfigs[i].GCESDConfigs)
}
return n
}
func (cfg *Config) fileSDConfigsCount() int {
n := 0
for i := range cfg.ScrapeConfigs {
@ -168,6 +178,19 @@ func (cfg *Config) getKubernetesSDScrapeWork() []ScrapeWork {
return dst
}
// getGCESDScrapeWork returns `gce_sd_configs` ScrapeWork from cfg.
func (cfg *Config) getGCESDScrapeWork() []ScrapeWork {
var dst []ScrapeWork
for i := range cfg.ScrapeConfigs {
sc := &cfg.ScrapeConfigs[i]
for j := range sc.GCESDConfigs {
sdc := &sc.GCESDConfigs[j]
dst = appendGCEScrapeWork(dst, sdc, sc.swc)
}
}
return dst
}
// getFileSDScrapeWork returns `file_sd_configs` ScrapeWork from cfg.
func (cfg *Config) getFileSDScrapeWork(prev []ScrapeWork) []ScrapeWork {
// Create a map for the previous scrape work.
@ -297,13 +320,25 @@ func appendKubernetesScrapeWork(dst []ScrapeWork, sdc *kubernetes.SDConfig, base
logger.Errorf("error when discovering kubernetes nodes for `job_name` %q: %s; skipping it", swc.jobName, err)
return dst
}
return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "kubernetes_sd_config")
}
func appendGCEScrapeWork(dst []ScrapeWork, sdc *gce.SDConfig, swc *scrapeWorkConfig) []ScrapeWork {
targetLabels, err := gce.GetLabels(sdc)
if err != nil {
logger.Errorf("error when discovering gce nodes for `job_name` %q: %s; skippint it", swc.jobName, err)
return dst
}
return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "gce_sd_config")
}
func appendScrapeWorkForTargetLabels(dst []ScrapeWork, swc *scrapeWorkConfig, targetLabels []map[string]string, sectionName string) []ScrapeWork {
for _, metaLabels := range targetLabels {
target := metaLabels["__address__"]
var err error
dst, err = appendScrapeWork(dst, swc, target, nil, metaLabels)
if err != nil {
logger.Errorf("error when parsing `kubernetes_sd_config` target %q with role %q for `job_name` %q: %s; skipping it",
target, sdc.Role, swc.jobName, err)
logger.Errorf("error when parsing `%s` target %q for `job_name` %q: %s; skipping it", sectionName, target, swc.jobName, err)
continue
}
}

View file

@ -0,0 +1,97 @@
package gce
import (
"context"
"fmt"
"net/http"
"net/url"
"sync"
"time"
"golang.org/x/oauth2/google"
)
type apiConfig struct {
client *http.Client
apiURL string
project string
tagSeparator string
port int
}
func getAPIConfig(sdc *SDConfig) (*apiConfig, error) {
apiConfigMapLock.Lock()
defer apiConfigMapLock.Unlock()
if !hasAPIConfigMapCleaner {
hasAPIConfigMapCleaner = true
go apiConfigMapCleaner()
}
e := apiConfigMap[sdc]
if e != nil {
e.lastAccessTime = time.Now()
return e.cfg, nil
}
cfg, err := newAPIConfig(sdc)
if err != nil {
return nil, err
}
apiConfigMap[sdc] = &apiConfigMapEntry{
cfg: cfg,
lastAccessTime: time.Now(),
}
return cfg, nil
}
func apiConfigMapCleaner() {
tc := time.NewTicker(15 * time.Minute)
for currentTime := range tc.C {
apiConfigMapLock.Lock()
for k, e := range apiConfigMap {
if currentTime.Sub(e.lastAccessTime) > 10*time.Minute {
delete(apiConfigMap, k)
}
}
apiConfigMapLock.Unlock()
}
}
type apiConfigMapEntry struct {
cfg *apiConfig
lastAccessTime time.Time
}
var (
apiConfigMap = make(map[*SDConfig]*apiConfigMapEntry)
apiConfigMapLock sync.Mutex
hasAPIConfigMapCleaner bool
)
func newAPIConfig(sdc *SDConfig) (*apiConfig, error) {
ctx := context.Background()
client, err := google.DefaultClient(ctx, "https://www.googleapis.com/auth/compute.readonly")
if err != nil {
return nil, fmt.Errorf("cannot create oauth2 client for gce: %s", err)
}
// See https://cloud.google.com/compute/docs/reference/rest/v1/instances/list
apiURL := fmt.Sprintf("https://compute.googleapis.com/compute/v1/projects/%s/zones/%s/instances", sdc.Project, sdc.Zone)
if len(sdc.Filter) > 0 {
apiURL += fmt.Sprintf("?filter=%s", url.QueryEscape(sdc.Filter))
}
tagSeparator := ","
if sdc.TagSeparator != nil {
tagSeparator = *sdc.TagSeparator
}
port := 80
if sdc.Port != nil {
port = *sdc.Port
}
return &apiConfig{
client: client,
apiURL: apiURL,
project: sdc.Project,
tagSeparator: tagSeparator,
port: port,
}, nil
}

View file

@ -0,0 +1,31 @@
package gce
import (
"fmt"
)
// SDConfig represents service discovery config for gce.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#gce_sd_config
type SDConfig struct {
Project string `yaml:"project"`
Zone string `yaml:"zone"`
Filter string `yaml:"filter"`
// RefreshInterval time.Duration `yaml:"refresh_interval"`
// refresh_interval is obtained from `-promscrape.gceSDCheckInterval` command-line option.
Port *int `yaml:"port"`
TagSeparator *string `yaml:"tag_separator"`
}
// GetLabels returns gce labels according to sdc.
func GetLabels(sdc *SDConfig) ([]map[string]string, error) {
cfg, err := getAPIConfig(sdc)
if err != nil {
return nil, fmt.Errorf("cannot get API config: %s", err)
}
ms, err := getInstancesLabels(cfg)
if err != nil {
return nil, fmt.Errorf("error when fetching instances data from GCE: %s", err)
}
return ms, nil
}

View file

@ -0,0 +1,170 @@
package gce
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
// getInstancesLabels returns labels for gce instances obtained from the given cfg
func getInstancesLabels(cfg *apiConfig) ([]map[string]string, error) {
insts, err := getInstances(cfg)
if err != nil {
return nil, err
}
var ms []map[string]string
for _, inst := range insts {
ms = inst.appendTargetLabels(ms, cfg.project, cfg.tagSeparator, cfg.port)
}
return ms, nil
}
func getInstances(cfg *apiConfig) ([]Instance, error) {
var result []Instance
pageToken := ""
for {
insts, nextPageToken, err := getInstancesPage(cfg, pageToken)
if err != nil {
return nil, err
}
result = append(result, insts...)
if len(nextPageToken) == 0 {
return result, nil
}
pageToken = nextPageToken
}
}
func getInstancesPage(cfg *apiConfig, pageToken string) ([]Instance, string, error) {
apiURL := cfg.apiURL
if len(pageToken) > 0 {
// See https://cloud.google.com/compute/docs/reference/rest/v1/instances/list about pageToken
prefix := "?"
if strings.Contains(apiURL, "?") {
prefix = "&"
}
apiURL += fmt.Sprintf("%spageToken=%s", prefix, url.QueryEscape(pageToken))
}
resp, err := cfg.client.Get(apiURL)
if err != nil {
return nil, "", fmt.Errorf("cannot obtain instances data from API server: %s", err)
}
defer resp.Body.Close()
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, "", fmt.Errorf("cannot read instances data from API server: %s", err)
}
if resp.StatusCode != http.StatusOK {
return nil, "", fmt.Errorf("unexpected status code when reading instances data from API server; got %d; want %d; response body: %q",
resp.StatusCode, http.StatusOK, data)
}
il, err := parseInstanceList(data)
if err != nil {
return nil, "", fmt.Errorf("cannot parse instances response from API server: %s", err)
}
return il.Items, il.NextPageToken, nil
}
// InstanceList is response to https://cloud.google.com/compute/docs/reference/rest/v1/instances/list
type InstanceList struct {
Items []Instance
NextPageToken string
}
// Instance is instance from https://cloud.google.com/compute/docs/reference/rest/v1/instances/list
type Instance struct {
ID string `json:"id"`
Name string
Status string
MachineType string
Zone string
NetworkInterfaces []NetworkInterface
Tags TagList
Metadata MetadataList
Labels discoveryutils.SortedLabels
}
// NetworkInterface is network interface from https://cloud.google.com/compute/docs/reference/rest/v1/instances/list
type NetworkInterface struct {
Network string
Subnetwork string
NetworkIP string
AccessConfigs []AccessConfig
}
// AccessConfig is access config from https://cloud.google.com/compute/docs/reference/rest/v1/instances/list
type AccessConfig struct {
Type string
NatIP string
}
// TagList is tag list from https://cloud.google.com/compute/docs/reference/rest/v1/instances/list
type TagList struct {
Items []string
}
// MetadataList is metadataList from https://cloud.google.com/compute/docs/reference/rest/v1/instances/list
type MetadataList struct {
Items []MetadataEntry
}
// MetadataEntry is a single entry from metadata
type MetadataEntry struct {
Key string
Value string
}
// parseInstanceList parses InstanceList from data.
func parseInstanceList(data []byte) (*InstanceList, error) {
var il InstanceList
if err := json.Unmarshal(data, &il); err != nil {
return nil, fmt.Errorf("cannot unmarshal InstanceList from %q: %s", data, err)
}
return &il, nil
}
func (inst *Instance) appendTargetLabels(ms []map[string]string, project, tagSeparator string, port int) []map[string]string {
if len(inst.NetworkInterfaces) == 0 {
return ms
}
iface := inst.NetworkInterfaces[0]
addr := discoveryutils.JoinHostPort(iface.NetworkIP, port)
m := map[string]string{
"__address__": addr,
"__meta_gce_instance_id": inst.ID,
"__meta_gce_instance_status": inst.Status,
"__meta_gce_instance_name": inst.Name,
"__meta_gce_machine_type": inst.MachineType,
"__meta_gce_network": iface.Network,
"__meta_gce_private_ip": iface.NetworkIP,
"__meta_gce_project": project,
"__meta_gce_subnetwork": iface.Subnetwork,
"__meta_gce_zone": inst.Zone,
}
if len(inst.Tags.Items) > 0 {
// We surround the separated list with the separator as well. This way regular expressions
// in relabeling rules don't have to consider tag positions.
m["__meta_gce_tags"] = tagSeparator + strings.Join(inst.Tags.Items, tagSeparator) + tagSeparator
}
for _, item := range inst.Metadata.Items {
key := discoveryutils.SanitizeLabelName(item.Key)
m["__meta_gce_metadata_"+key] = item.Value
}
for _, label := range inst.Labels {
name := discoveryutils.SanitizeLabelName(label.Name)
m["__meta_gce_label_"+name] = label.Value
}
if len(iface.AccessConfigs) > 0 {
ac := iface.AccessConfigs[0]
if ac.Type == "ONE_TO_ONE_NAT" {
m["__meta_gce_public_ip"] = ac.NatIP
}
}
ms = append(ms, m)
return ms
}

View file

@ -0,0 +1,179 @@
package gce
import (
"reflect"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
func TestParseInstanceListFailure(t *testing.T) {
f := func(s string) {
t.Helper()
il, err := parseInstanceList([]byte(s))
if err == nil {
t.Fatalf("expecting non-nil error")
}
if il != nil {
t.Fatalf("unexpected non-nil InstanceList: %v", il)
}
}
f(``)
f(`[1,23]`)
f(`{"items":[{"metadata":1}]}`)
}
func TestParseInstanceListSuccess(t *testing.T) {
data := `
{
"id": "projects/victoriametrics-test/zones/us-east1-b/instances",
"items": [
{
"id": "7897352091592122",
"creationTimestamp": "2020-02-16T07:10:14.357-08:00",
"name": "play-1m-1-vmagent",
"tags": {
"items": [
"play",
"play-1m-1",
"vmagent"
],
"fingerprint": "O44NvJ36CCo="
},
"machineType": "https://www.googleapis.com/compute/v1/projects/victoriametrics-test/zones/us-east1-b/machineTypes/f1-micro",
"status": "RUNNING",
"zone": "https://www.googleapis.com/compute/v1/projects/victoriametrics-test/zones/us-east1-b",
"networkInterfaces": [
{
"network": "https://www.googleapis.com/compute/v1/projects/victoriametrics-test/global/networks/default",
"subnetwork": "https://www.googleapis.com/compute/v1/projects/victoriametrics-test/regions/us-east1/subnetworks/play-1m-1-snw",
"networkIP": "10.11.2.7",
"name": "nic0",
"fingerprint": "O4eNOfaplJ4=",
"kind": "compute#networkInterface"
}
],
"disks": [
{
"type": "PERSISTENT",
"mode": "READ_WRITE",
"source": "https://www.googleapis.com/compute/v1/projects/victoriametrics-test/zones/us-east1-b/disks/play-1m-1-vmagent",
"deviceName": "boot",
"index": 0,
"boot": true,
"autoDelete": true,
"licenses": [
"https://www.googleapis.com/compute/v1/projects/cos-cloud-shielded/global/licenses/shielded-cos",
"https://www.googleapis.com/compute/v1/projects/cos-cloud/global/licenses/cos",
"https://www.googleapis.com/compute/v1/projects/cos-cloud/global/licenses/cos-pcid"
],
"interface": "SCSI",
"guestOsFeatures": [
{
"type": "VIRTIO_SCSI_MULTIQUEUE"
},
{
"type": "UEFI_COMPATIBLE"
}
],
"diskSizeGb": "10",
"kind": "compute#attachedDisk"
}
],
"metadata": {
"fingerprint": "BAFZwTyaAxQ=",
"items": [
{
"key": "gce-container-declaration",
"value": "foobar"
}
],
"kind": "compute#metadata"
},
"serviceAccounts": [
{
"email": "12-compute@developer.gserviceaccount.com",
"scopes": [
"https://www.googleapis.com/auth/devstorage.read_write",
"https://www.googleapis.com/auth/logging.write",
"https://www.googleapis.com/auth/monitoring.write",
"https://www.googleapis.com/auth/servicecontrol",
"https://www.googleapis.com/auth/service.management.readonly"
]
}
],
"selfLink": "https://www.googleapis.com/compute/v1/projects/victoriametrics-test/zones/us-east1-b/instances/play-1m-1-vmagent",
"scheduling": {
"onHostMaintenance": "MIGRATE",
"automaticRestart": true,
"preemptible": false
},
"cpuPlatform": "Intel Haswell",
"labels": {
"goog-dm": "play-deployment",
"cluster_num": "1",
"cluster_retention": "1m",
"env": "play",
"type": "vmagent"
},
"labelFingerprint": "-CXeRXMQiVc=",
"startRestricted": false,
"deletionProtection": false,
"shieldedInstanceConfig": {
"enableSecureBoot": false,
"enableVtpm": true,
"enableIntegrityMonitoring": true
},
"shieldedInstanceIntegrityPolicy": {
"updateAutoLearnPolicy": true
},
"fingerprint": "hd3NB2-9QIg=",
"kind": "compute#instance"
}
]
}
`
il, err := parseInstanceList([]byte(data))
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if len(il.Items) != 1 {
t.Fatalf("unexpected length of InstanceList.Items; got %d; want %d", len(il.Items), 1)
}
inst := il.Items[0]
// Check inst.appendTargetLabels()
project := "proj-1"
tagSeparator := ","
port := 80
labelss := inst.appendTargetLabels(nil, project, tagSeparator, port)
var sortedLabelss [][]prompbmarshal.Label
for _, labels := range labelss {
sortedLabelss = append(sortedLabelss, discoveryutils.GetSortedLabels(labels))
}
expectedLabelss := [][]prompbmarshal.Label{
discoveryutils.GetSortedLabels(map[string]string{
"__address__": "10.11.2.7:80",
"__meta_gce_instance_id": "7897352091592122",
"__meta_gce_instance_name": "play-1m-1-vmagent",
"__meta_gce_instance_status": "RUNNING",
"__meta_gce_label_cluster_num": "1",
"__meta_gce_label_cluster_retention": "1m",
"__meta_gce_label_env": "play",
"__meta_gce_label_goog_dm": "play-deployment",
"__meta_gce_label_type": "vmagent",
"__meta_gce_machine_type": "https://www.googleapis.com/compute/v1/projects/victoriametrics-test/zones/us-east1-b/machineTypes/f1-micro",
"__meta_gce_metadata_gce_container_declaration": "foobar",
"__meta_gce_network": "https://www.googleapis.com/compute/v1/projects/victoriametrics-test/global/networks/default",
"__meta_gce_private_ip": "10.11.2.7",
"__meta_gce_project": "proj-1",
"__meta_gce_subnetwork": "https://www.googleapis.com/compute/v1/projects/victoriametrics-test/regions/us-east1/subnetworks/play-1m-1-snw",
"__meta_gce_tags": ",play,play-1m-1,vmagent,",
"__meta_gce_zone": "https://www.googleapis.com/compute/v1/projects/victoriametrics-test/zones/us-east1-b",
}),
}
if !reflect.DeepEqual(sortedLabelss, expectedLabelss) {
t.Fatalf("unexpected labels:\ngot\n%v\nwant\n%v", sortedLabelss, expectedLabelss)
}
}

View file

@ -1,13 +1,10 @@
package kubernetes
import (
"encoding/json"
"fmt"
"net/url"
"sort"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
@ -18,8 +15,8 @@ type ObjectMeta struct {
Name string
Namespace string
UID string
Labels SortedLabels
Annotations SortedLabels
Labels discoveryutils.SortedLabels
Annotations discoveryutils.SortedLabels
OwnerReferences []OwnerReference
}
@ -36,33 +33,6 @@ func (om *ObjectMeta) registerLabelsAndAnnotations(prefix string, m map[string]s
}
}
// SortedLabels represents sorted labels.
type SortedLabels []prompbmarshal.Label
// UnmarshalJSON unmarshals JSON from data.
func (sls *SortedLabels) UnmarshalJSON(data []byte) error {
var m map[string]string
if err := json.Unmarshal(data, &m); err != nil {
return err
}
*sls = getSortedLabels(m)
return nil
}
func getSortedLabels(m map[string]string) SortedLabels {
a := make([]prompbmarshal.Label, 0, len(m))
for k, v := range m {
a = append(a, prompbmarshal.Label{
Name: k,
Value: v,
})
}
sort.Slice(a, func(i, j int) bool {
return a[i].Name < a[j].Name
})
return a
}
// OwnerReference represents OwnerReferense from k8s API.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#ownerreference-v1-meta

View file

@ -5,6 +5,7 @@ import (
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
func TestParseEndpointsListFailure(t *testing.T) {
@ -85,10 +86,10 @@ func TestParseEndpointsListSuccess(t *testing.T) {
labelss := endpoint.appendTargetLabels(nil, nil, nil)
var sortedLabelss [][]prompbmarshal.Label
for _, labels := range labelss {
sortedLabelss = append(sortedLabelss, getSortedLabels(labels))
sortedLabelss = append(sortedLabelss, discoveryutils.GetSortedLabels(labels))
}
expectedLabelss := [][]prompbmarshal.Label{
getSortedLabels(map[string]string{
discoveryutils.GetSortedLabels(map[string]string{
"__address__": "172.17.0.2:8443",
"__meta_kubernetes_endpoint_address_target_kind": "Pod",
"__meta_kubernetes_endpoint_address_target_name": "coredns-6955765f44-lnp6t",

View file

@ -5,6 +5,7 @@ import (
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
func TestParseIngressListFailure(t *testing.T) {
@ -83,10 +84,10 @@ func TestParseIngressListSuccess(t *testing.T) {
labelss := ig.appendTargetLabels(nil)
var sortedLabelss [][]prompbmarshal.Label
for _, labels := range labelss {
sortedLabelss = append(sortedLabelss, getSortedLabels(labels))
sortedLabelss = append(sortedLabelss, discoveryutils.GetSortedLabels(labels))
}
expectedLabelss := [][]prompbmarshal.Label{
getSortedLabels(map[string]string{
discoveryutils.GetSortedLabels(map[string]string{
"__address__": "foobar",
"__meta_kubernetes_ingress_annotation_kubectl_kubernetes_io_last_applied_configuration": `{"apiVersion":"networking.k8s.io/v1beta1","kind":"Ingress","metadata":{"annotations":{},"name":"test-ingress","namespace":"default"},"spec":{"backend":{"serviceName":"testsvc","servicePort":80}}}` + "\n",
"__meta_kubernetes_ingress_annotationpresent_kubectl_kubernetes_io_last_applied_configuration": "true",

View file

@ -6,30 +6,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
)
// GetLabels returns labels for the given k8s role and the given cfg.
func GetLabels(ac *promauth.Config, sdc *SDConfig) ([]map[string]string, error) {
cfg := &apiConfig{
Server: sdc.APIServer,
AuthConfig: ac,
Namespaces: sdc.Namespaces.Names,
Selectors: sdc.Selectors,
}
switch sdc.Role {
case "node":
return getNodesLabels(cfg)
case "service":
return getServicesLabels(cfg)
case "pod":
return getPodsLabels(cfg)
case "endpoints":
return getEndpointsLabels(cfg)
case "ingress":
return getIngressesLabels(cfg)
default:
return nil, fmt.Errorf("unexpected `role`: %q; must be one of `node`, `service`, `pod`, `endpoints` or `ingress`; skipping it", sdc.Role)
}
}
// SDConfig represents kubernetes-based service discovery config.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config
@ -58,3 +34,27 @@ type Selector struct {
Label string `yaml:"label"`
Field string `yaml:"field"`
}
// GetLabels returns labels for the given k8s role and the given ac and sdc.
func GetLabels(ac *promauth.Config, sdc *SDConfig) ([]map[string]string, error) {
cfg := &apiConfig{
Server: sdc.APIServer,
AuthConfig: ac,
Namespaces: sdc.Namespaces.Names,
Selectors: sdc.Selectors,
}
switch sdc.Role {
case "node":
return getNodesLabels(cfg)
case "service":
return getServicesLabels(cfg)
case "pod":
return getPodsLabels(cfg)
case "endpoints":
return getEndpointsLabels(cfg)
case "ingress":
return getIngressesLabels(cfg)
default:
return nil, fmt.Errorf("unexpected `role`: %q; must be one of `node`, `service`, `pod`, `endpoints` or `ingress`; skipping it", sdc.Role)
}
}

View file

@ -3,6 +3,8 @@ package kubernetes
import (
"reflect"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
func TestParseNodeListFailure(t *testing.T) {
@ -236,7 +238,7 @@ func TestParseNodeListSuccess(t *testing.T) {
if meta.Name != "m01" {
t.Fatalf("unexpected ObjectMeta.Name; got %q; want %q", meta.Name, "m01")
}
expectedLabels := getSortedLabels(map[string]string{
expectedLabels := discoveryutils.GetSortedLabels(map[string]string{
"beta.kubernetes.io/arch": "amd64",
"beta.kubernetes.io/os": "linux",
"kubernetes.io/arch": "amd64",
@ -251,7 +253,7 @@ func TestParseNodeListSuccess(t *testing.T) {
if !reflect.DeepEqual(meta.Labels, expectedLabels) {
t.Fatalf("unexpected ObjectMeta.Labels\ngot\n%v\nwant\n%v", meta.Labels, expectedLabels)
}
expectedAnnotations := getSortedLabels(map[string]string{
expectedAnnotations := discoveryutils.GetSortedLabels(map[string]string{
"kubeadm.alpha.kubernetes.io/cri-socket": "/var/run/dockershim.sock",
"node.alpha.kubernetes.io/ttl": "0",
"volumes.kubernetes.io/controller-managed-attach-detach": "true",
@ -275,8 +277,8 @@ func TestParseNodeListSuccess(t *testing.T) {
}
// Check node.appendTargetLabels()
labels := getSortedLabels(node.appendTargetLabels(nil)[0])
expectedLabels = getSortedLabels(map[string]string{
labels := discoveryutils.GetSortedLabels(node.appendTargetLabels(nil)[0])
expectedLabels = discoveryutils.GetSortedLabels(map[string]string{
"instance": "m01",
"__address__": "172.17.0.2:10250",
"__meta_kubernetes_node_name": "m01",

View file

@ -5,6 +5,7 @@ import (
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
func TestParsePodListFailure(t *testing.T) {
@ -240,10 +241,10 @@ func TestParsePodListSuccess(t *testing.T) {
labelss := pod.appendTargetLabels(nil)
var sortedLabelss [][]prompbmarshal.Label
for _, labels := range labelss {
sortedLabelss = append(sortedLabelss, getSortedLabels(labels))
sortedLabelss = append(sortedLabelss, discoveryutils.GetSortedLabels(labels))
}
expectedLabels := [][]prompbmarshal.Label{
getSortedLabels(map[string]string{
discoveryutils.GetSortedLabels(map[string]string{
"__address__": "172.17.0.2:1234",
"__meta_kubernetes_namespace": "kube-system",

View file

@ -5,6 +5,7 @@ import (
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
func TestParseServiceListFailure(t *testing.T) {
@ -100,7 +101,7 @@ func TestParseServiceListSuccess(t *testing.T) {
if meta.Name != "kube-dns" {
t.Fatalf("unexpected ObjectMeta.Name; got %q; want %q", meta.Name, "kube-dns")
}
expectedLabels := getSortedLabels(map[string]string{
expectedLabels := discoveryutils.GetSortedLabels(map[string]string{
"k8s-app": "kube-dns",
"kubernetes.io/cluster-service": "true",
"kubernetes.io/name": "KubeDNS",
@ -108,7 +109,7 @@ func TestParseServiceListSuccess(t *testing.T) {
if !reflect.DeepEqual(meta.Labels, expectedLabels) {
t.Fatalf("unexpected ObjectMeta.Labels\ngot\n%v\nwant\n%v", meta.Labels, expectedLabels)
}
expectedAnnotations := getSortedLabels(map[string]string{
expectedAnnotations := discoveryutils.GetSortedLabels(map[string]string{
"prometheus.io/port": "9153",
"prometheus.io/scrape": "true",
})
@ -148,10 +149,10 @@ func TestParseServiceListSuccess(t *testing.T) {
labelss := service.appendTargetLabels(nil)
var sortedLabelss [][]prompbmarshal.Label
for _, labels := range labelss {
sortedLabelss = append(sortedLabelss, getSortedLabels(labels))
sortedLabelss = append(sortedLabelss, discoveryutils.GetSortedLabels(labels))
}
expectedLabelss := [][]prompbmarshal.Label{
getSortedLabels(map[string]string{
discoveryutils.GetSortedLabels(map[string]string{
"__address__": "kube-dns.kube-system.svc:53",
"__meta_kubernetes_namespace": "kube-system",
"__meta_kubernetes_service_name": "kube-dns",
@ -174,7 +175,7 @@ func TestParseServiceListSuccess(t *testing.T) {
"__meta_kubernetes_service_annotationpresent_prometheus_io_port": "true",
"__meta_kubernetes_service_annotationpresent_prometheus_io_scrape": "true",
}),
getSortedLabels(map[string]string{
discoveryutils.GetSortedLabels(map[string]string{
"__address__": "kube-dns.kube-system.svc:53",
"__meta_kubernetes_namespace": "kube-system",
"__meta_kubernetes_service_name": "kube-dns",
@ -197,7 +198,7 @@ func TestParseServiceListSuccess(t *testing.T) {
"__meta_kubernetes_service_annotationpresent_prometheus_io_port": "true",
"__meta_kubernetes_service_annotationpresent_prometheus_io_scrape": "true",
}),
getSortedLabels(map[string]string{
discoveryutils.GetSortedLabels(map[string]string{
"__address__": "kube-dns.kube-system.svc:9153",
"__meta_kubernetes_namespace": "kube-system",
"__meta_kubernetes_service_name": "kube-dns",

View file

@ -1,9 +1,13 @@
package discoveryutils
import (
"encoding/json"
"net"
"regexp"
"sort"
"strconv"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
// SanitizeLabelName replaces anything that doesn't match
@ -25,3 +29,31 @@ func JoinHostPort(host string, port int) string {
portStr := strconv.Itoa(port)
return net.JoinHostPort(host, portStr)
}
// SortedLabels represents sorted labels.
type SortedLabels []prompbmarshal.Label
// UnmarshalJSON unmarshals JSON from data.
func (sls *SortedLabels) UnmarshalJSON(data []byte) error {
var m map[string]string
if err := json.Unmarshal(data, &m); err != nil {
return err
}
*sls = GetSortedLabels(m)
return nil
}
// GetSortedLabels returns SortedLabels built from m.
func GetSortedLabels(m map[string]string) SortedLabels {
a := make([]prompbmarshal.Label, 0, len(m))
for k, v := range m {
a = append(a, prompbmarshal.Label{
Name: k,
Value: v,
})
}
sort.Slice(a, func(i, j int) bool {
return a[i].Name < a[j].Name
})
return a
}

View file

@ -18,10 +18,13 @@ var (
configCheckInterval = flag.Duration("promscrape.configCheckInterval", 0, "Interval for checking for changes in '-promscrape.config' file. "+
"By default the checking is disabled. Send SIGHUP signal in order to force config check for changes")
fileSDCheckInterval = flag.Duration("promscrape.fileSDCheckInterval", 30*time.Second, "Interval for checking for changes in 'file_sd_config'. "+
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#file_sd_config")
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#file_sd_config for details")
kubernetesSDCheckInterval = flag.Duration("promscrape.kubernetesSDCheckInterval", 30*time.Second, "Interval for checking for changes in Kubernetes API server. "+
"This works only if `kubernetes_sd_configs` is configured in '-promscrape.config' file. "+
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config for details")
gceSDCheckInterval = flag.Duration("promscrape.gceSDCheckInterval", time.Minute, "Interval for checking for changes in gce. "+
"This works only if `gce_sd_configs` is configured in '-promscrape.config' file. "+
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#gce_sd_config for details")
promscrapeConfigFile = flag.String("promscrape.config", "", "Optional path to Prometheus config file with 'scrape_configs' section containing targets to scrape. "+
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config for details")
)
@ -89,6 +92,11 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest)
defer wg.Done()
runKubernetesSDScrapers(cfg, pushData, stopCh)
}()
wg.Add(1)
go func() {
defer wg.Done()
runGCESDScrapers(cfg, pushData, stopCh)
}()
waitForChans:
select {
@ -194,6 +202,51 @@ var (
kubernetesSDReloads = metrics.NewCounter(`vm_promscrape_reloads_total{type="kubernetes_sd"}`)
)
func runGCESDScrapers(cfg *Config, pushData func(wr *prompbmarshal.WriteRequest), stopCh <-chan struct{}) {
if cfg.gceSDConfigsCount() == 0 {
return
}
sws := cfg.getGCESDScrapeWork()
ticker := time.NewTicker(*gceSDCheckInterval)
defer ticker.Stop()
mustStop := false
for !mustStop {
localStopCh := make(chan struct{})
var wg sync.WaitGroup
wg.Add(1)
go func(sws []ScrapeWork) {
defer wg.Done()
logger.Infof("starting %d scrapers for `gce_sd_config` targets", len(sws))
gceSDTargets.Set(uint64(len(sws)))
runScrapeWorkers(sws, pushData, localStopCh)
gceSDTargets.Set(0)
logger.Infof("stopped all the %d scrapers for `gce_sd_config` targets", len(sws))
}(sws)
waitForChans:
select {
case <-ticker.C:
swsNew := cfg.getGCESDScrapeWork()
if equalStaticConfigForScrapeWorks(swsNew, sws) {
// Nothing changed, continue waiting for updated scrape work
goto waitForChans
}
logger.Infof("restarting scrapers for changed `gce_sd_config` targets")
sws = swsNew
case <-stopCh:
mustStop = true
}
close(localStopCh)
wg.Wait()
gceSDReloads.Inc()
}
}
var (
gceSDTargets = metrics.NewCounter(`vm_promscrape_targets{type="gce_sd"}`)
gceSDReloads = metrics.NewCounter(`vm_promscrape_reloads_total{type="gce_sd"}`)
)
func runFileSDScrapers(cfg *Config, pushData func(wr *prompbmarshal.WriteRequest), stopCh <-chan struct{}) {
if cfg.fileSDConfigsCount() == 0 {
return