From 036a7b736585329ccb022e9896e5535ae91513eb Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 13 Apr 2023 22:11:56 -0700 Subject: [PATCH] lib/fs: replace MkdirAllIfNotExist->MustMkdirIfNotExist and MkdirAllFailIfExist->MustMkdirFailIfExist Callers of these functions log the returned error and then exit. The returned error already contains the path to directory, which was failed to be created. So let's just log the error together with the call stack inside these functions. This leaves the debuggability of the returned error at the same level while allows simplifying the code at callers' side. While at it, properly use MustMkdirFailIfExist instead of MustMkdirIfNotExist inside inmemoryPart.MustStoreToDisk(). It is expected that the inmemoryPart.MustStoreToDick() must fail if there is already a directory under the given path. --- app/vmselect/netstorage/tmp_blocks_file.go | 7 ++- lib/backup/actions/restore.go | 4 +- lib/fs/fs.go | 26 ++++++----- lib/mergeset/block_stream_writer.go | 4 +- lib/mergeset/inmemory_part.go | 11 ++--- lib/mergeset/table.go | 16 ++----- lib/persistentqueue/persistentqueue.go | 4 +- lib/storage/block_stream_writer.go | 4 +- lib/storage/inmemory_part.go | 11 ++--- lib/storage/partition.go | 30 ++++--------- lib/storage/partition_search_test.go | 6 +-- lib/storage/storage.go | 33 ++++---------- lib/storage/table.go | 51 +++++++--------------- lib/storage/table_search_test.go | 4 +- lib/storage/table_search_timing_test.go | 4 +- lib/storage/table_timing_test.go | 4 +- 16 files changed, 71 insertions(+), 148 deletions(-) diff --git a/app/vmselect/netstorage/tmp_blocks_file.go b/app/vmselect/netstorage/tmp_blocks_file.go index 522414057..ba3e60de3 100644 --- a/app/vmselect/netstorage/tmp_blocks_file.go +++ b/app/vmselect/netstorage/tmp_blocks_file.go @@ -3,6 +3,7 @@ package netstorage import ( "fmt" "os" + "path/filepath" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" @@ -20,11 +21,9 @@ func InitTmpBlocksDir(tmpDirPath string) { if len(tmpDirPath) == 0 { tmpDirPath = os.TempDir() } - tmpBlocksDir = tmpDirPath + "/searchResults" + tmpBlocksDir = filepath.Join(tmpDirPath, "searchResults") fs.MustRemoveAll(tmpBlocksDir) - if err := fs.MkdirAllIfNotExist(tmpBlocksDir); err != nil { - logger.Panicf("FATAL: cannot create %q: %s", tmpBlocksDir, err) - } + fs.MustMkdirIfNotExist(tmpBlocksDir) } var tmpBlocksDir string diff --git a/lib/backup/actions/restore.go b/lib/backup/actions/restore.go index 4664d5728..155b2d8ea 100644 --- a/lib/backup/actions/restore.go +++ b/lib/backup/actions/restore.go @@ -45,9 +45,7 @@ func (r *Restore) Run() error { startTime := time.Now() // Make sure VictoriaMetrics doesn't run during the restore process. - if err := fs.MkdirAllIfNotExist(r.Dst.Dir); err != nil { - return fmt.Errorf("cannot create dir %q: %w", r.Dst.Dir, err) - } + fs.MustMkdirIfNotExist(r.Dst.Dir) flockF, err := fs.CreateFlockFile(r.Dst.Dir) if err != nil { return fmt.Errorf("cannot create lock file in %q; make sure VictoriaMetrics doesn't use the dir; error: %w", r.Dst.Dir, err) diff --git a/lib/fs/fs.go b/lib/fs/fs.go index 380a1cba4..8cf547e58 100644 --- a/lib/fs/fs.go +++ b/lib/fs/fs.go @@ -99,22 +99,26 @@ func IsTemporaryFileName(fn string) bool { // tmpFileNameRe is regexp for temporary file name - see WriteFileAtomically for details. var tmpFileNameRe = regexp.MustCompile(`\.tmp\.\d+$`) -// MkdirAllIfNotExist creates the given path dir if it isn't exist. -func MkdirAllIfNotExist(path string) error { +// MustMkdirIfNotExist creates the given path dir if it isn't exist. +func MustMkdirIfNotExist(path string) { if IsPathExist(path) { - return nil + return + } + if err := mkdirSync(path); err != nil { + logger.Panicf("FATAL: cannot create directory: %s", err) } - return mkdirSync(path) } -// MkdirAllFailIfExist creates the given path dir if it isn't exist. +// MustMkdirFailIfExist creates the given path dir if it isn't exist. // -// Returns error if path already exists. -func MkdirAllFailIfExist(path string) error { +// If the directory at the given path already exists, then the function logs the error and exits. +func MustMkdirFailIfExist(path string) { if IsPathExist(path) { - return fmt.Errorf("the %q already exists", path) + logger.Panicf("FATAL: the %q already exists", path) + } + if err := mkdirSync(path); err != nil { + logger.Panicf("FATAL: cannot create directory: %s", err) } - return mkdirSync(path) } func mkdirSync(path string) error { @@ -310,9 +314,7 @@ func CopyDirectory(srcPath, dstPath string) error { if err != nil { return err } - if err := MkdirAllIfNotExist(dstPath); err != nil { - return err - } + MustMkdirIfNotExist(dstPath) for _, de := range des { if !de.Type().IsRegular() { // Skip non-files diff --git a/lib/mergeset/block_stream_writer.go b/lib/mergeset/block_stream_writer.go index 4287722c0..9ddb51951 100644 --- a/lib/mergeset/block_stream_writer.go +++ b/lib/mergeset/block_stream_writer.go @@ -78,9 +78,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre path = filepath.Clean(path) // Create the directory - if err := fs.MkdirAllFailIfExist(path); err != nil { - return fmt.Errorf("cannot create directory %q: %w", path, err) - } + fs.MustMkdirFailIfExist(path) // Create part files in the directory. diff --git a/lib/mergeset/inmemory_part.go b/lib/mergeset/inmemory_part.go index 794f1dedc..1a8d1ade4 100644 --- a/lib/mergeset/inmemory_part.go +++ b/lib/mergeset/inmemory_part.go @@ -1,7 +1,6 @@ package mergeset import ( - "fmt" "path/filepath" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" @@ -32,11 +31,10 @@ func (mp *inmemoryPart) Reset() { mp.lensData.Reset() } -// StoreToDisk stores mp to the given path on disk. -func (mp *inmemoryPart) StoreToDisk(path string) error { - if err := fs.MkdirAllIfNotExist(path); err != nil { - return fmt.Errorf("cannot create directory %q: %w", path, err) - } +// MustStoreToDisk stores mp to the given path on disk. +func (mp *inmemoryPart) MustStoreToDisk(path string) { + fs.MustMkdirFailIfExist(path) + metaindexPath := filepath.Join(path, metaindexFilename) fs.MustWriteSync(metaindexPath, mp.metaindexData.B) @@ -53,7 +51,6 @@ func (mp *inmemoryPart) StoreToDisk(path string) error { fs.MustSyncPath(path) // Do not sync parent directory - it must be synced by the caller. - return nil } // Init initializes mp from ib. diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index eadb40de0..7111cc458 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -328,9 +328,7 @@ func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallb startTime := time.Now() // Create a directory for the table if it doesn't exist yet. - if err := fs.MkdirAllIfNotExist(path); err != nil { - return nil, fmt.Errorf("cannot create directory %q: %w", path, err) - } + fs.MustMkdirIfNotExist(path) // Protect from concurrent opens. flockF, err := fs.CreateFlockFile(path) @@ -1091,9 +1089,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal if isFinal && len(pws) == 1 && pws[0].mp != nil { // Fast path: flush a single in-memory part to disk. mp := pws[0].mp - if err := mp.StoreToDisk(dstPartPath); err != nil { - logger.Panicf("FATAL: cannot store in-memory part to %s: %s", dstPartPath, err) - } + mp.MustStoreToDisk(dstPartPath) pwNew := tb.openCreatedPart(pws, nil, dstPartPath) tb.swapSrcWithDstParts(pws, pwNew, dstPartType) return nil @@ -1373,9 +1369,7 @@ var mergeWorkersLimitCh = make(chan struct{}, cgroup.AvailableCPUs()) func openParts(path string) ([]*partWrapper, error) { // The path can be missing after restoring from backup, so create it if needed. - if err := fs.MkdirAllIfNotExist(path); err != nil { - return nil, err - } + fs.MustMkdirIfNotExist(path) fs.MustRemoveTemporaryDirs(path) // Remove txn and tmp directories, which may be left after the upgrade @@ -1464,9 +1458,7 @@ func (tb *Table) CreateSnapshotAt(dstDir string, deadline uint64) error { // Flush inmemory items to disk. tb.flushInmemoryItems() - if err := fs.MkdirAllFailIfExist(dstDir); err != nil { - return fmt.Errorf("cannot create snapshot dir %q: %w", dstDir, err) - } + fs.MustMkdirFailIfExist(dstDir) pws := tb.getParts(nil) defer tb.putParts(pws) diff --git a/lib/persistentqueue/persistentqueue.go b/lib/persistentqueue/persistentqueue.go index fef7bbdbe..067969aaf 100644 --- a/lib/persistentqueue/persistentqueue.go +++ b/lib/persistentqueue/persistentqueue.go @@ -183,9 +183,7 @@ func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingB } } - if err := fs.MkdirAllIfNotExist(path); err != nil { - return nil, fmt.Errorf("cannot create directory %q: %w", path, err) - } + fs.MustMkdirIfNotExist(path) q.flockF = mustCreateFlockFile(path) mustCloseFlockF := true defer func() { diff --git a/lib/storage/block_stream_writer.go b/lib/storage/block_stream_writer.go index f17397da0..b60662c6e 100644 --- a/lib/storage/block_stream_writer.go +++ b/lib/storage/block_stream_writer.go @@ -97,9 +97,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre path = filepath.Clean(path) // Create the directory - if err := fs.MkdirAllFailIfExist(path); err != nil { - return fmt.Errorf("cannot create directory %q: %w", path, err) - } + fs.MustMkdirFailIfExist(path) // Create part files in the directory. timestampsPath := filepath.Join(path, timestampsFilename) diff --git a/lib/storage/inmemory_part.go b/lib/storage/inmemory_part.go index b5b9b0559..2e3c48084 100644 --- a/lib/storage/inmemory_part.go +++ b/lib/storage/inmemory_part.go @@ -1,7 +1,6 @@ package storage import ( - "fmt" "path/filepath" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" @@ -35,11 +34,10 @@ func (mp *inmemoryPart) Reset() { mp.creationTime = 0 } -// StoreToDisk stores the mp to the given path on disk. -func (mp *inmemoryPart) StoreToDisk(path string) error { - if err := fs.MkdirAllIfNotExist(path); err != nil { - return fmt.Errorf("cannot create directory %q: %w", path, err) - } +// MustStoreToDisk stores the mp to the given path on disk. +func (mp *inmemoryPart) MustStoreToDisk(path string) { + fs.MustMkdirFailIfExist(path) + timestampsPath := filepath.Join(path, timestampsFilename) fs.MustWriteSync(timestampsPath, mp.timestampsData.B) @@ -56,7 +54,6 @@ func (mp *inmemoryPart) StoreToDisk(path string) error { fs.MustSyncPath(path) // Do not sync parent directory - it must be synced by the caller. - return nil } // InitFromRows initializes mp from the given rows. diff --git a/lib/storage/partition.go b/lib/storage/partition.go index c7f5fa2ff..2b8c4a1d6 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -212,20 +212,16 @@ func (pw *partWrapper) decRef() { } } -// createPartition creates new partition for the given timestamp and the given paths +// mustCreatePartition creates new partition for the given timestamp and the given paths // to small and big partitions. -func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string, s *Storage) (*partition, error) { +func mustCreatePartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string, s *Storage) *partition { name := timestampToPartitionName(timestamp) smallPartsPath := filepath.Join(filepath.Clean(smallPartitionsPath), name) bigPartsPath := filepath.Join(filepath.Clean(bigPartitionsPath), name) logger.Infof("creating a partition %q with smallPartsPath=%q, bigPartsPath=%q", name, smallPartsPath, bigPartsPath) - if err := fs.MkdirAllFailIfExist(smallPartsPath); err != nil { - return nil, fmt.Errorf("cannot create directory for small parts %q: %w", smallPartsPath, err) - } - if err := fs.MkdirAllFailIfExist(bigPartsPath); err != nil { - return nil, fmt.Errorf("cannot create directory for big parts %q: %w", bigPartsPath, err) - } + fs.MustMkdirFailIfExist(smallPartsPath) + fs.MustMkdirFailIfExist(bigPartsPath) pt := newPartition(name, smallPartsPath, bigPartsPath, s) pt.tr.fromPartitionTimestamp(timestamp) @@ -233,7 +229,7 @@ func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath str logger.Infof("partition %q has been created", name) - return pt, nil + return pt } func (pt *partition) startBackgroundWorkers() { @@ -1268,9 +1264,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi if !isDedupEnabled() && isFinal && len(pws) == 1 && pws[0].mp != nil { // Fast path: flush a single in-memory part to disk. mp := pws[0].mp - if err := mp.StoreToDisk(dstPartPath); err != nil { - logger.Panicf("FATAL: cannot store in-memory part to %s: %s", dstPartPath, err) - } + mp.MustStoreToDisk(dstPartPath) pwNew := pt.openCreatedPart(&mp.ph, pws, nil, dstPartPath) pt.swapSrcWithDstParts(pws, pwNew, dstPartType) return nil @@ -1796,9 +1790,7 @@ func getPartsSize(pws []*partWrapper) uint64 { func openParts(path string, partNames []string) ([]*partWrapper, error) { // The path can be missing after restoring from backup, so create it if needed. - if err := fs.MkdirAllIfNotExist(path); err != nil { - return nil, err - } + fs.MustMkdirIfNotExist(path) fs.MustRemoveTemporaryDirs(path) // Remove txn and tmp directories, which may be left after the upgrade @@ -1879,12 +1871,8 @@ func (pt *partition) CreateSnapshotAt(smallPath, bigPath string) error { pt.PutParts(pwsBig) }() - if err := fs.MkdirAllFailIfExist(smallPath); err != nil { - return fmt.Errorf("cannot create snapshot dir %q: %w", smallPath, err) - } - if err := fs.MkdirAllFailIfExist(bigPath); err != nil { - return fmt.Errorf("cannot create snapshot dir %q: %w", bigPath, err) - } + fs.MustMkdirFailIfExist(smallPath) + fs.MustMkdirFailIfExist(bigPath) // Create a file with part names at smallPath mustWritePartNames(pwsSmall, pwsBig, smallPath) diff --git a/lib/storage/partition_search_test.go b/lib/storage/partition_search_test.go index 50a4fe66f..c1176279f 100644 --- a/lib/storage/partition_search_test.go +++ b/lib/storage/partition_search_test.go @@ -169,10 +169,7 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma // Create partition from rowss and test search on it. strg := newTestStorage() strg.retentionMsecs = timestampFromTime(time.Now()) - ptr.MinTimestamp + 3600*1000 - pt, err := createPartition(ptt, "small-table", "big-table", strg) - if err != nil { - t.Fatalf("cannot create partition: %s", err) - } + pt := mustCreatePartition(ptt, "small-table", "big-table", strg) smallPartsPath := pt.smallPartsPath bigPartsPath := pt.bigPartsPath defer func() { @@ -194,6 +191,7 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma pt.MustClose() // Open the created partition and test search on it. + var err error pt, err = openPartition(smallPartsPath, bigPartsPath, strg) if err != nil { t.Fatalf("cannot open partition: %s", err) diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 43e7df1dd..c79df60c4 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -152,9 +152,7 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer retentionMsecs: retentionMsecs, stop: make(chan struct{}), } - if err := fs.MkdirAllIfNotExist(path); err != nil { - return nil, fmt.Errorf("cannot create a directory for the storage at %q: %w", path, err) - } + fs.MustMkdirIfNotExist(path) // Check whether the cache directory must be removed // It is removed if it contains resetCacheOnStartupFilename. @@ -184,9 +182,7 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer // Pre-create snapshots directory if it is missing. snapshotsPath := filepath.Join(path, snapshotsDirname) - if err := fs.MkdirAllIfNotExist(snapshotsPath); err != nil { - return nil, fmt.Errorf("cannot create %q: %w", snapshotsPath, err) - } + fs.MustMkdirIfNotExist(snapshotsPath) fs.MustRemoveTemporaryDirs(snapshotsPath) // Initialize series cardinality limiter. @@ -221,17 +217,13 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer // Load metadata metadataDir := filepath.Join(path, metadataDirname) isEmptyDB := !fs.IsPathExist(filepath.Join(path, indexdbDirname)) - if err := fs.MkdirAllIfNotExist(metadataDir); err != nil { - return nil, fmt.Errorf("cannot create %q: %w", metadataDir, err) - } + fs.MustMkdirIfNotExist(metadataDir) s.minTimestampForCompositeIndex = mustGetMinTimestampForCompositeIndex(metadataDir, isEmptyDB) // Load indexdb idbPath := filepath.Join(path, indexdbDirname) idbSnapshotsPath := filepath.Join(idbPath, snapshotsDirname) - if err := fs.MkdirAllIfNotExist(idbSnapshotsPath); err != nil { - return nil, fmt.Errorf("cannot create %q: %w", idbSnapshotsPath, err) - } + fs.MustMkdirIfNotExist(idbSnapshotsPath) fs.MustRemoveTemporaryDirs(idbSnapshotsPath) idbCurr, idbPrev, err := s.openIndexDBTables(idbPath) if err != nil { @@ -327,9 +319,7 @@ func (s *Storage) CreateSnapshot(deadline uint64) (string, error) { snapshotName := snapshot.NewName() srcDir := s.path dstDir := filepath.Join(srcDir, snapshotsDirname, snapshotName) - if err := fs.MkdirAllFailIfExist(dstDir); err != nil { - return "", fmt.Errorf("cannot create dir %q: %w", dstDir, err) - } + fs.MustMkdirFailIfExist(dstDir) dirsToRemoveOnError = append(dirsToRemoveOnError, dstDir) smallDir, bigDir, err := s.tb.CreateSnapshot(snapshotName, deadline) @@ -339,9 +329,7 @@ func (s *Storage) CreateSnapshot(deadline uint64) (string, error) { dirsToRemoveOnError = append(dirsToRemoveOnError, smallDir, bigDir) dstDataDir := filepath.Join(dstDir, dataDirname) - if err := fs.MkdirAllFailIfExist(dstDataDir); err != nil { - return "", fmt.Errorf("cannot create dir %q: %w", dstDataDir, err) - } + fs.MustMkdirFailIfExist(dstDataDir) dstSmallDir := filepath.Join(dstDataDir, smallDirname) if err := fs.SymlinkRelative(smallDir, dstSmallDir); err != nil { return "", fmt.Errorf("cannot create symlink from %q to %q: %w", smallDir, dstSmallDir, err) @@ -1788,10 +1776,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci if err != nil { err = fmt.Errorf("cannot update per-date data: %w", err) } else { - err = s.tb.AddRows(rows) - if err != nil { - err = fmt.Errorf("cannot add rows to table: %w", err) - } + s.tb.MustAddRows(rows) } if err != nil { return fmt.Errorf("error occurred during rows addition: %w", err) @@ -2349,9 +2334,7 @@ func (s *Storage) putTSIDToCache(tsid *generationTSID, metricName []byte) { } func (s *Storage) openIndexDBTables(path string) (curr, prev *indexDB, err error) { - if err := fs.MkdirAllIfNotExist(path); err != nil { - return nil, nil, fmt.Errorf("cannot create directory %q: %w", path, err) - } + fs.MustMkdirIfNotExist(path) fs.MustRemoveTemporaryDirs(path) // Search for the two most recent tables - the last one is active, diff --git a/lib/storage/table.go b/lib/storage/table.go index 6b0bf82dd..6bb4afa43 100644 --- a/lib/storage/table.go +++ b/lib/storage/table.go @@ -83,9 +83,7 @@ func openTable(path string, s *Storage) (*table, error) { path = filepath.Clean(path) // Create a directory for the table if it doesn't exist yet. - if err := fs.MkdirAllIfNotExist(path); err != nil { - return nil, fmt.Errorf("cannot create directory for table %q: %w", path, err) - } + fs.MustMkdirIfNotExist(path) // Protect from concurrent opens. flockF, err := fs.CreateFlockFile(path) @@ -96,25 +94,19 @@ func openTable(path string, s *Storage) (*table, error) { // Create directories for small and big partitions if they don't exist yet. smallPartitionsPath := filepath.Join(path, smallDirname) - if err := fs.MkdirAllIfNotExist(smallPartitionsPath); err != nil { - return nil, fmt.Errorf("cannot create directory for small partitions %q: %w", smallPartitionsPath, err) - } + fs.MustMkdirIfNotExist(smallPartitionsPath) fs.MustRemoveTemporaryDirs(smallPartitionsPath) + smallSnapshotsPath := filepath.Join(smallPartitionsPath, snapshotsDirname) - if err := fs.MkdirAllIfNotExist(smallSnapshotsPath); err != nil { - return nil, fmt.Errorf("cannot create %q: %w", smallSnapshotsPath, err) - } + fs.MustMkdirIfNotExist(smallSnapshotsPath) fs.MustRemoveTemporaryDirs(smallSnapshotsPath) bigPartitionsPath := filepath.Join(path, bigDirname) - if err := fs.MkdirAllIfNotExist(bigPartitionsPath); err != nil { - return nil, fmt.Errorf("cannot create directory for big partitions %q: %w", bigPartitionsPath, err) - } + fs.MustMkdirIfNotExist(bigPartitionsPath) fs.MustRemoveTemporaryDirs(bigPartitionsPath) + bigSnapshotsPath := filepath.Join(bigPartitionsPath, snapshotsDirname) - if err := fs.MkdirAllIfNotExist(bigSnapshotsPath); err != nil { - return nil, fmt.Errorf("cannot create %q: %w", bigSnapshotsPath, err) - } + fs.MustMkdirIfNotExist(bigSnapshotsPath) fs.MustRemoveTemporaryDirs(bigSnapshotsPath) // Open partitions. @@ -152,14 +144,10 @@ func (tb *table) CreateSnapshot(snapshotName string, deadline uint64) (string, s defer tb.PutPartitions(ptws) dstSmallDir := filepath.Join(tb.path, smallDirname, snapshotsDirname, snapshotName) - if err := fs.MkdirAllFailIfExist(dstSmallDir); err != nil { - return "", "", fmt.Errorf("cannot create dir %q: %w", dstSmallDir, err) - } + fs.MustMkdirFailIfExist(dstSmallDir) + dstBigDir := filepath.Join(tb.path, bigDirname, snapshotsDirname, snapshotName) - if err := fs.MkdirAllFailIfExist(dstBigDir); err != nil { - fs.MustRemoveAll(dstSmallDir) - return "", "", fmt.Errorf("cannot create dir %q: %w", dstBigDir, err) - } + fs.MustMkdirFailIfExist(dstBigDir) for _, ptw := range ptws { if deadline > 0 && fasttime.UnixTimestamp() > deadline { @@ -277,10 +265,10 @@ func (tb *table) ForceMergePartitions(partitionNamePrefix string) error { return nil } -// AddRows adds the given rows to the table tb. -func (tb *table) AddRows(rows []rawRow) error { +// MustAddRows adds the given rows to the table tb. +func (tb *table) MustAddRows(rows []rawRow) { if len(rows) == 0 { - return nil + return } // Verify whether all the rows may be added to a single partition. @@ -317,7 +305,7 @@ func (tb *table) AddRows(rows []rawRow) error { // Fast path - add all the rows into the ptw. ptw.pt.AddRows(rows) tb.PutPartitions(ptws) - return nil + return } // Slower path - split rows into per-partition buckets. @@ -343,7 +331,7 @@ func (tb *table) AddRows(rows []rawRow) error { } tb.PutPartitions(ptws) if len(missingRows) == 0 { - return nil + return } // The slowest path - there are rows that don't fit any existing partition. @@ -372,18 +360,11 @@ func (tb *table) AddRows(rows []rawRow) error { continue } - pt, err := createPartition(r.Timestamp, tb.smallPartitionsPath, tb.bigPartitionsPath, tb.s) - if err != nil { - // Return only the first error, since it has no sense in returning all errors. - tb.ptwsLock.Unlock() - return fmt.Errorf("errors while adding rows to table %q: %w", tb.path, err) - } + pt := mustCreatePartition(r.Timestamp, tb.smallPartitionsPath, tb.bigPartitionsPath, tb.s) pt.AddRows(missingRows[i : i+1]) tb.addPartitionNolock(pt) } tb.ptwsLock.Unlock() - - return nil } func (tb *table) getMinMaxTimestamps() (int64, int64) { diff --git a/lib/storage/table_search_test.go b/lib/storage/table_search_test.go index 033fc8943..df60fbe59 100644 --- a/lib/storage/table_search_test.go +++ b/lib/storage/table_search_test.go @@ -194,9 +194,7 @@ func testTableSearchEx(t *testing.T, rng *rand.Rand, trData, trSearch TimeRange, } }() for _, rows := range rowss { - if err := tb.AddRows(rows); err != nil { - t.Fatalf("cannot add rows to table: %s", err) - } + tb.MustAddRows(rows) // Flush rows to parts. tb.flushPendingRows() diff --git a/lib/storage/table_search_timing_test.go b/lib/storage/table_search_timing_test.go index 8f28aa739..9025bef8a 100644 --- a/lib/storage/table_search_timing_test.go +++ b/lib/storage/table_search_timing_test.go @@ -95,9 +95,7 @@ func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerIn r.Timestamp = int64(ts) r.Value = value } - if err := tb.AddRows(rows); err != nil { - panic(fmt.Errorf("cannot add %d rows: %w", rowsPerInsert, err)) - } + tb.MustAddRows(rows) } wg.Done() }(k) diff --git a/lib/storage/table_timing_test.go b/lib/storage/table_timing_test.go index 714c33c06..0082e1c6a 100644 --- a/lib/storage/table_timing_test.go +++ b/lib/storage/table_timing_test.go @@ -80,9 +80,7 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) { r.Value++ } // Add updated rowsCopy. - if err := tb.AddRows(rowsCopy); err != nil { - panic(fmt.Errorf("cannot add rows to table %q: %w", tablePath, err)) - } + tb.MustAddRows(rowsCopy) } doneCh <- struct{}{}