mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-01 14:47:38 +00:00
app/vmselect: optimize incremental aggregates a bit
Substitute sync.Map with an ordinary slice indexed by workerID. This should reduce the overhead when updating the incremental aggregate state
This commit is contained in:
parent
a3bc64e7ed
commit
c780c6a280
2 changed files with 34 additions and 23 deletions
|
@ -225,10 +225,17 @@ type result struct {
|
||||||
|
|
||||||
var resultPool sync.Pool
|
var resultPool sync.Pool
|
||||||
|
|
||||||
|
// MaxWorkers returns the maximum number of workers netstorage can spin when calling RunParallel()
|
||||||
|
func MaxWorkers() int {
|
||||||
|
return gomaxprocs
|
||||||
|
}
|
||||||
|
|
||||||
|
var gomaxprocs = cgroup.AvailableCPUs()
|
||||||
|
|
||||||
// RunParallel runs f in parallel for all the results from rss.
|
// RunParallel runs f in parallel for all the results from rss.
|
||||||
//
|
//
|
||||||
// f shouldn't hold references to rs after returning.
|
// f shouldn't hold references to rs after returning.
|
||||||
// workerID is the id of the worker goroutine that calls f.
|
// workerID is the id of the worker goroutine that calls f. The workerID is in the range [0..MaxWorkers()-1].
|
||||||
// Data processing is immediately stopped if f returns non-nil error.
|
// Data processing is immediately stopped if f returns non-nil error.
|
||||||
//
|
//
|
||||||
// rss becomes unusable after the call to RunParallel.
|
// rss becomes unusable after the call to RunParallel.
|
||||||
|
@ -262,7 +269,8 @@ func (rss *Results) runParallel(qt *querytracer.Tracer, f func(rs *Result, worke
|
||||||
tsw.f = f
|
tsw.f = f
|
||||||
tsw.mustStop = &mustStop
|
tsw.mustStop = &mustStop
|
||||||
}
|
}
|
||||||
if gomaxprocs == 1 || tswsLen == 1 {
|
maxWorkers := MaxWorkers()
|
||||||
|
if maxWorkers == 1 || tswsLen == 1 {
|
||||||
// It is faster to process time series in the current goroutine.
|
// It is faster to process time series in the current goroutine.
|
||||||
tsw := getTimeseriesWork()
|
tsw := getTimeseriesWork()
|
||||||
tmpResult := getTmpResult()
|
tmpResult := getTmpResult()
|
||||||
|
@ -298,8 +306,8 @@ func (rss *Results) runParallel(qt *querytracer.Tracer, f func(rs *Result, worke
|
||||||
|
|
||||||
// Prepare worker channels.
|
// Prepare worker channels.
|
||||||
workers := len(tsws)
|
workers := len(tsws)
|
||||||
if workers > gomaxprocs {
|
if workers > maxWorkers {
|
||||||
workers = gomaxprocs
|
workers = maxWorkers
|
||||||
}
|
}
|
||||||
itemsPerWorker := (len(tsws) + workers - 1) / workers
|
itemsPerWorker := (len(tsws) + workers - 1) / workers
|
||||||
workChs := make([]chan *timeseriesWork, workers)
|
workChs := make([]chan *timeseriesWork, workers)
|
||||||
|
@ -351,8 +359,6 @@ var (
|
||||||
seriesReadPerQuery = metrics.NewHistogram(`vm_series_read_per_query`)
|
seriesReadPerQuery = metrics.NewHistogram(`vm_series_read_per_query`)
|
||||||
)
|
)
|
||||||
|
|
||||||
var gomaxprocs = cgroup.AvailableCPUs()
|
|
||||||
|
|
||||||
type packedTimeseries struct {
|
type packedTimeseries struct {
|
||||||
metricName string
|
metricName string
|
||||||
addrs []tmpBlockAddr
|
addrs []tmpBlockAddr
|
||||||
|
|
|
@ -3,8 +3,9 @@ package promql
|
||||||
import (
|
import (
|
||||||
"math"
|
"math"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"unsafe"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
"github.com/VictoriaMetrics/metricsql"
|
"github.com/VictoriaMetrics/metricsql"
|
||||||
)
|
)
|
||||||
|
@ -63,31 +64,36 @@ var incrementalAggrFuncCallbacksMap = map[string]*incrementalAggrFuncCallbacks{
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type incrementalAggrContextMap struct {
|
||||||
|
m map[string]*incrementalAggrContext
|
||||||
|
|
||||||
|
// The padding prevents false sharing on widespread platforms with
|
||||||
|
// 128 mod (cache line size) = 0 .
|
||||||
|
_ [128 - unsafe.Sizeof(map[string]*incrementalAggrContext{})%128]byte
|
||||||
|
}
|
||||||
|
|
||||||
type incrementalAggrFuncContext struct {
|
type incrementalAggrFuncContext struct {
|
||||||
ae *metricsql.AggrFuncExpr
|
ae *metricsql.AggrFuncExpr
|
||||||
|
|
||||||
m sync.Map
|
byWorkerID []incrementalAggrContextMap
|
||||||
|
|
||||||
callbacks *incrementalAggrFuncCallbacks
|
callbacks *incrementalAggrFuncCallbacks
|
||||||
}
|
}
|
||||||
|
|
||||||
func newIncrementalAggrFuncContext(ae *metricsql.AggrFuncExpr, callbacks *incrementalAggrFuncCallbacks) *incrementalAggrFuncContext {
|
func newIncrementalAggrFuncContext(ae *metricsql.AggrFuncExpr, callbacks *incrementalAggrFuncCallbacks) *incrementalAggrFuncContext {
|
||||||
return &incrementalAggrFuncContext{
|
return &incrementalAggrFuncContext{
|
||||||
ae: ae,
|
ae: ae,
|
||||||
callbacks: callbacks,
|
byWorkerID: make([]incrementalAggrContextMap, netstorage.MaxWorkers()),
|
||||||
|
callbacks: callbacks,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (iafc *incrementalAggrFuncContext) updateTimeseries(tsOrig *timeseries, workerID uint) {
|
func (iafc *incrementalAggrFuncContext) updateTimeseries(tsOrig *timeseries, workerID uint) {
|
||||||
v, ok := iafc.m.Load(workerID)
|
v := &iafc.byWorkerID[workerID]
|
||||||
if !ok {
|
if v.m == nil {
|
||||||
// It is safe creating and storing m in iafc.m without locking,
|
v.m = make(map[string]*incrementalAggrContext, 1)
|
||||||
// since it is guaranteed that only a single goroutine can execute
|
|
||||||
// code for the given workerID at a time.
|
|
||||||
v = make(map[string]*incrementalAggrContext, 1)
|
|
||||||
iafc.m.Store(workerID, v)
|
|
||||||
}
|
}
|
||||||
m := v.(map[string]*incrementalAggrContext)
|
m := v.m
|
||||||
|
|
||||||
ts := tsOrig
|
ts := tsOrig
|
||||||
keepOriginal := iafc.callbacks.keepOriginal
|
keepOriginal := iafc.callbacks.keepOriginal
|
||||||
|
@ -128,9 +134,9 @@ func (iafc *incrementalAggrFuncContext) updateTimeseries(tsOrig *timeseries, wor
|
||||||
func (iafc *incrementalAggrFuncContext) finalizeTimeseries() []*timeseries {
|
func (iafc *incrementalAggrFuncContext) finalizeTimeseries() []*timeseries {
|
||||||
mGlobal := make(map[string]*incrementalAggrContext)
|
mGlobal := make(map[string]*incrementalAggrContext)
|
||||||
mergeAggrFunc := iafc.callbacks.mergeAggrFunc
|
mergeAggrFunc := iafc.callbacks.mergeAggrFunc
|
||||||
iafc.m.Range(func(k, v interface{}) bool {
|
byWorkerID := iafc.byWorkerID
|
||||||
m := v.(map[string]*incrementalAggrContext)
|
for i := range byWorkerID {
|
||||||
for k, iac := range m {
|
for k, iac := range byWorkerID[i].m {
|
||||||
iacGlobal := mGlobal[k]
|
iacGlobal := mGlobal[k]
|
||||||
if iacGlobal == nil {
|
if iacGlobal == nil {
|
||||||
if iafc.ae.Limit > 0 && len(mGlobal) >= iafc.ae.Limit {
|
if iafc.ae.Limit > 0 && len(mGlobal) >= iafc.ae.Limit {
|
||||||
|
@ -142,8 +148,7 @@ func (iafc *incrementalAggrFuncContext) finalizeTimeseries() []*timeseries {
|
||||||
}
|
}
|
||||||
mergeAggrFunc(iacGlobal, iac)
|
mergeAggrFunc(iacGlobal, iac)
|
||||||
}
|
}
|
||||||
return true
|
}
|
||||||
})
|
|
||||||
tss := make([]*timeseries, 0, len(mGlobal))
|
tss := make([]*timeseries, 0, len(mGlobal))
|
||||||
finalizeAggrFunc := iafc.callbacks.finalizeAggrFunc
|
finalizeAggrFunc := iafc.callbacks.finalizeAggrFunc
|
||||||
for _, iac := range mGlobal {
|
for _, iac := range mGlobal {
|
||||||
|
|
Loading…
Reference in a new issue