VictoriaMetrics/lib/mergeset/metaindex_row.go
Aliaksandr Valialkin 32193b6059
lib/encoding: optimize UnmarshalVarUint64, UnmarshalVarInt64 and UnmarshalBytes a bit
Change the return values for these functions - now they return the unmarshaled result plus
the size of the unmarshaled result in bytes, so the caller could re-slice the src for further unmarshaling.

This improves performance of these functions in hot loops of VictoriaLogs a bit.
2024-05-14 01:30:25 +02:00

125 lines
3.6 KiB
Go

package mergeset
import (
"fmt"
"io"
"sort"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
)
// metaindexRow describes a block of blockHeaders aka index block.
//
// Marshal writes and Unmarshal reads the fields in declaration order:
// firstItem, blockHeadersCount, indexBlockOffset, indexBlockSize.
type metaindexRow struct {
// First item in the first block.
// It is used for fast lookup of the required index block.
// unmarshalMetaindexRows verifies that loaded rows are sorted by this field.
firstItem []byte
// The number of blockHeaders the block contains.
// Unmarshal rejects rows where this value is zero.
blockHeadersCount uint32
// The offset of the block in the index file.
indexBlockOffset uint64
// The size of the block in the index file.
// Unmarshal rejects rows where this value exceeds 4*maxIndexBlockSize.
indexBlockSize uint32
}
// Reset clears mr so it can be reused.
//
// All numeric fields are zeroed, while firstItem is truncated to zero length
// so that its underlying buffer can be reused by later calls.
func (mr *metaindexRow) Reset() {
	*mr = metaindexRow{firstItem: mr.firstItem[:0]}
}
// Marshal appends the serialized form of mr to dst and returns the result.
//
// Fields are written in the order firstItem, blockHeadersCount,
// indexBlockOffset, indexBlockSize, which is the order Unmarshal reads them.
func (mr *metaindexRow) Marshal(dst []byte) []byte {
	out := encoding.MarshalBytes(dst, mr.firstItem)
	out = encoding.MarshalUint32(out, mr.blockHeadersCount)
	out = encoding.MarshalUint64(out, mr.indexBlockOffset)
	return encoding.MarshalUint32(out, mr.indexBlockSize)
}
// Unmarshal decodes a single metaindexRow from the head of src into mr.
//
// On success it returns the part of src that follows the decoded row.
// On error it returns src as it stood when decoding stopped; mr may be
// partially updated in that case.
func (mr *metaindexRow) Unmarshal(src []byte) ([]byte, error) {
	// firstItem: a non-positive size from UnmarshalBytes is treated as failure.
	item, n := encoding.UnmarshalBytes(src)
	if n <= 0 {
		return src, fmt.Errorf("cannot unmarshal firstItem")
	}
	// Copy the bytes into mr's own buffer, reusing its capacity.
	mr.firstItem = append(mr.firstItem[:0], item...)
	src = src[n:]

	// blockHeadersCount occupies 4 bytes.
	if avail := len(src); avail < 4 {
		return src, fmt.Errorf("cannot unmarshal blockHeadersCount from %d bytes; need at least %d bytes", avail, 4)
	}
	mr.blockHeadersCount = encoding.UnmarshalUint32(src)
	src = src[4:]

	// indexBlockOffset occupies 8 bytes.
	if avail := len(src); avail < 8 {
		return src, fmt.Errorf("cannot unmarshal indexBlockOffset from %d bytes; need at least %d bytes", avail, 8)
	}
	mr.indexBlockOffset = encoding.UnmarshalUint64(src)
	src = src[8:]

	// indexBlockSize occupies 4 bytes.
	if avail := len(src); avail < 4 {
		return src, fmt.Errorf("cannot unmarshal indexBlockSize from %d bytes; need at least %d bytes", avail, 4)
	}
	mr.indexBlockSize = encoding.UnmarshalUint32(src)
	src = src[4:]

	// Validate the decoded values once the whole row has been consumed.
	switch {
	case mr.blockHeadersCount == 0:
		return src, fmt.Errorf("blockHeadersCount must be bigger than 0; got %d", mr.blockHeadersCount)
	case mr.indexBlockSize > 4*maxIndexBlockSize:
		// The index block size can exceed maxIndexBlockSize by up to 4x,
		// since it can contain commonPrefix and firstItem at blockHeader
		// with the maximum length of maxIndexBlockSize per each field.
		return src, fmt.Errorf("too big indexBlockSize: %d; cannot exceed %d", mr.indexBlockSize, 4*maxIndexBlockSize)
	}
	return src, nil
}
// unmarshalMetaindexRows reads ZSTD-compressed metaindex data from r,
// decodes every metaindexRow it contains and appends the rows to dst.
//
// An error is returned when r cannot be read, the data cannot be
// decompressed, a row fails to decode, no rows are decoded, or the decoded
// rows aren't sorted by firstItem. dst is returned in every case, possibly
// extended by the rows processed so far.
func unmarshalMetaindexRows(dst []metaindexRow, r io.Reader) ([]metaindexRow, error) {
	// The metaindex is quite small, so it is ok to load all of it in memory.
	packed, err := io.ReadAll(r)
	if err != nil {
		return dst, fmt.Errorf("cannot read metaindex data: %w", err)
	}
	buf, err := encoding.DecompressZSTD(nil, packed)
	if err != nil {
		return dst, fmt.Errorf("cannot decompress metaindex data: %w", err)
	}

	base := len(dst)
	for len(buf) > 0 {
		// Extend dst by one row. When spare capacity exists the existing
		// slot is reused as-is, so its firstItem buffer can be recycled.
		if n := len(dst); n == cap(dst) {
			dst = append(dst, metaindexRow{})
		} else {
			dst = dst[:n+1]
		}
		rest, err := dst[len(dst)-1].Unmarshal(buf)
		if err != nil {
			return dst, fmt.Errorf("cannot unmarshal metaindexRow #%d from metaindex data: %w", len(dst)-base, err)
		}
		buf = rest
	}
	if len(dst) == base {
		return dst, fmt.Errorf("expecting non-zero metaindex rows; got zero")
	}

	// Only the rows added by this call are checked for ordering by firstItem.
	added := dst[base:]
	byFirstItem := func(i, j int) bool {
		return string(added[i].firstItem) < string(added[j].firstItem)
	}
	if !sort.SliceIsSorted(added, byFirstItem) {
		return dst, fmt.Errorf("metaindex %d rows aren't sorted by firstItem", len(added))
	}
	return dst, nil
}