From 3bbe9054d31827302ff4ee09854f2be41dec171c Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 27 Jul 2022 23:04:58 +0300 Subject: [PATCH] lib/mergeset: do not update blockStreamReader.bh.firstItem during the merge Just read the current item directly from blockStreamReader.Block.Items with the helper method - blockStreamReader.CurrItem() --- lib/mergeset/block_stream_reader.go | 11 ++++++++--- lib/mergeset/merge.go | 21 ++++++++++----------- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/lib/mergeset/block_stream_reader.go b/lib/mergeset/block_stream_reader.go index a48bbe9c4a..42436716de 100644 --- a/lib/mergeset/block_stream_reader.go +++ b/lib/mergeset/block_stream_reader.go @@ -17,7 +17,8 @@ type blockStreamReader struct { // Block contains the current block if Next returned true. Block inmemoryBlock - blockItemIdx int + // The index of the current item in the Block, which is returned from CurrItem() + currItemIdx int path string @@ -66,7 +67,7 @@ type blockStreamReader struct { func (bsr *blockStreamReader) reset() { bsr.Block.Reset() - bsr.blockItemIdx = 0 + bsr.currItemIdx = 0 bsr.path = "" bsr.ph.Reset() bsr.mrs = nil @@ -185,6 +186,10 @@ func (bsr *blockStreamReader) MustClose() { bsr.reset() } +func (bsr *blockStreamReader) CurrItem() string { + return bsr.Block.items[bsr.currItemIdx].String(bsr.Block.data) +} + func (bsr *blockStreamReader) Next() bool { if bsr.err != nil { return false @@ -233,7 +238,7 @@ func (bsr *blockStreamReader) Next() bool { bsr.err = fmt.Errorf("too many blocks read: %d; must be smaller than partHeader.blocksCount %d", bsr.blocksRead, bsr.ph.blocksCount) return false } - bsr.blockItemIdx = 0 + bsr.currItemIdx = 0 bsr.itemsRead += uint64(len(bsr.Block.items)) if bsr.itemsRead > bsr.ph.itemsCount { bsr.err = fmt.Errorf("too many items read: %d; must be smaller than partHeader.itemsCount %d", bsr.itemsRead, bsr.ph.itemsCount) diff --git a/lib/mergeset/merge.go b/lib/mergeset/merge.go index b5eaf9cee3..41dbbf281c 100644 --- a/lib/mergeset/merge.go +++ b/lib/mergeset/merge.go @@ -116,18 +116,18 @@ again: bsr := bsm.bsrHeap[0] - var nextItem []byte + var nextItem string hasNextItem := false if len(bsm.bsrHeap) > 1 { bsr := bsm.bsrHeap.getNextReader() - nextItem = bsr.bh.firstItem + nextItem = bsr.CurrItem() hasNextItem = true } items := bsr.Block.items data := bsr.Block.data - for bsr.blockItemIdx < len(bsr.Block.items) { - item := items[bsr.blockItemIdx].Bytes(data) - if hasNextItem && string(item) > string(nextItem) { + for bsr.currItemIdx < len(bsr.Block.items) { + item := items[bsr.currItemIdx].Bytes(data) + if hasNextItem && string(item) > nextItem { break } if !bsm.ib.Add(item) { @@ -135,9 +135,9 @@ again: bsm.flushIB(bsw, ph, itemsMerged) continue } - bsr.blockItemIdx++ + bsr.currItemIdx++ } - if bsr.blockItemIdx == len(bsr.Block.items) { + if bsr.currItemIdx == len(bsr.Block.items) { // bsr.Block is fully read. Proceed to the next block. if bsr.Next() { heap.Fix(&bsm.bsrHeap, 0) @@ -151,8 +151,7 @@ again: } // The next item in the bsr.Block exceeds nextItem. - // Adjust bsr.bh.firstItem and return bsr to heap. - bsr.bh.firstItem = append(bsr.bh.firstItem[:0], bsr.Block.items[bsr.blockItemIdx].String(bsr.Block.data)...) + // Return bsr to heap. heap.Fix(&bsm.bsrHeap, 0) goto again } @@ -212,7 +211,7 @@ func (bh bsrHeap) getNextReader() *blockStreamReader { } a := bh[1] b := bh[2] - if string(a.bh.firstItem) <= string(b.bh.firstItem) { + if a.CurrItem() <= b.CurrItem() { return a } return b @@ -229,7 +228,7 @@ func (bh *bsrHeap) Swap(i, j int) { func (bh *bsrHeap) Less(i, j int) bool { x := *bh - return string(x[i].bh.firstItem) < string(x[j].bh.firstItem) + return x[i].CurrItem() < x[j].CurrItem() } func (bh *bsrHeap) Pop() interface{} {