From c4756f94da54640c9c3043ac17e127f5fbceab5e Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin <valyala@gmail.com> Date: Tue, 16 Feb 2021 16:08:37 +0200 Subject: [PATCH] app/vmselect/netstorage: reuse timeseriesWork objects in order to reduce memory allocations --- app/vmselect/netstorage/netstorage.go | 42 ++++++++++++++++++++++----- 1 file changed, 35 insertions(+), 7 deletions(-) diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 684006f8a5..ddd0fd586d 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -90,6 +90,34 @@ type timeseriesWork struct { rowsProcessed int } +func (tsw *timeseriesWork) reset() { + tsw.mustStop = 0 + tsw.rss = nil + tsw.pts = nil + tsw.f = nil + if n := len(tsw.doneCh); n > 0 { + logger.Panicf("BUG: tsw.doneCh must be empty during reset; it contains %d items instead", n) + } + tsw.rowsProcessed = 0 +} + +func getTimeseriesWork() *timeseriesWork { + v := tswPool.Get() + if v == nil { + v = ×eriesWork{ + doneCh: make(chan error, 1), + } + } + return v.(*timeseriesWork) +} + +func putTimeseriesWork(tsw *timeseriesWork) { + tsw.reset() + tswPool.Put(tsw) +} + +var tswPool sync.Pool + func init() { for i := 0; i < gomaxprocs; i++ { go timeseriesWorker(uint(i)) @@ -143,12 +171,10 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint) error) error { // Feed workers with work. tsws := make([]*timeseriesWork, len(rss.packedTimeseries)) for i := range rss.packedTimeseries { - tsw := ×eriesWork{ - rss: rss, - pts: &rss.packedTimeseries[i], - f: f, - doneCh: make(chan error, 1), - } + tsw := getTimeseriesWork() + tsw.rss = rss + tsw.pts = &rss.packedTimeseries[i] + tsw.f = f timeseriesWorkCh <- tsw tsws[i] = tsw } @@ -159,7 +185,8 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint) error) error { var firstErr error rowsProcessedTotal := 0 for _, tsw := range tsws { - if err := <-tsw.doneCh; err != nil && firstErr == nil { + err := <-tsw.doneCh + if err != nil && firstErr == nil { // Return just the first error, since other errors // are likely duplicate the first error. firstErr = err @@ -169,6 +196,7 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint) error) error { } } rowsProcessedTotal += tsw.rowsProcessed + putTimeseriesWork(tsw) } perQueryRowsProcessed.Update(float64(rowsProcessedTotal))