VictoriaMetrics/lib/mergeset/encoding.go
Aliaksandr Valialkin db00ddd23e
lib/mergeset: skip common prefixes when comparing inmemoryBlock items
This should improve the performance for items sorting inside inmemoryBlock.MarshalUnsortedData
if they have common prefix.

While at it, improve the performance for inmemoryBlock.updateCommonPrefix for sorted items.
This should improve performance for inmemoryBlock.MarshalSortedData during background merge.
2022-04-06 18:55:25 +03:00

580 lines
16 KiB
Go

package mergeset
import (
"fmt"
"os"
"reflect"
"sort"
"strings"
"sync"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// Item represents a single item for storing in a mergeset.
type Item struct {
// Start is start offset for the item in data.
Start uint32
// End is end offset for the item in data.
End uint32
}
// Bytes returns bytes representation of it obtained from data.
//
// The returned bytes representation belongs to data.
func (it Item) Bytes(data []byte) []byte {
n := int(it.End - it.Start)
sh := (*reflect.SliceHeader)(unsafe.Pointer(&data))
sh.Cap = n
sh.Len = n
sh.Data += uintptr(it.Start)
return data
}
// String returns string represetnation of it obtained from data.
//
// The returned string representation belongs to data.
func (it Item) String(data []byte) string {
sh := (*reflect.SliceHeader)(unsafe.Pointer(&data))
sh.Data += uintptr(it.Start)
sh.Len = int(it.End - it.Start)
return *(*string)(unsafe.Pointer(sh))
}
func (ib *inmemoryBlock) Len() int { return len(ib.items) }
func (ib *inmemoryBlock) Less(i, j int) bool {
items := ib.items
a := items[i]
b := items[j]
cpLen := uint32(len(ib.commonPrefix))
a.Start += cpLen
b.Start += cpLen
data := ib.data
return string(items[i].Bytes(data)) < string(items[j].Bytes(data))
}
func (ib *inmemoryBlock) Swap(i, j int) {
items := ib.items
items[i], items[j] = items[j], items[i]
}
type inmemoryBlock struct {
// commonPrefix contains common prefix for all the items stored in the inmemoryBlock
commonPrefix []byte
// data contains source data for items
data []byte
// items contains items stored in inmemoryBlock.
// Every item contains the prefix specified at commonPrefix.
items []Item
}
func (ib *inmemoryBlock) SizeBytes() int {
return int(unsafe.Sizeof(*ib)) + cap(ib.commonPrefix) + cap(ib.data) + cap(ib.items)*int(unsafe.Sizeof(Item{}))
}
func (ib *inmemoryBlock) Reset() {
ib.commonPrefix = ib.commonPrefix[:0]
ib.data = ib.data[:0]
ib.items = ib.items[:0]
}
func (ib *inmemoryBlock) updateCommonPrefixSorted() {
ib.commonPrefix = ib.commonPrefix[:0]
items := ib.items
if len(items) == 0 {
return
}
data := ib.data
cp := items[0].Bytes(data)
if len(items) > 1 {
cpLen := commonPrefixLen(cp, items[len(items)-1].Bytes(data))
cp = cp[:cpLen]
}
ib.commonPrefix = append(ib.commonPrefix[:0], cp...)
}
func (ib *inmemoryBlock) updateCommonPrefixUnsorted() {
ib.commonPrefix = ib.commonPrefix[:0]
items := ib.items
if len(items) == 0 {
return
}
data := ib.data
cp := items[0].Bytes(data)
for _, it := range items[1:] {
cpLen := commonPrefixLen(cp, it.Bytes(data))
if cpLen == 0 {
return
}
cp = cp[:cpLen]
}
ib.commonPrefix = append(ib.commonPrefix[:0], cp...)
}
func commonPrefixLen(a, b []byte) int {
i := 0
if len(a) > len(b) {
for i < len(b) && a[i] == b[i] {
i++
}
} else {
for i < len(a) && a[i] == b[i] {
i++
}
}
return i
}
// Add adds x to the end of ib.
//
// false is returned if x isn't added to ib due to block size contraints.
func (ib *inmemoryBlock) Add(x []byte) bool {
data := ib.data
if len(x)+len(data) > maxInmemoryBlockSize {
return false
}
if cap(data) == 0 {
// Pre-allocate data and items in order to reduce memory allocations
data = make([]byte, 0, maxInmemoryBlockSize)
ib.items = make([]Item, 0, 512)
}
dataLen := len(data)
data = append(data, x...)
ib.items = append(ib.items, Item{
Start: uint32(dataLen),
End: uint32(len(data)),
})
ib.data = data
return true
}
// maxInmemoryBlockSize is the maximum inmemoryBlock.data size.
//
// It must fit CPU cache size, i.e. 64KB for the current CPUs.
const maxInmemoryBlockSize = 64 * 1024
// storageBlock represents a block of data on the storage.
type storageBlock struct {
itemsData []byte
lensData []byte
}
func (sb *storageBlock) Reset() {
sb.itemsData = sb.itemsData[:0]
sb.lensData = sb.lensData[:0]
}
type marshalType uint8
const (
marshalTypePlain = marshalType(0)
marshalTypeZSTD = marshalType(1)
)
func checkMarshalType(mt marshalType) error {
if mt < 0 || mt > 1 {
return fmt.Errorf("marshalType must be in the range [0..1]; got %d", mt)
}
return nil
}
func (ib *inmemoryBlock) isSorted() bool {
// Use sort.IsSorted instead of sort.SliceIsSorted in order to eliminate memory allocation.
return sort.IsSorted(ib)
}
// MarshalUnsortedData marshals unsorted items from ib to sb.
//
// It also:
// - appends first item to firstItemDst and returns the result.
// - appends common prefix for all the items to commonPrefixDst and returns the result.
// - returns the number of items encoded including the first item.
// - returns the marshal type used for the encoding.
func (ib *inmemoryBlock) MarshalUnsortedData(sb *storageBlock, firstItemDst, commonPrefixDst []byte, compressLevel int) ([]byte, []byte, uint32, marshalType) {
if !ib.isSorted() {
ib.updateCommonPrefixUnsorted()
sort.Sort(ib)
} else {
ib.updateCommonPrefixSorted()
}
return ib.marshalData(sb, firstItemDst, commonPrefixDst, compressLevel)
}
var isInTest = func() bool {
return strings.HasSuffix(os.Args[0], ".test")
}()
// MarshalSortedData marshals sorted items from ib to sb.
//
// It also:
// - appends first item to firstItemDst and returns the result.
// - appends common prefix for all the items to commonPrefixDst and returns the result.
// - returns the number of items encoded including the first item.
// - returns the marshal type used for the encoding.
func (ib *inmemoryBlock) MarshalSortedData(sb *storageBlock, firstItemDst, commonPrefixDst []byte, compressLevel int) ([]byte, []byte, uint32, marshalType) {
if isInTest && !ib.isSorted() {
logger.Panicf("BUG: %d items must be sorted; items:\n%s", len(ib.items), ib.debugItemsString())
}
ib.updateCommonPrefixSorted()
return ib.marshalData(sb, firstItemDst, commonPrefixDst, compressLevel)
}
func (ib *inmemoryBlock) debugItemsString() string {
var sb strings.Builder
var prevItem string
data := ib.data
for i, it := range ib.items {
item := it.String(data)
if item < prevItem {
fmt.Fprintf(&sb, "!!! the next item is smaller than the previous item !!!\n")
}
fmt.Fprintf(&sb, "%05d %X\n", i, item)
prevItem = item
}
return sb.String()
}
// Preconditions:
// - ib.items must be sorted.
// - updateCommonPrefix* must be called.
func (ib *inmemoryBlock) marshalData(sb *storageBlock, firstItemDst, commonPrefixDst []byte, compressLevel int) ([]byte, []byte, uint32, marshalType) {
if len(ib.items) <= 0 {
logger.Panicf("BUG: inmemoryBlock.marshalData must be called on non-empty blocks only")
}
if uint64(len(ib.items)) >= 1<<32 {
logger.Panicf("BUG: the number of items in the block must be smaller than %d; got %d items", uint64(1<<32), len(ib.items))
}
data := ib.data
firstItem := ib.items[0].Bytes(data)
firstItemDst = append(firstItemDst, firstItem...)
commonPrefixDst = append(commonPrefixDst, ib.commonPrefix...)
if len(data)-len(ib.commonPrefix)*len(ib.items) < 64 || len(ib.items) < 2 {
// Use plain encoding form small block, since it is cheaper.
ib.marshalDataPlain(sb)
return firstItemDst, commonPrefixDst, uint32(len(ib.items)), marshalTypePlain
}
bbItems := bbPool.Get()
bItems := bbItems.B[:0]
bbLens := bbPool.Get()
bLens := bbLens.B[:0]
// Marshal items data.
xs := encoding.GetUint64s(len(ib.items) - 1)
defer encoding.PutUint64s(xs)
cpLen := len(ib.commonPrefix)
prevItem := firstItem[cpLen:]
prevPrefixLen := uint64(0)
for i, it := range ib.items[1:] {
it.Start += uint32(cpLen)
item := it.Bytes(data)
prefixLen := uint64(commonPrefixLen(prevItem, item))
bItems = append(bItems, item[prefixLen:]...)
xLen := prefixLen ^ prevPrefixLen
prevItem = item
prevPrefixLen = prefixLen
xs.A[i] = xLen
}
bLens = encoding.MarshalVarUint64s(bLens, xs.A)
sb.itemsData = encoding.CompressZSTDLevel(sb.itemsData[:0], bItems, compressLevel)
bbItems.B = bItems
bbPool.Put(bbItems)
// Marshal lens data.
prevItemLen := uint64(len(firstItem) - cpLen)
for i, it := range ib.items[1:] {
itemLen := uint64(int(it.End-it.Start) - cpLen)
xLen := itemLen ^ prevItemLen
prevItemLen = itemLen
xs.A[i] = xLen
}
bLens = encoding.MarshalVarUint64s(bLens, xs.A)
sb.lensData = encoding.CompressZSTDLevel(sb.lensData[:0], bLens, compressLevel)
bbLens.B = bLens
bbPool.Put(bbLens)
if float64(len(sb.itemsData)) > 0.9*float64(len(data)-len(ib.commonPrefix)*len(ib.items)) {
// Bad compression rate. It is cheaper to use plain encoding.
ib.marshalDataPlain(sb)
return firstItemDst, commonPrefixDst, uint32(len(ib.items)), marshalTypePlain
}
// Good compression rate.
return firstItemDst, commonPrefixDst, uint32(len(ib.items)), marshalTypeZSTD
}
// UnmarshalData decodes itemsCount items from sb and firstItem and stores
// them to ib.
func (ib *inmemoryBlock) UnmarshalData(sb *storageBlock, firstItem, commonPrefix []byte, itemsCount uint32, mt marshalType) error {
ib.Reset()
if itemsCount <= 0 {
logger.Panicf("BUG: cannot unmarshal zero items")
}
ib.commonPrefix = append(ib.commonPrefix[:0], commonPrefix...)
switch mt {
case marshalTypePlain:
if err := ib.unmarshalDataPlain(sb, firstItem, itemsCount); err != nil {
return fmt.Errorf("cannot unmarshal plain data: %w", err)
}
if !ib.isSorted() {
return fmt.Errorf("plain data block contains unsorted items; items:\n%s", ib.debugItemsString())
}
return nil
case marshalTypeZSTD:
// it is handled below.
default:
return fmt.Errorf("unknown marshalType=%d", mt)
}
// Unmarshal mt = marshalTypeZSTD
bb := bbPool.Get()
defer bbPool.Put(bb)
var err error
// Unmarshal lens data.
bb.B, err = encoding.DecompressZSTD(bb.B[:0], sb.lensData)
if err != nil {
return fmt.Errorf("cannot decompress lensData: %w", err)
}
lb := getLensBuffer(int(2 * itemsCount))
defer putLensBuffer(lb)
prefixLens := lb.lens[:itemsCount]
lens := lb.lens[itemsCount:]
is := encoding.GetUint64s(int(itemsCount) - 1)
defer encoding.PutUint64s(is)
// Unmarshal prefixLens
tail, err := encoding.UnmarshalVarUint64s(is.A, bb.B)
if err != nil {
return fmt.Errorf("cannot unmarshal prefixLens from lensData: %w", err)
}
prefixLens[0] = 0
for i, xLen := range is.A {
prefixLens[i+1] = xLen ^ prefixLens[i]
}
// Unmarshal lens
tail, err = encoding.UnmarshalVarUint64s(is.A, tail)
if err != nil {
return fmt.Errorf("cannot unmarshal lens from lensData: %w", err)
}
if len(tail) > 0 {
return fmt.Errorf("unexpected tail left unmarshaling %d lens; tail size=%d; contents=%X", itemsCount, len(tail), tail)
}
lens[0] = uint64(len(firstItem) - len(commonPrefix))
dataLen := len(commonPrefix) * int(itemsCount)
dataLen += int(lens[0])
for i, xLen := range is.A {
itemLen := xLen ^ lens[i]
lens[i+1] = itemLen
dataLen += int(itemLen)
}
// Unmarshal items data.
bb.B, err = encoding.DecompressZSTD(bb.B[:0], sb.itemsData)
if err != nil {
return fmt.Errorf("cannot decompress lensData: %w", err)
}
// Resize ib.data to dataLen instead of maxInmemoryBlockSize,
// since the data isn't going to be resized after unmarshaling.
// This may save memory for caching the unmarshaled block.
data := bytesutil.ResizeNoCopyNoOverallocate(ib.data, dataLen)
if n := int(itemsCount) - cap(ib.items); n > 0 {
ib.items = append(ib.items[:cap(ib.items)], make([]Item, n)...)
}
ib.items = ib.items[:itemsCount]
data = append(data[:0], firstItem...)
items := ib.items
items[0] = Item{
Start: 0,
End: uint32(len(data)),
}
prevItem := data[len(commonPrefix):]
b := bb.B
for i := 1; i < int(itemsCount); i++ {
itemLen := lens[i]
prefixLen := prefixLens[i]
if prefixLen > itemLen {
return fmt.Errorf("prefixLen=%d exceeds itemLen=%d", prefixLen, itemLen)
}
suffixLen := itemLen - prefixLen
if uint64(len(b)) < suffixLen {
return fmt.Errorf("not enough data for decoding item from itemsData; want %d bytes; remained %d bytes", suffixLen, len(b))
}
if prefixLen > uint64(len(prevItem)) {
return fmt.Errorf("prefixLen cannot exceed %d; got %d", len(prevItem), prefixLen)
}
dataStart := len(data)
data = append(data, commonPrefix...)
data = append(data, prevItem[:prefixLen]...)
data = append(data, b[:suffixLen]...)
items[i] = Item{
Start: uint32(dataStart),
End: uint32(len(data)),
}
b = b[suffixLen:]
prevItem = data[len(data)-int(itemLen):]
}
if len(b) > 0 {
return fmt.Errorf("unexpected tail left after itemsData with len %d: %q", len(b), b)
}
if len(data) != dataLen {
return fmt.Errorf("unexpected data len; got %d; want %d", len(data), dataLen)
}
ib.data = data
if !ib.isSorted() {
return fmt.Errorf("decoded data block contains unsorted items; items:\n%s", ib.debugItemsString())
}
return nil
}
var bbPool bytesutil.ByteBufferPool
func (ib *inmemoryBlock) marshalDataPlain(sb *storageBlock) {
data := ib.data
// Marshal items data.
// There is no need in marshaling the first item, since it is returned
// to the caller in marshalData.
cpLen := len(ib.commonPrefix)
b := sb.itemsData[:0]
for _, it := range ib.items[1:] {
it.Start += uint32(cpLen)
b = append(b, it.String(data)...)
}
sb.itemsData = b
// Marshal length data.
b = sb.lensData[:0]
for _, it := range ib.items[1:] {
b = encoding.MarshalUint64(b, uint64(int(it.End-it.Start)-cpLen))
}
sb.lensData = b
}
func (ib *inmemoryBlock) unmarshalDataPlain(sb *storageBlock, firstItem []byte, itemsCount uint32) error {
commonPrefix := ib.commonPrefix
// Unmarshal lens data.
lb := getLensBuffer(int(itemsCount))
defer putLensBuffer(lb)
lb.lens[0] = uint64(len(firstItem) - len(commonPrefix))
b := sb.lensData
for i := 1; i < int(itemsCount); i++ {
if len(b) < 8 {
return fmt.Errorf("too short tail for decoding len from lensData; got %d bytes; want at least %d bytes", len(b), 8)
}
iLen := encoding.UnmarshalUint64(b)
b = b[8:]
lb.lens[i] = iLen
}
if len(b) > 0 {
return fmt.Errorf("unexpected tail left after lensData with len %d: %q", len(b), b)
}
// Unmarshal items data.
data := ib.data
items := ib.items
dataLen := len(firstItem) + len(sb.itemsData) + len(commonPrefix)*(int(itemsCount)-1)
data = bytesutil.ResizeNoCopyNoOverallocate(data, dataLen)
data = append(data[:0], firstItem...)
items = append(items[:0], Item{
Start: 0,
End: uint32(len(data)),
})
b = sb.itemsData
for i := 1; i < int(itemsCount); i++ {
itemLen := lb.lens[i]
if uint64(len(b)) < itemLen {
return fmt.Errorf("not enough data for decoding item from itemsData; want %d bytes; remained %d bytes", itemLen, len(b))
}
dataStart := len(data)
data = append(data, commonPrefix...)
data = append(data, b[:itemLen]...)
items = append(items, Item{
Start: uint32(dataStart),
End: uint32(len(data)),
})
b = b[itemLen:]
}
if len(b) > 0 {
return fmt.Errorf("unexpected tail left after itemsData with len %d: %q", len(b), b)
}
if len(data) != dataLen {
return fmt.Errorf("unexpected data len; got %d; want %d", len(data), dataLen)
}
ib.data = data
ib.items = items
return nil
}
type lensBuffer struct {
lens []uint64
}
var lensBufferPool sync.Pool
func getLensBuffer(n int) *lensBuffer {
v := lensBufferPool.Get()
if v == nil {
v = &lensBuffer{}
}
lb := v.(*lensBuffer)
if nn := n - cap(lb.lens); nn > 0 {
lb.lens = append(lb.lens[:cap(lb.lens)], make([]uint64, nn)...)
}
lb.lens = lb.lens[:n]
return lb
}
func putLensBuffer(lb *lensBuffer) {
lensBufferPool.Put(lb)
}
func getInmemoryBlock() *inmemoryBlock {
select {
case ib := <-ibPoolCh:
return ib
default:
return &inmemoryBlock{}
}
}
func putInmemoryBlock(ib *inmemoryBlock) {
ib.Reset()
select {
case ibPoolCh <- ib:
default:
// drop ib in order to reduce memory usage on systems with big number of CPU cores
}
}
// Every inmemoryBlock struct occupies at least 64KB of memory, e.g. quite big amounts of memory.
// Use a chan instead of sync.Pool in order to reduce memory usage on systems
// with big number of CPU cores.
var ibPoolCh = make(chan *inmemoryBlock, 100*cgroup.AvailableCPUs())