Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git
app/vmselect/netstorage: improve scalability of series unpacking on multi-CPU systems
Commit 6c42db87a8 (parent 3059e4feec)
1 changed file with 55 additions and 15 deletions
@@ -80,8 +80,6 @@ func (rss *Results) Cancel() {
 	rss.tbf = nil
 }
 
-var timeseriesWorkCh = make(chan *timeseriesWork, gomaxprocs*16)
-
 type timeseriesWork struct {
 	mustStop *uint32
 	rss      *Results
@@ -120,16 +118,38 @@ func putTimeseriesWork(tsw *timeseriesWork) {
 
 var tswPool sync.Pool
 
+var timeseriesWorkChs []chan *timeseriesWork
+var timeseriesWorkIdx uint32
+
 func init() {
-	for i := 0; i < gomaxprocs; i++ {
-		go timeseriesWorker(uint(i))
+	timeseriesWorkChs = make([]chan *timeseriesWork, gomaxprocs)
+	for i := range timeseriesWorkChs {
+		timeseriesWorkChs[i] = make(chan *timeseriesWork, 16)
+		go timeseriesWorker(timeseriesWorkChs[i], uint(i))
 	}
 }
 
-func timeseriesWorker(workerID uint) {
+func scheduleTimeseriesWork(tsw *timeseriesWork) {
+	attempts := 0
+	for {
+		idx := atomic.AddUint32(&timeseriesWorkIdx, 1) % uint32(len(timeseriesWorkChs))
+		select {
+		case timeseriesWorkChs[idx] <- tsw:
+			return
+		default:
+			attempts++
+			if attempts >= len(timeseriesWorkChs) {
+				timeseriesWorkChs[idx] <- tsw
+				return
+			}
+		}
+	}
+}
+
+func timeseriesWorker(ch <-chan *timeseriesWork, workerID uint) {
 	var rs Result
 	var rsLastResetTime uint64
-	for tsw := range timeseriesWorkCh {
+	for tsw := range ch {
 		if atomic.LoadUint32(tsw.mustStop) != 0 {
 			tsw.doneCh <- nil
 			continue
@@ -185,7 +205,7 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint) error) error {
 		tsw.pts = &rss.packedTimeseries[i]
 		tsw.f = f
 		tsw.mustStop = &mustStop
-		timeseriesWorkCh <- tsw
+		scheduleTimeseriesWork(tsw)
 		tsws[i] = tsw
 	}
 	seriesProcessedTotal := len(rss.packedTimeseries)
@@ -218,8 +238,6 @@ type packedTimeseries struct {
 	addrs []tmpBlockAddr
 }
 
-var unpackWorkCh = make(chan *unpackWork, gomaxprocs*128)
-
 type unpackWorkItem struct {
 	addr tmpBlockAddr
 	tr   storage.TimeRange
@@ -283,15 +301,37 @@ func putUnpackWork(upw *unpackWork) {
 
 var unpackWorkPool sync.Pool
 
+var unpackWorkChs []chan *unpackWork
+var unpackWorkIdx uint32
+
 func init() {
-	for i := 0; i < gomaxprocs; i++ {
-		go unpackWorker()
+	unpackWorkChs = make([]chan *unpackWork, gomaxprocs)
+	for i := range unpackWorkChs {
+		unpackWorkChs[i] = make(chan *unpackWork, 128)
+		go unpackWorker(unpackWorkChs[i])
 	}
 }
 
-func unpackWorker() {
+func scheduleUnpackWork(uw *unpackWork) {
+	attempts := 0
+	for {
+		idx := atomic.AddUint32(&unpackWorkIdx, 1) % uint32(len(unpackWorkChs))
+		select {
+		case unpackWorkChs[idx] <- uw:
+			return
+		default:
+			attempts++
+			if attempts >= len(unpackWorkChs) {
+				unpackWorkChs[idx] <- uw
+				return
+			}
+		}
+	}
+}
+
+func unpackWorker(ch <-chan *unpackWork) {
 	var tmpBlock storage.Block
-	for upw := range unpackWorkCh {
+	for upw := range ch {
 		upw.unpack(&tmpBlock)
 	}
 }
@@ -320,7 +360,7 @@ func (pts *packedTimeseries) Unpack(tbf *tmpBlocksFile, dst *Result, tr storage.
 	upw.at = at
 	for _, addr := range pts.addrs {
 		if len(upw.ws) >= unpackBatchSize {
-			unpackWorkCh <- upw
+			scheduleUnpackWork(upw)
 			upws = append(upws, upw)
 			upw = getUnpackWork()
 			upw.tbf = tbf
@@ -331,7 +371,7 @@ func (pts *packedTimeseries) Unpack(tbf *tmpBlocksFile, dst *Result, tr storage.
 			tr: tr,
 		})
 	}
-	unpackWorkCh <- upw
+	scheduleUnpackWork(upw)
 	upws = append(upws, upw)
 	pts.addrs = pts.addrs[:0]
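
The same pattern is applied twice in this diff: the single shared work channel (timeseriesWorkCh, unpackWorkCh) is replaced with one bounded channel per GOMAXPROCS worker, and producers dispatch through a round-robin scheduler that first attempts a non-blocking send and falls back to a blocking send once every channel has been tried. This reduces contention on a single channel's internal lock when many CPUs produce and consume work concurrently. Below is a minimal, self-contained Go sketch of that idea; the names (work, scheduleWork, worker) and the demo in main are illustrative and are not taken from the VictoriaMetrics codebase.

// Illustrative sketch only: the type and function names below are not from the
// VictoriaMetrics codebase; they demonstrate the per-CPU channel pattern used
// by this commit.
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

// work is a placeholder for a unit of work handed to the workers.
type work struct {
	id int
}

var (
	workChs []chan *work   // one bounded channel per worker instead of a single shared channel
	workIdx uint32         // round-robin counter shared by all producers
	wg      sync.WaitGroup // used only by the demo in main
)

func init() {
	gomaxprocs := runtime.GOMAXPROCS(0)
	workChs = make([]chan *work, gomaxprocs)
	for i := range workChs {
		workChs[i] = make(chan *work, 16)
		go worker(workChs[i], uint(i))
	}
}

// scheduleWork walks the channels in round-robin order, trying a non-blocking
// send on each; once every channel has been tried and found full, it performs
// a blocking send so the producer never spins indefinitely.
func scheduleWork(w *work) {
	attempts := 0
	for {
		idx := atomic.AddUint32(&workIdx, 1) % uint32(len(workChs))
		select {
		case workChs[idx] <- w:
			return
		default:
			attempts++
			if attempts >= len(workChs) {
				workChs[idx] <- w // all queues are busy: block until this one drains
				return
			}
		}
	}
}

// worker drains only its own channel, so any per-worker scratch state stays
// local to a single goroutine.
func worker(ch <-chan *work, workerID uint) {
	for w := range ch {
		fmt.Printf("worker %d processed item %d\n", workerID, w.id)
		wg.Done()
	}
}

func main() {
	for i := 0; i < 100; i++ {
		wg.Add(1)
		scheduleWork(&work{id: i})
	}
	wg.Wait()
}

The blocking fallback matters: without it a producer could spin forever while all per-worker queues are full, and with it backpressure from slow workers still propagates to producers, just as with the original single shared channel.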