lib/filestream: change Create() to MustCreate()

Callers of this function log the returned error and exit.
It is better to log the error together with the path to the file
and the call stack directly inside the function. This simplifies
the code on the callers' side without reducing debuggability.
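
For illustration, a minimal sketch of the caller-side simplification. The wrapper function and its name are hypothetical and not part of this commit; the Create/MustCreate pattern mirrors the diffs below, and the import path is assumed to be the repository's usual module path.

package example

import "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"

// mustOpenWriter is a hypothetical caller showing the before/after pattern.
func mustOpenWriter(path string) *filestream.Writer {
	// Before this commit every caller had to check the error itself:
	//   w, err := filestream.Create(path, false)
	//   if err != nil {
	//       logger.Panicf("FATAL: cannot create file %q: %s", path, err)
	//   }
	//   return w
	// Now the error is logged with the file path and call stack inside the library,
	// so the caller simply uses the returned writer.
	return filestream.MustCreate(path, false)
}
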
Aliaksandr Valialkin 2023-04-14 15:12:45 -07:00
parent 5eb163a08a
commit 9183a439c7
8 changed files with 22 additions and 88 deletions

@@ -202,15 +202,15 @@ func OpenWriterAt(path string, offset int64, nocache bool) (*Writer, error) {
 	return newWriter(f, nocache), nil
 }
 
-// Create creates the file for the given path in nocache mode.
+// MustCreate creates the file for the given path in nocache mode.
 //
 // If nocache is set, the writer doesn't pollute OS page cache.
-func Create(path string, nocache bool) (*Writer, error) {
+func MustCreate(path string, nocache bool) *Writer {
 	f, err := os.Create(path)
 	if err != nil {
-		return nil, fmt.Errorf("cannot create file %q: %w", path, err)
+		logger.Panicf("FATAL: cannot create file %q: %s", path, err)
 	}
-	return newWriter(f, nocache), nil
+	return newWriter(f, nocache)
 }
 
 func newWriter(f *os.File, nocache bool) *Writer {

@@ -29,10 +29,7 @@ func TestWriteRead(t *testing.T) {
 
 func testWriteRead(t *testing.T, nocache bool, testStr string) {
 	t.Helper()
-	w, err := Create("./nocache_test.txt", nocache)
-	if err != nil {
-		t.Fatalf("cannot create file: %s", err)
-	}
+	w := MustCreate("./nocache_test.txt", nocache)
 	defer func() {
 		_ = os.Remove("./nocache_test.txt")
 	}()

@@ -34,10 +34,7 @@ func MustSyncPath(path string) {
 // Use MustWriteAtomic if the file at the path must be either written in full
 // or not written at all on app crash in the middle of the write.
 func MustWriteSync(path string, data []byte) {
-	f, err := filestream.Create(path, false)
-	if err != nil {
-		logger.Panicf("FATAL: cannot create file: %s", err)
-	}
+	f := filestream.MustCreate(path, false)
 	if _, err := f.Write(data); err != nil {
 		f.MustClose()
 		// Do not call MustRemoveAll(path), so the user could inspect

@@ -1,7 +1,6 @@
 package mergeset
 
 import (
-	"fmt"
 	"path/filepath"
 	"sync"
@@ -71,10 +70,10 @@ func (bsw *blockStreamWriter) InitFromInmemoryPart(mp *inmemoryPart, compressLev
 	bsw.lensWriter = &mp.lensData
 }
 
-// InitFromFilePart initializes bsw from a file-based part on the given path.
+// MustInitFromFilePart initializes bsw from a file-based part on the given path.
 //
 // The bsw doesn't pollute OS page cache if nocache is set.
-func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compressLevel int) error {
+func (bsw *blockStreamWriter) MustInitFromFilePart(path string, nocache bool, compressLevel int) {
 	path = filepath.Clean(path)
 
 	// Create the directory
@@ -85,38 +84,16 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
 	// Always cache metaindex file in OS page cache, since it is immediately
 	// read after the merge.
 	metaindexPath := filepath.Join(path, metaindexFilename)
-	metaindexFile, err := filestream.Create(metaindexPath, false)
-	if err != nil {
-		fs.MustRemoveDirAtomic(path)
-		return fmt.Errorf("cannot create metaindex file: %w", err)
-	}
+	metaindexFile := filestream.MustCreate(metaindexPath, false)
 
 	indexPath := filepath.Join(path, indexFilename)
-	indexFile, err := filestream.Create(indexPath, nocache)
-	if err != nil {
-		metaindexFile.MustClose()
-		fs.MustRemoveDirAtomic(path)
-		return fmt.Errorf("cannot create index file: %w", err)
-	}
+	indexFile := filestream.MustCreate(indexPath, nocache)
 
 	itemsPath := filepath.Join(path, itemsFilename)
-	itemsFile, err := filestream.Create(itemsPath, nocache)
-	if err != nil {
-		metaindexFile.MustClose()
-		indexFile.MustClose()
-		fs.MustRemoveDirAtomic(path)
-		return fmt.Errorf("cannot create items file: %w", err)
-	}
+	itemsFile := filestream.MustCreate(itemsPath, nocache)
 
 	lensPath := filepath.Join(path, lensFilename)
-	lensFile, err := filestream.Create(lensPath, nocache)
-	if err != nil {
-		metaindexFile.MustClose()
-		indexFile.MustClose()
-		itemsFile.MustClose()
-		fs.MustRemoveDirAtomic(path)
-		return fmt.Errorf("cannot create lens file: %w", err)
-	}
+	lensFile := filestream.MustCreate(lensPath, nocache)
 
 	bsw.reset()
 	bsw.compressLevel = compressLevel
@@ -125,8 +102,6 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
 	bsw.indexWriter = indexFile
 	bsw.itemsWriter = itemsFile
 	bsw.lensWriter = lensFile
-
-	return nil
 }
 
 // MustClose closes the bsw.

@@ -1121,9 +1121,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal
 		bsw.InitFromInmemoryPart(mpNew, compressLevel)
 	} else {
 		nocache := srcItemsCount > maxItemsPerCachedPart()
-		if err := bsw.InitFromFilePart(dstPartPath, nocache, compressLevel); err != nil {
-			logger.Panicf("FATAL: cannot create destination part at %s: %s", dstPartPath, err)
-		}
+		bsw.MustInitFromFilePart(dstPartPath, nocache, compressLevel)
 	}
 
 	// Merge source parts to destination part.

@@ -93,10 +93,7 @@ func (q *queue) mustResetFiles() {
 	q.readerLocalOffset = 0
 
 	q.writerPath = q.chunkFilePath(q.writerOffset)
-	w, err := filestream.Create(q.writerPath, false)
-	if err != nil {
-		logger.Panicf("FATAL: cannot create chunk file %q: %s", q.writerPath, err)
-	}
+	w := filestream.MustCreate(q.writerPath, false)
 	q.writer = w
 
 	q.readerPath = q.writerPath
@@ -445,10 +442,7 @@ func (q *queue) nextChunkFileForWrite() error {
 	q.writerFlushedOffset = q.writerOffset
 	q.writerLocalOffset = 0
 	q.writerPath = q.chunkFilePath(q.writerOffset)
-	w, err := filestream.Create(q.writerPath, false)
-	if err != nil {
-		return fmt.Errorf("cannot create chunk file %q: %w", q.writerPath, err)
-	}
+	w := filestream.MustCreate(q.writerPath, false)
 	q.writer = w
 	if err := q.flushMetainfo(); err != nil {
 		return fmt.Errorf("cannot flush metainfo: %w", err)

@@ -2,7 +2,6 @@ package storage
 
 import (
 	"bytes"
-	"fmt"
 	"path/filepath"
 	"sync"
 	"sync/atomic"
@@ -78,10 +77,10 @@ func (bsw *blockStreamWriter) InitFromInmemoryPart(mp *inmemoryPart, compressLev
 	bsw.metaindexWriter = &mp.metaindexData
 }
 
-// InitFromFilePart initializes bsw from a file-based part on the given path.
+// MustInitFromFilePart initializes bsw from a file-based part on the given path.
 //
 // The bsw doesn't pollute OS page cache if nocache is set.
-func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compressLevel int) error {
+func (bsw *blockStreamWriter) MustInitFromFilePart(path string, nocache bool, compressLevel int) {
 	path = filepath.Clean(path)
 
 	// Create the directory
// Create the directory // Create the directory
@@ -89,40 +88,18 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
 
 	// Create part files in the directory.
 	timestampsPath := filepath.Join(path, timestampsFilename)
-	timestampsFile, err := filestream.Create(timestampsPath, nocache)
-	if err != nil {
-		fs.MustRemoveDirAtomic(path)
-		return fmt.Errorf("cannot create timestamps file: %w", err)
-	}
+	timestampsFile := filestream.MustCreate(timestampsPath, nocache)
 
 	valuesPath := filepath.Join(path, valuesFilename)
-	valuesFile, err := filestream.Create(valuesPath, nocache)
-	if err != nil {
-		timestampsFile.MustClose()
-		fs.MustRemoveDirAtomic(path)
-		return fmt.Errorf("cannot create values file: %w", err)
-	}
+	valuesFile := filestream.MustCreate(valuesPath, nocache)
 
 	indexPath := filepath.Join(path, indexFilename)
-	indexFile, err := filestream.Create(indexPath, nocache)
-	if err != nil {
-		timestampsFile.MustClose()
-		valuesFile.MustClose()
-		fs.MustRemoveDirAtomic(path)
-		return fmt.Errorf("cannot create index file: %w", err)
-	}
+	indexFile := filestream.MustCreate(indexPath, nocache)
 
 	// Always cache metaindex file in OS page cache, since it is immediately
 	// read after the merge.
 	metaindexPath := filepath.Join(path, metaindexFilename)
-	metaindexFile, err := filestream.Create(metaindexPath, false)
-	if err != nil {
-		timestampsFile.MustClose()
-		valuesFile.MustClose()
-		indexFile.MustClose()
-		fs.MustRemoveDirAtomic(path)
-		return fmt.Errorf("cannot create metaindex file: %w", err)
-	}
+	metaindexFile := filestream.MustCreate(metaindexPath, false)
 
 	bsw.reset()
 	bsw.compressLevel = compressLevel
@@ -131,8 +108,6 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
 	bsw.valuesWriter = valuesFile
 	bsw.indexWriter = indexFile
 	bsw.metaindexWriter = metaindexFile
-
-	return nil
 }
 
 // MustClose closes the bsw.

@@ -1300,9 +1300,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi
 			logger.Panicf("BUG: dstPartPath must be non-empty")
 		}
 		nocache := dstPartType == partBig
-		if err := bsw.InitFromFilePart(dstPartPath, nocache, compressLevel); err != nil {
-			logger.Panicf("FATAL: cannot create destination part at %s: %s", dstPartPath, err)
-		}
+		bsw.MustInitFromFilePart(dstPartPath, nocache, compressLevel)
 	}
 
 	// Merge source parts to destination part.