From 91e290a8ffb1864ffadc83fdb35c2ead80e9a329 Mon Sep 17 00:00:00 2001
From: Nikolay <nik@victoriametrics.com>
Date: Wed, 20 Apr 2022 16:09:40 +0300
Subject: [PATCH] lib/promscrape: reduce latency for k8s GetLabels (#2454)

replace the mutex-protected internStringsMap with sync.Map - this greatly reduces lock contention on the hot lookup path
reload scrape works for the api watcher concurrently - target labels for each object are built in a dedicated goroutine, bounded by the number of available CPUs

the changes can be tested with the following script: https://gist.github.com/f41gh7/6f8f8d8719786aff1f18a85c23aebf70
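
for reference, a rough standalone micro-benchmark of the contention claim above: it compares a
mutex-protected map lookup with a sync.Map lookup under RunParallel. it is only an illustration,
not part of this patch, and all names in it are made up. run with: go test -bench=. -cpu=4,16

// intern_bench_test.go - compares lookup contention for the two interning schemes.
package main

import (
	"strconv"
	"sync"
	"testing"
)

var (
	mu   sync.Mutex
	m    = make(map[string]string)
	sm   sync.Map
	keys []string
)

func init() {
	for i := 0; i < 1000; i++ {
		k := "label_" + strconv.Itoa(i)
		m[k] = k
		sm.Store(k, k)
		keys = append(keys, k)
	}
}

// BenchmarkMutexMap models the old code path: every lookup takes a global mutex.
func BenchmarkMutexMap(b *testing.B) {
	b.RunParallel(func(pb *testing.PB) {
		i := 0
		for pb.Next() {
			mu.Lock()
			_ = m[keys[i%len(keys)]]
			mu.Unlock()
			i++
		}
	})
}

// BenchmarkSyncMap models the new code path: lookups go through sync.Map.Load without locking.
func BenchmarkSyncMap(b *testing.B) {
	b.RunParallel(func(pb *testing.PB) {
		i := 0
		for pb.Next() {
			_, _ = sm.Load(keys[i%len(keys)])
			i++
		}
	})
}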
---
 lib/promscrape/config.go                      | 24 +++++++-----
 .../discovery/kubernetes/api_watcher.go       | 38 +++++++++++++++----
 2 files changed, 44 insertions(+), 18 deletions(-)
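
note for reviewers: the api_watcher.go change below follows a common bounded-concurrency pattern -
a buffered channel used as a semaphore plus a sync.WaitGroup. a minimal self-contained sketch of
that pattern, assuming runtime.NumCPU in place of cgroup.AvailableCPUs and with illustrative names:

package main

import (
	"fmt"
	"runtime"
	"sync"
)

func main() {
	objects := map[string]string{"pod-a": "o1", "pod-b": "o2", "pod-c": "o3"}

	// Leave a couple of CPUs free for other work, as the patch does.
	maxConcurrent := runtime.NumCPU() - 2
	if maxConcurrent < 1 {
		maxConcurrent = 1
	}
	limit := make(chan struct{}, maxConcurrent) // semaphore: at most maxConcurrent goroutines in flight

	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		results = make(map[string]string)
	)
	for key, o := range objects {
		limit <- struct{}{} // acquire a slot before spawning
		wg.Add(1)
		go func(key, o string) {
			defer func() {
				wg.Done()
				<-limit // release the slot
			}()
			labels := "labels(" + o + ")" // stands in for the expensive getTargetLabels call
			mu.Lock()
			results[key] = labels // the shared result map is guarded by the mutex
			mu.Unlock()
		}(key, o)
	}
	wg.Wait()
	fmt.Println(results)
}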

diff --git a/lib/promscrape/config.go b/lib/promscrape/config.go
index 40d2b8f21..72375c3c6 100644
--- a/lib/promscrape/config.go
+++ b/lib/promscrape/config.go
@@ -9,6 +9,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@@ -1221,24 +1222,27 @@ func internLabelStrings(labels []prompbmarshal.Label) {
 }
 
 func internString(s string) string {
-	internStringsMapLock.Lock()
-	defer internStringsMapLock.Unlock()
-
-	if sInterned, ok := internStringsMap[s]; ok {
-		return sInterned
+	if sInterned, ok := internStringsMap.Load(s); ok {
+		return sInterned.(string)
+	}
+	isc := atomic.LoadUint64(&internStringCount)
+	if isc > 100e3 {
+		internStringsMapLock.Lock()
+		internStringsMap = sync.Map{}
+		atomic.AddUint64(&internStringCount, ^(isc - 1))
+		internStringsMapLock.Unlock()
 	}
 	// Make a new copy for s in order to remove references from possible bigger string s refers to.
 	sCopy := string(append([]byte{}, s...))
-	internStringsMap[sCopy] = sCopy
-	if len(internStringsMap) > 100e3 {
-		internStringsMap = make(map[string]string, 100e3)
-	}
+	internStringsMap.Store(sCopy, sCopy)
+	atomic.AddUint64(&internStringCount, 1)
 	return sCopy
 }
 
 var (
+	internStringCount    = uint64(0)
 	internStringsMapLock sync.Mutex
-	internStringsMap     = make(map[string]string, 100e3)
+	internStringsMap     = sync.Map{}
 )
 
 func getParamsFromLabels(labels []prompbmarshal.Label, paramsOrig map[string][]string) map[string][]string {
diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go
index da0801cee..8c5873394 100644
--- a/lib/promscrape/discovery/kubernetes/api_watcher.go
+++ b/lib/promscrape/discovery/kubernetes/api_watcher.go
@@ -16,6 +16,7 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
 	"github.com/VictoriaMetrics/metrics"
@@ -522,15 +523,36 @@ func (uw *urlWatcher) reloadScrapeWorksForAPIWatchersLocked(awsMap map[*apiWatch
 	for i := range aws {
 		swosByKey[i] = make(map[string][]interface{})
 	}
-	for key, o := range uw.objectsByKey {
-		labels := o.getTargetLabels(uw.gw)
-		for i, aw := range aws {
-			swos := aw.getScrapeWorkObjectsForLabels(labels)
-			if len(swos) > 0 {
-				swosByKey[i][key] = swos
-			}
-		}
+	// Update swos concurrently.
+	// This reduces the reload time when the number of watched objects is large.
+	maxConcurrent := cgroup.AvailableCPUs() - 2
+	if maxConcurrent < 1 {
+		maxConcurrent = 1
 	}
+	limit := make(chan struct{}, maxConcurrent)
+	var (
+		mu sync.Mutex
+		wg sync.WaitGroup
+	)
+	for key, o := range uw.objectsByKey {
+		limit <- struct{}{}
+		wg.Add(1)
+		// Build swos for each object in a dedicated goroutine, bounded by maxConcurrent.
+		go func(key string, o object) {
+			labels := o.getTargetLabels(uw.gw)
+			for i, aw := range aws {
+				swos := aw.getScrapeWorkObjectsForLabels(labels)
+				if len(swos) > 0 {
+					mu.Lock()
+					swosByKey[i][key] = swos
+					mu.Unlock()
+				}
+			}
+			wg.Done()
+			<-limit
+		}(key, o)
+	}
+	wg.Wait()
 	for i, aw := range aws {
 		aw.reloadScrapeWorks(uw, swosByKey[i])
 	}