diff --git a/app/vmselect/netstorage/tmp_blocks_file.go b/app/vmselect/netstorage/tmp_blocks_file.go index 522414057..ba3e60de3 100644 --- a/app/vmselect/netstorage/tmp_blocks_file.go +++ b/app/vmselect/netstorage/tmp_blocks_file.go @@ -3,6 +3,7 @@ package netstorage import ( "fmt" "os" + "path/filepath" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" @@ -20,11 +21,9 @@ func InitTmpBlocksDir(tmpDirPath string) { if len(tmpDirPath) == 0 { tmpDirPath = os.TempDir() } - tmpBlocksDir = tmpDirPath + "/searchResults" + tmpBlocksDir = filepath.Join(tmpDirPath, "searchResults") fs.MustRemoveAll(tmpBlocksDir) - if err := fs.MkdirAllIfNotExist(tmpBlocksDir); err != nil { - logger.Panicf("FATAL: cannot create %q: %s", tmpBlocksDir, err) - } + fs.MustMkdirIfNotExist(tmpBlocksDir) } var tmpBlocksDir string diff --git a/lib/backup/actions/restore.go b/lib/backup/actions/restore.go index 4664d5728..155b2d8ea 100644 --- a/lib/backup/actions/restore.go +++ b/lib/backup/actions/restore.go @@ -45,9 +45,7 @@ func (r *Restore) Run() error { startTime := time.Now() // Make sure VictoriaMetrics doesn't run during the restore process. - if err := fs.MkdirAllIfNotExist(r.Dst.Dir); err != nil { - return fmt.Errorf("cannot create dir %q: %w", r.Dst.Dir, err) - } + fs.MustMkdirIfNotExist(r.Dst.Dir) flockF, err := fs.CreateFlockFile(r.Dst.Dir) if err != nil { return fmt.Errorf("cannot create lock file in %q; make sure VictoriaMetrics doesn't use the dir; error: %w", r.Dst.Dir, err) diff --git a/lib/fs/fs.go b/lib/fs/fs.go index 380a1cba4..8cf547e58 100644 --- a/lib/fs/fs.go +++ b/lib/fs/fs.go @@ -99,22 +99,26 @@ func IsTemporaryFileName(fn string) bool { // tmpFileNameRe is regexp for temporary file name - see WriteFileAtomically for details. var tmpFileNameRe = regexp.MustCompile(`\.tmp\.\d+$`) -// MkdirAllIfNotExist creates the given path dir if it isn't exist. -func MkdirAllIfNotExist(path string) error { +// MustMkdirIfNotExist creates the given path dir if it isn't exist. +func MustMkdirIfNotExist(path string) { if IsPathExist(path) { - return nil + return + } + if err := mkdirSync(path); err != nil { + logger.Panicf("FATAL: cannot create directory: %s", err) } - return mkdirSync(path) } -// MkdirAllFailIfExist creates the given path dir if it isn't exist. +// MustMkdirFailIfExist creates the given path dir if it isn't exist. // -// Returns error if path already exists. -func MkdirAllFailIfExist(path string) error { +// If the directory at the given path already exists, then the function logs the error and exits. +func MustMkdirFailIfExist(path string) { if IsPathExist(path) { - return fmt.Errorf("the %q already exists", path) + logger.Panicf("FATAL: the %q already exists", path) + } + if err := mkdirSync(path); err != nil { + logger.Panicf("FATAL: cannot create directory: %s", err) } - return mkdirSync(path) } func mkdirSync(path string) error { @@ -310,9 +314,7 @@ func CopyDirectory(srcPath, dstPath string) error { if err != nil { return err } - if err := MkdirAllIfNotExist(dstPath); err != nil { - return err - } + MustMkdirIfNotExist(dstPath) for _, de := range des { if !de.Type().IsRegular() { // Skip non-files diff --git a/lib/mergeset/block_stream_writer.go b/lib/mergeset/block_stream_writer.go index 4287722c0..9ddb51951 100644 --- a/lib/mergeset/block_stream_writer.go +++ b/lib/mergeset/block_stream_writer.go @@ -78,9 +78,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre path = filepath.Clean(path) // Create the directory - if err := fs.MkdirAllFailIfExist(path); err != nil { - return fmt.Errorf("cannot create directory %q: %w", path, err) - } + fs.MustMkdirFailIfExist(path) // Create part files in the directory. diff --git a/lib/mergeset/inmemory_part.go b/lib/mergeset/inmemory_part.go index 794f1dedc..1a8d1ade4 100644 --- a/lib/mergeset/inmemory_part.go +++ b/lib/mergeset/inmemory_part.go @@ -1,7 +1,6 @@ package mergeset import ( - "fmt" "path/filepath" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" @@ -32,11 +31,10 @@ func (mp *inmemoryPart) Reset() { mp.lensData.Reset() } -// StoreToDisk stores mp to the given path on disk. -func (mp *inmemoryPart) StoreToDisk(path string) error { - if err := fs.MkdirAllIfNotExist(path); err != nil { - return fmt.Errorf("cannot create directory %q: %w", path, err) - } +// MustStoreToDisk stores mp to the given path on disk. +func (mp *inmemoryPart) MustStoreToDisk(path string) { + fs.MustMkdirFailIfExist(path) + metaindexPath := filepath.Join(path, metaindexFilename) fs.MustWriteSync(metaindexPath, mp.metaindexData.B) @@ -53,7 +51,6 @@ func (mp *inmemoryPart) StoreToDisk(path string) error { fs.MustSyncPath(path) // Do not sync parent directory - it must be synced by the caller. - return nil } // Init initializes mp from ib. diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index eadb40de0..7111cc458 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -328,9 +328,7 @@ func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallb startTime := time.Now() // Create a directory for the table if it doesn't exist yet. - if err := fs.MkdirAllIfNotExist(path); err != nil { - return nil, fmt.Errorf("cannot create directory %q: %w", path, err) - } + fs.MustMkdirIfNotExist(path) // Protect from concurrent opens. flockF, err := fs.CreateFlockFile(path) @@ -1091,9 +1089,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal if isFinal && len(pws) == 1 && pws[0].mp != nil { // Fast path: flush a single in-memory part to disk. mp := pws[0].mp - if err := mp.StoreToDisk(dstPartPath); err != nil { - logger.Panicf("FATAL: cannot store in-memory part to %s: %s", dstPartPath, err) - } + mp.MustStoreToDisk(dstPartPath) pwNew := tb.openCreatedPart(pws, nil, dstPartPath) tb.swapSrcWithDstParts(pws, pwNew, dstPartType) return nil @@ -1373,9 +1369,7 @@ var mergeWorkersLimitCh = make(chan struct{}, cgroup.AvailableCPUs()) func openParts(path string) ([]*partWrapper, error) { // The path can be missing after restoring from backup, so create it if needed. - if err := fs.MkdirAllIfNotExist(path); err != nil { - return nil, err - } + fs.MustMkdirIfNotExist(path) fs.MustRemoveTemporaryDirs(path) // Remove txn and tmp directories, which may be left after the upgrade @@ -1464,9 +1458,7 @@ func (tb *Table) CreateSnapshotAt(dstDir string, deadline uint64) error { // Flush inmemory items to disk. tb.flushInmemoryItems() - if err := fs.MkdirAllFailIfExist(dstDir); err != nil { - return fmt.Errorf("cannot create snapshot dir %q: %w", dstDir, err) - } + fs.MustMkdirFailIfExist(dstDir) pws := tb.getParts(nil) defer tb.putParts(pws) diff --git a/lib/persistentqueue/persistentqueue.go b/lib/persistentqueue/persistentqueue.go index fef7bbdbe..067969aaf 100644 --- a/lib/persistentqueue/persistentqueue.go +++ b/lib/persistentqueue/persistentqueue.go @@ -183,9 +183,7 @@ func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingB } } - if err := fs.MkdirAllIfNotExist(path); err != nil { - return nil, fmt.Errorf("cannot create directory %q: %w", path, err) - } + fs.MustMkdirIfNotExist(path) q.flockF = mustCreateFlockFile(path) mustCloseFlockF := true defer func() { diff --git a/lib/storage/block_stream_writer.go b/lib/storage/block_stream_writer.go index f17397da0..b60662c6e 100644 --- a/lib/storage/block_stream_writer.go +++ b/lib/storage/block_stream_writer.go @@ -97,9 +97,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre path = filepath.Clean(path) // Create the directory - if err := fs.MkdirAllFailIfExist(path); err != nil { - return fmt.Errorf("cannot create directory %q: %w", path, err) - } + fs.MustMkdirFailIfExist(path) // Create part files in the directory. timestampsPath := filepath.Join(path, timestampsFilename) diff --git a/lib/storage/inmemory_part.go b/lib/storage/inmemory_part.go index b5b9b0559..2e3c48084 100644 --- a/lib/storage/inmemory_part.go +++ b/lib/storage/inmemory_part.go @@ -1,7 +1,6 @@ package storage import ( - "fmt" "path/filepath" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" @@ -35,11 +34,10 @@ func (mp *inmemoryPart) Reset() { mp.creationTime = 0 } -// StoreToDisk stores the mp to the given path on disk. -func (mp *inmemoryPart) StoreToDisk(path string) error { - if err := fs.MkdirAllIfNotExist(path); err != nil { - return fmt.Errorf("cannot create directory %q: %w", path, err) - } +// MustStoreToDisk stores the mp to the given path on disk. +func (mp *inmemoryPart) MustStoreToDisk(path string) { + fs.MustMkdirFailIfExist(path) + timestampsPath := filepath.Join(path, timestampsFilename) fs.MustWriteSync(timestampsPath, mp.timestampsData.B) @@ -56,7 +54,6 @@ func (mp *inmemoryPart) StoreToDisk(path string) error { fs.MustSyncPath(path) // Do not sync parent directory - it must be synced by the caller. - return nil } // InitFromRows initializes mp from the given rows. diff --git a/lib/storage/partition.go b/lib/storage/partition.go index c7f5fa2ff..2b8c4a1d6 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -212,20 +212,16 @@ func (pw *partWrapper) decRef() { } } -// createPartition creates new partition for the given timestamp and the given paths +// mustCreatePartition creates new partition for the given timestamp and the given paths // to small and big partitions. -func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string, s *Storage) (*partition, error) { +func mustCreatePartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string, s *Storage) *partition { name := timestampToPartitionName(timestamp) smallPartsPath := filepath.Join(filepath.Clean(smallPartitionsPath), name) bigPartsPath := filepath.Join(filepath.Clean(bigPartitionsPath), name) logger.Infof("creating a partition %q with smallPartsPath=%q, bigPartsPath=%q", name, smallPartsPath, bigPartsPath) - if err := fs.MkdirAllFailIfExist(smallPartsPath); err != nil { - return nil, fmt.Errorf("cannot create directory for small parts %q: %w", smallPartsPath, err) - } - if err := fs.MkdirAllFailIfExist(bigPartsPath); err != nil { - return nil, fmt.Errorf("cannot create directory for big parts %q: %w", bigPartsPath, err) - } + fs.MustMkdirFailIfExist(smallPartsPath) + fs.MustMkdirFailIfExist(bigPartsPath) pt := newPartition(name, smallPartsPath, bigPartsPath, s) pt.tr.fromPartitionTimestamp(timestamp) @@ -233,7 +229,7 @@ func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath str logger.Infof("partition %q has been created", name) - return pt, nil + return pt } func (pt *partition) startBackgroundWorkers() { @@ -1268,9 +1264,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi if !isDedupEnabled() && isFinal && len(pws) == 1 && pws[0].mp != nil { // Fast path: flush a single in-memory part to disk. mp := pws[0].mp - if err := mp.StoreToDisk(dstPartPath); err != nil { - logger.Panicf("FATAL: cannot store in-memory part to %s: %s", dstPartPath, err) - } + mp.MustStoreToDisk(dstPartPath) pwNew := pt.openCreatedPart(&mp.ph, pws, nil, dstPartPath) pt.swapSrcWithDstParts(pws, pwNew, dstPartType) return nil @@ -1796,9 +1790,7 @@ func getPartsSize(pws []*partWrapper) uint64 { func openParts(path string, partNames []string) ([]*partWrapper, error) { // The path can be missing after restoring from backup, so create it if needed. - if err := fs.MkdirAllIfNotExist(path); err != nil { - return nil, err - } + fs.MustMkdirIfNotExist(path) fs.MustRemoveTemporaryDirs(path) // Remove txn and tmp directories, which may be left after the upgrade @@ -1879,12 +1871,8 @@ func (pt *partition) CreateSnapshotAt(smallPath, bigPath string) error { pt.PutParts(pwsBig) }() - if err := fs.MkdirAllFailIfExist(smallPath); err != nil { - return fmt.Errorf("cannot create snapshot dir %q: %w", smallPath, err) - } - if err := fs.MkdirAllFailIfExist(bigPath); err != nil { - return fmt.Errorf("cannot create snapshot dir %q: %w", bigPath, err) - } + fs.MustMkdirFailIfExist(smallPath) + fs.MustMkdirFailIfExist(bigPath) // Create a file with part names at smallPath mustWritePartNames(pwsSmall, pwsBig, smallPath) diff --git a/lib/storage/partition_search_test.go b/lib/storage/partition_search_test.go index 50a4fe66f..c1176279f 100644 --- a/lib/storage/partition_search_test.go +++ b/lib/storage/partition_search_test.go @@ -169,10 +169,7 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma // Create partition from rowss and test search on it. strg := newTestStorage() strg.retentionMsecs = timestampFromTime(time.Now()) - ptr.MinTimestamp + 3600*1000 - pt, err := createPartition(ptt, "small-table", "big-table", strg) - if err != nil { - t.Fatalf("cannot create partition: %s", err) - } + pt := mustCreatePartition(ptt, "small-table", "big-table", strg) smallPartsPath := pt.smallPartsPath bigPartsPath := pt.bigPartsPath defer func() { @@ -194,6 +191,7 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma pt.MustClose() // Open the created partition and test search on it. + var err error pt, err = openPartition(smallPartsPath, bigPartsPath, strg) if err != nil { t.Fatalf("cannot open partition: %s", err) diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 43e7df1dd..c79df60c4 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -152,9 +152,7 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer retentionMsecs: retentionMsecs, stop: make(chan struct{}), } - if err := fs.MkdirAllIfNotExist(path); err != nil { - return nil, fmt.Errorf("cannot create a directory for the storage at %q: %w", path, err) - } + fs.MustMkdirIfNotExist(path) // Check whether the cache directory must be removed // It is removed if it contains resetCacheOnStartupFilename. @@ -184,9 +182,7 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer // Pre-create snapshots directory if it is missing. snapshotsPath := filepath.Join(path, snapshotsDirname) - if err := fs.MkdirAllIfNotExist(snapshotsPath); err != nil { - return nil, fmt.Errorf("cannot create %q: %w", snapshotsPath, err) - } + fs.MustMkdirIfNotExist(snapshotsPath) fs.MustRemoveTemporaryDirs(snapshotsPath) // Initialize series cardinality limiter. @@ -221,17 +217,13 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer // Load metadata metadataDir := filepath.Join(path, metadataDirname) isEmptyDB := !fs.IsPathExist(filepath.Join(path, indexdbDirname)) - if err := fs.MkdirAllIfNotExist(metadataDir); err != nil { - return nil, fmt.Errorf("cannot create %q: %w", metadataDir, err) - } + fs.MustMkdirIfNotExist(metadataDir) s.minTimestampForCompositeIndex = mustGetMinTimestampForCompositeIndex(metadataDir, isEmptyDB) // Load indexdb idbPath := filepath.Join(path, indexdbDirname) idbSnapshotsPath := filepath.Join(idbPath, snapshotsDirname) - if err := fs.MkdirAllIfNotExist(idbSnapshotsPath); err != nil { - return nil, fmt.Errorf("cannot create %q: %w", idbSnapshotsPath, err) - } + fs.MustMkdirIfNotExist(idbSnapshotsPath) fs.MustRemoveTemporaryDirs(idbSnapshotsPath) idbCurr, idbPrev, err := s.openIndexDBTables(idbPath) if err != nil { @@ -327,9 +319,7 @@ func (s *Storage) CreateSnapshot(deadline uint64) (string, error) { snapshotName := snapshot.NewName() srcDir := s.path dstDir := filepath.Join(srcDir, snapshotsDirname, snapshotName) - if err := fs.MkdirAllFailIfExist(dstDir); err != nil { - return "", fmt.Errorf("cannot create dir %q: %w", dstDir, err) - } + fs.MustMkdirFailIfExist(dstDir) dirsToRemoveOnError = append(dirsToRemoveOnError, dstDir) smallDir, bigDir, err := s.tb.CreateSnapshot(snapshotName, deadline) @@ -339,9 +329,7 @@ func (s *Storage) CreateSnapshot(deadline uint64) (string, error) { dirsToRemoveOnError = append(dirsToRemoveOnError, smallDir, bigDir) dstDataDir := filepath.Join(dstDir, dataDirname) - if err := fs.MkdirAllFailIfExist(dstDataDir); err != nil { - return "", fmt.Errorf("cannot create dir %q: %w", dstDataDir, err) - } + fs.MustMkdirFailIfExist(dstDataDir) dstSmallDir := filepath.Join(dstDataDir, smallDirname) if err := fs.SymlinkRelative(smallDir, dstSmallDir); err != nil { return "", fmt.Errorf("cannot create symlink from %q to %q: %w", smallDir, dstSmallDir, err) @@ -1788,10 +1776,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci if err != nil { err = fmt.Errorf("cannot update per-date data: %w", err) } else { - err = s.tb.AddRows(rows) - if err != nil { - err = fmt.Errorf("cannot add rows to table: %w", err) - } + s.tb.MustAddRows(rows) } if err != nil { return fmt.Errorf("error occurred during rows addition: %w", err) @@ -2349,9 +2334,7 @@ func (s *Storage) putTSIDToCache(tsid *generationTSID, metricName []byte) { } func (s *Storage) openIndexDBTables(path string) (curr, prev *indexDB, err error) { - if err := fs.MkdirAllIfNotExist(path); err != nil { - return nil, nil, fmt.Errorf("cannot create directory %q: %w", path, err) - } + fs.MustMkdirIfNotExist(path) fs.MustRemoveTemporaryDirs(path) // Search for the two most recent tables - the last one is active, diff --git a/lib/storage/table.go b/lib/storage/table.go index 6b0bf82dd..6bb4afa43 100644 --- a/lib/storage/table.go +++ b/lib/storage/table.go @@ -83,9 +83,7 @@ func openTable(path string, s *Storage) (*table, error) { path = filepath.Clean(path) // Create a directory for the table if it doesn't exist yet. - if err := fs.MkdirAllIfNotExist(path); err != nil { - return nil, fmt.Errorf("cannot create directory for table %q: %w", path, err) - } + fs.MustMkdirIfNotExist(path) // Protect from concurrent opens. flockF, err := fs.CreateFlockFile(path) @@ -96,25 +94,19 @@ func openTable(path string, s *Storage) (*table, error) { // Create directories for small and big partitions if they don't exist yet. smallPartitionsPath := filepath.Join(path, smallDirname) - if err := fs.MkdirAllIfNotExist(smallPartitionsPath); err != nil { - return nil, fmt.Errorf("cannot create directory for small partitions %q: %w", smallPartitionsPath, err) - } + fs.MustMkdirIfNotExist(smallPartitionsPath) fs.MustRemoveTemporaryDirs(smallPartitionsPath) + smallSnapshotsPath := filepath.Join(smallPartitionsPath, snapshotsDirname) - if err := fs.MkdirAllIfNotExist(smallSnapshotsPath); err != nil { - return nil, fmt.Errorf("cannot create %q: %w", smallSnapshotsPath, err) - } + fs.MustMkdirIfNotExist(smallSnapshotsPath) fs.MustRemoveTemporaryDirs(smallSnapshotsPath) bigPartitionsPath := filepath.Join(path, bigDirname) - if err := fs.MkdirAllIfNotExist(bigPartitionsPath); err != nil { - return nil, fmt.Errorf("cannot create directory for big partitions %q: %w", bigPartitionsPath, err) - } + fs.MustMkdirIfNotExist(bigPartitionsPath) fs.MustRemoveTemporaryDirs(bigPartitionsPath) + bigSnapshotsPath := filepath.Join(bigPartitionsPath, snapshotsDirname) - if err := fs.MkdirAllIfNotExist(bigSnapshotsPath); err != nil { - return nil, fmt.Errorf("cannot create %q: %w", bigSnapshotsPath, err) - } + fs.MustMkdirIfNotExist(bigSnapshotsPath) fs.MustRemoveTemporaryDirs(bigSnapshotsPath) // Open partitions. @@ -152,14 +144,10 @@ func (tb *table) CreateSnapshot(snapshotName string, deadline uint64) (string, s defer tb.PutPartitions(ptws) dstSmallDir := filepath.Join(tb.path, smallDirname, snapshotsDirname, snapshotName) - if err := fs.MkdirAllFailIfExist(dstSmallDir); err != nil { - return "", "", fmt.Errorf("cannot create dir %q: %w", dstSmallDir, err) - } + fs.MustMkdirFailIfExist(dstSmallDir) + dstBigDir := filepath.Join(tb.path, bigDirname, snapshotsDirname, snapshotName) - if err := fs.MkdirAllFailIfExist(dstBigDir); err != nil { - fs.MustRemoveAll(dstSmallDir) - return "", "", fmt.Errorf("cannot create dir %q: %w", dstBigDir, err) - } + fs.MustMkdirFailIfExist(dstBigDir) for _, ptw := range ptws { if deadline > 0 && fasttime.UnixTimestamp() > deadline { @@ -277,10 +265,10 @@ func (tb *table) ForceMergePartitions(partitionNamePrefix string) error { return nil } -// AddRows adds the given rows to the table tb. -func (tb *table) AddRows(rows []rawRow) error { +// MustAddRows adds the given rows to the table tb. +func (tb *table) MustAddRows(rows []rawRow) { if len(rows) == 0 { - return nil + return } // Verify whether all the rows may be added to a single partition. @@ -317,7 +305,7 @@ func (tb *table) AddRows(rows []rawRow) error { // Fast path - add all the rows into the ptw. ptw.pt.AddRows(rows) tb.PutPartitions(ptws) - return nil + return } // Slower path - split rows into per-partition buckets. @@ -343,7 +331,7 @@ func (tb *table) AddRows(rows []rawRow) error { } tb.PutPartitions(ptws) if len(missingRows) == 0 { - return nil + return } // The slowest path - there are rows that don't fit any existing partition. @@ -372,18 +360,11 @@ func (tb *table) AddRows(rows []rawRow) error { continue } - pt, err := createPartition(r.Timestamp, tb.smallPartitionsPath, tb.bigPartitionsPath, tb.s) - if err != nil { - // Return only the first error, since it has no sense in returning all errors. - tb.ptwsLock.Unlock() - return fmt.Errorf("errors while adding rows to table %q: %w", tb.path, err) - } + pt := mustCreatePartition(r.Timestamp, tb.smallPartitionsPath, tb.bigPartitionsPath, tb.s) pt.AddRows(missingRows[i : i+1]) tb.addPartitionNolock(pt) } tb.ptwsLock.Unlock() - - return nil } func (tb *table) getMinMaxTimestamps() (int64, int64) { diff --git a/lib/storage/table_search_test.go b/lib/storage/table_search_test.go index 033fc8943..df60fbe59 100644 --- a/lib/storage/table_search_test.go +++ b/lib/storage/table_search_test.go @@ -194,9 +194,7 @@ func testTableSearchEx(t *testing.T, rng *rand.Rand, trData, trSearch TimeRange, } }() for _, rows := range rowss { - if err := tb.AddRows(rows); err != nil { - t.Fatalf("cannot add rows to table: %s", err) - } + tb.MustAddRows(rows) // Flush rows to parts. tb.flushPendingRows() diff --git a/lib/storage/table_search_timing_test.go b/lib/storage/table_search_timing_test.go index 8f28aa739..9025bef8a 100644 --- a/lib/storage/table_search_timing_test.go +++ b/lib/storage/table_search_timing_test.go @@ -95,9 +95,7 @@ func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerIn r.Timestamp = int64(ts) r.Value = value } - if err := tb.AddRows(rows); err != nil { - panic(fmt.Errorf("cannot add %d rows: %w", rowsPerInsert, err)) - } + tb.MustAddRows(rows) } wg.Done() }(k) diff --git a/lib/storage/table_timing_test.go b/lib/storage/table_timing_test.go index 714c33c06..0082e1c6a 100644 --- a/lib/storage/table_timing_test.go +++ b/lib/storage/table_timing_test.go @@ -80,9 +80,7 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) { r.Value++ } // Add updated rowsCopy. - if err := tb.AddRows(rowsCopy); err != nil { - panic(fmt.Errorf("cannot add rows to table %q: %w", tablePath, err)) - } + tb.MustAddRows(rowsCopy) } doneCh <- struct{}{}