From 52006149b2a03fab2c174990ccc6ec1564932e4b Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 14 Apr 2023 23:01:20 -0700 Subject: [PATCH] lib/storage: replace OpenStorage() with MustOpenStorage() Callers of OpenStorage() log the returned error and exit. The error logging and exit can be performed inside MustOpenStorage() alongside with printing the stack trace for better debuggability. This simplifies the code at caller side. --- app/vmstorage/main.go | 5 +-- lib/storage/index_db_test.go | 5 +-- lib/storage/partition.go | 10 ++--- lib/storage/partition_search_test.go | 6 +-- lib/storage/search_test.go | 10 +---- lib/storage/storage.go | 20 ++++----- lib/storage/storage_test.go | 60 +++++-------------------- lib/storage/storage_timing_test.go | 5 +-- lib/storage/table.go | 27 +++-------- lib/storage/table_search_test.go | 10 +---- lib/storage/table_search_timing_test.go | 10 +---- lib/storage/table_test.go | 10 +---- lib/storage/table_timing_test.go | 10 +---- 13 files changed, 46 insertions(+), 142 deletions(-) diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 2fa08e3d64..432b9262c0 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -108,10 +108,7 @@ func Init(resetCacheIfNeeded func(mrs []storage.MetricRow)) { logger.Infof("opening storage at %q with -retentionPeriod=%s", *DataPath, retentionPeriod) startTime := time.Now() WG = syncwg.WaitGroup{} - strg, err := storage.OpenStorage(*DataPath, retentionPeriod.Msecs, *maxHourlySeries, *maxDailySeries) - if err != nil { - logger.Fatalf("cannot open a storage at %s with -retentionPeriod=%s: %s", *DataPath, retentionPeriod, err) - } + strg := storage.MustOpenStorage(*DataPath, retentionPeriod.Msecs, *maxHourlySeries, *maxDailySeries) Storage = strg initStaleSnapshotsRemover(strg) diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index 82fdd0b725..6b75eb1f5a 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -1446,10 +1446,7 @@ func TestMatchTagFilters(t *testing.T) { func TestIndexDBRepopulateAfterRotation(t *testing.T) { r := rand.New(rand.NewSource(1)) path := "TestIndexRepopulateAfterRotation" - s, err := OpenStorage(path, msecsPerMonth, 1e5, 1e5) - if err != nil { - t.Fatalf("cannot open storage: %s", err) - } + s := MustOpenStorage(path, msecsPerMonth, 1e5, 1e5) defer func() { s.MustClose() if err := os.RemoveAll(path); err != nil { diff --git a/lib/storage/partition.go b/lib/storage/partition.go index abc20458bd..8b344453f1 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -250,14 +250,14 @@ func (pt *partition) Drop() { logger.Infof("partition %q has been dropped", pt.name) } -// openPartition opens the existing partition from the given paths. -func openPartition(smallPartsPath, bigPartsPath string, s *Storage) (*partition, error) { +// mustOpenPartition opens the existing partition from the given paths. +func mustOpenPartition(smallPartsPath, bigPartsPath string, s *Storage) *partition { smallPartsPath = filepath.Clean(smallPartsPath) bigPartsPath = filepath.Clean(bigPartsPath) name := filepath.Base(smallPartsPath) if !strings.HasSuffix(bigPartsPath, name) { - return nil, fmt.Errorf("patititon name in bigPartsPath %q doesn't match smallPartsPath %q; want %q", bigPartsPath, smallPartsPath, name) + logger.Panicf("FATAL: patititon name in bigPartsPath %q doesn't match smallPartsPath %q; want %q", bigPartsPath, smallPartsPath, name) } partNamesSmall, partNamesBig := mustReadPartNames(smallPartsPath, bigPartsPath) @@ -269,14 +269,14 @@ func openPartition(smallPartsPath, bigPartsPath string, s *Storage) (*partition, pt.smallParts = smallParts pt.bigParts = bigParts if err := pt.tr.fromPartitionName(name); err != nil { - return nil, fmt.Errorf("cannot obtain partition time range from smallPartsPath %q: %w", smallPartsPath, err) + logger.Panicf("FATAL: cannot obtain partition time range from smallPartsPath %q: %s", smallPartsPath, err) } pt.startBackgroundWorkers() // Wake up a single background merger, so it could start merging parts if needed. pt.notifyBackgroundMergers() - return pt, nil + return pt } func newPartition(name, smallPartsPath, bigPartsPath string, s *Storage) *partition { diff --git a/lib/storage/partition_search_test.go b/lib/storage/partition_search_test.go index c1176279fe..86d5bd81c8 100644 --- a/lib/storage/partition_search_test.go +++ b/lib/storage/partition_search_test.go @@ -191,11 +191,7 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma pt.MustClose() // Open the created partition and test search on it. - var err error - pt, err = openPartition(smallPartsPath, bigPartsPath, strg) - if err != nil { - t.Fatalf("cannot open partition: %s", err) - } + pt = mustOpenPartition(smallPartsPath, bigPartsPath, strg) testPartitionSearch(t, pt, tsids, tr, rbsExpected, rowsCountExpected) pt.MustClose() } diff --git a/lib/storage/search_test.go b/lib/storage/search_test.go index bcc024b4ec..7cd13703be 100644 --- a/lib/storage/search_test.go +++ b/lib/storage/search_test.go @@ -72,10 +72,7 @@ func TestSearchQueryMarshalUnmarshal(t *testing.T) { func TestSearch(t *testing.T) { path := "TestSearch" - st, err := OpenStorage(path, 0, 0, 0) - if err != nil { - t.Fatalf("cannot open storage %q: %s", path, err) - } + st := MustOpenStorage(path, 0, 0, 0) defer func() { st.MustClose() if err := os.RemoveAll(path); err != nil { @@ -121,10 +118,7 @@ func TestSearch(t *testing.T) { // Re-open the storage in order to flush all the pending cached data. st.MustClose() - st, err = OpenStorage(path, 0, 0, 0) - if err != nil { - t.Fatalf("cannot re-open storage %q: %s", path, err) - } + st = MustOpenStorage(path, 0, 0, 0) // Run search. tr := TimeRange{ diff --git a/lib/storage/storage.go b/lib/storage/storage.go index a566b469a1..9d0d8ef5bc 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -134,11 +134,11 @@ type Storage struct { isReadOnly uint32 } -// OpenStorage opens storage on the given path with the given retentionMsecs. -func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySeries int) (*Storage, error) { +// MustOpenStorage opens storage on the given path with the given retentionMsecs. +func MustOpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySeries int) *Storage { path, err := filepath.Abs(path) if err != nil { - return nil, fmt.Errorf("cannot determine absolute path for %q: %w", path, err) + logger.Panicf("FATAL: cannot determine absolute path for %q: %s", path, err) } if retentionMsecs <= 0 { retentionMsecs = maxRetentionMsecs @@ -172,7 +172,7 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer // Check whether restore process finished successfully restoreLockF := filepath.Join(path, backupnames.RestoreInProgressFilename) if fs.IsPathExist(restoreLockF) { - return nil, fmt.Errorf("restore lock file exists, incomplete vmrestore run. Run vmrestore again or remove lock file %q", restoreLockF) + logger.Panicf("FATAL: incomplete vmrestore run; run vmrestore again or remove lock file %q", restoreLockF) } // Pre-create snapshots directory if it is missing. @@ -227,11 +227,11 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer // Load deleted metricIDs from idbCurr and idbPrev dmisCurr, err := idbCurr.loadDeletedMetricIDs() if err != nil { - return nil, fmt.Errorf("cannot load deleted metricIDs for the current indexDB: %w", err) + logger.Panicf("FATAL: cannot load deleted metricIDs for the current indexDB at %q: %s", path, err) } dmisPrev, err := idbPrev.loadDeletedMetricIDs() if err != nil { - return nil, fmt.Errorf("cannot load deleted metricIDs for the previous indexDB: %w", err) + logger.Panicf("FATAL: cannot load deleted metricIDs for the previous indexDB at %q: %s", path, err) } s.setDeletedMetricIDs(dmisCurr) s.updateDeletedMetricIDs(dmisPrev) @@ -242,18 +242,14 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer // Load data tablePath := filepath.Join(path, dataDirname) - tb, err := openTable(tablePath, s) - if err != nil { - s.idb().MustClose() - return nil, fmt.Errorf("cannot open table at %q: %w", tablePath, err) - } + tb := mustOpenTable(tablePath, s) s.tb = tb s.startCurrHourMetricIDsUpdater() s.startNextDayMetricIDsUpdater() s.startRetentionWatcher() - return s, nil + return s } var maxTSIDCacheSize int diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index cf5cc8b9e3..595d7c3e50 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -422,10 +422,7 @@ func TestNextRetentionDuration(t *testing.T) { func TestStorageOpenClose(t *testing.T) { path := "TestStorageOpenClose" for i := 0; i < 10; i++ { - s, err := OpenStorage(path, -1, 1e5, 1e6) - if err != nil { - t.Fatalf("cannot open storage: %s", err) - } + s := MustOpenStorage(path, -1, 1e5, 1e6) s.MustClose() } if err := os.RemoveAll(path); err != nil { @@ -436,20 +433,14 @@ func TestStorageOpenClose(t *testing.T) { func TestStorageRandTimestamps(t *testing.T) { path := "TestStorageRandTimestamps" retentionMsecs := int64(10 * msecsPerMonth) - s, err := OpenStorage(path, retentionMsecs, 0, 0) - if err != nil { - t.Fatalf("cannot open storage: %s", err) - } + s := MustOpenStorage(path, retentionMsecs, 0, 0) t.Run("serial", func(t *testing.T) { for i := 0; i < 3; i++ { if err := testStorageRandTimestamps(s); err != nil { t.Fatalf("error on iteration %d: %s", i, err) } s.MustClose() - s, err = OpenStorage(path, retentionMsecs, 0, 0) - if err != nil { - t.Fatalf("cannot open storage on iteration %d: %s", i, err) - } + s = MustOpenStorage(path, retentionMsecs, 0, 0) } }) t.Run("concurrent", func(t *testing.T) { @@ -526,10 +517,7 @@ func testStorageRandTimestamps(s *Storage) error { func TestStorageDeleteSeries(t *testing.T) { path := "TestStorageDeleteSeries" - s, err := OpenStorage(path, 0, 0, 0) - if err != nil { - t.Fatalf("cannot open storage: %s", err) - } + s := MustOpenStorage(path, 0, 0, 0) // Verify no label names exist lns, err := s.SearchLabelNamesWithFiltersOnTimeRange(nil, nil, TimeRange{}, 1e5, 1e9, noDeadline) @@ -549,10 +537,7 @@ func TestStorageDeleteSeries(t *testing.T) { // Re-open the storage in order to check how deleted metricIDs // are persisted. s.MustClose() - s, err = OpenStorage(path, 0, 0, 0) - if err != nil { - t.Fatalf("cannot open storage after closing on iteration %d: %s", i, err) - } + s = MustOpenStorage(path, 0, 0, 0) } }) @@ -747,10 +732,7 @@ func checkLabelNames(lns []string, lnsExpected map[string]bool) error { func TestStorageRegisterMetricNamesSerial(t *testing.T) { path := "TestStorageRegisterMetricNamesSerial" - s, err := OpenStorage(path, 0, 0, 0) - if err != nil { - t.Fatalf("cannot open storage: %s", err) - } + s := MustOpenStorage(path, 0, 0, 0) if err := testStorageRegisterMetricNames(s); err != nil { t.Fatalf("unexpected error: %s", err) } @@ -762,10 +744,7 @@ func TestStorageRegisterMetricNamesSerial(t *testing.T) { func TestStorageRegisterMetricNamesConcurrent(t *testing.T) { path := "TestStorageRegisterMetricNamesConcurrent" - s, err := OpenStorage(path, 0, 0, 0) - if err != nil { - t.Fatalf("cannot open storage: %s", err) - } + s := MustOpenStorage(path, 0, 0, 0) ch := make(chan error, 3) for i := 0; i < cap(ch); i++ { go func() { @@ -914,10 +893,7 @@ func TestStorageAddRowsSerial(t *testing.T) { rng := rand.New(rand.NewSource(1)) path := "TestStorageAddRowsSerial" retentionMsecs := int64(msecsPerMonth * 10) - s, err := OpenStorage(path, retentionMsecs, 1e5, 1e5) - if err != nil { - t.Fatalf("cannot open storage: %s", err) - } + s := MustOpenStorage(path, retentionMsecs, 1e5, 1e5) if err := testStorageAddRows(rng, s); err != nil { t.Fatalf("unexpected error: %s", err) } @@ -930,10 +906,7 @@ func TestStorageAddRowsSerial(t *testing.T) { func TestStorageAddRowsConcurrent(t *testing.T) { path := "TestStorageAddRowsConcurrent" retentionMsecs := int64(msecsPerMonth * 10) - s, err := OpenStorage(path, retentionMsecs, 1e5, 1e5) - if err != nil { - t.Fatalf("cannot open storage: %s", err) - } + s := MustOpenStorage(path, retentionMsecs, 1e5, 1e5) ch := make(chan error, 3) for i := 0; i < cap(ch); i++ { go func(n int) { @@ -1018,10 +991,7 @@ func testStorageAddRows(rng *rand.Rand, s *Storage) error { // Try opening the storage from snapshot. snapshotPath := filepath.Join(s.path, snapshotsDirname, snapshotName) - s1, err := OpenStorage(snapshotPath, 0, 0, 0) - if err != nil { - return fmt.Errorf("cannot open storage from snapshot: %w", err) - } + s1 := MustOpenStorage(snapshotPath, 0, 0, 0) // Verify the snapshot contains rows var m1 Metrics @@ -1065,10 +1035,7 @@ func testStorageAddRows(rng *rand.Rand, s *Storage) error { func TestStorageRotateIndexDB(t *testing.T) { path := "TestStorageRotateIndexDB" - s, err := OpenStorage(path, 0, 0, 0) - if err != nil { - t.Fatalf("cannot open storage: %s", err) - } + s := MustOpenStorage(path, 0, 0, 0) // Start indexDB rotater in a separate goroutine stopCh := make(chan struct{}) @@ -1152,10 +1119,7 @@ func TestStorageDeleteStaleSnapshots(t *testing.T) { rng := rand.New(rand.NewSource(1)) path := "TestStorageDeleteStaleSnapshots" retentionMsecs := int64(msecsPerMonth * 10) - s, err := OpenStorage(path, retentionMsecs, 1e5, 1e5) - if err != nil { - t.Fatalf("cannot open storage: %s", err) - } + s := MustOpenStorage(path, retentionMsecs, 1e5, 1e5) const rowsPerAdd = 1e3 const addsCount = 10 maxTimestamp := timestampFromTime(time.Now()) diff --git a/lib/storage/storage_timing_test.go b/lib/storage/storage_timing_test.go index a7ff988c68..1e95c8ce07 100644 --- a/lib/storage/storage_timing_test.go +++ b/lib/storage/storage_timing_test.go @@ -17,10 +17,7 @@ func BenchmarkStorageAddRows(b *testing.B) { func benchmarkStorageAddRows(b *testing.B, rowsPerBatch int) { path := fmt.Sprintf("BenchmarkStorageAddRows_%d", rowsPerBatch) - s, err := OpenStorage(path, 0, 0, 0) - if err != nil { - b.Fatalf("cannot open storage at %q: %s", path, err) - } + s := MustOpenStorage(path, 0, 0, 0) defer func() { s.MustClose() if err := os.RemoveAll(path); err != nil { diff --git a/lib/storage/table.go b/lib/storage/table.go index 265f78f1cd..4f9a6e84f9 100644 --- a/lib/storage/table.go +++ b/lib/storage/table.go @@ -76,10 +76,10 @@ func (ptw *partitionWrapper) scheduleToDrop() { atomic.AddUint64(&ptw.mustDrop, 1) } -// openTable opens a table on the given path. +// mustOpenTable opens a table on the given path. // // The table is created if it doesn't exist. -func openTable(path string, s *Storage) (*table, error) { +func mustOpenTable(path string, s *Storage) *table { path = filepath.Clean(path) // Create a directory for the table if it doesn't exist yet. @@ -106,10 +106,7 @@ func openTable(path string, s *Storage) (*table, error) { fs.MustRemoveTemporaryDirs(bigSnapshotsPath) // Open partitions. - pts, err := openPartitions(smallPartitionsPath, bigPartitionsPath, s) - if err != nil { - return nil, fmt.Errorf("cannot open partitions in the table %q: %w", path, err) - } + pts := mustOpenPartitions(smallPartitionsPath, bigPartitionsPath, s) tb := &table{ path: path, @@ -126,7 +123,7 @@ func openTable(path string, s *Storage) (*table, error) { } tb.startRetentionWatcher() tb.startFinalDedupWatcher() - return tb, nil + return tb } // CreateSnapshot creates tb snapshot and returns paths to small and big parts of it. @@ -482,7 +479,7 @@ func (tb *table) PutPartitions(ptws []*partitionWrapper) { } } -func openPartitions(smallPartitionsPath, bigPartitionsPath string, s *Storage) ([]*partition, error) { +func mustOpenPartitions(smallPartitionsPath, bigPartitionsPath string, s *Storage) []*partition { // Certain partition directories in either `big` or `small` dir may be missing // after restoring from backup. So populate partition names from both dirs. ptNames := make(map[string]bool) @@ -492,14 +489,10 @@ func openPartitions(smallPartitionsPath, bigPartitionsPath string, s *Storage) ( for ptName := range ptNames { smallPartsPath := filepath.Join(smallPartitionsPath, ptName) bigPartsPath := filepath.Join(bigPartitionsPath, ptName) - pt, err := openPartition(smallPartsPath, bigPartsPath, s) - if err != nil { - mustClosePartitions(pts) - return nil, fmt.Errorf("cannot open partition %q: %w", ptName, err) - } + pt := mustOpenPartition(smallPartsPath, bigPartsPath, s) pts = append(pts, pt) } - return pts, nil + return pts } func mustPopulatePartitionNames(partitionsPath string, ptNames map[string]bool) { @@ -518,12 +511,6 @@ func mustPopulatePartitionNames(partitionsPath string, ptNames map[string]bool) } } -func mustClosePartitions(pts []*partition) { - for _, pt := range pts { - pt.MustClose() - } -} - type partitionWrappers struct { a []*partitionWrapper } diff --git a/lib/storage/table_search_test.go b/lib/storage/table_search_test.go index df60fbe596..20a3870617 100644 --- a/lib/storage/table_search_test.go +++ b/lib/storage/table_search_test.go @@ -184,10 +184,7 @@ func testTableSearchEx(t *testing.T, rng *rand.Rand, trData, trSearch TimeRange, // Create a table from rowss and test search on it. strg := newTestStorage() - tb, err := openTable("test-table", strg) - if err != nil { - t.Fatalf("cannot create table: %s", err) - } + tb := mustOpenTable("test-table", strg) defer func() { if err := os.RemoveAll("test-table"); err != nil { t.Fatalf("cannot remove table directory: %s", err) @@ -203,10 +200,7 @@ func testTableSearchEx(t *testing.T, rng *rand.Rand, trData, trSearch TimeRange, tb.MustClose() // Open the created table and test search on it. - tb, err = openTable("test-table", strg) - if err != nil { - t.Fatalf("cannot open table: %s", err) - } + tb = mustOpenTable("test-table", strg) testTableSearch(t, tb, tsids, trSearch, rbsExpected, rowsCountExpected) tb.MustClose() } diff --git a/lib/storage/table_search_timing_test.go b/lib/storage/table_search_timing_test.go index a834924f18..1e9a62b487 100644 --- a/lib/storage/table_search_timing_test.go +++ b/lib/storage/table_search_timing_test.go @@ -47,10 +47,7 @@ func openBenchTable(b *testing.B, startTimestamp int64, rowsPerInsert, rowsCount createdBenchTables[path] = true } strg := newTestStorage() - tb, err := openTable(path, strg) - if err != nil { - b.Fatalf("cnanot open table %q: %s", path, err) - } + tb := mustOpenTable(path, strg) // Verify rows count in the table opened from files. insertsCount := uint64((rowsCount + rowsPerInsert - 1) / rowsPerInsert) @@ -70,10 +67,7 @@ func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerIn b.Helper() strg := newTestStorage() - tb, err := openTable(path, strg) - if err != nil { - b.Fatalf("cannot open table %q: %s", path, err) - } + tb := mustOpenTable(path, strg) insertsCount := uint64((rowsCount + rowsPerInsert - 1) / rowsPerInsert) timestamp := uint64(startTimestamp) diff --git a/lib/storage/table_test.go b/lib/storage/table_test.go index e6cdc044af..2e8da8aadb 100644 --- a/lib/storage/table_test.go +++ b/lib/storage/table_test.go @@ -19,20 +19,14 @@ func TestTableOpenClose(t *testing.T) { // Create a new table strg := newTestStorage() strg.retentionMsecs = retentionMsecs - tb, err := openTable(path, strg) - if err != nil { - t.Fatalf("cannot create new table: %s", err) - } + tb := mustOpenTable(path, strg) // Close it tb.MustClose() // Re-open created table multiple times. for i := 0; i < 10; i++ { - tb, err := openTable(path, strg) - if err != nil { - t.Fatalf("cannot open created table: %s", err) - } + tb := mustOpenTable(path, strg) tb.MustClose() } } diff --git a/lib/storage/table_timing_test.go b/lib/storage/table_timing_test.go index 0082e1c6a3..56865900f2 100644 --- a/lib/storage/table_timing_test.go +++ b/lib/storage/table_timing_test.go @@ -48,10 +48,7 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) { tablePath := "benchmarkTableAddRows" strg := newTestStorage() for i := 0; i < b.N; i++ { - tb, err := openTable(tablePath, strg) - if err != nil { - b.Fatalf("cannot open table %q: %s", tablePath, err) - } + tb := mustOpenTable(tablePath, strg) workCh := make(chan struct{}, insertsCount) for j := 0; j < insertsCount; j++ { @@ -94,10 +91,7 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) { tb.MustClose() // Open the table from files and verify the rows count on it - tb, err = openTable(tablePath, strg) - if err != nil { - b.Fatalf("cannot open table %q: %s", tablePath, err) - } + tb = mustOpenTable(tablePath, strg) var m TableMetrics tb.UpdateMetrics(&m) if rowsCount := m.TotalRowsCount(); rowsCount != uint64(rowsCountExpected) {