diff --git a/lib/mergeset/block_header.go b/lib/mergeset/block_header.go index 49e35aa09..ce4c8dd51 100644 --- a/lib/mergeset/block_header.go +++ b/lib/mergeset/block_header.go @@ -16,6 +16,9 @@ type blockHeader struct { // The first item. firstItem []byte + // Whether commonPrefix and firstItem point to external data. + noCopy bool + // Marshal type used for block compression. marshalType marshalType @@ -40,8 +43,13 @@ func (bh *blockHeader) SizeBytes() int { } func (bh *blockHeader) Reset() { - bh.commonPrefix = bh.commonPrefix[:0] - bh.firstItem = bh.firstItem[:0] + if bh.noCopy { + bh.commonPrefix = nil + bh.firstItem = nil + } else { + bh.commonPrefix = bh.commonPrefix[:0] + bh.firstItem = bh.firstItem[:0] + } bh.marshalType = marshalTypePlain bh.itemsCount = 0 bh.itemsBlockOffset = 0 @@ -62,13 +70,17 @@ func (bh *blockHeader) Marshal(dst []byte) []byte { return dst } -func (bh *blockHeader) Unmarshal(src []byte) ([]byte, error) { +// UnmarshalNoCopy unmarshals bh from src without copying the data from src. +// +// The src must remain unchanged while bh is in use. +func (bh *blockHeader) UnmarshalNoCopy(src []byte) ([]byte, error) { + bh.noCopy = true // Unmarshal commonPrefix tail, cp, err := encoding.UnmarshalBytes(src) if err != nil { return tail, fmt.Errorf("cannot unmarshal commonPrefix: %w", err) } - bh.commonPrefix = append(bh.commonPrefix[:0], cp...) + bh.commonPrefix = cp[:len(cp):len(cp)] src = tail // Unmarshal firstItem @@ -76,7 +88,7 @@ func (bh *blockHeader) Unmarshal(src []byte) ([]byte, error) { if err != nil { return tail, fmt.Errorf("cannot unmarshal firstItem: %w", err) } - bh.firstItem = append(bh.firstItem[:0], fi...) + bh.firstItem = fi[:len(fi):len(fi)] src = tail // Unmarshal marshalType @@ -137,11 +149,13 @@ func (bh *blockHeader) Unmarshal(src []byte) ([]byte, error) { return src, nil } -// unmarshalBlockHeaders unmarshals all the block headers from src, +// unmarshalBlockHeadersNoCopy unmarshals all the block headers from src, // appends them to dst and returns the appended result. // // Block headers must be sorted by bh.firstItem. -func unmarshalBlockHeaders(dst []blockHeader, src []byte, blockHeadersCount int) ([]blockHeader, error) { +// +// It is expected that src remains unchanged while rhe returned blocks are in use. +func unmarshalBlockHeadersNoCopy(dst []blockHeader, src []byte, blockHeadersCount int) ([]blockHeader, error) { if blockHeadersCount <= 0 { logger.Panicf("BUG: blockHeadersCount must be greater than 0; got %d", blockHeadersCount) } @@ -151,7 +165,7 @@ func unmarshalBlockHeaders(dst []blockHeader, src []byte, blockHeadersCount int) } dst = dst[:dstLen+blockHeadersCount] for i := 0; i < blockHeadersCount; i++ { - tail, err := dst[dstLen+i].Unmarshal(src) + tail, err := dst[dstLen+i].UnmarshalNoCopy(src) if err != nil { return dst, fmt.Errorf("cannot unmarshal block header: %w", err) } diff --git a/lib/mergeset/block_stream_reader.go b/lib/mergeset/block_stream_reader.go index ff49f23b0..7e21cf44c 100644 --- a/lib/mergeset/block_stream_reader.go +++ b/lib/mergeset/block_stream_reader.go @@ -295,21 +295,9 @@ func (bsr *blockStreamReader) readNextBHS() error { } // Unmarshal the unpacked index block into bsr.bhs. - if n := int(mr.blockHeadersCount) - cap(bsr.bhs); n > 0 { - bsr.bhs = append(bsr.bhs[:cap(bsr.bhs)], make([]blockHeader, n)...) - } - bsr.bhs = bsr.bhs[:mr.blockHeadersCount] - bsr.bhIdx = 0 - b := bsr.unpackedBuf - for i := 0; i < int(mr.blockHeadersCount); i++ { - tail, err := bsr.bhs[i].Unmarshal(b) - if err != nil { - return fmt.Errorf("cannot unmarshal blockHeader #%d in the index block #%d: %w", len(bsr.bhs), bsr.mrIdx, err) - } - b = tail - } - if len(b) > 0 { - return fmt.Errorf("unexpected non-empty tail left after unmarshaling block headers; len(tail)=%d", len(b)) + bsr.bhs, err = unmarshalBlockHeadersNoCopy(bsr.bhs[:0], bsr.unpackedBuf, int(mr.blockHeadersCount)) + if err != nil { + return fmt.Errorf("cannot unmarshal blockHeader #%d in the index block #%d: %w", len(bsr.bhs), bsr.mrIdx, err) } return nil } diff --git a/lib/mergeset/encoding.go b/lib/mergeset/encoding.go index 1236c1861..09d1e0f76 100644 --- a/lib/mergeset/encoding.go +++ b/lib/mergeset/encoding.go @@ -334,8 +334,7 @@ func (ib *inmemoryBlock) marshalData(sb *storageBlock, firstItemDst, commonPrefi return firstItemDst, commonPrefixDst, uint32(len(ib.items)), marshalTypeZSTD } -// UnmarshalData decodes itemsCount items from sb and firstItem and stores -// them to ib. +// UnmarshalData decodes itemsCount items from sb and firstItem and stores them to ib. func (ib *inmemoryBlock) UnmarshalData(sb *storageBlock, firstItem, commonPrefix []byte, itemsCount uint32, mt marshalType) error { ib.Reset() diff --git a/lib/mergeset/part_search.go b/lib/mergeset/part_search.go index 1d3c4d30a..a2f2c181d 100644 --- a/lib/mergeset/part_search.go +++ b/lib/mergeset/part_search.go @@ -286,7 +286,7 @@ func (ps *partSearch) readIndexBlock(mr *metaindexRow) (*indexBlock, error) { return nil, fmt.Errorf("cannot decompress index block: %w", err) } idxb := &indexBlock{} - idxb.bhs, err = unmarshalBlockHeaders(idxb.bhs[:0], ps.indexBuf, int(mr.blockHeadersCount)) + idxb.bhs, err = unmarshalBlockHeadersNoCopy(idxb.bhs[:0], ps.indexBuf, int(mr.blockHeadersCount)) if err != nil { return nil, fmt.Errorf("cannot unmarshal block headers from index block (offset=%d, size=%d): %w", mr.indexBlockOffset, mr.indexBlockSize, err) }