Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git, synced 2025-03-11 15:34:56 +00:00
app/vmselect/netstorage: use more scalable algorithm for distributing the work among multiple channels on systems with a big number of CPU cores
This commit is contained in:
parent ca75432e66
commit ed10141ff8
1 changed file with 13 additions and 3 deletions
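For orientation before the diff: the change swaps a shared atomic round-robin counter for a pseudo-random channel pick via github.com/valyala/fastrand, and adds a fast path when there is only one work channel. A minimal standalone sketch of the two selection strategies (workChs, counter, pickRoundRobin, and pickRandom are illustrative names, not the actual netstorage identifiers):

package main

import (
	"fmt"
	"sync/atomic"

	"github.com/valyala/fastrand"
)

var (
	workChs = make([]chan int, 8) // stand-in for one work channel per CPU core
	counter uint32                // shared state hit by every call in the old scheme
)

// pickRoundRobin mirrors the old approach: every caller increments the same
// atomic counter, so all cores contend on the cache line holding it.
func pickRoundRobin() int {
	return int(atomic.AddUint32(&counter, 1) % uint32(len(workChs)))
}

// pickRandom mirrors the new approach: a pseudo-random pick that does not
// funnel every call through a single shared counter.
func pickRandom() int {
	return int(fastrand.Uint32n(uint32(len(workChs))))
}

func main() {
	fmt.Println("round-robin pick:", pickRoundRobin())
	fmt.Println("random pick:", pickRandom())
}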
@@ -19,6 +19,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
 	"github.com/VictoriaMetrics/metrics"
+	"github.com/valyala/fastrand"
 )
 
 var (
@@ -119,7 +120,6 @@ func putTimeseriesWork(tsw *timeseriesWork) {
 var tswPool sync.Pool
 
 var timeseriesWorkChs []chan *timeseriesWork
-var timeseriesWorkIdx uint32
 
 func init() {
 	timeseriesWorkChs = make([]chan *timeseriesWork, gomaxprocs)
@@ -130,9 +130,14 @@ func init() {
 }
 
 func scheduleTimeseriesWork(tsw *timeseriesWork) {
+	if len(timeseriesWorkChs) == 1 {
+		// Fast path for a single CPU core
+		timeseriesWorkChs[0] <- tsw
+		return
+	}
 	attempts := 0
 	for {
-		idx := atomic.AddUint32(&timeseriesWorkIdx, 1) % uint32(len(timeseriesWorkChs))
+		idx := fastrand.Uint32n(uint32(len(timeseriesWorkChs)))
 		select {
 		case timeseriesWorkChs[idx] <- tsw:
 			return
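The hunk above shows only the head of the retry loop; what happens when the randomly chosen channel is full sits below the visible context. A hedged sketch of the general dispatch pattern the visible lines suggest (generic names; the default branch and blocking fallback are assumptions for illustration, not taken from this diff):

package main

import (
	"fmt"

	"github.com/valyala/fastrand"
)

// dispatch probes randomly chosen channels without blocking and, as an
// assumed fallback, blocks on the last pick once the probes are exhausted.
func dispatch(chs []chan int, v int) {
	if len(chs) == 1 {
		// Fast path for a single CPU core: no need to pick randomly.
		chs[0] <- v
		return
	}
	attempts := 0
	for {
		idx := fastrand.Uint32n(uint32(len(chs)))
		select {
		case chs[idx] <- v:
			return
		default:
			attempts++
			if attempts >= len(chs) {
				// Assumed fallback: every quick probe failed, so block here.
				chs[idx] <- v
				return
			}
		}
	}
}

func main() {
	chs := []chan int{make(chan int, 1), make(chan int, 1)}
	dispatch(chs, 42)
	// Drain whichever channel received the value.
	select {
	case v := <-chs[0]:
		fmt.Println("received on channel 0:", v)
	case v := <-chs[1]:
		fmt.Println("received on channel 1:", v)
	}
}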
@@ -308,9 +313,14 @@ func init() {
 }
 
 func scheduleUnpackWork(uw *unpackWork) {
+	if len(unpackWorkChs) == 1 {
+		// Fast path for a single CPU core
+		unpackWorkChs[0] <- uw
+		return
+	}
 	attempts := 0
 	for {
-		idx := atomic.AddUint32(&unpackWorkIdx, 1) % uint32(len(unpackWorkChs))
+		idx := fastrand.Uint32n(uint32(len(unpackWorkChs)))
 		select {
 		case unpackWorkChs[idx] <- uw:
 			return
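The same substitution is applied to the unpack work queue above, so the scalability claim in the commit message can be sanity-checked in isolation with a micro-benchmark of the two pick strategies. A hedged sketch (the sched package name, file placement, and the channel count of 64 are assumptions; no results are implied):

package sched

import (
	"sync/atomic"
	"testing"

	"github.com/valyala/fastrand"
)

const numChs = 64 // stand-in for the number of work channels / CPU cores

var counter uint32

// BenchmarkAtomicPick models the old scheme: a single shared counter.
func BenchmarkAtomicPick(b *testing.B) {
	b.RunParallel(func(pb *testing.PB) {
		var sink uint32
		for pb.Next() {
			sink += atomic.AddUint32(&counter, 1) % numChs
		}
		_ = sink // keep the result alive
	})
}

// BenchmarkRandomPick models the new scheme: no shared mutable state per pick.
func BenchmarkRandomPick(b *testing.B) {
	b.RunParallel(func(pb *testing.PB) {
		var sink uint32
		for pb.Next() {
			sink += fastrand.Uint32n(numChs)
		}
		_ = sink // keep the result alive
	})
}

Running it with go test -bench=Pick -cpu=1,4,16 shows how each strategy behaves as parallelism grows.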