diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go
index 4ad34b6a9..0d1f1de2d 100644
--- a/app/vmselect/netstorage/netstorage.go
+++ b/app/vmselect/netstorage/netstorage.go
@@ -207,10 +207,17 @@ type result struct {
 
 var resultPool sync.Pool
 
+// MaxWorkers returns the maximum number of workers netstorage can spin when calling RunParallel()
+func MaxWorkers() int {
+	return gomaxprocs
+}
+
+var gomaxprocs = cgroup.AvailableCPUs()
+
 // RunParallel runs f in parallel for all the results from rss.
 //
 // f shouldn't hold references to rs after returning.
-// workerID is the id of the worker goroutine that calls f.
+// workerID is the id of the worker goroutine that calls f. The workerID is in the range [0..MaxWorkers()-1].
 // Data processing is immediately stopped if f returns non-nil error.
 //
 // rss becomes unusable after the call to RunParallel.
@@ -244,7 +251,8 @@ func (rss *Results) runParallel(qt *querytracer.Tracer, f func(rs *Result, worke
 		tsw.f = f
 		tsw.mustStop = &mustStop
 	}
-	if gomaxprocs == 1 || tswsLen == 1 {
+	maxWorkers := MaxWorkers()
+	if maxWorkers == 1 || tswsLen == 1 {
 		// It is faster to process time series in the current goroutine.
 		tsw := getTimeseriesWork()
 		tmpResult := getTmpResult()
@@ -280,8 +288,8 @@ func (rss *Results) runParallel(qt *querytracer.Tracer, f func(rs *Result, worke
 
 	// Prepare worker channels.
 	workers := len(tsws)
-	if workers > gomaxprocs {
-		workers = gomaxprocs
+	if workers > maxWorkers {
+		workers = maxWorkers
 	}
 	itemsPerWorker := (len(tsws) + workers - 1) / workers
 	workChs := make([]chan *timeseriesWork, workers)
@@ -333,8 +341,6 @@ var (
 	seriesReadPerQuery = metrics.NewHistogram(`vm_series_read_per_query`)
 )
 
-var gomaxprocs = cgroup.AvailableCPUs()
-
 type packedTimeseries struct {
 	metricName string
 	brs        []blockRef
@@ -1017,7 +1023,6 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
 	indexSearchDuration.UpdateDuration(startTime)
 
 	// Start workers that call f in parallel on available CPU cores.
-	gomaxprocs := cgroup.AvailableCPUs()
 	workCh := make(chan *exportWork, gomaxprocs*8)
 	var (
 		errGlobal error
diff --git a/app/vmselect/promql/aggr_incremental.go b/app/vmselect/promql/aggr_incremental.go
index 26f91ed0e..f3d5bd416 100644
--- a/app/vmselect/promql/aggr_incremental.go
+++ b/app/vmselect/promql/aggr_incremental.go
@@ -3,8 +3,9 @@ package promql
 import (
 	"math"
 	"strings"
-	"sync"
+	"unsafe"
 
+	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/metricsql"
 )
@@ -63,31 +64,36 @@ var incrementalAggrFuncCallbacksMap = map[string]*incrementalAggrFuncCallbacks{
 	},
 }
 
+type incrementalAggrContextMap struct {
+	m map[string]*incrementalAggrContext
+
+	// The padding prevents false sharing on widespread platforms with
+	// 128 mod (cache line size) = 0 .
+	_ [128 - unsafe.Sizeof(map[string]*incrementalAggrContext{})%128]byte
+}
+
 type incrementalAggrFuncContext struct {
 	ae *metricsql.AggrFuncExpr
 
-	m sync.Map
+	byWorkerID []incrementalAggrContextMap
 
 	callbacks *incrementalAggrFuncCallbacks
 }
 
 func newIncrementalAggrFuncContext(ae *metricsql.AggrFuncExpr, callbacks *incrementalAggrFuncCallbacks) *incrementalAggrFuncContext {
 	return &incrementalAggrFuncContext{
-		ae:        ae,
-		callbacks: callbacks,
+		ae:         ae,
+		byWorkerID: make([]incrementalAggrContextMap, netstorage.MaxWorkers()),
+		callbacks:  callbacks,
 	}
 }
 
 func (iafc *incrementalAggrFuncContext) updateTimeseries(tsOrig *timeseries, workerID uint) {
-	v, ok := iafc.m.Load(workerID)
-	if !ok {
-		// It is safe creating and storing m in iafc.m without locking,
-		// since it is guaranteed that only a single goroutine can execute
-		// code for the given workerID at a time.
-		v = make(map[string]*incrementalAggrContext, 1)
-		iafc.m.Store(workerID, v)
+	v := &iafc.byWorkerID[workerID]
+	if v.m == nil {
+		v.m = make(map[string]*incrementalAggrContext, 1)
 	}
-	m := v.(map[string]*incrementalAggrContext)
+	m := v.m
 
 	ts := tsOrig
 	keepOriginal := iafc.callbacks.keepOriginal
@@ -128,9 +134,9 @@ func (iafc *incrementalAggrFuncContext) updateTimeseries(tsOrig *timeseries, wor
 func (iafc *incrementalAggrFuncContext) finalizeTimeseries() []*timeseries {
 	mGlobal := make(map[string]*incrementalAggrContext)
 	mergeAggrFunc := iafc.callbacks.mergeAggrFunc
-	iafc.m.Range(func(k, v interface{}) bool {
-		m := v.(map[string]*incrementalAggrContext)
-		for k, iac := range m {
+	byWorkerID := iafc.byWorkerID
+	for i := range byWorkerID {
+		for k, iac := range byWorkerID[i].m {
 			iacGlobal := mGlobal[k]
 			if iacGlobal == nil {
 				if iafc.ae.Limit > 0 && len(mGlobal) >= iafc.ae.Limit {
@@ -142,8 +148,7 @@ func (iafc *incrementalAggrFuncContext) finalizeTimeseries() []*timeseries {
 			}
 			mergeAggrFunc(iacGlobal, iac)
 		}
-		return true
-	})
+	}
 	tss := make([]*timeseries, 0, len(mGlobal))
 	finalizeAggrFunc := iafc.callbacks.finalizeAggrFunc
 	for _, iac := range mGlobal {
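// Reviewer note (not part of the patch): the change above replaces a
// sync.Map keyed by workerID with a plain slice indexed by workerID.
// Since RunParallel() guarantees workerID lies in [0..MaxWorkers()-1]
// and only one goroutine uses a given workerID at a time, each worker
// can own its slot lock-free; the padding keeps adjacent slots on
// separate cache lines. Below is a minimal, self-contained sketch of
// the same pattern; all names in it are illustrative, not taken from
// the patch.
package main

import (
	"fmt"
	"sync"
	"unsafe"
)

type paddedShard struct {
	m map[string]int

	// Pad the struct to a multiple of 128 bytes so adjacent slots in a
	// slice never share a cache line on common platforms.
	_ [128 - unsafe.Sizeof(map[string]int{})%128]byte
}

func main() {
	const maxWorkers = 4
	byWorkerID := make([]paddedShard, maxWorkers) // one slot per workerID

	var wg sync.WaitGroup
	for workerID := 0; workerID < maxWorkers; workerID++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			// Only this goroutine touches byWorkerID[workerID], so the
			// map may be lazily created and updated without locking.
			shard := &byWorkerID[workerID]
			if shard.m == nil {
				shard.m = make(map[string]int, 1)
			}
			shard.m["count"] += workerID + 1
		}(workerID)
	}
	wg.Wait()

	// Merge the per-worker shards single-threaded, mirroring what
	// finalizeTimeseries() does in the patch.
	total := 0
	for i := range byWorkerID {
		for _, n := range byWorkerID[i].m {
			total += n
		}
	}
	fmt.Println(total) // 1+2+3+4 = 10
}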