From 7836ad89078847083ae8f9da92cafabe60a0b274 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin <valyala@gmail.com> Date: Thu, 13 Feb 2020 14:06:51 +0200 Subject: [PATCH] lib/mergeset: skip createing temporary part objects when merging source inmemory parts This should reduce CPU usage when adding new entries to inverted index. This should alos prevent from creating stalled cleaner goroutines for the created temporary parts, since they were never closed. This should fix the following issue: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/316 . --- lib/mergeset/table.go | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index ea81ebd7c8..40e66f8bea 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -565,7 +565,7 @@ func (tb *Table) mergeRawItemsBlocks(blocksToMerge []*inmemoryBlock) { func (tb *Table) mergeInmemoryBlocks(blocksToMerge []*inmemoryBlock) *partWrapper { // Convert blocksToMerge into inmemoryPart's - pws := make([]*partWrapper, 0, len(blocksToMerge)) + mps := make([]*inmemoryPart, 0, len(blocksToMerge)) for _, ib := range blocksToMerge { if len(ib.items) == 0 { continue @@ -573,24 +573,25 @@ func (tb *Table) mergeInmemoryBlocks(blocksToMerge []*inmemoryBlock) *partWrappe mp := getInmemoryPart() mp.Init(ib) putInmemoryBlock(ib) + mps = append(mps, mp) + } + if len(mps) == 0 { + return nil + } + if len(mps) == 1 { + // Nothing to merge. Just return a single inmemory part. + mp := mps[0] p := mp.NewPart() - pw := &partWrapper{ + return &partWrapper{ p: p, mp: mp, refCount: 1, } - pws = append(pws, pw) - } - if len(pws) == 0 { - return nil - } - if len(pws) == 1 { - return pws[0] } defer func() { // Return source inmemoryParts to pool. - for _, pw := range pws { - putInmemoryPart(pw.mp) + for _, mp := range mps { + putInmemoryPart(mp) } }() @@ -599,10 +600,10 @@ func (tb *Table) mergeInmemoryBlocks(blocksToMerge []*inmemoryBlock) *partWrappe defer atomic.AddUint64(&tb.activeMerges, ^uint64(0)) // Prepare blockStreamReaders for source parts. - bsrs := make([]*blockStreamReader, 0, len(pws)) - for _, pw := range pws { + bsrs := make([]*blockStreamReader, 0, len(mps)) + for _, mp := range mps { bsr := getBlockStreamReader() - bsr.InitFromInmemoryPart(pw.mp) + bsr.InitFromInmemoryPart(mp) bsrs = append(bsrs, bsr) }