mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vmselect/netstorage: reduce memory allocations when unpacking time series
Unpack time series with less than 400K samples in the currently running goroutine. Previously a new goroutine was being started for unpacking the samples. This was requiring additional memory allocations.
This commit is contained in:
parent
9a563a6aef
commit
e640ff72f1
1 changed files with 79 additions and 43 deletions
|
@ -193,7 +193,7 @@ func (rss *Results) RunParallel(qt *querytracer.Tracer, f func(rs *Result, worke
|
|||
// (e.g. the number of available CPU cores).
|
||||
itemsPerWorker := 1
|
||||
if len(rss.packedTimeseries) > gomaxprocs {
|
||||
itemsPerWorker = 1 + len(rss.packedTimeseries)/gomaxprocs
|
||||
itemsPerWorker = (len(rss.packedTimeseries) + gomaxprocs - 1) / gomaxprocs
|
||||
}
|
||||
var start int
|
||||
var i uint
|
||||
|
@ -350,26 +350,34 @@ func scheduleUnpackWork(workChs []chan *unpackWork, uw *unpackWork) {
|
|||
}
|
||||
|
||||
func unpackWorker(ch <-chan *unpackWork) {
|
||||
v := tmpBlockPool.Get()
|
||||
if v == nil {
|
||||
v = &storage.Block{}
|
||||
}
|
||||
tmpBlock := v.(*storage.Block)
|
||||
tmpBlock := getTmpStorageBlock()
|
||||
for upw := range ch {
|
||||
upw.unpack(tmpBlock)
|
||||
}
|
||||
tmpBlockPool.Put(v)
|
||||
putTmpStorageBlock(tmpBlock)
|
||||
}
|
||||
|
||||
var tmpBlockPool sync.Pool
|
||||
func getTmpStorageBlock() *storage.Block {
|
||||
v := tmpStorageBlockPool.Get()
|
||||
if v == nil {
|
||||
v = &storage.Block{}
|
||||
}
|
||||
return v.(*storage.Block)
|
||||
}
|
||||
|
||||
func putTmpStorageBlock(sb *storage.Block) {
|
||||
tmpStorageBlockPool.Put(sb)
|
||||
}
|
||||
|
||||
var tmpStorageBlockPool sync.Pool
|
||||
|
||||
// unpackBatchSize is the maximum number of blocks that may be unpacked at once by a single goroutine.
|
||||
//
|
||||
// It is better to load a single goroutine for up to one second on a system with many CPU cores
|
||||
// It is better to load a single goroutine for up to 100ms on a system with many CPU cores
|
||||
// in order to reduce inter-CPU memory ping-pong.
|
||||
// A single goroutine can unpack up to 40 millions of rows per second, while a single block contains up to 8K rows.
|
||||
// So the batch size should be 40M / 8K = 5K.
|
||||
var unpackBatchSize = 5000
|
||||
// So the batch size should be 100ms * 40M / 8K = 500.
|
||||
var unpackBatchSize = 500
|
||||
|
||||
// Unpack unpacks pts to dst.
|
||||
func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.TimeRange) error {
|
||||
|
@ -377,12 +385,58 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.
|
|||
if err := dst.MetricName.Unmarshal(bytesutil.ToUnsafeBytes(pts.metricName)); err != nil {
|
||||
return fmt.Errorf("cannot unmarshal metricName %q: %w", pts.metricName, err)
|
||||
}
|
||||
sbh := getSortBlocksHeap()
|
||||
var err error
|
||||
sbh.sbs, err = pts.unpackTo(sbh.sbs[:0], tbf, tr)
|
||||
if err != nil {
|
||||
putSortBlocksHeap(sbh)
|
||||
return err
|
||||
}
|
||||
dedupInterval := storage.GetDedupInterval()
|
||||
mergeSortBlocks(dst, sbh, dedupInterval)
|
||||
putSortBlocksHeap(sbh)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Spin up local workers.
|
||||
func (pts *packedTimeseries) unpackTo(dst []*sortBlock, tbf *tmpBlocksFile, tr storage.TimeRange) ([]*sortBlock, error) {
|
||||
brsLen := len(pts.brs)
|
||||
upwsLen := (brsLen + unpackBatchSize - 1) / unpackBatchSize
|
||||
if upwsLen == 1 {
|
||||
// Fast path for common case - unpack all the data in the current goroutine
|
||||
upw := getUnpackWork()
|
||||
upw.tbf = tbf
|
||||
for _, br := range pts.brs {
|
||||
upw.ws = append(upw.ws, unpackWorkItem{
|
||||
br: br,
|
||||
tr: tr,
|
||||
})
|
||||
}
|
||||
pts.brs = pts.brs[:0]
|
||||
tmpBlock := getTmpStorageBlock()
|
||||
upw.unpack(tmpBlock)
|
||||
putTmpStorageBlock(tmpBlock)
|
||||
|
||||
if err := <-upw.doneCh; err != nil {
|
||||
return dst, err
|
||||
}
|
||||
samples := 0
|
||||
for _, sb := range upw.sbs {
|
||||
samples += len(sb.Timestamps)
|
||||
if *maxSamplesPerSeries > 0 && samples > *maxSamplesPerSeries {
|
||||
return dst, fmt.Errorf("cannot process more than %d samples per series; either increase -search.maxSamplesPerSeries "+
|
||||
"or reduce time range for the query", *maxSamplesPerSeries)
|
||||
}
|
||||
dst = append(dst, sb)
|
||||
}
|
||||
|
||||
putUnpackWork(upw)
|
||||
return dst, nil
|
||||
}
|
||||
|
||||
// Slow path - spin up multiple local workers for parallel data unpacking.
|
||||
// Do not use global workers pool, since it increases inter-CPU memory ping-poing,
|
||||
// which reduces the scalability on systems with many CPU cores.
|
||||
brsLen := len(pts.brs)
|
||||
workers := brsLen / unpackBatchSize
|
||||
workers := upwsLen
|
||||
if workers > gomaxprocs {
|
||||
workers = gomaxprocs
|
||||
}
|
||||
|
@ -392,11 +446,7 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.
|
|||
workChs := make([]chan *unpackWork, workers)
|
||||
var workChsWG sync.WaitGroup
|
||||
for i := 0; i < workers; i++ {
|
||||
// Use unbuffered channel on purpose, since there are high chances
|
||||
// that only a single unpackWork is needed to unpack.
|
||||
// The unbuffered channel should reduce inter-CPU ping-pong in this case,
|
||||
// which should improve the performance in a system with many CPU cores.
|
||||
workChs[i] = make(chan *unpackWork)
|
||||
workChs[i] = make(chan *unpackWork, 1)
|
||||
workChsWG.Add(1)
|
||||
go func(workerID int) {
|
||||
defer workChsWG.Done()
|
||||
|
@ -405,7 +455,7 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.
|
|||
}
|
||||
|
||||
// Feed workers with work
|
||||
upws := make([]*unpackWork, 0, 1+brsLen/unpackBatchSize)
|
||||
upws := make([]*unpackWork, 0, upwsLen)
|
||||
upw := getUnpackWork()
|
||||
upw.tbf = tbf
|
||||
for _, br := range pts.brs {
|
||||
|
@ -424,14 +474,8 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.
|
|||
upws = append(upws, upw)
|
||||
pts.brs = pts.brs[:0]
|
||||
|
||||
// Wait until work is complete
|
||||
// Collect the unpacked sortBlock items
|
||||
samples := 0
|
||||
sbh := getSortBlocksHeap()
|
||||
sbs := sbh.sbs
|
||||
if n := brsLen - cap(sbs); n > 0 {
|
||||
sbs = append(sbs[:cap(sbs)], make([]*sortBlock, n)...)
|
||||
}
|
||||
sbs = sbs[:0]
|
||||
var firstErr error
|
||||
for _, upw := range upws {
|
||||
if err := <-upw.doneCh; err != nil && firstErr == nil {
|
||||
|
@ -441,22 +485,20 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.
|
|||
if firstErr == nil {
|
||||
for _, sb := range upw.sbs {
|
||||
samples += len(sb.Timestamps)
|
||||
if *maxSamplesPerSeries > 0 && samples > *maxSamplesPerSeries {
|
||||
firstErr = fmt.Errorf("cannot process more than %d samples per series; either increase -search.maxSamplesPerSeries "+
|
||||
"or reduce time range for the query", *maxSamplesPerSeries)
|
||||
break
|
||||
}
|
||||
dst = append(dst, sb)
|
||||
}
|
||||
if *maxSamplesPerSeries <= 0 || samples < *maxSamplesPerSeries {
|
||||
sbs = append(sbs, upw.sbs...)
|
||||
} else {
|
||||
firstErr = fmt.Errorf("cannot process more than %d samples per series; either increase -search.maxSamplesPerSeries "+
|
||||
"or reduce time range for the query", *maxSamplesPerSeries)
|
||||
}
|
||||
}
|
||||
if firstErr != nil {
|
||||
} else {
|
||||
for _, sb := range upw.sbs {
|
||||
putSortBlock(sb)
|
||||
}
|
||||
}
|
||||
putUnpackWork(upw)
|
||||
}
|
||||
sbh.sbs = sbs
|
||||
|
||||
// Shut down local workers
|
||||
for _, workCh := range workChs {
|
||||
|
@ -464,13 +506,7 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.
|
|||
}
|
||||
workChsWG.Wait()
|
||||
|
||||
if firstErr != nil {
|
||||
return firstErr
|
||||
}
|
||||
dedupInterval := storage.GetDedupInterval()
|
||||
mergeSortBlocks(dst, sbh, dedupInterval)
|
||||
putSortBlocksHeap(sbh)
|
||||
return nil
|
||||
return dst, firstErr
|
||||
}
|
||||
|
||||
func getSortBlock() *sortBlock {
|
||||
|
|
Loading…
Reference in a new issue