From 9183a439c7d7d1abbcf47c515704b2b03867eaaa Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 14 Apr 2023 15:12:45 -0700 Subject: [PATCH] lib/filestream: change Create() to MustCreate() Callers of this function log the returned error and exit. It is better logging the error together with the path to the filename and call stack directly inside the function. This simplifies the code at callers' side without reducing the level of debuggability --- lib/filestream/filestream.go | 8 +++--- lib/filestream/filestream_test.go | 5 +--- lib/fs/fs.go | 5 +--- lib/mergeset/block_stream_writer.go | 37 +++++--------------------- lib/mergeset/table.go | 4 +-- lib/persistentqueue/persistentqueue.go | 10 ++----- lib/storage/block_stream_writer.go | 37 +++++--------------------- lib/storage/partition.go | 4 +-- 8 files changed, 22 insertions(+), 88 deletions(-) diff --git a/lib/filestream/filestream.go b/lib/filestream/filestream.go index 4be2fa82e..f156fb0d8 100644 --- a/lib/filestream/filestream.go +++ b/lib/filestream/filestream.go @@ -202,15 +202,15 @@ func OpenWriterAt(path string, offset int64, nocache bool) (*Writer, error) { return newWriter(f, nocache), nil } -// Create creates the file for the given path in nocache mode. +// MustCreate creates the file for the given path in nocache mode. // // If nocache is set, the writer doesn't pollute OS page cache. -func Create(path string, nocache bool) (*Writer, error) { +func MustCreate(path string, nocache bool) *Writer { f, err := os.Create(path) if err != nil { - return nil, fmt.Errorf("cannot create file %q: %w", path, err) + logger.Panicf("FATAL: cannot create file %q: %s", path, err) } - return newWriter(f, nocache), nil + return newWriter(f, nocache) } func newWriter(f *os.File, nocache bool) *Writer { diff --git a/lib/filestream/filestream_test.go b/lib/filestream/filestream_test.go index cf3223ac1..760070af7 100644 --- a/lib/filestream/filestream_test.go +++ b/lib/filestream/filestream_test.go @@ -29,10 +29,7 @@ func TestWriteRead(t *testing.T) { func testWriteRead(t *testing.T, nocache bool, testStr string) { t.Helper() - w, err := Create("./nocache_test.txt", nocache) - if err != nil { - t.Fatalf("cannot create file: %s", err) - } + w := MustCreate("./nocache_test.txt", nocache) defer func() { _ = os.Remove("./nocache_test.txt") }() diff --git a/lib/fs/fs.go b/lib/fs/fs.go index 85c0d75c6..f8d585b7c 100644 --- a/lib/fs/fs.go +++ b/lib/fs/fs.go @@ -34,10 +34,7 @@ func MustSyncPath(path string) { // Use MustWriteAtomic if the file at the path must be either written in full // or not written at all on app crash in the middle of the write. func MustWriteSync(path string, data []byte) { - f, err := filestream.Create(path, false) - if err != nil { - logger.Panicf("FATAL: cannot create file: %s", err) - } + f := filestream.MustCreate(path, false) if _, err := f.Write(data); err != nil { f.MustClose() // Do not call MustRemoveAll(path), so the user could inspect diff --git a/lib/mergeset/block_stream_writer.go b/lib/mergeset/block_stream_writer.go index 9ddb51951..df26e064a 100644 --- a/lib/mergeset/block_stream_writer.go +++ b/lib/mergeset/block_stream_writer.go @@ -1,7 +1,6 @@ package mergeset import ( - "fmt" "path/filepath" "sync" @@ -71,10 +70,10 @@ func (bsw *blockStreamWriter) InitFromInmemoryPart(mp *inmemoryPart, compressLev bsw.lensWriter = &mp.lensData } -// InitFromFilePart initializes bsw from a file-based part on the given path. +// MustInitFromFilePart initializes bsw from a file-based part on the given path. // // The bsw doesn't pollute OS page cache if nocache is set. -func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compressLevel int) error { +func (bsw *blockStreamWriter) MustInitFromFilePart(path string, nocache bool, compressLevel int) { path = filepath.Clean(path) // Create the directory @@ -85,38 +84,16 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre // Always cache metaindex file in OS page cache, since it is immediately // read after the merge. metaindexPath := filepath.Join(path, metaindexFilename) - metaindexFile, err := filestream.Create(metaindexPath, false) - if err != nil { - fs.MustRemoveDirAtomic(path) - return fmt.Errorf("cannot create metaindex file: %w", err) - } + metaindexFile := filestream.MustCreate(metaindexPath, false) indexPath := filepath.Join(path, indexFilename) - indexFile, err := filestream.Create(indexPath, nocache) - if err != nil { - metaindexFile.MustClose() - fs.MustRemoveDirAtomic(path) - return fmt.Errorf("cannot create index file: %w", err) - } + indexFile := filestream.MustCreate(indexPath, nocache) itemsPath := filepath.Join(path, itemsFilename) - itemsFile, err := filestream.Create(itemsPath, nocache) - if err != nil { - metaindexFile.MustClose() - indexFile.MustClose() - fs.MustRemoveDirAtomic(path) - return fmt.Errorf("cannot create items file: %w", err) - } + itemsFile := filestream.MustCreate(itemsPath, nocache) lensPath := filepath.Join(path, lensFilename) - lensFile, err := filestream.Create(lensPath, nocache) - if err != nil { - metaindexFile.MustClose() - indexFile.MustClose() - itemsFile.MustClose() - fs.MustRemoveDirAtomic(path) - return fmt.Errorf("cannot create lens file: %w", err) - } + lensFile := filestream.MustCreate(lensPath, nocache) bsw.reset() bsw.compressLevel = compressLevel @@ -125,8 +102,6 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre bsw.indexWriter = indexFile bsw.itemsWriter = itemsFile bsw.lensWriter = lensFile - - return nil } // MustClose closes the bsw. diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index a184066e2..10f2e97b0 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -1121,9 +1121,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal bsw.InitFromInmemoryPart(mpNew, compressLevel) } else { nocache := srcItemsCount > maxItemsPerCachedPart() - if err := bsw.InitFromFilePart(dstPartPath, nocache, compressLevel); err != nil { - logger.Panicf("FATAL: cannot create destination part at %s: %s", dstPartPath, err) - } + bsw.MustInitFromFilePart(dstPartPath, nocache, compressLevel) } // Merge source parts to destination part. diff --git a/lib/persistentqueue/persistentqueue.go b/lib/persistentqueue/persistentqueue.go index 7a548f94e..ffedb9435 100644 --- a/lib/persistentqueue/persistentqueue.go +++ b/lib/persistentqueue/persistentqueue.go @@ -93,10 +93,7 @@ func (q *queue) mustResetFiles() { q.readerLocalOffset = 0 q.writerPath = q.chunkFilePath(q.writerOffset) - w, err := filestream.Create(q.writerPath, false) - if err != nil { - logger.Panicf("FATAL: cannot create chunk file %q: %s", q.writerPath, err) - } + w := filestream.MustCreate(q.writerPath, false) q.writer = w q.readerPath = q.writerPath @@ -445,10 +442,7 @@ func (q *queue) nextChunkFileForWrite() error { q.writerFlushedOffset = q.writerOffset q.writerLocalOffset = 0 q.writerPath = q.chunkFilePath(q.writerOffset) - w, err := filestream.Create(q.writerPath, false) - if err != nil { - return fmt.Errorf("cannot create chunk file %q: %w", q.writerPath, err) - } + w := filestream.MustCreate(q.writerPath, false) q.writer = w if err := q.flushMetainfo(); err != nil { return fmt.Errorf("cannot flush metainfo: %w", err) diff --git a/lib/storage/block_stream_writer.go b/lib/storage/block_stream_writer.go index a43725f3c..6c319d7f8 100644 --- a/lib/storage/block_stream_writer.go +++ b/lib/storage/block_stream_writer.go @@ -2,7 +2,6 @@ package storage import ( "bytes" - "fmt" "path/filepath" "sync" "sync/atomic" @@ -78,10 +77,10 @@ func (bsw *blockStreamWriter) InitFromInmemoryPart(mp *inmemoryPart, compressLev bsw.metaindexWriter = &mp.metaindexData } -// InitFromFilePart initializes bsw from a file-based part on the given path. +// MustInitFromFilePart initializes bsw from a file-based part on the given path. // // The bsw doesn't pollute OS page cache if nocache is set. -func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compressLevel int) error { +func (bsw *blockStreamWriter) MustInitFromFilePart(path string, nocache bool, compressLevel int) { path = filepath.Clean(path) // Create the directory @@ -89,40 +88,18 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre // Create part files in the directory. timestampsPath := filepath.Join(path, timestampsFilename) - timestampsFile, err := filestream.Create(timestampsPath, nocache) - if err != nil { - fs.MustRemoveDirAtomic(path) - return fmt.Errorf("cannot create timestamps file: %w", err) - } + timestampsFile := filestream.MustCreate(timestampsPath, nocache) valuesPath := filepath.Join(path, valuesFilename) - valuesFile, err := filestream.Create(valuesPath, nocache) - if err != nil { - timestampsFile.MustClose() - fs.MustRemoveDirAtomic(path) - return fmt.Errorf("cannot create values file: %w", err) - } + valuesFile := filestream.MustCreate(valuesPath, nocache) indexPath := filepath.Join(path, indexFilename) - indexFile, err := filestream.Create(indexPath, nocache) - if err != nil { - timestampsFile.MustClose() - valuesFile.MustClose() - fs.MustRemoveDirAtomic(path) - return fmt.Errorf("cannot create index file: %w", err) - } + indexFile := filestream.MustCreate(indexPath, nocache) // Always cache metaindex file in OS page cache, since it is immediately // read after the merge. metaindexPath := filepath.Join(path, metaindexFilename) - metaindexFile, err := filestream.Create(metaindexPath, false) - if err != nil { - timestampsFile.MustClose() - valuesFile.MustClose() - indexFile.MustClose() - fs.MustRemoveDirAtomic(path) - return fmt.Errorf("cannot create metaindex file: %w", err) - } + metaindexFile := filestream.MustCreate(metaindexPath, false) bsw.reset() bsw.compressLevel = compressLevel @@ -131,8 +108,6 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre bsw.valuesWriter = valuesFile bsw.indexWriter = indexFile bsw.metaindexWriter = metaindexFile - - return nil } // MustClose closes the bsw. diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 13cbe4690..7f54859a1 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -1300,9 +1300,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi logger.Panicf("BUG: dstPartPath must be non-empty") } nocache := dstPartType == partBig - if err := bsw.InitFromFilePart(dstPartPath, nocache, compressLevel); err != nil { - logger.Panicf("FATAL: cannot create destination part at %s: %s", dstPartPath, err) - } + bsw.MustInitFromFilePart(dstPartPath, nocache, compressLevel) } // Merge source parts to destination part.