app/vmselect/promql: increase scalability of incremental aggregate calculations on systems with many CPU cores

Use sync.Map instead of a plain map guarded by a global mutex for the per-worker state
in incrementalAggrFuncContext. This should lift scalability limits on systems with many CPU cores.
This commit is contained in:
Aliaksandr Valialkin 2022-10-01 19:59:57 +03:00
parent fcc7ab71b3
commit fb1cc3cc94
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1

View file

@ -65,8 +65,7 @@ var incrementalAggrFuncCallbacksMap = map[string]*incrementalAggrFuncCallbacks{
type incrementalAggrFuncContext struct { type incrementalAggrFuncContext struct {
ae *metricsql.AggrFuncExpr ae *metricsql.AggrFuncExpr
mLock sync.Mutex m sync.Map
m map[uint]map[string]*incrementalAggrContext
callbacks *incrementalAggrFuncCallbacks callbacks *incrementalAggrFuncCallbacks
} }
@ -74,19 +73,20 @@ type incrementalAggrFuncContext struct {
func newIncrementalAggrFuncContext(ae *metricsql.AggrFuncExpr, callbacks *incrementalAggrFuncCallbacks) *incrementalAggrFuncContext { func newIncrementalAggrFuncContext(ae *metricsql.AggrFuncExpr, callbacks *incrementalAggrFuncCallbacks) *incrementalAggrFuncContext {
return &incrementalAggrFuncContext{ return &incrementalAggrFuncContext{
ae: ae, ae: ae,
m: make(map[uint]map[string]*incrementalAggrContext),
callbacks: callbacks, callbacks: callbacks,
} }
} }
func (iafc *incrementalAggrFuncContext) updateTimeseries(tsOrig *timeseries, workerID uint) { func (iafc *incrementalAggrFuncContext) updateTimeseries(tsOrig *timeseries, workerID uint) {
iafc.mLock.Lock() v, ok := iafc.m.Load(workerID)
m := iafc.m[workerID] if !ok {
if m == nil { // It is safe creating and storing m in iafc.m without locking,
m = make(map[string]*incrementalAggrContext, 1) // since it is guaranteed that only a single goroutine can execute
iafc.m[workerID] = m // code for the given workerID at a time.
v = make(map[string]*incrementalAggrContext, 1)
iafc.m.Store(workerID, v)
} }
iafc.mLock.Unlock() m := v.(map[string]*incrementalAggrContext)
ts := tsOrig ts := tsOrig
keepOriginal := iafc.callbacks.keepOriginal keepOriginal := iafc.callbacks.keepOriginal
@ -124,11 +124,10 @@ func (iafc *incrementalAggrFuncContext) updateTimeseries(tsOrig *timeseries, wor
} }
func (iafc *incrementalAggrFuncContext) finalizeTimeseries() []*timeseries { func (iafc *incrementalAggrFuncContext) finalizeTimeseries() []*timeseries {
// There is no need in iafc.mLock.Lock here, since finalizeTimeseries must be called
// without concurrent goroutines touching iafc.
mGlobal := make(map[string]*incrementalAggrContext) mGlobal := make(map[string]*incrementalAggrContext)
mergeAggrFunc := iafc.callbacks.mergeAggrFunc mergeAggrFunc := iafc.callbacks.mergeAggrFunc
for _, m := range iafc.m { iafc.m.Range(func(k, v interface{}) bool {
m := v.(map[string]*incrementalAggrContext)
for k, iac := range m { for k, iac := range m {
iacGlobal := mGlobal[k] iacGlobal := mGlobal[k]
if iacGlobal == nil { if iacGlobal == nil {
@ -141,7 +140,8 @@ func (iafc *incrementalAggrFuncContext) finalizeTimeseries() []*timeseries {
} }
mergeAggrFunc(iacGlobal, iac) mergeAggrFunc(iacGlobal, iac)
} }
} return true
})
tss := make([]*timeseries, 0, len(mGlobal)) tss := make([]*timeseries, 0, len(mGlobal))
finalizeAggrFunc := iafc.callbacks.finalizeAggrFunc finalizeAggrFunc := iafc.callbacks.finalizeAggrFunc
for _, iac := range mGlobal { for _, iac := range mGlobal {