From 7205c79c5a617faff1c77b3b316f9fd53633b529 Mon Sep 17 00:00:00 2001
From: Zakhar Bessarab
Date: Sat, 25 Mar 2023 09:07:12 +0300
Subject: [PATCH] app/vmselect/promql: use lock-less approach to gather results of parallel processing for `evalRollup*` funcs (#4004)

* vmselect/promql: refactor `evalRollupNoIncrementalAggregate` to use lock-less approach for parallel workers computation

Locking there causes issues when running on highly multi-core systems, as it introduces lock contention while merging the results.

The new implementation uses a lock-less approach: each worker stores its results in a per-workerID slot, and the final result is merged at the end. This is expected to significantly reduce lock contention and CPU usage on systems with a high number of cores.

Related: #3966

Signed-off-by: Zakhar Bessarab

* vmselect/promql: add pooling for `timeseriesWithPadding` to reduce allocations

Related: #3966

Signed-off-by: Zakhar Bessarab

* vmselect/promql: refactor `evalRollupFuncWithSubquery` to avoid using locks

Uses the same approach as `evalRollupNoIncrementalAggregate` to remove locking between workers and reduce lock contention.

Related: #3966

Signed-off-by: Zakhar Bessarab

---------

Signed-off-by: Zakhar Bessarab
---
 app/vmselect/promql/eval.go | 68 ++++++++++++++++++++++++++++---------
 1 file changed, 52 insertions(+), 16 deletions(-)

diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go
index c4c32081e..acd7ce35b 100644
--- a/app/vmselect/promql/eval.go
+++ b/app/vmselect/promql/eval.go
@@ -9,6 +9,7 @@ import (
 	"strings"
 	"sync"
 	"sync/atomic"
+	"unsafe"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
@@ -904,8 +905,12 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName
 	if err != nil {
 		return nil, err
 	}
-	tss := make([]*timeseries, 0, len(tssSQ)*len(rcs))
-	var tssLock sync.Mutex
+
+	seriesByWorkerID := make([]*timeseriesWithPadding, 0, netstorage.MaxWorkers())
+	for i := 0; i < netstorage.MaxWorkers(); i++ {
+		seriesByWorkerID = append(seriesByWorkerID, getTimeseriesPadded())
+	}
+
 	var samplesScannedTotal uint64
 	keepMetricNames := getKeepMetricNames(expr)
 	doParallel(tssSQ, func(tsSQ *timeseries, values []float64, timestamps []int64, workerID uint) ([]float64, []int64) {
@@ -915,20 +920,22 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName
 			if tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &tsSQ.MetricName); tsm != nil {
 				samplesScanned := rc.DoTimeseriesMap(tsm, values, timestamps)
 				atomic.AddUint64(&samplesScannedTotal, samplesScanned)
-				tssLock.Lock()
-				tss = tsm.AppendTimeseriesTo(tss)
-				tssLock.Unlock()
+				seriesByWorkerID[workerID].tss = tsm.AppendTimeseriesTo(seriesByWorkerID[workerID].tss)
 				continue
 			}
 			var ts timeseries
 			samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, &ts, &tsSQ.MetricName, values, timestamps, sharedTimestamps)
 			atomic.AddUint64(&samplesScannedTotal, samplesScanned)
-			tssLock.Lock()
-			tss = append(tss, &ts)
-			tssLock.Unlock()
+			seriesByWorkerID[workerID].tss = append(seriesByWorkerID[workerID].tss, &ts)
 		}
 		return values, timestamps
 	})
+	tss := make([]*timeseries, 0, len(tssSQ)*len(rcs))
+	for i := range seriesByWorkerID {
+		tss = append(tss, seriesByWorkerID[i].tss...)
+		putTimeseriesPadded(seriesByWorkerID[i])
+	}
+
 	rowsScannedPerQuery.Update(float64(samplesScannedTotal))
 	qt.Printf("rollup %s() over %d series returned by subquery: series=%d, samplesScanned=%d", funcName, len(tssSQ), len(tss), samplesScannedTotal)
 	return tss, nil
@@ -1199,12 +1206,39 @@ func evalRollupWithIncrementalAggregate(qt *querytracer.Tracer, funcName string,
 	return tss, nil
 }
 
+var tspPool sync.Pool
+
+func getTimeseriesPadded() *timeseriesWithPadding {
+	v := tspPool.Get()
+	if v == nil {
+		return &timeseriesWithPadding{}
+	}
+	return v.(*timeseriesWithPadding)
+}
+
+func putTimeseriesPadded(tsp *timeseriesWithPadding) {
+	tsp.tss = tsp.tss[:0]
+	tspPool.Put(tsp)
+}
+
+type timeseriesWithPadding struct {
+	tss []*timeseries
+
+	// The padding prevents false sharing on widespread platforms with
+	// 128 mod (cache line size) = 0 .
+	_ [128 - unsafe.Sizeof(timeseries{})%128]byte
+}
+
 func evalRollupNoIncrementalAggregate(qt *querytracer.Tracer, funcName string, keepMetricNames bool, rss *netstorage.Results, rcs []*rollupConfig,
 	preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) {
 	qt = qt.NewChild("rollup %s() over %d series; rollupConfigs=%s", funcName, rss.Len(), rcs)
 	defer qt.Done()
-	tss := make([]*timeseries, 0, rss.Len()*len(rcs))
-	var tssLock sync.Mutex
+
+	seriesByWorkerID := make([]*timeseriesWithPadding, 0, netstorage.MaxWorkers())
+	for i := 0; i < netstorage.MaxWorkers(); i++ {
+		seriesByWorkerID = append(seriesByWorkerID, getTimeseriesPadded())
+	}
+
 	var samplesScannedTotal uint64
 	err := rss.RunParallel(qt, func(rs *netstorage.Result, workerID uint) error {
 		rs.Values, rs.Timestamps = dropStaleNaNs(funcName, rs.Values, rs.Timestamps)
@@ -1213,23 +1247,25 @@ func evalRollupNoIncrementalAggregate(qt *querytracer.Tracer, funcName string, k
 			if tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &rs.MetricName); tsm != nil {
 				samplesScanned := rc.DoTimeseriesMap(tsm, rs.Values, rs.Timestamps)
 				atomic.AddUint64(&samplesScannedTotal, samplesScanned)
-				tssLock.Lock()
-				tss = tsm.AppendTimeseriesTo(tss)
-				tssLock.Unlock()
+				seriesByWorkerID[workerID].tss = tsm.AppendTimeseriesTo(seriesByWorkerID[workerID].tss)
 				continue
 			}
 			var ts timeseries
 			samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, &ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps)
 			atomic.AddUint64(&samplesScannedTotal, samplesScanned)
-			tssLock.Lock()
-			tss = append(tss, &ts)
-			tssLock.Unlock()
+			seriesByWorkerID[workerID].tss = append(seriesByWorkerID[workerID].tss, &ts)
 		}
 		return nil
 	})
 	if err != nil {
 		return nil, err
 	}
+	tss := make([]*timeseries, 0, rss.Len()*len(rcs))
+	for i := range seriesByWorkerID {
+		tss = append(tss, seriesByWorkerID[i].tss...)
+		putTimeseriesPadded(seriesByWorkerID[i])
+	}
+
 	rowsScannedPerQuery.Update(float64(samplesScannedTotal))
 	qt.Printf("samplesScanned=%d", samplesScannedTotal)
 	return tss, nil
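
Note (illustration only, not part of the patch): the sketch below distills the pattern the commits above describe, independent of the VictoriaMetrics internals. Each worker appends results to its own cache-line-padded slot indexed by workerID, so no mutex is taken on the hot path; the per-worker slots come from a sync.Pool and are merged by a single-threaded loop at the end, mirroring the final append loops in evalRollupFuncWithSubquery and evalRollupNoIncrementalAggregate. All identifiers in the sketch (resultsWithPadding, processParallel, maxWorkers, and so on) are hypothetical stand-ins rather than the real API.

```go
package main

import (
	"fmt"
	"sync"
	"unsafe"
)

// resultsWithPadding plays the role of timeseriesWithPadding: the padding
// rounds the struct size up to a multiple of 128 bytes so that instances
// laid out next to each other in memory do not share a cache line, which
// would cause false sharing when different workers write to them.
type resultsWithPadding struct {
	results []int

	_ [128 - unsafe.Sizeof([]int{})%128]byte
}

// resultsPool reuses padded slots across calls, analogous to tspPool.
var resultsPool sync.Pool

func getResultsPadded() *resultsWithPadding {
	if v := resultsPool.Get(); v != nil {
		return v.(*resultsWithPadding)
	}
	return &resultsWithPadding{}
}

func putResultsPadded(rp *resultsWithPadding) {
	rp.results = rp.results[:0]
	resultsPool.Put(rp)
}

// processParallel squares every item using maxWorkers goroutines without
// taking a lock: each worker writes only to its own slot in byWorkerID.
func processParallel(items []int, maxWorkers int) []int {
	byWorkerID := make([]*resultsWithPadding, 0, maxWorkers)
	for i := 0; i < maxWorkers; i++ {
		byWorkerID = append(byWorkerID, getResultsPadded())
	}

	var wg sync.WaitGroup
	for workerID := 0; workerID < maxWorkers; workerID++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			// Strided work split; only this worker touches its slot.
			for i := workerID; i < len(items); i += maxWorkers {
				byWorkerID[workerID].results = append(byWorkerID[workerID].results, items[i]*items[i])
			}
		}(workerID)
	}
	wg.Wait()

	// Single-threaded merge at the end, then the slots go back to the pool.
	merged := make([]int, 0, len(items))
	for i := range byWorkerID {
		merged = append(merged, byWorkerID[i].results...)
		putResultsPadded(byWorkerID[i])
	}
	return merged
}

func main() {
	fmt.Println(processParallel([]int{1, 2, 3, 4, 5, 6, 7, 8}, 4))
}
```

The 128-byte rounding mirrors the comment on timeseriesWithPadding: on platforms whose cache line size divides 128 (for example, 64-byte lines on x86 and 128-byte lines on some ARM designs), two padded structs placed next to each other never end up on the same cache line.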