lib/mergeset: reduce memory usage for inmemoryBlock by using more compact items representation

This should also reduce CPU time spent by the GC, since entries in inmemoryBlock.items no longer hold pointers,
so the GC doesn't need to visit them.
Aliaksandr Valialkin 2021-02-21 22:06:45 +02:00
parent 388cdb1980
commit 636c55b526
13 changed files with 248 additions and 150 deletions
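In miniature, the change replaces items [][]byte (one 24-byte, pointer-bearing slice header per item) with a pair of uint32 offsets into the block's shared data buffer. A simplified, self-contained sketch of the representation (the actual Item.Bytes in the diff below goes through unsafe instead of plain re-slicing):

package main

import "fmt"

// Item references a byte range inside a shared data buffer.
// It holds no pointers, so the GC never has to scan a []Item.
type Item struct {
	Start uint32
	End   uint32
}

// Bytes returns the referenced bytes; the result aliases data.
func (it Item) Bytes(data []byte) []byte {
	return data[it.Start:it.End]
}

func main() {
	var data []byte
	var items []Item
	for _, s := range []string{"foo", "bar", "baz"} {
		start := uint32(len(data))
		data = append(data, s...)
		items = append(items, Item{Start: start, End: uint32(len(data))})
	}
	for _, it := range items {
		fmt.Printf("%s\n", it.Bytes(data))
	}
}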

View file

@@ -195,7 +195,8 @@ func (bsr *blockStreamReader) Next() bool {
if err := bsr.readNextBHS(); err != nil {
if err == io.EOF {
// Check the last item.
lastItem := bsr.Block.items[len(bsr.Block.items)-1]
b := &bsr.Block
lastItem := b.items[len(b.items)-1].Bytes(b.data)
if string(bsr.ph.lastItem) != string(lastItem) {
err = fmt.Errorf("unexpected last item; got %X; want %X", lastItem, bsr.ph.lastItem)
}
@@ -240,12 +241,13 @@ func (bsr *blockStreamReader) Next() bool {
}
if !bsr.firstItemChecked {
bsr.firstItemChecked = true
if string(bsr.ph.firstItem) != string(bsr.Block.items[0]) {
bsr.err = fmt.Errorf("unexpected first item; got %X; want %X", bsr.Block.items[0], bsr.ph.firstItem)
b := &bsr.Block
firstItem := b.items[0].Bytes(b.data)
if string(bsr.ph.firstItem) != string(firstItem) {
bsr.err = fmt.Errorf("unexpected first item; got %X; want %X", firstItem, bsr.ph.firstItem)
return false
}
}
return true
}

View file

@@ -44,8 +44,10 @@ func testBlockStreamReaderRead(ip *inmemoryPart, items []string) error {
bsr := newTestBlockStreamReader(ip)
i := 0
for bsr.Next() {
for _, item := range bsr.Block.items {
if string(item) != items[i] {
data := bsr.Block.data
for _, it := range bsr.Block.items {
item := it.String(data)
if item != items[i] {
return fmt.Errorf("unexpected item[%d]; got %q; want %q", i, item, items[i])
}
i++

View file

@@ -3,6 +3,7 @@ package mergeset
import (
"fmt"
"os"
"reflect"
"sort"
"strings"
"sync"
@@ -13,35 +14,62 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
type byteSliceSorter [][]byte
func (s byteSliceSorter) Len() int { return len(s) }
func (s byteSliceSorter) Less(i, j int) bool {
return string(s[i]) < string(s[j])
}
func (s byteSliceSorter) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
// Item represents a single item for storing in a mergeset.
type Item struct {
// Start is the start offset of the item in data.
Start uint32
// End is the end offset of the item in data.
End uint32
}
// Bytes returns the bytes representation of it obtained from data.
//
// The returned bytes representation belongs to data.
func (it Item) Bytes(data []byte) []byte {
sh := (*reflect.SliceHeader)(unsafe.Pointer(&data))
sh.Cap = int(it.End - it.Start)
sh.Len = int(it.End - it.Start)
sh.Data += uintptr(it.Start)
return data
}
// String returns the string representation of it obtained from data.
//
// The returned string representation belongs to data.
func (it Item) String(data []byte) string {
sh := (*reflect.SliceHeader)(unsafe.Pointer(&data))
sh.Data += uintptr(it.Start)
sh.Len = int(it.End - it.Start)
return *(*string)(unsafe.Pointer(sh))
}
func (ib *inmemoryBlock) Len() int { return len(ib.items) }
func (ib *inmemoryBlock) Less(i, j int) bool {
data := ib.data
items := ib.items
return string(items[i].Bytes(data)) < string(items[j].Bytes(data))
}
func (ib *inmemoryBlock) Swap(i, j int) {
items := ib.items
items[i], items[j] = items[j], items[i]
}
type inmemoryBlock struct {
commonPrefix []byte
data []byte
items byteSliceSorter
items []Item
}
func (ib *inmemoryBlock) SizeBytes() int {
return int(unsafe.Sizeof(*ib)) + cap(ib.commonPrefix) + cap(ib.data) + cap(ib.items)*int(unsafe.Sizeof([]byte{}))
return int(unsafe.Sizeof(*ib)) + cap(ib.commonPrefix) + cap(ib.data) + cap(ib.items)*int(unsafe.Sizeof(Item{}))
}
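The per-slot saving behind the SizeBytes change above, for reference (values for 64-bit platforms; a quick check, not part of the diff):

	fmt.Println(unsafe.Sizeof(Item{}))   // 8: two uint32 offsets, nothing for the GC to trace
	fmt.Println(unsafe.Sizeof([]byte{})) // 24: pointer + len + cap per item in the old layout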
func (ib *inmemoryBlock) Reset() {
ib.commonPrefix = ib.commonPrefix[:0]
ib.data = ib.data[:0]
items := ib.items
for i := range items {
// Remove reference to the byte slice, so GC could free the byte slice.
items[i] = nil
}
ib.items = ib.items[:0]
}
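Two notes on the hunk above. Reset drops the nil-ing loop because an Item carries no references, so truncating the slice is enough for the GC. And Item.Bytes/Item.String patch a copy of the slice header via unsafe so that String can hand out a zero-copy view of data; safe but slower equivalents would look roughly like this (a sketch, not part of the diff):

func (it Item) bytesSafe(data []byte) []byte {
	return data[it.Start:it.End:it.End] // view into data with clamped capacity
}

func (it Item) stringSafe(data []byte) string {
	return string(data[it.Start:it.End]) // allocates a copy, unlike the unsafe String
}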
@@ -50,12 +78,14 @@ func (ib *inmemoryBlock) updateCommonPrefix() {
if len(ib.items) == 0 {
return
}
cp := ib.items[0]
items := ib.items
data := ib.data
cp := items[0].Bytes(data)
if len(cp) == 0 {
return
}
for _, item := range ib.items[1:] {
cpLen := commonPrefixLen(cp, item)
for _, it := range items[1:] {
cpLen := commonPrefixLen(cp, it.Bytes(data))
if cpLen == 0 {
return
}
@@ -82,15 +112,21 @@ func commonPrefixLen(a, b []byte) int {
//
// false is returned if x isn't added to ib due to block size constraints.
func (ib *inmemoryBlock) Add(x []byte) bool {
if len(x)+len(ib.data) > maxInmemoryBlockSize {
data := ib.data
if len(x)+len(data) > maxInmemoryBlockSize {
return false
}
if cap(ib.data) < maxInmemoryBlockSize {
dataLen := len(ib.data)
ib.data = bytesutil.Resize(ib.data, maxInmemoryBlockSize)[:dataLen]
if cap(data) < maxInmemoryBlockSize {
dataLen := len(data)
data = bytesutil.Resize(data, maxInmemoryBlockSize)[:dataLen]
}
ib.data = append(ib.data, x...)
ib.items = append(ib.items, ib.data[len(ib.data)-len(x):])
dataLen := len(data)
data = append(data, x...)
ib.items = append(ib.items, Item{
Start: uint32(dataLen),
End: uint32(len(data)),
})
ib.data = data
return true
}
@@ -100,16 +136,21 @@ func (ib *inmemoryBlock) Add(x []byte) bool {
const maxInmemoryBlockSize = 64 * 1024
func (ib *inmemoryBlock) sort() {
// Use sort.Sort instead of sort.Slice in order to eliminate memory allocation.
sort.Sort(&ib.items)
sort.Sort(ib)
data := ib.data
items := ib.items
bb := bbPool.Get()
b := bytesutil.Resize(bb.B, len(ib.data))
b := bytesutil.Resize(bb.B, len(data))
b = b[:0]
for i, item := range ib.items {
b = append(b, item...)
ib.items[i] = b[len(b)-len(item):]
for i, it := range items {
bLen := len(b)
b = append(b, it.String(data)...)
items[i] = Item{
Start: uint32(bLen),
End: uint32(len(b)),
}
}
bb.B, ib.data = ib.data, b
bb.B, ib.data = data, b
bbPool.Put(bb)
}
@@ -140,7 +181,7 @@ func checkMarshalType(mt marshalType) error {
func (ib *inmemoryBlock) isSorted() bool {
// Use sort.IsSorted instead of sort.SliceIsSorted in order to eliminate memory allocation.
return sort.IsSorted(&ib.items)
return sort.IsSorted(ib)
}
// MarshalUnsortedData marshals unsorted items from ib to sb.
@@ -179,9 +220,11 @@ func (ib *inmemoryBlock) MarshalSortedData(sb *storageBlock, firstItemDst, commo
func (ib *inmemoryBlock) debugItemsString() string {
var sb strings.Builder
var prevItem []byte
for i, item := range ib.items {
if string(item) < string(prevItem) {
var prevItem string
data := ib.data
for i, it := range ib.items {
item := it.String(data)
if item < prevItem {
fmt.Fprintf(&sb, "!!! the next item is smaller than the previous item !!!\n")
}
fmt.Fprintf(&sb, "%05d %X\n", i, item)
@@ -201,7 +244,9 @@ func (ib *inmemoryBlock) marshalData(sb *storageBlock, firstItemDst, commonPrefi
logger.Panicf("BUG: the number of items in the block must be smaller than %d; got %d items", uint64(1<<32), len(ib.items))
}
firstItemDst = append(firstItemDst, ib.items[0]...)
data := ib.data
firstItem := ib.items[0].Bytes(data)
firstItemDst = append(firstItemDst, firstItem...)
commonPrefixDst = append(commonPrefixDst, ib.commonPrefix...)
if len(ib.data)-len(ib.commonPrefix)*len(ib.items) < 64 || len(ib.items) < 2 {
@@ -221,10 +266,11 @@ func (ib *inmemoryBlock) marshalData(sb *storageBlock, firstItemDst, commonPrefi
defer encoding.PutUint64s(xs)
cpLen := len(ib.commonPrefix)
prevItem := ib.items[0][cpLen:]
prevItem := firstItem[cpLen:]
prevPrefixLen := uint64(0)
for i, item := range ib.items[1:] {
item := item[cpLen:]
for i, it := range ib.items[1:] {
it.Start += uint32(cpLen)
item := it.Bytes(data)
prefixLen := uint64(commonPrefixLen(prevItem, item))
bItems = append(bItems, item[prefixLen:]...)
xLen := prefixLen ^ prevPrefixLen
@@ -240,9 +286,9 @@ func (ib *inmemoryBlock) marshalData(sb *storageBlock, firstItemDst, commonPrefi
bbPool.Put(bbItems)
// Marshal lens data.
prevItemLen := uint64(len(ib.items[0]) - cpLen)
for i, item := range ib.items[1:] {
itemLen := uint64(len(item) - cpLen)
prevItemLen := uint64(len(firstItem) - cpLen)
for i, it := range ib.items[1:] {
itemLen := uint64(int(it.End-it.Start) - cpLen)
xLen := itemLen ^ prevItemLen
prevItemLen = itemLen
@@ -346,11 +392,15 @@ func (ib *inmemoryBlock) UnmarshalData(sb *storageBlock, firstItem, commonPrefix
}
data := bytesutil.Resize(ib.data, maxInmemoryBlockSize)
if n := int(itemsCount) - cap(ib.items); n > 0 {
ib.items = append(ib.items[:cap(ib.items)], make([][]byte, n)...)
ib.items = append(ib.items[:cap(ib.items)], make([]Item, n)...)
}
ib.items = ib.items[:itemsCount]
data = append(data[:0], firstItem...)
ib.items[0] = data
items := ib.items
items[0] = Item{
Start: 0,
End: uint32(len(data)),
}
prevItem := data[len(commonPrefix):]
b := bb.B
for i := 1; i < int(itemsCount); i++ {
@@ -363,17 +413,19 @@ func (ib *inmemoryBlock) UnmarshalData(sb *storageBlock, firstItem, commonPrefix
if uint64(len(b)) < suffixLen {
return fmt.Errorf("not enough data for decoding item from itemsData; want %d bytes; remained %d bytes", suffixLen, len(b))
}
data = append(data, commonPrefix...)
if prefixLen > uint64(len(prevItem)) {
return fmt.Errorf("prefixLen cannot exceed %d; got %d", len(prevItem), prefixLen)
}
dataLen := len(data)
data = append(data, commonPrefix...)
data = append(data, prevItem[:prefixLen]...)
data = append(data, b[:suffixLen]...)
item := data[len(data)-int(itemLen)-len(commonPrefix):]
ib.items[i] = item
items[i] = Item{
Start: uint32(dataLen),
End: uint32(len(data)),
}
b = b[suffixLen:]
prevItem = item[len(commonPrefix):]
prevItem = data[len(data)-int(itemLen):]
}
if len(b) > 0 {
return fmt.Errorf("unexpected tail left after itemsData with len %d: %q", len(b), b)
@@ -381,30 +433,33 @@ func (ib *inmemoryBlock) UnmarshalData(sb *storageBlock, firstItem, commonPrefix
if uint64(len(data)) != dataLen {
return fmt.Errorf("unexpected data len; got %d; want %d", len(data), dataLen)
}
ib.data = data
if !ib.isSorted() {
return fmt.Errorf("decoded data block contains unsorted items; items:\n%s", ib.debugItemsString())
}
ib.data = data
return nil
}
var bbPool bytesutil.ByteBufferPool
func (ib *inmemoryBlock) marshalDataPlain(sb *storageBlock) {
data := ib.data
// Marshal items data.
// There is no need to marshal the first item, since it is returned
// to the caller in marshalData.
cpLen := len(ib.commonPrefix)
b := sb.itemsData[:0]
for _, item := range ib.items[1:] {
b = append(b, item[cpLen:]...)
for _, it := range ib.items[1:] {
it.Start += uint32(cpLen)
b = append(b, it.String(data)...)
}
sb.itemsData = b
// Marshal length data.
b = sb.lensData[:0]
for _, item := range ib.items[1:] {
b = encoding.MarshalUint64(b, uint64(len(item)-cpLen))
for _, it := range ib.items[1:] {
b = encoding.MarshalUint64(b, uint64(int(it.End-it.Start)-cpLen))
}
sb.lensData = b
}
@@ -431,26 +486,34 @@ func (ib *inmemoryBlock) unmarshalDataPlain(sb *storageBlock, firstItem []byte,
}
// Unmarshal items data.
ib.data = bytesutil.Resize(ib.data, len(firstItem)+len(sb.itemsData)+len(commonPrefix)*int(itemsCount))
ib.data = append(ib.data[:0], firstItem...)
ib.items = append(ib.items[:0], ib.data)
data := ib.data
items := ib.items
data = bytesutil.Resize(data, len(firstItem)+len(sb.itemsData)+len(commonPrefix)*int(itemsCount))
data = append(data[:0], firstItem...)
items = append(items[:0], Item{
Start: 0,
End: uint32(len(data)),
})
b = sb.itemsData
for i := 1; i < int(itemsCount); i++ {
itemLen := lb.lens[i]
if uint64(len(b)) < itemLen {
return fmt.Errorf("not enough data for decoding item from itemsData; want %d bytes; remained %d bytes", itemLen, len(b))
}
ib.data = append(ib.data, commonPrefix...)
ib.data = append(ib.data, b[:itemLen]...)
item := ib.data[len(ib.data)-int(itemLen)-len(commonPrefix):]
ib.items = append(ib.items, item)
dataLen := len(data)
data = append(data, commonPrefix...)
data = append(data, b[:itemLen]...)
items = append(items, Item{
Start: uint32(dataLen),
End: uint32(len(data)),
})
b = b[itemLen:]
}
ib.data = data
ib.items = items
if len(b) > 0 {
return fmt.Errorf("unexpected tail left after itemsData with len %d: %q", len(b), b)
}
return nil
}
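For orientation: marshalData above compresses a sorted block in two layers. The block-wide common prefix is stripped once, then every remaining item is stored as the length of the prefix it shares with the previous item plus the differing suffix, with lengths XOR-ed against their predecessors before varint encoding. A simplified round-trip of the per-item scheme, ignoring the common prefix and the XOR/varint packing (a sketch, not the wire format used by the diff):

// encodePrefixed turns sorted items into (sharedPrefixLen, suffix) pairs.
func encodePrefixed(items []string) (lens []int, suffixes []string) {
	prev := ""
	for _, s := range items {
		p := 0
		for p < len(prev) && p < len(s) && prev[p] == s[p] {
			p++
		}
		lens = append(lens, p)
		suffixes = append(suffixes, s[p:])
		prev = s
	}
	return lens, suffixes
}

// decodePrefixed reverses encodePrefixed.
func decodePrefixed(lens []int, suffixes []string) []string {
	items := make([]string, 0, len(suffixes))
	prev := ""
	for i, suffix := range suffixes {
		s := prev[:lens[i]] + suffix
		items = append(items, s)
		prev = s
	}
	return items
}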

View file

@@ -37,8 +37,10 @@ func TestInmemoryBlockAdd(t *testing.T) {
if len(ib.data) != totalLen {
t.Fatalf("unexpected ib.data len; got %d; want %d", len(ib.data), totalLen)
}
for j, item := range ib.items {
if items[j] != string(item) {
data := ib.data
for j, it := range ib.items {
item := it.String(data)
if items[j] != item {
t.Fatalf("unexpected item at index %d out of %d, loop %d\ngot\n%X\nwant\n%X", j, len(items), i, item, items[j])
}
}
@@ -75,8 +77,10 @@ func TestInmemoryBlockSort(t *testing.T) {
if len(ib.data) != totalLen {
t.Fatalf("unexpected ib.data len; got %d; want %d", len(ib.data), totalLen)
}
for j, item := range ib.items {
if items[j] != string(item) {
data := ib.data
for j, it := range ib.items {
item := it.String(data)
if items[j] != item {
t.Fatalf("unexpected item at index %d out of %d, loop %d\ngot\n%X\nwant\n%X", j, len(items), i, item, items[j])
}
}
@@ -122,8 +126,9 @@ func TestInmemoryBlockMarshalUnmarshal(t *testing.T) {
if int(itemsLen) != len(ib.items) {
t.Fatalf("unexpected number of items marshaled; got %d; want %d", itemsLen, len(ib.items))
}
if string(firstItem) != string(ib.items[0]) {
t.Fatalf("unexpected the first item\ngot\n%q\nwant\n%q", firstItem, ib.items[0])
firstItemExpected := ib.items[0].String(ib.data)
if string(firstItem) != firstItemExpected {
t.Fatalf("unexpected the first item\ngot\n%q\nwant\n%q", firstItem, firstItemExpected)
}
if err := checkMarshalType(mt); err != nil {
t.Fatalf("invalid mt: %s", err)
@@ -143,12 +148,15 @@ func TestInmemoryBlockMarshalUnmarshal(t *testing.T) {
t.Fatalf("unexpected ib.data len; got %d; want %d", len(ib2.data), totalLen)
}
for j := range items {
if len(items[j]) != len(ib2.items[j]) {
it2 := ib2.items[j]
item2 := it2.String(ib2.data)
if len(items[j]) != len(item2) {
t.Fatalf("items length mismatch at index %d out of %d, loop %d\ngot\n(len=%d) %X\nwant\n(len=%d) %X",
j, len(items), i, len(ib2.items[j]), ib2.items[j], len(items[j]), items[j])
j, len(items), i, len(item2), item2, len(items[j]), items[j])
}
}
for j, item := range ib2.items {
for j, it := range ib2.items {
item := it.String(ib2.data)
if items[j] != string(item) {
t.Fatalf("unexpected item at index %d out of %d, loop %d\ngot\n(len=%d) %X\nwant\n(len=%d) %X",
j, len(items), i, len(item), item, len(items[j]), items[j])

View file

@@ -56,8 +56,8 @@ func (ip *inmemoryPart) Init(ib *inmemoryBlock) {
ip.ph.itemsCount = uint64(len(ib.items))
ip.ph.blocksCount = 1
ip.ph.firstItem = append(ip.ph.firstItem[:0], ib.items[0]...)
ip.ph.lastItem = append(ip.ph.lastItem[:0], ib.items[len(ib.items)-1]...)
ip.ph.firstItem = append(ip.ph.firstItem[:0], ib.items[0].String(ib.data)...)
ip.ph.lastItem = append(ip.ph.lastItem[:0], ib.items[len(ib.items)-1].String(ib.data)...)
fs.MustWriteData(&ip.itemsData, ip.sb.itemsData)
ip.bh.itemsBlockOffset = 0

View file

@@ -16,7 +16,7 @@ import (
//
// The callback must return sorted items. The first and the last item must be unchanged.
// The callback can re-use data and items for storing the result.
type PrepareBlockCallback func(data []byte, items [][]byte) ([]byte, [][]byte)
type PrepareBlockCallback func(data []byte, items []Item) ([]byte, []Item)
// mergeBlockStreams merges bsrs and writes result to bsw.
//
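A hypothetical PrepareBlockCallback, sketched only to illustrate the contract above (prefixToDrop and the filtering rule are invented; bytes is assumed to be imported). Skipping items other than the first and the last keeps the result sorted and the endpoints unchanged:

var prefixToDrop = []byte("deleted/") // hypothetical marker prefix

func dropPrefixed(data []byte, items []Item) ([]byte, []Item) {
	dst := items[:0] // re-use items, as the contract allows
	for i, it := range items {
		// The first and the last item must be passed through unchanged.
		if i == 0 || i == len(items)-1 || !bytes.HasPrefix(it.Bytes(data), prefixToDrop) {
			dst = append(dst, it)
		}
	}
	return data, dst
}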
@@ -122,8 +122,10 @@ again:
nextItem = bsm.bsrHeap[0].bh.firstItem
hasNextItem = true
}
items := bsr.Block.items
data := bsr.Block.data
for bsr.blockItemIdx < len(bsr.Block.items) {
item := bsr.Block.items[bsr.blockItemIdx]
item := items[bsr.blockItemIdx].Bytes(data)
if hasNextItem && string(item) > string(nextItem) {
break
}
@@ -148,32 +150,36 @@ again:
// The next item in the bsr.Block exceeds nextItem.
// Adjust bsr.bh.firstItem and return bsr to heap.
bsr.bh.firstItem = append(bsr.bh.firstItem[:0], bsr.Block.items[bsr.blockItemIdx]...)
bsr.bh.firstItem = append(bsr.bh.firstItem[:0], bsr.Block.items[bsr.blockItemIdx].String(bsr.Block.data)...)
heap.Push(&bsm.bsrHeap, bsr)
goto again
}
func (bsm *blockStreamMerger) flushIB(bsw *blockStreamWriter, ph *partHeader, itemsMerged *uint64) {
if len(bsm.ib.items) == 0 {
items := bsm.ib.items
data := bsm.ib.data
if len(items) == 0 {
// Nothing to flush.
return
}
atomic.AddUint64(itemsMerged, uint64(len(bsm.ib.items)))
atomic.AddUint64(itemsMerged, uint64(len(items)))
if bsm.prepareBlock != nil {
bsm.firstItem = append(bsm.firstItem[:0], bsm.ib.items[0]...)
bsm.lastItem = append(bsm.lastItem[:0], bsm.ib.items[len(bsm.ib.items)-1]...)
bsm.ib.data, bsm.ib.items = bsm.prepareBlock(bsm.ib.data, bsm.ib.items)
if len(bsm.ib.items) == 0 {
bsm.firstItem = append(bsm.firstItem[:0], items[0].String(data)...)
bsm.lastItem = append(bsm.lastItem[:0], items[len(items)-1].String(data)...)
data, items = bsm.prepareBlock(data, items)
bsm.ib.data = data
bsm.ib.items = items
if len(items) == 0 {
// Nothing to flush
return
}
// Consistency checks after prepareBlock call.
firstItem := bsm.ib.items[0]
if string(firstItem) != string(bsm.firstItem) {
firstItem := items[0].String(data)
if firstItem != string(bsm.firstItem) {
logger.Panicf("BUG: prepareBlock must return first item equal to the original first item;\ngot\n%X\nwant\n%X", firstItem, bsm.firstItem)
}
lastItem := bsm.ib.items[len(bsm.ib.items)-1]
if string(lastItem) != string(bsm.lastItem) {
lastItem := items[len(items)-1].String(data)
if lastItem != string(bsm.lastItem) {
logger.Panicf("BUG: prepareBlock must return last item equal to the original last item;\ngot\n%X\nwant\n%X", lastItem, bsm.lastItem)
}
// Verify whether the bsm.ib.items are sorted only in tests, since this
@@ -182,12 +188,12 @@ func (bsm *blockStreamMerger) flushIB(bsw *blockStreamWriter, ph *partHeader, it
logger.Panicf("BUG: prepareBlock must return sorted items;\ngot\n%s", bsm.ib.debugItemsString())
}
}
ph.itemsCount += uint64(len(bsm.ib.items))
ph.itemsCount += uint64(len(items))
if !bsm.phFirstItemCaught {
ph.firstItem = append(ph.firstItem[:0], bsm.ib.items[0]...)
ph.firstItem = append(ph.firstItem[:0], items[0].String(data)...)
bsm.phFirstItemCaught = true
}
ph.lastItem = append(ph.lastItem[:0], bsm.ib.items[len(bsm.ib.items)-1]...)
ph.lastItem = append(ph.lastItem[:0], items[len(items)-1].String(data)...)
bsw.WriteBlock(&bsm.ib)
bsm.ib.Reset()
ph.blocksCount++

View file

@@ -157,10 +157,12 @@ func testCheckItems(dstIP *inmemoryPart, items []string) error {
if bh.itemsCount <= 0 {
return fmt.Errorf("unexpected empty block")
}
if string(bh.firstItem) != string(dstBsr.Block.items[0]) {
return fmt.Errorf("unexpected blockHeader.firstItem; got %q; want %q", bh.firstItem, dstBsr.Block.items[0])
item := dstBsr.Block.items[0].Bytes(dstBsr.Block.data)
if string(bh.firstItem) != string(item) {
return fmt.Errorf("unexpected blockHeader.firstItem; got %q; want %q", bh.firstItem, item)
}
for _, item := range dstBsr.Block.items {
for _, it := range dstBsr.Block.items {
item := it.Bytes(dstBsr.Block.data)
dstItems = append(dstItems, string(item))
}
}

View file

@@ -142,14 +142,17 @@ func (ps *partSearch) Seek(k []byte) {
// Locate the first item to scan in the block.
items := ps.ib.items
data := ps.ib.data
cpLen := commonPrefixLen(ps.ib.commonPrefix, k)
if cpLen > 0 {
keySuffix := k[cpLen:]
ps.ibItemIdx = sort.Search(len(items), func(i int) bool {
return string(keySuffix) <= string(items[i][cpLen:])
it := items[i]
it.Start += uint32(cpLen)
return string(keySuffix) <= it.String(data)
})
} else {
ps.ibItemIdx = binarySearchKey(items, k)
ps.ibItemIdx = binarySearchKey(data, items, k)
}
if ps.ibItemIdx < len(items) {
// The item has been found.
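Note the idiom in the sort.Search callback above: Item is a small value type, so the code advances Start on a copy to compare only the suffix past the common prefix, without touching items[i] and without allocating. The same trick appears in marshalData and marshalDataPlain. In isolation (a sketch):

// suffixAfter returns the item's text after its first cpLen bytes.
func suffixAfter(it Item, data []byte, cpLen int) string {
	it.Start += uint32(cpLen) // it is a local copy; the caller's Item is untouched
	return it.String(data)    // zero-copy view of the suffix
}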
@@ -168,13 +171,14 @@ func (ps *partSearch) tryFastSeek(k []byte) bool {
if ps.ib == nil {
return false
}
data := ps.ib.data
items := ps.ib.items
idx := ps.ibItemIdx
if idx >= len(items) {
// The ib is exhausted.
return false
}
if string(k) > string(items[len(items)-1]) {
if string(k) > items[len(items)-1].String(data) {
// The item is located in next blocks.
return false
}
@@ -183,8 +187,8 @@ func (ps *partSearch) tryFastSeek(k []byte) bool {
if idx > 0 {
idx--
}
if string(k) < string(items[idx]) {
if string(k) < string(items[0]) {
if string(k) < items[idx].String(data) {
if string(k) < items[0].String(data) {
// The item is located in previous blocks.
return false
}
@@ -192,7 +196,7 @@ func (ps *partSearch) tryFastSeek(k []byte) bool {
}
// The item is located in the current block
ps.ibItemIdx = idx + binarySearchKey(items[idx:], k)
ps.ibItemIdx = idx + binarySearchKey(data, items[idx:], k)
return true
}
@@ -204,10 +208,11 @@ func (ps *partSearch) NextItem() bool {
return false
}
if ps.ibItemIdx < len(ps.ib.items) {
items := ps.ib.items
if ps.ibItemIdx < len(items) {
// Fast path - the current block contains more items.
// Proceed to the next item.
ps.Item = ps.ib.items[ps.ibItemIdx]
ps.Item = items[ps.ibItemIdx].Bytes(ps.ib.data)
ps.ibItemIdx++
return true
}
@@ -219,7 +224,7 @@ func (ps *partSearch) NextItem() bool {
}
// Invariant: len(ps.ib.items) > 0 after nextBlock.
ps.Item = ps.ib.items[0]
ps.Item = ps.ib.items[0].Bytes(ps.ib.data)
ps.ibItemIdx++
return true
}
@@ -319,11 +324,11 @@ func (ps *partSearch) readInmemoryBlock(bh *blockHeader) (*inmemoryBlock, error)
return ib, nil
}
func binarySearchKey(items [][]byte, key []byte) int {
func binarySearchKey(data []byte, items []Item, key []byte) int {
if len(items) == 0 {
return 0
}
if string(key) <= string(items[0]) {
if string(key) <= items[0].String(data) {
// Fast path - the item is the first.
return 0
}
@@ -335,7 +340,7 @@ func binarySearchKey(items [][]byte, key []byte) int {
i, j := uint(0), n
for i < j {
h := uint(i+j) >> 1
if h >= 0 && h < uint(len(items)) && string(key) > string(items[h]) {
if h >= 0 && h < uint(len(items)) && string(key) > items[h].String(data) {
i = h + 1
} else {
j = h

View file

@@ -46,7 +46,7 @@ func benchmarkTableSearch(b *testing.B, itemsCount int) {
b.Run("sequential-keys-exact", func(b *testing.B) {
benchmarkTableSearchKeys(b, tb, keys, 0)
})
b.Run("sequential-keys-without-siffux", func(b *testing.B) {
b.Run("sequential-keys-without-suffix", func(b *testing.B) {
benchmarkTableSearchKeys(b, tb, keys, 4)
})
@@ -57,7 +57,7 @@ func benchmarkTableSearch(b *testing.B, itemsCount int) {
b.Run("random-keys-exact", func(b *testing.B) {
benchmarkTableSearchKeys(b, tb, randKeys, 0)
})
b.Run("random-keys-without-siffux", func(b *testing.B) {
b.Run("random-keys-without-suffix", func(b *testing.B) {
benchmarkTableSearchKeys(b, tb, randKeys, 4)
})
}

View file

@@ -218,7 +218,7 @@ func TestTableAddItemsConcurrent(t *testing.T) {
atomic.AddUint64(&flushes, 1)
}
var itemsMerged uint64
prepareBlock := func(data []byte, items [][]byte) ([]byte, [][]byte) {
prepareBlock := func(data []byte, items []Item) ([]byte, []Item) {
atomic.AddUint64(&itemsMerged, uint64(len(items)))
return data, items
}

View file

@@ -76,6 +76,6 @@ func testBlockHeaderMarshalUnmarshal(t *testing.T, bh *blockHeader) {
t.Fatalf("unexpected tail after unmarshaling bh=%+v; got\n%x; want\n%x", bh, tail, suffix)
}
if !reflect.DeepEqual(bh, &bh2) {
t.Fatalf("unexpected bh unmarshaled after adding siffux; got\n%+v; want\n%+v", &bh2, bh)
t.Fatalf("unexpected bh unmarshaled after adding suffix; got\n%+v; want\n%+v", &bh2, bh)
}
}

View file

@@ -3461,24 +3461,24 @@ func (mp *tagToMetricIDsRowParser) IsDeletedTag(dmis *uint64set.Set) bool {
return true
}
func mergeTagToMetricIDsRows(data []byte, items [][]byte) ([]byte, [][]byte) {
func mergeTagToMetricIDsRows(data []byte, items []mergeset.Item) ([]byte, []mergeset.Item) {
data, items = mergeTagToMetricIDsRowsInternal(data, items, nsPrefixTagToMetricIDs)
data, items = mergeTagToMetricIDsRowsInternal(data, items, nsPrefixDateTagToMetricIDs)
return data, items
}
func mergeTagToMetricIDsRowsInternal(data []byte, items [][]byte, nsPrefix byte) ([]byte, [][]byte) {
func mergeTagToMetricIDsRowsInternal(data []byte, items []mergeset.Item, nsPrefix byte) ([]byte, []mergeset.Item) {
// Perform quick checks whether items contain rows starting from nsPrefix
// based on the fact that items are sorted.
if len(items) <= 2 {
// The first and the last row must remain unchanged.
return data, items
}
firstItem := items[0]
firstItem := items[0].Bytes(data)
if len(firstItem) > 0 && firstItem[0] > nsPrefix {
return data, items
}
lastItem := items[len(items)-1]
lastItem := items[len(items)-1].Bytes(data)
if len(lastItem) > 0 && lastItem[0] < nsPrefix {
return data, items
}
@@ -3491,14 +3491,18 @@ func mergeTagToMetricIDsRowsInternal(data []byte, items [][]byte, nsPrefix byte)
mpPrev := &tmm.mpPrev
dstData := data[:0]
dstItems := items[:0]
for i, item := range items {
for i, it := range items {
item := it.Bytes(data)
if len(item) == 0 || item[0] != nsPrefix || i == 0 || i == len(items)-1 {
// Write rows not starting with nsPrefix as-is.
// Additionally write the first and the last row as-is in order to preserve
// sort order for adjacent blocks.
dstData, dstItems = tmm.flushPendingMetricIDs(dstData, dstItems, mpPrev)
dstData = append(dstData, item...)
dstItems = append(dstItems, dstData[len(dstData)-len(item):])
dstItems = append(dstItems, mergeset.Item{
Start: uint32(len(dstData) - len(item)),
End: uint32(len(dstData)),
})
continue
}
if err := mp.Init(item, nsPrefix); err != nil {
@@ -3507,7 +3511,10 @@ func mergeTagToMetricIDsRowsInternal(data []byte, items [][]byte, nsPrefix byte)
if mp.MetricIDsLen() >= maxMetricIDsPerRow {
dstData, dstItems = tmm.flushPendingMetricIDs(dstData, dstItems, mpPrev)
dstData = append(dstData, item...)
dstItems = append(dstItems, dstData[len(dstData)-len(item):])
dstItems = append(dstItems, mergeset.Item{
Start: uint32(len(dstData) - len(item)),
End: uint32(len(dstData)),
})
continue
}
if !mp.EqualPrefix(mpPrev) {
@@ -3523,7 +3530,7 @@ func mergeTagToMetricIDsRowsInternal(data []byte, items [][]byte, nsPrefix byte)
if len(tmm.pendingMetricIDs) > 0 {
logger.Panicf("BUG: tmm.pendingMetricIDs must be empty at this point; got %d items: %d", len(tmm.pendingMetricIDs), tmm.pendingMetricIDs)
}
if !checkItemsSorted(dstItems) {
if !checkItemsSorted(dstData, dstItems) {
// Items could become unsorted if initial items contain duplicate metricIDs:
//
// item1: 1, 1, 5
@@ -3541,15 +3548,8 @@ func mergeTagToMetricIDsRowsInternal(data []byte, items [][]byte, nsPrefix byte)
// into the same new time series from multiple concurrent goroutines.
atomic.AddUint64(&indexBlocksWithMetricIDsIncorrectOrder, 1)
dstData = append(dstData[:0], tmm.dataCopy...)
dstItems = dstItems[:0]
// tmm.itemsCopy can point to overwritten data, so it must be updated
// to point to real data from tmm.dataCopy.
buf := dstData
for _, item := range tmm.itemsCopy {
dstItems = append(dstItems, buf[:len(item)])
buf = buf[len(item):]
}
if !checkItemsSorted(dstItems) {
dstItems = append(dstItems[:0], tmm.itemsCopy...)
if !checkItemsSorted(dstData, dstItems) {
logger.Panicf("BUG: the original items weren't sorted; items=%q", dstItems)
}
}
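This hunk also shows why offsets simplify the recovery path: a mergeset.Item is a pair of positions relative to whatever buffer is passed to Bytes or String, so once dstData is restored from tmm.dataCopy, the saved tmm.itemsCopy entries are valid as-is; the old code had to rebuild each []byte against the copied buffer. The property in isolation (a sketch with made-up values):

func offsetsSurviveCopy() {
	data := []byte("foobar")
	it := mergeset.Item{Start: 3, End: 6}    // refers to "bar"
	restored := append([]byte(nil), data...) // byte-for-byte copy, like tmm.dataCopy
	fmt.Printf("%s\n", it.Bytes(restored))   // prints "bar": no pointers to fix up
}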
@@ -3561,13 +3561,14 @@ func mergeTagToMetricIDsRowsInternal(data []byte, items [][]byte, nsPrefix byte)
var indexBlocksWithMetricIDsIncorrectOrder uint64
var indexBlocksWithMetricIDsProcessed uint64
func checkItemsSorted(items [][]byte) bool {
func checkItemsSorted(data []byte, items []mergeset.Item) bool {
if len(items) == 0 {
return true
}
prevItem := items[0]
for _, currItem := range items[1:] {
if string(prevItem) > string(currItem) {
prevItem := items[0].String(data)
for _, it := range items[1:] {
currItem := it.String(data)
if prevItem > currItem {
return false
}
prevItem = currItem
@@ -3595,7 +3596,7 @@ type tagToMetricIDsRowsMerger struct {
mp tagToMetricIDsRowParser
mpPrev tagToMetricIDsRowParser
itemsCopy [][]byte
itemsCopy []mergeset.Item
dataCopy []byte
}
@@ -3608,7 +3609,7 @@ func (tmm *tagToMetricIDsRowsMerger) Reset() {
tmm.dataCopy = tmm.dataCopy[:0]
}
func (tmm *tagToMetricIDsRowsMerger) flushPendingMetricIDs(dstData []byte, dstItems [][]byte, mp *tagToMetricIDsRowParser) ([]byte, [][]byte) {
func (tmm *tagToMetricIDsRowsMerger) flushPendingMetricIDs(dstData []byte, dstItems []mergeset.Item, mp *tagToMetricIDsRowParser) ([]byte, []mergeset.Item) {
if len(tmm.pendingMetricIDs) == 0 {
// Nothing to flush
return dstData, dstItems
@@ -3623,7 +3624,10 @@ func (tmm *tagToMetricIDsRowsMerger) flushPendingMetricIDs(dstData []byte, dstIt
for _, metricID := range tmm.pendingMetricIDs {
dstData = encoding.MarshalUint64(dstData, metricID)
}
dstItems = append(dstItems, dstData[dstDataLen:])
dstItems = append(dstItems, mergeset.Item{
Start: uint32(dstDataLen),
End: uint32(len(dstData)),
})
tmm.pendingMetricIDs = tmm.pendingMetricIDs[:0]
return dstData, dstItems
}

View file

@@ -14,6 +14,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
)
@@ -36,33 +37,38 @@ func TestMergeTagToMetricIDsRows(t *testing.T) {
f := func(items []string, expectedItems []string) {
t.Helper()
var data []byte
var itemsB [][]byte
var itemsB []mergeset.Item
for _, item := range items {
data = append(data, item...)
itemsB = append(itemsB, data[len(data)-len(item):])
itemsB = append(itemsB, mergeset.Item{
Start: uint32(len(data) - len(item)),
End: uint32(len(data)),
})
}
if !checkItemsSorted(itemsB) {
if !checkItemsSorted(data, itemsB) {
t.Fatalf("source items aren't sorted; items:\n%q", itemsB)
}
resultData, resultItemsB := mergeTagToMetricIDsRows(data, itemsB)
if len(resultItemsB) != len(expectedItems) {
t.Fatalf("unexpected len(resultItemsB); got %d; want %d", len(resultItemsB), len(expectedItems))
}
if !checkItemsSorted(resultItemsB) {
if !checkItemsSorted(resultData, resultItemsB) {
t.Fatalf("result items aren't sorted; items:\n%q", resultItemsB)
}
for i, item := range resultItemsB {
if !bytes.HasPrefix(resultData, item) {
t.Fatalf("unexpected prefix for resultData #%d;\ngot\n%X\nwant\n%X", i, resultData, item)
buf := resultData
for i, it := range resultItemsB {
item := it.Bytes(resultData)
if !bytes.HasPrefix(buf, item) {
t.Fatalf("unexpected prefix for resultData #%d;\ngot\n%X\nwant\n%X", i, buf, item)
}
resultData = resultData[len(item):]
buf = buf[len(item):]
}
if len(resultData) != 0 {
t.Fatalf("unexpected tail left in resultData: %X", resultData)
if len(buf) != 0 {
t.Fatalf("unexpected tail left in resultData: %X", buf)
}
var resultItems []string
for _, item := range resultItemsB {
resultItems = append(resultItems, string(item))
for _, it := range resultItemsB {
resultItems = append(resultItems, string(it.Bytes(resultData)))
}
if !reflect.DeepEqual(expectedItems, resultItems) {
t.Fatalf("unexpected items;\ngot\n%X\nwant\n%X", resultItems, expectedItems)