lib/promscrape: follow-up after 91e290a8ff

This commit is contained in:
Aliaksandr Valialkin 2022-04-20 16:11:37 +03:00
parent 91e290a8ff
commit 2341bd48d7
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
4 changed files with 147 additions and 109 deletions

View file

@ -1054,7 +1054,7 @@ func getLabelsContext() *labelsContext {
}
func putLabelsContext(lctx *labelsContext) {
labels := lctx.labels
labels := lctx.labels[:cap(lctx.labels)]
for i := range labels {
labels[i].Name = ""
labels[i].Value = ""
@ -1222,27 +1222,24 @@ func internLabelStrings(labels []prompbmarshal.Label) {
}
func internString(s string) string {
if sInterned, ok := internStringsMap.Load(s); ok {
return sInterned.(string)
}
isc := atomic.LoadUint64(&internStringCount)
if isc > 100e3 {
internStringsMapLock.Lock()
internStringsMap = sync.Map{}
atomic.AddUint64(&internStringCount, ^(isc - 1))
internStringsMapLock.Unlock()
if v, ok := internStringsMap.Load(s); ok {
sp := v.(*string)
return *sp
}
// Make a new copy for s in order to remove references from possible bigger string s refers to.
sCopy := string(append([]byte{}, s...))
internStringsMap.Store(sCopy, sCopy)
atomic.AddUint64(&internStringCount, 1)
internStringsMap.Store(sCopy, &sCopy)
n := atomic.AddUint64(&internStringsMapLen, 1)
if n > 100e3 {
atomic.StoreUint64(&internStringsMapLen, 0)
internStringsMap = &sync.Map{}
}
return sCopy
}
var (
internStringCount = uint64(0)
internStringsMapLock sync.Mutex
internStringsMap = sync.Map{}
internStringsMap = &sync.Map{}
internStringsMapLen uint64
)
func getParamsFromLabels(labels []prompbmarshal.Label, paramsOrig map[string][]string) map[string][]string {

View file

@ -14,6 +14,44 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
)
func TestInternStringSerial(t *testing.T) {
if err := testInternString(t); err != nil {
t.Fatalf("unexpected error: %s", err)
}
}
func TestInternStringConcurrent(t *testing.T) {
concurrency := 5
resultCh := make(chan error, concurrency)
for i := 0; i < concurrency; i++ {
go func() {
resultCh <- testInternString(t)
}()
}
timer := time.NewTimer(5*time.Second)
for i := 0; i < concurrency; i++ {
select {
case err := <-resultCh:
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
case <-timer.C:
t.Fatalf("timeout")
}
}
}
func testInternString(t *testing.T) error {
for i := 0; i < 1000; i++ {
s := fmt.Sprintf("foo_%d", i)
s1 := internString(s)
if s != s1 {
return fmt.Errorf("unexpected string returned from internString; got %q; want %q", s1, s)
}
}
return nil
}
func TestMergeLabels(t *testing.T) {
f := func(swc *scrapeWorkConfig, target string, extraLabels, metaLabels map[string]string, resultExpected string) {
t.Helper()

View file

@ -0,0 +1,25 @@
package promscrape
import (
"fmt"
"testing"
)
func BenchmarkInternString(b *testing.B) {
a := make([]string, 10000)
for i := range a {
a[i] = fmt.Sprintf("string_%d", i)
}
b.ReportAllocs()
b.SetBytes(int64(len(a)))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
for _, s := range a {
sResult := internString(s)
if sResult != s {
panic(fmt.Sprintf("unexpected string obtained; got %q; want %q", sResult, s))
}
}
}
})
}

View file

@ -108,7 +108,7 @@ func (aw *apiWatcher) reloadScrapeWorks(uw *urlWatcher, swosByKey map[string][]i
}
func (aw *apiWatcher) setScrapeWorks(uw *urlWatcher, key string, labels []map[string]string) {
swos := aw.getScrapeWorkObjectsForLabels(labels)
swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels)
aw.swosByURLWatcherLock.Lock()
swosByKey := aw.swosByURLWatcher[uw]
if swosByKey == nil {
@ -134,10 +134,10 @@ func (aw *apiWatcher) removeScrapeWorks(uw *urlWatcher, key string) {
aw.swosByURLWatcherLock.Unlock()
}
func (aw *apiWatcher) getScrapeWorkObjectsForLabels(labelss []map[string]string) []interface{} {
func getScrapeWorkObjectsForLabels(swcFunc ScrapeWorkConstructorFunc, labelss []map[string]string) []interface{} {
swos := make([]interface{}, 0, len(labelss))
for _, labels := range labelss {
swo := aw.swcFunc(labels)
swo := swcFunc(labels)
// The reflect check is needed because of https://mangatmodi.medium.com/go-check-nil-interface-the-right-way-d142776edef1
if swo != nil && !reflect.ValueOf(swo).IsNil() {
swos = append(swos, swo)
@ -150,21 +150,14 @@ func (aw *apiWatcher) getScrapeWorkObjectsForLabels(labelss []map[string]string)
func (aw *apiWatcher) getScrapeWorkObjects() []interface{} {
aw.gw.registerPendingAPIWatchers()
swos := make([]interface{}, 0, aw.swosCount.Get())
aw.swosByURLWatcherLock.Lock()
defer aw.swosByURLWatcherLock.Unlock()
size := 0
for _, swosByKey := range aw.swosByURLWatcher {
for _, swosLocal := range swosByKey {
size += len(swosLocal)
}
}
swos := make([]interface{}, 0, size)
for _, swosByKey := range aw.swosByURLWatcher {
for _, swosLocal := range swosByKey {
swos = append(swos, swosLocal...)
}
}
aw.swosByURLWatcherLock.Unlock()
return swos
}
@ -296,18 +289,6 @@ func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) {
if needStart {
uw.reloadObjects()
go uw.watchForUpdates()
if role == "endpoints" || role == "endpointslice" {
// Refresh endpoints and enpointslices targets in background, since they depend on other object types such as pod and service.
// This should fix https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240 .
go func() {
for {
time.Sleep(5 * time.Second)
gw.mu.Lock()
uw.reloadScrapeWorksForAPIWatchersLocked(uw.aws)
gw.mu.Unlock()
}
}()
}
}
}
}
@ -378,7 +359,7 @@ type urlWatcher struct {
// Batch registering saves CPU time needed for registering big number of Kubernetes objects
// shared among big number of scrape jobs, since per-object labels are generated only once
// for all the scrape jobs (each scrape job is associated with a single apiWatcher).
// See reloadScrapeWorksForAPIWatchersLocked for details.
// See registerPendingAPIWatchersLocked for details.
awsPending map[*apiWatcher]struct{}
// aws contains registered apiWatcher objects
@ -434,15 +415,45 @@ func (uw *urlWatcher) registerPendingAPIWatchersLocked() {
if len(uw.awsPending) == 0 {
return
}
awsPending := make([]*apiWatcher, 0, len(uw.awsPending))
aws := make([]*apiWatcher, 0, len(uw.awsPending))
for aw := range uw.awsPending {
awsPending = append(awsPending, aw)
uw.aws[aw] = struct{}{}
aws = append(aws, aw)
}
swosByKey := make([]map[string][]interface{}, len(aws))
for i := range aws {
swosByKey[i] = make(map[string][]interface{})
}
// Generate ScrapeWork objects in parallel on available CPU cores.
// This should reduce the time needed for their generation on systems with many CPU cores.
var swosByKeyLock sync.Mutex
var wg sync.WaitGroup
limiterCh := make(chan struct{}, cgroup.AvailableCPUs())
for key, o := range uw.objectsByKey {
labels := o.getTargetLabels(uw.gw)
wg.Add(1)
limiterCh <- struct{}{}
go func(key string, labels []map[string]string) {
for i, aw := range aws {
swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels)
if len(swos) > 0 {
swosByKeyLock.Lock()
swosByKey[i][key] = swos
swosByKeyLock.Unlock()
}
}
wg.Done()
<-limiterCh
}(key, labels)
}
wg.Wait()
for i, aw := range aws {
aw.reloadScrapeWorks(uw, swosByKey[i])
}
uw.reloadScrapeWorksForAPIWatchersLocked(uw.awsPending)
uw.awsPending = make(map[*apiWatcher]struct{})
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="working"}`, uw.role)).Add(len(awsPending))
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Add(-len(awsPending))
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="working"}`, uw.role)).Add(len(aws))
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Add(-len(aws))
}
func (uw *urlWatcher) unsubscribeAPIWatcherLocked(aw *apiWatcher) {
@ -485,19 +496,21 @@ func (uw *urlWatcher) reloadObjects() string {
uw.gw.mu.Lock()
var updated, removed, added int
for key := range uw.objectsByKey {
if _, ok := objectsByKey[key]; ok {
o, ok := objectsByKey[key]
if ok {
uw.updateObjectLocked(key, o)
updated++
} else {
uw.removeObjectLocked(key)
removed++
}
}
for key := range objectsByKey {
for key, o := range objectsByKey {
if _, ok := uw.objectsByKey[key]; !ok {
uw.updateObjectLocked(key, o)
added++
}
}
uw.objectsByKey = objectsByKey
uw.reloadScrapeWorksForAPIWatchersLocked(uw.aws)
uw.gw.mu.Unlock()
uw.objectsUpdated.Add(updated)
@ -511,53 +524,6 @@ func (uw *urlWatcher) reloadObjects() string {
return uw.resourceVersion
}
func (uw *urlWatcher) reloadScrapeWorksForAPIWatchersLocked(awsMap map[*apiWatcher]struct{}) {
if len(awsMap) == 0 {
return
}
aws := make([]*apiWatcher, 0, len(awsMap))
for aw := range awsMap {
aws = append(aws, aw)
}
swosByKey := make([]map[string][]interface{}, len(aws))
for i := range aws {
swosByKey[i] = make(map[string][]interface{})
}
// update swos concurrently,
// it must decrease reload time for high number of records at promscrape file
maxConcurrent := cgroup.AvailableCPUs() - 2
if maxConcurrent < 1 {
maxConcurrent = 1
}
limit := make(chan struct{}, maxConcurrent)
var (
mu sync.Mutex
wg sync.WaitGroup
)
for key, o := range uw.objectsByKey {
limit <- struct{}{}
wg.Add(1)
// update swsos for each target at separate CPU
go func(key string, o object) {
labels := o.getTargetLabels(uw.gw)
for i, aw := range aws {
swos := aw.getScrapeWorkObjectsForLabels(labels)
if len(swos) > 0 {
mu.Lock()
swosByKey[i][key] = swos
mu.Unlock()
}
}
wg.Done()
<-limit
}(key, o)
}
wg.Wait()
for i, aw := range aws {
aw.reloadScrapeWorks(uw, swosByKey[i])
}
}
// watchForUpdates watches for object updates starting from uw.resourceVersion and updates the corresponding objects to the latest state.
//
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
@ -636,20 +602,12 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
key := o.key()
uw.gw.mu.Lock()
if _, ok := uw.objectsByKey[key]; !ok {
// if we.Type == "MODIFIED" is expected condition after recovering from the bookmarked resourceVersion.
uw.objectsCount.Inc()
uw.objectsAdded.Inc()
} else {
// if we.Type == "ADDED" is expected condition after recovering from the bookmarked resourceVersion.
uw.objectsUpdated.Inc()
}
uw.objectsByKey[key] = o
if len(uw.aws) > 0 {
labels := o.getTargetLabels(uw.gw)
for aw := range uw.aws {
aw.setScrapeWorks(uw, key, labels)
}
}
uw.updateObjectLocked(key, o)
uw.gw.mu.Unlock()
case "DELETED":
o, err := uw.parseObject(we.Object)
@ -661,11 +619,8 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
if _, ok := uw.objectsByKey[key]; ok {
uw.objectsCount.Dec()
uw.objectsRemoved.Inc()
delete(uw.objectsByKey, key)
}
for aw := range uw.aws {
aw.removeScrapeWorks(uw, key)
}
uw.removeObjectLocked(key)
uw.gw.mu.Unlock()
case "BOOKMARK":
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks
@ -692,6 +647,29 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
}
}
func (uw *urlWatcher) updateObjectLocked(key string, o object) {
oPrev, ok := uw.objectsByKey[key]
if ok && reflect.DeepEqual(oPrev, o) {
// Nothing to do, since the new object is equal to the previous one.
return
}
uw.objectsByKey[key] = o
if len(uw.aws) == 0 {
return
}
labels := o.getTargetLabels(uw.gw)
for aw := range uw.aws {
aw.setScrapeWorks(uw, key, labels)
}
}
func (uw *urlWatcher) removeObjectLocked(key string) {
delete(uw.objectsByKey, key)
for aw := range uw.aws {
aw.removeScrapeWorks(uw, key)
}
}
// Bookmark is a bookmark message from Kubernetes Watch API.
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks
type Bookmark struct {