mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/mergeset: do not update blockStreamReader.bh.firstItem during the merge
Just read the current item directly from blockStreamReader.Block.Items with the helper method - blockStreamReader.CurrItem()
This commit is contained in:
parent
547cb1edce
commit
3bbe9054d3
2 changed files with 18 additions and 14 deletions
|
@ -17,7 +17,8 @@ type blockStreamReader struct {
|
||||||
// Block contains the current block if Next returned true.
|
// Block contains the current block if Next returned true.
|
||||||
Block inmemoryBlock
|
Block inmemoryBlock
|
||||||
|
|
||||||
blockItemIdx int
|
// The index of the current item in the Block, which is returned from CurrItem()
|
||||||
|
currItemIdx int
|
||||||
|
|
||||||
path string
|
path string
|
||||||
|
|
||||||
|
@ -66,7 +67,7 @@ type blockStreamReader struct {
|
||||||
|
|
||||||
func (bsr *blockStreamReader) reset() {
|
func (bsr *blockStreamReader) reset() {
|
||||||
bsr.Block.Reset()
|
bsr.Block.Reset()
|
||||||
bsr.blockItemIdx = 0
|
bsr.currItemIdx = 0
|
||||||
bsr.path = ""
|
bsr.path = ""
|
||||||
bsr.ph.Reset()
|
bsr.ph.Reset()
|
||||||
bsr.mrs = nil
|
bsr.mrs = nil
|
||||||
|
@ -185,6 +186,10 @@ func (bsr *blockStreamReader) MustClose() {
|
||||||
bsr.reset()
|
bsr.reset()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (bsr *blockStreamReader) CurrItem() string {
|
||||||
|
return bsr.Block.items[bsr.currItemIdx].String(bsr.Block.data)
|
||||||
|
}
|
||||||
|
|
||||||
func (bsr *blockStreamReader) Next() bool {
|
func (bsr *blockStreamReader) Next() bool {
|
||||||
if bsr.err != nil {
|
if bsr.err != nil {
|
||||||
return false
|
return false
|
||||||
|
@ -233,7 +238,7 @@ func (bsr *blockStreamReader) Next() bool {
|
||||||
bsr.err = fmt.Errorf("too many blocks read: %d; must be smaller than partHeader.blocksCount %d", bsr.blocksRead, bsr.ph.blocksCount)
|
bsr.err = fmt.Errorf("too many blocks read: %d; must be smaller than partHeader.blocksCount %d", bsr.blocksRead, bsr.ph.blocksCount)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
bsr.blockItemIdx = 0
|
bsr.currItemIdx = 0
|
||||||
bsr.itemsRead += uint64(len(bsr.Block.items))
|
bsr.itemsRead += uint64(len(bsr.Block.items))
|
||||||
if bsr.itemsRead > bsr.ph.itemsCount {
|
if bsr.itemsRead > bsr.ph.itemsCount {
|
||||||
bsr.err = fmt.Errorf("too many items read: %d; must be smaller than partHeader.itemsCount %d", bsr.itemsRead, bsr.ph.itemsCount)
|
bsr.err = fmt.Errorf("too many items read: %d; must be smaller than partHeader.itemsCount %d", bsr.itemsRead, bsr.ph.itemsCount)
|
||||||
|
|
|
@ -116,18 +116,18 @@ again:
|
||||||
|
|
||||||
bsr := bsm.bsrHeap[0]
|
bsr := bsm.bsrHeap[0]
|
||||||
|
|
||||||
var nextItem []byte
|
var nextItem string
|
||||||
hasNextItem := false
|
hasNextItem := false
|
||||||
if len(bsm.bsrHeap) > 1 {
|
if len(bsm.bsrHeap) > 1 {
|
||||||
bsr := bsm.bsrHeap.getNextReader()
|
bsr := bsm.bsrHeap.getNextReader()
|
||||||
nextItem = bsr.bh.firstItem
|
nextItem = bsr.CurrItem()
|
||||||
hasNextItem = true
|
hasNextItem = true
|
||||||
}
|
}
|
||||||
items := bsr.Block.items
|
items := bsr.Block.items
|
||||||
data := bsr.Block.data
|
data := bsr.Block.data
|
||||||
for bsr.blockItemIdx < len(bsr.Block.items) {
|
for bsr.currItemIdx < len(bsr.Block.items) {
|
||||||
item := items[bsr.blockItemIdx].Bytes(data)
|
item := items[bsr.currItemIdx].Bytes(data)
|
||||||
if hasNextItem && string(item) > string(nextItem) {
|
if hasNextItem && string(item) > nextItem {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
if !bsm.ib.Add(item) {
|
if !bsm.ib.Add(item) {
|
||||||
|
@ -135,9 +135,9 @@ again:
|
||||||
bsm.flushIB(bsw, ph, itemsMerged)
|
bsm.flushIB(bsw, ph, itemsMerged)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
bsr.blockItemIdx++
|
bsr.currItemIdx++
|
||||||
}
|
}
|
||||||
if bsr.blockItemIdx == len(bsr.Block.items) {
|
if bsr.currItemIdx == len(bsr.Block.items) {
|
||||||
// bsr.Block is fully read. Proceed to the next block.
|
// bsr.Block is fully read. Proceed to the next block.
|
||||||
if bsr.Next() {
|
if bsr.Next() {
|
||||||
heap.Fix(&bsm.bsrHeap, 0)
|
heap.Fix(&bsm.bsrHeap, 0)
|
||||||
|
@ -151,8 +151,7 @@ again:
|
||||||
}
|
}
|
||||||
|
|
||||||
// The next item in the bsr.Block exceeds nextItem.
|
// The next item in the bsr.Block exceeds nextItem.
|
||||||
// Adjust bsr.bh.firstItem and return bsr to heap.
|
// Return bsr to heap.
|
||||||
bsr.bh.firstItem = append(bsr.bh.firstItem[:0], bsr.Block.items[bsr.blockItemIdx].String(bsr.Block.data)...)
|
|
||||||
heap.Fix(&bsm.bsrHeap, 0)
|
heap.Fix(&bsm.bsrHeap, 0)
|
||||||
goto again
|
goto again
|
||||||
}
|
}
|
||||||
|
@ -212,7 +211,7 @@ func (bh bsrHeap) getNextReader() *blockStreamReader {
|
||||||
}
|
}
|
||||||
a := bh[1]
|
a := bh[1]
|
||||||
b := bh[2]
|
b := bh[2]
|
||||||
if string(a.bh.firstItem) <= string(b.bh.firstItem) {
|
if a.CurrItem() <= b.CurrItem() {
|
||||||
return a
|
return a
|
||||||
}
|
}
|
||||||
return b
|
return b
|
||||||
|
@ -229,7 +228,7 @@ func (bh *bsrHeap) Swap(i, j int) {
|
||||||
|
|
||||||
func (bh *bsrHeap) Less(i, j int) bool {
|
func (bh *bsrHeap) Less(i, j int) bool {
|
||||||
x := *bh
|
x := *bh
|
||||||
return string(x[i].bh.firstItem) < string(x[j].bh.firstItem)
|
return x[i].CurrItem() < x[j].CurrItem()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bh *bsrHeap) Pop() interface{} {
|
func (bh *bsrHeap) Pop() interface{} {
|
||||||
|
|
Loading…
Reference in a new issue