diff --git a/lib/mergeset/merge.go b/lib/mergeset/merge.go index 7f2fa5ccfa..4805f1ad7a 100644 --- a/lib/mergeset/merge.go +++ b/lib/mergeset/merge.go @@ -114,12 +114,13 @@ again: default: } - bsr := heap.Pop(&bsm.bsrHeap).(*blockStreamReader) + bsr := bsm.bsrHeap[0] var nextItem []byte hasNextItem := false - if len(bsm.bsrHeap) > 0 { - nextItem = bsm.bsrHeap[0].bh.firstItem + if len(bsm.bsrHeap) > 1 { + bsr := bsm.bsrHeap.getNextReader() + nextItem = bsr.bh.firstItem hasNextItem = true } items := bsr.Block.items @@ -139,19 +140,20 @@ again: if bsr.blockItemIdx == len(bsr.Block.items) { // bsr.Block is fully read. Proceed to the next block. if bsr.Next() { - heap.Push(&bsm.bsrHeap, bsr) + heap.Fix(&bsm.bsrHeap, 0) goto again } if err := bsr.Error(); err != nil { return fmt.Errorf("cannot read storageBlock: %w", err) } + heap.Pop(&bsm.bsrHeap) goto again } // The next item in the bsr.Block exceeds nextItem. // Adjust bsr.bh.firstItem and return bsr to heap. bsr.bh.firstItem = append(bsr.bh.firstItem[:0], bsr.Block.items[bsr.blockItemIdx].String(bsr.Block.data)...) - heap.Push(&bsm.bsrHeap, bsr) + heap.Fix(&bsm.bsrHeap, 0) goto again } @@ -201,6 +203,21 @@ func (bsm *blockStreamMerger) flushIB(bsw *blockStreamWriter, ph *partHeader, it type bsrHeap []*blockStreamReader +func (h bsrHeap) getNextReader() *blockStreamReader { + if len(h) < 2 { + return nil + } + if len(h) < 3 { + return h[1] + } + a := h[1] + b := h[2] + if string(a.bh.firstItem) <= string(b.bh.firstItem) { + return a + } + return b +} + func (bh *bsrHeap) Len() int { return len(*bh) }