app/vmselect/promql: use lock-less approach to gather results of parallel processing for evalRollup* funcs (#4004)

* vmselect/promql: refactor `evalRollupNoIncrementalAggregate` to use a lock-less approach for gathering results from parallel workers

Locking there causes issues on highly multi-core systems, as it introduces lock contention while merging results.

The new implementation uses a lock-less approach: each worker stores its results under its own workerID, and the final result is merged once at the end. This is expected to significantly reduce lock contention and CPU usage on systems with a high number of cores.
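
In essence, the pattern looks like this (a minimal, self-contained Go sketch; the fixed worker count, static input sharding and `process` function are illustrative placeholders, not the actual VictoriaMetrics APIs):

	package main

	import (
		"fmt"
		"sync"
	)

	// gatherLockless runs process over inputs on numWorkers goroutines.
	// Each worker appends only to its own slot of resultsByWorker, so no
	// mutex is needed while the workers run.
	func gatherLockless(inputs []int, numWorkers int, process func(int) int) []int {
		resultsByWorker := make([][]int, numWorkers)
		var wg sync.WaitGroup
		for workerID := 0; workerID < numWorkers; workerID++ {
			wg.Add(1)
			go func(workerID int) {
				defer wg.Done()
				// Worker k handles inputs k, k+numWorkers, k+2*numWorkers, ...
				for i := workerID; i < len(inputs); i += numWorkers {
					resultsByWorker[workerID] = append(resultsByWorker[workerID], process(inputs[i]))
				}
			}(workerID)
		}
		wg.Wait()
		// The merge is the only single-threaded step; it replaces the
		// per-append locking of the previous implementation.
		var merged []int
		for _, rs := range resultsByWorker {
			merged = append(merged, rs...)
		}
		return merged
	}

	func main() {
		double := func(x int) int { return x * 2 }
		// Prints [2 8 4 10 6]: results are grouped by worker, not by input order.
		fmt.Println(gatherLockless([]int{1, 2, 3, 4, 5}, 3, double))
	}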

Related: #3966
Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* vmselect/promql: add pooling for `timeseriesWithPadding` to reduce allocations
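
The idea follows the standard `sync.Pool` recipe: truncate the buffer when returning it, so the next caller starts empty but reuses the allocated capacity. A minimal sketch (here `buffer` stands in for `timeseriesWithPadding`, not the actual type):

	package main

	import "sync"

	type buffer struct {
		items []int
	}

	var bufferPool sync.Pool

	func getBuffer() *buffer {
		v := bufferPool.Get()
		if v == nil {
			return &buffer{}
		}
		return v.(*buffer)
	}

	func putBuffer(b *buffer) {
		// Truncate instead of reallocating: the backing array is reused by
		// the next getBuffer caller, which is what cuts down allocations.
		b.items = b.items[:0]
		bufferPool.Put(b)
	}

	func main() {
		b := getBuffer()
		b.items = append(b.items, 1, 2, 3)
		putBuffer(b) // b must not be used after this point
	}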

Related: #3966
Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* vmselect/promql: refactor `evalRollupFuncWithSubquery` to avoid using locks

Uses the same approach as `evalRollupNoIncrementalAggregate` to remove locking between workers and reduce lock contention.

Related: #3966
Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

---------

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
commit 71044221f6
parent d8e218d99e
Author:    Zakhar Bessarab <z.bessarab@victoriametrics.com>
Date:      2023-03-25 09:07:12 +03:00
Committer: Aliaksandr Valialkin

app/vmselect/promql/eval.go

@@ -9,6 +9,7 @@ import (
 	"strings"
 	"sync"
 	"sync/atomic"
+	"unsafe"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
@@ -910,8 +911,12 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName
 	if err != nil {
 		return nil, err
 	}
-	tss := make([]*timeseries, 0, len(tssSQ)*len(rcs))
-	var tssLock sync.Mutex
+	seriesByWorkerID := make([]*timeseriesWithPadding, 0, netstorage.MaxWorkers())
+	for i := 0; i < netstorage.MaxWorkers(); i++ {
+		seriesByWorkerID = append(seriesByWorkerID, getTimeseriesPadded())
+	}
 	var samplesScannedTotal uint64
 	keepMetricNames := getKeepMetricNames(expr)
 	doParallel(tssSQ, func(tsSQ *timeseries, values []float64, timestamps []int64, workerID uint) ([]float64, []int64) {
@@ -921,20 +926,22 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName
 			if tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &tsSQ.MetricName); tsm != nil {
 				samplesScanned := rc.DoTimeseriesMap(tsm, values, timestamps)
 				atomic.AddUint64(&samplesScannedTotal, samplesScanned)
-				tssLock.Lock()
-				tss = tsm.AppendTimeseriesTo(tss)
-				tssLock.Unlock()
+				seriesByWorkerID[workerID].tss = tsm.AppendTimeseriesTo(seriesByWorkerID[workerID].tss)
 				continue
 			}
 			var ts timeseries
 			samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, &ts, &tsSQ.MetricName, values, timestamps, sharedTimestamps)
 			atomic.AddUint64(&samplesScannedTotal, samplesScanned)
-			tssLock.Lock()
-			tss = append(tss, &ts)
-			tssLock.Unlock()
+			seriesByWorkerID[workerID].tss = append(seriesByWorkerID[workerID].tss, &ts)
 		}
 		return values, timestamps
 	})
+	tss := make([]*timeseries, 0, len(tssSQ)*len(rcs))
+	for i := range seriesByWorkerID {
+		tss = append(tss, seriesByWorkerID[i].tss...)
+		putTimeseriesPadded(seriesByWorkerID[i])
+	}
 	rowsScannedPerQuery.Update(float64(samplesScannedTotal))
 	qt.Printf("rollup %s() over %d series returned by subquery: series=%d, samplesScanned=%d", funcName, len(tssSQ), len(tss), samplesScannedTotal)
 	return tss, nil
@@ -1201,12 +1208,39 @@ func evalRollupWithIncrementalAggregate(qt *querytracer.Tracer, funcName string,
 	return tss, nil
 }
 
+var tspPool sync.Pool
+
+func getTimeseriesPadded() *timeseriesWithPadding {
+	v := tspPool.Get()
+	if v == nil {
+		return &timeseriesWithPadding{}
+	}
+	return v.(*timeseriesWithPadding)
+}
+
+func putTimeseriesPadded(tsp *timeseriesWithPadding) {
+	tsp.tss = tsp.tss[:0]
+	tspPool.Put(tsp)
+}
+
+type timeseriesWithPadding struct {
+	tss []*timeseries
+
+	// The padding prevents false sharing on widespread platforms with
+	// 128 mod (cache line size) = 0 .
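+	// Without it, adjacent per-worker buffers could land on the same CPU
+	// cache line, and concurrent appends from different workers would keep
+	// invalidating each other's caches.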
+	_ [128 - unsafe.Sizeof(timeseries{})%128]byte
+}
+
 func evalRollupNoIncrementalAggregate(qt *querytracer.Tracer, funcName string, keepMetricNames bool, rss *netstorage.Results, rcs []*rollupConfig,
 	preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) {
 	qt = qt.NewChild("rollup %s() over %d series; rollupConfigs=%s", funcName, rss.Len(), rcs)
 	defer qt.Done()
-	tss := make([]*timeseries, 0, rss.Len()*len(rcs))
-	var tssLock sync.Mutex
+	seriesByWorkerID := make([]*timeseriesWithPadding, 0, netstorage.MaxWorkers())
+	for i := 0; i < netstorage.MaxWorkers(); i++ {
+		seriesByWorkerID = append(seriesByWorkerID, getTimeseriesPadded())
+	}
 	var samplesScannedTotal uint64
 	err := rss.RunParallel(qt, func(rs *netstorage.Result, workerID uint) error {
 		rs.Values, rs.Timestamps = dropStaleNaNs(funcName, rs.Values, rs.Timestamps)
@@ -1215,23 +1249,25 @@ func evalRollupNoIncrementalAggregate(qt *querytracer.Tracer, funcName string, k
 			if tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &rs.MetricName); tsm != nil {
 				samplesScanned := rc.DoTimeseriesMap(tsm, rs.Values, rs.Timestamps)
 				atomic.AddUint64(&samplesScannedTotal, samplesScanned)
-				tssLock.Lock()
-				tss = tsm.AppendTimeseriesTo(tss)
-				tssLock.Unlock()
+				seriesByWorkerID[workerID].tss = tsm.AppendTimeseriesTo(seriesByWorkerID[workerID].tss)
 				continue
 			}
 			var ts timeseries
 			samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, &ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps)
 			atomic.AddUint64(&samplesScannedTotal, samplesScanned)
-			tssLock.Lock()
-			tss = append(tss, &ts)
-			tssLock.Unlock()
+			seriesByWorkerID[workerID].tss = append(seriesByWorkerID[workerID].tss, &ts)
 		}
 		return nil
 	})
 	if err != nil {
 		return nil, err
 	}
+	tss := make([]*timeseries, 0, rss.Len()*len(rcs))
+	for i := range seriesByWorkerID {
+		tss = append(tss, seriesByWorkerID[i].tss...)
+		putTimeseriesPadded(seriesByWorkerID[i])
+	}
 	rowsScannedPerQuery.Update(float64(samplesScannedTotal))
 	qt.Printf("samplesScanned=%d", samplesScannedTotal)
 	return tss, nil