mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
474 lines
13 KiB
Go
474 lines
13 KiB
Go
package mergeset
|
|
|
|
import (
|
|
"fmt"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
)
|
|
|
|
// byteSliceSorter implements sort.Interface for a slice of byte slices,
// ordering the items lexicographically by their bytes.
type byteSliceSorter [][]byte

// Len returns the number of items in s.
func (s byteSliceSorter) Len() int {
	return len(s)
}

// Less reports whether the item at index a is lexicographically smaller
// than the item at index b.
func (s byteSliceSorter) Less(a, b int) bool {
	left, right := s[a], s[b]
	return string(left) < string(right)
}

// Swap exchanges the items at indexes a and b.
func (s byteSliceSorter) Swap(a, b int) {
	s[b], s[a] = s[a], s[b]
}
|
|
|
|
type inmemoryBlock struct {
|
|
commonPrefix []byte
|
|
data []byte
|
|
items byteSliceSorter
|
|
}
|
|
|
|
func (ib *inmemoryBlock) Reset() {
|
|
ib.commonPrefix = ib.commonPrefix[:0]
|
|
ib.data = ib.data[:0]
|
|
ib.items = ib.items[:0]
|
|
}
|
|
|
|
func (ib *inmemoryBlock) updateCommonPrefix() {
|
|
ib.commonPrefix = ib.commonPrefix[:0]
|
|
if len(ib.items) == 0 {
|
|
return
|
|
}
|
|
cp := ib.items[0]
|
|
if len(cp) == 0 {
|
|
return
|
|
}
|
|
for _, item := range ib.items[1:] {
|
|
cpLen := commonPrefixLen(cp, item)
|
|
if cpLen == 0 {
|
|
return
|
|
}
|
|
cp = cp[:cpLen]
|
|
}
|
|
ib.commonPrefix = append(ib.commonPrefix[:0], cp...)
|
|
}
|
|
|
|
// commonPrefixLen returns the length of the longest common prefix of a and b.
func commonPrefixLen(a, b []byte) int {
	// Only the first min(len(a), len(b)) bytes can be shared.
	limit := len(a)
	if len(b) < limit {
		limit = len(b)
	}
	for n := 0; n < limit; n++ {
		if a[n] != b[n] {
			return n
		}
	}
	return limit
}
|
|
|
|
// Add adds x to the end of ib.
|
|
//
|
|
// false is returned if x isn't added to ib due to block size contraints.
|
|
func (ib *inmemoryBlock) Add(x []byte) bool {
|
|
if len(x)+len(ib.data) > maxInmemoryBlockSize {
|
|
return false
|
|
}
|
|
if cap(ib.data) < maxInmemoryBlockSize {
|
|
dataLen := len(ib.data)
|
|
ib.data = bytesutil.Resize(ib.data, maxInmemoryBlockSize)[:dataLen]
|
|
}
|
|
ib.data = append(ib.data, x...)
|
|
ib.items = append(ib.items, ib.data[len(ib.data)-len(x):])
|
|
return true
|
|
}
|
|
|
|
// maxInmemoryBlockSize is the maximum size of inmemoryBlock.data in bytes.
//
// It must fit CPU cache size, i.e. 64KB for the current CPUs.
const maxInmemoryBlockSize = 64 << 10
|
|
|
|
func (ib *inmemoryBlock) sort() {
|
|
// Use sort.Sort instead of sort.Slice in order to eliminate memory allocation.
|
|
sort.Sort(&ib.items)
|
|
bb := bbPool.Get()
|
|
b := bytesutil.Resize(bb.B, len(ib.data))
|
|
b = b[:0]
|
|
for i, item := range ib.items {
|
|
b = append(b, item...)
|
|
ib.items[i] = b[len(b)-len(item):]
|
|
}
|
|
bb.B, ib.data = ib.data, b
|
|
bbPool.Put(bb)
|
|
}
|
|
|
|
// storageBlock represents a block of data on the storage.
type storageBlock struct {
	// itemsData contains marshaled items.
	itemsData []byte

	// lensData contains marshaled lengths for the items stored in itemsData.
	lensData []byte
}

// Reset empties sb, keeping the underlying buffers for subsequent reuse.
func (sb *storageBlock) Reset() {
	sb.lensData = sb.lensData[:0]
	sb.itemsData = sb.itemsData[:0]
}
|
|
|
|
// marshalType identifies the encoding used for the data stored in storageBlock.
type marshalType uint8

const (
	// marshalTypePlain is used for blocks stored without compression.
	marshalTypePlain = marshalType(0)

	// marshalTypeZSTD is used for blocks compressed with ZSTD.
	marshalTypeZSTD = marshalType(1)
)

// checkMarshalType returns a non-nil error if mt isn't a known marshal type.
func checkMarshalType(mt marshalType) error {
	// marshalType is unsigned, so it cannot be negative;
	// only the upper bound needs checking.
	if mt > marshalTypeZSTD {
		return fmt.Errorf("marshalType must be in the range [0..1]; got %d", mt)
	}
	return nil
}
|
|
|
|
func (ib *inmemoryBlock) isSorted() bool {
|
|
// Use sort.IsSorted instead of sort.SliceIsSorted in order to eliminate memory allocation.
|
|
return sort.IsSorted(&ib.items)
|
|
}
|
|
|
|
// MarshalUnsortedData marshals unsorted items from ib to sb.
|
|
//
|
|
// It also:
|
|
// - appends first item to firstItemDst and returns the result.
|
|
// - appends common prefix for all the items to commonPrefixDst and returns the result.
|
|
// - returns the number of items encoded including the first item.
|
|
// - returns the marshal type used for the encoding.
|
|
func (ib *inmemoryBlock) MarshalUnsortedData(sb *storageBlock, firstItemDst, commonPrefixDst []byte, compressLevel int) ([]byte, []byte, uint32, marshalType) {
|
|
if !ib.isSorted() {
|
|
ib.sort()
|
|
}
|
|
ib.updateCommonPrefix()
|
|
return ib.marshalData(sb, firstItemDst, commonPrefixDst, compressLevel)
|
|
}
|
|
|
|
// isInTest enables the items sortedness check in MarshalSortedData.
//
// NOTE(review): presumably it is set to true only by tests - confirm.
var isInTest bool
|
|
|
|
// MarshalUnsortedData marshals sorted items from ib to sb.
|
|
//
|
|
// It also:
|
|
// - appends first item to firstItemDst and returns the result.
|
|
// - appends common prefix for all the items to commonPrefixDst and returns the result.
|
|
// - returns the number of items encoded including the first item.
|
|
// - returns the marshal type used for the encoding.
|
|
func (ib *inmemoryBlock) MarshalSortedData(sb *storageBlock, firstItemDst, commonPrefixDst []byte, compressLevel int) ([]byte, []byte, uint32, marshalType) {
|
|
if isInTest && !ib.isSorted() {
|
|
logger.Panicf("BUG: %d items must be sorted; items:\n%s", len(ib.items), ib.debugItemsString())
|
|
}
|
|
ib.updateCommonPrefix()
|
|
return ib.marshalData(sb, firstItemDst, commonPrefixDst, compressLevel)
|
|
}
|
|
|
|
func (ib *inmemoryBlock) debugItemsString() string {
|
|
var sb strings.Builder
|
|
for i, item := range ib.items {
|
|
fmt.Fprintf(&sb, "%05d %X\n", i, item)
|
|
}
|
|
return sb.String()
|
|
}
|
|
|
|
// Preconditions:
|
|
// - ib.items must be sorted.
|
|
// - updateCommonPrefix must be called.
|
|
func (ib *inmemoryBlock) marshalData(sb *storageBlock, firstItemDst, commonPrefixDst []byte, compressLevel int) ([]byte, []byte, uint32, marshalType) {
|
|
if len(ib.items) <= 0 {
|
|
logger.Panicf("BUG: inmemoryBlock.marshalData must be called on non-empty blocks only")
|
|
}
|
|
if uint64(len(ib.items)) >= 1<<32 {
|
|
logger.Panicf("BUG: the number of items in the block must be smaller than %d; got %d items", uint64(1<<32), len(ib.items))
|
|
}
|
|
|
|
firstItemDst = append(firstItemDst, ib.items[0]...)
|
|
commonPrefixDst = append(commonPrefixDst, ib.commonPrefix...)
|
|
|
|
if len(ib.data)-len(ib.commonPrefix)*len(ib.items) < 64 || len(ib.items) < 10 {
|
|
// Use plain encoding form small block, since it is cheaper.
|
|
ib.marshalDataPlain(sb)
|
|
return firstItemDst, commonPrefixDst, uint32(len(ib.items)), marshalTypePlain
|
|
}
|
|
|
|
bbItems := bbPool.Get()
|
|
bItems := bbItems.B[:0]
|
|
|
|
bbLens := bbPool.Get()
|
|
bLens := bbLens.B[:0]
|
|
|
|
// Marshal items data.
|
|
xs := encoding.GetUint64s(len(ib.items) - 1)
|
|
defer encoding.PutUint64s(xs)
|
|
|
|
cpLen := len(ib.commonPrefix)
|
|
prevItem := ib.items[0][cpLen:]
|
|
prevPrefixLen := uint64(0)
|
|
for i, item := range ib.items[1:] {
|
|
item := item[cpLen:]
|
|
prefixLen := uint64(commonPrefixLen(prevItem, item))
|
|
bItems = append(bItems, item[prefixLen:]...)
|
|
xLen := prefixLen ^ prevPrefixLen
|
|
prevItem = item
|
|
prevPrefixLen = prefixLen
|
|
|
|
xs.A[i] = xLen
|
|
}
|
|
bLens = encoding.MarshalVarUint64s(bLens, xs.A)
|
|
sb.itemsData = encoding.CompressZSTDLevel(sb.itemsData[:0], bItems, compressLevel)
|
|
|
|
bbItems.B = bItems
|
|
bbPool.Put(bbItems)
|
|
|
|
// Marshal lens data.
|
|
prevItemLen := uint64(len(ib.items[0]) - cpLen)
|
|
for i, item := range ib.items[1:] {
|
|
itemLen := uint64(len(item) - cpLen)
|
|
xLen := itemLen ^ prevItemLen
|
|
prevItemLen = itemLen
|
|
|
|
xs.A[i] = xLen
|
|
}
|
|
bLens = encoding.MarshalVarUint64s(bLens, xs.A)
|
|
sb.lensData = encoding.CompressZSTDLevel(sb.lensData[:0], bLens, compressLevel)
|
|
|
|
bbLens.B = bLens
|
|
bbPool.Put(bbLens)
|
|
|
|
if float64(len(sb.itemsData)) > 0.9*float64(len(ib.data)-len(ib.commonPrefix)*len(ib.items)) {
|
|
// Bad compression rate. It is cheaper to use plain encoding.
|
|
ib.marshalDataPlain(sb)
|
|
return firstItemDst, commonPrefixDst, uint32(len(ib.items)), marshalTypePlain
|
|
}
|
|
|
|
// Good compression rate.
|
|
return firstItemDst, commonPrefixDst, uint32(len(ib.items)), marshalTypeZSTD
|
|
}
|
|
|
|
// UnmarshalData decodes itemsCount items from sb and firstItem and stores
|
|
// them to ib.
|
|
func (ib *inmemoryBlock) UnmarshalData(sb *storageBlock, firstItem, commonPrefix []byte, itemsCount uint32, mt marshalType) error {
|
|
ib.Reset()
|
|
|
|
if itemsCount <= 0 {
|
|
logger.Panicf("BUG: cannot unmarshal zero items")
|
|
}
|
|
|
|
ib.commonPrefix = append(ib.commonPrefix[:0], commonPrefix...)
|
|
|
|
switch mt {
|
|
case marshalTypePlain:
|
|
if err := ib.unmarshalDataPlain(sb, firstItem, itemsCount); err != nil {
|
|
return fmt.Errorf("cannot unmarshal plain data: %s", err)
|
|
}
|
|
if !ib.isSorted() {
|
|
return fmt.Errorf("plain data block contains unsorted items; items:\n%s", ib.debugItemsString())
|
|
}
|
|
return nil
|
|
case marshalTypeZSTD:
|
|
// it is handled below.
|
|
default:
|
|
return fmt.Errorf("unknown marshalType=%d", mt)
|
|
}
|
|
|
|
// Unmarshal mt = marshalTypeZSTD
|
|
|
|
bb := bbPool.Get()
|
|
defer bbPool.Put(bb)
|
|
|
|
var err error
|
|
|
|
// Unmarshal lens data.
|
|
bb.B, err = encoding.DecompressZSTD(bb.B[:0], sb.lensData)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot decompress lensData: %s", err)
|
|
}
|
|
|
|
lb := getLensBuffer(int(2 * itemsCount))
|
|
defer putLensBuffer(lb)
|
|
|
|
prefixLens := lb.lens[:itemsCount]
|
|
lens := lb.lens[itemsCount:]
|
|
|
|
is := encoding.GetUint64s(int(itemsCount) - 1)
|
|
defer encoding.PutUint64s(is)
|
|
|
|
// Unmarshal prefixLens
|
|
tail, err := encoding.UnmarshalVarUint64s(is.A, bb.B)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot unmarshal prefixLens from lensData: %s", err)
|
|
}
|
|
prefixLens[0] = 0
|
|
for i, xLen := range is.A {
|
|
prefixLens[i+1] = xLen ^ prefixLens[i]
|
|
}
|
|
|
|
// Unmarshal lens
|
|
tail, err = encoding.UnmarshalVarUint64s(is.A, tail)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot unmarshal lens from lensData: %s", err)
|
|
}
|
|
if len(tail) > 0 {
|
|
return fmt.Errorf("unexpected tail left unmarshaling %d lens; tail size=%d; contents=%X", itemsCount, len(tail), tail)
|
|
}
|
|
lens[0] = uint64(len(firstItem) - len(commonPrefix))
|
|
dataLen := uint64(len(commonPrefix) * int(itemsCount))
|
|
dataLen += lens[0]
|
|
for i, xLen := range is.A {
|
|
itemLen := xLen ^ lens[i]
|
|
lens[i+1] = itemLen
|
|
dataLen += itemLen
|
|
}
|
|
|
|
// Unmarshal items data.
|
|
bb.B, err = encoding.DecompressZSTD(bb.B[:0], sb.itemsData)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot decompress lensData: %s", err)
|
|
}
|
|
data := bytesutil.Resize(ib.data, maxInmemoryBlockSize)
|
|
if n := int(itemsCount) - cap(ib.items); n > 0 {
|
|
ib.items = append(ib.items[:cap(ib.items)], make([][]byte, n)...)
|
|
}
|
|
ib.items = ib.items[:itemsCount]
|
|
data = append(data[:0], firstItem...)
|
|
ib.items[0] = data
|
|
prevItem := data[len(commonPrefix):]
|
|
b := bb.B
|
|
for i := 1; i < int(itemsCount); i++ {
|
|
itemLen := lens[i]
|
|
prefixLen := prefixLens[i]
|
|
if prefixLen > itemLen {
|
|
return fmt.Errorf("prefixLen=%d exceeds itemLen=%d", prefixLen, itemLen)
|
|
}
|
|
suffixLen := itemLen - prefixLen
|
|
if uint64(len(b)) < suffixLen {
|
|
return fmt.Errorf("not enough data for decoding item from itemsData; want %d bytes; remained %d bytes", suffixLen, len(b))
|
|
}
|
|
data = append(data, commonPrefix...)
|
|
|
|
if prefixLen > uint64(len(prevItem)) {
|
|
return fmt.Errorf("prefixLen cannot exceed %d; got %d", len(prevItem), prefixLen)
|
|
}
|
|
data = append(data, prevItem[:prefixLen]...)
|
|
data = append(data, b[:suffixLen]...)
|
|
item := data[len(data)-int(itemLen)-len(commonPrefix):]
|
|
ib.items[i] = item
|
|
b = b[suffixLen:]
|
|
prevItem = item[len(commonPrefix):]
|
|
}
|
|
if len(b) > 0 {
|
|
return fmt.Errorf("unexpected tail left after itemsData with len %d: %q", len(b), b)
|
|
}
|
|
if uint64(len(data)) != dataLen {
|
|
return fmt.Errorf("unexpected data len; got %d; want %d", len(data), dataLen)
|
|
}
|
|
if !ib.isSorted() {
|
|
return fmt.Errorf("decoded data block contains unsorted items; items:\n%s", ib.debugItemsString())
|
|
}
|
|
ib.data = data
|
|
return nil
|
|
}
|
|
|
|
// bbPool is a pool of byte buffers used as temporary storage
// by inmemoryBlock sorting, marshaling and unmarshaling.
var bbPool bytesutil.ByteBufferPool
|
|
|
|
func (ib *inmemoryBlock) marshalDataPlain(sb *storageBlock) {
|
|
// Marshal items data.
|
|
// There is no need in marshaling the first item, since it is returned
|
|
// to the caller in marshalData.
|
|
cpLen := len(ib.commonPrefix)
|
|
b := sb.itemsData[:0]
|
|
for _, item := range ib.items[1:] {
|
|
b = append(b, item[cpLen:]...)
|
|
}
|
|
sb.itemsData = b
|
|
|
|
// Marshal length data.
|
|
b = sb.lensData[:0]
|
|
for _, item := range ib.items[1:] {
|
|
b = encoding.MarshalUint64(b, uint64(len(item)-cpLen))
|
|
}
|
|
sb.lensData = b
|
|
}
|
|
|
|
func (ib *inmemoryBlock) unmarshalDataPlain(sb *storageBlock, firstItem []byte, itemsCount uint32) error {
|
|
commonPrefix := ib.commonPrefix
|
|
|
|
// Unmarshal lens data.
|
|
lb := getLensBuffer(int(itemsCount))
|
|
defer putLensBuffer(lb)
|
|
|
|
lb.lens[0] = uint64(len(firstItem) - len(commonPrefix))
|
|
b := sb.lensData
|
|
for i := 1; i < int(itemsCount); i++ {
|
|
if len(b) < 8 {
|
|
return fmt.Errorf("too short tail for decoding len from lensData; got %d bytes; want at least %d bytes", len(b), 8)
|
|
}
|
|
iLen := encoding.UnmarshalUint64(b)
|
|
b = b[8:]
|
|
lb.lens[i] = iLen
|
|
}
|
|
if len(b) > 0 {
|
|
return fmt.Errorf("unexpected tail left after lensData with len %d: %q", len(b), b)
|
|
}
|
|
|
|
// Unmarshal items data.
|
|
ib.data = bytesutil.Resize(ib.data, len(firstItem)+len(sb.itemsData)+len(commonPrefix)*int(itemsCount))
|
|
ib.data = append(ib.data[:0], firstItem...)
|
|
ib.items = append(ib.items[:0], ib.data)
|
|
|
|
b = sb.itemsData
|
|
for i := 1; i < int(itemsCount); i++ {
|
|
itemLen := lb.lens[i]
|
|
if uint64(len(b)) < itemLen {
|
|
return fmt.Errorf("not enough data for decoding item from itemsData; want %d bytes; remained %d bytes", itemLen, len(b))
|
|
}
|
|
ib.data = append(ib.data, commonPrefix...)
|
|
ib.data = append(ib.data, b[:itemLen]...)
|
|
item := ib.data[len(ib.data)-int(itemLen)-len(commonPrefix):]
|
|
ib.items = append(ib.items, item)
|
|
b = b[itemLen:]
|
|
}
|
|
if len(b) > 0 {
|
|
return fmt.Errorf("unexpected tail left after itemsData with len %d: %q", len(b), b)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// lensBuffer is a reusable buffer for item lengths.
type lensBuffer struct {
	lens []uint64
}

// lensBufferPool holds lensBuffer objects for reuse.
var lensBufferPool sync.Pool

// getLensBuffer returns a lensBuffer with lens of length n.
//
// The returned lens may contain arbitrary values left from the previous use,
// so the caller must initialize them.
//
// Return the buffer to the pool via putLensBuffer when it is no longer needed.
func getLensBuffer(n int) *lensBuffer {
	var lb *lensBuffer
	if v := lensBufferPool.Get(); v != nil {
		lb = v.(*lensBuffer)
	} else {
		lb = &lensBuffer{}
	}
	if missing := n - cap(lb.lens); missing > 0 {
		// Grow the buffer, so it can hold n lens.
		lb.lens = append(lb.lens[:cap(lb.lens)], make([]uint64, missing)...)
	}
	lb.lens = lb.lens[:n]
	return lb
}

// putLensBuffer returns lb to the pool.
//
// lb mustn't be used after the call.
func putLensBuffer(lb *lensBuffer) {
	lensBufferPool.Put(lb)
}
|
|
|
|
func getInmemoryBlock() *inmemoryBlock {
|
|
v := ibPool.Get()
|
|
if v == nil {
|
|
return &inmemoryBlock{}
|
|
}
|
|
return v.(*inmemoryBlock)
|
|
}
|
|
|
|
func putInmemoryBlock(ib *inmemoryBlock) {
|
|
ib.Reset()
|
|
ibPool.Put(ib)
|
|
}
|
|
|
|
var ibPool sync.Pool
|