mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git
app/vmselect/netstorage: use a more scalable algorithm for distributing the work among multiple channels on systems with a big number of CPU cores
parent df117f85bd
commit b92702f6d5
1 changed file with 13 additions and 3 deletions
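
The change replaces round-robin selection of the destination channel, which incremented a single shared atomic counter on every scheduling call, with a pseudo-random pick via fastrand; on machines with many CPU cores this avoids having every call contend on the same counter. A before/after of the index selection, taken from the timeseries hunk below (the unpack hunk gets the same treatment):

	// before: round-robin over a shared atomic counter
	idx := atomic.AddUint32(&timeseriesWorkIdx, 1) % uint32(len(timeseriesWorkChs))

	// after: pseudo-random pick, no shared counter on the hot path
	idx := fastrand.Uint32n(uint32(len(timeseriesWorkChs)))

A fast path that skips the selection entirely when only one channel exists (single CPU core) is also added to both schedulers.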
@@ -29,6 +29,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg"
 	"github.com/VictoriaMetrics/metrics"
 	xxhash "github.com/cespare/xxhash/v2"
+	"github.com/valyala/fastrand"
 )
 
 var (
@@ -122,7 +123,6 @@ func putTimeseriesWork(tsw *timeseriesWork) {
 var tswPool sync.Pool
 
 var timeseriesWorkChs []chan *timeseriesWork
-var timeseriesWorkIdx uint32
 
 func init() {
 	timeseriesWorkChs = make([]chan *timeseriesWork, gomaxprocs)
@@ -133,9 +133,14 @@ func init() {
 }
 
 func scheduleTimeseriesWork(tsw *timeseriesWork) {
+	if len(timeseriesWorkChs) == 1 {
+		// Fast path for a single CPU core
+		timeseriesWorkChs[0] <- tsw
+		return
+	}
 	attempts := 0
 	for {
-		idx := atomic.AddUint32(&timeseriesWorkIdx, 1) % uint32(len(timeseriesWorkChs))
+		idx := fastrand.Uint32n(uint32(len(timeseriesWorkChs)))
 		select {
 		case timeseriesWorkChs[idx] <- tsw:
 			return
@@ -316,9 +321,14 @@ func init() {
 }
 
 func scheduleUnpackWork(uw *unpackWork) {
+	if len(unpackWorkChs) == 1 {
+		// Fast path for a single CPU core
+		unpackWorkChs[0] <- uw
+		return
+	}
 	attempts := 0
 	for {
-		idx := atomic.AddUint32(&unpackWorkIdx, 1) % uint32(len(unpackWorkChs))
+		idx := fastrand.Uint32n(uint32(len(unpackWorkChs)))
 		select {
 		case unpackWorkChs[idx] <- uw:
 			return
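
Below is a minimal, self-contained sketch of the scheduling pattern the patch introduces, runnable outside VictoriaMetrics. The names workItem, workChs and scheduleWork are hypothetical stand-ins for the timeseriesWork/unpackWork types, their channel slices and their schedule functions; the hunks above are cut off after the successful-send case, so the default branch below (what happens when the randomly chosen channel is full) is an assumption, not a copy of the real code.

	package main

	import (
		"fmt"
		"runtime"
		"sync"

		"github.com/valyala/fastrand"
	)

	// workItem is a hypothetical stand-in for *timeseriesWork / *unpackWork.
	type workItem struct {
		id int
	}

	// workChs plays the role of timeseriesWorkChs / unpackWorkChs:
	// one buffered channel per CPU core.
	var workChs []chan *workItem

	func init() {
		workChs = make([]chan *workItem, runtime.GOMAXPROCS(0))
		for i := range workChs {
			workChs[i] = make(chan *workItem, 16)
		}
	}

	// scheduleWork distributes items the way the patched schedulers do:
	// pick a random channel with fastrand.Uint32n and try a non-blocking send.
	func scheduleWork(w *workItem) {
		if len(workChs) == 1 {
			// Fast path for a single CPU core.
			workChs[0] <- w
			return
		}
		attempts := 0
		for {
			idx := fastrand.Uint32n(uint32(len(workChs)))
			select {
			case workChs[idx] <- w:
				return
			default:
				// Assumed fallback: after trying roughly every channel once,
				// stop spinning and block on the last pick.
				attempts++
				if attempts >= len(workChs) {
					workChs[idx] <- w
					return
				}
			}
		}
	}

	func main() {
		var wg sync.WaitGroup
		for i, ch := range workChs {
			wg.Add(1)
			go func(worker int, ch chan *workItem) {
				defer wg.Done()
				for w := range ch {
					fmt.Printf("worker %d processed item %d\n", worker, w.id)
				}
			}(i, ch)
		}
		for i := 0; i < 100; i++ {
			scheduleWork(&workItem{id: i})
		}
		for _, ch := range workChs {
			close(ch)
		}
		wg.Wait()
	}

The design point of the patch itself is small: a random pick needs no shared mutable counter, so scheduling calls from many cores no longer serialize on one atomic variable, while the non-blocking select still steers work away from channels whose workers are currently busy.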