mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 15:16:42 +00:00
lib/mergeset: skip createing temporary part objects when merging source inmemory parts
This should reduce CPU usage when adding new entries to inverted index. This should alos prevent from creating stalled cleaner goroutines for the created temporary parts, since they were never closed. This should fix the following issue: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/316 .
This commit is contained in:
parent
eceaf13e5e
commit
7836ad8907
1 changed files with 15 additions and 14 deletions
|
@ -565,7 +565,7 @@ func (tb *Table) mergeRawItemsBlocks(blocksToMerge []*inmemoryBlock) {
|
|||
|
||||
func (tb *Table) mergeInmemoryBlocks(blocksToMerge []*inmemoryBlock) *partWrapper {
|
||||
// Convert blocksToMerge into inmemoryPart's
|
||||
pws := make([]*partWrapper, 0, len(blocksToMerge))
|
||||
mps := make([]*inmemoryPart, 0, len(blocksToMerge))
|
||||
for _, ib := range blocksToMerge {
|
||||
if len(ib.items) == 0 {
|
||||
continue
|
||||
|
@ -573,24 +573,25 @@ func (tb *Table) mergeInmemoryBlocks(blocksToMerge []*inmemoryBlock) *partWrappe
|
|||
mp := getInmemoryPart()
|
||||
mp.Init(ib)
|
||||
putInmemoryBlock(ib)
|
||||
mps = append(mps, mp)
|
||||
}
|
||||
if len(mps) == 0 {
|
||||
return nil
|
||||
}
|
||||
if len(mps) == 1 {
|
||||
// Nothing to merge. Just return a single inmemory part.
|
||||
mp := mps[0]
|
||||
p := mp.NewPart()
|
||||
pw := &partWrapper{
|
||||
return &partWrapper{
|
||||
p: p,
|
||||
mp: mp,
|
||||
refCount: 1,
|
||||
}
|
||||
pws = append(pws, pw)
|
||||
}
|
||||
if len(pws) == 0 {
|
||||
return nil
|
||||
}
|
||||
if len(pws) == 1 {
|
||||
return pws[0]
|
||||
}
|
||||
defer func() {
|
||||
// Return source inmemoryParts to pool.
|
||||
for _, pw := range pws {
|
||||
putInmemoryPart(pw.mp)
|
||||
for _, mp := range mps {
|
||||
putInmemoryPart(mp)
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -599,10 +600,10 @@ func (tb *Table) mergeInmemoryBlocks(blocksToMerge []*inmemoryBlock) *partWrappe
|
|||
defer atomic.AddUint64(&tb.activeMerges, ^uint64(0))
|
||||
|
||||
// Prepare blockStreamReaders for source parts.
|
||||
bsrs := make([]*blockStreamReader, 0, len(pws))
|
||||
for _, pw := range pws {
|
||||
bsrs := make([]*blockStreamReader, 0, len(mps))
|
||||
for _, mp := range mps {
|
||||
bsr := getBlockStreamReader()
|
||||
bsr.InitFromInmemoryPart(pw.mp)
|
||||
bsr.InitFromInmemoryPart(mp)
|
||||
bsrs = append(bsrs, bsr)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue