mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/mergeset: reduce memory usage for inmemoryBlock by using more compact items representation
This also should reduce CPU time spent by GC, since inmemoryBlock.items don't have pointers now, so GC doesn't need visiting them.
This commit is contained in:
parent
bd3bcdc43c
commit
587132555f
13 changed files with 248 additions and 150 deletions
|
@ -195,7 +195,8 @@ func (bsr *blockStreamReader) Next() bool {
|
|||
if err := bsr.readNextBHS(); err != nil {
|
||||
if err == io.EOF {
|
||||
// Check the last item.
|
||||
lastItem := bsr.Block.items[len(bsr.Block.items)-1]
|
||||
b := &bsr.Block
|
||||
lastItem := b.items[len(b.items)-1].Bytes(b.data)
|
||||
if string(bsr.ph.lastItem) != string(lastItem) {
|
||||
err = fmt.Errorf("unexpected last item; got %X; want %X", lastItem, bsr.ph.lastItem)
|
||||
}
|
||||
|
@ -240,12 +241,13 @@ func (bsr *blockStreamReader) Next() bool {
|
|||
}
|
||||
if !bsr.firstItemChecked {
|
||||
bsr.firstItemChecked = true
|
||||
if string(bsr.ph.firstItem) != string(bsr.Block.items[0]) {
|
||||
bsr.err = fmt.Errorf("unexpected first item; got %X; want %X", bsr.Block.items[0], bsr.ph.firstItem)
|
||||
b := &bsr.Block
|
||||
firstItem := b.items[0].Bytes(b.data)
|
||||
if string(bsr.ph.firstItem) != string(firstItem) {
|
||||
bsr.err = fmt.Errorf("unexpected first item; got %X; want %X", firstItem, bsr.ph.firstItem)
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
|
|
|
@ -44,8 +44,10 @@ func testBlockStreamReaderRead(ip *inmemoryPart, items []string) error {
|
|||
bsr := newTestBlockStreamReader(ip)
|
||||
i := 0
|
||||
for bsr.Next() {
|
||||
for _, item := range bsr.Block.items {
|
||||
if string(item) != items[i] {
|
||||
data := bsr.Block.data
|
||||
for _, it := range bsr.Block.items {
|
||||
item := it.String(data)
|
||||
if item != items[i] {
|
||||
return fmt.Errorf("unexpected item[%d]; got %q; want %q", i, item, items[i])
|
||||
}
|
||||
i++
|
||||
|
|
|
@ -3,6 +3,7 @@ package mergeset
|
|||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
|
@ -13,35 +14,62 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
)
|
||||
|
||||
type byteSliceSorter [][]byte
|
||||
// Item represents a single item for storing in a mergeset.
|
||||
type Item struct {
|
||||
// Start is start offset for the item in data.
|
||||
Start uint32
|
||||
|
||||
func (s byteSliceSorter) Len() int { return len(s) }
|
||||
func (s byteSliceSorter) Less(i, j int) bool {
|
||||
return string(s[i]) < string(s[j])
|
||||
// End is end offset for the item in data.
|
||||
End uint32
|
||||
}
|
||||
func (s byteSliceSorter) Swap(i, j int) {
|
||||
s[i], s[j] = s[j], s[i]
|
||||
|
||||
// Bytes returns bytes representation of it obtained from data.
|
||||
//
|
||||
// The returned bytes representation belongs to data.
|
||||
func (it Item) Bytes(data []byte) []byte {
|
||||
sh := (*reflect.SliceHeader)(unsafe.Pointer(&data))
|
||||
sh.Cap = int(it.End - it.Start)
|
||||
sh.Len = int(it.End - it.Start)
|
||||
sh.Data += uintptr(it.Start)
|
||||
return data
|
||||
}
|
||||
|
||||
// String returns string represetnation of it obtained from data.
|
||||
//
|
||||
// The returned string representation belongs to data.
|
||||
func (it Item) String(data []byte) string {
|
||||
sh := (*reflect.SliceHeader)(unsafe.Pointer(&data))
|
||||
sh.Data += uintptr(it.Start)
|
||||
sh.Len = int(it.End - it.Start)
|
||||
return *(*string)(unsafe.Pointer(sh))
|
||||
}
|
||||
|
||||
func (ib *inmemoryBlock) Len() int { return len(ib.items) }
|
||||
|
||||
func (ib *inmemoryBlock) Less(i, j int) bool {
|
||||
data := ib.data
|
||||
items := ib.items
|
||||
return string(items[i].Bytes(data)) < string(items[j].Bytes(data))
|
||||
}
|
||||
|
||||
func (ib *inmemoryBlock) Swap(i, j int) {
|
||||
items := ib.items
|
||||
items[i], items[j] = items[j], items[i]
|
||||
}
|
||||
|
||||
type inmemoryBlock struct {
|
||||
commonPrefix []byte
|
||||
data []byte
|
||||
items byteSliceSorter
|
||||
items []Item
|
||||
}
|
||||
|
||||
func (ib *inmemoryBlock) SizeBytes() int {
|
||||
return int(unsafe.Sizeof(*ib)) + cap(ib.commonPrefix) + cap(ib.data) + cap(ib.items)*int(unsafe.Sizeof([]byte{}))
|
||||
return int(unsafe.Sizeof(*ib)) + cap(ib.commonPrefix) + cap(ib.data) + cap(ib.items)*int(unsafe.Sizeof(Item{}))
|
||||
}
|
||||
|
||||
func (ib *inmemoryBlock) Reset() {
|
||||
ib.commonPrefix = ib.commonPrefix[:0]
|
||||
ib.data = ib.data[:0]
|
||||
|
||||
items := ib.items
|
||||
for i := range items {
|
||||
// Remove reference to by slice, so GC could free the byte slice.
|
||||
items[i] = nil
|
||||
}
|
||||
ib.items = ib.items[:0]
|
||||
}
|
||||
|
||||
|
@ -50,12 +78,14 @@ func (ib *inmemoryBlock) updateCommonPrefix() {
|
|||
if len(ib.items) == 0 {
|
||||
return
|
||||
}
|
||||
cp := ib.items[0]
|
||||
items := ib.items
|
||||
data := ib.data
|
||||
cp := items[0].Bytes(data)
|
||||
if len(cp) == 0 {
|
||||
return
|
||||
}
|
||||
for _, item := range ib.items[1:] {
|
||||
cpLen := commonPrefixLen(cp, item)
|
||||
for _, it := range items[1:] {
|
||||
cpLen := commonPrefixLen(cp, it.Bytes(data))
|
||||
if cpLen == 0 {
|
||||
return
|
||||
}
|
||||
|
@ -82,15 +112,21 @@ func commonPrefixLen(a, b []byte) int {
|
|||
//
|
||||
// false is returned if x isn't added to ib due to block size contraints.
|
||||
func (ib *inmemoryBlock) Add(x []byte) bool {
|
||||
if len(x)+len(ib.data) > maxInmemoryBlockSize {
|
||||
data := ib.data
|
||||
if len(x)+len(data) > maxInmemoryBlockSize {
|
||||
return false
|
||||
}
|
||||
if cap(ib.data) < maxInmemoryBlockSize {
|
||||
dataLen := len(ib.data)
|
||||
ib.data = bytesutil.Resize(ib.data, maxInmemoryBlockSize)[:dataLen]
|
||||
if cap(data) < maxInmemoryBlockSize {
|
||||
dataLen := len(data)
|
||||
data = bytesutil.Resize(data, maxInmemoryBlockSize)[:dataLen]
|
||||
}
|
||||
ib.data = append(ib.data, x...)
|
||||
ib.items = append(ib.items, ib.data[len(ib.data)-len(x):])
|
||||
dataLen := len(data)
|
||||
data = append(data, x...)
|
||||
ib.items = append(ib.items, Item{
|
||||
Start: uint32(dataLen),
|
||||
End: uint32(len(data)),
|
||||
})
|
||||
ib.data = data
|
||||
return true
|
||||
}
|
||||
|
||||
|
@ -100,16 +136,21 @@ func (ib *inmemoryBlock) Add(x []byte) bool {
|
|||
const maxInmemoryBlockSize = 64 * 1024
|
||||
|
||||
func (ib *inmemoryBlock) sort() {
|
||||
// Use sort.Sort instead of sort.Slice in order to eliminate memory allocation.
|
||||
sort.Sort(&ib.items)
|
||||
sort.Sort(ib)
|
||||
data := ib.data
|
||||
items := ib.items
|
||||
bb := bbPool.Get()
|
||||
b := bytesutil.Resize(bb.B, len(ib.data))
|
||||
b := bytesutil.Resize(bb.B, len(data))
|
||||
b = b[:0]
|
||||
for i, item := range ib.items {
|
||||
b = append(b, item...)
|
||||
ib.items[i] = b[len(b)-len(item):]
|
||||
for i, it := range items {
|
||||
bLen := len(b)
|
||||
b = append(b, it.String(data)...)
|
||||
items[i] = Item{
|
||||
Start: uint32(bLen),
|
||||
End: uint32(len(b)),
|
||||
}
|
||||
}
|
||||
bb.B, ib.data = ib.data, b
|
||||
bb.B, ib.data = data, b
|
||||
bbPool.Put(bb)
|
||||
}
|
||||
|
||||
|
@ -140,7 +181,7 @@ func checkMarshalType(mt marshalType) error {
|
|||
|
||||
func (ib *inmemoryBlock) isSorted() bool {
|
||||
// Use sort.IsSorted instead of sort.SliceIsSorted in order to eliminate memory allocation.
|
||||
return sort.IsSorted(&ib.items)
|
||||
return sort.IsSorted(ib)
|
||||
}
|
||||
|
||||
// MarshalUnsortedData marshals unsorted items from ib to sb.
|
||||
|
@ -179,9 +220,11 @@ func (ib *inmemoryBlock) MarshalSortedData(sb *storageBlock, firstItemDst, commo
|
|||
|
||||
func (ib *inmemoryBlock) debugItemsString() string {
|
||||
var sb strings.Builder
|
||||
var prevItem []byte
|
||||
for i, item := range ib.items {
|
||||
if string(item) < string(prevItem) {
|
||||
var prevItem string
|
||||
data := ib.data
|
||||
for i, it := range ib.items {
|
||||
item := it.String(data)
|
||||
if item < prevItem {
|
||||
fmt.Fprintf(&sb, "!!! the next item is smaller than the previous item !!!\n")
|
||||
}
|
||||
fmt.Fprintf(&sb, "%05d %X\n", i, item)
|
||||
|
@ -201,7 +244,9 @@ func (ib *inmemoryBlock) marshalData(sb *storageBlock, firstItemDst, commonPrefi
|
|||
logger.Panicf("BUG: the number of items in the block must be smaller than %d; got %d items", uint64(1<<32), len(ib.items))
|
||||
}
|
||||
|
||||
firstItemDst = append(firstItemDst, ib.items[0]...)
|
||||
data := ib.data
|
||||
firstItem := ib.items[0].Bytes(data)
|
||||
firstItemDst = append(firstItemDst, firstItem...)
|
||||
commonPrefixDst = append(commonPrefixDst, ib.commonPrefix...)
|
||||
|
||||
if len(ib.data)-len(ib.commonPrefix)*len(ib.items) < 64 || len(ib.items) < 2 {
|
||||
|
@ -221,10 +266,11 @@ func (ib *inmemoryBlock) marshalData(sb *storageBlock, firstItemDst, commonPrefi
|
|||
defer encoding.PutUint64s(xs)
|
||||
|
||||
cpLen := len(ib.commonPrefix)
|
||||
prevItem := ib.items[0][cpLen:]
|
||||
prevItem := firstItem[cpLen:]
|
||||
prevPrefixLen := uint64(0)
|
||||
for i, item := range ib.items[1:] {
|
||||
item := item[cpLen:]
|
||||
for i, it := range ib.items[1:] {
|
||||
it.Start += uint32(cpLen)
|
||||
item := it.Bytes(data)
|
||||
prefixLen := uint64(commonPrefixLen(prevItem, item))
|
||||
bItems = append(bItems, item[prefixLen:]...)
|
||||
xLen := prefixLen ^ prevPrefixLen
|
||||
|
@ -240,9 +286,9 @@ func (ib *inmemoryBlock) marshalData(sb *storageBlock, firstItemDst, commonPrefi
|
|||
bbPool.Put(bbItems)
|
||||
|
||||
// Marshal lens data.
|
||||
prevItemLen := uint64(len(ib.items[0]) - cpLen)
|
||||
for i, item := range ib.items[1:] {
|
||||
itemLen := uint64(len(item) - cpLen)
|
||||
prevItemLen := uint64(len(firstItem) - cpLen)
|
||||
for i, it := range ib.items[1:] {
|
||||
itemLen := uint64(int(it.End-it.Start) - cpLen)
|
||||
xLen := itemLen ^ prevItemLen
|
||||
prevItemLen = itemLen
|
||||
|
||||
|
@ -346,11 +392,15 @@ func (ib *inmemoryBlock) UnmarshalData(sb *storageBlock, firstItem, commonPrefix
|
|||
}
|
||||
data := bytesutil.Resize(ib.data, maxInmemoryBlockSize)
|
||||
if n := int(itemsCount) - cap(ib.items); n > 0 {
|
||||
ib.items = append(ib.items[:cap(ib.items)], make([][]byte, n)...)
|
||||
ib.items = append(ib.items[:cap(ib.items)], make([]Item, n)...)
|
||||
}
|
||||
ib.items = ib.items[:itemsCount]
|
||||
data = append(data[:0], firstItem...)
|
||||
ib.items[0] = data
|
||||
items := ib.items
|
||||
items[0] = Item{
|
||||
Start: 0,
|
||||
End: uint32(len(data)),
|
||||
}
|
||||
prevItem := data[len(commonPrefix):]
|
||||
b := bb.B
|
||||
for i := 1; i < int(itemsCount); i++ {
|
||||
|
@ -363,17 +413,19 @@ func (ib *inmemoryBlock) UnmarshalData(sb *storageBlock, firstItem, commonPrefix
|
|||
if uint64(len(b)) < suffixLen {
|
||||
return fmt.Errorf("not enough data for decoding item from itemsData; want %d bytes; remained %d bytes", suffixLen, len(b))
|
||||
}
|
||||
data = append(data, commonPrefix...)
|
||||
|
||||
if prefixLen > uint64(len(prevItem)) {
|
||||
return fmt.Errorf("prefixLen cannot exceed %d; got %d", len(prevItem), prefixLen)
|
||||
}
|
||||
dataLen := len(data)
|
||||
data = append(data, commonPrefix...)
|
||||
data = append(data, prevItem[:prefixLen]...)
|
||||
data = append(data, b[:suffixLen]...)
|
||||
item := data[len(data)-int(itemLen)-len(commonPrefix):]
|
||||
ib.items[i] = item
|
||||
items[i] = Item{
|
||||
Start: uint32(dataLen),
|
||||
End: uint32(len(data)),
|
||||
}
|
||||
b = b[suffixLen:]
|
||||
prevItem = item[len(commonPrefix):]
|
||||
prevItem = data[len(data)-int(itemLen):]
|
||||
}
|
||||
if len(b) > 0 {
|
||||
return fmt.Errorf("unexpected tail left after itemsData with len %d: %q", len(b), b)
|
||||
|
@ -381,30 +433,33 @@ func (ib *inmemoryBlock) UnmarshalData(sb *storageBlock, firstItem, commonPrefix
|
|||
if uint64(len(data)) != dataLen {
|
||||
return fmt.Errorf("unexpected data len; got %d; want %d", len(data), dataLen)
|
||||
}
|
||||
ib.data = data
|
||||
if !ib.isSorted() {
|
||||
return fmt.Errorf("decoded data block contains unsorted items; items:\n%s", ib.debugItemsString())
|
||||
}
|
||||
ib.data = data
|
||||
return nil
|
||||
}
|
||||
|
||||
var bbPool bytesutil.ByteBufferPool
|
||||
|
||||
func (ib *inmemoryBlock) marshalDataPlain(sb *storageBlock) {
|
||||
data := ib.data
|
||||
|
||||
// Marshal items data.
|
||||
// There is no need in marshaling the first item, since it is returned
|
||||
// to the caller in marshalData.
|
||||
cpLen := len(ib.commonPrefix)
|
||||
b := sb.itemsData[:0]
|
||||
for _, item := range ib.items[1:] {
|
||||
b = append(b, item[cpLen:]...)
|
||||
for _, it := range ib.items[1:] {
|
||||
it.Start += uint32(cpLen)
|
||||
b = append(b, it.String(data)...)
|
||||
}
|
||||
sb.itemsData = b
|
||||
|
||||
// Marshal length data.
|
||||
b = sb.lensData[:0]
|
||||
for _, item := range ib.items[1:] {
|
||||
b = encoding.MarshalUint64(b, uint64(len(item)-cpLen))
|
||||
for _, it := range ib.items[1:] {
|
||||
b = encoding.MarshalUint64(b, uint64(int(it.End-it.Start)-cpLen))
|
||||
}
|
||||
sb.lensData = b
|
||||
}
|
||||
|
@ -431,26 +486,34 @@ func (ib *inmemoryBlock) unmarshalDataPlain(sb *storageBlock, firstItem []byte,
|
|||
}
|
||||
|
||||
// Unmarshal items data.
|
||||
ib.data = bytesutil.Resize(ib.data, len(firstItem)+len(sb.itemsData)+len(commonPrefix)*int(itemsCount))
|
||||
ib.data = append(ib.data[:0], firstItem...)
|
||||
ib.items = append(ib.items[:0], ib.data)
|
||||
|
||||
data := ib.data
|
||||
items := ib.items
|
||||
data = bytesutil.Resize(data, len(firstItem)+len(sb.itemsData)+len(commonPrefix)*int(itemsCount))
|
||||
data = append(data[:0], firstItem...)
|
||||
items = append(items[:0], Item{
|
||||
Start: 0,
|
||||
End: uint32(len(data)),
|
||||
})
|
||||
b = sb.itemsData
|
||||
for i := 1; i < int(itemsCount); i++ {
|
||||
itemLen := lb.lens[i]
|
||||
if uint64(len(b)) < itemLen {
|
||||
return fmt.Errorf("not enough data for decoding item from itemsData; want %d bytes; remained %d bytes", itemLen, len(b))
|
||||
}
|
||||
ib.data = append(ib.data, commonPrefix...)
|
||||
ib.data = append(ib.data, b[:itemLen]...)
|
||||
item := ib.data[len(ib.data)-int(itemLen)-len(commonPrefix):]
|
||||
ib.items = append(ib.items, item)
|
||||
dataLen := len(data)
|
||||
data = append(data, commonPrefix...)
|
||||
data = append(data, b[:itemLen]...)
|
||||
items = append(items, Item{
|
||||
Start: uint32(dataLen),
|
||||
End: uint32(len(data)),
|
||||
})
|
||||
b = b[itemLen:]
|
||||
}
|
||||
ib.data = data
|
||||
ib.items = items
|
||||
if len(b) > 0 {
|
||||
return fmt.Errorf("unexpected tail left after itemsData with len %d: %q", len(b), b)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -37,8 +37,10 @@ func TestInmemoryBlockAdd(t *testing.T) {
|
|||
if len(ib.data) != totalLen {
|
||||
t.Fatalf("unexpected ib.data len; got %d; want %d", len(ib.data), totalLen)
|
||||
}
|
||||
for j, item := range ib.items {
|
||||
if items[j] != string(item) {
|
||||
data := ib.data
|
||||
for j, it := range ib.items {
|
||||
item := it.String(data)
|
||||
if items[j] != item {
|
||||
t.Fatalf("unexpected item at index %d out of %d, loop %d\ngot\n%X\nwant\n%X", j, len(items), i, item, items[j])
|
||||
}
|
||||
}
|
||||
|
@ -75,8 +77,10 @@ func TestInmemoryBlockSort(t *testing.T) {
|
|||
if len(ib.data) != totalLen {
|
||||
t.Fatalf("unexpected ib.data len; got %d; want %d", len(ib.data), totalLen)
|
||||
}
|
||||
for j, item := range ib.items {
|
||||
if items[j] != string(item) {
|
||||
data := ib.data
|
||||
for j, it := range ib.items {
|
||||
item := it.String(data)
|
||||
if items[j] != item {
|
||||
t.Fatalf("unexpected item at index %d out of %d, loop %d\ngot\n%X\nwant\n%X", j, len(items), i, item, items[j])
|
||||
}
|
||||
}
|
||||
|
@ -122,8 +126,9 @@ func TestInmemoryBlockMarshalUnmarshal(t *testing.T) {
|
|||
if int(itemsLen) != len(ib.items) {
|
||||
t.Fatalf("unexpected number of items marshaled; got %d; want %d", itemsLen, len(ib.items))
|
||||
}
|
||||
if string(firstItem) != string(ib.items[0]) {
|
||||
t.Fatalf("unexpected the first item\ngot\n%q\nwant\n%q", firstItem, ib.items[0])
|
||||
firstItemExpected := ib.items[0].String(ib.data)
|
||||
if string(firstItem) != firstItemExpected {
|
||||
t.Fatalf("unexpected the first item\ngot\n%q\nwant\n%q", firstItem, firstItemExpected)
|
||||
}
|
||||
if err := checkMarshalType(mt); err != nil {
|
||||
t.Fatalf("invalid mt: %s", err)
|
||||
|
@ -143,12 +148,15 @@ func TestInmemoryBlockMarshalUnmarshal(t *testing.T) {
|
|||
t.Fatalf("unexpected ib.data len; got %d; want %d", len(ib2.data), totalLen)
|
||||
}
|
||||
for j := range items {
|
||||
if len(items[j]) != len(ib2.items[j]) {
|
||||
it2 := ib2.items[j]
|
||||
item2 := it2.String(ib2.data)
|
||||
if len(items[j]) != len(item2) {
|
||||
t.Fatalf("items length mismatch at index %d out of %d, loop %d\ngot\n(len=%d) %X\nwant\n(len=%d) %X",
|
||||
j, len(items), i, len(ib2.items[j]), ib2.items[j], len(items[j]), items[j])
|
||||
j, len(items), i, len(item2), item2, len(items[j]), items[j])
|
||||
}
|
||||
}
|
||||
for j, item := range ib2.items {
|
||||
for j, it := range ib2.items {
|
||||
item := it.String(ib2.data)
|
||||
if items[j] != string(item) {
|
||||
t.Fatalf("unexpected item at index %d out of %d, loop %d\ngot\n(len=%d) %X\nwant\n(len=%d) %X",
|
||||
j, len(items), i, len(item), item, len(items[j]), items[j])
|
||||
|
|
|
@ -56,8 +56,8 @@ func (ip *inmemoryPart) Init(ib *inmemoryBlock) {
|
|||
|
||||
ip.ph.itemsCount = uint64(len(ib.items))
|
||||
ip.ph.blocksCount = 1
|
||||
ip.ph.firstItem = append(ip.ph.firstItem[:0], ib.items[0]...)
|
||||
ip.ph.lastItem = append(ip.ph.lastItem[:0], ib.items[len(ib.items)-1]...)
|
||||
ip.ph.firstItem = append(ip.ph.firstItem[:0], ib.items[0].String(ib.data)...)
|
||||
ip.ph.lastItem = append(ip.ph.lastItem[:0], ib.items[len(ib.items)-1].String(ib.data)...)
|
||||
|
||||
fs.MustWriteData(&ip.itemsData, ip.sb.itemsData)
|
||||
ip.bh.itemsBlockOffset = 0
|
||||
|
|
|
@ -16,7 +16,7 @@ import (
|
|||
//
|
||||
// The callback must return sorted items. The first and the last item must be unchanged.
|
||||
// The callback can re-use data and items for storing the result.
|
||||
type PrepareBlockCallback func(data []byte, items [][]byte) ([]byte, [][]byte)
|
||||
type PrepareBlockCallback func(data []byte, items []Item) ([]byte, []Item)
|
||||
|
||||
// mergeBlockStreams merges bsrs and writes result to bsw.
|
||||
//
|
||||
|
@ -122,8 +122,10 @@ again:
|
|||
nextItem = bsm.bsrHeap[0].bh.firstItem
|
||||
hasNextItem = true
|
||||
}
|
||||
items := bsr.Block.items
|
||||
data := bsr.Block.data
|
||||
for bsr.blockItemIdx < len(bsr.Block.items) {
|
||||
item := bsr.Block.items[bsr.blockItemIdx]
|
||||
item := items[bsr.blockItemIdx].Bytes(data)
|
||||
if hasNextItem && string(item) > string(nextItem) {
|
||||
break
|
||||
}
|
||||
|
@ -148,32 +150,36 @@ again:
|
|||
|
||||
// The next item in the bsr.Block exceeds nextItem.
|
||||
// Adjust bsr.bh.firstItem and return bsr to heap.
|
||||
bsr.bh.firstItem = append(bsr.bh.firstItem[:0], bsr.Block.items[bsr.blockItemIdx]...)
|
||||
bsr.bh.firstItem = append(bsr.bh.firstItem[:0], bsr.Block.items[bsr.blockItemIdx].String(bsr.Block.data)...)
|
||||
heap.Push(&bsm.bsrHeap, bsr)
|
||||
goto again
|
||||
}
|
||||
|
||||
func (bsm *blockStreamMerger) flushIB(bsw *blockStreamWriter, ph *partHeader, itemsMerged *uint64) {
|
||||
if len(bsm.ib.items) == 0 {
|
||||
items := bsm.ib.items
|
||||
data := bsm.ib.data
|
||||
if len(items) == 0 {
|
||||
// Nothing to flush.
|
||||
return
|
||||
}
|
||||
atomic.AddUint64(itemsMerged, uint64(len(bsm.ib.items)))
|
||||
atomic.AddUint64(itemsMerged, uint64(len(items)))
|
||||
if bsm.prepareBlock != nil {
|
||||
bsm.firstItem = append(bsm.firstItem[:0], bsm.ib.items[0]...)
|
||||
bsm.lastItem = append(bsm.lastItem[:0], bsm.ib.items[len(bsm.ib.items)-1]...)
|
||||
bsm.ib.data, bsm.ib.items = bsm.prepareBlock(bsm.ib.data, bsm.ib.items)
|
||||
if len(bsm.ib.items) == 0 {
|
||||
bsm.firstItem = append(bsm.firstItem[:0], items[0].String(data)...)
|
||||
bsm.lastItem = append(bsm.lastItem[:0], items[len(items)-1].String(data)...)
|
||||
data, items = bsm.prepareBlock(data, items)
|
||||
bsm.ib.data = data
|
||||
bsm.ib.items = items
|
||||
if len(items) == 0 {
|
||||
// Nothing to flush
|
||||
return
|
||||
}
|
||||
// Consistency checks after prepareBlock call.
|
||||
firstItem := bsm.ib.items[0]
|
||||
if string(firstItem) != string(bsm.firstItem) {
|
||||
firstItem := items[0].String(data)
|
||||
if firstItem != string(bsm.firstItem) {
|
||||
logger.Panicf("BUG: prepareBlock must return first item equal to the original first item;\ngot\n%X\nwant\n%X", firstItem, bsm.firstItem)
|
||||
}
|
||||
lastItem := bsm.ib.items[len(bsm.ib.items)-1]
|
||||
if string(lastItem) != string(bsm.lastItem) {
|
||||
lastItem := items[len(items)-1].String(data)
|
||||
if lastItem != string(bsm.lastItem) {
|
||||
logger.Panicf("BUG: prepareBlock must return last item equal to the original last item;\ngot\n%X\nwant\n%X", lastItem, bsm.lastItem)
|
||||
}
|
||||
// Verify whether the bsm.ib.items are sorted only in tests, since this
|
||||
|
@ -182,12 +188,12 @@ func (bsm *blockStreamMerger) flushIB(bsw *blockStreamWriter, ph *partHeader, it
|
|||
logger.Panicf("BUG: prepareBlock must return sorted items;\ngot\n%s", bsm.ib.debugItemsString())
|
||||
}
|
||||
}
|
||||
ph.itemsCount += uint64(len(bsm.ib.items))
|
||||
ph.itemsCount += uint64(len(items))
|
||||
if !bsm.phFirstItemCaught {
|
||||
ph.firstItem = append(ph.firstItem[:0], bsm.ib.items[0]...)
|
||||
ph.firstItem = append(ph.firstItem[:0], items[0].String(data)...)
|
||||
bsm.phFirstItemCaught = true
|
||||
}
|
||||
ph.lastItem = append(ph.lastItem[:0], bsm.ib.items[len(bsm.ib.items)-1]...)
|
||||
ph.lastItem = append(ph.lastItem[:0], items[len(items)-1].String(data)...)
|
||||
bsw.WriteBlock(&bsm.ib)
|
||||
bsm.ib.Reset()
|
||||
ph.blocksCount++
|
||||
|
|
|
@ -157,10 +157,12 @@ func testCheckItems(dstIP *inmemoryPart, items []string) error {
|
|||
if bh.itemsCount <= 0 {
|
||||
return fmt.Errorf("unexpected empty block")
|
||||
}
|
||||
if string(bh.firstItem) != string(dstBsr.Block.items[0]) {
|
||||
return fmt.Errorf("unexpected blockHeader.firstItem; got %q; want %q", bh.firstItem, dstBsr.Block.items[0])
|
||||
item := dstBsr.Block.items[0].Bytes(dstBsr.Block.data)
|
||||
if string(bh.firstItem) != string(item) {
|
||||
return fmt.Errorf("unexpected blockHeader.firstItem; got %q; want %q", bh.firstItem, item)
|
||||
}
|
||||
for _, item := range dstBsr.Block.items {
|
||||
for _, it := range dstBsr.Block.items {
|
||||
item := it.Bytes(dstBsr.Block.data)
|
||||
dstItems = append(dstItems, string(item))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -142,14 +142,17 @@ func (ps *partSearch) Seek(k []byte) {
|
|||
|
||||
// Locate the first item to scan in the block.
|
||||
items := ps.ib.items
|
||||
data := ps.ib.data
|
||||
cpLen := commonPrefixLen(ps.ib.commonPrefix, k)
|
||||
if cpLen > 0 {
|
||||
keySuffix := k[cpLen:]
|
||||
ps.ibItemIdx = sort.Search(len(items), func(i int) bool {
|
||||
return string(keySuffix) <= string(items[i][cpLen:])
|
||||
it := items[i]
|
||||
it.Start += uint32(cpLen)
|
||||
return string(keySuffix) <= it.String(data)
|
||||
})
|
||||
} else {
|
||||
ps.ibItemIdx = binarySearchKey(items, k)
|
||||
ps.ibItemIdx = binarySearchKey(data, items, k)
|
||||
}
|
||||
if ps.ibItemIdx < len(items) {
|
||||
// The item has been found.
|
||||
|
@ -168,13 +171,14 @@ func (ps *partSearch) tryFastSeek(k []byte) bool {
|
|||
if ps.ib == nil {
|
||||
return false
|
||||
}
|
||||
data := ps.ib.data
|
||||
items := ps.ib.items
|
||||
idx := ps.ibItemIdx
|
||||
if idx >= len(items) {
|
||||
// The ib is exhausted.
|
||||
return false
|
||||
}
|
||||
if string(k) > string(items[len(items)-1]) {
|
||||
if string(k) > items[len(items)-1].String(data) {
|
||||
// The item is located in next blocks.
|
||||
return false
|
||||
}
|
||||
|
@ -183,8 +187,8 @@ func (ps *partSearch) tryFastSeek(k []byte) bool {
|
|||
if idx > 0 {
|
||||
idx--
|
||||
}
|
||||
if string(k) < string(items[idx]) {
|
||||
if string(k) < string(items[0]) {
|
||||
if string(k) < items[idx].String(data) {
|
||||
if string(k) < items[0].String(data) {
|
||||
// The item is located in previous blocks.
|
||||
return false
|
||||
}
|
||||
|
@ -192,7 +196,7 @@ func (ps *partSearch) tryFastSeek(k []byte) bool {
|
|||
}
|
||||
|
||||
// The item is located in the current block
|
||||
ps.ibItemIdx = idx + binarySearchKey(items[idx:], k)
|
||||
ps.ibItemIdx = idx + binarySearchKey(data, items[idx:], k)
|
||||
return true
|
||||
}
|
||||
|
||||
|
@ -204,10 +208,11 @@ func (ps *partSearch) NextItem() bool {
|
|||
return false
|
||||
}
|
||||
|
||||
if ps.ibItemIdx < len(ps.ib.items) {
|
||||
items := ps.ib.items
|
||||
if ps.ibItemIdx < len(items) {
|
||||
// Fast path - the current block contains more items.
|
||||
// Proceed to the next item.
|
||||
ps.Item = ps.ib.items[ps.ibItemIdx]
|
||||
ps.Item = items[ps.ibItemIdx].Bytes(ps.ib.data)
|
||||
ps.ibItemIdx++
|
||||
return true
|
||||
}
|
||||
|
@ -219,7 +224,7 @@ func (ps *partSearch) NextItem() bool {
|
|||
}
|
||||
|
||||
// Invariant: len(ps.ib.items) > 0 after nextBlock.
|
||||
ps.Item = ps.ib.items[0]
|
||||
ps.Item = ps.ib.items[0].Bytes(ps.ib.data)
|
||||
ps.ibItemIdx++
|
||||
return true
|
||||
}
|
||||
|
@ -319,11 +324,11 @@ func (ps *partSearch) readInmemoryBlock(bh *blockHeader) (*inmemoryBlock, error)
|
|||
return ib, nil
|
||||
}
|
||||
|
||||
func binarySearchKey(items [][]byte, key []byte) int {
|
||||
func binarySearchKey(data []byte, items []Item, key []byte) int {
|
||||
if len(items) == 0 {
|
||||
return 0
|
||||
}
|
||||
if string(key) <= string(items[0]) {
|
||||
if string(key) <= items[0].String(data) {
|
||||
// Fast path - the item is the first.
|
||||
return 0
|
||||
}
|
||||
|
@ -335,7 +340,7 @@ func binarySearchKey(items [][]byte, key []byte) int {
|
|||
i, j := uint(0), n
|
||||
for i < j {
|
||||
h := uint(i+j) >> 1
|
||||
if h >= 0 && h < uint(len(items)) && string(key) > string(items[h]) {
|
||||
if h >= 0 && h < uint(len(items)) && string(key) > items[h].String(data) {
|
||||
i = h + 1
|
||||
} else {
|
||||
j = h
|
||||
|
|
|
@ -46,7 +46,7 @@ func benchmarkTableSearch(b *testing.B, itemsCount int) {
|
|||
b.Run("sequential-keys-exact", func(b *testing.B) {
|
||||
benchmarkTableSearchKeys(b, tb, keys, 0)
|
||||
})
|
||||
b.Run("sequential-keys-without-siffux", func(b *testing.B) {
|
||||
b.Run("sequential-keys-without-suffix", func(b *testing.B) {
|
||||
benchmarkTableSearchKeys(b, tb, keys, 4)
|
||||
})
|
||||
|
||||
|
@ -57,7 +57,7 @@ func benchmarkTableSearch(b *testing.B, itemsCount int) {
|
|||
b.Run("random-keys-exact", func(b *testing.B) {
|
||||
benchmarkTableSearchKeys(b, tb, randKeys, 0)
|
||||
})
|
||||
b.Run("random-keys-without-siffux", func(b *testing.B) {
|
||||
b.Run("random-keys-without-suffix", func(b *testing.B) {
|
||||
benchmarkTableSearchKeys(b, tb, randKeys, 4)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -218,7 +218,7 @@ func TestTableAddItemsConcurrent(t *testing.T) {
|
|||
atomic.AddUint64(&flushes, 1)
|
||||
}
|
||||
var itemsMerged uint64
|
||||
prepareBlock := func(data []byte, items [][]byte) ([]byte, [][]byte) {
|
||||
prepareBlock := func(data []byte, items []Item) ([]byte, []Item) {
|
||||
atomic.AddUint64(&itemsMerged, uint64(len(items)))
|
||||
return data, items
|
||||
}
|
||||
|
|
|
@ -76,6 +76,6 @@ func testBlockHeaderMarshalUnmarshal(t *testing.T, bh *blockHeader) {
|
|||
t.Fatalf("unexpected tail after unmarshaling bh=%+v; got\n%x; want\n%x", bh, tail, suffix)
|
||||
}
|
||||
if !reflect.DeepEqual(bh, &bh2) {
|
||||
t.Fatalf("unexpected bh unmarshaled after adding siffux; got\n%+v; want\n%+v", &bh2, bh)
|
||||
t.Fatalf("unexpected bh unmarshaled after adding suffix; got\n%+v; want\n%+v", &bh2, bh)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3499,24 +3499,24 @@ func (mp *tagToMetricIDsRowParser) IsDeletedTag(dmis *uint64set.Set) bool {
|
|||
return true
|
||||
}
|
||||
|
||||
func mergeTagToMetricIDsRows(data []byte, items [][]byte) ([]byte, [][]byte) {
|
||||
func mergeTagToMetricIDsRows(data []byte, items []mergeset.Item) ([]byte, []mergeset.Item) {
|
||||
data, items = mergeTagToMetricIDsRowsInternal(data, items, nsPrefixTagToMetricIDs)
|
||||
data, items = mergeTagToMetricIDsRowsInternal(data, items, nsPrefixDateTagToMetricIDs)
|
||||
return data, items
|
||||
}
|
||||
|
||||
func mergeTagToMetricIDsRowsInternal(data []byte, items [][]byte, nsPrefix byte) ([]byte, [][]byte) {
|
||||
func mergeTagToMetricIDsRowsInternal(data []byte, items []mergeset.Item, nsPrefix byte) ([]byte, []mergeset.Item) {
|
||||
// Perform quick checks whether items contain rows starting from nsPrefix
|
||||
// based on the fact that items are sorted.
|
||||
if len(items) <= 2 {
|
||||
// The first and the last row must remain unchanged.
|
||||
return data, items
|
||||
}
|
||||
firstItem := items[0]
|
||||
firstItem := items[0].Bytes(data)
|
||||
if len(firstItem) > 0 && firstItem[0] > nsPrefix {
|
||||
return data, items
|
||||
}
|
||||
lastItem := items[len(items)-1]
|
||||
lastItem := items[len(items)-1].Bytes(data)
|
||||
if len(lastItem) > 0 && lastItem[0] < nsPrefix {
|
||||
return data, items
|
||||
}
|
||||
|
@ -3529,14 +3529,18 @@ func mergeTagToMetricIDsRowsInternal(data []byte, items [][]byte, nsPrefix byte)
|
|||
mpPrev := &tmm.mpPrev
|
||||
dstData := data[:0]
|
||||
dstItems := items[:0]
|
||||
for i, item := range items {
|
||||
for i, it := range items {
|
||||
item := it.Bytes(data)
|
||||
if len(item) == 0 || item[0] != nsPrefix || i == 0 || i == len(items)-1 {
|
||||
// Write rows not starting with nsPrefix as-is.
|
||||
// Additionally write the first and the last row as-is in order to preserve
|
||||
// sort order for adjancent blocks.
|
||||
dstData, dstItems = tmm.flushPendingMetricIDs(dstData, dstItems, mpPrev)
|
||||
dstData = append(dstData, item...)
|
||||
dstItems = append(dstItems, dstData[len(dstData)-len(item):])
|
||||
dstItems = append(dstItems, mergeset.Item{
|
||||
Start: uint32(len(dstData) - len(item)),
|
||||
End: uint32(len(dstData)),
|
||||
})
|
||||
continue
|
||||
}
|
||||
if err := mp.Init(item, nsPrefix); err != nil {
|
||||
|
@ -3545,7 +3549,10 @@ func mergeTagToMetricIDsRowsInternal(data []byte, items [][]byte, nsPrefix byte)
|
|||
if mp.MetricIDsLen() >= maxMetricIDsPerRow {
|
||||
dstData, dstItems = tmm.flushPendingMetricIDs(dstData, dstItems, mpPrev)
|
||||
dstData = append(dstData, item...)
|
||||
dstItems = append(dstItems, dstData[len(dstData)-len(item):])
|
||||
dstItems = append(dstItems, mergeset.Item{
|
||||
Start: uint32(len(dstData) - len(item)),
|
||||
End: uint32(len(dstData)),
|
||||
})
|
||||
continue
|
||||
}
|
||||
if !mp.EqualPrefix(mpPrev) {
|
||||
|
@ -3561,7 +3568,7 @@ func mergeTagToMetricIDsRowsInternal(data []byte, items [][]byte, nsPrefix byte)
|
|||
if len(tmm.pendingMetricIDs) > 0 {
|
||||
logger.Panicf("BUG: tmm.pendingMetricIDs must be empty at this point; got %d items: %d", len(tmm.pendingMetricIDs), tmm.pendingMetricIDs)
|
||||
}
|
||||
if !checkItemsSorted(dstItems) {
|
||||
if !checkItemsSorted(dstData, dstItems) {
|
||||
// Items could become unsorted if initial items contain duplicate metricIDs:
|
||||
//
|
||||
// item1: 1, 1, 5
|
||||
|
@ -3579,15 +3586,8 @@ func mergeTagToMetricIDsRowsInternal(data []byte, items [][]byte, nsPrefix byte)
|
|||
// into the same new time series from multiple concurrent goroutines.
|
||||
atomic.AddUint64(&indexBlocksWithMetricIDsIncorrectOrder, 1)
|
||||
dstData = append(dstData[:0], tmm.dataCopy...)
|
||||
dstItems = dstItems[:0]
|
||||
// tmm.itemsCopy can point to overwritten data, so it must be updated
|
||||
// to point to real data from tmm.dataCopy.
|
||||
buf := dstData
|
||||
for _, item := range tmm.itemsCopy {
|
||||
dstItems = append(dstItems, buf[:len(item)])
|
||||
buf = buf[len(item):]
|
||||
}
|
||||
if !checkItemsSorted(dstItems) {
|
||||
dstItems = append(dstItems[:0], tmm.itemsCopy...)
|
||||
if !checkItemsSorted(dstData, dstItems) {
|
||||
logger.Panicf("BUG: the original items weren't sorted; items=%q", dstItems)
|
||||
}
|
||||
}
|
||||
|
@ -3599,13 +3599,14 @@ func mergeTagToMetricIDsRowsInternal(data []byte, items [][]byte, nsPrefix byte)
|
|||
var indexBlocksWithMetricIDsIncorrectOrder uint64
|
||||
var indexBlocksWithMetricIDsProcessed uint64
|
||||
|
||||
func checkItemsSorted(items [][]byte) bool {
|
||||
func checkItemsSorted(data []byte, items []mergeset.Item) bool {
|
||||
if len(items) == 0 {
|
||||
return true
|
||||
}
|
||||
prevItem := items[0]
|
||||
for _, currItem := range items[1:] {
|
||||
if string(prevItem) > string(currItem) {
|
||||
prevItem := items[0].String(data)
|
||||
for _, it := range items[1:] {
|
||||
currItem := it.String(data)
|
||||
if prevItem > currItem {
|
||||
return false
|
||||
}
|
||||
prevItem = currItem
|
||||
|
@ -3633,7 +3634,7 @@ type tagToMetricIDsRowsMerger struct {
|
|||
mp tagToMetricIDsRowParser
|
||||
mpPrev tagToMetricIDsRowParser
|
||||
|
||||
itemsCopy [][]byte
|
||||
itemsCopy []mergeset.Item
|
||||
dataCopy []byte
|
||||
}
|
||||
|
||||
|
@ -3646,7 +3647,7 @@ func (tmm *tagToMetricIDsRowsMerger) Reset() {
|
|||
tmm.dataCopy = tmm.dataCopy[:0]
|
||||
}
|
||||
|
||||
func (tmm *tagToMetricIDsRowsMerger) flushPendingMetricIDs(dstData []byte, dstItems [][]byte, mp *tagToMetricIDsRowParser) ([]byte, [][]byte) {
|
||||
func (tmm *tagToMetricIDsRowsMerger) flushPendingMetricIDs(dstData []byte, dstItems []mergeset.Item, mp *tagToMetricIDsRowParser) ([]byte, []mergeset.Item) {
|
||||
if len(tmm.pendingMetricIDs) == 0 {
|
||||
// Nothing to flush
|
||||
return dstData, dstItems
|
||||
|
@ -3661,7 +3662,10 @@ func (tmm *tagToMetricIDsRowsMerger) flushPendingMetricIDs(dstData []byte, dstIt
|
|||
for _, metricID := range tmm.pendingMetricIDs {
|
||||
dstData = encoding.MarshalUint64(dstData, metricID)
|
||||
}
|
||||
dstItems = append(dstItems, dstData[dstDataLen:])
|
||||
dstItems = append(dstItems, mergeset.Item{
|
||||
Start: uint32(dstDataLen),
|
||||
End: uint32(len(dstData)),
|
||||
})
|
||||
tmm.pendingMetricIDs = tmm.pendingMetricIDs[:0]
|
||||
return dstData, dstItems
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
|
||||
)
|
||||
|
@ -36,33 +37,38 @@ func TestMergeTagToMetricIDsRows(t *testing.T) {
|
|||
f := func(items []string, expectedItems []string) {
|
||||
t.Helper()
|
||||
var data []byte
|
||||
var itemsB [][]byte
|
||||
var itemsB []mergeset.Item
|
||||
for _, item := range items {
|
||||
data = append(data, item...)
|
||||
itemsB = append(itemsB, data[len(data)-len(item):])
|
||||
itemsB = append(itemsB, mergeset.Item{
|
||||
Start: uint32(len(data) - len(item)),
|
||||
End: uint32(len(data)),
|
||||
})
|
||||
}
|
||||
if !checkItemsSorted(itemsB) {
|
||||
if !checkItemsSorted(data, itemsB) {
|
||||
t.Fatalf("source items aren't sorted; items:\n%q", itemsB)
|
||||
}
|
||||
resultData, resultItemsB := mergeTagToMetricIDsRows(data, itemsB)
|
||||
if len(resultItemsB) != len(expectedItems) {
|
||||
t.Fatalf("unexpected len(resultItemsB); got %d; want %d", len(resultItemsB), len(expectedItems))
|
||||
}
|
||||
if !checkItemsSorted(resultItemsB) {
|
||||
if !checkItemsSorted(resultData, resultItemsB) {
|
||||
t.Fatalf("result items aren't sorted; items:\n%q", resultItemsB)
|
||||
}
|
||||
for i, item := range resultItemsB {
|
||||
if !bytes.HasPrefix(resultData, item) {
|
||||
t.Fatalf("unexpected prefix for resultData #%d;\ngot\n%X\nwant\n%X", i, resultData, item)
|
||||
buf := resultData
|
||||
for i, it := range resultItemsB {
|
||||
item := it.Bytes(resultData)
|
||||
if !bytes.HasPrefix(buf, item) {
|
||||
t.Fatalf("unexpected prefix for resultData #%d;\ngot\n%X\nwant\n%X", i, buf, item)
|
||||
}
|
||||
resultData = resultData[len(item):]
|
||||
buf = buf[len(item):]
|
||||
}
|
||||
if len(resultData) != 0 {
|
||||
t.Fatalf("unexpected tail left in resultData: %X", resultData)
|
||||
if len(buf) != 0 {
|
||||
t.Fatalf("unexpected tail left in resultData: %X", buf)
|
||||
}
|
||||
var resultItems []string
|
||||
for _, item := range resultItemsB {
|
||||
resultItems = append(resultItems, string(item))
|
||||
for _, it := range resultItemsB {
|
||||
resultItems = append(resultItems, string(it.Bytes(resultData)))
|
||||
}
|
||||
if !reflect.DeepEqual(expectedItems, resultItems) {
|
||||
t.Fatalf("unexpected items;\ngot\n%X\nwant\n%X", resultItems, expectedItems)
|
||||
|
|
Loading…
Reference in a new issue