mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
aac3dccfd1
Callers of these functions log the returned error and then exit. The returned error already contains the path to directory, which was failed to be created. So let's just log the error together with the call stack inside these functions. This leaves the debuggability of the returned error at the same level while allows simplifying the code at callers' side. While at it, properly use MustMkdirFailIfExist instead of MustMkdirIfNotExist inside inmemoryPart.MustStoreToDisk(). It is expected that the inmemoryPart.MustStoreToDick() must fail if there is already a directory under the given path.
114 lines
3.6 KiB
Go
114 lines
3.6 KiB
Go
package mergeset
|
|
|
|
import (
|
|
"path/filepath"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
)
|
|
|
|
type inmemoryPart struct {
|
|
ph partHeader
|
|
bh blockHeader
|
|
mr metaindexRow
|
|
|
|
metaindexData bytesutil.ByteBuffer
|
|
indexData bytesutil.ByteBuffer
|
|
itemsData bytesutil.ByteBuffer
|
|
lensData bytesutil.ByteBuffer
|
|
}
|
|
|
|
func (mp *inmemoryPart) Reset() {
|
|
mp.ph.Reset()
|
|
mp.bh.Reset()
|
|
mp.mr.Reset()
|
|
|
|
mp.metaindexData.Reset()
|
|
mp.indexData.Reset()
|
|
mp.itemsData.Reset()
|
|
mp.lensData.Reset()
|
|
}
|
|
|
|
// MustStoreToDisk stores mp to the given path on disk.
|
|
func (mp *inmemoryPart) MustStoreToDisk(path string) {
|
|
fs.MustMkdirFailIfExist(path)
|
|
|
|
metaindexPath := filepath.Join(path, metaindexFilename)
|
|
fs.MustWriteSync(metaindexPath, mp.metaindexData.B)
|
|
|
|
indexPath := filepath.Join(path, indexFilename)
|
|
fs.MustWriteSync(indexPath, mp.indexData.B)
|
|
|
|
itemsPath := filepath.Join(path, itemsFilename)
|
|
fs.MustWriteSync(itemsPath, mp.itemsData.B)
|
|
|
|
lensPath := filepath.Join(path, lensFilename)
|
|
fs.MustWriteSync(lensPath, mp.lensData.B)
|
|
|
|
mp.ph.MustWriteMetadata(path)
|
|
|
|
fs.MustSyncPath(path)
|
|
// Do not sync parent directory - it must be synced by the caller.
|
|
}
|
|
|
|
// Init initializes mp from ib.
|
|
func (mp *inmemoryPart) Init(ib *inmemoryBlock) {
|
|
mp.Reset()
|
|
|
|
// Re-use mp.itemsData and mp.lensData in sb.
|
|
// This eliminates copying itemsData and lensData from sb to mp later.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2247
|
|
sb := &storageBlock{}
|
|
sb.itemsData = mp.itemsData.B[:0]
|
|
sb.lensData = mp.lensData.B[:0]
|
|
|
|
// Use the minimum possible compressLevel for compressing inmemoryPart,
|
|
// since it will be merged into file part soon.
|
|
// See https://github.com/facebook/zstd/releases/tag/v1.3.4 for details about negative compression level
|
|
compressLevel := -5
|
|
mp.bh.firstItem, mp.bh.commonPrefix, mp.bh.itemsCount, mp.bh.marshalType = ib.MarshalUnsortedData(sb, mp.bh.firstItem[:0], mp.bh.commonPrefix[:0], compressLevel)
|
|
|
|
mp.ph.itemsCount = uint64(len(ib.items))
|
|
mp.ph.blocksCount = 1
|
|
mp.ph.firstItem = append(mp.ph.firstItem[:0], ib.items[0].String(ib.data)...)
|
|
mp.ph.lastItem = append(mp.ph.lastItem[:0], ib.items[len(ib.items)-1].String(ib.data)...)
|
|
|
|
mp.itemsData.B = sb.itemsData
|
|
mp.bh.itemsBlockOffset = 0
|
|
mp.bh.itemsBlockSize = uint32(len(mp.itemsData.B))
|
|
|
|
mp.lensData.B = sb.lensData
|
|
mp.bh.lensBlockOffset = 0
|
|
mp.bh.lensBlockSize = uint32(len(mp.lensData.B))
|
|
|
|
bb := inmemoryPartBytePool.Get()
|
|
bb.B = mp.bh.Marshal(bb.B[:0])
|
|
mp.indexData.B = encoding.CompressZSTDLevel(mp.indexData.B[:0], bb.B, compressLevel)
|
|
|
|
mp.mr.firstItem = append(mp.mr.firstItem[:0], mp.bh.firstItem...)
|
|
mp.mr.blockHeadersCount = 1
|
|
mp.mr.indexBlockOffset = 0
|
|
mp.mr.indexBlockSize = uint32(len(mp.indexData.B))
|
|
bb.B = mp.mr.Marshal(bb.B[:0])
|
|
mp.metaindexData.B = encoding.CompressZSTDLevel(mp.metaindexData.B[:0], bb.B, compressLevel)
|
|
inmemoryPartBytePool.Put(bb)
|
|
}
|
|
|
|
var inmemoryPartBytePool bytesutil.ByteBufferPool
|
|
|
|
// It is safe calling NewPart multiple times.
|
|
// It is unsafe re-using mp while the returned part is in use.
|
|
func (mp *inmemoryPart) NewPart() *part {
|
|
size := mp.size()
|
|
p, err := newPart(&mp.ph, "", size, mp.metaindexData.NewReader(), &mp.indexData, &mp.itemsData, &mp.lensData)
|
|
if err != nil {
|
|
logger.Panicf("BUG: cannot create a part from inmemoryPart: %s", err)
|
|
}
|
|
return p
|
|
}
|
|
|
|
func (mp *inmemoryPart) size() uint64 {
|
|
return uint64(cap(mp.metaindexData.B) + cap(mp.indexData.B) + cap(mp.itemsData.B) + cap(mp.lensData.B))
|
|
}
|