From 7cbcbea49d9d99c5bda2412fb890d3cf6bb58605 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin <valyala@victoriametrics.com> Date: Tue, 12 Jul 2022 12:50:15 +0300 Subject: [PATCH] lib/mergeset: optimize merge speed a bit Use heap.Fix instead of heap.Pop + heap.Push when merging blocks --- lib/mergeset/merge.go | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/lib/mergeset/merge.go b/lib/mergeset/merge.go index 7f2fa5ccfa..4805f1ad7a 100644 --- a/lib/mergeset/merge.go +++ b/lib/mergeset/merge.go @@ -114,12 +114,13 @@ again: default: } - bsr := heap.Pop(&bsm.bsrHeap).(*blockStreamReader) + bsr := bsm.bsrHeap[0] var nextItem []byte hasNextItem := false - if len(bsm.bsrHeap) > 0 { - nextItem = bsm.bsrHeap[0].bh.firstItem + if len(bsm.bsrHeap) > 1 { + bsr := bsm.bsrHeap.getNextReader() + nextItem = bsr.bh.firstItem hasNextItem = true } items := bsr.Block.items @@ -139,19 +140,20 @@ again: if bsr.blockItemIdx == len(bsr.Block.items) { // bsr.Block is fully read. Proceed to the next block. if bsr.Next() { - heap.Push(&bsm.bsrHeap, bsr) + heap.Fix(&bsm.bsrHeap, 0) goto again } if err := bsr.Error(); err != nil { return fmt.Errorf("cannot read storageBlock: %w", err) } + heap.Pop(&bsm.bsrHeap) goto again } // The next item in the bsr.Block exceeds nextItem. // Adjust bsr.bh.firstItem and return bsr to heap. bsr.bh.firstItem = append(bsr.bh.firstItem[:0], bsr.Block.items[bsr.blockItemIdx].String(bsr.Block.data)...) - heap.Push(&bsm.bsrHeap, bsr) + heap.Fix(&bsm.bsrHeap, 0) goto again } @@ -201,6 +203,21 @@ func (bsm *blockStreamMerger) flushIB(bsw *blockStreamWriter, ph *partHeader, it type bsrHeap []*blockStreamReader +func (h bsrHeap) getNextReader() *blockStreamReader { + if len(h) < 2 { + return nil + } + if len(h) < 3 { + return h[1] + } + a := h[1] + b := h[2] + if string(a.bh.firstItem) <= string(b.bh.firstItem) { + return a + } + return b +} + func (bh *bsrHeap) Len() int { return len(*bh) }