app/vmselect/netstorage: reduce CPU contention when unpacking time series blocks by unpacking batches of such blocks instead of a single block at a time

This should improve query performance on systems with a large number of CPU cores (16 and more)
Aliaksandr Valialkin 2020-08-06 17:42:15 +03:00
parent d20c2156e4
commit 3f85c06b65

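Before the diff itself, here is a minimal, self-contained sketch of the pattern this commit applies. The names (workItem, batchSize, workCh) are illustrative, not from the VictoriaMetrics codebase: producers pack up to batchSize items into a single channel send, so N items cost roughly N/batchSize channel operations instead of N, and workers contend on the shared channel correspondingly less often.

package main

import (
	"fmt"
	"runtime"
	"sync"
)

type workItem struct{ id int }

const batchSize = 16

func main() {
	workCh := make(chan []workItem, runtime.GOMAXPROCS(0)*128)
	var wg sync.WaitGroup

	// Workers: one channel receive now yields a whole batch of items.
	for i := 0; i < runtime.GOMAXPROCS(0); i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for batch := range workCh {
				for _, it := range batch {
					_ = it.id // the real code unpacks one block here
				}
			}
		}()
	}

	// Producer: send a batch only when it is full.
	batch := make([]workItem, 0, batchSize)
	for i := 0; i < 1000; i++ {
		batch = append(batch, workItem{id: i})
		if len(batch) >= batchSize {
			workCh <- batch
			batch = make([]workItem, 0, batchSize)
		}
	}
	if len(batch) > 0 {
		workCh <- batch // flush the final partial batch
	}
	close(workCh)
	wg.Wait()
	fmt.Println("all batches processed")
}

The diff below follows the same shape: unpackWorkItem plays the role of workItem, unpackWork carries the batch, and unpackBatchSize caps the batch length.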

@@ -169,24 +169,50 @@ type packedTimeseries struct {
 var unpackWorkCh = make(chan *unpackWork, gomaxprocs*128)
 
+type unpackWorkItem struct {
+	br storage.BlockRef
+	tr storage.TimeRange
+}
+
 type unpackWork struct {
-	br        storage.BlockRef
-	tr        storage.TimeRange
+	ws        []unpackWorkItem
 	fetchData bool
-	sb        *sortBlock
+	sbs       []*sortBlock
 	doneCh    chan error
 }
 
 func (upw *unpackWork) reset() {
-	upw.br = storage.BlockRef{}
-	upw.tr = storage.TimeRange{}
+	ws := upw.ws
+	for i := range ws {
+		w := &ws[i]
+		w.br = storage.BlockRef{}
+		w.tr = storage.TimeRange{}
+	}
+	upw.ws = upw.ws[:0]
 	upw.fetchData = false
-	upw.sb = nil
+	sbs := upw.sbs
+	for i := range sbs {
+		sbs[i] = nil
+	}
+	upw.sbs = upw.sbs[:0]
 	if n := len(upw.doneCh); n > 0 {
 		logger.Panicf("BUG: upw.doneCh must be empty; it contains %d items now", n)
 	}
 }
 
+func (upw *unpackWork) unpack() {
+	for _, w := range upw.ws {
+		sb := getSortBlock()
+		if err := sb.unpackFrom(w.br, w.tr, upw.fetchData); err != nil {
+			putSortBlock(sb)
+			upw.doneCh <- fmt.Errorf("cannot unpack block: %w", err)
+			return
+		}
+		upw.sbs = append(upw.sbs, sb)
+	}
+	upw.doneCh <- nil
+}
+
 func getUnpackWork() *unpackWork {
 	v := unpackWorkPool.Get()
 	if v != nil {
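A note on the reset method above: pooled unpackWork objects keep their backing arrays between uses, so the stale *sortBlock pointers must be nilled out before the slice is truncated; otherwise the pool would keep the referenced blocks reachable and hide that memory from the garbage collector. A minimal sketch of the same idiom, using a hypothetical pooled job type:

package pool

type result struct{ data []byte }

// job is an illustrative pooled type; its results field mirrors the sbs
// field of unpackWork.
type job struct {
	results []*result
}

// reset drops stale pointers so a pooled job does not pin old results in
// memory, then truncates to length zero while keeping capacity for reuse.
func (j *job) reset() {
	results := j.results
	for i := range results {
		results[i] = nil
	}
	j.results = j.results[:0]
}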
@@ -212,17 +238,15 @@ func init() {
 func unpackWorker() {
 	for upw := range unpackWorkCh {
-		sb := getSortBlock()
-		if err := sb.unpackFrom(upw.br, upw.tr, upw.fetchData); err != nil {
-			putSortBlock(sb)
-			upw.doneCh <- fmt.Errorf("cannot unpack block: %w", err)
-			continue
-		}
-		upw.sb = sb
-		upw.doneCh <- nil
+		upw.unpack()
 	}
 }
 
+// unpackBatchSize is the maximum number of blocks that may be unpacked at once by a single goroutine.
+//
+// This batch is needed in order to reduce contention for unpackWorkCh in multi-CPU systems.
+const unpackBatchSize = 16
+
 // Unpack unpacks pts to dst.
 func (pts *packedTimeseries) Unpack(dst *Result, tr storage.TimeRange, fetchData bool) error {
 	dst.reset()
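To make the effect of unpackBatchSize concrete: a query touching 1000 block refs previously cost 1000 sends on unpackWorkCh (one per block); with batches of 16 it costs ceil(1000/16) = 63 sends, roughly a 16x reduction in operations on the shared channel, which is where the cross-CPU contention shows up.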
@@ -232,15 +256,23 @@ func (pts *packedTimeseries) Unpack(dst *Result, tr storage.TimeRange, fetchData
 	}
 
 	// Feed workers with work
-	upws := make([]*unpackWork, len(pts.brs))
-	for i, br := range pts.brs {
-		upw := getUnpackWork()
-		upw.br = br
-		upw.tr = tr
-		upw.fetchData = fetchData
-		unpackWorkCh <- upw
-		upws[i] = upw
+	upws := make([]*unpackWork, 0, 1+len(pts.brs)/unpackBatchSize)
+	upw := getUnpackWork()
+	upw.fetchData = fetchData
+	for _, br := range pts.brs {
+		if len(upw.ws) >= unpackBatchSize {
+			unpackWorkCh <- upw
+			upws = append(upws, upw)
+			upw = getUnpackWork()
+			upw.fetchData = fetchData
+		}
+		upw.ws = append(upw.ws, unpackWorkItem{
+			br: br,
+			tr: tr,
+		})
 	}
+	unpackWorkCh <- upw
+	upws = append(upws, upw)
 	pts.brs = pts.brs[:0]
 
 	// Wait until work is complete
@@ -252,9 +284,11 @@ func (pts *packedTimeseries) Unpack(dst *Result, tr storage.TimeRange, fetchData
 			firstErr = err
 		}
 		if firstErr == nil {
-			sbs = append(sbs, upw.sb)
-		} else if upw.sb != nil {
-			putSortBlock(upw.sb)
+			sbs = append(sbs, upw.sbs...)
+		} else {
+			for _, sb := range upw.sbs {
+				putSortBlock(sb)
+			}
 		}
 		putUnpackWork(upw)
 	}
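The contention claim in the commit message can be checked in isolation with a benchmark along these lines. This is a sketch under assumptions, not code from the repository; the item type and the hard-coded batch size of 16 are illustrative:

package contention

import (
	"sync"
	"testing"
)

type item struct{ n int }

// BenchmarkSingle: parallel senders perform one channel operation per item.
func BenchmarkSingle(b *testing.B) {
	ch := make(chan item, 1024)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for range ch {
		}
	}()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			ch <- item{n: 1}
		}
	})
	close(ch)
	wg.Wait()
}

// BenchmarkBatched: parallel senders perform one channel operation per 16
// items, so they take the channel's internal lock ~16x less often.
func BenchmarkBatched(b *testing.B) {
	ch := make(chan []item, 1024)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for range ch {
		}
	}()
	b.RunParallel(func(pb *testing.PB) {
		batch := make([]item, 0, 16)
		for pb.Next() {
			batch = append(batch, item{n: 1})
			if len(batch) == 16 {
				ch <- batch
				batch = make([]item, 0, 16)
			}
		}
		if len(batch) > 0 {
			ch <- batch // flush this goroutine's final partial batch
		}
	})
	close(ch)
	wg.Wait()
}

Running with go test -bench . -cpu 4,16,32 should show the batched variant degrading far less as the CPU count grows, mirroring the "16 and more" cores observation above.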