app/vmselect/promql: pass workerID to the callback inside doParallel()

This opens the possibility to remove tssLock from evalRollupFuncWithSubquery()
in the follow-up commit from @zekker6 in order to speed up the code
for systems with many CPU cores.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3966
This commit is contained in:
Aliaksandr Valialkin 2023-03-20 20:54:54 -07:00
parent 9b5245a836
commit d8e218d99e
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1

View file

@ -914,7 +914,7 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName
var tssLock sync.Mutex
var samplesScannedTotal uint64
keepMetricNames := getKeepMetricNames(expr)
doParallel(tssSQ, func(tsSQ *timeseries, values []float64, timestamps []int64) ([]float64, []int64) {
doParallel(tssSQ, func(tsSQ *timeseries, values []float64, timestamps []int64, workerID uint) ([]float64, []int64) {
values, timestamps = removeNanValues(values[:0], timestamps[:0], tsSQ.Values, tsSQ.Timestamps)
preFunc(values, timestamps)
for _, rc := range rcs {
@ -958,28 +958,36 @@ func getKeepMetricNames(expr metricsql.Expr) bool {
return false
}
func doParallel(tss []*timeseries, f func(ts *timeseries, values []float64, timestamps []int64) ([]float64, []int64)) {
concurrency := cgroup.AvailableCPUs()
if concurrency > len(tss) {
concurrency = len(tss)
func doParallel(tss []*timeseries, f func(ts *timeseries, values []float64, timestamps []int64, workerID uint) ([]float64, []int64)) {
workers := netstorage.MaxWorkers()
if workers > len(tss) {
workers = len(tss)
}
workCh := make(chan *timeseries, concurrency)
seriesPerWorker := (len(tss) + workers - 1) / workers
workChs := make([]chan *timeseries, workers)
for i := range workChs {
workChs[i] = make(chan *timeseries, seriesPerWorker)
}
for i, ts := range tss {
idx := i % len(workChs)
workChs[idx] <- ts
}
for _, workCh := range workChs {
close(workCh)
}
var wg sync.WaitGroup
wg.Add(concurrency)
for i := 0; i < concurrency; i++ {
go func() {
wg.Add(workers)
for i := 0; i < workers; i++ {
go func(workerID uint) {
defer wg.Done()
var tmpValues []float64
var tmpTimestamps []int64
for ts := range workCh {
tmpValues, tmpTimestamps = f(ts, tmpValues, tmpTimestamps)
for ts := range workChs[workerID] {
tmpValues, tmpTimestamps = f(ts, tmpValues, tmpTimestamps, workerID)
}
}()
}(uint(i))
}
for _, ts := range tss {
workCh <- ts
}
close(workCh)
wg.Wait()
}