diff --git a/lib/promscrape/config.go b/lib/promscrape/config.go index 72375c3c6e..d272a43482 100644 --- a/lib/promscrape/config.go +++ b/lib/promscrape/config.go @@ -1054,7 +1054,7 @@ func getLabelsContext() *labelsContext { } func putLabelsContext(lctx *labelsContext) { - labels := lctx.labels + labels := lctx.labels[:cap(lctx.labels)] for i := range labels { labels[i].Name = "" labels[i].Value = "" @@ -1222,27 +1222,32 @@ func internLabelStrings(labels []prompbmarshal.Label) { } func internString(s string) string { - if sInterned, ok := internStringsMap.Load(s); ok { - return sInterned.(string) - } - isc := atomic.LoadUint64(&internStringCount) - if isc > 100e3 { - internStringsMapLock.Lock() - internStringsMap = sync.Map{} - atomic.AddUint64(&internStringCount, ^(isc - 1)) - internStringsMapLock.Unlock() + // Snapshot the current map once, so the lookup and the store below use the same map even if it is swapped concurrently. + m := internStringsMap.Load().(*sync.Map) + if v, ok := m.Load(s); ok { + sp := v.(*string) + return *sp } // Make a new copy for s in order to remove references from possible bigger string s refers to. sCopy := string(append([]byte{}, s...)) - internStringsMap.Store(sCopy, sCopy) - atomic.AddUint64(&internStringCount, 1) + m.Store(sCopy, &sCopy) + n := atomic.AddUint64(&internStringsMapLen, 1) + if n > 100e3 { + // Publish a fresh map atomically; a plain pointer assignment here would race with concurrent readers of internStringsMap. + atomic.StoreUint64(&internStringsMapLen, 0) + internStringsMap.Store(&sync.Map{}) + } return sCopy } var ( - internStringCount = uint64(0) - internStringsMapLock sync.Mutex - internStringsMap = sync.Map{} + // internStringsMap holds the current *sync.Map with interned strings; it is replaced as a whole when internStringsMapLen exceeds the limit. + internStringsMap atomic.Value + internStringsMapLen uint64 ) + +func init() { + internStringsMap.Store(&sync.Map{}) +} func getParamsFromLabels(labels []prompbmarshal.Label, paramsOrig map[string][]string) map[string][]string { diff --git a/lib/promscrape/config_test.go b/lib/promscrape/config_test.go index e60c85681a..85a5433583 100644 --- a/lib/promscrape/config_test.go +++ b/lib/promscrape/config_test.go @@ -14,6 +14,44 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy" ) +func TestInternStringSerial(t *testing.T) { + if err := testInternString(t); err != nil { + t.Fatalf("unexpected error: %s", err) + 
} +} + +func TestInternStringConcurrent(t *testing.T) { + concurrency := 5 + resultCh := make(chan error, concurrency) + for i := 0; i < concurrency; i++ { + go func() { + resultCh <- testInternString(t) + }() + } + timer := time.NewTimer(5*time.Second) + for i := 0; i < concurrency; i++ { + select { + case err := <-resultCh: + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + case <-timer.C: + t.Fatalf("timeout") + } + } +} + +func testInternString(t *testing.T) error { + for i := 0; i < 1000; i++ { + s := fmt.Sprintf("foo_%d", i) + s1 := internString(s) + if s != s1 { + return fmt.Errorf("unexpected string returned from internString; got %q; want %q", s1, s) + } + } + return nil +} + func TestMergeLabels(t *testing.T) { f := func(swc *scrapeWorkConfig, target string, extraLabels, metaLabels map[string]string, resultExpected string) { t.Helper() diff --git a/lib/promscrape/config_timing_test.go b/lib/promscrape/config_timing_test.go new file mode 100644 index 0000000000..a4c38aef8c --- /dev/null +++ b/lib/promscrape/config_timing_test.go @@ -0,0 +1,25 @@ +package promscrape + +import ( + "fmt" + "testing" +) + +func BenchmarkInternString(b *testing.B) { + a := make([]string, 10000) + for i := range a { + a[i] = fmt.Sprintf("string_%d", i) + } + b.ReportAllocs() + b.SetBytes(int64(len(a))) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + for _, s := range a { + sResult := internString(s) + if sResult != s { + panic(fmt.Sprintf("unexpected string obtained; got %q; want %q", sResult, s)) + } + } + } + }) +} diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go index 8c5873394f..b142ec3991 100644 --- a/lib/promscrape/discovery/kubernetes/api_watcher.go +++ b/lib/promscrape/discovery/kubernetes/api_watcher.go @@ -108,7 +108,7 @@ func (aw *apiWatcher) reloadScrapeWorks(uw *urlWatcher, swosByKey map[string][]i } func (aw *apiWatcher) setScrapeWorks(uw *urlWatcher, key string, labels 
[]map[string]string) { - swos := aw.getScrapeWorkObjectsForLabels(labels) + swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels) aw.swosByURLWatcherLock.Lock() swosByKey := aw.swosByURLWatcher[uw] if swosByKey == nil { @@ -134,10 +134,10 @@ func (aw *apiWatcher) removeScrapeWorks(uw *urlWatcher, key string) { aw.swosByURLWatcherLock.Unlock() } -func (aw *apiWatcher) getScrapeWorkObjectsForLabels(labelss []map[string]string) []interface{} { +func getScrapeWorkObjectsForLabels(swcFunc ScrapeWorkConstructorFunc, labelss []map[string]string) []interface{} { swos := make([]interface{}, 0, len(labelss)) for _, labels := range labelss { - swo := aw.swcFunc(labels) + swo := swcFunc(labels) // The reflect check is needed because of https://mangatmodi.medium.com/go-check-nil-interface-the-right-way-d142776edef1 if swo != nil && !reflect.ValueOf(swo).IsNil() { swos = append(swos, swo) @@ -150,21 +150,14 @@ func (aw *apiWatcher) getScrapeWorkObjectsForLabels(labelss []map[string]string) func (aw *apiWatcher) getScrapeWorkObjects() []interface{} { aw.gw.registerPendingAPIWatchers() + swos := make([]interface{}, 0, aw.swosCount.Get()) aw.swosByURLWatcherLock.Lock() - defer aw.swosByURLWatcherLock.Unlock() - - size := 0 - for _, swosByKey := range aw.swosByURLWatcher { - for _, swosLocal := range swosByKey { - size += len(swosLocal) - } - } - swos := make([]interface{}, 0, size) for _, swosByKey := range aw.swosByURLWatcher { for _, swosLocal := range swosByKey { swos = append(swos, swosLocal...) } } + aw.swosByURLWatcherLock.Unlock() return swos } @@ -296,18 +289,6 @@ func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) { if needStart { uw.reloadObjects() go uw.watchForUpdates() - if role == "endpoints" || role == "endpointslice" { - // Refresh endpoints and enpointslices targets in background, since they depend on other object types such as pod and service. - // This should fix https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240 . 
- go func() { - for { - time.Sleep(5 * time.Second) - gw.mu.Lock() - uw.reloadScrapeWorksForAPIWatchersLocked(uw.aws) - gw.mu.Unlock() - } - }() - } } } } @@ -378,7 +359,7 @@ type urlWatcher struct { // Batch registering saves CPU time needed for registering big number of Kubernetes objects // shared among big number of scrape jobs, since per-object labels are generated only once // for all the scrape jobs (each scrape job is associated with a single apiWatcher). - // See reloadScrapeWorksForAPIWatchersLocked for details. + // See registerPendingAPIWatchersLocked for details. awsPending map[*apiWatcher]struct{} // aws contains registered apiWatcher objects @@ -434,15 +415,45 @@ func (uw *urlWatcher) registerPendingAPIWatchersLocked() { if len(uw.awsPending) == 0 { return } - awsPending := make([]*apiWatcher, 0, len(uw.awsPending)) + aws := make([]*apiWatcher, 0, len(uw.awsPending)) for aw := range uw.awsPending { - awsPending = append(awsPending, aw) uw.aws[aw] = struct{}{} + aws = append(aws, aw) + } + swosByKey := make([]map[string][]interface{}, len(aws)) + for i := range aws { + swosByKey[i] = make(map[string][]interface{}) + } + + // Generate ScrapeWork objects in parallel on available CPU cores. + // This should reduce the time needed for their generation on systems with many CPU cores. 
+ var swosByKeyLock sync.Mutex + var wg sync.WaitGroup + limiterCh := make(chan struct{}, cgroup.AvailableCPUs()) + for key, o := range uw.objectsByKey { + labels := o.getTargetLabels(uw.gw) + wg.Add(1) + limiterCh <- struct{}{} + go func(key string, labels []map[string]string) { + for i, aw := range aws { + swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels) + if len(swos) > 0 { + swosByKeyLock.Lock() + swosByKey[i][key] = swos + swosByKeyLock.Unlock() + } + } + wg.Done() + <-limiterCh + }(key, labels) + } + wg.Wait() + for i, aw := range aws { + aw.reloadScrapeWorks(uw, swosByKey[i]) } - uw.reloadScrapeWorksForAPIWatchersLocked(uw.awsPending) uw.awsPending = make(map[*apiWatcher]struct{}) - metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="working"}`, uw.role)).Add(len(awsPending)) - metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Add(-len(awsPending)) + metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="working"}`, uw.role)).Add(len(aws)) + metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Add(-len(aws)) } func (uw *urlWatcher) unsubscribeAPIWatcherLocked(aw *apiWatcher) { @@ -485,19 +496,21 @@ func (uw *urlWatcher) reloadObjects() string { uw.gw.mu.Lock() var updated, removed, added int for key := range uw.objectsByKey { - if _, ok := objectsByKey[key]; ok { + o, ok := objectsByKey[key] + if ok { + uw.updateObjectLocked(key, o) updated++ } else { + uw.removeObjectLocked(key) removed++ } } - for key := range objectsByKey { + for key, o := range objectsByKey { if _, ok := uw.objectsByKey[key]; !ok { + uw.updateObjectLocked(key, o) added++ } } - uw.objectsByKey = objectsByKey - uw.reloadScrapeWorksForAPIWatchersLocked(uw.aws) uw.gw.mu.Unlock() uw.objectsUpdated.Add(updated) @@ -511,53 +524,6 @@ func (uw 
*urlWatcher) reloadObjects() string { return uw.resourceVersion } -func (uw *urlWatcher) reloadScrapeWorksForAPIWatchersLocked(awsMap map[*apiWatcher]struct{}) { - if len(awsMap) == 0 { - return - } - aws := make([]*apiWatcher, 0, len(awsMap)) - for aw := range awsMap { - aws = append(aws, aw) - } - swosByKey := make([]map[string][]interface{}, len(aws)) - for i := range aws { - swosByKey[i] = make(map[string][]interface{}) - } - // update swos concurrently, - // it must decrease reload time for high number of records at promscrape file - maxConcurrent := cgroup.AvailableCPUs() - 2 - if maxConcurrent < 1 { - maxConcurrent = 1 - } - limit := make(chan struct{}, maxConcurrent) - var ( - mu sync.Mutex - wg sync.WaitGroup - ) - for key, o := range uw.objectsByKey { - limit <- struct{}{} - wg.Add(1) - // update swsos for each target at separate CPU - go func(key string, o object) { - labels := o.getTargetLabels(uw.gw) - for i, aw := range aws { - swos := aw.getScrapeWorkObjectsForLabels(labels) - if len(swos) > 0 { - mu.Lock() - swosByKey[i][key] = swos - mu.Unlock() - } - } - wg.Done() - <-limit - }(key, o) - } - wg.Wait() - for i, aw := range aws { - aw.reloadScrapeWorks(uw, swosByKey[i]) - } -} - // watchForUpdates watches for object updates starting from uw.resourceVersion and updates the corresponding objects to the latest state. // // See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes @@ -636,20 +602,12 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error { key := o.key() uw.gw.mu.Lock() if _, ok := uw.objectsByKey[key]; !ok { - // if we.Type == "MODIFIED" is expected condition after recovering from the bookmarked resourceVersion. uw.objectsCount.Inc() uw.objectsAdded.Inc() } else { - // if we.Type == "ADDED" is expected condition after recovering from the bookmarked resourceVersion. 
uw.objectsUpdated.Inc() } - uw.objectsByKey[key] = o - if len(uw.aws) > 0 { - labels := o.getTargetLabels(uw.gw) - for aw := range uw.aws { - aw.setScrapeWorks(uw, key, labels) - } - } + uw.updateObjectLocked(key, o) uw.gw.mu.Unlock() case "DELETED": o, err := uw.parseObject(we.Object) @@ -661,11 +619,8 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error { if _, ok := uw.objectsByKey[key]; ok { uw.objectsCount.Dec() uw.objectsRemoved.Inc() - delete(uw.objectsByKey, key) - } - for aw := range uw.aws { - aw.removeScrapeWorks(uw, key) } + uw.removeObjectLocked(key) uw.gw.mu.Unlock() case "BOOKMARK": // See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks @@ -692,6 +647,29 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error { } } +func (uw *urlWatcher) updateObjectLocked(key string, o object) { + oPrev, ok := uw.objectsByKey[key] + if ok && reflect.DeepEqual(oPrev, o) { + // Nothing to do, since the new object is equal to the previous one. + return + } + uw.objectsByKey[key] = o + if len(uw.aws) == 0 { + return + } + labels := o.getTargetLabels(uw.gw) + for aw := range uw.aws { + aw.setScrapeWorks(uw, key, labels) + } +} + +func (uw *urlWatcher) removeObjectLocked(key string) { + delete(uw.objectsByKey, key) + for aw := range uw.aws { + aw.removeScrapeWorks(uw, key) + } +} + // Bookmark is a bookmark message from Kubernetes Watch API. // See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks type Bookmark struct {