diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 2fa08e3d6..432b9262c 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -108,10 +108,7 @@ func Init(resetCacheIfNeeded func(mrs []storage.MetricRow)) { logger.Infof("opening storage at %q with -retentionPeriod=%s", *DataPath, retentionPeriod) startTime := time.Now() WG = syncwg.WaitGroup{} - strg, err := storage.OpenStorage(*DataPath, retentionPeriod.Msecs, *maxHourlySeries, *maxDailySeries) - if err != nil { - logger.Fatalf("cannot open a storage at %s with -retentionPeriod=%s: %s", *DataPath, retentionPeriod, err) - } + strg := storage.MustOpenStorage(*DataPath, retentionPeriod.Msecs, *maxHourlySeries, *maxDailySeries) Storage = strg initStaleSnapshotsRemover(strg) diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index 82fdd0b72..6b75eb1f5 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -1446,10 +1446,7 @@ func TestMatchTagFilters(t *testing.T) { func TestIndexDBRepopulateAfterRotation(t *testing.T) { r := rand.New(rand.NewSource(1)) path := "TestIndexRepopulateAfterRotation" - s, err := OpenStorage(path, msecsPerMonth, 1e5, 1e5) - if err != nil { - t.Fatalf("cannot open storage: %s", err) - } + s := MustOpenStorage(path, msecsPerMonth, 1e5, 1e5) defer func() { s.MustClose() if err := os.RemoveAll(path); err != nil { diff --git a/lib/storage/partition.go b/lib/storage/partition.go index abc20458b..8b344453f 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -250,14 +250,14 @@ func (pt *partition) Drop() { logger.Infof("partition %q has been dropped", pt.name) } -// openPartition opens the existing partition from the given paths. -func openPartition(smallPartsPath, bigPartsPath string, s *Storage) (*partition, error) { +// mustOpenPartition opens the existing partition from the given paths. +func mustOpenPartition(smallPartsPath, bigPartsPath string, s *Storage) *partition { smallPartsPath = filepath.Clean(smallPartsPath) bigPartsPath = filepath.Clean(bigPartsPath) name := filepath.Base(smallPartsPath) if !strings.HasSuffix(bigPartsPath, name) { - return nil, fmt.Errorf("patititon name in bigPartsPath %q doesn't match smallPartsPath %q; want %q", bigPartsPath, smallPartsPath, name) + logger.Panicf("FATAL: patititon name in bigPartsPath %q doesn't match smallPartsPath %q; want %q", bigPartsPath, smallPartsPath, name) } partNamesSmall, partNamesBig := mustReadPartNames(smallPartsPath, bigPartsPath) @@ -269,14 +269,14 @@ func openPartition(smallPartsPath, bigPartsPath string, s *Storage) (*partition, pt.smallParts = smallParts pt.bigParts = bigParts if err := pt.tr.fromPartitionName(name); err != nil { - return nil, fmt.Errorf("cannot obtain partition time range from smallPartsPath %q: %w", smallPartsPath, err) + logger.Panicf("FATAL: cannot obtain partition time range from smallPartsPath %q: %s", smallPartsPath, err) } pt.startBackgroundWorkers() // Wake up a single background merger, so it could start merging parts if needed. pt.notifyBackgroundMergers() - return pt, nil + return pt } func newPartition(name, smallPartsPath, bigPartsPath string, s *Storage) *partition { diff --git a/lib/storage/partition_search_test.go b/lib/storage/partition_search_test.go index c1176279f..86d5bd81c 100644 --- a/lib/storage/partition_search_test.go +++ b/lib/storage/partition_search_test.go @@ -191,11 +191,7 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma pt.MustClose() // Open the created partition and test search on it. - var err error - pt, err = openPartition(smallPartsPath, bigPartsPath, strg) - if err != nil { - t.Fatalf("cannot open partition: %s", err) - } + pt = mustOpenPartition(smallPartsPath, bigPartsPath, strg) testPartitionSearch(t, pt, tsids, tr, rbsExpected, rowsCountExpected) pt.MustClose() } diff --git a/lib/storage/search_test.go b/lib/storage/search_test.go index bcc024b4e..7cd13703b 100644 --- a/lib/storage/search_test.go +++ b/lib/storage/search_test.go @@ -72,10 +72,7 @@ func TestSearchQueryMarshalUnmarshal(t *testing.T) { func TestSearch(t *testing.T) { path := "TestSearch" - st, err := OpenStorage(path, 0, 0, 0) - if err != nil { - t.Fatalf("cannot open storage %q: %s", path, err) - } + st := MustOpenStorage(path, 0, 0, 0) defer func() { st.MustClose() if err := os.RemoveAll(path); err != nil { @@ -121,10 +118,7 @@ func TestSearch(t *testing.T) { // Re-open the storage in order to flush all the pending cached data. st.MustClose() - st, err = OpenStorage(path, 0, 0, 0) - if err != nil { - t.Fatalf("cannot re-open storage %q: %s", path, err) - } + st = MustOpenStorage(path, 0, 0, 0) // Run search. tr := TimeRange{ diff --git a/lib/storage/storage.go b/lib/storage/storage.go index a566b469a..9d0d8ef5b 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -134,11 +134,11 @@ type Storage struct { isReadOnly uint32 } -// OpenStorage opens storage on the given path with the given retentionMsecs. -func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySeries int) (*Storage, error) { +// MustOpenStorage opens storage on the given path with the given retentionMsecs. +func MustOpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySeries int) *Storage { path, err := filepath.Abs(path) if err != nil { - return nil, fmt.Errorf("cannot determine absolute path for %q: %w", path, err) + logger.Panicf("FATAL: cannot determine absolute path for %q: %s", path, err) } if retentionMsecs <= 0 { retentionMsecs = maxRetentionMsecs @@ -172,7 +172,7 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer // Check whether restore process finished successfully restoreLockF := filepath.Join(path, backupnames.RestoreInProgressFilename) if fs.IsPathExist(restoreLockF) { - return nil, fmt.Errorf("restore lock file exists, incomplete vmrestore run. Run vmrestore again or remove lock file %q", restoreLockF) + logger.Panicf("FATAL: incomplete vmrestore run; run vmrestore again or remove lock file %q", restoreLockF) } // Pre-create snapshots directory if it is missing. @@ -227,11 +227,11 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer // Load deleted metricIDs from idbCurr and idbPrev dmisCurr, err := idbCurr.loadDeletedMetricIDs() if err != nil { - return nil, fmt.Errorf("cannot load deleted metricIDs for the current indexDB: %w", err) + logger.Panicf("FATAL: cannot load deleted metricIDs for the current indexDB at %q: %s", path, err) } dmisPrev, err := idbPrev.loadDeletedMetricIDs() if err != nil { - return nil, fmt.Errorf("cannot load deleted metricIDs for the previous indexDB: %w", err) + logger.Panicf("FATAL: cannot load deleted metricIDs for the previous indexDB at %q: %s", path, err) } s.setDeletedMetricIDs(dmisCurr) s.updateDeletedMetricIDs(dmisPrev) @@ -242,18 +242,14 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer // Load data tablePath := filepath.Join(path, dataDirname) - tb, err := openTable(tablePath, s) - if err != nil { - s.idb().MustClose() - return nil, fmt.Errorf("cannot open table at %q: %w", tablePath, err) - } + tb := mustOpenTable(tablePath, s) s.tb = tb s.startCurrHourMetricIDsUpdater() s.startNextDayMetricIDsUpdater() s.startRetentionWatcher() - return s, nil + return s } var maxTSIDCacheSize int diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index cf5cc8b9e..595d7c3e5 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -422,10 +422,7 @@ func TestNextRetentionDuration(t *testing.T) { func TestStorageOpenClose(t *testing.T) { path := "TestStorageOpenClose" for i := 0; i < 10; i++ { - s, err := OpenStorage(path, -1, 1e5, 1e6) - if err != nil { - t.Fatalf("cannot open storage: %s", err) - } + s := MustOpenStorage(path, -1, 1e5, 1e6) s.MustClose() } if err := os.RemoveAll(path); err != nil { @@ -436,20 +433,14 @@ func TestStorageOpenClose(t *testing.T) { func TestStorageRandTimestamps(t *testing.T) { path := "TestStorageRandTimestamps" retentionMsecs := int64(10 * msecsPerMonth) - s, err := OpenStorage(path, retentionMsecs, 0, 0) - if err != nil { - t.Fatalf("cannot open storage: %s", err) - } + s := MustOpenStorage(path, retentionMsecs, 0, 0) t.Run("serial", func(t *testing.T) { for i := 0; i < 3; i++ { if err := testStorageRandTimestamps(s); err != nil { t.Fatalf("error on iteration %d: %s", i, err) } s.MustClose() - s, err = OpenStorage(path, retentionMsecs, 0, 0) - if err != nil { - t.Fatalf("cannot open storage on iteration %d: %s", i, err) - } + s = MustOpenStorage(path, retentionMsecs, 0, 0) } }) t.Run("concurrent", func(t *testing.T) { @@ -526,10 +517,7 @@ func testStorageRandTimestamps(s *Storage) error { func TestStorageDeleteSeries(t *testing.T) { path := "TestStorageDeleteSeries" - s, err := OpenStorage(path, 0, 0, 0) - if err != nil { - t.Fatalf("cannot open storage: %s", err) - } + s := MustOpenStorage(path, 0, 0, 0) // Verify no label names exist lns, err := s.SearchLabelNamesWithFiltersOnTimeRange(nil, nil, TimeRange{}, 1e5, 1e9, noDeadline) @@ -549,10 +537,7 @@ func TestStorageDeleteSeries(t *testing.T) { // Re-open the storage in order to check how deleted metricIDs // are persisted. s.MustClose() - s, err = OpenStorage(path, 0, 0, 0) - if err != nil { - t.Fatalf("cannot open storage after closing on iteration %d: %s", i, err) - } + s = MustOpenStorage(path, 0, 0, 0) } }) @@ -747,10 +732,7 @@ func checkLabelNames(lns []string, lnsExpected map[string]bool) error { func TestStorageRegisterMetricNamesSerial(t *testing.T) { path := "TestStorageRegisterMetricNamesSerial" - s, err := OpenStorage(path, 0, 0, 0) - if err != nil { - t.Fatalf("cannot open storage: %s", err) - } + s := MustOpenStorage(path, 0, 0, 0) if err := testStorageRegisterMetricNames(s); err != nil { t.Fatalf("unexpected error: %s", err) } @@ -762,10 +744,7 @@ func TestStorageRegisterMetricNamesSerial(t *testing.T) { func TestStorageRegisterMetricNamesConcurrent(t *testing.T) { path := "TestStorageRegisterMetricNamesConcurrent" - s, err := OpenStorage(path, 0, 0, 0) - if err != nil { - t.Fatalf("cannot open storage: %s", err) - } + s := MustOpenStorage(path, 0, 0, 0) ch := make(chan error, 3) for i := 0; i < cap(ch); i++ { go func() { @@ -914,10 +893,7 @@ func TestStorageAddRowsSerial(t *testing.T) { rng := rand.New(rand.NewSource(1)) path := "TestStorageAddRowsSerial" retentionMsecs := int64(msecsPerMonth * 10) - s, err := OpenStorage(path, retentionMsecs, 1e5, 1e5) - if err != nil { - t.Fatalf("cannot open storage: %s", err) - } + s := MustOpenStorage(path, retentionMsecs, 1e5, 1e5) if err := testStorageAddRows(rng, s); err != nil { t.Fatalf("unexpected error: %s", err) } @@ -930,10 +906,7 @@ func TestStorageAddRowsSerial(t *testing.T) { func TestStorageAddRowsConcurrent(t *testing.T) { path := "TestStorageAddRowsConcurrent" retentionMsecs := int64(msecsPerMonth * 10) - s, err := OpenStorage(path, retentionMsecs, 1e5, 1e5) - if err != nil { - t.Fatalf("cannot open storage: %s", err) - } + s := MustOpenStorage(path, retentionMsecs, 1e5, 1e5) ch := make(chan error, 3) for i := 0; i < cap(ch); i++ { go func(n int) { @@ -1018,10 +991,7 @@ func testStorageAddRows(rng *rand.Rand, s *Storage) error { // Try opening the storage from snapshot. snapshotPath := filepath.Join(s.path, snapshotsDirname, snapshotName) - s1, err := OpenStorage(snapshotPath, 0, 0, 0) - if err != nil { - return fmt.Errorf("cannot open storage from snapshot: %w", err) - } + s1 := MustOpenStorage(snapshotPath, 0, 0, 0) // Verify the snapshot contains rows var m1 Metrics @@ -1065,10 +1035,7 @@ func testStorageAddRows(rng *rand.Rand, s *Storage) error { func TestStorageRotateIndexDB(t *testing.T) { path := "TestStorageRotateIndexDB" - s, err := OpenStorage(path, 0, 0, 0) - if err != nil { - t.Fatalf("cannot open storage: %s", err) - } + s := MustOpenStorage(path, 0, 0, 0) // Start indexDB rotater in a separate goroutine stopCh := make(chan struct{}) @@ -1152,10 +1119,7 @@ func TestStorageDeleteStaleSnapshots(t *testing.T) { rng := rand.New(rand.NewSource(1)) path := "TestStorageDeleteStaleSnapshots" retentionMsecs := int64(msecsPerMonth * 10) - s, err := OpenStorage(path, retentionMsecs, 1e5, 1e5) - if err != nil { - t.Fatalf("cannot open storage: %s", err) - } + s := MustOpenStorage(path, retentionMsecs, 1e5, 1e5) const rowsPerAdd = 1e3 const addsCount = 10 maxTimestamp := timestampFromTime(time.Now()) diff --git a/lib/storage/storage_timing_test.go b/lib/storage/storage_timing_test.go index a7ff988c6..1e95c8ce0 100644 --- a/lib/storage/storage_timing_test.go +++ b/lib/storage/storage_timing_test.go @@ -17,10 +17,7 @@ func BenchmarkStorageAddRows(b *testing.B) { func benchmarkStorageAddRows(b *testing.B, rowsPerBatch int) { path := fmt.Sprintf("BenchmarkStorageAddRows_%d", rowsPerBatch) - s, err := OpenStorage(path, 0, 0, 0) - if err != nil { - b.Fatalf("cannot open storage at %q: %s", path, err) - } + s := MustOpenStorage(path, 0, 0, 0) defer func() { s.MustClose() if err := os.RemoveAll(path); err != nil { diff --git a/lib/storage/table.go b/lib/storage/table.go index 265f78f1c..4f9a6e84f 100644 --- a/lib/storage/table.go +++ b/lib/storage/table.go @@ -76,10 +76,10 @@ func (ptw *partitionWrapper) scheduleToDrop() { atomic.AddUint64(&ptw.mustDrop, 1) } -// openTable opens a table on the given path. +// mustOpenTable opens a table on the given path. // // The table is created if it doesn't exist. -func openTable(path string, s *Storage) (*table, error) { +func mustOpenTable(path string, s *Storage) *table { path = filepath.Clean(path) // Create a directory for the table if it doesn't exist yet. @@ -106,10 +106,7 @@ func openTable(path string, s *Storage) (*table, error) { fs.MustRemoveTemporaryDirs(bigSnapshotsPath) // Open partitions. - pts, err := openPartitions(smallPartitionsPath, bigPartitionsPath, s) - if err != nil { - return nil, fmt.Errorf("cannot open partitions in the table %q: %w", path, err) - } + pts := mustOpenPartitions(smallPartitionsPath, bigPartitionsPath, s) tb := &table{ path: path, @@ -126,7 +123,7 @@ func openTable(path string, s *Storage) (*table, error) { } tb.startRetentionWatcher() tb.startFinalDedupWatcher() - return tb, nil + return tb } // CreateSnapshot creates tb snapshot and returns paths to small and big parts of it. @@ -482,7 +479,7 @@ func (tb *table) PutPartitions(ptws []*partitionWrapper) { } } -func openPartitions(smallPartitionsPath, bigPartitionsPath string, s *Storage) ([]*partition, error) { +func mustOpenPartitions(smallPartitionsPath, bigPartitionsPath string, s *Storage) []*partition { // Certain partition directories in either `big` or `small` dir may be missing // after restoring from backup. So populate partition names from both dirs. ptNames := make(map[string]bool) @@ -492,14 +489,10 @@ func openPartitions(smallPartitionsPath, bigPartitionsPath string, s *Storage) ( for ptName := range ptNames { smallPartsPath := filepath.Join(smallPartitionsPath, ptName) bigPartsPath := filepath.Join(bigPartitionsPath, ptName) - pt, err := openPartition(smallPartsPath, bigPartsPath, s) - if err != nil { - mustClosePartitions(pts) - return nil, fmt.Errorf("cannot open partition %q: %w", ptName, err) - } + pt := mustOpenPartition(smallPartsPath, bigPartsPath, s) pts = append(pts, pt) } - return pts, nil + return pts } func mustPopulatePartitionNames(partitionsPath string, ptNames map[string]bool) { @@ -518,12 +511,6 @@ func mustPopulatePartitionNames(partitionsPath string, ptNames map[string]bool) } } -func mustClosePartitions(pts []*partition) { - for _, pt := range pts { - pt.MustClose() - } -} - type partitionWrappers struct { a []*partitionWrapper } diff --git a/lib/storage/table_search_test.go b/lib/storage/table_search_test.go index df60fbe59..20a387061 100644 --- a/lib/storage/table_search_test.go +++ b/lib/storage/table_search_test.go @@ -184,10 +184,7 @@ func testTableSearchEx(t *testing.T, rng *rand.Rand, trData, trSearch TimeRange, // Create a table from rowss and test search on it. strg := newTestStorage() - tb, err := openTable("test-table", strg) - if err != nil { - t.Fatalf("cannot create table: %s", err) - } + tb := mustOpenTable("test-table", strg) defer func() { if err := os.RemoveAll("test-table"); err != nil { t.Fatalf("cannot remove table directory: %s", err) @@ -203,10 +200,7 @@ func testTableSearchEx(t *testing.T, rng *rand.Rand, trData, trSearch TimeRange, tb.MustClose() // Open the created table and test search on it. - tb, err = openTable("test-table", strg) - if err != nil { - t.Fatalf("cannot open table: %s", err) - } + tb = mustOpenTable("test-table", strg) testTableSearch(t, tb, tsids, trSearch, rbsExpected, rowsCountExpected) tb.MustClose() } diff --git a/lib/storage/table_search_timing_test.go b/lib/storage/table_search_timing_test.go index a834924f1..1e9a62b48 100644 --- a/lib/storage/table_search_timing_test.go +++ b/lib/storage/table_search_timing_test.go @@ -47,10 +47,7 @@ func openBenchTable(b *testing.B, startTimestamp int64, rowsPerInsert, rowsCount createdBenchTables[path] = true } strg := newTestStorage() - tb, err := openTable(path, strg) - if err != nil { - b.Fatalf("cnanot open table %q: %s", path, err) - } + tb := mustOpenTable(path, strg) // Verify rows count in the table opened from files. insertsCount := uint64((rowsCount + rowsPerInsert - 1) / rowsPerInsert) @@ -70,10 +67,7 @@ func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerIn b.Helper() strg := newTestStorage() - tb, err := openTable(path, strg) - if err != nil { - b.Fatalf("cannot open table %q: %s", path, err) - } + tb := mustOpenTable(path, strg) insertsCount := uint64((rowsCount + rowsPerInsert - 1) / rowsPerInsert) timestamp := uint64(startTimestamp) diff --git a/lib/storage/table_test.go b/lib/storage/table_test.go index e6cdc044a..2e8da8aad 100644 --- a/lib/storage/table_test.go +++ b/lib/storage/table_test.go @@ -19,20 +19,14 @@ func TestTableOpenClose(t *testing.T) { // Create a new table strg := newTestStorage() strg.retentionMsecs = retentionMsecs - tb, err := openTable(path, strg) - if err != nil { - t.Fatalf("cannot create new table: %s", err) - } + tb := mustOpenTable(path, strg) // Close it tb.MustClose() // Re-open created table multiple times. for i := 0; i < 10; i++ { - tb, err := openTable(path, strg) - if err != nil { - t.Fatalf("cannot open created table: %s", err) - } + tb := mustOpenTable(path, strg) tb.MustClose() } } diff --git a/lib/storage/table_timing_test.go b/lib/storage/table_timing_test.go index 0082e1c6a..56865900f 100644 --- a/lib/storage/table_timing_test.go +++ b/lib/storage/table_timing_test.go @@ -48,10 +48,7 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) { tablePath := "benchmarkTableAddRows" strg := newTestStorage() for i := 0; i < b.N; i++ { - tb, err := openTable(tablePath, strg) - if err != nil { - b.Fatalf("cannot open table %q: %s", tablePath, err) - } + tb := mustOpenTable(tablePath, strg) workCh := make(chan struct{}, insertsCount) for j := 0; j < insertsCount; j++ { @@ -94,10 +91,7 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) { tb.MustClose() // Open the table from files and verify the rows count on it - tb, err = openTable(tablePath, strg) - if err != nil { - b.Fatalf("cannot open table %q: %s", tablePath, err) - } + tb = mustOpenTable(tablePath, strg) var m TableMetrics tb.UpdateMetrics(&m) if rowsCount := m.TotalRowsCount(); rowsCount != uint64(rowsCountExpected) {