app/vmselect/netstorage: reuse timeseriesWork objects in order to reduce memory allocations

This commit is contained in:
Aliaksandr Valialkin 2021-02-16 16:08:37 +02:00
parent a3a09a3c6e
commit 73c9da16b8

View file

@ -91,6 +91,34 @@ type timeseriesWork struct {
rowsProcessed int rowsProcessed int
} }
func (tsw *timeseriesWork) reset() {
tsw.mustStop = 0
tsw.rss = nil
tsw.pts = nil
tsw.f = nil
if n := len(tsw.doneCh); n > 0 {
logger.Panicf("BUG: tsw.doneCh must be empty during reset; it contains %d items instead", n)
}
tsw.rowsProcessed = 0
}
func getTimeseriesWork() *timeseriesWork {
v := tswPool.Get()
if v == nil {
v = &timeseriesWork{
doneCh: make(chan error, 1),
}
}
return v.(*timeseriesWork)
}
func putTimeseriesWork(tsw *timeseriesWork) {
tsw.reset()
tswPool.Put(tsw)
}
var tswPool sync.Pool
func init() { func init() {
for i := 0; i < gomaxprocs; i++ { for i := 0; i < gomaxprocs; i++ {
go timeseriesWorker(uint(i)) go timeseriesWorker(uint(i))
@ -147,12 +175,10 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint) error) error {
// Feed workers with work. // Feed workers with work.
tsws := make([]*timeseriesWork, len(rss.packedTimeseries)) tsws := make([]*timeseriesWork, len(rss.packedTimeseries))
for i := range rss.packedTimeseries { for i := range rss.packedTimeseries {
tsw := &timeseriesWork{ tsw := getTimeseriesWork()
rss: rss, tsw.rss = rss
pts: &rss.packedTimeseries[i], tsw.pts = &rss.packedTimeseries[i]
f: f, tsw.f = f
doneCh: make(chan error, 1),
}
timeseriesWorkCh <- tsw timeseriesWorkCh <- tsw
tsws[i] = tsw tsws[i] = tsw
} }
@ -163,7 +189,8 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint) error) error {
var firstErr error var firstErr error
rowsProcessedTotal := 0 rowsProcessedTotal := 0
for _, tsw := range tsws { for _, tsw := range tsws {
if err := <-tsw.doneCh; err != nil && firstErr == nil { err := <-tsw.doneCh
if err != nil && firstErr == nil {
// Return just the first error, since other errors // Return just the first error, since other errors
// are likely duplicate the first error. // are likely duplicate the first error.
firstErr = err firstErr = err
@ -173,6 +200,7 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint) error) error {
} }
} }
rowsProcessedTotal += tsw.rowsProcessed rowsProcessedTotal += tsw.rowsProcessed
putTimeseriesWork(tsw)
} }
perQueryRowsProcessed.Update(float64(rowsProcessedTotal)) perQueryRowsProcessed.Update(float64(rowsProcessedTotal))