lib/promscrape: preserve the previously discovered targets on discovery errors per each job_name

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/582
Aliaksandr Valialkin 2020-06-23 15:35:19 +03:00
parent a13cd60c6f
commit 8f0bcec6cc
4 changed files with 148 additions and 43 deletions

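In short: before this commit, a transient discovery error (an unreachable Kubernetes API server, a Consul outage, a DNS hiccup) made the scraper drop all targets for the affected `job_name` until the next successful discovery round. Now each `get*SDScrapeWork` function receives the previous round's ScrapeWork list and, for any `job_name` whose discovery failed, keeps the previously discovered targets instead. A minimal, self-contained sketch of the pattern (illustrative names, not the actual promscrape types):

```go
package main

import "log"

// ScrapeWork is a stand-in for promscrape's ScrapeWork; only the job name
// matters for this sketch.
type ScrapeWork struct {
	Job    string
	Target string
}

// groupByJob mirrors getSWSByJob from this commit: it indexes the
// previously discovered targets by job name.
func groupByJob(sws []ScrapeWork) map[string][]ScrapeWork {
	m := make(map[string][]ScrapeWork)
	for _, sw := range sws {
		m[sw.Job] = append(m[sw.Job], sw)
	}
	return m
}

// discoverJob stands in for a single job's service discovery;
// ok=false signals a discovery error.
func discoverJob(job string) (sws []ScrapeWork, ok bool) {
	return nil, false // pretend discovery failed this round
}

func discoverAll(jobs []string, prev []ScrapeWork) []ScrapeWork {
	prevByJob := groupByJob(prev)
	var dst []ScrapeWork
	for _, job := range jobs {
		dstLen := len(dst)
		sws, ok := discoverJob(job)
		dst = append(dst, sws...)
		if ok {
			continue
		}
		// Discovery failed: drop any partial results for this job and
		// restore the targets from the previous successful round.
		if swsPrev := prevByJob[job]; len(swsPrev) > 0 {
			log.Printf("discovery failed for job %q; preserving %d previous targets", job, len(swsPrev))
			dst = append(dst[:dstLen], swsPrev...)
		}
	}
	return dst
}

func main() {
	prev := []ScrapeWork{{Job: "node", Target: "10.0.0.1:9100"}}
	// Keeps 10.0.0.1:9100 even though this round's discovery failed.
	log.Printf("active targets: %+v", discoverAll([]string{"node"}, prev))
}
```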
lib/promscrape/config.go

@@ -158,77 +158,160 @@ func unmarshalMaybeStrict(data []byte, dst interface{}) error {
 	return err
 }
 
+func getSWSByJob(sws []ScrapeWork) map[string][]ScrapeWork {
+	m := make(map[string][]ScrapeWork)
+	for _, sw := range sws {
+		m[sw.jobNameOriginal] = append(m[sw.jobNameOriginal], sw)
+	}
+	return m
+}
+
 // getKubernetesSDScrapeWork returns `kubernetes_sd_configs` ScrapeWork from cfg.
-func (cfg *Config) getKubernetesSDScrapeWork() []ScrapeWork {
+func (cfg *Config) getKubernetesSDScrapeWork(prev []ScrapeWork) []ScrapeWork {
+	swsPrevByJob := getSWSByJob(prev)
 	var dst []ScrapeWork
 	for i := range cfg.ScrapeConfigs {
 		sc := &cfg.ScrapeConfigs[i]
+		dstLen := len(dst)
+		ok := true
 		for j := range sc.KubernetesSDConfigs {
 			sdc := &sc.KubernetesSDConfigs[j]
-			dst = appendKubernetesScrapeWork(dst, sdc, cfg.baseDir, sc.swc)
+			var okLocal bool
+			dst, okLocal = appendKubernetesScrapeWork(dst, sdc, cfg.baseDir, sc.swc)
+			if ok {
+				ok = okLocal
+			}
+		}
+		if ok {
+			continue
+		}
+		swsPrev := swsPrevByJob[sc.swc.jobName]
+		if len(swsPrev) > 0 {
+			logger.Errorf("there were errors when discovering kubernetes targets for job %q, so preserving the previous targets", sc.swc.jobName)
+			dst = append(dst[:dstLen], swsPrev...)
 		}
 	}
 	return dst
 }
 
 // getConsulSDScrapeWork returns `consul_sd_configs` ScrapeWork from cfg.
-func (cfg *Config) getConsulSDScrapeWork() []ScrapeWork {
+func (cfg *Config) getConsulSDScrapeWork(prev []ScrapeWork) []ScrapeWork {
+	swsPrevByJob := getSWSByJob(prev)
 	var dst []ScrapeWork
 	for i := range cfg.ScrapeConfigs {
 		sc := &cfg.ScrapeConfigs[i]
+		dstLen := len(dst)
+		ok := true
 		for j := range sc.ConsulSDConfigs {
 			sdc := &sc.ConsulSDConfigs[j]
-			dst = appendConsulScrapeWork(dst, sdc, cfg.baseDir, sc.swc)
+			var okLocal bool
+			dst, okLocal = appendConsulScrapeWork(dst, sdc, cfg.baseDir, sc.swc)
+			if ok {
+				ok = okLocal
+			}
+		}
+		if ok {
+			continue
+		}
+		swsPrev := swsPrevByJob[sc.swc.jobName]
+		if len(swsPrev) > 0 {
+			logger.Errorf("there were errors when discovering consul targets for job %q, so preserving the previous targets", sc.swc.jobName)
+			dst = append(dst[:dstLen], swsPrev...)
 		}
 	}
 	return dst
 }
 
 // getDNSSDScrapeWork returns `dns_sd_configs` ScrapeWork from cfg.
-func (cfg *Config) getDNSSDScrapeWork() []ScrapeWork {
+func (cfg *Config) getDNSSDScrapeWork(prev []ScrapeWork) []ScrapeWork {
+	swsPrevByJob := getSWSByJob(prev)
 	var dst []ScrapeWork
 	for i := range cfg.ScrapeConfigs {
 		sc := &cfg.ScrapeConfigs[i]
+		dstLen := len(dst)
+		ok := true
 		for j := range sc.DNSSDConfigs {
 			sdc := &sc.DNSSDConfigs[j]
-			dst = appendDNSScrapeWork(dst, sdc, sc.swc)
+			var okLocal bool
+			dst, okLocal = appendDNSScrapeWork(dst, sdc, sc.swc)
+			if ok {
+				ok = okLocal
+			}
+		}
+		if ok {
+			continue
+		}
+		swsPrev := swsPrevByJob[sc.swc.jobName]
+		if len(swsPrev) > 0 {
+			logger.Errorf("there were errors when discovering dns targets for job %q, so preserving the previous targets", sc.swc.jobName)
+			dst = append(dst[:dstLen], swsPrev...)
 		}
 	}
 	return dst
 }
 
 // getEC2SDScrapeWork returns `ec2_sd_configs` ScrapeWork from cfg.
-func (cfg *Config) getEC2SDScrapeWork() []ScrapeWork {
+func (cfg *Config) getEC2SDScrapeWork(prev []ScrapeWork) []ScrapeWork {
+	swsPrevByJob := getSWSByJob(prev)
 	var dst []ScrapeWork
 	for i := range cfg.ScrapeConfigs {
 		sc := &cfg.ScrapeConfigs[i]
+		dstLen := len(dst)
+		ok := true
 		for j := range sc.EC2SDConfigs {
 			sdc := &sc.EC2SDConfigs[j]
-			dst = appendEC2ScrapeWork(dst, sdc, sc.swc)
+			var okLocal bool
+			dst, okLocal = appendEC2ScrapeWork(dst, sdc, sc.swc)
+			if ok {
+				ok = okLocal
+			}
+		}
+		if ok {
+			continue
+		}
+		swsPrev := swsPrevByJob[sc.swc.jobName]
+		if len(swsPrev) > 0 {
+			logger.Errorf("there were errors when discovering ec2 targets for job %q, so preserving the previous targets", sc.swc.jobName)
+			dst = append(dst[:dstLen], swsPrev...)
 		}
 	}
 	return dst
 }
 
 // getGCESDScrapeWork returns `gce_sd_configs` ScrapeWork from cfg.
-func (cfg *Config) getGCESDScrapeWork() []ScrapeWork {
+func (cfg *Config) getGCESDScrapeWork(prev []ScrapeWork) []ScrapeWork {
+	swsPrevByJob := getSWSByJob(prev)
 	var dst []ScrapeWork
 	for i := range cfg.ScrapeConfigs {
 		sc := &cfg.ScrapeConfigs[i]
+		dstLen := len(dst)
+		ok := true
 		for j := range sc.GCESDConfigs {
 			sdc := &sc.GCESDConfigs[j]
-			dst = appendGCEScrapeWork(dst, sdc, sc.swc)
+			var okLocal bool
+			dst, okLocal = appendGCEScrapeWork(dst, sdc, sc.swc)
+			if ok {
+				ok = okLocal
+			}
+		}
+		if ok {
+			continue
+		}
+		swsPrev := swsPrevByJob[sc.swc.jobName]
+		if len(swsPrev) > 0 {
+			logger.Errorf("there were errors when discovering gce targets for job %q, so preserving the previous targets", sc.swc.jobName)
+			dst = append(dst[:dstLen], swsPrev...)
 		}
 	}
 	return dst
 }
 
 // getFileSDScrapeWork returns `file_sd_configs` ScrapeWork from cfg.
-func (cfg *Config) getFileSDScrapeWork(swsPrev []ScrapeWork) []ScrapeWork {
+func (cfg *Config) getFileSDScrapeWork(prev []ScrapeWork) []ScrapeWork {
 	// Create a map for the previous scrape work.
 	swsMapPrev := make(map[string][]ScrapeWork)
-	for i := range swsPrev {
-		sw := &swsPrev[i]
+	for i := range prev {
+		sw := &prev[i]
 		filepath := promrelabel.GetLabelValueByName(sw.Labels, "__vm_filepath")
 		if len(filepath) == 0 {
 			logger.Panicf("BUG: missing `__vm_filepath` label")
@@ -341,49 +424,49 @@ type scrapeWorkConfig struct {
 	sampleLimit int
 }
 
-func appendKubernetesScrapeWork(dst []ScrapeWork, sdc *kubernetes.SDConfig, baseDir string, swc *scrapeWorkConfig) []ScrapeWork {
+func appendKubernetesScrapeWork(dst []ScrapeWork, sdc *kubernetes.SDConfig, baseDir string, swc *scrapeWorkConfig) ([]ScrapeWork, bool) {
 	targetLabels, err := kubernetes.GetLabels(sdc, baseDir)
 	if err != nil {
 		logger.Errorf("error when discovering kubernetes targets for `job_name` %q: %s; skipping it", swc.jobName, err)
-		return dst
+		return dst, false
 	}
-	return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "kubernetes_sd_config")
+	return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "kubernetes_sd_config"), true
 }
 
-func appendConsulScrapeWork(dst []ScrapeWork, sdc *consul.SDConfig, baseDir string, swc *scrapeWorkConfig) []ScrapeWork {
+func appendConsulScrapeWork(dst []ScrapeWork, sdc *consul.SDConfig, baseDir string, swc *scrapeWorkConfig) ([]ScrapeWork, bool) {
 	targetLabels, err := consul.GetLabels(sdc, baseDir)
 	if err != nil {
 		logger.Errorf("error when discovering consul targets for `job_name` %q: %s; skipping it", swc.jobName, err)
-		return dst
+		return dst, false
 	}
-	return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "consul_sd_config")
+	return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "consul_sd_config"), true
 }
 
-func appendDNSScrapeWork(dst []ScrapeWork, sdc *dns.SDConfig, swc *scrapeWorkConfig) []ScrapeWork {
+func appendDNSScrapeWork(dst []ScrapeWork, sdc *dns.SDConfig, swc *scrapeWorkConfig) ([]ScrapeWork, bool) {
 	targetLabels, err := dns.GetLabels(sdc)
 	if err != nil {
 		logger.Errorf("error when discovering dns targets for `job_name` %q: %s; skipping it", swc.jobName, err)
-		return dst
+		return dst, false
 	}
-	return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "dns_sd_config")
+	return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "dns_sd_config"), true
 }
 
-func appendEC2ScrapeWork(dst []ScrapeWork, sdc *ec2.SDConfig, swc *scrapeWorkConfig) []ScrapeWork {
+func appendEC2ScrapeWork(dst []ScrapeWork, sdc *ec2.SDConfig, swc *scrapeWorkConfig) ([]ScrapeWork, bool) {
 	targetLabels, err := ec2.GetLabels(sdc)
 	if err != nil {
 		logger.Errorf("error when discovering ec2 targets for `job_name` %q: %s; skipping it", swc.jobName, err)
-		return dst
+		return dst, false
 	}
-	return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "ec2_sd_config")
+	return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "ec2_sd_config"), true
 }
 
-func appendGCEScrapeWork(dst []ScrapeWork, sdc *gce.SDConfig, swc *scrapeWorkConfig) []ScrapeWork {
+func appendGCEScrapeWork(dst []ScrapeWork, sdc *gce.SDConfig, swc *scrapeWorkConfig) ([]ScrapeWork, bool) {
 	targetLabels, err := gce.GetLabels(sdc)
 	if err != nil {
 		logger.Errorf("error when discovering gce targets for `job_name` %q: %s; skipping it", swc.jobName, err)
-		return dst
+		return dst, false
 	}
-	return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "gce_sd_config")
+	return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "gce_sd_config"), true
 }
 
 func appendScrapeWorkForTargetLabels(dst []ScrapeWork, swc *scrapeWorkConfig, targetLabels []map[string]string, sectionName string) []ScrapeWork {
@@ -519,6 +602,8 @@ func appendScrapeWork(dst []ScrapeWork, swc *scrapeWorkConfig, target string, ex
 		AuthConfig:           swc.authConfig,
 		MetricRelabelConfigs: swc.metricRelabelConfigs,
 		SampleLimit:          swc.sampleLimit,
+
+		jobNameOriginal: swc.jobName,
 	})
 	return dst, nil
 }
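Two details of the fallback above are easy to miss: `ok` starts as `true` and is only ever cleared, so a single failing `*_sd_config` marks the whole `job_name` as failed; and the fallback rewinds `dst` to the `dstLen` recorded before the job was processed, so partial results from the job's other sd_configs are discarded in favor of the complete previous set. A standalone illustration of that rewind idiom (toy data, not promscrape code):

```go
package main

import "fmt"

func main() {
	dst := []string{"jobA-t1", "jobA-t2"} // targets from previously processed jobs
	dstLen := len(dst)                    // remembered before processing jobB

	// jobB's first sd_config discovered a target, then a second one failed:
	dst = append(dst, "jobB-new1")

	// Fallback: rewind to dstLen, dropping jobB's partial results, and
	// restore the full target set from the previous discovery round.
	prev := []string{"jobB-old1", "jobB-old2"}
	dst = append(dst[:dstLen], prev...)

	fmt.Println(dst) // [jobA-t1 jobA-t2 jobB-old1 jobB-old2]
}
```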

lib/promscrape/config_test.go

@@ -451,7 +451,8 @@ scrape_configs:
 					Value: "rty",
 				},
 			},
-			AuthConfig: &promauth.Config{},
+			AuthConfig:      &promauth.Config{},
+			jobNameOriginal: "foo",
 		},
 		{
 			ScrapeURL: "http://host2:80/abc/de",
@@ -489,7 +490,8 @@ scrape_configs:
 					Value: "rty",
 				},
 			},
-			AuthConfig: &promauth.Config{},
+			AuthConfig:      &promauth.Config{},
+			jobNameOriginal: "foo",
 		},
 		{
 			ScrapeURL: "http://localhost:9090/abc/de",
@@ -527,7 +529,8 @@ scrape_configs:
 					Value: "test",
 				},
 			},
-			AuthConfig: &promauth.Config{},
+			AuthConfig:      &promauth.Config{},
+			jobNameOriginal: "foo",
 		},
 	})
 }
@@ -579,7 +582,8 @@ scrape_configs:
 					Value: "foo",
 				},
 			},
-			AuthConfig: &promauth.Config{},
+			AuthConfig:      &promauth.Config{},
+			jobNameOriginal: "foo",
 		},
 	})
 	f(`
@@ -628,7 +632,8 @@ scrape_configs:
 					Value: "xxx",
 				},
 			},
-			AuthConfig: &promauth.Config{},
+			AuthConfig:      &promauth.Config{},
+			jobNameOriginal: "foo",
 		},
 	})
 	f(`
@@ -700,6 +705,7 @@ scrape_configs:
 			AuthConfig: &promauth.Config{
 				Authorization: "Bearer xyz",
 			},
+			jobNameOriginal: "foo",
 		},
 		{
 			ScrapeURL: "https://aaa:443/foo/bar?p=x%26y&p=%3D",
@@ -740,6 +746,7 @@ scrape_configs:
 			AuthConfig: &promauth.Config{
 				Authorization: "Bearer xyz",
 			},
+			jobNameOriginal: "foo",
 		},
 		{
 			ScrapeURL: "http://1.2.3.4:80/metrics",
@@ -774,6 +781,7 @@ scrape_configs:
 				TLSServerName:         "foobar",
 				TLSInsecureSkipVerify: true,
 			},
+			jobNameOriginal: "qwer",
 		},
 	})
 	f(`
@@ -846,7 +854,8 @@ scrape_configs:
 					Value: "http://foo.bar:1234/metrics",
 				},
 			},
-			AuthConfig: &promauth.Config{},
+			AuthConfig:      &promauth.Config{},
+			jobNameOriginal: "foo",
 		},
 	})
 	f(`
@@ -907,7 +916,8 @@ scrape_configs:
 					Value: "https",
 				},
 			},
-			AuthConfig: &promauth.Config{},
+			AuthConfig:      &promauth.Config{},
+			jobNameOriginal: "foo",
 		},
 	})
 	f(`
@@ -949,7 +959,8 @@ scrape_configs:
 					Value: "3",
 				},
 			},
-			AuthConfig: &promauth.Config{},
+			AuthConfig:      &promauth.Config{},
+			jobNameOriginal: "foo",
 		},
 	})
@@ -997,6 +1008,7 @@ scrape_configs:
 			},
 			AuthConfig:           &promauth.Config{},
 			MetricRelabelConfigs: prcs,
+			jobNameOriginal:      "foo",
 		},
 	})
 	f(`
@@ -1037,6 +1049,7 @@ scrape_configs:
 			AuthConfig: &promauth.Config{
 				Authorization: "Basic eHl6OnNlY3JldC1wYXNz",
 			},
+			jobNameOriginal: "foo",
 		},
 	})
 	f(`
@@ -1075,6 +1088,7 @@ scrape_configs:
 			AuthConfig: &promauth.Config{
 				Authorization: "Bearer secret-pass",
 			},
+			jobNameOriginal: "foo",
 		},
 	})
 	snakeoilCert, err := tls.LoadX509KeyPair("testdata/ssl-cert-snakeoil.pem", "testdata/ssl-cert-snakeoil.key")
@@ -1119,6 +1133,7 @@ scrape_configs:
 			AuthConfig: &promauth.Config{
 				TLSCertificate: &snakeoilCert,
 			},
+			jobNameOriginal: "foo",
 		},
 	})
 	f(`
@@ -1179,7 +1194,8 @@ scrape_configs:
 					Value: "qwe",
 				},
 			},
-			AuthConfig: &promauth.Config{},
+			AuthConfig:      &promauth.Config{},
+			jobNameOriginal: "aaa",
 		},
 	})
 	f(`
@@ -1233,7 +1249,8 @@ scrape_configs:
 					Value: "snmp",
 				},
 			},
-			AuthConfig: &promauth.Config{},
+			AuthConfig:      &promauth.Config{},
+			jobNameOriginal: "snmp",
 		},
 	})
 }

lib/promscrape/scraper.go

@@ -84,11 +84,11 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest)
 	scs := newScrapeConfigs(pushData)
 	scs.add("static_configs", 0, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getStaticScrapeWork() })
 	scs.add("file_sd_configs", *fileSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getFileSDScrapeWork(swsPrev) })
-	scs.add("kubernetes_sd_configs", *kubernetesSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getKubernetesSDScrapeWork() })
-	scs.add("consul_sd_configs", *consulSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getConsulSDScrapeWork() })
-	scs.add("dns_sd_configs", *dnsSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getDNSSDScrapeWork() })
-	scs.add("ec2_sd_configs", *ec2SDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getEC2SDScrapeWork() })
-	scs.add("gce_sd_configs", *gceSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getGCESDScrapeWork() })
+	scs.add("kubernetes_sd_configs", *kubernetesSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getKubernetesSDScrapeWork(swsPrev) })
+	scs.add("consul_sd_configs", *consulSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getConsulSDScrapeWork(swsPrev) })
+	scs.add("dns_sd_configs", *dnsSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getDNSSDScrapeWork(swsPrev) })
+	scs.add("ec2_sd_configs", *ec2SDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getEC2SDScrapeWork(swsPrev) })
+	scs.add("gce_sd_configs", *gceSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getGCESDScrapeWork(swsPrev) })
 	sighupCh := procutil.NewSighupChan()
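Every callback registered via `scs.add` already received `swsPrev` (the ScrapeWork list produced by the previous discovery round); before this commit only the `file_sd_configs` callback used it, so wiring up the fallback is just a matter of forwarding it. A hypothetical, self-contained sketch of that feedback shape, where each round's result becomes the next round's fallback (the real loop lives in promscrape's scrapeConfig machinery and differs in detail):

```go
package main

import (
	"fmt"
	"time"
)

// target is a stand-in for promscrape's ScrapeWork.
type target struct{ addr string }

// runDiscoveryLoop shows the feedback shape used by scs.add above: each
// round's result is handed back to the callback as the previous targets,
// so the callback can fall back to them when discovery fails.
func runDiscoveryLoop(interval time.Duration, rounds int, discover func(prev []target) []target) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	var prev []target
	for i := 0; i < rounds; i++ {
		<-ticker.C
		prev = discover(prev) // this round's result is the next round's fallback
	}
}

func main() {
	runDiscoveryLoop(10*time.Millisecond, 3, func(prev []target) []target {
		fmt.Printf("previous round had %d targets\n", len(prev))
		return append(prev, target{addr: "10.0.0.1:9100"})
	})
}
```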

lib/promscrape/scrapework.go

@@ -66,6 +66,9 @@ type ScrapeWork struct {
 	// The maximum number of metrics to scrape after relabeling.
 	SampleLimit int
 
+	// The original 'job_name'
+	jobNameOriginal string
 }
 
 // key returns unique identifier for the given sw.