diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index de67195d2..89af8a3c5 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -91,6 +91,34 @@ type timeseriesWork struct { rowsProcessed int } +func (tsw *timeseriesWork) reset() { + tsw.mustStop = 0 + tsw.rss = nil + tsw.pts = nil + tsw.f = nil + if n := len(tsw.doneCh); n > 0 { + logger.Panicf("BUG: tsw.doneCh must be empty during reset; it contains %d items instead", n) + } + tsw.rowsProcessed = 0 +} + +func getTimeseriesWork() *timeseriesWork { + v := tswPool.Get() + if v == nil { + v = ×eriesWork{ + doneCh: make(chan error, 1), + } + } + return v.(*timeseriesWork) +} + +func putTimeseriesWork(tsw *timeseriesWork) { + tsw.reset() + tswPool.Put(tsw) +} + +var tswPool sync.Pool + func init() { for i := 0; i < gomaxprocs; i++ { go timeseriesWorker(uint(i)) @@ -147,12 +175,10 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint) error) error { // Feed workers with work. tsws := make([]*timeseriesWork, len(rss.packedTimeseries)) for i := range rss.packedTimeseries { - tsw := ×eriesWork{ - rss: rss, - pts: &rss.packedTimeseries[i], - f: f, - doneCh: make(chan error, 1), - } + tsw := getTimeseriesWork() + tsw.rss = rss + tsw.pts = &rss.packedTimeseries[i] + tsw.f = f timeseriesWorkCh <- tsw tsws[i] = tsw } @@ -163,7 +189,8 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint) error) error { var firstErr error rowsProcessedTotal := 0 for _, tsw := range tsws { - if err := <-tsw.doneCh; err != nil && firstErr == nil { + err := <-tsw.doneCh + if err != nil && firstErr == nil { // Return just the first error, since other errors // are likely duplicate the first error. firstErr = err @@ -173,6 +200,7 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint) error) error { } } rowsProcessedTotal += tsw.rowsProcessed + putTimeseriesWork(tsw) } perQueryRowsProcessed.Update(float64(rowsProcessedTotal))