package storage import ( "bytes" "fmt" "io" "path/filepath" "sync" "sync/atomic" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) // blockStreamWriter represents block stream writer. type blockStreamWriter struct { compressLevel int path string // Use io.Writer type for timestampsWriter and valuesWriter // in order to remove I2I conversion in WriteExternalBlock // when passing them to fs.MustWriteData timestampsWriter io.Writer valuesWriter io.Writer indexWriter filestream.WriteCloser metaindexWriter filestream.WriteCloser mr metaindexRow timestampsBlockOffset uint64 valuesBlockOffset uint64 indexBlockOffset uint64 indexData []byte compressedIndexData []byte metaindexData []byte compressedMetaindexData []byte // prevTimestamps* is used as an optimization for reducing disk space usage // when serially written blocks have identical timestamps. // This is usually the case when adjacent blocks contain metrics scraped from the same target, // since such metrics have identical timestamps. prevTimestampsData []byte prevTimestampsBlockOffset uint64 } func (bsw *blockStreamWriter) assertWriteClosers() { _ = bsw.timestampsWriter.(filestream.WriteCloser) _ = bsw.valuesWriter.(filestream.WriteCloser) } // Init initializes bsw with the given writers. func (bsw *blockStreamWriter) reset() { bsw.compressLevel = 0 bsw.path = "" bsw.timestampsWriter = nil bsw.valuesWriter = nil bsw.indexWriter = nil bsw.metaindexWriter = nil bsw.mr.Reset() bsw.timestampsBlockOffset = 0 bsw.valuesBlockOffset = 0 bsw.indexBlockOffset = 0 bsw.indexData = bsw.indexData[:0] bsw.compressedIndexData = bsw.compressedIndexData[:0] bsw.metaindexData = bsw.metaindexData[:0] bsw.compressedMetaindexData = bsw.compressedMetaindexData[:0] bsw.prevTimestampsData = bsw.prevTimestampsData[:0] bsw.prevTimestampsBlockOffset = 0 } // InitFromInmemoryPart initialzes bsw from inmemory part. func (bsw *blockStreamWriter) InitFromInmemoryPart(mp *inmemoryPart) { bsw.reset() // Use the minimum compression level for in-memory blocks, // since they are going to be re-compressed during the merge into file-based blocks. bsw.compressLevel = -5 // See https://github.com/facebook/zstd/releases/tag/v1.3.4 bsw.timestampsWriter = &mp.timestampsData bsw.valuesWriter = &mp.valuesData bsw.indexWriter = &mp.indexData bsw.metaindexWriter = &mp.metaindexData bsw.assertWriteClosers() } // InitFromFilePart initializes bsw from a file-based part on the given path. // // The bsw doesn't pollute OS page cache if nocache is set. func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compressLevel int) error { path = filepath.Clean(path) // Create the directory if err := fs.MkdirAllFailIfExist(path); err != nil { return fmt.Errorf("cannot create directory %q: %w", path, err) } // Create part files in the directory. timestampsPath := path + "/timestamps.bin" timestampsFile, err := filestream.Create(timestampsPath, nocache) if err != nil { fs.MustRemoveAll(path) return fmt.Errorf("cannot create timestamps file: %w", err) } valuesPath := path + "/values.bin" valuesFile, err := filestream.Create(valuesPath, nocache) if err != nil { timestampsFile.MustClose() fs.MustRemoveAll(path) return fmt.Errorf("cannot create values file: %w", err) } indexPath := path + "/index.bin" indexFile, err := filestream.Create(indexPath, nocache) if err != nil { timestampsFile.MustClose() valuesFile.MustClose() fs.MustRemoveAll(path) return fmt.Errorf("cannot create index file: %w", err) } // Always cache metaindex file in OS page cache, since it is immediately // read after the merge. metaindexPath := path + "/metaindex.bin" metaindexFile, err := filestream.Create(metaindexPath, false) if err != nil { timestampsFile.MustClose() valuesFile.MustClose() indexFile.MustClose() fs.MustRemoveAll(path) return fmt.Errorf("cannot create metaindex file: %w", err) } bsw.reset() bsw.compressLevel = compressLevel bsw.path = path bsw.timestampsWriter = timestampsFile bsw.valuesWriter = valuesFile bsw.indexWriter = indexFile bsw.metaindexWriter = metaindexFile bsw.assertWriteClosers() return nil } // MustClose closes the bsw. // // It closes *Writer files passed to Init*. func (bsw *blockStreamWriter) MustClose() { // Flush remaining data. bsw.flushIndexData() // Write metaindex data. bsw.compressedMetaindexData = encoding.CompressZSTDLevel(bsw.compressedMetaindexData[:0], bsw.metaindexData, bsw.compressLevel) fs.MustWriteData(bsw.metaindexWriter, bsw.compressedMetaindexData) // Close writers. bsw.timestampsWriter.(filestream.WriteCloser).MustClose() bsw.valuesWriter.(filestream.WriteCloser).MustClose() bsw.indexWriter.MustClose() bsw.metaindexWriter.MustClose() // Sync bsw.path contents to make sure it doesn't disappear // after system crash or power loss. if bsw.path != "" { fs.MustSyncPath(bsw.path) } bsw.reset() } // WriteExternalBlock writes b to bsw and updates ph and rowsMerged. func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsMerged *uint64, needDedup bool) { atomic.AddUint64(rowsMerged, uint64(b.rowsCount())) if needDedup { dedupInterval := GetDedupInterval() b.deduplicateSamplesDuringMerge(dedupInterval) } headerData, timestampsData, valuesData := b.MarshalData(bsw.timestampsBlockOffset, bsw.valuesBlockOffset) usePrevTimestamps := len(bsw.prevTimestampsData) > 0 && bytes.Equal(timestampsData, bsw.prevTimestampsData) if usePrevTimestamps { // The current timestamps block equals to the previous timestamps block. // Update headerData so it points to the previous timestamps block. This saves disk space. headerData, timestampsData, valuesData = b.MarshalData(bsw.prevTimestampsBlockOffset, bsw.valuesBlockOffset) atomic.AddUint64(×tampsBlocksMerged, 1) atomic.AddUint64(×tampsBytesSaved, uint64(len(timestampsData))) } bsw.indexData = append(bsw.indexData, headerData...) bsw.mr.RegisterBlockHeader(&b.bh) if len(bsw.indexData) >= maxBlockSize { bsw.flushIndexData() } if !usePrevTimestamps { bsw.prevTimestampsData = append(bsw.prevTimestampsData[:0], timestampsData...) bsw.prevTimestampsBlockOffset = bsw.timestampsBlockOffset fs.MustWriteData(bsw.timestampsWriter, timestampsData) bsw.timestampsBlockOffset += uint64(len(timestampsData)) } fs.MustWriteData(bsw.valuesWriter, valuesData) bsw.valuesBlockOffset += uint64(len(valuesData)) updatePartHeader(b, ph) } var ( timestampsBlocksMerged uint64 timestampsBytesSaved uint64 ) func updatePartHeader(b *Block, ph *partHeader) { ph.BlocksCount++ ph.RowsCount += uint64(b.bh.RowsCount) if b.bh.MinTimestamp < ph.MinTimestamp { ph.MinTimestamp = b.bh.MinTimestamp } if b.bh.MaxTimestamp > ph.MaxTimestamp { ph.MaxTimestamp = b.bh.MaxTimestamp } } func (bsw *blockStreamWriter) flushIndexData() { if len(bsw.indexData) == 0 { return } // Write compressed index block to index data. bsw.compressedIndexData = encoding.CompressZSTDLevel(bsw.compressedIndexData[:0], bsw.indexData, bsw.compressLevel) indexBlockSize := len(bsw.compressedIndexData) if uint64(indexBlockSize) >= 1<<32 { logger.Panicf("BUG: indexBlock size must fit uint32; got %d", indexBlockSize) } fs.MustWriteData(bsw.indexWriter, bsw.compressedIndexData) // Write metaindex row to metaindex data. bsw.mr.IndexBlockOffset = bsw.indexBlockOffset bsw.mr.IndexBlockSize = uint32(indexBlockSize) bsw.metaindexData = bsw.mr.Marshal(bsw.metaindexData) // Update offsets. bsw.indexBlockOffset += uint64(indexBlockSize) bsw.indexData = bsw.indexData[:0] bsw.mr.Reset() } func getBlockStreamWriter() *blockStreamWriter { v := bswPool.Get() if v == nil { return &blockStreamWriter{} } return v.(*blockStreamWriter) } func putBlockStreamWriter(bsw *blockStreamWriter) { bsw.reset() bswPool.Put(bsw) } var bswPool sync.Pool