From d8e218d99eab18c0ebe951733ca49896be9928ec Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Mon, 20 Mar 2023 20:54:54 -0700
Subject: [PATCH] app/vmselect/promql: pass workerID to the callback inside
 doParallel()

This opens the possibility to remove tssLock from evalRollupFuncWithSubquery()
in the follow-up commit from @zekker6 in order to speed up the code
for systems with many CPU cores.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3966
---
 app/vmselect/promql/eval.go | 40 ++++++++++++++++++++++++----------------
 1 file changed, 24 insertions(+), 16 deletions(-)

diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go
index a45bede8b..f9b4841c5 100644
--- a/app/vmselect/promql/eval.go
+++ b/app/vmselect/promql/eval.go
@@ -914,7 +914,7 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName
 	var tssLock sync.Mutex
 	var samplesScannedTotal uint64
 	keepMetricNames := getKeepMetricNames(expr)
-	doParallel(tssSQ, func(tsSQ *timeseries, values []float64, timestamps []int64) ([]float64, []int64) {
+	doParallel(tssSQ, func(tsSQ *timeseries, values []float64, timestamps []int64, workerID uint) ([]float64, []int64) {
 		values, timestamps = removeNanValues(values[:0], timestamps[:0], tsSQ.Values, tsSQ.Timestamps)
 		preFunc(values, timestamps)
 		for _, rc := range rcs {
@@ -958,28 +958,36 @@ func getKeepMetricNames(expr metricsql.Expr) bool {
 	return false
 }
 
-func doParallel(tss []*timeseries, f func(ts *timeseries, values []float64, timestamps []int64) ([]float64, []int64)) {
-	concurrency := cgroup.AvailableCPUs()
-	if concurrency > len(tss) {
-		concurrency = len(tss)
+func doParallel(tss []*timeseries, f func(ts *timeseries, values []float64, timestamps []int64, workerID uint) ([]float64, []int64)) {
+	workers := netstorage.MaxWorkers()
+	if workers > len(tss) {
+		workers = len(tss)
 	}
-	workCh := make(chan *timeseries, concurrency)
+	seriesPerWorker := (len(tss) + workers - 1) / workers
+	workChs := make([]chan *timeseries, workers)
+	for i := range workChs {
+		workChs[i] = make(chan *timeseries, seriesPerWorker)
+	}
+	for i, ts := range tss {
+		idx := i % len(workChs)
+		workChs[idx] <- ts
+	}
+	for _, workCh := range workChs {
+		close(workCh)
+	}
+
 	var wg sync.WaitGroup
-	wg.Add(concurrency)
-	for i := 0; i < concurrency; i++ {
-		go func() {
+	wg.Add(workers)
+	for i := 0; i < workers; i++ {
+		go func(workerID uint) {
 			defer wg.Done()
 			var tmpValues []float64
 			var tmpTimestamps []int64
-			for ts := range workCh {
-				tmpValues, tmpTimestamps = f(ts, tmpValues, tmpTimestamps)
+			for ts := range workChs[workerID] {
+				tmpValues, tmpTimestamps = f(ts, tmpValues, tmpTimestamps, workerID)
 			}
-		}()
+		}(uint(i))
 	}
-	for _, ts := range tss {
-		workCh <- ts
-	}
-	close(workCh)
 	wg.Wait()
 }
 
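
Note (not part of the patch): below is a minimal, self-contained sketch of how the new workerID argument can be used to drop a shared mutex such as tssLock, which is what the commit message says the follow-up change from @zekker6 is meant to do. The callback writes only into a bucket owned by its worker, and the buckets are merged after all workers finish. The timeseries struct, the doParallelSketch helper, its workers parameter and the merge loop are simplified stand-ins invented for this illustration; the actual follow-up commit in evalRollupFuncWithSubquery() may be structured differently.

package main

import (
	"fmt"
	"sync"
)

// timeseries is a stripped-down stand-in for the real type in eval.go; it is
// defined here only so the sketch compiles on its own.
type timeseries struct {
	Values []float64
}

// doParallelSketch mirrors the shape of the patched doParallel(): each worker
// drains its own channel and receives its workerID, so the callback can write
// to per-worker state without taking a lock.
func doParallelSketch(tss []*timeseries, workers int, f func(ts *timeseries, workerID uint)) {
	if workers > len(tss) {
		workers = len(tss)
	}
	if workers == 0 {
		return
	}
	seriesPerWorker := (len(tss) + workers - 1) / workers
	workChs := make([]chan *timeseries, workers)
	for i := range workChs {
		workChs[i] = make(chan *timeseries, seriesPerWorker)
	}
	for i, ts := range tss {
		workChs[i%workers] <- ts
	}
	for _, ch := range workChs {
		close(ch)
	}
	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func(workerID uint) {
			defer wg.Done()
			for ts := range workChs[workerID] {
				f(ts, workerID)
			}
		}(uint(i))
	}
	wg.Wait()
}

func main() {
	input := []*timeseries{
		{Values: []float64{1, 2}},
		{Values: []float64{3}},
		{Values: []float64{4, 5, 6}},
	}
	const workers = 4
	// Instead of appending results to a single shared slice under a mutex
	// (the tssLock pattern), every worker appends only to its own bucket,
	// selected by workerID, so no locking is needed.
	resultsByWorker := make([][]*timeseries, workers)
	doParallelSketch(input, workers, func(ts *timeseries, workerID uint) {
		out := &timeseries{Values: append([]float64(nil), ts.Values...)}
		resultsByWorker[workerID] = append(resultsByWorker[workerID], out)
	})
	// The buckets are merged only after every worker goroutine has finished.
	var merged []*timeseries
	for _, bucket := range resultsByWorker {
		merged = append(merged, bucket...)
	}
	fmt.Println("series produced:", len(merged))
}

The trade-off in this pattern is a small amount of extra memory for the per-worker buckets plus a final merge pass, in exchange for removing lock contention on the hot path, which is what the commit message targets for systems with many CPU cores.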