From 18af01c38790d3337b72e9e6c80d4d5818f78b34 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 20 Mar 2023 15:37:00 -0700 Subject: [PATCH] app/vmselect: optimize incremental aggregates a bit Substitute sync.Map with an ordinary slice indexed by workerID. This should reduce the overhead when updating the incremental aggregate state --- app/vmselect/netstorage/netstorage.go | 18 ++++++++---- app/vmselect/promql/aggr_incremental.go | 39 ++++++++++++++----------- 2 files changed, 34 insertions(+), 23 deletions(-) diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 02f1a475d..180c3bfe9 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -225,10 +225,17 @@ type result struct { var resultPool sync.Pool +// MaxWorkers returns the maximum number of workers netstorage can spin when calling RunParallel() +func MaxWorkers() int { + return gomaxprocs +} + +var gomaxprocs = cgroup.AvailableCPUs() + // RunParallel runs f in parallel for all the results from rss. // // f shouldn't hold references to rs after returning. -// workerID is the id of the worker goroutine that calls f. +// workerID is the id of the worker goroutine that calls f. The workerID is in the range [0..MaxWorkers()-1]. // Data processing is immediately stopped if f returns non-nil error. // // rss becomes unusable after the call to RunParallel. @@ -262,7 +269,8 @@ func (rss *Results) runParallel(qt *querytracer.Tracer, f func(rs *Result, worke tsw.f = f tsw.mustStop = &mustStop } - if gomaxprocs == 1 || tswsLen == 1 { + maxWorkers := MaxWorkers() + if maxWorkers == 1 || tswsLen == 1 { // It is faster to process time series in the current goroutine. tsw := getTimeseriesWork() tmpResult := getTmpResult() @@ -298,8 +306,8 @@ func (rss *Results) runParallel(qt *querytracer.Tracer, f func(rs *Result, worke // Prepare worker channels. workers := len(tsws) - if workers > gomaxprocs { - workers = gomaxprocs + if workers > maxWorkers { + workers = maxWorkers } itemsPerWorker := (len(tsws) + workers - 1) / workers workChs := make([]chan *timeseriesWork, workers) @@ -351,8 +359,6 @@ var ( seriesReadPerQuery = metrics.NewHistogram(`vm_series_read_per_query`) ) -var gomaxprocs = cgroup.AvailableCPUs() - type packedTimeseries struct { metricName string addrs []tmpBlockAddr diff --git a/app/vmselect/promql/aggr_incremental.go b/app/vmselect/promql/aggr_incremental.go index 26f91ed0e..f3d5bd416 100644 --- a/app/vmselect/promql/aggr_incremental.go +++ b/app/vmselect/promql/aggr_incremental.go @@ -3,8 +3,9 @@ package promql import ( "math" "strings" - "sync" + "unsafe" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/metricsql" ) @@ -63,31 +64,36 @@ var incrementalAggrFuncCallbacksMap = map[string]*incrementalAggrFuncCallbacks{ }, } +type incrementalAggrContextMap struct { + m map[string]*incrementalAggrContext + + // The padding prevents false sharing on widespread platforms with + // 128 mod (cache line size) = 0 . 
+ _ [128 - unsafe.Sizeof(map[string]*incrementalAggrContext{})%128]byte +} + type incrementalAggrFuncContext struct { ae *metricsql.AggrFuncExpr - m sync.Map + byWorkerID []incrementalAggrContextMap callbacks *incrementalAggrFuncCallbacks } func newIncrementalAggrFuncContext(ae *metricsql.AggrFuncExpr, callbacks *incrementalAggrFuncCallbacks) *incrementalAggrFuncContext { return &incrementalAggrFuncContext{ - ae: ae, - callbacks: callbacks, + ae: ae, + byWorkerID: make([]incrementalAggrContextMap, netstorage.MaxWorkers()), + callbacks: callbacks, } } func (iafc *incrementalAggrFuncContext) updateTimeseries(tsOrig *timeseries, workerID uint) { - v, ok := iafc.m.Load(workerID) - if !ok { - // It is safe creating and storing m in iafc.m without locking, - // since it is guaranteed that only a single goroutine can execute - // code for the given workerID at a time. - v = make(map[string]*incrementalAggrContext, 1) - iafc.m.Store(workerID, v) + v := &iafc.byWorkerID[workerID] + if v.m == nil { + v.m = make(map[string]*incrementalAggrContext, 1) } - m := v.(map[string]*incrementalAggrContext) + m := v.m ts := tsOrig keepOriginal := iafc.callbacks.keepOriginal @@ -128,9 +134,9 @@ func (iafc *incrementalAggrFuncContext) updateTimeseries(tsOrig *timeseries, wor func (iafc *incrementalAggrFuncContext) finalizeTimeseries() []*timeseries { mGlobal := make(map[string]*incrementalAggrContext) mergeAggrFunc := iafc.callbacks.mergeAggrFunc - iafc.m.Range(func(k, v interface{}) bool { - m := v.(map[string]*incrementalAggrContext) - for k, iac := range m { + byWorkerID := iafc.byWorkerID + for i := range byWorkerID { + for k, iac := range byWorkerID[i].m { iacGlobal := mGlobal[k] if iacGlobal == nil { if iafc.ae.Limit > 0 && len(mGlobal) >= iafc.ae.Limit { @@ -142,8 +148,7 @@ func (iafc *incrementalAggrFuncContext) finalizeTimeseries() []*timeseries { } mergeAggrFunc(iacGlobal, iac) } - return true - }) + } tss := make([]*timeseries, 0, len(mGlobal)) finalizeAggrFunc := iafc.callbacks.finalizeAggrFunc for _, iac := range mGlobal {
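
The data-structure change above follows a common pattern: per-worker state kept in a plain slice indexed by workerID, with each element padded out to a multiple of 128 bytes so that neighbouring workers do not contend on the same cache line. The standalone Go sketch below mirrors that shape under the same invariant the patch relies on: only the goroutine that owns a given workerID ever touches its slot, so no locking is needed until the shards are merged at the end. All names in the sketch (counterShard, perWorkerCounter, maxWorkers) are illustrative and are not part of the patch.

package main

import (
	"fmt"
	"sync"
	"unsafe"
)

// counterShard holds one worker's private state. The trailing padding rounds
// the struct size up to a multiple of 128 bytes, so adjacent shards in a slice
// cannot share a cache line on platforms where 128 mod (cache line size) = 0.
type counterShard struct {
	n uint64

	_ [128 - unsafe.Sizeof(uint64(0))%128]byte
}

// perWorkerCounter is the illustrative analogue of incrementalAggrFuncContext:
// one shard per worker, indexed directly by workerID.
type perWorkerCounter struct {
	shards []counterShard
}

func newPerWorkerCounter(maxWorkers int) *perWorkerCounter {
	return &perWorkerCounter{shards: make([]counterShard, maxWorkers)}
}

// add may be called without locking, because only the goroutine that owns
// workerID writes to shards[workerID]; this is the same invariant
// updateTimeseries relies on for byWorkerID.
func (c *perWorkerCounter) add(workerID uint, delta uint64) {
	c.shards[workerID].n += delta
}

// total merges the per-worker shards after all workers have stopped,
// mirroring what finalizeTimeseries does when it builds mGlobal.
func (c *perWorkerCounter) total() uint64 {
	var sum uint64
	for i := range c.shards {
		sum += c.shards[i].n
	}
	return sum
}

func main() {
	const workers = 4
	c := newPerWorkerCounter(workers)

	var wg sync.WaitGroup
	for w := uint(0); w < workers; w++ {
		wg.Add(1)
		go func(workerID uint) {
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				c.add(workerID, 1)
			}
		}(w)
	}
	wg.Wait()

	fmt.Println(c.total()) // prints 4000
}

Dropping sync.Map removes the per-update Load/Store calls and interface conversions from the hot path. The explicit padding matters because slice elements are laid out contiguously, whereas the previously used sync.Map entries were separately allocated and therefore unlikely to sit on the same cache line.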