mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/mergeset: remove string allocation and copying when unmarshaling blockHeader
This should reduce CPU usage for the case from https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3343
This commit is contained in:
parent
6e4c9d5f6b
commit
454060fd78
4 changed files with 27 additions and 26 deletions
|
@ -16,6 +16,9 @@ type blockHeader struct {
|
|||
// The first item.
|
||||
firstItem []byte
|
||||
|
||||
// Whether commonPrefix and firstItem point to external data.
|
||||
noCopy bool
|
||||
|
||||
// Marshal type used for block compression.
|
||||
marshalType marshalType
|
||||
|
||||
|
@ -40,8 +43,13 @@ func (bh *blockHeader) SizeBytes() int {
|
|||
}
|
||||
|
||||
func (bh *blockHeader) Reset() {
|
||||
if bh.noCopy {
|
||||
bh.commonPrefix = nil
|
||||
bh.firstItem = nil
|
||||
} else {
|
||||
bh.commonPrefix = bh.commonPrefix[:0]
|
||||
bh.firstItem = bh.firstItem[:0]
|
||||
}
|
||||
bh.marshalType = marshalTypePlain
|
||||
bh.itemsCount = 0
|
||||
bh.itemsBlockOffset = 0
|
||||
|
@ -62,13 +70,17 @@ func (bh *blockHeader) Marshal(dst []byte) []byte {
|
|||
return dst
|
||||
}
|
||||
|
||||
func (bh *blockHeader) Unmarshal(src []byte) ([]byte, error) {
|
||||
// UnmarshalNoCopy unmarshals bh from src without copying the data from src.
|
||||
//
|
||||
// The src must remain unchanged while bh is in use.
|
||||
func (bh *blockHeader) UnmarshalNoCopy(src []byte) ([]byte, error) {
|
||||
bh.noCopy = true
|
||||
// Unmarshal commonPrefix
|
||||
tail, cp, err := encoding.UnmarshalBytes(src)
|
||||
if err != nil {
|
||||
return tail, fmt.Errorf("cannot unmarshal commonPrefix: %w", err)
|
||||
}
|
||||
bh.commonPrefix = append(bh.commonPrefix[:0], cp...)
|
||||
bh.commonPrefix = cp[:len(cp):len(cp)]
|
||||
src = tail
|
||||
|
||||
// Unmarshal firstItem
|
||||
|
@ -76,7 +88,7 @@ func (bh *blockHeader) Unmarshal(src []byte) ([]byte, error) {
|
|||
if err != nil {
|
||||
return tail, fmt.Errorf("cannot unmarshal firstItem: %w", err)
|
||||
}
|
||||
bh.firstItem = append(bh.firstItem[:0], fi...)
|
||||
bh.firstItem = fi[:len(fi):len(fi)]
|
||||
src = tail
|
||||
|
||||
// Unmarshal marshalType
|
||||
|
@ -137,11 +149,13 @@ func (bh *blockHeader) Unmarshal(src []byte) ([]byte, error) {
|
|||
return src, nil
|
||||
}
|
||||
|
||||
// unmarshalBlockHeaders unmarshals all the block headers from src,
|
||||
// unmarshalBlockHeadersNoCopy unmarshals all the block headers from src,
|
||||
// appends them to dst and returns the appended result.
|
||||
//
|
||||
// Block headers must be sorted by bh.firstItem.
|
||||
func unmarshalBlockHeaders(dst []blockHeader, src []byte, blockHeadersCount int) ([]blockHeader, error) {
|
||||
//
|
||||
// It is expected that src remains unchanged while rhe returned blocks are in use.
|
||||
func unmarshalBlockHeadersNoCopy(dst []blockHeader, src []byte, blockHeadersCount int) ([]blockHeader, error) {
|
||||
if blockHeadersCount <= 0 {
|
||||
logger.Panicf("BUG: blockHeadersCount must be greater than 0; got %d", blockHeadersCount)
|
||||
}
|
||||
|
@ -151,7 +165,7 @@ func unmarshalBlockHeaders(dst []blockHeader, src []byte, blockHeadersCount int)
|
|||
}
|
||||
dst = dst[:dstLen+blockHeadersCount]
|
||||
for i := 0; i < blockHeadersCount; i++ {
|
||||
tail, err := dst[dstLen+i].Unmarshal(src)
|
||||
tail, err := dst[dstLen+i].UnmarshalNoCopy(src)
|
||||
if err != nil {
|
||||
return dst, fmt.Errorf("cannot unmarshal block header: %w", err)
|
||||
}
|
||||
|
|
|
@ -295,22 +295,10 @@ func (bsr *blockStreamReader) readNextBHS() error {
|
|||
}
|
||||
|
||||
// Unmarshal the unpacked index block into bsr.bhs.
|
||||
if n := int(mr.blockHeadersCount) - cap(bsr.bhs); n > 0 {
|
||||
bsr.bhs = append(bsr.bhs[:cap(bsr.bhs)], make([]blockHeader, n)...)
|
||||
}
|
||||
bsr.bhs = bsr.bhs[:mr.blockHeadersCount]
|
||||
bsr.bhIdx = 0
|
||||
b := bsr.unpackedBuf
|
||||
for i := 0; i < int(mr.blockHeadersCount); i++ {
|
||||
tail, err := bsr.bhs[i].Unmarshal(b)
|
||||
bsr.bhs, err = unmarshalBlockHeadersNoCopy(bsr.bhs[:0], bsr.unpackedBuf, int(mr.blockHeadersCount))
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot unmarshal blockHeader #%d in the index block #%d: %w", len(bsr.bhs), bsr.mrIdx, err)
|
||||
}
|
||||
b = tail
|
||||
}
|
||||
if len(b) > 0 {
|
||||
return fmt.Errorf("unexpected non-empty tail left after unmarshaling block headers; len(tail)=%d", len(b))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -334,8 +334,7 @@ func (ib *inmemoryBlock) marshalData(sb *storageBlock, firstItemDst, commonPrefi
|
|||
return firstItemDst, commonPrefixDst, uint32(len(ib.items)), marshalTypeZSTD
|
||||
}
|
||||
|
||||
// UnmarshalData decodes itemsCount items from sb and firstItem and stores
|
||||
// them to ib.
|
||||
// UnmarshalData decodes itemsCount items from sb and firstItem and stores them to ib.
|
||||
func (ib *inmemoryBlock) UnmarshalData(sb *storageBlock, firstItem, commonPrefix []byte, itemsCount uint32, mt marshalType) error {
|
||||
ib.Reset()
|
||||
|
||||
|
|
|
@ -286,7 +286,7 @@ func (ps *partSearch) readIndexBlock(mr *metaindexRow) (*indexBlock, error) {
|
|||
return nil, fmt.Errorf("cannot decompress index block: %w", err)
|
||||
}
|
||||
idxb := &indexBlock{}
|
||||
idxb.bhs, err = unmarshalBlockHeaders(idxb.bhs[:0], ps.indexBuf, int(mr.blockHeadersCount))
|
||||
idxb.bhs, err = unmarshalBlockHeadersNoCopy(idxb.bhs[:0], ps.indexBuf, int(mr.blockHeadersCount))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot unmarshal block headers from index block (offset=%d, size=%d): %w", mr.indexBlockOffset, mr.indexBlockSize, err)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue