From bd4299fafe6d9de663446c95854bbb8ba99ef698 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin <valyala@gmail.com> Date: Wed, 22 Jul 2020 14:53:54 +0300 Subject: [PATCH] app/vmselect/netstorage: reduce memory allocations when unpacking time series data by using a pool for unpackWork entries This should slightly reduce load on GC when processing queries that touch a big number of time series. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/646 according to the provided memory profile --- app/vmselect/netstorage/netstorage.go | 46 +++++++++++++++++++++------ 1 file changed, 37 insertions(+), 9 deletions(-) diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 64645f7828..187fdec9e2 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -177,10 +177,39 @@ type unpackWork struct { tr storage.TimeRange fetchData bool at *auth.Token - doneCh chan error sb *sortBlock + doneCh chan error } +func (upw *unpackWork) reset() { + upw.tbf = nil + upw.addr = tmpBlockAddr{} + upw.tr = storage.TimeRange{} + upw.fetchData = false + upw.at = nil + upw.sb = nil + if n := len(upw.doneCh); n > 0 { + logger.Panicf("BUG: upw.doneCh must be empty; it contains %d items now", n) + } +} + +func getUnpackWork() *unpackWork { + v := unpackWorkPool.Get() + if v != nil { + return v.(*unpackWork) + } + return &unpackWork{ + doneCh: make(chan error, 1), + } +} + +func putUnpackWork(upw *unpackWork) { + upw.reset() + unpackWorkPool.Put(upw) +} + +var unpackWorkPool sync.Pool + func init() { for i := 0; i < gomaxprocs; i++ { go unpackWorker() } } @@ -211,14 +240,12 @@ func (pts *packedTimeseries) Unpack(tbf *tmpBlocksFile, dst *Result, tr storage. 
// Feed workers with work upws := make([]*unpackWork, len(pts.addrs)) for i, addr := range pts.addrs { - upw := &unpackWork{ - tbf: tbf, - addr: addr, - tr: tr, - fetchData: fetchData, - at: at, - doneCh: make(chan error, 1), - } + upw := getUnpackWork() + upw.tbf = tbf + upw.addr = addr + upw.tr = tr + upw.fetchData = fetchData + upw.at = at unpackWorkCh <- upw upws[i] = upw } @@ -237,6 +264,7 @@ func (pts *packedTimeseries) Unpack(tbf *tmpBlocksFile, dst *Result, tr storage. } else if upw.sb != nil { putSortBlock(upw.sb) } + putUnpackWork(upw) } if firstErr != nil { return firstErr