VictoriaMetrics/lib/mergeset/block_stream_writer.go

233 lines
6.2 KiB
Go

package mergeset
import (
"fmt"
"path/filepath"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
)
type blockStreamWriter struct {
compressLevel int
path string
metaindexWriter filestream.WriteCloser
indexWriter filestream.WriteCloser
itemsWriter filestream.WriteCloser
lensWriter filestream.WriteCloser
sb storageBlock
bh blockHeader
mr metaindexRow
unpackedIndexBlockBuf []byte
packedIndexBlockBuf []byte
unpackedMetaindexBuf []byte
packedMetaindexBuf []byte
itemsBlockOffset uint64
lensBlockOffset uint64
indexBlockOffset uint64
// whether the first item for mr has been caught.
mrFirstItemCaught bool
}
func (bsw *blockStreamWriter) reset() {
bsw.compressLevel = 0
bsw.path = ""
bsw.metaindexWriter = nil
bsw.indexWriter = nil
bsw.itemsWriter = nil
bsw.lensWriter = nil
bsw.sb.Reset()
bsw.bh.Reset()
bsw.mr.Reset()
bsw.unpackedIndexBlockBuf = bsw.unpackedIndexBlockBuf[:0]
bsw.packedIndexBlockBuf = bsw.packedIndexBlockBuf[:0]
bsw.unpackedMetaindexBuf = bsw.unpackedMetaindexBuf[:0]
bsw.packedMetaindexBuf = bsw.packedMetaindexBuf[:0]
bsw.itemsBlockOffset = 0
bsw.lensBlockOffset = 0
bsw.indexBlockOffset = 0
bsw.mrFirstItemCaught = false
}
func (bsw *blockStreamWriter) InitFromInmemoryPart(ip *inmemoryPart, compressLevel int) {
bsw.reset()
bsw.compressLevel = compressLevel
bsw.metaindexWriter = &ip.metaindexData
bsw.indexWriter = &ip.indexData
bsw.itemsWriter = &ip.itemsData
bsw.lensWriter = &ip.lensData
}
// InitFromFilePart initializes bsw from a file-based part on the given path.
//
// The bsw doesn't pollute OS page cache if nocache is set.
func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compressLevel int) error {
path = filepath.Clean(path)
// Create the directory
if err := fs.MkdirAllFailIfExist(path); err != nil {
return fmt.Errorf("cannot create directory %q: %s", path, err)
}
// Create part files in the directory.
// Always cache metaindex file in OS page cache, since it is immediately
// read after the merge.
metaindexPath := path + "/metaindex.bin"
metaindexFile, err := filestream.Create(metaindexPath, false)
if err != nil {
fs.MustRemoveAll(path)
return fmt.Errorf("cannot create metaindex file: %s", err)
}
indexPath := path + "/index.bin"
indexFile, err := filestream.Create(indexPath, nocache)
if err != nil {
metaindexFile.MustClose()
fs.MustRemoveAll(path)
return fmt.Errorf("cannot create index file: %s", err)
}
itemsPath := path + "/items.bin"
itemsFile, err := filestream.Create(itemsPath, nocache)
if err != nil {
metaindexFile.MustClose()
indexFile.MustClose()
fs.MustRemoveAll(path)
return fmt.Errorf("cannot create items file: %s", err)
}
lensPath := path + "/lens.bin"
lensFile, err := filestream.Create(lensPath, nocache)
if err != nil {
metaindexFile.MustClose()
indexFile.MustClose()
itemsFile.MustClose()
fs.MustRemoveAll(path)
return fmt.Errorf("cannot create lens file: %s", err)
}
bsw.reset()
bsw.compressLevel = compressLevel
bsw.path = path
bsw.metaindexWriter = metaindexFile
bsw.indexWriter = indexFile
bsw.itemsWriter = itemsFile
bsw.lensWriter = lensFile
return nil
}
// MustClose closes the bsw.
//
// It closes *Writer files passed to Init*.
func (bsw *blockStreamWriter) MustClose() {
// Flush the remaining data.
bsw.flushIndexData()
// Compress and write metaindex.
bsw.packedMetaindexBuf = encoding.CompressZSTDLevel(bsw.packedMetaindexBuf[:0], bsw.unpackedMetaindexBuf, bsw.compressLevel)
fs.MustWriteData(bsw.metaindexWriter, bsw.packedMetaindexBuf)
// Close all the writers.
bsw.metaindexWriter.MustClose()
bsw.indexWriter.MustClose()
bsw.itemsWriter.MustClose()
bsw.lensWriter.MustClose()
// Sync bsw.path contents to make sure it doesn't disappear
// after system crash or power loss.
if bsw.path != "" {
fs.SyncPath(bsw.path)
}
bsw.reset()
}
// WriteBlock writes ib to bsw.
//
// ib must be sorted.
func (bsw *blockStreamWriter) WriteBlock(ib *inmemoryBlock) {
bsw.bh.firstItem, bsw.bh.commonPrefix, bsw.bh.itemsCount, bsw.bh.marshalType = ib.MarshalSortedData(&bsw.sb, bsw.bh.firstItem[:0], bsw.bh.commonPrefix[:0], bsw.compressLevel)
if !bsw.mrFirstItemCaught {
bsw.mr.firstItem = append(bsw.mr.firstItem[:0], bsw.bh.firstItem...)
bsw.mrFirstItemCaught = true
}
// Write itemsData
fs.MustWriteData(bsw.itemsWriter, bsw.sb.itemsData)
bsw.bh.itemsBlockSize = uint32(len(bsw.sb.itemsData))
bsw.bh.itemsBlockOffset = bsw.itemsBlockOffset
bsw.itemsBlockOffset += uint64(bsw.bh.itemsBlockSize)
// Write lensData
fs.MustWriteData(bsw.lensWriter, bsw.sb.lensData)
bsw.bh.lensBlockSize = uint32(len(bsw.sb.lensData))
bsw.bh.lensBlockOffset = bsw.lensBlockOffset
bsw.lensBlockOffset += uint64(bsw.bh.lensBlockSize)
// Write blockHeader
bsw.unpackedIndexBlockBuf = bsw.bh.Marshal(bsw.unpackedIndexBlockBuf)
bsw.bh.Reset()
bsw.mr.blockHeadersCount++
if len(bsw.unpackedIndexBlockBuf) >= maxIndexBlockSize {
bsw.flushIndexData()
}
}
// The maximum size of index block with multiple blockHeaders.
const maxIndexBlockSize = 64 * 1024
func (bsw *blockStreamWriter) flushIndexData() {
if len(bsw.unpackedIndexBlockBuf) == 0 {
// Nothing to flush.
return
}
// Write indexBlock.
bsw.packedIndexBlockBuf = encoding.CompressZSTDLevel(bsw.packedIndexBlockBuf[:0], bsw.unpackedIndexBlockBuf, bsw.compressLevel)
fs.MustWriteData(bsw.indexWriter, bsw.packedIndexBlockBuf)
bsw.mr.indexBlockSize = uint32(len(bsw.packedIndexBlockBuf))
bsw.mr.indexBlockOffset = bsw.indexBlockOffset
bsw.indexBlockOffset += uint64(bsw.mr.indexBlockSize)
bsw.unpackedIndexBlockBuf = bsw.unpackedIndexBlockBuf[:0]
// Write metaindexRow.
bsw.unpackedMetaindexBuf = bsw.mr.Marshal(bsw.unpackedMetaindexBuf)
bsw.mr.Reset()
// Notify that the next call to WriteBlock must catch the first item.
bsw.mrFirstItemCaught = false
}
func getBlockStreamWriter() *blockStreamWriter {
v := bswPool.Get()
if v == nil {
return &blockStreamWriter{}
}
return v.(*blockStreamWriter)
}
func putBlockStreamWriter(bsw *blockStreamWriter) {
bsw.reset()
bswPool.Put(bsw)
}
var bswPool sync.Pool