mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-30 15:22:07 +00:00
lib/mergeset: remove string allocation and copying when unmarshaling blockHeader
This should reduce CPU usage for the case from https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3343
This commit is contained in:
parent
09b79d74a7
commit
58b40f514c
4 changed files with 27 additions and 26 deletions
|
@ -16,6 +16,9 @@ type blockHeader struct {
|
||||||
// The first item.
|
// The first item.
|
||||||
firstItem []byte
|
firstItem []byte
|
||||||
|
|
||||||
|
// Whether commonPrefix and firstItem point to external data.
|
||||||
|
noCopy bool
|
||||||
|
|
||||||
// Marshal type used for block compression.
|
// Marshal type used for block compression.
|
||||||
marshalType marshalType
|
marshalType marshalType
|
||||||
|
|
||||||
|
@ -40,8 +43,13 @@ func (bh *blockHeader) SizeBytes() int {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bh *blockHeader) Reset() {
|
func (bh *blockHeader) Reset() {
|
||||||
bh.commonPrefix = bh.commonPrefix[:0]
|
if bh.noCopy {
|
||||||
bh.firstItem = bh.firstItem[:0]
|
bh.commonPrefix = nil
|
||||||
|
bh.firstItem = nil
|
||||||
|
} else {
|
||||||
|
bh.commonPrefix = bh.commonPrefix[:0]
|
||||||
|
bh.firstItem = bh.firstItem[:0]
|
||||||
|
}
|
||||||
bh.marshalType = marshalTypePlain
|
bh.marshalType = marshalTypePlain
|
||||||
bh.itemsCount = 0
|
bh.itemsCount = 0
|
||||||
bh.itemsBlockOffset = 0
|
bh.itemsBlockOffset = 0
|
||||||
|
@ -62,13 +70,17 @@ func (bh *blockHeader) Marshal(dst []byte) []byte {
|
||||||
return dst
|
return dst
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bh *blockHeader) Unmarshal(src []byte) ([]byte, error) {
|
// UnmarshalNoCopy unmarshals bh from src without copying the data from src.
|
||||||
|
//
|
||||||
|
// The src must remain unchanged while bh is in use.
|
||||||
|
func (bh *blockHeader) UnmarshalNoCopy(src []byte) ([]byte, error) {
|
||||||
|
bh.noCopy = true
|
||||||
// Unmarshal commonPrefix
|
// Unmarshal commonPrefix
|
||||||
tail, cp, err := encoding.UnmarshalBytes(src)
|
tail, cp, err := encoding.UnmarshalBytes(src)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return tail, fmt.Errorf("cannot unmarshal commonPrefix: %w", err)
|
return tail, fmt.Errorf("cannot unmarshal commonPrefix: %w", err)
|
||||||
}
|
}
|
||||||
bh.commonPrefix = append(bh.commonPrefix[:0], cp...)
|
bh.commonPrefix = cp[:len(cp):len(cp)]
|
||||||
src = tail
|
src = tail
|
||||||
|
|
||||||
// Unmarshal firstItem
|
// Unmarshal firstItem
|
||||||
|
@ -76,7 +88,7 @@ func (bh *blockHeader) Unmarshal(src []byte) ([]byte, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return tail, fmt.Errorf("cannot unmarshal firstItem: %w", err)
|
return tail, fmt.Errorf("cannot unmarshal firstItem: %w", err)
|
||||||
}
|
}
|
||||||
bh.firstItem = append(bh.firstItem[:0], fi...)
|
bh.firstItem = fi[:len(fi):len(fi)]
|
||||||
src = tail
|
src = tail
|
||||||
|
|
||||||
// Unmarshal marshalType
|
// Unmarshal marshalType
|
||||||
|
@ -137,11 +149,13 @@ func (bh *blockHeader) Unmarshal(src []byte) ([]byte, error) {
|
||||||
return src, nil
|
return src, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// unmarshalBlockHeaders unmarshals all the block headers from src,
|
// unmarshalBlockHeadersNoCopy unmarshals all the block headers from src,
|
||||||
// appends them to dst and returns the appended result.
|
// appends them to dst and returns the appended result.
|
||||||
//
|
//
|
||||||
// Block headers must be sorted by bh.firstItem.
|
// Block headers must be sorted by bh.firstItem.
|
||||||
func unmarshalBlockHeaders(dst []blockHeader, src []byte, blockHeadersCount int) ([]blockHeader, error) {
|
//
|
||||||
|
// It is expected that src remains unchanged while rhe returned blocks are in use.
|
||||||
|
func unmarshalBlockHeadersNoCopy(dst []blockHeader, src []byte, blockHeadersCount int) ([]blockHeader, error) {
|
||||||
if blockHeadersCount <= 0 {
|
if blockHeadersCount <= 0 {
|
||||||
logger.Panicf("BUG: blockHeadersCount must be greater than 0; got %d", blockHeadersCount)
|
logger.Panicf("BUG: blockHeadersCount must be greater than 0; got %d", blockHeadersCount)
|
||||||
}
|
}
|
||||||
|
@ -151,7 +165,7 @@ func unmarshalBlockHeaders(dst []blockHeader, src []byte, blockHeadersCount int)
|
||||||
}
|
}
|
||||||
dst = dst[:dstLen+blockHeadersCount]
|
dst = dst[:dstLen+blockHeadersCount]
|
||||||
for i := 0; i < blockHeadersCount; i++ {
|
for i := 0; i < blockHeadersCount; i++ {
|
||||||
tail, err := dst[dstLen+i].Unmarshal(src)
|
tail, err := dst[dstLen+i].UnmarshalNoCopy(src)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return dst, fmt.Errorf("cannot unmarshal block header: %w", err)
|
return dst, fmt.Errorf("cannot unmarshal block header: %w", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -295,21 +295,9 @@ func (bsr *blockStreamReader) readNextBHS() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unmarshal the unpacked index block into bsr.bhs.
|
// Unmarshal the unpacked index block into bsr.bhs.
|
||||||
if n := int(mr.blockHeadersCount) - cap(bsr.bhs); n > 0 {
|
bsr.bhs, err = unmarshalBlockHeadersNoCopy(bsr.bhs[:0], bsr.unpackedBuf, int(mr.blockHeadersCount))
|
||||||
bsr.bhs = append(bsr.bhs[:cap(bsr.bhs)], make([]blockHeader, n)...)
|
if err != nil {
|
||||||
}
|
return fmt.Errorf("cannot unmarshal blockHeader #%d in the index block #%d: %w", len(bsr.bhs), bsr.mrIdx, err)
|
||||||
bsr.bhs = bsr.bhs[:mr.blockHeadersCount]
|
|
||||||
bsr.bhIdx = 0
|
|
||||||
b := bsr.unpackedBuf
|
|
||||||
for i := 0; i < int(mr.blockHeadersCount); i++ {
|
|
||||||
tail, err := bsr.bhs[i].Unmarshal(b)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("cannot unmarshal blockHeader #%d in the index block #%d: %w", len(bsr.bhs), bsr.mrIdx, err)
|
|
||||||
}
|
|
||||||
b = tail
|
|
||||||
}
|
|
||||||
if len(b) > 0 {
|
|
||||||
return fmt.Errorf("unexpected non-empty tail left after unmarshaling block headers; len(tail)=%d", len(b))
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -334,8 +334,7 @@ func (ib *inmemoryBlock) marshalData(sb *storageBlock, firstItemDst, commonPrefi
|
||||||
return firstItemDst, commonPrefixDst, uint32(len(ib.items)), marshalTypeZSTD
|
return firstItemDst, commonPrefixDst, uint32(len(ib.items)), marshalTypeZSTD
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnmarshalData decodes itemsCount items from sb and firstItem and stores
|
// UnmarshalData decodes itemsCount items from sb and firstItem and stores them to ib.
|
||||||
// them to ib.
|
|
||||||
func (ib *inmemoryBlock) UnmarshalData(sb *storageBlock, firstItem, commonPrefix []byte, itemsCount uint32, mt marshalType) error {
|
func (ib *inmemoryBlock) UnmarshalData(sb *storageBlock, firstItem, commonPrefix []byte, itemsCount uint32, mt marshalType) error {
|
||||||
ib.Reset()
|
ib.Reset()
|
||||||
|
|
||||||
|
|
|
@ -286,7 +286,7 @@ func (ps *partSearch) readIndexBlock(mr *metaindexRow) (*indexBlock, error) {
|
||||||
return nil, fmt.Errorf("cannot decompress index block: %w", err)
|
return nil, fmt.Errorf("cannot decompress index block: %w", err)
|
||||||
}
|
}
|
||||||
idxb := &indexBlock{}
|
idxb := &indexBlock{}
|
||||||
idxb.bhs, err = unmarshalBlockHeaders(idxb.bhs[:0], ps.indexBuf, int(mr.blockHeadersCount))
|
idxb.bhs, err = unmarshalBlockHeadersNoCopy(idxb.bhs[:0], ps.indexBuf, int(mr.blockHeadersCount))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("cannot unmarshal block headers from index block (offset=%d, size=%d): %w", mr.indexBlockOffset, mr.indexBlockSize, err)
|
return nil, fmt.Errorf("cannot unmarshal block headers from index block (offset=%d, size=%d): %w", mr.indexBlockOffset, mr.indexBlockSize, err)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue