lib/storage: replace OpenStorage() with MustOpenStorage()

Callers of OpenStorage() log the returned error and exit.
The error logging and exit can be performed inside MustOpenStorage(),
together with printing the stack trace for better debuggability.
This simplifies the code on the caller side.
Aliaksandr Valialkin 2023-04-14 23:01:20 -07:00
parent 2a2036160d
commit 52006149b2
13 changed files with 46 additions and 142 deletions
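
For readers unfamiliar with the convention, the sketch below illustrates the Must* pattern this commit applies, using only the Go standard library. The names openThing and MustOpenThing, the "data" path, and the use of log.Fatalf with debug.Stack are illustrative assumptions rather than code from this repository; the real implementation relies on the project's logger package, as the diffs below show.

// A minimal, hypothetical sketch of the Must* pattern: the constructor itself
// logs the error together with a stack trace and exits, so callers no longer
// repeat the "check error, log, exit" boilerplate.
package main

import (
	"fmt"
	"log"
	"os"
	"runtime/debug"
)

type thing struct {
	f *os.File
}

// openThing returns an error, forcing every caller to handle it.
func openThing(path string) (*thing, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("cannot open %q: %w", path, err)
	}
	return &thing{f: f}, nil
}

// MustOpenThing logs the error with a stack trace and exits,
// so callers shrink to a single assignment.
func MustOpenThing(path string) *thing {
	t, err := openThing(path)
	if err != nil {
		log.Fatalf("FATAL: %s\nstack trace:\n%s", err, debug.Stack())
	}
	return t
}

func main() {
	// Before: t, err := openThing("data"); if err != nil { log.Fatalf(...) }
	// After:
	t := MustOpenThing("data")
	defer t.f.Close()
	fmt.Println("opened", t.f.Name())
}

With a helper like this, the caller-side change is exactly the one repeated throughout the diffs below: four lines of open, check, log and exit collapse into a single MustOpenX call.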


@@ -108,10 +108,7 @@ func Init(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
 	logger.Infof("opening storage at %q with -retentionPeriod=%s", *DataPath, retentionPeriod)
 	startTime := time.Now()
 	WG = syncwg.WaitGroup{}
-	strg, err := storage.OpenStorage(*DataPath, retentionPeriod.Msecs, *maxHourlySeries, *maxDailySeries)
-	if err != nil {
-		logger.Fatalf("cannot open a storage at %s with -retentionPeriod=%s: %s", *DataPath, retentionPeriod, err)
-	}
+	strg := storage.MustOpenStorage(*DataPath, retentionPeriod.Msecs, *maxHourlySeries, *maxDailySeries)
 	Storage = strg
 	initStaleSnapshotsRemover(strg)


@@ -1446,10 +1446,7 @@ func TestMatchTagFilters(t *testing.T) {
 func TestIndexDBRepopulateAfterRotation(t *testing.T) {
 	r := rand.New(rand.NewSource(1))
 	path := "TestIndexRepopulateAfterRotation"
-	s, err := OpenStorage(path, msecsPerMonth, 1e5, 1e5)
-	if err != nil {
-		t.Fatalf("cannot open storage: %s", err)
-	}
+	s := MustOpenStorage(path, msecsPerMonth, 1e5, 1e5)
 	defer func() {
 		s.MustClose()
 		if err := os.RemoveAll(path); err != nil {


@@ -250,14 +250,14 @@ func (pt *partition) Drop() {
 	logger.Infof("partition %q has been dropped", pt.name)
 }
-// openPartition opens the existing partition from the given paths.
-func openPartition(smallPartsPath, bigPartsPath string, s *Storage) (*partition, error) {
+// mustOpenPartition opens the existing partition from the given paths.
+func mustOpenPartition(smallPartsPath, bigPartsPath string, s *Storage) *partition {
 	smallPartsPath = filepath.Clean(smallPartsPath)
 	bigPartsPath = filepath.Clean(bigPartsPath)
 	name := filepath.Base(smallPartsPath)
 	if !strings.HasSuffix(bigPartsPath, name) {
-		return nil, fmt.Errorf("patititon name in bigPartsPath %q doesn't match smallPartsPath %q; want %q", bigPartsPath, smallPartsPath, name)
+		logger.Panicf("FATAL: patititon name in bigPartsPath %q doesn't match smallPartsPath %q; want %q", bigPartsPath, smallPartsPath, name)
 	}
 	partNamesSmall, partNamesBig := mustReadPartNames(smallPartsPath, bigPartsPath)
@@ -269,14 +269,14 @@ func openPartition(smallPartsPath, bigPartsPath string, s *Storage) (*partition,
 	pt.smallParts = smallParts
 	pt.bigParts = bigParts
 	if err := pt.tr.fromPartitionName(name); err != nil {
-		return nil, fmt.Errorf("cannot obtain partition time range from smallPartsPath %q: %w", smallPartsPath, err)
+		logger.Panicf("FATAL: cannot obtain partition time range from smallPartsPath %q: %s", smallPartsPath, err)
 	}
 	pt.startBackgroundWorkers()
 	// Wake up a single background merger, so it could start merging parts if needed.
 	pt.notifyBackgroundMergers()
-	return pt, nil
+	return pt
 }
 func newPartition(name, smallPartsPath, bigPartsPath string, s *Storage) *partition {


@@ -191,11 +191,7 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma
 	pt.MustClose()
 	// Open the created partition and test search on it.
-	var err error
-	pt, err = openPartition(smallPartsPath, bigPartsPath, strg)
-	if err != nil {
-		t.Fatalf("cannot open partition: %s", err)
-	}
+	pt = mustOpenPartition(smallPartsPath, bigPartsPath, strg)
 	testPartitionSearch(t, pt, tsids, tr, rbsExpected, rowsCountExpected)
 	pt.MustClose()
 }


@@ -72,10 +72,7 @@ func TestSearchQueryMarshalUnmarshal(t *testing.T) {
 func TestSearch(t *testing.T) {
 	path := "TestSearch"
-	st, err := OpenStorage(path, 0, 0, 0)
-	if err != nil {
-		t.Fatalf("cannot open storage %q: %s", path, err)
-	}
+	st := MustOpenStorage(path, 0, 0, 0)
 	defer func() {
 		st.MustClose()
 		if err := os.RemoveAll(path); err != nil {
@@ -121,10 +118,7 @@ func TestSearch(t *testing.T) {
 	// Re-open the storage in order to flush all the pending cached data.
 	st.MustClose()
-	st, err = OpenStorage(path, 0, 0, 0)
-	if err != nil {
-		t.Fatalf("cannot re-open storage %q: %s", path, err)
-	}
+	st = MustOpenStorage(path, 0, 0, 0)
 	// Run search.
 	tr := TimeRange{


@@ -134,11 +134,11 @@ type Storage struct {
 	isReadOnly uint32
 }
-// OpenStorage opens storage on the given path with the given retentionMsecs.
-func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySeries int) (*Storage, error) {
+// MustOpenStorage opens storage on the given path with the given retentionMsecs.
+func MustOpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySeries int) *Storage {
 	path, err := filepath.Abs(path)
 	if err != nil {
-		return nil, fmt.Errorf("cannot determine absolute path for %q: %w", path, err)
+		logger.Panicf("FATAL: cannot determine absolute path for %q: %s", path, err)
 	}
 	if retentionMsecs <= 0 {
 		retentionMsecs = maxRetentionMsecs
@@ -172,7 +172,7 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
 	// Check whether restore process finished successfully
 	restoreLockF := filepath.Join(path, backupnames.RestoreInProgressFilename)
 	if fs.IsPathExist(restoreLockF) {
-		return nil, fmt.Errorf("restore lock file exists, incomplete vmrestore run. Run vmrestore again or remove lock file %q", restoreLockF)
+		logger.Panicf("FATAL: incomplete vmrestore run; run vmrestore again or remove lock file %q", restoreLockF)
 	}
 	// Pre-create snapshots directory if it is missing.
@@ -227,11 +227,11 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
 	// Load deleted metricIDs from idbCurr and idbPrev
 	dmisCurr, err := idbCurr.loadDeletedMetricIDs()
 	if err != nil {
-		return nil, fmt.Errorf("cannot load deleted metricIDs for the current indexDB: %w", err)
+		logger.Panicf("FATAL: cannot load deleted metricIDs for the current indexDB at %q: %s", path, err)
 	}
 	dmisPrev, err := idbPrev.loadDeletedMetricIDs()
 	if err != nil {
-		return nil, fmt.Errorf("cannot load deleted metricIDs for the previous indexDB: %w", err)
+		logger.Panicf("FATAL: cannot load deleted metricIDs for the previous indexDB at %q: %s", path, err)
 	}
 	s.setDeletedMetricIDs(dmisCurr)
 	s.updateDeletedMetricIDs(dmisPrev)
@@ -242,18 +242,14 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
 	// Load data
 	tablePath := filepath.Join(path, dataDirname)
-	tb, err := openTable(tablePath, s)
-	if err != nil {
-		s.idb().MustClose()
-		return nil, fmt.Errorf("cannot open table at %q: %w", tablePath, err)
-	}
+	tb := mustOpenTable(tablePath, s)
 	s.tb = tb
 	s.startCurrHourMetricIDsUpdater()
 	s.startNextDayMetricIDsUpdater()
 	s.startRetentionWatcher()
-	return s, nil
+	return s
 }
 var maxTSIDCacheSize int


@@ -422,10 +422,7 @@ func TestNextRetentionDuration(t *testing.T) {
 func TestStorageOpenClose(t *testing.T) {
 	path := "TestStorageOpenClose"
 	for i := 0; i < 10; i++ {
-		s, err := OpenStorage(path, -1, 1e5, 1e6)
-		if err != nil {
-			t.Fatalf("cannot open storage: %s", err)
-		}
+		s := MustOpenStorage(path, -1, 1e5, 1e6)
 		s.MustClose()
 	}
 	if err := os.RemoveAll(path); err != nil {
@@ -436,20 +433,14 @@ func TestStorageOpenClose(t *testing.T) {
 func TestStorageRandTimestamps(t *testing.T) {
 	path := "TestStorageRandTimestamps"
 	retentionMsecs := int64(10 * msecsPerMonth)
-	s, err := OpenStorage(path, retentionMsecs, 0, 0)
-	if err != nil {
-		t.Fatalf("cannot open storage: %s", err)
-	}
+	s := MustOpenStorage(path, retentionMsecs, 0, 0)
 	t.Run("serial", func(t *testing.T) {
 		for i := 0; i < 3; i++ {
 			if err := testStorageRandTimestamps(s); err != nil {
 				t.Fatalf("error on iteration %d: %s", i, err)
 			}
 			s.MustClose()
-			s, err = OpenStorage(path, retentionMsecs, 0, 0)
-			if err != nil {
-				t.Fatalf("cannot open storage on iteration %d: %s", i, err)
-			}
+			s = MustOpenStorage(path, retentionMsecs, 0, 0)
 		}
 	})
 	t.Run("concurrent", func(t *testing.T) {
@@ -526,10 +517,7 @@ func testStorageRandTimestamps(s *Storage) error {
 func TestStorageDeleteSeries(t *testing.T) {
 	path := "TestStorageDeleteSeries"
-	s, err := OpenStorage(path, 0, 0, 0)
-	if err != nil {
-		t.Fatalf("cannot open storage: %s", err)
-	}
+	s := MustOpenStorage(path, 0, 0, 0)
 	// Verify no label names exist
 	lns, err := s.SearchLabelNamesWithFiltersOnTimeRange(nil, nil, TimeRange{}, 1e5, 1e9, noDeadline)
@@ -549,10 +537,7 @@ func TestStorageDeleteSeries(t *testing.T) {
 			// Re-open the storage in order to check how deleted metricIDs
 			// are persisted.
 			s.MustClose()
-			s, err = OpenStorage(path, 0, 0, 0)
-			if err != nil {
-				t.Fatalf("cannot open storage after closing on iteration %d: %s", i, err)
-			}
+			s = MustOpenStorage(path, 0, 0, 0)
 		}
 	})
@@ -747,10 +732,7 @@ func checkLabelNames(lns []string, lnsExpected map[string]bool) error {
 func TestStorageRegisterMetricNamesSerial(t *testing.T) {
 	path := "TestStorageRegisterMetricNamesSerial"
-	s, err := OpenStorage(path, 0, 0, 0)
-	if err != nil {
-		t.Fatalf("cannot open storage: %s", err)
-	}
+	s := MustOpenStorage(path, 0, 0, 0)
 	if err := testStorageRegisterMetricNames(s); err != nil {
 		t.Fatalf("unexpected error: %s", err)
 	}
@@ -762,10 +744,7 @@ func TestStorageRegisterMetricNamesSerial(t *testing.T) {
 func TestStorageRegisterMetricNamesConcurrent(t *testing.T) {
 	path := "TestStorageRegisterMetricNamesConcurrent"
-	s, err := OpenStorage(path, 0, 0, 0)
-	if err != nil {
-		t.Fatalf("cannot open storage: %s", err)
-	}
+	s := MustOpenStorage(path, 0, 0, 0)
 	ch := make(chan error, 3)
 	for i := 0; i < cap(ch); i++ {
 		go func() {
@@ -914,10 +893,7 @@ func TestStorageAddRowsSerial(t *testing.T) {
 	rng := rand.New(rand.NewSource(1))
 	path := "TestStorageAddRowsSerial"
 	retentionMsecs := int64(msecsPerMonth * 10)
-	s, err := OpenStorage(path, retentionMsecs, 1e5, 1e5)
-	if err != nil {
-		t.Fatalf("cannot open storage: %s", err)
-	}
+	s := MustOpenStorage(path, retentionMsecs, 1e5, 1e5)
 	if err := testStorageAddRows(rng, s); err != nil {
 		t.Fatalf("unexpected error: %s", err)
 	}
@@ -930,10 +906,7 @@ func TestStorageAddRowsSerial(t *testing.T) {
 func TestStorageAddRowsConcurrent(t *testing.T) {
 	path := "TestStorageAddRowsConcurrent"
 	retentionMsecs := int64(msecsPerMonth * 10)
-	s, err := OpenStorage(path, retentionMsecs, 1e5, 1e5)
-	if err != nil {
-		t.Fatalf("cannot open storage: %s", err)
-	}
+	s := MustOpenStorage(path, retentionMsecs, 1e5, 1e5)
 	ch := make(chan error, 3)
 	for i := 0; i < cap(ch); i++ {
 		go func(n int) {
@@ -1018,10 +991,7 @@ func testStorageAddRows(rng *rand.Rand, s *Storage) error {
 	// Try opening the storage from snapshot.
 	snapshotPath := filepath.Join(s.path, snapshotsDirname, snapshotName)
-	s1, err := OpenStorage(snapshotPath, 0, 0, 0)
-	if err != nil {
-		return fmt.Errorf("cannot open storage from snapshot: %w", err)
-	}
+	s1 := MustOpenStorage(snapshotPath, 0, 0, 0)
 	// Verify the snapshot contains rows
 	var m1 Metrics
@@ -1065,10 +1035,7 @@ func testStorageAddRows(rng *rand.Rand, s *Storage) error {
 func TestStorageRotateIndexDB(t *testing.T) {
 	path := "TestStorageRotateIndexDB"
-	s, err := OpenStorage(path, 0, 0, 0)
-	if err != nil {
-		t.Fatalf("cannot open storage: %s", err)
-	}
+	s := MustOpenStorage(path, 0, 0, 0)
 	// Start indexDB rotater in a separate goroutine
 	stopCh := make(chan struct{})
@@ -1152,10 +1119,7 @@ func TestStorageDeleteStaleSnapshots(t *testing.T) {
 	rng := rand.New(rand.NewSource(1))
 	path := "TestStorageDeleteStaleSnapshots"
 	retentionMsecs := int64(msecsPerMonth * 10)
-	s, err := OpenStorage(path, retentionMsecs, 1e5, 1e5)
-	if err != nil {
-		t.Fatalf("cannot open storage: %s", err)
-	}
+	s := MustOpenStorage(path, retentionMsecs, 1e5, 1e5)
 	const rowsPerAdd = 1e3
 	const addsCount = 10
 	maxTimestamp := timestampFromTime(time.Now())


@@ -17,10 +17,7 @@ func BenchmarkStorageAddRows(b *testing.B) {
 func benchmarkStorageAddRows(b *testing.B, rowsPerBatch int) {
 	path := fmt.Sprintf("BenchmarkStorageAddRows_%d", rowsPerBatch)
-	s, err := OpenStorage(path, 0, 0, 0)
-	if err != nil {
-		b.Fatalf("cannot open storage at %q: %s", path, err)
-	}
+	s := MustOpenStorage(path, 0, 0, 0)
 	defer func() {
 		s.MustClose()
 		if err := os.RemoveAll(path); err != nil {


@@ -76,10 +76,10 @@ func (ptw *partitionWrapper) scheduleToDrop() {
 	atomic.AddUint64(&ptw.mustDrop, 1)
 }
-// openTable opens a table on the given path.
+// mustOpenTable opens a table on the given path.
 //
 // The table is created if it doesn't exist.
-func openTable(path string, s *Storage) (*table, error) {
+func mustOpenTable(path string, s *Storage) *table {
 	path = filepath.Clean(path)
 	// Create a directory for the table if it doesn't exist yet.
@@ -106,10 +106,7 @@ func openTable(path string, s *Storage) (*table, error) {
 	fs.MustRemoveTemporaryDirs(bigSnapshotsPath)
 	// Open partitions.
-	pts, err := openPartitions(smallPartitionsPath, bigPartitionsPath, s)
-	if err != nil {
-		return nil, fmt.Errorf("cannot open partitions in the table %q: %w", path, err)
-	}
+	pts := mustOpenPartitions(smallPartitionsPath, bigPartitionsPath, s)
 	tb := &table{
 		path: path,
@@ -126,7 +123,7 @@ func openTable(path string, s *Storage) (*table, error) {
 	}
 	tb.startRetentionWatcher()
 	tb.startFinalDedupWatcher()
-	return tb, nil
+	return tb
 }
 // CreateSnapshot creates tb snapshot and returns paths to small and big parts of it.
@@ -482,7 +479,7 @@ func (tb *table) PutPartitions(ptws []*partitionWrapper) {
 	}
 }
-func openPartitions(smallPartitionsPath, bigPartitionsPath string, s *Storage) ([]*partition, error) {
+func mustOpenPartitions(smallPartitionsPath, bigPartitionsPath string, s *Storage) []*partition {
 	// Certain partition directories in either `big` or `small` dir may be missing
 	// after restoring from backup. So populate partition names from both dirs.
 	ptNames := make(map[string]bool)
@@ -492,14 +489,10 @@ func openPartitions(smallPartitionsPath, bigPartitionsPath string, s *Storage) (
 	for ptName := range ptNames {
 		smallPartsPath := filepath.Join(smallPartitionsPath, ptName)
 		bigPartsPath := filepath.Join(bigPartitionsPath, ptName)
-		pt, err := openPartition(smallPartsPath, bigPartsPath, s)
-		if err != nil {
-			mustClosePartitions(pts)
-			return nil, fmt.Errorf("cannot open partition %q: %w", ptName, err)
-		}
+		pt := mustOpenPartition(smallPartsPath, bigPartsPath, s)
 		pts = append(pts, pt)
 	}
-	return pts, nil
+	return pts
 }
 func mustPopulatePartitionNames(partitionsPath string, ptNames map[string]bool) {
@@ -518,12 +511,6 @@ func mustPopulatePartitionNames(partitionsPath string, ptNames map[string]bool)
 	}
 }
-func mustClosePartitions(pts []*partition) {
-	for _, pt := range pts {
-		pt.MustClose()
-	}
-}
 type partitionWrappers struct {
 	a []*partitionWrapper
 }


@@ -184,10 +184,7 @@ func testTableSearchEx(t *testing.T, rng *rand.Rand, trData, trSearch TimeRange,
 	// Create a table from rowss and test search on it.
 	strg := newTestStorage()
-	tb, err := openTable("test-table", strg)
-	if err != nil {
-		t.Fatalf("cannot create table: %s", err)
-	}
+	tb := mustOpenTable("test-table", strg)
 	defer func() {
 		if err := os.RemoveAll("test-table"); err != nil {
 			t.Fatalf("cannot remove table directory: %s", err)
@@ -203,10 +200,7 @@ func testTableSearchEx(t *testing.T, rng *rand.Rand, trData, trSearch TimeRange,
 	tb.MustClose()
 	// Open the created table and test search on it.
-	tb, err = openTable("test-table", strg)
-	if err != nil {
-		t.Fatalf("cannot open table: %s", err)
-	}
+	tb = mustOpenTable("test-table", strg)
 	testTableSearch(t, tb, tsids, trSearch, rbsExpected, rowsCountExpected)
 	tb.MustClose()
 }


@@ -47,10 +47,7 @@ func openBenchTable(b *testing.B, startTimestamp int64, rowsPerInsert, rowsCount
 		createdBenchTables[path] = true
 	}
 	strg := newTestStorage()
-	tb, err := openTable(path, strg)
-	if err != nil {
-		b.Fatalf("cnanot open table %q: %s", path, err)
-	}
+	tb := mustOpenTable(path, strg)
 	// Verify rows count in the table opened from files.
 	insertsCount := uint64((rowsCount + rowsPerInsert - 1) / rowsPerInsert)
@@ -70,10 +67,7 @@ func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerIn
 	b.Helper()
 	strg := newTestStorage()
-	tb, err := openTable(path, strg)
-	if err != nil {
-		b.Fatalf("cannot open table %q: %s", path, err)
-	}
+	tb := mustOpenTable(path, strg)
 	insertsCount := uint64((rowsCount + rowsPerInsert - 1) / rowsPerInsert)
 	timestamp := uint64(startTimestamp)


@@ -19,20 +19,14 @@ func TestTableOpenClose(t *testing.T) {
 	// Create a new table
 	strg := newTestStorage()
 	strg.retentionMsecs = retentionMsecs
-	tb, err := openTable(path, strg)
-	if err != nil {
-		t.Fatalf("cannot create new table: %s", err)
-	}
+	tb := mustOpenTable(path, strg)
 	// Close it
 	tb.MustClose()
 	// Re-open created table multiple times.
 	for i := 0; i < 10; i++ {
-		tb, err := openTable(path, strg)
-		if err != nil {
-			t.Fatalf("cannot open created table: %s", err)
-		}
+		tb := mustOpenTable(path, strg)
 		tb.MustClose()
 	}
 }


@@ -48,10 +48,7 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) {
 	tablePath := "benchmarkTableAddRows"
 	strg := newTestStorage()
 	for i := 0; i < b.N; i++ {
-		tb, err := openTable(tablePath, strg)
-		if err != nil {
-			b.Fatalf("cannot open table %q: %s", tablePath, err)
-		}
+		tb := mustOpenTable(tablePath, strg)
 		workCh := make(chan struct{}, insertsCount)
 		for j := 0; j < insertsCount; j++ {
@@ -94,10 +91,7 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) {
 		tb.MustClose()
 		// Open the table from files and verify the rows count on it
-		tb, err = openTable(tablePath, strg)
-		if err != nil {
-			b.Fatalf("cannot open table %q: %s", tablePath, err)
-		}
+		tb = mustOpenTable(tablePath, strg)
 		var m TableMetrics
 		tb.UpdateMetrics(&m)
 		if rowsCount := m.TotalRowsCount(); rowsCount != uint64(rowsCountExpected) {