From 5497997b726dd2b2eda75562a612bbbcf6e2d7fc Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 1 Oct 2022 19:59:57 +0300 Subject: [PATCH] app/vmselect/promql: increase scalability of incremental aggregate calculations on systems with many CPU cores Use sync.Map instead of a global mutex there. This should lift scalability limits on systems with many CPU cores. --- app/vmselect/promql/aggr_incremental.go | 26 ++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/app/vmselect/promql/aggr_incremental.go b/app/vmselect/promql/aggr_incremental.go index 5be010532..581b3a8f5 100644 --- a/app/vmselect/promql/aggr_incremental.go +++ b/app/vmselect/promql/aggr_incremental.go @@ -65,8 +65,7 @@ var incrementalAggrFuncCallbacksMap = map[string]*incrementalAggrFuncCallbacks{ type incrementalAggrFuncContext struct { ae *metricsql.AggrFuncExpr - mLock sync.Mutex - m map[uint]map[string]*incrementalAggrContext + m sync.Map callbacks *incrementalAggrFuncCallbacks } @@ -74,19 +73,20 @@ type incrementalAggrFuncContext struct { func newIncrementalAggrFuncContext(ae *metricsql.AggrFuncExpr, callbacks *incrementalAggrFuncCallbacks) *incrementalAggrFuncContext { return &incrementalAggrFuncContext{ ae: ae, - m: make(map[uint]map[string]*incrementalAggrContext), callbacks: callbacks, } } func (iafc *incrementalAggrFuncContext) updateTimeseries(tsOrig *timeseries, workerID uint) { - iafc.mLock.Lock() - m := iafc.m[workerID] - if m == nil { - m = make(map[string]*incrementalAggrContext, 1) - iafc.m[workerID] = m + v, ok := iafc.m.Load(workerID) + if !ok { + // It is safe creating and storing m in iafc.m without locking, + // since it is guaranteed that only a single goroutine can execute + // code for the given workerID at a time. + v = make(map[string]*incrementalAggrContext, 1) + iafc.m.Store(workerID, v) } - iafc.mLock.Unlock() + m := v.(map[string]*incrementalAggrContext) ts := tsOrig keepOriginal := iafc.callbacks.keepOriginal @@ -124,11 +124,10 @@ func (iafc *incrementalAggrFuncContext) updateTimeseries(tsOrig *timeseries, wor } func (iafc *incrementalAggrFuncContext) finalizeTimeseries() []*timeseries { - // There is no need in iafc.mLock.Lock here, since finalizeTimeseries must be called - // without concurrent goroutines touching iafc. mGlobal := make(map[string]*incrementalAggrContext) mergeAggrFunc := iafc.callbacks.mergeAggrFunc - for _, m := range iafc.m { + iafc.m.Range(func(k, v interface{}) bool { + m := v.(map[string]*incrementalAggrContext) for k, iac := range m { iacGlobal := mGlobal[k] if iacGlobal == nil { @@ -141,7 +140,8 @@ func (iafc *incrementalAggrFuncContext) finalizeTimeseries() []*timeseries { } mergeAggrFunc(iacGlobal, iac) } - } + return true + }) tss := make([]*timeseries, 0, len(mGlobal)) finalizeAggrFunc := iafc.callbacks.finalizeAggrFunc for _, iac := range mGlobal {