mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/fs: add MustReadDir() function
Use fs.MustReadDir() instead of os.ReadDir() across the code in order to reduce the code verbosity. The fs.MustReadDir() logs the error with the directory name and the call stack on error before exit. This information should be enough for debugging the cause of the error.
This commit is contained in:
parent
60d92894c5
commit
3727251910
14 changed files with 71 additions and 210 deletions
|
@ -4,7 +4,6 @@ import (
|
|||
"flag"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
@ -265,10 +264,7 @@ func newRemoteWriteCtxs(at *auth.Token, urls []string) []*remoteWriteCtx {
|
|||
}
|
||||
|
||||
queuesDir := filepath.Join(*tmpDataPath, persistentQueueDirname)
|
||||
files, err := os.ReadDir(queuesDir)
|
||||
if err != nil {
|
||||
logger.Fatalf("cannot read queues dir %q: %s", queuesDir, err)
|
||||
}
|
||||
files := fs.MustReadDir(queuesDir)
|
||||
removed := 0
|
||||
for _, f := range files {
|
||||
dirname := f.Name()
|
||||
|
|
|
@ -9,7 +9,6 @@ import (
|
|||
"path/filepath"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -76,18 +75,11 @@ func collectDashboardsSettings(path string) ([]byte, error) {
|
|||
if !fs.IsPathExist(path) {
|
||||
return nil, fmt.Errorf("cannot find folder %q", path)
|
||||
}
|
||||
files, err := os.ReadDir(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot read folder %q", path)
|
||||
}
|
||||
files := fs.MustReadDir(path)
|
||||
|
||||
var dss []dashboardSettings
|
||||
for _, file := range files {
|
||||
filename := file.Name()
|
||||
if err != nil {
|
||||
logger.Errorf("skipping %q at -vmui.customDashboardsPath=%q, since the info for this file cannot be obtained: %s", filename, path, err)
|
||||
continue
|
||||
}
|
||||
if filepath.Ext(filename) != ".json" {
|
||||
continue
|
||||
}
|
||||
|
|
24
lib/fs/fs.go
24
lib/fs/fs.go
|
@ -234,14 +234,20 @@ func MustRemoveDirAtomic(dir string) {
|
|||
|
||||
var atomicDirRemoveCounter = uint64(time.Now().UnixNano())
|
||||
|
||||
// MustReadDir reads directory entries at the given dir.
|
||||
func MustReadDir(dir string) []os.DirEntry {
|
||||
des, err := os.ReadDir(dir)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot read directory contents: %s", err)
|
||||
}
|
||||
return des
|
||||
}
|
||||
|
||||
// MustRemoveTemporaryDirs removes all the subdirectories with ".must-remove.<XYZ>" suffix.
|
||||
//
|
||||
// Such directories may be left on unclean shutdown during MustRemoveDirAtomic call.
|
||||
func MustRemoveTemporaryDirs(dir string) {
|
||||
des, err := os.ReadDir(dir)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot read dir: %s", err)
|
||||
}
|
||||
des := MustReadDir(dir)
|
||||
for _, de := range des {
|
||||
if !IsDirOrSymlink(de) {
|
||||
// Skip non-directories
|
||||
|
@ -260,10 +266,7 @@ func MustRemoveTemporaryDirs(dir string) {
|
|||
func MustHardLinkFiles(srcDir, dstDir string) {
|
||||
mustMkdirSync(dstDir)
|
||||
|
||||
des, err := os.ReadDir(srcDir)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot read files in scrDir: %s", err)
|
||||
}
|
||||
des := MustReadDir(srcDir)
|
||||
for _, de := range des {
|
||||
if IsDirOrSymlink(de) {
|
||||
// Skip directories.
|
||||
|
@ -299,10 +302,7 @@ func MustSymlinkRelative(srcPath, dstPath string) {
|
|||
|
||||
// MustCopyDirectory copies all the files in srcPath to dstPath.
|
||||
func MustCopyDirectory(srcPath, dstPath string) {
|
||||
des, err := os.ReadDir(srcPath)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot read srcDir: %s", err)
|
||||
}
|
||||
des := MustReadDir(srcPath)
|
||||
MustMkdirIfNotExist(dstPath)
|
||||
for _, de := range des {
|
||||
if !de.Type().IsRegular() {
|
||||
|
|
|
@ -313,7 +313,7 @@ func (pw *partWrapper) decRef() {
|
|||
}
|
||||
}
|
||||
|
||||
// OpenTable opens a table on the given path.
|
||||
// MustOpenTable opens a table on the given path.
|
||||
//
|
||||
// Optional flushCallback is called every time new data batch is flushed
|
||||
// to the underlying storage and becomes visible to search.
|
||||
|
@ -322,7 +322,7 @@ func (pw *partWrapper) decRef() {
|
|||
// to persistent storage.
|
||||
//
|
||||
// The table is created if it doesn't exist yet.
|
||||
func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallback, isReadOnly *uint32) (*Table, error) {
|
||||
func MustOpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallback, isReadOnly *uint32) *Table {
|
||||
path = filepath.Clean(path)
|
||||
logger.Infof("opening table %q...", path)
|
||||
startTime := time.Now()
|
||||
|
@ -334,10 +334,7 @@ func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallb
|
|||
flockF := fs.MustCreateFlockFile(path)
|
||||
|
||||
// Open table parts.
|
||||
pws, err := openParts(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot open table parts at %q: %w", path, err)
|
||||
}
|
||||
pws := mustOpenParts(path)
|
||||
|
||||
tb := &Table{
|
||||
path: path,
|
||||
|
@ -382,7 +379,7 @@ func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallb
|
|||
}()
|
||||
}
|
||||
|
||||
return tb, nil
|
||||
return tb
|
||||
}
|
||||
|
||||
func (tb *Table) startBackgroundWorkers() {
|
||||
|
@ -1354,7 +1351,7 @@ func getWorkersCount() int {
|
|||
return n
|
||||
}
|
||||
|
||||
func openParts(path string) ([]*partWrapper, error) {
|
||||
func mustOpenParts(path string) []*partWrapper {
|
||||
// The path can be missing after restoring from backup, so create it if needed.
|
||||
fs.MustMkdirIfNotExist(path)
|
||||
fs.MustRemoveTemporaryDirs(path)
|
||||
|
@ -1368,10 +1365,7 @@ func openParts(path string) ([]*partWrapper, error) {
|
|||
|
||||
// Remove dirs missing in partNames. These dirs may be left after unclean shutdown
|
||||
// or after the update from versions prior to v1.90.0.
|
||||
des, err := os.ReadDir(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot read mergetree table dir: %w", err)
|
||||
}
|
||||
des := fs.MustReadDir(path)
|
||||
m := make(map[string]struct{}, len(partNames))
|
||||
for _, partName := range partNames {
|
||||
m[partName] = struct{}{}
|
||||
|
@ -1401,7 +1395,7 @@ func openParts(path string) ([]*partWrapper, error) {
|
|||
pws = append(pws, pw)
|
||||
}
|
||||
|
||||
return pws, nil
|
||||
return pws
|
||||
}
|
||||
|
||||
// CreateSnapshotAt creates tb snapshot in the given dstDir.
|
||||
|
@ -1496,10 +1490,7 @@ func mustReadPartNames(srcDir string) []string {
|
|||
}
|
||||
// The partsFilename is missing. This is the upgrade from versions previous to v1.90.0.
|
||||
// Read part names from directories under srcDir
|
||||
des, err := os.ReadDir(srcDir)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot read mergeset table dir: %s", err)
|
||||
}
|
||||
des := fs.MustReadDir(srcDir)
|
||||
var partNames []string
|
||||
for _, de := range des {
|
||||
if !fs.IsDirOrSymlink(de) {
|
||||
|
|
|
@ -42,10 +42,7 @@ func TestTableSearchSerial(t *testing.T) {
|
|||
func() {
|
||||
// Re-open the table and verify the search works.
|
||||
var isReadOnly uint32
|
||||
tb, err := OpenTable(path, nil, nil, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open table: %s", err)
|
||||
}
|
||||
tb := MustOpenTable(path, nil, nil, &isReadOnly)
|
||||
defer tb.MustClose()
|
||||
if err := testTableSearchSerial(tb, items); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
|
@ -79,10 +76,7 @@ func TestTableSearchConcurrent(t *testing.T) {
|
|||
// Re-open the table and verify the search works.
|
||||
func() {
|
||||
var isReadOnly uint32
|
||||
tb, err := OpenTable(path, nil, nil, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open table: %s", err)
|
||||
}
|
||||
tb := MustOpenTable(path, nil, nil, &isReadOnly)
|
||||
defer tb.MustClose()
|
||||
if err := testTableSearchConcurrent(tb, items); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
|
@ -156,10 +150,7 @@ func newTestTable(r *rand.Rand, path string, itemsCount int) (*Table, []string,
|
|||
atomic.AddUint64(&flushes, 1)
|
||||
}
|
||||
var isReadOnly uint32
|
||||
tb, err := OpenTable(path, flushCallback, nil, &isReadOnly)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("cannot open table: %w", err)
|
||||
}
|
||||
tb := MustOpenTable(path, flushCallback, nil, &isReadOnly)
|
||||
items := make([]string, itemsCount)
|
||||
for i := 0; i < itemsCount; i++ {
|
||||
item := fmt.Sprintf("%d:%d", r.Intn(1e9), i)
|
||||
|
|
|
@ -35,10 +35,7 @@ func benchmarkTableSearch(b *testing.B, itemsCount int) {
|
|||
// Force finishing pending merges
|
||||
tb.MustClose()
|
||||
var isReadOnly uint32
|
||||
tb, err = OpenTable(path, nil, nil, &isReadOnly)
|
||||
if err != nil {
|
||||
b.Fatalf("unexpected error when re-opening table %q: %s", path, err)
|
||||
}
|
||||
tb = MustOpenTable(path, nil, nil, &isReadOnly)
|
||||
defer tb.MustClose()
|
||||
|
||||
keys := make([][]byte, len(items))
|
||||
|
|
|
@ -21,20 +21,14 @@ func TestTableOpenClose(t *testing.T) {
|
|||
|
||||
// Create a new table
|
||||
var isReadOnly uint32
|
||||
tb, err := OpenTable(path, nil, nil, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot create new table: %s", err)
|
||||
}
|
||||
tb := MustOpenTable(path, nil, nil, &isReadOnly)
|
||||
|
||||
// Close it
|
||||
tb.MustClose()
|
||||
|
||||
// Re-open created table multiple times.
|
||||
for i := 0; i < 4; i++ {
|
||||
tb, err := OpenTable(path, nil, nil, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open created table: %s", err)
|
||||
}
|
||||
tb := MustOpenTable(path, nil, nil, &isReadOnly)
|
||||
tb.MustClose()
|
||||
}
|
||||
}
|
||||
|
@ -54,10 +48,7 @@ func TestTableAddItemsSerial(t *testing.T) {
|
|||
atomic.AddUint64(&flushes, 1)
|
||||
}
|
||||
var isReadOnly uint32
|
||||
tb, err := OpenTable(path, flushCallback, nil, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open %q: %s", path, err)
|
||||
}
|
||||
tb := MustOpenTable(path, flushCallback, nil, &isReadOnly)
|
||||
|
||||
const itemsCount = 10e3
|
||||
testAddItemsSerial(r, tb, itemsCount)
|
||||
|
@ -80,10 +71,7 @@ func TestTableAddItemsSerial(t *testing.T) {
|
|||
testReopenTable(t, path, itemsCount)
|
||||
|
||||
// Add more items in order to verify merge between inmemory parts and file-based parts.
|
||||
tb, err = OpenTable(path, nil, nil, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open %q: %s", path, err)
|
||||
}
|
||||
tb = MustOpenTable(path, nil, nil, &isReadOnly)
|
||||
const moreItemsCount = itemsCount * 3
|
||||
testAddItemsSerial(r, tb, moreItemsCount)
|
||||
tb.MustClose()
|
||||
|
@ -112,10 +100,7 @@ func TestTableCreateSnapshotAt(t *testing.T) {
|
|||
}()
|
||||
|
||||
var isReadOnly uint32
|
||||
tb, err := OpenTable(path, nil, nil, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open %q: %s", path, err)
|
||||
}
|
||||
tb := MustOpenTable(path, nil, nil, &isReadOnly)
|
||||
defer tb.MustClose()
|
||||
|
||||
// Write a lot of items into the table, so background merges would start.
|
||||
|
@ -141,16 +126,10 @@ func TestTableCreateSnapshotAt(t *testing.T) {
|
|||
}()
|
||||
|
||||
// Verify snapshots contain all the data.
|
||||
tb1, err := OpenTable(snapshot1, nil, nil, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open %q: %s", path, err)
|
||||
}
|
||||
tb1 := MustOpenTable(snapshot1, nil, nil, &isReadOnly)
|
||||
defer tb1.MustClose()
|
||||
|
||||
tb2, err := OpenTable(snapshot2, nil, nil, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open %q: %s", path, err)
|
||||
}
|
||||
tb2 := MustOpenTable(snapshot2, nil, nil, &isReadOnly)
|
||||
defer tb2.MustClose()
|
||||
|
||||
var ts, ts1, ts2 TableSearch
|
||||
|
@ -199,10 +178,7 @@ func TestTableAddItemsConcurrent(t *testing.T) {
|
|||
return data, items
|
||||
}
|
||||
var isReadOnly uint32
|
||||
tb, err := OpenTable(path, flushCallback, prepareBlock, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open %q: %s", path, err)
|
||||
}
|
||||
tb := MustOpenTable(path, flushCallback, prepareBlock, &isReadOnly)
|
||||
|
||||
const itemsCount = 10e3
|
||||
testAddItemsConcurrent(tb, itemsCount)
|
||||
|
@ -225,10 +201,7 @@ func TestTableAddItemsConcurrent(t *testing.T) {
|
|||
testReopenTable(t, path, itemsCount)
|
||||
|
||||
// Add more items in order to verify merge between inmemory parts and file-based parts.
|
||||
tb, err = OpenTable(path, nil, nil, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open %q: %s", path, err)
|
||||
}
|
||||
tb = MustOpenTable(path, nil, nil, &isReadOnly)
|
||||
const moreItemsCount = itemsCount * 3
|
||||
testAddItemsConcurrent(tb, moreItemsCount)
|
||||
tb.MustClose()
|
||||
|
@ -267,10 +240,7 @@ func testReopenTable(t *testing.T, path string, itemsCount int) {
|
|||
|
||||
for i := 0; i < 10; i++ {
|
||||
var isReadOnly uint32
|
||||
tb, err := OpenTable(path, nil, nil, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot re-open %q: %s", path, err)
|
||||
}
|
||||
tb := MustOpenTable(path, nil, nil, &isReadOnly)
|
||||
var m TableMetrics
|
||||
tb.UpdateMetrics(&m)
|
||||
if n := m.TotalItemsCount(); n != uint64(itemsCount) {
|
||||
|
|
|
@ -203,10 +203,7 @@ func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingB
|
|||
}
|
||||
|
||||
// Locate reader and writer chunks in the path.
|
||||
des, err := os.ReadDir(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot read contents of the directory %q: %w", path, err)
|
||||
}
|
||||
des := fs.MustReadDir(path)
|
||||
for _, de := range des {
|
||||
fname := de.Name()
|
||||
filepath := filepath.Join(path, fname)
|
||||
|
|
|
@ -125,14 +125,14 @@ func getTagFiltersCacheSize() int {
|
|||
return maxTagFiltersCacheSize
|
||||
}
|
||||
|
||||
// openIndexDB opens index db from the given path.
|
||||
// mustOpenIndexDB opens index db from the given path.
|
||||
//
|
||||
// The last segment of the path should contain unique hex value which
|
||||
// will be then used as indexDB.generation
|
||||
//
|
||||
// The rotationTimestamp must be set to the current unix timestamp when openIndexDB
|
||||
// The rotationTimestamp must be set to the current unix timestamp when mustOpenIndexDB
|
||||
// is called when creating new indexdb during indexdb rotation.
|
||||
func openIndexDB(path string, s *Storage, rotationTimestamp uint64, isReadOnly *uint32) (*indexDB, error) {
|
||||
func mustOpenIndexDB(path string, s *Storage, rotationTimestamp uint64, isReadOnly *uint32) *indexDB {
|
||||
if s == nil {
|
||||
logger.Panicf("BUG: Storage must be nin-nil")
|
||||
}
|
||||
|
@ -140,13 +140,10 @@ func openIndexDB(path string, s *Storage, rotationTimestamp uint64, isReadOnly *
|
|||
name := filepath.Base(path)
|
||||
gen, err := strconv.ParseUint(name, 16, 64)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse indexdb path %q: %w", path, err)
|
||||
logger.Panicf("FATAL: cannot parse indexdb path %q: %s", path, err)
|
||||
}
|
||||
|
||||
tb, err := mergeset.OpenTable(path, invalidateTagFiltersCache, mergeTagToMetricIDsRows, isReadOnly)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot open indexDB %q: %w", path, err)
|
||||
}
|
||||
tb := mergeset.MustOpenTable(path, invalidateTagFiltersCache, mergeTagToMetricIDsRows, isReadOnly)
|
||||
|
||||
// Do not persist tagFiltersToMetricIDsCache in files, since it is very volatile.
|
||||
mem := memory.Allowed()
|
||||
|
@ -163,7 +160,7 @@ func openIndexDB(path string, s *Storage, rotationTimestamp uint64, isReadOnly *
|
|||
s: s,
|
||||
loopsPerDateTagFilterCache: workingsetcache.New(mem / 128),
|
||||
}
|
||||
return db, nil
|
||||
return db
|
||||
}
|
||||
|
||||
const noDeadline = 1<<64 - 1
|
||||
|
@ -599,7 +596,7 @@ func (is *indexSearch) createTSIDByName(dst *TSID, metricName, metricNameRaw []b
|
|||
is.createPerDayIndexes(date, dst.MetricID, mn)
|
||||
|
||||
// There is no need in invalidating tag cache, since it is invalidated
|
||||
// on db.tb flush via invalidateTagFiltersCache flushCallback passed to OpenTable.
|
||||
// on db.tb flush via invalidateTagFiltersCache flushCallback passed to mergeset.MustOpenTable.
|
||||
|
||||
if created {
|
||||
// Increase the newTimeseriesCreated counter only if tsid wasn't found in indexDB
|
||||
|
|
|
@ -496,10 +496,7 @@ func TestIndexDBOpenClose(t *testing.T) {
|
|||
tableName := nextIndexDBTableName()
|
||||
for i := 0; i < 5; i++ {
|
||||
var isReadOnly uint32
|
||||
db, err := openIndexDB(tableName, s, 0, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open indexDB: %s", err)
|
||||
}
|
||||
db := mustOpenIndexDB(tableName, s, 0, &isReadOnly)
|
||||
db.MustClose()
|
||||
}
|
||||
if err := os.RemoveAll(tableName); err != nil {
|
||||
|
@ -516,10 +513,7 @@ func TestIndexDB(t *testing.T) {
|
|||
|
||||
dbName := nextIndexDBTableName()
|
||||
var isReadOnly uint32
|
||||
db, err := openIndexDB(dbName, s, 0, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open indexDB: %s", err)
|
||||
}
|
||||
db := mustOpenIndexDB(dbName, s, 0, &isReadOnly)
|
||||
defer func() {
|
||||
db.MustClose()
|
||||
if err := os.RemoveAll(dbName); err != nil {
|
||||
|
@ -537,10 +531,7 @@ func TestIndexDB(t *testing.T) {
|
|||
|
||||
// Re-open the db and verify it works as expected.
|
||||
db.MustClose()
|
||||
db, err = openIndexDB(dbName, s, 0, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open indexDB: %s", err)
|
||||
}
|
||||
db = mustOpenIndexDB(dbName, s, 0, &isReadOnly)
|
||||
if err := testIndexDBCheckTSIDByName(db, mns, tsids, false); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
|
@ -552,10 +543,7 @@ func TestIndexDB(t *testing.T) {
|
|||
|
||||
dbName := nextIndexDBTableName()
|
||||
var isReadOnly uint32
|
||||
db, err := openIndexDB(dbName, s, 0, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open indexDB: %s", err)
|
||||
}
|
||||
db := mustOpenIndexDB(dbName, s, 0, &isReadOnly)
|
||||
defer func() {
|
||||
db.MustClose()
|
||||
if err := os.RemoveAll(dbName); err != nil {
|
||||
|
@ -1562,10 +1550,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
|
|||
|
||||
dbName := nextIndexDBTableName()
|
||||
var isReadOnly uint32
|
||||
db, err := openIndexDB(dbName, s, 0, &isReadOnly)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open indexDB: %s", err)
|
||||
}
|
||||
db := mustOpenIndexDB(dbName, s, 0, &isReadOnly)
|
||||
defer func() {
|
||||
db.MustClose()
|
||||
if err := os.RemoveAll(dbName); err != nil {
|
||||
|
|
|
@ -45,10 +45,7 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) {
|
|||
|
||||
dbName := nextIndexDBTableName()
|
||||
var isReadOnly uint32
|
||||
db, err := openIndexDB(dbName, s, 0, &isReadOnly)
|
||||
if err != nil {
|
||||
b.Fatalf("cannot open indexDB: %s", err)
|
||||
}
|
||||
db := mustOpenIndexDB(dbName, s, 0, &isReadOnly)
|
||||
defer func() {
|
||||
db.MustClose()
|
||||
if err := os.RemoveAll(dbName); err != nil {
|
||||
|
@ -109,10 +106,7 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
|
|||
|
||||
dbName := nextIndexDBTableName()
|
||||
var isReadOnly uint32
|
||||
db, err := openIndexDB(dbName, s, 0, &isReadOnly)
|
||||
if err != nil {
|
||||
b.Fatalf("cannot open indexDB: %s", err)
|
||||
}
|
||||
db := mustOpenIndexDB(dbName, s, 0, &isReadOnly)
|
||||
defer func() {
|
||||
db.MustClose()
|
||||
if err := os.RemoveAll(dbName); err != nil {
|
||||
|
@ -289,10 +283,7 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
|
|||
|
||||
dbName := nextIndexDBTableName()
|
||||
var isReadOnly uint32
|
||||
db, err := openIndexDB(dbName, s, 0, &isReadOnly)
|
||||
if err != nil {
|
||||
b.Fatalf("cannot open indexDB: %s", err)
|
||||
}
|
||||
db := mustOpenIndexDB(dbName, s, 0, &isReadOnly)
|
||||
defer func() {
|
||||
db.MustClose()
|
||||
if err := os.RemoveAll(dbName); err != nil {
|
||||
|
|
|
@ -262,15 +262,8 @@ func openPartition(smallPartsPath, bigPartsPath string, s *Storage) (*partition,
|
|||
|
||||
partNamesSmall, partNamesBig := mustReadPartNames(smallPartsPath, bigPartsPath)
|
||||
|
||||
smallParts, err := openParts(smallPartsPath, partNamesSmall)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot open small parts from %q: %w", smallPartsPath, err)
|
||||
}
|
||||
bigParts, err := openParts(bigPartsPath, partNamesBig)
|
||||
if err != nil {
|
||||
mustCloseParts(smallParts)
|
||||
return nil, fmt.Errorf("cannot open big parts from %q: %w", bigPartsPath, err)
|
||||
}
|
||||
smallParts := mustOpenParts(smallPartsPath, partNamesSmall)
|
||||
bigParts := mustOpenParts(bigPartsPath, partNamesBig)
|
||||
|
||||
pt := newPartition(name, smallPartsPath, bigPartsPath, s)
|
||||
pt.smallParts = smallParts
|
||||
|
@ -1770,7 +1763,7 @@ func getPartsSize(pws []*partWrapper) uint64 {
|
|||
return n
|
||||
}
|
||||
|
||||
func openParts(path string, partNames []string) ([]*partWrapper, error) {
|
||||
func mustOpenParts(path string, partNames []string) []*partWrapper {
|
||||
// The path can be missing after restoring from backup, so create it if needed.
|
||||
fs.MustMkdirIfNotExist(path)
|
||||
fs.MustRemoveTemporaryDirs(path)
|
||||
|
@ -1782,10 +1775,7 @@ func openParts(path string, partNames []string) ([]*partWrapper, error) {
|
|||
|
||||
// Remove dirs missing in partNames. These dirs may be left after unclean shutdown
|
||||
// or after the update from versions prior to v1.90.0.
|
||||
des, err := os.ReadDir(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot read partition dir: %w", err)
|
||||
}
|
||||
des := fs.MustReadDir(path)
|
||||
m := make(map[string]struct{}, len(partNames))
|
||||
for _, partName := range partNames {
|
||||
m[partName] = struct{}{}
|
||||
|
@ -1815,16 +1805,7 @@ func openParts(path string, partNames []string) ([]*partWrapper, error) {
|
|||
pws = append(pws, pw)
|
||||
}
|
||||
|
||||
return pws, nil
|
||||
}
|
||||
|
||||
func mustCloseParts(pws []*partWrapper) {
|
||||
for _, pw := range pws {
|
||||
if pw.refCount != 1 {
|
||||
logger.Panicf("BUG: unexpected refCount when closing part %q: %d; want 1", &pw.p.ph, pw.refCount)
|
||||
}
|
||||
pw.p.MustClose()
|
||||
}
|
||||
return pws
|
||||
}
|
||||
|
||||
// MustCreateSnapshotAt creates pt snapshot at the given smallPath and bigPath dirs.
|
||||
|
@ -1941,13 +1922,10 @@ func mustReadPartNames(smallPartsPath, bigPartsPath string) ([]string, []string)
|
|||
}
|
||||
|
||||
func mustReadPartNamesFromDir(srcDir string) []string {
|
||||
des, err := os.ReadDir(srcDir)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return nil
|
||||
}
|
||||
logger.Panicf("FATAL: cannot read partition dir: %s", err)
|
||||
if !fs.IsPathExist(srcDir) {
|
||||
return nil
|
||||
}
|
||||
des := fs.MustReadDir(srcDir)
|
||||
var partNames []string
|
||||
for _, de := range des {
|
||||
if !fs.IsDirOrSymlink(de) {
|
||||
|
|
|
@ -220,10 +220,7 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
|
|||
idbSnapshotsPath := filepath.Join(idbPath, snapshotsDirname)
|
||||
fs.MustMkdirIfNotExist(idbSnapshotsPath)
|
||||
fs.MustRemoveTemporaryDirs(idbSnapshotsPath)
|
||||
idbCurr, idbPrev, err := s.openIndexDBTables(idbPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot open indexdb tables at %q: %w", idbPath, err)
|
||||
}
|
||||
idbCurr, idbPrev := s.mustOpenIndexDBTables(idbPath)
|
||||
idbCurr.SetExtDB(idbPrev)
|
||||
s.idbCurr.Store(idbCurr)
|
||||
|
||||
|
@ -707,10 +704,7 @@ func (s *Storage) mustRotateIndexDB() {
|
|||
newTableName := nextIndexDBTableName()
|
||||
idbNewPath := filepath.Join(s.path, indexdbDirname, newTableName)
|
||||
rotationTimestamp := fasttime.UnixTimestamp()
|
||||
idbNew, err := openIndexDB(idbNewPath, s, rotationTimestamp, &s.isReadOnly)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot create new indexDB at %q: %s", idbNewPath, err)
|
||||
}
|
||||
idbNew := mustOpenIndexDB(idbNewPath, s, rotationTimestamp, &s.isReadOnly)
|
||||
|
||||
// Drop extDB
|
||||
idbCurr := s.idb()
|
||||
|
@ -2320,16 +2314,13 @@ func (s *Storage) putTSIDToCache(tsid *generationTSID, metricName []byte) {
|
|||
s.tsidCache.Set(metricName, buf)
|
||||
}
|
||||
|
||||
func (s *Storage) openIndexDBTables(path string) (curr, prev *indexDB, err error) {
|
||||
func (s *Storage) mustOpenIndexDBTables(path string) (curr, prev *indexDB) {
|
||||
fs.MustMkdirIfNotExist(path)
|
||||
fs.MustRemoveTemporaryDirs(path)
|
||||
|
||||
// Search for the two most recent tables - the last one is active,
|
||||
// the previous one contains backup data.
|
||||
des, err := os.ReadDir(path)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("cannot read directory: %w", err)
|
||||
}
|
||||
des := fs.MustReadDir(path)
|
||||
var tableNames []string
|
||||
for _, de := range des {
|
||||
if !fs.IsDirOrSymlink(de) {
|
||||
|
@ -2372,18 +2363,11 @@ func (s *Storage) openIndexDBTables(path string) (curr, prev *indexDB, err error
|
|||
// Open the last two tables.
|
||||
currPath := filepath.Join(path, tableNames[len(tableNames)-1])
|
||||
|
||||
curr, err = openIndexDB(currPath, s, 0, &s.isReadOnly)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("cannot open curr indexdb table at %q: %w", currPath, err)
|
||||
}
|
||||
curr = mustOpenIndexDB(currPath, s, 0, &s.isReadOnly)
|
||||
prevPath := filepath.Join(path, tableNames[len(tableNames)-2])
|
||||
prev, err = openIndexDB(prevPath, s, 0, &s.isReadOnly)
|
||||
if err != nil {
|
||||
curr.MustClose()
|
||||
return nil, nil, fmt.Errorf("cannot open prev indexdb table at %q: %w", prevPath, err)
|
||||
}
|
||||
prev = mustOpenIndexDB(prevPath, s, 0, &s.isReadOnly)
|
||||
|
||||
return curr, prev, nil
|
||||
return curr, prev
|
||||
}
|
||||
|
||||
var indexDBTableNameRegexp = regexp.MustCompile("^[0-9A-F]{16}$")
|
||||
|
|
|
@ -486,12 +486,8 @@ func openPartitions(smallPartitionsPath, bigPartitionsPath string, s *Storage) (
|
|||
// Certain partition directories in either `big` or `small` dir may be missing
|
||||
// after restoring from backup. So populate partition names from both dirs.
|
||||
ptNames := make(map[string]bool)
|
||||
if err := populatePartitionNames(smallPartitionsPath, ptNames); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := populatePartitionNames(bigPartitionsPath, ptNames); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
mustPopulatePartitionNames(smallPartitionsPath, ptNames)
|
||||
mustPopulatePartitionNames(bigPartitionsPath, ptNames)
|
||||
var pts []*partition
|
||||
for ptName := range ptNames {
|
||||
smallPartsPath := filepath.Join(smallPartitionsPath, ptName)
|
||||
|
@ -506,11 +502,8 @@ func openPartitions(smallPartitionsPath, bigPartitionsPath string, s *Storage) (
|
|||
return pts, nil
|
||||
}
|
||||
|
||||
func populatePartitionNames(partitionsPath string, ptNames map[string]bool) error {
|
||||
des, err := os.ReadDir(partitionsPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read directory with partitions: %w", err)
|
||||
}
|
||||
func mustPopulatePartitionNames(partitionsPath string, ptNames map[string]bool) {
|
||||
des := fs.MustReadDir(partitionsPath)
|
||||
for _, de := range des {
|
||||
if !fs.IsDirOrSymlink(de) {
|
||||
// Skip non-directories
|
||||
|
@ -523,7 +516,6 @@ func populatePartitionNames(partitionsPath string, ptNames map[string]bool) erro
|
|||
}
|
||||
ptNames[ptName] = true
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func mustClosePartitions(pts []*partition) {
|
||||
|
|
Loading…
Reference in a new issue