diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 344ba86b3..43b4fdd90 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -13,6 +13,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/metrics" ) @@ -172,10 +173,37 @@ type unpackWork struct { br storage.BlockRef tr storage.TimeRange fetchData bool - doneCh chan error sb *sortBlock + doneCh chan error } +func (upw *unpackWork) reset() { + upw.br = storage.BlockRef{} + upw.tr = storage.TimeRange{} + upw.fetchData = false + upw.sb = nil + if n := len(upw.doneCh); n > 0 { + logger.Panicf("BUG: upw.doneCh must be empty; it contains %d items now", n) + } +} + +func getUnpackWork() *unpackWork { + v := unpackWorkPool.Get() + if v != nil { + return v.(*unpackWork) + } + return &unpackWork{ + doneCh: make(chan error, 1), + } +} + +func putUnpackWork(upw *unpackWork) { + upw.reset() + unpackWorkPool.Put(upw) +} + +var unpackWorkPool sync.Pool + func init() { for i := 0; i < gomaxprocs; i++ { go unpackWorker() @@ -206,12 +234,10 @@ func (pts *packedTimeseries) Unpack(dst *Result, tr storage.TimeRange, fetchData // Feed workers with work upws := make([]*unpackWork, len(pts.brs)) for i, br := range pts.brs { - upw := &unpackWork{ - br: br, - tr: tr, - fetchData: fetchData, - doneCh: make(chan error, 1), - } + upw := getUnpackWork() + upw.br = br + upw.tr = tr + upw.fetchData = fetchData unpackWorkCh <- upw upws[i] = upw } @@ -230,6 +256,7 @@ func (pts *packedTimeseries) Unpack(dst *Result, tr storage.TimeRange, fetchData } else if upw.sb != nil { putSortBlock(upw.sb) } + putUnpackWork(upw) } if firstErr != nil { return firstErr