app/vmselect/netstorage: reduce memory allocations when unpacking time series data by using a pool for unpackWork entries

This should slightly reduce load on GC when processing queries that touch big number of time series.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/646 according to the provided memory profile there.
This commit is contained in:
Aliaksandr Valialkin 2020-07-22 14:53:54 +03:00
parent 31ae5911a8
commit dfb113f175

View file

@ -13,6 +13,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -172,10 +173,37 @@ type unpackWork struct {
br storage.BlockRef br storage.BlockRef
tr storage.TimeRange tr storage.TimeRange
fetchData bool fetchData bool
doneCh chan error
sb *sortBlock sb *sortBlock
doneCh chan error
} }
func (upw *unpackWork) reset() {
upw.br = storage.BlockRef{}
upw.tr = storage.TimeRange{}
upw.fetchData = false
upw.sb = nil
if n := len(upw.doneCh); n > 0 {
logger.Panicf("BUG: upw.doneCh must be empty; it contains %d items now", n)
}
}
func getUnpackWork() *unpackWork {
v := unpackWorkPool.Get()
if v != nil {
return v.(*unpackWork)
}
return &unpackWork{
doneCh: make(chan error, 1),
}
}
func putUnpackWork(upw *unpackWork) {
upw.reset()
unpackWorkPool.Put(upw)
}
var unpackWorkPool sync.Pool
func init() { func init() {
for i := 0; i < gomaxprocs; i++ { for i := 0; i < gomaxprocs; i++ {
go unpackWorker() go unpackWorker()
@ -206,12 +234,10 @@ func (pts *packedTimeseries) Unpack(dst *Result, tr storage.TimeRange, fetchData
// Feed workers with work // Feed workers with work
upws := make([]*unpackWork, len(pts.brs)) upws := make([]*unpackWork, len(pts.brs))
for i, br := range pts.brs { for i, br := range pts.brs {
upw := &unpackWork{ upw := getUnpackWork()
br: br, upw.br = br
tr: tr, upw.tr = tr
fetchData: fetchData, upw.fetchData = fetchData
doneCh: make(chan error, 1),
}
unpackWorkCh <- upw unpackWorkCh <- upw
upws[i] = upw upws[i] = upw
} }
@ -230,6 +256,7 @@ func (pts *packedTimeseries) Unpack(dst *Result, tr storage.TimeRange, fetchData
} else if upw.sb != nil { } else if upw.sb != nil {
putSortBlock(upw.sb) putSortBlock(upw.sb)
} }
putUnpackWork(upw)
} }
if firstErr != nil { if firstErr != nil {
return firstErr return firstErr