app/vmselect/netstorage: reduce memory allocations when unpacking time series

Unpack time series with less than 4M samples in the currently running goroutine.
Previously a new goroutine was being started for unpacking the samples.
This was requiring additional memory allocations.
This commit is contained in:
Aliaksandr Valialkin 2023-01-09 23:10:41 -08:00
parent c8bd3534cb
commit 158a280822
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1

View file

@ -210,7 +210,7 @@ func (rss *Results) RunParallel(qt *querytracer.Tracer, f func(rs *Result, worke
// (e.g. the number of available CPU cores).
itemsPerWorker := 1
if len(rss.packedTimeseries) > gomaxprocs {
itemsPerWorker = 1 + len(rss.packedTimeseries)/gomaxprocs
itemsPerWorker = (len(rss.packedTimeseries) + gomaxprocs - 1) / gomaxprocs
}
var start int
var i uint
@ -367,26 +367,34 @@ func scheduleUnpackWork(workChs []chan *unpackWork, uw *unpackWork) {
}
func unpackWorker(ch <-chan *unpackWork) {
v := tmpBlockPool.Get()
if v == nil {
v = &storage.Block{}
}
tmpBlock := v.(*storage.Block)
tmpBlock := getTmpStorageBlock()
for upw := range ch {
upw.unpack(tmpBlock)
}
tmpBlockPool.Put(v)
putTmpStorageBlock(tmpBlock)
}
var tmpBlockPool sync.Pool
func getTmpStorageBlock() *storage.Block {
v := tmpStorageBlockPool.Get()
if v == nil {
v = &storage.Block{}
}
return v.(*storage.Block)
}
func putTmpStorageBlock(sb *storage.Block) {
tmpStorageBlockPool.Put(sb)
}
var tmpStorageBlockPool sync.Pool
// unpackBatchSize is the maximum number of blocks that may be unpacked at once by a single goroutine.
//
// It is better to load a single goroutine for up to one second on a system with many CPU cores
// It is better to load a single goroutine for up to 100ms on a system with many CPU cores
// in order to reduce inter-CPU memory ping-pong.
// A single goroutine can unpack up to 40 millions of rows per second, while a single block contains up to 8K rows.
// So the batch size should be 40M / 8K = 5K.
var unpackBatchSize = 5000
// So the batch size should be 100ms * 40M / 8K = 500.
var unpackBatchSize = 500
// Unpack unpacks pts to dst.
func (pts *packedTimeseries) Unpack(dst *Result, tbfs []*tmpBlocksFile, tr storage.TimeRange) error {
@ -394,12 +402,58 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbfs []*tmpBlocksFile, tr stora
if err := dst.MetricName.Unmarshal(bytesutil.ToUnsafeBytes(pts.metricName)); err != nil {
return fmt.Errorf("cannot unmarshal metricName %q: %w", pts.metricName, err)
}
sbh := getSortBlocksHeap()
var err error
sbh.sbs, err = pts.unpackTo(sbh.sbs[:0], tbfs, tr)
if err != nil {
putSortBlocksHeap(sbh)
return err
}
dedupInterval := storage.GetDedupInterval()
mergeSortBlocks(dst, sbh, dedupInterval)
putSortBlocksHeap(sbh)
return nil
}
// Spin up local workers.
func (pts *packedTimeseries) unpackTo(dst []*sortBlock, tbfs []*tmpBlocksFile, tr storage.TimeRange) ([]*sortBlock, error) {
addrsLen := len(pts.addrs)
upwsLen := (addrsLen + unpackBatchSize - 1) / unpackBatchSize
if upwsLen == 1 {
// Fast path for common case - unpack all the data in the current goroutine
upw := getUnpackWork()
upw.tbfs = tbfs
for _, addr := range pts.addrs {
upw.ws = append(upw.ws, unpackWorkItem{
addr: addr,
tr: tr,
})
}
pts.addrs = pts.addrs[:0]
tmpBlock := getTmpStorageBlock()
upw.unpack(tmpBlock)
putTmpStorageBlock(tmpBlock)
if err := <-upw.doneCh; err != nil {
return dst, err
}
samples := 0
for _, sb := range upw.sbs {
samples += len(sb.Timestamps)
if *maxSamplesPerSeries > 0 && samples > *maxSamplesPerSeries {
return dst, fmt.Errorf("cannot process more than %d samples per series; either increase -search.maxSamplesPerSeries "+
"or reduce time range for the query", *maxSamplesPerSeries)
}
dst = append(dst, sb)
}
putUnpackWork(upw)
return dst, nil
}
// Slow path - spin up multiple local workers for parallel data unpacking.
// Do not use global workers pool, since it increases inter-CPU memory ping-poing,
// which reduces the scalability on systems with many CPU cores.
addrsLen := len(pts.addrs)
workers := addrsLen / unpackBatchSize
workers := upwsLen
if workers > gomaxprocs {
workers = gomaxprocs
}
@ -409,11 +463,7 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbfs []*tmpBlocksFile, tr stora
workChs := make([]chan *unpackWork, workers)
var workChsWG sync.WaitGroup
for i := 0; i < workers; i++ {
// Use unbuffered channel on purpose, since there are high chances
// that only a single unpackWork is needed to unpack.
// The unbuffered channel should reduce inter-CPU ping-pong in this case,
// which should improve the performance in a system with many CPU cores.
workChs[i] = make(chan *unpackWork)
workChs[i] = make(chan *unpackWork, 1)
workChsWG.Add(1)
go func(workerID int) {
defer workChsWG.Done()
@ -422,7 +472,7 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbfs []*tmpBlocksFile, tr stora
}
// Feed workers with work
upws := make([]*unpackWork, 0, 1+addrsLen/unpackBatchSize)
upws := make([]*unpackWork, 0, upwsLen)
upw := getUnpackWork()
upw.tbfs = tbfs
for _, addr := range pts.addrs {
@ -441,14 +491,8 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbfs []*tmpBlocksFile, tr stora
upws = append(upws, upw)
pts.addrs = pts.addrs[:0]
// Wait until work is complete
// Collect the unpacked sortBlock items
samples := 0
sbh := getSortBlocksHeap()
sbs := sbh.sbs
if n := addrsLen - cap(sbs); n > 0 {
sbs = append(sbs[:cap(sbs)], make([]*sortBlock, n)...)
}
sbs = sbs[:0]
var firstErr error
for _, upw := range upws {
if err := <-upw.doneCh; err != nil && firstErr == nil {
@ -458,22 +502,20 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbfs []*tmpBlocksFile, tr stora
if firstErr == nil {
for _, sb := range upw.sbs {
samples += len(sb.Timestamps)
if *maxSamplesPerSeries > 0 && samples > *maxSamplesPerSeries {
firstErr = fmt.Errorf("cannot process more than %d samples per series; either increase -search.maxSamplesPerSeries "+
"or reduce time range for the query", *maxSamplesPerSeries)
break
}
dst = append(dst, sb)
}
if *maxSamplesPerSeries <= 0 || samples < *maxSamplesPerSeries {
sbs = append(sbs, upw.sbs...)
} else {
firstErr = fmt.Errorf("cannot process more than %d samples per series; either increase -search.maxSamplesPerSeries "+
"or reduce time range for the query", *maxSamplesPerSeries)
}
}
if firstErr != nil {
} else {
for _, sb := range upw.sbs {
putSortBlock(sb)
}
}
putUnpackWork(upw)
}
sbh.sbs = sbs
// Shut down local workers
for _, workCh := range workChs {
@ -481,13 +523,7 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbfs []*tmpBlocksFile, tr stora
}
workChsWG.Wait()
if firstErr != nil {
return firstErr
}
dedupInterval := storage.GetDedupInterval()
mergeSortBlocks(dst, sbh, dedupInterval)
putSortBlocksHeap(sbh)
return nil
return dst, firstErr
}
func getSortBlock() *sortBlock {