package mergeset

import (
	"path/filepath"
	"sync"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
)

// blockStreamWriter streams sorted item blocks into the four streams making
// up a part: metaindex, index, items and lens.
type blockStreamWriter struct {
	compressLevel int

	metaindexWriter filestream.WriteCloser
	indexWriter     filestream.WriteCloser
	itemsWriter     filestream.WriteCloser
	lensWriter      filestream.WriteCloser

	sb storageBlock
	bh blockHeader
	mr metaindexRow

	unpackedIndexBlockBuf []byte
	packedIndexBlockBuf   []byte

	unpackedMetaindexBuf []byte
	packedMetaindexBuf   []byte

	itemsBlockOffset uint64
	lensBlockOffset  uint64
	indexBlockOffset uint64

	// whether the first item for mr has been caught.
	mrFirstItemCaught bool
}

// reset returns bsw to its zero state so it can be reused (e.g. via bswPool).
func (bsw *blockStreamWriter) reset() {
	bsw.compressLevel = 0

	bsw.metaindexWriter = nil
	bsw.indexWriter = nil
	bsw.itemsWriter = nil
	bsw.lensWriter = nil

	bsw.sb.Reset()
	bsw.bh.Reset()
	bsw.mr.Reset()

	// Truncate buffers while keeping their capacity for reuse.
	bsw.unpackedIndexBlockBuf = bsw.unpackedIndexBlockBuf[:0]
	bsw.packedIndexBlockBuf = bsw.packedIndexBlockBuf[:0]
	bsw.unpackedMetaindexBuf = bsw.unpackedMetaindexBuf[:0]
	bsw.packedMetaindexBuf = bsw.packedMetaindexBuf[:0]

	bsw.itemsBlockOffset = 0
	bsw.lensBlockOffset = 0
	bsw.indexBlockOffset = 0

	bsw.mrFirstItemCaught = false
}

// MustInitFromInmemoryPart initializes bsw so it writes into the buffers of
// the given in-memory part mp.
func (bsw *blockStreamWriter) MustInitFromInmemoryPart(mp *inmemoryPart, compressLevel int) {
	bsw.reset()
	bsw.compressLevel = compressLevel

	bsw.metaindexWriter = &mp.metaindexData
	bsw.indexWriter = &mp.indexData
	bsw.itemsWriter = &mp.itemsData
	bsw.lensWriter = &mp.lensData
}

// MustInitFromFilePart initializes bsw from a file-based part on the given path.
//
// The bsw doesn't pollute OS page cache if nocache is set.
func (bsw *blockStreamWriter) MustInitFromFilePart(path string, nocache bool, compressLevel int) {
	path = filepath.Clean(path)

	// Create the directory
	fs.MustMkdirFailIfExist(path)

	// Create part files in the directory.

	// Always cache metaindex file in OS page cache, since it is immediately
	// read after the merge.
	fMetaindex := filestream.MustCreate(filepath.Join(path, metaindexFilename), false)
	fIndex := filestream.MustCreate(filepath.Join(path, indexFilename), nocache)
	fItems := filestream.MustCreate(filepath.Join(path, itemsFilename), nocache)
	fLens := filestream.MustCreate(filepath.Join(path, lensFilename), nocache)

	bsw.reset()
	bsw.compressLevel = compressLevel

	bsw.metaindexWriter = fMetaindex
	bsw.indexWriter = fIndex
	bsw.itemsWriter = fItems
	bsw.lensWriter = fLens
}

// MustClose closes the bsw.
//
// It closes *Writer files passed to Init*.
func (bsw *blockStreamWriter) MustClose() {
	// Flush the remaining data.
	bsw.flushIndexData()

	// Compress and write metaindex.
	bsw.packedMetaindexBuf = encoding.CompressZSTDLevel(bsw.packedMetaindexBuf[:0], bsw.unpackedMetaindexBuf, bsw.compressLevel)
	fs.MustWriteData(bsw.metaindexWriter, bsw.packedMetaindexBuf)

	// Close all the writers.
	bsw.metaindexWriter.MustClose()
	bsw.indexWriter.MustClose()
	bsw.itemsWriter.MustClose()
	bsw.lensWriter.MustClose()

	bsw.reset()
}

// WriteBlock writes ib to bsw.
//
// ib must be sorted.
func (bsw *blockStreamWriter) WriteBlock(ib *inmemoryBlock) {
	// Marshal the sorted items into bsw.sb and capture the block header fields
	// (first item, common prefix, item count, marshal type).
	bsw.bh.firstItem, bsw.bh.commonPrefix, bsw.bh.itemsCount, bsw.bh.marshalType = ib.MarshalSortedData(&bsw.sb, bsw.bh.firstItem[:0], bsw.bh.commonPrefix[:0], bsw.compressLevel)

	// Write itemsData
	fs.MustWriteData(bsw.itemsWriter, bsw.sb.itemsData)
	bsw.bh.itemsBlockSize = uint32(len(bsw.sb.itemsData))
	bsw.bh.itemsBlockOffset = bsw.itemsBlockOffset
	bsw.itemsBlockOffset += uint64(bsw.bh.itemsBlockSize)

	// Write lensData
	fs.MustWriteData(bsw.lensWriter, bsw.sb.lensData)
	bsw.bh.lensBlockSize = uint32(len(bsw.sb.lensData))
	bsw.bh.lensBlockOffset = bsw.lensBlockOffset
	bsw.lensBlockOffset += uint64(bsw.bh.lensBlockSize)

	// Write blockHeader
	unpackedIndexBlockBufLen := len(bsw.unpackedIndexBlockBuf)
	bsw.unpackedIndexBlockBuf = bsw.bh.Marshal(bsw.unpackedIndexBlockBuf)
	if len(bsw.unpackedIndexBlockBuf) > maxIndexBlockSize {
		// Appending bh overflowed the index block limit: roll back the append,
		// flush the previously accumulated headers as one index block, then
		// re-append bh so it starts the next index block.
		bsw.unpackedIndexBlockBuf = bsw.unpackedIndexBlockBuf[:unpackedIndexBlockBufLen]
		bsw.flushIndexData()
		bsw.unpackedIndexBlockBuf = bsw.bh.Marshal(bsw.unpackedIndexBlockBuf)
	}
	if !bsw.mrFirstItemCaught {
		// Remember the first item for the current metaindexRow.
		bsw.mr.firstItem = append(bsw.mr.firstItem[:0], bsw.bh.firstItem...)
		bsw.mrFirstItemCaught = true
	}
	bsw.bh.Reset()
	bsw.mr.blockHeadersCount++
}

// The maximum size of index block with multiple blockHeaders.
const maxIndexBlockSize = 64 * 1024

// flushIndexData compresses the accumulated blockHeaders into a single index
// block, writes it to indexWriter and appends the corresponding metaindexRow
// to the metaindex buffer (written to disk later in MustClose).
func (bsw *blockStreamWriter) flushIndexData() {
	if len(bsw.unpackedIndexBlockBuf) == 0 {
		// Nothing to flush.
		return
	}

	// Write indexBlock.
	bsw.packedIndexBlockBuf = encoding.CompressZSTDLevel(bsw.packedIndexBlockBuf[:0], bsw.unpackedIndexBlockBuf, bsw.compressLevel)
	fs.MustWriteData(bsw.indexWriter, bsw.packedIndexBlockBuf)
	bsw.mr.indexBlockSize = uint32(len(bsw.packedIndexBlockBuf))
	bsw.mr.indexBlockOffset = bsw.indexBlockOffset
	bsw.indexBlockOffset += uint64(bsw.mr.indexBlockSize)
	bsw.unpackedIndexBlockBuf = bsw.unpackedIndexBlockBuf[:0]

	// Write metaindexRow.
	bsw.unpackedMetaindexBuf = bsw.mr.Marshal(bsw.unpackedMetaindexBuf)
	bsw.mr.Reset()

	// Notify that the next call to WriteBlock must catch the first item.
	bsw.mrFirstItemCaught = false
}

// getBlockStreamWriter returns a blockStreamWriter from bswPool,
// allocating a fresh one if the pool is empty.
func getBlockStreamWriter() *blockStreamWriter {
	v := bswPool.Get()
	if v == nil {
		return &blockStreamWriter{}
	}
	return v.(*blockStreamWriter)
}

// putBlockStreamWriter resets bsw and returns it to bswPool for reuse.
func putBlockStreamWriter(bsw *blockStreamWriter) {
	bsw.reset()
	bswPool.Put(bsw)
}

var bswPool sync.Pool