diff --git a/lib/promscrape/config.go b/lib/promscrape/config.go index 4f97506b6..b0c8eb578 100644 --- a/lib/promscrape/config.go +++ b/lib/promscrape/config.go @@ -10,6 +10,7 @@ import ( "sync" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" @@ -608,14 +609,42 @@ func appendGCEScrapeWork(dst []*ScrapeWork, sdc *gce.SDConfig, swc *scrapeWorkCo func appendScrapeWorkForTargetLabels(dst []*ScrapeWork, swc *scrapeWorkConfig, targetLabels []map[string]string, sectionName string) []*ScrapeWork { startTime := time.Now() + // Process targetLabels in parallel in order to reduce processing time for big number of targetLabels. + type result struct { + sw *ScrapeWork + err error + } + resultCh := make(chan result) + workCh := make(chan map[string]string) + goroutines := cgroup.AvailableCPUs() + for i := 0; i < goroutines; i++ { + go func() { + for metaLabels := range workCh { + target := metaLabels["__address__"] + sw, err := getScrapeWork(swc, target, nil, metaLabels) + if err != nil { + err = fmt.Errorf("skipping target %q for job_name %q in %s because of error: %w", target, swc.jobName, sectionName, err) + } + resultCh <- result{ + sw: sw, + err: err, + } + } + }() + } for _, metaLabels := range targetLabels { - target := metaLabels["__address__"] - var err error - dst, err = appendScrapeWork(dst, swc, target, nil, metaLabels) - if err != nil { - logger.Errorf("error when parsing `%s` target %q for `job_name` %q: %s; skipping it", sectionName, target, swc.jobName, err) + workCh <- metaLabels + } + close(workCh) + for range targetLabels { + r := <-resultCh + if r.err != nil { + logger.Errorf("%s", r.err) continue } + if r.sw != nil { + dst = append(dst, r.sw) + } } metrics.GetOrCreateHistogram(fmt.Sprintf("vm_promscrape_target_relabel_duration_seconds{type=%q}", sectionName)).UpdateDuration(startTime) return dst @@ -673,18 +702,20 @@ func (stc *StaticConfig) appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConf logger.Errorf("`static_configs` target for `job_name` %q cannot be empty; skipping it", swc.jobName) continue } - var err error - dst, err = appendScrapeWork(dst, swc, target, stc.Labels, metaLabels) + sw, err := getScrapeWork(swc, target, stc.Labels, metaLabels) if err != nil { // Do not return this error, since other targets may be valid logger.Errorf("error when parsing `static_configs` target %q for `job_name` %q: %s; skipping it", target, swc.jobName, err) continue } + if sw != nil { + dst = append(dst, sw) + } } return dst } -func appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConfig, target string, extraLabels, metaLabels map[string]string) ([]*ScrapeWork, error) { +func getScrapeWork(swc *scrapeWorkConfig, target string, extraLabels, metaLabels map[string]string) (*ScrapeWork, error) { labels := mergeLabels(swc.jobName, swc.scheme, target, swc.metricsPath, extraLabels, swc.externalLabels, metaLabels, swc.params) var originalLabels []prompbmarshal.Label if !*dropOriginalLabels { @@ -703,7 +734,7 @@ func appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConfig, target string, e if len(labels) == 0 { // Drop target without labels. droppedTargetsMap.Register(originalLabels) - return dst, nil + return nil, nil } // See https://www.robustperception.io/life-of-a-label schemeRelabeled := promrelabel.GetLabelValueByName(labels, "__scheme__") @@ -714,12 +745,12 @@ func appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConfig, target string, e if len(addressRelabeled) == 0 { // Drop target without scrape address. droppedTargetsMap.Register(originalLabels) - return dst, nil + return nil, nil } if strings.Contains(addressRelabeled, "/") { // Drop target with '/' droppedTargetsMap.Register(originalLabels) - return dst, nil + return nil, nil } addressRelabeled = addMissingPort(schemeRelabeled, addressRelabeled) metricsPathRelabeled := promrelabel.GetLabelValueByName(labels, "__metrics_path__") @@ -737,7 +768,7 @@ func appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConfig, target string, e paramsStr := url.Values(paramsRelabeled).Encode() scrapeURL := fmt.Sprintf("%s://%s%s%s%s", schemeRelabeled, addressRelabeled, metricsPathRelabeled, optionalQuestion, paramsStr) if _, err := url.Parse(scrapeURL); err != nil { - return dst, fmt.Errorf("invalid url %q for scheme=%q (%q), target=%q (%q), metrics_path=%q (%q) for `job_name` %q: %w", + return nil, fmt.Errorf("invalid url %q for scheme=%q (%q), target=%q (%q), metrics_path=%q (%q) for `job_name` %q: %w", scrapeURL, swc.scheme, schemeRelabeled, target, addressRelabeled, swc.metricsPath, metricsPathRelabeled, swc.jobName, err) } // Set missing "instance" label according to https://www.robustperception.io/life-of-a-label @@ -750,7 +781,7 @@ func appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConfig, target string, e } // Reduce memory usage by interning all the strings in labels. internLabelStrings(labels) - dst = append(dst, &ScrapeWork{ + sw := &ScrapeWork{ ScrapeURL: scrapeURL, ScrapeInterval: swc.scrapeInterval, ScrapeTimeout: swc.scrapeTimeout, @@ -768,8 +799,8 @@ func appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConfig, target string, e ScrapeAlignInterval: swc.scrapeAlignInterval, jobNameOriginal: swc.jobName, - }) - return dst, nil + } + return sw, nil } func internLabelStrings(labels []prompbmarshal.Label) {