lib/promscrape: reduce processing time for big number of discovered targets by processing them in parallel

This commit is contained in:
Aliaksandr Valialkin 2021-02-26 12:46:28 +02:00
parent 438428b5b0
commit f7b242540b

View file

@ -10,6 +10,7 @@ import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
@ -608,14 +609,42 @@ func appendGCEScrapeWork(dst []*ScrapeWork, sdc *gce.SDConfig, swc *scrapeWorkCo
func appendScrapeWorkForTargetLabels(dst []*ScrapeWork, swc *scrapeWorkConfig, targetLabels []map[string]string, sectionName string) []*ScrapeWork {
startTime := time.Now()
// Process targetLabels in parallel in order to reduce processing time for big number of targetLabels.
type result struct {
sw *ScrapeWork
err error
}
resultCh := make(chan result)
workCh := make(chan map[string]string)
goroutines := cgroup.AvailableCPUs()
for i := 0; i < goroutines; i++ {
go func() {
for metaLabels := range workCh {
target := metaLabels["__address__"]
sw, err := getScrapeWork(swc, target, nil, metaLabels)
if err != nil {
err = fmt.Errorf("skipping target %q for job_name %q in %s because of error: %w", target, swc.jobName, sectionName, err)
}
resultCh <- result{
sw: sw,
err: err,
}
}
}()
}
for _, metaLabels := range targetLabels {
target := metaLabels["__address__"]
var err error
dst, err = appendScrapeWork(dst, swc, target, nil, metaLabels)
if err != nil {
logger.Errorf("error when parsing `%s` target %q for `job_name` %q: %s; skipping it", sectionName, target, swc.jobName, err)
workCh <- metaLabels
}
close(workCh)
for range targetLabels {
r := <-resultCh
if r.err != nil {
logger.Errorf("%s", r.err)
continue
}
if r.sw != nil {
dst = append(dst, r.sw)
}
}
metrics.GetOrCreateHistogram(fmt.Sprintf("vm_promscrape_target_relabel_duration_seconds{type=%q}", sectionName)).UpdateDuration(startTime)
return dst
@ -673,18 +702,20 @@ func (stc *StaticConfig) appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConf
logger.Errorf("`static_configs` target for `job_name` %q cannot be empty; skipping it", swc.jobName)
continue
}
var err error
dst, err = appendScrapeWork(dst, swc, target, stc.Labels, metaLabels)
sw, err := getScrapeWork(swc, target, stc.Labels, metaLabels)
if err != nil {
// Do not return this error, since other targets may be valid
logger.Errorf("error when parsing `static_configs` target %q for `job_name` %q: %s; skipping it", target, swc.jobName, err)
continue
}
if sw != nil {
dst = append(dst, sw)
}
}
return dst
}
func appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConfig, target string, extraLabels, metaLabels map[string]string) ([]*ScrapeWork, error) {
func getScrapeWork(swc *scrapeWorkConfig, target string, extraLabels, metaLabels map[string]string) (*ScrapeWork, error) {
labels := mergeLabels(swc.jobName, swc.scheme, target, swc.metricsPath, extraLabels, swc.externalLabels, metaLabels, swc.params)
var originalLabels []prompbmarshal.Label
if !*dropOriginalLabels {
@ -703,7 +734,7 @@ func appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConfig, target string, e
if len(labels) == 0 {
// Drop target without labels.
droppedTargetsMap.Register(originalLabels)
return dst, nil
return nil, nil
}
// See https://www.robustperception.io/life-of-a-label
schemeRelabeled := promrelabel.GetLabelValueByName(labels, "__scheme__")
@ -714,12 +745,12 @@ func appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConfig, target string, e
if len(addressRelabeled) == 0 {
// Drop target without scrape address.
droppedTargetsMap.Register(originalLabels)
return dst, nil
return nil, nil
}
if strings.Contains(addressRelabeled, "/") {
// Drop target with '/'
droppedTargetsMap.Register(originalLabels)
return dst, nil
return nil, nil
}
addressRelabeled = addMissingPort(schemeRelabeled, addressRelabeled)
metricsPathRelabeled := promrelabel.GetLabelValueByName(labels, "__metrics_path__")
@ -737,7 +768,7 @@ func appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConfig, target string, e
paramsStr := url.Values(paramsRelabeled).Encode()
scrapeURL := fmt.Sprintf("%s://%s%s%s%s", schemeRelabeled, addressRelabeled, metricsPathRelabeled, optionalQuestion, paramsStr)
if _, err := url.Parse(scrapeURL); err != nil {
return dst, fmt.Errorf("invalid url %q for scheme=%q (%q), target=%q (%q), metrics_path=%q (%q) for `job_name` %q: %w",
return nil, fmt.Errorf("invalid url %q for scheme=%q (%q), target=%q (%q), metrics_path=%q (%q) for `job_name` %q: %w",
scrapeURL, swc.scheme, schemeRelabeled, target, addressRelabeled, swc.metricsPath, metricsPathRelabeled, swc.jobName, err)
}
// Set missing "instance" label according to https://www.robustperception.io/life-of-a-label
@ -750,7 +781,7 @@ func appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConfig, target string, e
}
// Reduce memory usage by interning all the strings in labels.
internLabelStrings(labels)
dst = append(dst, &ScrapeWork{
sw := &ScrapeWork{
ScrapeURL: scrapeURL,
ScrapeInterval: swc.scrapeInterval,
ScrapeTimeout: swc.scrapeTimeout,
@ -768,8 +799,8 @@ func appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConfig, target string, e
ScrapeAlignInterval: swc.scrapeAlignInterval,
jobNameOriginal: swc.jobName,
})
return dst, nil
}
return sw, nil
}
func internLabelStrings(labels []prompbmarshal.Label) {