Revert "lib/mergeset: remove a pool for inmemoryBlock structs"

This reverts commit 793fe39921.

Reason to revert: production testing revealed possible slowdown when registering big number of new time series
This commit is contained in:
Aliaksandr Valialkin 2021-05-28 01:09:32 +03:00
parent 7b33bc67a1
commit 6865f3b497
3 changed files with 19 additions and 7 deletions

View file

@ -540,6 +540,17 @@ func putLensBuffer(lb *lensBuffer) {
lensBufferPool.Put(lb) lensBufferPool.Put(lb)
} }
func newInmemoryBlock() *inmemoryBlock { func getInmemoryBlock() *inmemoryBlock {
return &inmemoryBlock{} v := ibPool.Get()
if v == nil {
return &inmemoryBlock{}
}
return v.(*inmemoryBlock)
} }
func putInmemoryBlock(ib *inmemoryBlock) {
ib.Reset()
ibPool.Put(ib)
}
var ibPool sync.Pool

View file

@ -316,7 +316,7 @@ func (ps *partSearch) readInmemoryBlock(bh *blockHeader) (*inmemoryBlock, error)
ps.sb.lensData = bytesutil.Resize(ps.sb.lensData, int(bh.lensBlockSize)) ps.sb.lensData = bytesutil.Resize(ps.sb.lensData, int(bh.lensBlockSize))
ps.p.lensFile.MustReadAt(ps.sb.lensData, int64(bh.lensBlockOffset)) ps.p.lensFile.MustReadAt(ps.sb.lensData, int64(bh.lensBlockOffset))
ib := newInmemoryBlock() ib := getInmemoryBlock()
if err := ib.UnmarshalData(&ps.sb, bh.firstItem, bh.commonPrefix, bh.itemsCount, bh.marshalType); err != nil { if err := ib.UnmarshalData(&ps.sb, bh.firstItem, bh.commonPrefix, bh.itemsCount, bh.marshalType); err != nil {
return nil, fmt.Errorf("cannot unmarshal storage block with %d items: %w", bh.itemsCount, err) return nil, fmt.Errorf("cannot unmarshal storage block with %d items: %w", bh.itemsCount, err)
} }

View file

@ -182,15 +182,16 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) error {
ris.mu.Lock() ris.mu.Lock()
ibs := ris.ibs ibs := ris.ibs
if len(ibs) == 0 { if len(ibs) == 0 {
ib := newInmemoryBlock() ib := getInmemoryBlock()
ibs = append(ibs, ib) ibs = append(ibs, ib)
ris.ibs = ibs ris.ibs = ibs
} }
ib := ibs[len(ibs)-1] ib := ibs[len(ibs)-1]
for _, item := range items { for _, item := range items {
if !ib.Add(item) { if !ib.Add(item) {
ib = newInmemoryBlock() ib = getInmemoryBlock()
if !ib.Add(item) { if !ib.Add(item) {
putInmemoryBlock(ib)
err = fmt.Errorf("cannot insert an item %q into an empty inmemoryBlock; it looks like the item is too large? len(item)=%d", item, len(item)) err = fmt.Errorf("cannot insert an item %q into an empty inmemoryBlock; it looks like the item is too large? len(item)=%d", item, len(item))
break break
} }
@ -674,13 +675,13 @@ func (tb *Table) mergeRawItemsBlocks(blocksToMerge []*inmemoryBlock) {
func (tb *Table) mergeInmemoryBlocks(blocksToMerge []*inmemoryBlock) *partWrapper { func (tb *Table) mergeInmemoryBlocks(blocksToMerge []*inmemoryBlock) *partWrapper {
// Convert blocksToMerge into inmemoryPart's // Convert blocksToMerge into inmemoryPart's
mps := make([]*inmemoryPart, 0, len(blocksToMerge)) mps := make([]*inmemoryPart, 0, len(blocksToMerge))
for i, ib := range blocksToMerge { for _, ib := range blocksToMerge {
if len(ib.items) == 0 { if len(ib.items) == 0 {
continue continue
} }
mp := getInmemoryPart() mp := getInmemoryPart()
mp.Init(ib) mp.Init(ib)
blocksToMerge[i] = nil putInmemoryBlock(ib)
mps = append(mps, mp) mps = append(mps, mp)
} }
if len(mps) == 0 { if len(mps) == 0 {