diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go index c4c32081e..acd7ce35b 100644 --- a/app/vmselect/promql/eval.go +++ b/app/vmselect/promql/eval.go @@ -9,6 +9,7 @@ import ( "strings" "sync" "sync/atomic" + "unsafe" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils" @@ -904,8 +905,12 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName if err != nil { return nil, err } - tss := make([]*timeseries, 0, len(tssSQ)*len(rcs)) - var tssLock sync.Mutex + + seriesByWorkerID := make([]*timeseriesWithPadding, 0, netstorage.MaxWorkers()) + for i := 0; i < netstorage.MaxWorkers(); i++ { + seriesByWorkerID = append(seriesByWorkerID, getTimeseriesPadded()) + } + var samplesScannedTotal uint64 keepMetricNames := getKeepMetricNames(expr) doParallel(tssSQ, func(tsSQ *timeseries, values []float64, timestamps []int64, workerID uint) ([]float64, []int64) { @@ -915,20 +920,22 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName if tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &tsSQ.MetricName); tsm != nil { samplesScanned := rc.DoTimeseriesMap(tsm, values, timestamps) atomic.AddUint64(&samplesScannedTotal, samplesScanned) - tssLock.Lock() - tss = tsm.AppendTimeseriesTo(tss) - tssLock.Unlock() + seriesByWorkerID[workerID].tss = tsm.AppendTimeseriesTo(seriesByWorkerID[workerID].tss) continue } var ts timeseries samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, &ts, &tsSQ.MetricName, values, timestamps, sharedTimestamps) atomic.AddUint64(&samplesScannedTotal, samplesScanned) - tssLock.Lock() - tss = append(tss, &ts) - tssLock.Unlock() + seriesByWorkerID[workerID].tss = append(seriesByWorkerID[workerID].tss, &ts) } return values, timestamps }) + tss := make([]*timeseries, 0, len(tssSQ)*len(rcs)) + for i := range seriesByWorkerID { + tss = append(tss, seriesByWorkerID[i].tss...) + putTimeseriesPadded(seriesByWorkerID[i]) + } + rowsScannedPerQuery.Update(float64(samplesScannedTotal)) qt.Printf("rollup %s() over %d series returned by subquery: series=%d, samplesScanned=%d", funcName, len(tssSQ), len(tss), samplesScannedTotal) return tss, nil @@ -1199,12 +1206,39 @@ func evalRollupWithIncrementalAggregate(qt *querytracer.Tracer, funcName string, return tss, nil } +var tspPool sync.Pool + +func getTimeseriesPadded() *timeseriesWithPadding { + v := tspPool.Get() + if v == nil { + return ×eriesWithPadding{} + } + return v.(*timeseriesWithPadding) +} + +func putTimeseriesPadded(tsp *timeseriesWithPadding) { + tsp.tss = tsp.tss[:0] + tspPool.Put(tsp) +} + +type timeseriesWithPadding struct { + tss []*timeseries + + // The padding prevents false sharing on widespread platforms with + // 128 mod (cache line size) = 0 . + _ [128 - unsafe.Sizeof(timeseries{})%128]byte +} + func evalRollupNoIncrementalAggregate(qt *querytracer.Tracer, funcName string, keepMetricNames bool, rss *netstorage.Results, rcs []*rollupConfig, preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) { qt = qt.NewChild("rollup %s() over %d series; rollupConfigs=%s", funcName, rss.Len(), rcs) defer qt.Done() - tss := make([]*timeseries, 0, rss.Len()*len(rcs)) - var tssLock sync.Mutex + + seriesByWorkerID := make([]*timeseriesWithPadding, 0, netstorage.MaxWorkers()) + for i := 0; i < netstorage.MaxWorkers(); i++ { + seriesByWorkerID = append(seriesByWorkerID, getTimeseriesPadded()) + } + var samplesScannedTotal uint64 err := rss.RunParallel(qt, func(rs *netstorage.Result, workerID uint) error { rs.Values, rs.Timestamps = dropStaleNaNs(funcName, rs.Values, rs.Timestamps) @@ -1213,23 +1247,25 @@ func evalRollupNoIncrementalAggregate(qt *querytracer.Tracer, funcName string, k if tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &rs.MetricName); tsm != nil { samplesScanned := rc.DoTimeseriesMap(tsm, rs.Values, rs.Timestamps) atomic.AddUint64(&samplesScannedTotal, samplesScanned) - tssLock.Lock() - tss = tsm.AppendTimeseriesTo(tss) - tssLock.Unlock() + seriesByWorkerID[workerID].tss = tsm.AppendTimeseriesTo(seriesByWorkerID[workerID].tss) continue } var ts timeseries samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, &ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps) atomic.AddUint64(&samplesScannedTotal, samplesScanned) - tssLock.Lock() - tss = append(tss, &ts) - tssLock.Unlock() + seriesByWorkerID[workerID].tss = append(seriesByWorkerID[workerID].tss, &ts) } return nil }) if err != nil { return nil, err } + tss := make([]*timeseries, 0, rss.Len()*len(rcs)) + for i := range seriesByWorkerID { + tss = append(tss, seriesByWorkerID[i].tss...) + putTimeseriesPadded(seriesByWorkerID[i]) + } + rowsScannedPerQuery.Update(float64(samplesScannedTotal)) qt.Printf("samplesScanned=%d", samplesScannedTotal) return tss, nil