diff --git a/lib/filestream/filestream.go b/lib/filestream/filestream.go index 4be2fa82e..f156fb0d8 100644 --- a/lib/filestream/filestream.go +++ b/lib/filestream/filestream.go @@ -202,15 +202,15 @@ func OpenWriterAt(path string, offset int64, nocache bool) (*Writer, error) { return newWriter(f, nocache), nil } -// Create creates the file for the given path in nocache mode. +// MustCreate creates the file for the given path in nocache mode. // // If nocache is set, the writer doesn't pollute OS page cache. -func Create(path string, nocache bool) (*Writer, error) { +func MustCreate(path string, nocache bool) *Writer { f, err := os.Create(path) if err != nil { - return nil, fmt.Errorf("cannot create file %q: %w", path, err) + logger.Panicf("FATAL: cannot create file %q: %s", path, err) } - return newWriter(f, nocache), nil + return newWriter(f, nocache) } func newWriter(f *os.File, nocache bool) *Writer { diff --git a/lib/filestream/filestream_test.go b/lib/filestream/filestream_test.go index cf3223ac1..760070af7 100644 --- a/lib/filestream/filestream_test.go +++ b/lib/filestream/filestream_test.go @@ -29,10 +29,7 @@ func TestWriteRead(t *testing.T) { func testWriteRead(t *testing.T, nocache bool, testStr string) { t.Helper() - w, err := Create("./nocache_test.txt", nocache) - if err != nil { - t.Fatalf("cannot create file: %s", err) - } + w := MustCreate("./nocache_test.txt", nocache) defer func() { _ = os.Remove("./nocache_test.txt") }() diff --git a/lib/fs/fs.go b/lib/fs/fs.go index 85c0d75c6..f8d585b7c 100644 --- a/lib/fs/fs.go +++ b/lib/fs/fs.go @@ -34,10 +34,7 @@ func MustSyncPath(path string) { // Use MustWriteAtomic if the file at the path must be either written in full // or not written at all on app crash in the middle of the write. func MustWriteSync(path string, data []byte) { - f, err := filestream.Create(path, false) - if err != nil { - logger.Panicf("FATAL: cannot create file: %s", err) - } + f := filestream.MustCreate(path, false) if _, err := f.Write(data); err != nil { f.MustClose() // Do not call MustRemoveAll(path), so the user could inspect diff --git a/lib/mergeset/block_stream_writer.go b/lib/mergeset/block_stream_writer.go index 9ddb51951..df26e064a 100644 --- a/lib/mergeset/block_stream_writer.go +++ b/lib/mergeset/block_stream_writer.go @@ -1,7 +1,6 @@ package mergeset import ( - "fmt" "path/filepath" "sync" @@ -71,10 +70,10 @@ func (bsw *blockStreamWriter) InitFromInmemoryPart(mp *inmemoryPart, compressLev bsw.lensWriter = &mp.lensData } -// InitFromFilePart initializes bsw from a file-based part on the given path. +// MustInitFromFilePart initializes bsw from a file-based part on the given path. // // The bsw doesn't pollute OS page cache if nocache is set. -func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compressLevel int) error { +func (bsw *blockStreamWriter) MustInitFromFilePart(path string, nocache bool, compressLevel int) { path = filepath.Clean(path) // Create the directory @@ -85,38 +84,16 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre // Always cache metaindex file in OS page cache, since it is immediately // read after the merge. metaindexPath := filepath.Join(path, metaindexFilename) - metaindexFile, err := filestream.Create(metaindexPath, false) - if err != nil { - fs.MustRemoveDirAtomic(path) - return fmt.Errorf("cannot create metaindex file: %w", err) - } + metaindexFile := filestream.MustCreate(metaindexPath, false) indexPath := filepath.Join(path, indexFilename) - indexFile, err := filestream.Create(indexPath, nocache) - if err != nil { - metaindexFile.MustClose() - fs.MustRemoveDirAtomic(path) - return fmt.Errorf("cannot create index file: %w", err) - } + indexFile := filestream.MustCreate(indexPath, nocache) itemsPath := filepath.Join(path, itemsFilename) - itemsFile, err := filestream.Create(itemsPath, nocache) - if err != nil { - metaindexFile.MustClose() - indexFile.MustClose() - fs.MustRemoveDirAtomic(path) - return fmt.Errorf("cannot create items file: %w", err) - } + itemsFile := filestream.MustCreate(itemsPath, nocache) lensPath := filepath.Join(path, lensFilename) - lensFile, err := filestream.Create(lensPath, nocache) - if err != nil { - metaindexFile.MustClose() - indexFile.MustClose() - itemsFile.MustClose() - fs.MustRemoveDirAtomic(path) - return fmt.Errorf("cannot create lens file: %w", err) - } + lensFile := filestream.MustCreate(lensPath, nocache) bsw.reset() bsw.compressLevel = compressLevel @@ -125,8 +102,6 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre bsw.indexWriter = indexFile bsw.itemsWriter = itemsFile bsw.lensWriter = lensFile - - return nil } // MustClose closes the bsw. diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index a184066e2..10f2e97b0 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -1121,9 +1121,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal bsw.InitFromInmemoryPart(mpNew, compressLevel) } else { nocache := srcItemsCount > maxItemsPerCachedPart() - if err := bsw.InitFromFilePart(dstPartPath, nocache, compressLevel); err != nil { - logger.Panicf("FATAL: cannot create destination part at %s: %s", dstPartPath, err) - } + bsw.MustInitFromFilePart(dstPartPath, nocache, compressLevel) } // Merge source parts to destination part. diff --git a/lib/persistentqueue/persistentqueue.go b/lib/persistentqueue/persistentqueue.go index 7a548f94e..ffedb9435 100644 --- a/lib/persistentqueue/persistentqueue.go +++ b/lib/persistentqueue/persistentqueue.go @@ -93,10 +93,7 @@ func (q *queue) mustResetFiles() { q.readerLocalOffset = 0 q.writerPath = q.chunkFilePath(q.writerOffset) - w, err := filestream.Create(q.writerPath, false) - if err != nil { - logger.Panicf("FATAL: cannot create chunk file %q: %s", q.writerPath, err) - } + w := filestream.MustCreate(q.writerPath, false) q.writer = w q.readerPath = q.writerPath @@ -445,10 +442,7 @@ func (q *queue) nextChunkFileForWrite() error { q.writerFlushedOffset = q.writerOffset q.writerLocalOffset = 0 q.writerPath = q.chunkFilePath(q.writerOffset) - w, err := filestream.Create(q.writerPath, false) - if err != nil { - return fmt.Errorf("cannot create chunk file %q: %w", q.writerPath, err) - } + w := filestream.MustCreate(q.writerPath, false) q.writer = w if err := q.flushMetainfo(); err != nil { return fmt.Errorf("cannot flush metainfo: %w", err) diff --git a/lib/storage/block_stream_writer.go b/lib/storage/block_stream_writer.go index a43725f3c..6c319d7f8 100644 --- a/lib/storage/block_stream_writer.go +++ b/lib/storage/block_stream_writer.go @@ -2,7 +2,6 @@ package storage import ( "bytes" - "fmt" "path/filepath" "sync" "sync/atomic" @@ -78,10 +77,10 @@ func (bsw *blockStreamWriter) InitFromInmemoryPart(mp *inmemoryPart, compressLev bsw.metaindexWriter = &mp.metaindexData } -// InitFromFilePart initializes bsw from a file-based part on the given path. +// MustInitFromFilePart initializes bsw from a file-based part on the given path. // // The bsw doesn't pollute OS page cache if nocache is set. -func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compressLevel int) error { +func (bsw *blockStreamWriter) MustInitFromFilePart(path string, nocache bool, compressLevel int) { path = filepath.Clean(path) // Create the directory @@ -89,40 +88,18 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre // Create part files in the directory. timestampsPath := filepath.Join(path, timestampsFilename) - timestampsFile, err := filestream.Create(timestampsPath, nocache) - if err != nil { - fs.MustRemoveDirAtomic(path) - return fmt.Errorf("cannot create timestamps file: %w", err) - } + timestampsFile := filestream.MustCreate(timestampsPath, nocache) valuesPath := filepath.Join(path, valuesFilename) - valuesFile, err := filestream.Create(valuesPath, nocache) - if err != nil { - timestampsFile.MustClose() - fs.MustRemoveDirAtomic(path) - return fmt.Errorf("cannot create values file: %w", err) - } + valuesFile := filestream.MustCreate(valuesPath, nocache) indexPath := filepath.Join(path, indexFilename) - indexFile, err := filestream.Create(indexPath, nocache) - if err != nil { - timestampsFile.MustClose() - valuesFile.MustClose() - fs.MustRemoveDirAtomic(path) - return fmt.Errorf("cannot create index file: %w", err) - } + indexFile := filestream.MustCreate(indexPath, nocache) // Always cache metaindex file in OS page cache, since it is immediately // read after the merge. metaindexPath := filepath.Join(path, metaindexFilename) - metaindexFile, err := filestream.Create(metaindexPath, false) - if err != nil { - timestampsFile.MustClose() - valuesFile.MustClose() - indexFile.MustClose() - fs.MustRemoveDirAtomic(path) - return fmt.Errorf("cannot create metaindex file: %w", err) - } + metaindexFile := filestream.MustCreate(metaindexPath, false) bsw.reset() bsw.compressLevel = compressLevel @@ -131,8 +108,6 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre bsw.valuesWriter = valuesFile bsw.indexWriter = indexFile bsw.metaindexWriter = metaindexFile - - return nil } // MustClose closes the bsw. diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 13cbe4690..7f54859a1 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -1300,9 +1300,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi logger.Panicf("BUG: dstPartPath must be non-empty") } nocache := dstPartType == partBig - if err := bsw.InitFromFilePart(dstPartPath, nocache, compressLevel); err != nil { - logger.Panicf("FATAL: cannot create destination part at %s: %s", dstPartPath, err) - } + bsw.MustInitFromFilePart(dstPartPath, nocache, compressLevel) } // Merge source parts to destination part.