From 3d0549c98298d6057359893f7383d0f0c52a3838 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin <valyala@victoriametrics.com>
Date: Wed, 20 Apr 2022 15:25:41 +0300
Subject: [PATCH] lib/promscrape: optimize getScrapeWork() function

Reduce the number of memory allocations in this function. This improves its performance by up to 50%.
This should improve service discovery speed when service discovery generates a big number of potential targets
with a big number of meta-labels.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2270
---
 docs/CHANGELOG.md             |   3 +-
 lib/promrelabel/sort.go       |  11 ++++
 lib/promscrape/config.go      | 113 ++++++++++++++++++++++++++--------
 lib/promscrape/config_test.go |  41 ++++++++++++
 4 files changed, 141 insertions(+), 27 deletions(-)

diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 4ea1f37ad..679b1cdc9 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -15,8 +15,9 @@ The following tip changes can be tested by building VictoriaMetrics components f
 
 ## tip
 
-* FEATUREL [vmagent](https://docs.victoriametrics.com/vmagent.html): allow filtering targets by target url and by target labels with [time series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors) on `http://vmagent:8429/targets` page. This may be useful when `vmagent` scrapes big number of targets. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1796).
+* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow filtering targets by target url and by target labels with [time series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors) on `http://vmagent:8429/targets` page. This may be useful when `vmagent` scrapes big number of targets. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1796).
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): reduce `-promscrape.config` reload duration when the config contains big number of jobs (aka [scrape_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config) sections) and only a few of them are changed. Previously all the jobs were restarted. Now only the jobs with changed configs are restarted. This should reduce the probability of data miss because of slow config reload. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2270).
+* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): improve service discovery speed for big number of scrape targets. This should help when `vmagent` discovers big number of targets (e.g. thousands) in Kubernetes cluster.
 * FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): add support for DNS-based discovery for notifiers in the same way as Prometheus does. See [these docs](https://docs.victoriametrics.com/vmalert.html#notifier-configuration-file) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2460).
 * FEATURE: allow specifying TLS cipher suites for incoming https requests via `-tlsCipherSuites` command-line flag. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2404).
 * FEATURE: allow specifying TLS cipher suites for mTLS connections between cluster components via `-cluster.tlsCipherSuites` command-line flag. See [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#mtls-protection).
diff --git a/lib/promrelabel/sort.go b/lib/promrelabel/sort.go
index cfe5e03e3..617d75279 100644
--- a/lib/promrelabel/sort.go
+++ b/lib/promrelabel/sort.go
@@ -18,6 +18,17 @@ func SortLabels(labels []prompbmarshal.Label) {
 	labelsSorterPool.Put(ls)
 }
 
+// SortLabelsStable sorts labels using stable sort.
+func SortLabelsStable(labels []prompbmarshal.Label) {
+	ls := labelsSorterPool.Get().(*labelsSorter)
+	*ls = labels
+	if !sort.IsSorted(ls) {
+		sort.Stable(ls)
+	}
+	*ls = nil
+	labelsSorterPool.Put(ls)
+}
+
 var labelsSorterPool = &sync.Pool{
 	New: func() interface{} {
 		return &labelsSorter{}
diff --git a/lib/promscrape/config.go b/lib/promscrape/config.go
index 3e5b2f38a..40d2b8f21 100644
--- a/lib/promscrape/config.go
+++ b/lib/promscrape/config.go
@@ -833,7 +833,9 @@ func getScrapeWorkConfig(sc *ScrapeConfig, baseDir string, globalCfg *GlobalConf
 	}
 	swc := &scrapeWorkConfig{
 		scrapeInterval:       scrapeInterval,
+		scrapeIntervalString: scrapeInterval.String(),
 		scrapeTimeout:        scrapeTimeout,
+		scrapeTimeoutString:  scrapeTimeout.String(),
 		jobName:              jobName,
 		metricsPath:          metricsPath,
 		scheme:               scheme,
@@ -860,7 +862,9 @@ func getScrapeWorkConfig(sc *ScrapeConfig, baseDir string, globalCfg *GlobalConf
 
 type scrapeWorkConfig struct {
 	scrapeInterval       time.Duration
+	scrapeIntervalString string
 	scrapeTimeout        time.Duration
+	scrapeTimeoutString  string
 	jobName              string
 	metricsPath          string
 	scheme               string
@@ -1036,20 +1040,46 @@ func needSkipScrapeWork(key string, membersCount, replicasCount, memberNum int)
 	return true
 }
 
+type labelsContext struct {
+	labels []prompbmarshal.Label
+}
+
+func getLabelsContext() *labelsContext {
+	v := labelsContextPool.Get()
+	if v == nil {
+		return &labelsContext{}
+	}
+	return v.(*labelsContext)
+}
+
+func putLabelsContext(lctx *labelsContext) {
+	labels := lctx.labels
+	for i := range labels {
+		labels[i].Name = ""
+		labels[i].Value = ""
+	}
+	lctx.labels = lctx.labels[:0]
+	labelsContextPool.Put(lctx)
+}
+
+var labelsContextPool sync.Pool
+
 var scrapeWorkKeyBufPool bytesutil.ByteBufferPool
 
 func (swc *scrapeWorkConfig) getScrapeWork(target string, extraLabels, metaLabels map[string]string) (*ScrapeWork, error) {
-	labels := mergeLabels(swc, target, extraLabels, metaLabels)
+	lctx := getLabelsContext()
+	lctx.labels = mergeLabels(lctx.labels[:0], swc, target, extraLabels, metaLabels)
 	var originalLabels []prompbmarshal.Label
 	if !*dropOriginalLabels {
-		originalLabels = append([]prompbmarshal.Label{}, labels...)
+		originalLabels = append([]prompbmarshal.Label{}, lctx.labels...)
 	}
-	labels = swc.relabelConfigs.Apply(labels, 0, false)
-	labels = promrelabel.RemoveMetaLabels(labels[:0], labels)
+	lctx.labels = swc.relabelConfigs.Apply(lctx.labels, 0, false)
+	lctx.labels = promrelabel.RemoveMetaLabels(lctx.labels[:0], lctx.labels)
 	// Remove references to already deleted labels, so GC could clean strings for label name and label value past len(labels).
 	// This should reduce memory usage when relabeling creates big number of temporary labels with long names and/or values.
 	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825 for details.
-	labels = append([]prompbmarshal.Label{}, labels...)
+	labels := append([]prompbmarshal.Label{}, lctx.labels...)
+	putLabelsContext(lctx)
 
 	// Verify whether the scrape work must be skipped because of `-promscrape.cluster.*` configs.
 	// Perform the verification on labels after the relabeling in order to guarantee that targets with the same set of labels
@@ -1229,40 +1259,71 @@ func getParamsFromLabels(labels []prompbmarshal.Label, paramsOrig map[string][]s
 	return m
 }
 
-func mergeLabels(swc *scrapeWorkConfig, target string, extraLabels, metaLabels map[string]string) []prompbmarshal.Label {
-	// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config
-	m := make(map[string]string, 6+len(swc.externalLabels)+len(swc.params)+len(extraLabels)+len(metaLabels))
-	for k, v := range swc.externalLabels {
-		m[k] = v
+func mergeLabels(dst []prompbmarshal.Label, swc *scrapeWorkConfig, target string, extraLabels, metaLabels map[string]string) []prompbmarshal.Label {
+	if len(dst) > 0 {
+		logger.Panicf("BUG: len(dst) must be 0; got %d", len(dst))
 	}
-	m["job"] = swc.jobName
-	m["__address__"] = target
-	m["__scheme__"] = swc.scheme
-	m["__metrics_path__"] = swc.metricsPath
-	m["__scrape_interval__"] = swc.scrapeInterval.String()
-	m["__scrape_timeout__"] = swc.scrapeTimeout.String()
+	// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config
+	for k, v := range swc.externalLabels {
+		dst = appendLabel(dst, k, v)
+	}
+	dst = appendLabel(dst, "job", swc.jobName)
+	dst = appendLabel(dst, "__address__", target)
+	dst = appendLabel(dst, "__scheme__", swc.scheme)
+	dst = appendLabel(dst, "__metrics_path__", swc.metricsPath)
+	dst = appendLabel(dst, "__scrape_interval__", swc.scrapeIntervalString)
+	dst = appendLabel(dst, "__scrape_timeout__", swc.scrapeTimeoutString)
 	for k, args := range swc.params {
 		if len(args) == 0 {
 			continue
 		}
 		k = "__param_" + k
 		v := args[0]
-		m[k] = v
+		dst = appendLabel(dst, k, v)
 	}
 	for k, v := range extraLabels {
-		m[k] = v
+		dst = appendLabel(dst, k, v)
 	}
 	for k, v := range metaLabels {
-		m[k] = v
+		dst = appendLabel(dst, k, v)
 	}
-	result := make([]prompbmarshal.Label, 0, len(m))
-	for k, v := range m {
-		result = append(result, prompbmarshal.Label{
-			Name:  k,
-			Value: v,
-		})
+	if len(dst) < 2 {
+		return dst
 	}
-	return result
+	// Remove duplicate labels if any.
+	// Stable sorting is needed in order to preserve the order for labels with identical names.
+	// This is needed in order to remove labels with duplicate names other than the last one.
+	promrelabel.SortLabelsStable(dst)
+	prevName := dst[0].Name
+	hasDuplicateLabels := false
+	for _, label := range dst[1:] {
+		if label.Name == prevName {
+			hasDuplicateLabels = true
+			break
+		}
+		prevName = label.Name
+	}
+	if !hasDuplicateLabels {
+		return dst
+	}
+	prevName = dst[0].Name
+	tmp := dst[:1]
+	for _, label := range dst[1:] {
+		if label.Name == prevName {
+			tmp[len(tmp)-1] = label
+		} else {
+			tmp = append(tmp, label)
+			prevName = label.Name
+		}
+	}
+	return tmp
+}
+
+func appendLabel(dst []prompbmarshal.Label, name, value string) []prompbmarshal.Label {
+	return append(dst, prompbmarshal.Label{
+		Name:  name,
+		Value: value,
+	})
 }
 
 func addMissingPort(scheme, target string) string {
diff --git a/lib/promscrape/config_test.go b/lib/promscrape/config_test.go
index bc2147a4f..e60c85681 100644
--- a/lib/promscrape/config_test.go
+++ b/lib/promscrape/config_test.go
@@ -14,6 +14,47 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
 )
 
+func TestMergeLabels(t *testing.T) {
+	f := func(swc *scrapeWorkConfig, target string, extraLabels, metaLabels map[string]string, resultExpected string) {
+		t.Helper()
+		var labels []prompbmarshal.Label
+		labels = mergeLabels(labels[:0], swc, target, extraLabels, metaLabels)
+		result := promLabelsString(labels)
+		if result != resultExpected {
+			t.Fatalf("unexpected result;\ngot\n%s\nwant\n%s", result, resultExpected)
+		}
+	}
+	f(&scrapeWorkConfig{}, "foo", nil, nil, `{__address__="foo",__metrics_path__="",__scheme__="",__scrape_interval__="",__scrape_timeout__="",job=""}`)
+	f(&scrapeWorkConfig{}, "foo", map[string]string{"foo": "bar"}, nil, `{__address__="foo",__metrics_path__="",__scheme__="",__scrape_interval__="",__scrape_timeout__="",foo="bar",job=""}`)
+	f(&scrapeWorkConfig{}, "foo", map[string]string{"job": "bar"}, nil, `{__address__="foo",__metrics_path__="",__scheme__="",__scrape_interval__="",__scrape_timeout__="",job="bar"}`)
+	f(&scrapeWorkConfig{
+		jobName:              "xyz",
+		scheme:               "https",
+		metricsPath:          "/foo/bar",
+		scrapeIntervalString: "15s",
+		scrapeTimeoutString:  "10s",
+		externalLabels: map[string]string{
+			"job": "bar",
+			"a":   "b",
+		},
+	}, "foo", nil, nil, `{__address__="foo",__metrics_path__="/foo/bar",__scheme__="https",__scrape_interval__="15s",__scrape_timeout__="10s",a="b",job="xyz"}`)
+	f(&scrapeWorkConfig{
+		jobName:     "xyz",
+		scheme:      "https",
+		metricsPath: "/foo/bar",
+		externalLabels: map[string]string{
+			"job": "bar",
+			"a":   "b",
+		},
+	}, "foo", map[string]string{
+		"job": "extra_job",
+		"foo": "extra_foo",
+		"a":   "xyz",
+	}, map[string]string{
+		"__meta_x": "y",
+	}, `{__address__="foo",__meta_x="y",__metrics_path__="/foo/bar",__scheme__="https",__scrape_interval__="",__scrape_timeout__="",a="xyz",foo="extra_foo",job="extra_job"}`)
+}
+
 func TestScrapeConfigUnmarshalMarshal(t *testing.T) {
 	f := func(data string) {
 		t.Helper()