From 587132555f119c321fb3bc3aff124bb3a44636e5 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sun, 21 Feb 2021 22:06:45 +0200 Subject: [PATCH] lib/mergeset: reduce memory usage for inmemoryBlock by using more compact items representation This also should reduce CPU time spent by GC, since inmemoryBlock.items don't have pointers now, so GC doesn't need visiting them. --- lib/mergeset/block_stream_reader.go | 10 +- lib/mergeset/block_stream_reader_test.go | 6 +- lib/mergeset/encoding.go | 187 +++++++++++++++-------- lib/mergeset/encoding_test.go | 26 ++-- lib/mergeset/inmemory_part.go | 4 +- lib/mergeset/merge.go | 38 +++-- lib/mergeset/merge_test.go | 8 +- lib/mergeset/part_search.go | 29 ++-- lib/mergeset/table_search_timing_test.go | 4 +- lib/mergeset/table_test.go | 2 +- lib/storage/block_header_test.go | 2 +- lib/storage/index_db.go | 52 ++++--- lib/storage/index_db_test.go | 30 ++-- 13 files changed, 248 insertions(+), 150 deletions(-) diff --git a/lib/mergeset/block_stream_reader.go b/lib/mergeset/block_stream_reader.go index a897108874..06a28b2381 100644 --- a/lib/mergeset/block_stream_reader.go +++ b/lib/mergeset/block_stream_reader.go @@ -195,7 +195,8 @@ func (bsr *blockStreamReader) Next() bool { if err := bsr.readNextBHS(); err != nil { if err == io.EOF { // Check the last item. - lastItem := bsr.Block.items[len(bsr.Block.items)-1] + b := &bsr.Block + lastItem := b.items[len(b.items)-1].Bytes(b.data) if string(bsr.ph.lastItem) != string(lastItem) { err = fmt.Errorf("unexpected last item; got %X; want %X", lastItem, bsr.ph.lastItem) } @@ -240,12 +241,13 @@ func (bsr *blockStreamReader) Next() bool { } if !bsr.firstItemChecked { bsr.firstItemChecked = true - if string(bsr.ph.firstItem) != string(bsr.Block.items[0]) { - bsr.err = fmt.Errorf("unexpected first item; got %X; want %X", bsr.Block.items[0], bsr.ph.firstItem) + b := &bsr.Block + firstItem := b.items[0].Bytes(b.data) + if string(bsr.ph.firstItem) != string(firstItem) { + bsr.err = fmt.Errorf("unexpected first item; got %X; want %X", firstItem, bsr.ph.firstItem) return false } } - return true } diff --git a/lib/mergeset/block_stream_reader_test.go b/lib/mergeset/block_stream_reader_test.go index 056b05cec1..c9175549d2 100644 --- a/lib/mergeset/block_stream_reader_test.go +++ b/lib/mergeset/block_stream_reader_test.go @@ -44,8 +44,10 @@ func testBlockStreamReaderRead(ip *inmemoryPart, items []string) error { bsr := newTestBlockStreamReader(ip) i := 0 for bsr.Next() { - for _, item := range bsr.Block.items { - if string(item) != items[i] { + data := bsr.Block.data + for _, it := range bsr.Block.items { + item := it.String(data) + if item != items[i] { return fmt.Errorf("unexpected item[%d]; got %q; want %q", i, item, items[i]) } i++ diff --git a/lib/mergeset/encoding.go b/lib/mergeset/encoding.go index 647b0a6745..8cb57082cf 100644 --- a/lib/mergeset/encoding.go +++ b/lib/mergeset/encoding.go @@ -3,6 +3,7 @@ package mergeset import ( "fmt" "os" + "reflect" "sort" "strings" "sync" @@ -13,35 +14,62 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) -type byteSliceSorter [][]byte +// Item represents a single item for storing in a mergeset. +type Item struct { + // Start is start offset for the item in data. + Start uint32 -func (s byteSliceSorter) Len() int { return len(s) } -func (s byteSliceSorter) Less(i, j int) bool { - return string(s[i]) < string(s[j]) + // End is end offset for the item in data. + End uint32 } -func (s byteSliceSorter) Swap(i, j int) { - s[i], s[j] = s[j], s[i] + +// Bytes returns bytes representation of it obtained from data. +// +// The returned bytes representation belongs to data. +func (it Item) Bytes(data []byte) []byte { + sh := (*reflect.SliceHeader)(unsafe.Pointer(&data)) + sh.Cap = int(it.End - it.Start) + sh.Len = int(it.End - it.Start) + sh.Data += uintptr(it.Start) + return data +} + +// String returns string represetnation of it obtained from data. +// +// The returned string representation belongs to data. +func (it Item) String(data []byte) string { + sh := (*reflect.SliceHeader)(unsafe.Pointer(&data)) + sh.Data += uintptr(it.Start) + sh.Len = int(it.End - it.Start) + return *(*string)(unsafe.Pointer(sh)) +} + +func (ib *inmemoryBlock) Len() int { return len(ib.items) } + +func (ib *inmemoryBlock) Less(i, j int) bool { + data := ib.data + items := ib.items + return string(items[i].Bytes(data)) < string(items[j].Bytes(data)) +} + +func (ib *inmemoryBlock) Swap(i, j int) { + items := ib.items + items[i], items[j] = items[j], items[i] } type inmemoryBlock struct { commonPrefix []byte data []byte - items byteSliceSorter + items []Item } func (ib *inmemoryBlock) SizeBytes() int { - return int(unsafe.Sizeof(*ib)) + cap(ib.commonPrefix) + cap(ib.data) + cap(ib.items)*int(unsafe.Sizeof([]byte{})) + return int(unsafe.Sizeof(*ib)) + cap(ib.commonPrefix) + cap(ib.data) + cap(ib.items)*int(unsafe.Sizeof(Item{})) } func (ib *inmemoryBlock) Reset() { ib.commonPrefix = ib.commonPrefix[:0] ib.data = ib.data[:0] - - items := ib.items - for i := range items { - // Remove reference to by slice, so GC could free the byte slice. - items[i] = nil - } ib.items = ib.items[:0] } @@ -50,12 +78,14 @@ func (ib *inmemoryBlock) updateCommonPrefix() { if len(ib.items) == 0 { return } - cp := ib.items[0] + items := ib.items + data := ib.data + cp := items[0].Bytes(data) if len(cp) == 0 { return } - for _, item := range ib.items[1:] { - cpLen := commonPrefixLen(cp, item) + for _, it := range items[1:] { + cpLen := commonPrefixLen(cp, it.Bytes(data)) if cpLen == 0 { return } @@ -82,15 +112,21 @@ func commonPrefixLen(a, b []byte) int { // // false is returned if x isn't added to ib due to block size contraints. func (ib *inmemoryBlock) Add(x []byte) bool { - if len(x)+len(ib.data) > maxInmemoryBlockSize { + data := ib.data + if len(x)+len(data) > maxInmemoryBlockSize { return false } - if cap(ib.data) < maxInmemoryBlockSize { - dataLen := len(ib.data) - ib.data = bytesutil.Resize(ib.data, maxInmemoryBlockSize)[:dataLen] + if cap(data) < maxInmemoryBlockSize { + dataLen := len(data) + data = bytesutil.Resize(data, maxInmemoryBlockSize)[:dataLen] } - ib.data = append(ib.data, x...) - ib.items = append(ib.items, ib.data[len(ib.data)-len(x):]) + dataLen := len(data) + data = append(data, x...) + ib.items = append(ib.items, Item{ + Start: uint32(dataLen), + End: uint32(len(data)), + }) + ib.data = data return true } @@ -100,16 +136,21 @@ func (ib *inmemoryBlock) Add(x []byte) bool { const maxInmemoryBlockSize = 64 * 1024 func (ib *inmemoryBlock) sort() { - // Use sort.Sort instead of sort.Slice in order to eliminate memory allocation. - sort.Sort(&ib.items) + sort.Sort(ib) + data := ib.data + items := ib.items bb := bbPool.Get() - b := bytesutil.Resize(bb.B, len(ib.data)) + b := bytesutil.Resize(bb.B, len(data)) b = b[:0] - for i, item := range ib.items { - b = append(b, item...) - ib.items[i] = b[len(b)-len(item):] + for i, it := range items { + bLen := len(b) + b = append(b, it.String(data)...) + items[i] = Item{ + Start: uint32(bLen), + End: uint32(len(b)), + } } - bb.B, ib.data = ib.data, b + bb.B, ib.data = data, b bbPool.Put(bb) } @@ -140,7 +181,7 @@ func checkMarshalType(mt marshalType) error { func (ib *inmemoryBlock) isSorted() bool { // Use sort.IsSorted instead of sort.SliceIsSorted in order to eliminate memory allocation. - return sort.IsSorted(&ib.items) + return sort.IsSorted(ib) } // MarshalUnsortedData marshals unsorted items from ib to sb. @@ -179,9 +220,11 @@ func (ib *inmemoryBlock) MarshalSortedData(sb *storageBlock, firstItemDst, commo func (ib *inmemoryBlock) debugItemsString() string { var sb strings.Builder - var prevItem []byte - for i, item := range ib.items { - if string(item) < string(prevItem) { + var prevItem string + data := ib.data + for i, it := range ib.items { + item := it.String(data) + if item < prevItem { fmt.Fprintf(&sb, "!!! the next item is smaller than the previous item !!!\n") } fmt.Fprintf(&sb, "%05d %X\n", i, item) @@ -201,7 +244,9 @@ func (ib *inmemoryBlock) marshalData(sb *storageBlock, firstItemDst, commonPrefi logger.Panicf("BUG: the number of items in the block must be smaller than %d; got %d items", uint64(1<<32), len(ib.items)) } - firstItemDst = append(firstItemDst, ib.items[0]...) + data := ib.data + firstItem := ib.items[0].Bytes(data) + firstItemDst = append(firstItemDst, firstItem...) commonPrefixDst = append(commonPrefixDst, ib.commonPrefix...) if len(ib.data)-len(ib.commonPrefix)*len(ib.items) < 64 || len(ib.items) < 2 { @@ -221,10 +266,11 @@ func (ib *inmemoryBlock) marshalData(sb *storageBlock, firstItemDst, commonPrefi defer encoding.PutUint64s(xs) cpLen := len(ib.commonPrefix) - prevItem := ib.items[0][cpLen:] + prevItem := firstItem[cpLen:] prevPrefixLen := uint64(0) - for i, item := range ib.items[1:] { - item := item[cpLen:] + for i, it := range ib.items[1:] { + it.Start += uint32(cpLen) + item := it.Bytes(data) prefixLen := uint64(commonPrefixLen(prevItem, item)) bItems = append(bItems, item[prefixLen:]...) xLen := prefixLen ^ prevPrefixLen @@ -240,9 +286,9 @@ func (ib *inmemoryBlock) marshalData(sb *storageBlock, firstItemDst, commonPrefi bbPool.Put(bbItems) // Marshal lens data. - prevItemLen := uint64(len(ib.items[0]) - cpLen) - for i, item := range ib.items[1:] { - itemLen := uint64(len(item) - cpLen) + prevItemLen := uint64(len(firstItem) - cpLen) + for i, it := range ib.items[1:] { + itemLen := uint64(int(it.End-it.Start) - cpLen) xLen := itemLen ^ prevItemLen prevItemLen = itemLen @@ -346,11 +392,15 @@ func (ib *inmemoryBlock) UnmarshalData(sb *storageBlock, firstItem, commonPrefix } data := bytesutil.Resize(ib.data, maxInmemoryBlockSize) if n := int(itemsCount) - cap(ib.items); n > 0 { - ib.items = append(ib.items[:cap(ib.items)], make([][]byte, n)...) + ib.items = append(ib.items[:cap(ib.items)], make([]Item, n)...) } ib.items = ib.items[:itemsCount] data = append(data[:0], firstItem...) - ib.items[0] = data + items := ib.items + items[0] = Item{ + Start: 0, + End: uint32(len(data)), + } prevItem := data[len(commonPrefix):] b := bb.B for i := 1; i < int(itemsCount); i++ { @@ -363,17 +413,19 @@ func (ib *inmemoryBlock) UnmarshalData(sb *storageBlock, firstItem, commonPrefix if uint64(len(b)) < suffixLen { return fmt.Errorf("not enough data for decoding item from itemsData; want %d bytes; remained %d bytes", suffixLen, len(b)) } - data = append(data, commonPrefix...) - if prefixLen > uint64(len(prevItem)) { return fmt.Errorf("prefixLen cannot exceed %d; got %d", len(prevItem), prefixLen) } + dataLen := len(data) + data = append(data, commonPrefix...) data = append(data, prevItem[:prefixLen]...) data = append(data, b[:suffixLen]...) - item := data[len(data)-int(itemLen)-len(commonPrefix):] - ib.items[i] = item + items[i] = Item{ + Start: uint32(dataLen), + End: uint32(len(data)), + } b = b[suffixLen:] - prevItem = item[len(commonPrefix):] + prevItem = data[len(data)-int(itemLen):] } if len(b) > 0 { return fmt.Errorf("unexpected tail left after itemsData with len %d: %q", len(b), b) @@ -381,30 +433,33 @@ func (ib *inmemoryBlock) UnmarshalData(sb *storageBlock, firstItem, commonPrefix if uint64(len(data)) != dataLen { return fmt.Errorf("unexpected data len; got %d; want %d", len(data), dataLen) } + ib.data = data if !ib.isSorted() { return fmt.Errorf("decoded data block contains unsorted items; items:\n%s", ib.debugItemsString()) } - ib.data = data return nil } var bbPool bytesutil.ByteBufferPool func (ib *inmemoryBlock) marshalDataPlain(sb *storageBlock) { + data := ib.data + // Marshal items data. // There is no need in marshaling the first item, since it is returned // to the caller in marshalData. cpLen := len(ib.commonPrefix) b := sb.itemsData[:0] - for _, item := range ib.items[1:] { - b = append(b, item[cpLen:]...) + for _, it := range ib.items[1:] { + it.Start += uint32(cpLen) + b = append(b, it.String(data)...) } sb.itemsData = b // Marshal length data. b = sb.lensData[:0] - for _, item := range ib.items[1:] { - b = encoding.MarshalUint64(b, uint64(len(item)-cpLen)) + for _, it := range ib.items[1:] { + b = encoding.MarshalUint64(b, uint64(int(it.End-it.Start)-cpLen)) } sb.lensData = b } @@ -431,26 +486,34 @@ func (ib *inmemoryBlock) unmarshalDataPlain(sb *storageBlock, firstItem []byte, } // Unmarshal items data. - ib.data = bytesutil.Resize(ib.data, len(firstItem)+len(sb.itemsData)+len(commonPrefix)*int(itemsCount)) - ib.data = append(ib.data[:0], firstItem...) - ib.items = append(ib.items[:0], ib.data) - + data := ib.data + items := ib.items + data = bytesutil.Resize(data, len(firstItem)+len(sb.itemsData)+len(commonPrefix)*int(itemsCount)) + data = append(data[:0], firstItem...) + items = append(items[:0], Item{ + Start: 0, + End: uint32(len(data)), + }) b = sb.itemsData for i := 1; i < int(itemsCount); i++ { itemLen := lb.lens[i] if uint64(len(b)) < itemLen { return fmt.Errorf("not enough data for decoding item from itemsData; want %d bytes; remained %d bytes", itemLen, len(b)) } - ib.data = append(ib.data, commonPrefix...) - ib.data = append(ib.data, b[:itemLen]...) - item := ib.data[len(ib.data)-int(itemLen)-len(commonPrefix):] - ib.items = append(ib.items, item) + dataLen := len(data) + data = append(data, commonPrefix...) + data = append(data, b[:itemLen]...) + items = append(items, Item{ + Start: uint32(dataLen), + End: uint32(len(data)), + }) b = b[itemLen:] } + ib.data = data + ib.items = items if len(b) > 0 { return fmt.Errorf("unexpected tail left after itemsData with len %d: %q", len(b), b) } - return nil } diff --git a/lib/mergeset/encoding_test.go b/lib/mergeset/encoding_test.go index 549a45d659..993c4b6efd 100644 --- a/lib/mergeset/encoding_test.go +++ b/lib/mergeset/encoding_test.go @@ -37,8 +37,10 @@ func TestInmemoryBlockAdd(t *testing.T) { if len(ib.data) != totalLen { t.Fatalf("unexpected ib.data len; got %d; want %d", len(ib.data), totalLen) } - for j, item := range ib.items { - if items[j] != string(item) { + data := ib.data + for j, it := range ib.items { + item := it.String(data) + if items[j] != item { t.Fatalf("unexpected item at index %d out of %d, loop %d\ngot\n%X\nwant\n%X", j, len(items), i, item, items[j]) } } @@ -75,8 +77,10 @@ func TestInmemoryBlockSort(t *testing.T) { if len(ib.data) != totalLen { t.Fatalf("unexpected ib.data len; got %d; want %d", len(ib.data), totalLen) } - for j, item := range ib.items { - if items[j] != string(item) { + data := ib.data + for j, it := range ib.items { + item := it.String(data) + if items[j] != item { t.Fatalf("unexpected item at index %d out of %d, loop %d\ngot\n%X\nwant\n%X", j, len(items), i, item, items[j]) } } @@ -122,8 +126,9 @@ func TestInmemoryBlockMarshalUnmarshal(t *testing.T) { if int(itemsLen) != len(ib.items) { t.Fatalf("unexpected number of items marshaled; got %d; want %d", itemsLen, len(ib.items)) } - if string(firstItem) != string(ib.items[0]) { - t.Fatalf("unexpected the first item\ngot\n%q\nwant\n%q", firstItem, ib.items[0]) + firstItemExpected := ib.items[0].String(ib.data) + if string(firstItem) != firstItemExpected { + t.Fatalf("unexpected the first item\ngot\n%q\nwant\n%q", firstItem, firstItemExpected) } if err := checkMarshalType(mt); err != nil { t.Fatalf("invalid mt: %s", err) @@ -143,12 +148,15 @@ func TestInmemoryBlockMarshalUnmarshal(t *testing.T) { t.Fatalf("unexpected ib.data len; got %d; want %d", len(ib2.data), totalLen) } for j := range items { - if len(items[j]) != len(ib2.items[j]) { + it2 := ib2.items[j] + item2 := it2.String(ib2.data) + if len(items[j]) != len(item2) { t.Fatalf("items length mismatch at index %d out of %d, loop %d\ngot\n(len=%d) %X\nwant\n(len=%d) %X", - j, len(items), i, len(ib2.items[j]), ib2.items[j], len(items[j]), items[j]) + j, len(items), i, len(item2), item2, len(items[j]), items[j]) } } - for j, item := range ib2.items { + for j, it := range ib2.items { + item := it.String(ib2.data) if items[j] != string(item) { t.Fatalf("unexpected item at index %d out of %d, loop %d\ngot\n(len=%d) %X\nwant\n(len=%d) %X", j, len(items), i, len(item), item, len(items[j]), items[j]) diff --git a/lib/mergeset/inmemory_part.go b/lib/mergeset/inmemory_part.go index 8297ec5f14..e2f4f02bd8 100644 --- a/lib/mergeset/inmemory_part.go +++ b/lib/mergeset/inmemory_part.go @@ -56,8 +56,8 @@ func (ip *inmemoryPart) Init(ib *inmemoryBlock) { ip.ph.itemsCount = uint64(len(ib.items)) ip.ph.blocksCount = 1 - ip.ph.firstItem = append(ip.ph.firstItem[:0], ib.items[0]...) - ip.ph.lastItem = append(ip.ph.lastItem[:0], ib.items[len(ib.items)-1]...) + ip.ph.firstItem = append(ip.ph.firstItem[:0], ib.items[0].String(ib.data)...) + ip.ph.lastItem = append(ip.ph.lastItem[:0], ib.items[len(ib.items)-1].String(ib.data)...) fs.MustWriteData(&ip.itemsData, ip.sb.itemsData) ip.bh.itemsBlockOffset = 0 diff --git a/lib/mergeset/merge.go b/lib/mergeset/merge.go index 276354e8de..7f2fa5ccfa 100644 --- a/lib/mergeset/merge.go +++ b/lib/mergeset/merge.go @@ -16,7 +16,7 @@ import ( // // The callback must return sorted items. The first and the last item must be unchanged. // The callback can re-use data and items for storing the result. -type PrepareBlockCallback func(data []byte, items [][]byte) ([]byte, [][]byte) +type PrepareBlockCallback func(data []byte, items []Item) ([]byte, []Item) // mergeBlockStreams merges bsrs and writes result to bsw. // @@ -122,8 +122,10 @@ again: nextItem = bsm.bsrHeap[0].bh.firstItem hasNextItem = true } + items := bsr.Block.items + data := bsr.Block.data for bsr.blockItemIdx < len(bsr.Block.items) { - item := bsr.Block.items[bsr.blockItemIdx] + item := items[bsr.blockItemIdx].Bytes(data) if hasNextItem && string(item) > string(nextItem) { break } @@ -148,32 +150,36 @@ again: // The next item in the bsr.Block exceeds nextItem. // Adjust bsr.bh.firstItem and return bsr to heap. - bsr.bh.firstItem = append(bsr.bh.firstItem[:0], bsr.Block.items[bsr.blockItemIdx]...) + bsr.bh.firstItem = append(bsr.bh.firstItem[:0], bsr.Block.items[bsr.blockItemIdx].String(bsr.Block.data)...) heap.Push(&bsm.bsrHeap, bsr) goto again } func (bsm *blockStreamMerger) flushIB(bsw *blockStreamWriter, ph *partHeader, itemsMerged *uint64) { - if len(bsm.ib.items) == 0 { + items := bsm.ib.items + data := bsm.ib.data + if len(items) == 0 { // Nothing to flush. return } - atomic.AddUint64(itemsMerged, uint64(len(bsm.ib.items))) + atomic.AddUint64(itemsMerged, uint64(len(items))) if bsm.prepareBlock != nil { - bsm.firstItem = append(bsm.firstItem[:0], bsm.ib.items[0]...) - bsm.lastItem = append(bsm.lastItem[:0], bsm.ib.items[len(bsm.ib.items)-1]...) - bsm.ib.data, bsm.ib.items = bsm.prepareBlock(bsm.ib.data, bsm.ib.items) - if len(bsm.ib.items) == 0 { + bsm.firstItem = append(bsm.firstItem[:0], items[0].String(data)...) + bsm.lastItem = append(bsm.lastItem[:0], items[len(items)-1].String(data)...) + data, items = bsm.prepareBlock(data, items) + bsm.ib.data = data + bsm.ib.items = items + if len(items) == 0 { // Nothing to flush return } // Consistency checks after prepareBlock call. - firstItem := bsm.ib.items[0] - if string(firstItem) != string(bsm.firstItem) { + firstItem := items[0].String(data) + if firstItem != string(bsm.firstItem) { logger.Panicf("BUG: prepareBlock must return first item equal to the original first item;\ngot\n%X\nwant\n%X", firstItem, bsm.firstItem) } - lastItem := bsm.ib.items[len(bsm.ib.items)-1] - if string(lastItem) != string(bsm.lastItem) { + lastItem := items[len(items)-1].String(data) + if lastItem != string(bsm.lastItem) { logger.Panicf("BUG: prepareBlock must return last item equal to the original last item;\ngot\n%X\nwant\n%X", lastItem, bsm.lastItem) } // Verify whether the bsm.ib.items are sorted only in tests, since this @@ -182,12 +188,12 @@ func (bsm *blockStreamMerger) flushIB(bsw *blockStreamWriter, ph *partHeader, it logger.Panicf("BUG: prepareBlock must return sorted items;\ngot\n%s", bsm.ib.debugItemsString()) } } - ph.itemsCount += uint64(len(bsm.ib.items)) + ph.itemsCount += uint64(len(items)) if !bsm.phFirstItemCaught { - ph.firstItem = append(ph.firstItem[:0], bsm.ib.items[0]...) + ph.firstItem = append(ph.firstItem[:0], items[0].String(data)...) bsm.phFirstItemCaught = true } - ph.lastItem = append(ph.lastItem[:0], bsm.ib.items[len(bsm.ib.items)-1]...) + ph.lastItem = append(ph.lastItem[:0], items[len(items)-1].String(data)...) bsw.WriteBlock(&bsm.ib) bsm.ib.Reset() ph.blocksCount++ diff --git a/lib/mergeset/merge_test.go b/lib/mergeset/merge_test.go index 8d34ce4218..f042bae0fc 100644 --- a/lib/mergeset/merge_test.go +++ b/lib/mergeset/merge_test.go @@ -157,10 +157,12 @@ func testCheckItems(dstIP *inmemoryPart, items []string) error { if bh.itemsCount <= 0 { return fmt.Errorf("unexpected empty block") } - if string(bh.firstItem) != string(dstBsr.Block.items[0]) { - return fmt.Errorf("unexpected blockHeader.firstItem; got %q; want %q", bh.firstItem, dstBsr.Block.items[0]) + item := dstBsr.Block.items[0].Bytes(dstBsr.Block.data) + if string(bh.firstItem) != string(item) { + return fmt.Errorf("unexpected blockHeader.firstItem; got %q; want %q", bh.firstItem, item) } - for _, item := range dstBsr.Block.items { + for _, it := range dstBsr.Block.items { + item := it.Bytes(dstBsr.Block.data) dstItems = append(dstItems, string(item)) } } diff --git a/lib/mergeset/part_search.go b/lib/mergeset/part_search.go index ea4b5ca25c..8671561b63 100644 --- a/lib/mergeset/part_search.go +++ b/lib/mergeset/part_search.go @@ -142,14 +142,17 @@ func (ps *partSearch) Seek(k []byte) { // Locate the first item to scan in the block. items := ps.ib.items + data := ps.ib.data cpLen := commonPrefixLen(ps.ib.commonPrefix, k) if cpLen > 0 { keySuffix := k[cpLen:] ps.ibItemIdx = sort.Search(len(items), func(i int) bool { - return string(keySuffix) <= string(items[i][cpLen:]) + it := items[i] + it.Start += uint32(cpLen) + return string(keySuffix) <= it.String(data) }) } else { - ps.ibItemIdx = binarySearchKey(items, k) + ps.ibItemIdx = binarySearchKey(data, items, k) } if ps.ibItemIdx < len(items) { // The item has been found. @@ -168,13 +171,14 @@ func (ps *partSearch) tryFastSeek(k []byte) bool { if ps.ib == nil { return false } + data := ps.ib.data items := ps.ib.items idx := ps.ibItemIdx if idx >= len(items) { // The ib is exhausted. return false } - if string(k) > string(items[len(items)-1]) { + if string(k) > items[len(items)-1].String(data) { // The item is located in next blocks. return false } @@ -183,8 +187,8 @@ func (ps *partSearch) tryFastSeek(k []byte) bool { if idx > 0 { idx-- } - if string(k) < string(items[idx]) { - if string(k) < string(items[0]) { + if string(k) < items[idx].String(data) { + if string(k) < items[0].String(data) { // The item is located in previous blocks. return false } @@ -192,7 +196,7 @@ func (ps *partSearch) tryFastSeek(k []byte) bool { } // The item is located in the current block - ps.ibItemIdx = idx + binarySearchKey(items[idx:], k) + ps.ibItemIdx = idx + binarySearchKey(data, items[idx:], k) return true } @@ -204,10 +208,11 @@ func (ps *partSearch) NextItem() bool { return false } - if ps.ibItemIdx < len(ps.ib.items) { + items := ps.ib.items + if ps.ibItemIdx < len(items) { // Fast path - the current block contains more items. // Proceed to the next item. - ps.Item = ps.ib.items[ps.ibItemIdx] + ps.Item = items[ps.ibItemIdx].Bytes(ps.ib.data) ps.ibItemIdx++ return true } @@ -219,7 +224,7 @@ func (ps *partSearch) NextItem() bool { } // Invariant: len(ps.ib.items) > 0 after nextBlock. - ps.Item = ps.ib.items[0] + ps.Item = ps.ib.items[0].Bytes(ps.ib.data) ps.ibItemIdx++ return true } @@ -319,11 +324,11 @@ func (ps *partSearch) readInmemoryBlock(bh *blockHeader) (*inmemoryBlock, error) return ib, nil } -func binarySearchKey(items [][]byte, key []byte) int { +func binarySearchKey(data []byte, items []Item, key []byte) int { if len(items) == 0 { return 0 } - if string(key) <= string(items[0]) { + if string(key) <= items[0].String(data) { // Fast path - the item is the first. return 0 } @@ -335,7 +340,7 @@ func binarySearchKey(items [][]byte, key []byte) int { i, j := uint(0), n for i < j { h := uint(i+j) >> 1 - if h >= 0 && h < uint(len(items)) && string(key) > string(items[h]) { + if h >= 0 && h < uint(len(items)) && string(key) > items[h].String(data) { i = h + 1 } else { j = h diff --git a/lib/mergeset/table_search_timing_test.go b/lib/mergeset/table_search_timing_test.go index f8d6e3515f..add04e46ef 100644 --- a/lib/mergeset/table_search_timing_test.go +++ b/lib/mergeset/table_search_timing_test.go @@ -46,7 +46,7 @@ func benchmarkTableSearch(b *testing.B, itemsCount int) { b.Run("sequential-keys-exact", func(b *testing.B) { benchmarkTableSearchKeys(b, tb, keys, 0) }) - b.Run("sequential-keys-without-siffux", func(b *testing.B) { + b.Run("sequential-keys-without-suffix", func(b *testing.B) { benchmarkTableSearchKeys(b, tb, keys, 4) }) @@ -57,7 +57,7 @@ func benchmarkTableSearch(b *testing.B, itemsCount int) { b.Run("random-keys-exact", func(b *testing.B) { benchmarkTableSearchKeys(b, tb, randKeys, 0) }) - b.Run("random-keys-without-siffux", func(b *testing.B) { + b.Run("random-keys-without-suffix", func(b *testing.B) { benchmarkTableSearchKeys(b, tb, randKeys, 4) }) } diff --git a/lib/mergeset/table_test.go b/lib/mergeset/table_test.go index aa43fe9e11..f61d2e6356 100644 --- a/lib/mergeset/table_test.go +++ b/lib/mergeset/table_test.go @@ -218,7 +218,7 @@ func TestTableAddItemsConcurrent(t *testing.T) { atomic.AddUint64(&flushes, 1) } var itemsMerged uint64 - prepareBlock := func(data []byte, items [][]byte) ([]byte, [][]byte) { + prepareBlock := func(data []byte, items []Item) ([]byte, []Item) { atomic.AddUint64(&itemsMerged, uint64(len(items))) return data, items } diff --git a/lib/storage/block_header_test.go b/lib/storage/block_header_test.go index 86b120c181..efc9d71f7d 100644 --- a/lib/storage/block_header_test.go +++ b/lib/storage/block_header_test.go @@ -76,6 +76,6 @@ func testBlockHeaderMarshalUnmarshal(t *testing.T, bh *blockHeader) { t.Fatalf("unexpected tail after unmarshaling bh=%+v; got\n%x; want\n%x", bh, tail, suffix) } if !reflect.DeepEqual(bh, &bh2) { - t.Fatalf("unexpected bh unmarshaled after adding siffux; got\n%+v; want\n%+v", &bh2, bh) + t.Fatalf("unexpected bh unmarshaled after adding suffix; got\n%+v; want\n%+v", &bh2, bh) } } diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 61a3419c82..04cf922a79 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -3499,24 +3499,24 @@ func (mp *tagToMetricIDsRowParser) IsDeletedTag(dmis *uint64set.Set) bool { return true } -func mergeTagToMetricIDsRows(data []byte, items [][]byte) ([]byte, [][]byte) { +func mergeTagToMetricIDsRows(data []byte, items []mergeset.Item) ([]byte, []mergeset.Item) { data, items = mergeTagToMetricIDsRowsInternal(data, items, nsPrefixTagToMetricIDs) data, items = mergeTagToMetricIDsRowsInternal(data, items, nsPrefixDateTagToMetricIDs) return data, items } -func mergeTagToMetricIDsRowsInternal(data []byte, items [][]byte, nsPrefix byte) ([]byte, [][]byte) { +func mergeTagToMetricIDsRowsInternal(data []byte, items []mergeset.Item, nsPrefix byte) ([]byte, []mergeset.Item) { // Perform quick checks whether items contain rows starting from nsPrefix // based on the fact that items are sorted. if len(items) <= 2 { // The first and the last row must remain unchanged. return data, items } - firstItem := items[0] + firstItem := items[0].Bytes(data) if len(firstItem) > 0 && firstItem[0] > nsPrefix { return data, items } - lastItem := items[len(items)-1] + lastItem := items[len(items)-1].Bytes(data) if len(lastItem) > 0 && lastItem[0] < nsPrefix { return data, items } @@ -3529,14 +3529,18 @@ func mergeTagToMetricIDsRowsInternal(data []byte, items [][]byte, nsPrefix byte) mpPrev := &tmm.mpPrev dstData := data[:0] dstItems := items[:0] - for i, item := range items { + for i, it := range items { + item := it.Bytes(data) if len(item) == 0 || item[0] != nsPrefix || i == 0 || i == len(items)-1 { // Write rows not starting with nsPrefix as-is. // Additionally write the first and the last row as-is in order to preserve // sort order for adjancent blocks. dstData, dstItems = tmm.flushPendingMetricIDs(dstData, dstItems, mpPrev) dstData = append(dstData, item...) - dstItems = append(dstItems, dstData[len(dstData)-len(item):]) + dstItems = append(dstItems, mergeset.Item{ + Start: uint32(len(dstData) - len(item)), + End: uint32(len(dstData)), + }) continue } if err := mp.Init(item, nsPrefix); err != nil { @@ -3545,7 +3549,10 @@ func mergeTagToMetricIDsRowsInternal(data []byte, items [][]byte, nsPrefix byte) if mp.MetricIDsLen() >= maxMetricIDsPerRow { dstData, dstItems = tmm.flushPendingMetricIDs(dstData, dstItems, mpPrev) dstData = append(dstData, item...) - dstItems = append(dstItems, dstData[len(dstData)-len(item):]) + dstItems = append(dstItems, mergeset.Item{ + Start: uint32(len(dstData) - len(item)), + End: uint32(len(dstData)), + }) continue } if !mp.EqualPrefix(mpPrev) { @@ -3561,7 +3568,7 @@ func mergeTagToMetricIDsRowsInternal(data []byte, items [][]byte, nsPrefix byte) if len(tmm.pendingMetricIDs) > 0 { logger.Panicf("BUG: tmm.pendingMetricIDs must be empty at this point; got %d items: %d", len(tmm.pendingMetricIDs), tmm.pendingMetricIDs) } - if !checkItemsSorted(dstItems) { + if !checkItemsSorted(dstData, dstItems) { // Items could become unsorted if initial items contain duplicate metricIDs: // // item1: 1, 1, 5 @@ -3579,15 +3586,8 @@ func mergeTagToMetricIDsRowsInternal(data []byte, items [][]byte, nsPrefix byte) // into the same new time series from multiple concurrent goroutines. atomic.AddUint64(&indexBlocksWithMetricIDsIncorrectOrder, 1) dstData = append(dstData[:0], tmm.dataCopy...) - dstItems = dstItems[:0] - // tmm.itemsCopy can point to overwritten data, so it must be updated - // to point to real data from tmm.dataCopy. - buf := dstData - for _, item := range tmm.itemsCopy { - dstItems = append(dstItems, buf[:len(item)]) - buf = buf[len(item):] - } - if !checkItemsSorted(dstItems) { + dstItems = append(dstItems[:0], tmm.itemsCopy...) + if !checkItemsSorted(dstData, dstItems) { logger.Panicf("BUG: the original items weren't sorted; items=%q", dstItems) } } @@ -3599,13 +3599,14 @@ func mergeTagToMetricIDsRowsInternal(data []byte, items [][]byte, nsPrefix byte) var indexBlocksWithMetricIDsIncorrectOrder uint64 var indexBlocksWithMetricIDsProcessed uint64 -func checkItemsSorted(items [][]byte) bool { +func checkItemsSorted(data []byte, items []mergeset.Item) bool { if len(items) == 0 { return true } - prevItem := items[0] - for _, currItem := range items[1:] { - if string(prevItem) > string(currItem) { + prevItem := items[0].String(data) + for _, it := range items[1:] { + currItem := it.String(data) + if prevItem > currItem { return false } prevItem = currItem @@ -3633,7 +3634,7 @@ type tagToMetricIDsRowsMerger struct { mp tagToMetricIDsRowParser mpPrev tagToMetricIDsRowParser - itemsCopy [][]byte + itemsCopy []mergeset.Item dataCopy []byte } @@ -3646,7 +3647,7 @@ func (tmm *tagToMetricIDsRowsMerger) Reset() { tmm.dataCopy = tmm.dataCopy[:0] } -func (tmm *tagToMetricIDsRowsMerger) flushPendingMetricIDs(dstData []byte, dstItems [][]byte, mp *tagToMetricIDsRowParser) ([]byte, [][]byte) { +func (tmm *tagToMetricIDsRowsMerger) flushPendingMetricIDs(dstData []byte, dstItems []mergeset.Item, mp *tagToMetricIDsRowParser) ([]byte, []mergeset.Item) { if len(tmm.pendingMetricIDs) == 0 { // Nothing to flush return dstData, dstItems @@ -3661,7 +3662,10 @@ func (tmm *tagToMetricIDsRowsMerger) flushPendingMetricIDs(dstData []byte, dstIt for _, metricID := range tmm.pendingMetricIDs { dstData = encoding.MarshalUint64(dstData, metricID) } - dstItems = append(dstItems, dstData[dstDataLen:]) + dstItems = append(dstItems, mergeset.Item{ + Start: uint32(dstDataLen), + End: uint32(len(dstData)), + }) tmm.pendingMetricIDs = tmm.pendingMetricIDs[:0] return dstData, dstItems } diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index 0a5632479b..0450161779 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -14,6 +14,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset" "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" ) @@ -36,33 +37,38 @@ func TestMergeTagToMetricIDsRows(t *testing.T) { f := func(items []string, expectedItems []string) { t.Helper() var data []byte - var itemsB [][]byte + var itemsB []mergeset.Item for _, item := range items { data = append(data, item...) - itemsB = append(itemsB, data[len(data)-len(item):]) + itemsB = append(itemsB, mergeset.Item{ + Start: uint32(len(data) - len(item)), + End: uint32(len(data)), + }) } - if !checkItemsSorted(itemsB) { + if !checkItemsSorted(data, itemsB) { t.Fatalf("source items aren't sorted; items:\n%q", itemsB) } resultData, resultItemsB := mergeTagToMetricIDsRows(data, itemsB) if len(resultItemsB) != len(expectedItems) { t.Fatalf("unexpected len(resultItemsB); got %d; want %d", len(resultItemsB), len(expectedItems)) } - if !checkItemsSorted(resultItemsB) { + if !checkItemsSorted(resultData, resultItemsB) { t.Fatalf("result items aren't sorted; items:\n%q", resultItemsB) } - for i, item := range resultItemsB { - if !bytes.HasPrefix(resultData, item) { - t.Fatalf("unexpected prefix for resultData #%d;\ngot\n%X\nwant\n%X", i, resultData, item) + buf := resultData + for i, it := range resultItemsB { + item := it.Bytes(resultData) + if !bytes.HasPrefix(buf, item) { + t.Fatalf("unexpected prefix for resultData #%d;\ngot\n%X\nwant\n%X", i, buf, item) } - resultData = resultData[len(item):] + buf = buf[len(item):] } - if len(resultData) != 0 { - t.Fatalf("unexpected tail left in resultData: %X", resultData) + if len(buf) != 0 { + t.Fatalf("unexpected tail left in resultData: %X", buf) } var resultItems []string - for _, item := range resultItemsB { - resultItems = append(resultItems, string(item)) + for _, it := range resultItemsB { + resultItems = append(resultItems, string(it.Bytes(resultData))) } if !reflect.DeepEqual(expectedItems, resultItems) { t.Fatalf("unexpected items;\ngot\n%X\nwant\n%X", resultItems, expectedItems)