app/vmselect: optimize incremental aggregates a bit
Substitute sync.Map with an ordinary slice indexed by workerID. This should reduce the overhead when updating the incremental aggregate state
This commit is contained in:
parent 48ee15ac42
commit 83a8f87131

2 changed files with 34 additions and 24 deletions
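As a rough illustration of the substitution described in the commit message, here is a minimal, hypothetical Go sketch (simplified types and names, not the actual VictoriaMetrics code): per-worker state moves from a sync.Map keyed by workerID into a preallocated slice indexed by workerID, which removes the Load/Store calls and the interface type assertion from every update.

package main

import "fmt"

// workerState is a stand-in for the per-worker incremental aggregate state.
type workerState struct {
	m map[string]float64
}

// aggrContext keeps one workerState per workerID in a plain slice.
// No locking is needed: a given workerID is only ever used by a single
// goroutine at a time, as documented for netstorage.RunParallel.
type aggrContext struct {
	byWorkerID []workerState
}

func newAggrContext(maxWorkers int) *aggrContext {
	return &aggrContext{byWorkerID: make([]workerState, maxWorkers)}
}

func (ac *aggrContext) update(key string, v float64, workerID uint) {
	ws := &ac.byWorkerID[workerID] // direct index instead of sync.Map Load/Store
	if ws.m == nil {
		ws.m = make(map[string]float64, 1)
	}
	ws.m[key] += v
}

func main() {
	ac := newAggrContext(4)
	ac.update("up", 1, 0)
	ac.update("up", 1, 3)
	fmt.Println(ac.byWorkerID[0].m, ac.byWorkerID[3].m)
}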
The first changed file (the vmselect netstorage package):

@@ -207,10 +207,17 @@ type result struct {
 
 var resultPool sync.Pool
 
+// MaxWorkers returns the maximum number of workers netstorage can spin when calling RunParallel()
+func MaxWorkers() int {
+	return gomaxprocs
+}
+
+var gomaxprocs = cgroup.AvailableCPUs()
+
 // RunParallel runs f in parallel for all the results from rss.
 //
 // f shouldn't hold references to rs after returning.
-// workerID is the id of the worker goroutine that calls f.
+// workerID is the id of the worker goroutine that calls f. The workerID is in the range [0..MaxWorkers()-1].
 // Data processing is immediately stopped if f returns non-nil error.
 //
 // rss becomes unusable after the call to RunParallel.
@@ -244,7 +251,8 @@ func (rss *Results) runParallel(qt *querytracer.Tracer, f func(rs *Result, worke
 		tsw.f = f
 		tsw.mustStop = &mustStop
 	}
-	if gomaxprocs == 1 || tswsLen == 1 {
+	maxWorkers := MaxWorkers()
+	if maxWorkers == 1 || tswsLen == 1 {
 		// It is faster to process time series in the current goroutine.
 		tsw := getTimeseriesWork()
 		tmpResult := getTmpResult()
@@ -280,8 +288,8 @@ func (rss *Results) runParallel(qt *querytracer.Tracer, f func(rs *Result, worke
 
 	// Prepare worker channels.
 	workers := len(tsws)
-	if workers > gomaxprocs {
-		workers = gomaxprocs
+	if workers > maxWorkers {
+		workers = maxWorkers
 	}
 	itemsPerWorker := (len(tsws) + workers - 1) / workers
 	workChs := make([]chan *timeseriesWork, workers)
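The two hunks above cap the number of worker goroutines at MaxWorkers() and hand each one a distinct workerID. Below is a hypothetical, self-contained sketch of that dispatch shape; runParallel and maxWorkers here are illustrative stand-ins, not the netstorage API.

package main

import (
	"fmt"
	"runtime"
	"sync"
)

// maxWorkers mirrors the role of netstorage.MaxWorkers(): an upper bound on
// worker goroutines, so callers can preallocate per-worker state up front.
func maxWorkers() int { return runtime.GOMAXPROCS(0) }

// runParallel caps the worker count and gives every worker goroutine a
// distinct workerID in the range [0..workers-1].
func runParallel(items []int, f func(item int, workerID uint)) {
	workers := maxWorkers()
	if workers > len(items) {
		workers = len(items)
	}
	workCh := make(chan int)
	var wg sync.WaitGroup
	for workerID := uint(0); workerID < uint(workers); workerID++ {
		wg.Add(1)
		go func(workerID uint) {
			defer wg.Done()
			for item := range workCh {
				f(item, workerID)
			}
		}(workerID)
	}
	for _, item := range items {
		workCh <- item
	}
	close(workCh)
	wg.Wait()
}

func main() {
	sums := make([]int, maxWorkers()) // per-worker state indexed by workerID
	runParallel([]int{1, 2, 3, 4, 5}, func(item int, workerID uint) {
		sums[workerID] += item // no locking: each workerID belongs to one goroutine
	})
	fmt.Println(sums)
}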
@@ -333,8 +341,6 @@ var (
 	seriesReadPerQuery = metrics.NewHistogram(`vm_series_read_per_query`)
 )
 
-var gomaxprocs = cgroup.AvailableCPUs()
-
 type packedTimeseries struct {
 	metricName string
 	brs []blockRef
@@ -1017,7 +1023,6 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
 	indexSearchDuration.UpdateDuration(startTime)
 
 	// Start workers that call f in parallel on available CPU cores.
-	gomaxprocs := cgroup.AvailableCPUs()
 	workCh := make(chan *exportWork, gomaxprocs*8)
 	var (
 		errGlobal error
The second changed file (the vmselect promql package):

@@ -3,8 +3,9 @@ package promql
 import (
 	"math"
 	"strings"
-	"sync"
+	"unsafe"
 
+	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/metricsql"
 )
@@ -63,31 +64,36 @@ var incrementalAggrFuncCallbacksMap = map[string]*incrementalAggrFuncCallbacks{
 	},
 }
 
+type incrementalAggrContextMap struct {
+	m map[string]*incrementalAggrContext
+
+	// The padding prevents false sharing on widespread platforms with
+	// 128 mod (cache line size) = 0 .
+	_ [128 - unsafe.Sizeof(map[string]*incrementalAggrContext{})%128]byte
+}
+
 type incrementalAggrFuncContext struct {
 	ae *metricsql.AggrFuncExpr
 
-	m sync.Map
+	byWorkerID []incrementalAggrContextMap
 
 	callbacks *incrementalAggrFuncCallbacks
 }
 
 func newIncrementalAggrFuncContext(ae *metricsql.AggrFuncExpr, callbacks *incrementalAggrFuncCallbacks) *incrementalAggrFuncContext {
 	return &incrementalAggrFuncContext{
-		ae:        ae,
-		callbacks: callbacks,
+		ae:         ae,
+		byWorkerID: make([]incrementalAggrContextMap, netstorage.MaxWorkers()),
+		callbacks:  callbacks,
 	}
 }
 
 func (iafc *incrementalAggrFuncContext) updateTimeseries(tsOrig *timeseries, workerID uint) {
-	v, ok := iafc.m.Load(workerID)
-	if !ok {
-		// It is safe creating and storing m in iafc.m without locking,
-		// since it is guaranteed that only a single goroutine can execute
-		// code for the given workerID at a time.
-		v = make(map[string]*incrementalAggrContext, 1)
-		iafc.m.Store(workerID, v)
+	v := &iafc.byWorkerID[workerID]
+	if v.m == nil {
+		v.m = make(map[string]*incrementalAggrContext, 1)
 	}
-	m := v.(map[string]*incrementalAggrContext)
+	m := v.m
 
 	ts := tsOrig
 	keepOriginal := iafc.callbacks.keepOriginal
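The padded incrementalAggrContextMap wrapper added above keeps each per-worker slot on its own cache line(s), so workers writing to adjacent slice elements do not repeatedly invalidate each other's caches (false sharing). Here is a standalone sketch of the same padding trick, using a hypothetical counter type rather than the promql code:

package main

import (
	"fmt"
	"unsafe"
)

// paddedCounter shows the anti-false-sharing layout: the payload is padded
// so each slice element occupies a multiple of 128 bytes, which is 0 mod the
// cache-line size on common platforms.
type paddedCounter struct {
	n uint64

	// The padding prevents false sharing between adjacent slice elements.
	_ [128 - unsafe.Sizeof(uint64(0))%128]byte
}

func main() {
	counters := make([]paddedCounter, 4)    // e.g. one slot per workerID
	fmt.Println(unsafe.Sizeof(counters[0])) // 128: each slot fills whole cache lines
}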
@@ -128,9 +134,9 @@ func (iafc *incrementalAggrFuncContext) updateTimeseries(tsOrig *timeseries, wor
 func (iafc *incrementalAggrFuncContext) finalizeTimeseries() []*timeseries {
 	mGlobal := make(map[string]*incrementalAggrContext)
 	mergeAggrFunc := iafc.callbacks.mergeAggrFunc
-	iafc.m.Range(func(k, v interface{}) bool {
-		m := v.(map[string]*incrementalAggrContext)
-		for k, iac := range m {
+	byWorkerID := iafc.byWorkerID
+	for i := range byWorkerID {
+		for k, iac := range byWorkerID[i].m {
 			iacGlobal := mGlobal[k]
 			if iacGlobal == nil {
 				if iafc.ae.Limit > 0 && len(mGlobal) >= iafc.ae.Limit {
@@ -142,8 +148,7 @@ func (iafc *incrementalAggrFuncContext) finalizeTimeseries() []*timeseries {
 			}
 			mergeAggrFunc(iacGlobal, iac)
 		}
-		return true
-	})
+	}
 	tss := make([]*timeseries, 0, len(mGlobal))
 	finalizeAggrFunc := iafc.callbacks.finalizeAggrFunc
 	for _, iac := range mGlobal {
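The last two hunks replace the sync.Map Range callback with plain nested loops over the per-worker slice. A minimal, hypothetical sketch of that merge step (simplified types, a sum standing in for mergeAggrFunc): once all workers have finished, a single goroutine folds the per-worker maps into one global map, so no synchronization is needed on this path either.

package main

import "fmt"

// workerMap plays the role of one per-worker map of partial aggregates.
type workerMap struct {
	m map[string]float64
}

// finalize merges the per-worker partial aggregates into a single map.
// It runs in one goroutine after all workers are done, so a plain loop
// over the slice is enough; no sync.Map Range is required.
func finalize(byWorkerID []workerMap) map[string]float64 {
	global := make(map[string]float64)
	for i := range byWorkerID {
		for k, v := range byWorkerID[i].m {
			global[k] += v // stand-in for mergeAggrFunc
		}
	}
	return global
}

func main() {
	byWorkerID := []workerMap{
		{m: map[string]float64{"up": 2}},
		{m: map[string]float64{"up": 3, "down": 1}},
	}
	fmt.Println(finalize(byWorkerID))
}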