mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/{fs,mergeset,storage}: substitute os.Open()+os.File.Readdir() with os.ReadDir()
This simplifies code a bit
This commit is contained in:
parent
8fdd613f25
commit
a26c6628fd
5 changed files with 51 additions and 112 deletions
41
lib/fs/fs.go
41
lib/fs/fs.go
|
@ -247,21 +247,16 @@ var atomicDirRemoveCounter = uint64(time.Now().UnixNano())
|
|||
//
|
||||
// Such directories may be left on unclean shutdown during MustRemoveDirAtomic call.
|
||||
func MustRemoveTemporaryDirs(dir string) {
|
||||
d, err := os.Open(dir)
|
||||
des, err := os.ReadDir(dir)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot open dir: %s", err)
|
||||
logger.Panicf("FATAL: cannot read dir: %s", err)
|
||||
}
|
||||
defer MustClose(d)
|
||||
fis, err := d.Readdir(-1)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot read dir %q: %s", dir, err)
|
||||
}
|
||||
for _, fi := range fis {
|
||||
if !IsDirOrSymlink(fi) {
|
||||
for _, de := range des {
|
||||
if !IsDirOrSymlink(de) {
|
||||
// Skip non-directories
|
||||
continue
|
||||
}
|
||||
dirName := fi.Name()
|
||||
dirName := de.Name()
|
||||
if IsScheduledForRemoval(dirName) {
|
||||
fullPath := dir + "/" + dirName
|
||||
MustRemoveAll(fullPath)
|
||||
|
@ -276,26 +271,16 @@ func HardLinkFiles(srcDir, dstDir string) error {
|
|||
return fmt.Errorf("cannot create dstDir=%q: %w", dstDir, err)
|
||||
}
|
||||
|
||||
d, err := os.Open(srcDir)
|
||||
des, err := os.ReadDir(srcDir)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot open srcDir: %w", err)
|
||||
return fmt.Errorf("cannot read files in scrDir: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
if err := d.Close(); err != nil {
|
||||
logger.Panicf("FATAL: cannot close %q: %s", srcDir, err)
|
||||
}
|
||||
}()
|
||||
|
||||
fis, err := d.Readdir(-1)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read files in scrDir=%q: %w", srcDir, err)
|
||||
}
|
||||
for _, fi := range fis {
|
||||
if IsDirOrSymlink(fi) {
|
||||
for _, de := range des {
|
||||
if IsDirOrSymlink(de) {
|
||||
// Skip directories.
|
||||
continue
|
||||
}
|
||||
fn := fi.Name()
|
||||
fn := de.Name()
|
||||
srcPath := srcDir + "/" + fn
|
||||
dstPath := dstDir + "/" + fn
|
||||
if err := os.Link(srcPath, dstPath); err != nil {
|
||||
|
@ -307,9 +292,9 @@ func HardLinkFiles(srcDir, dstDir string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// IsDirOrSymlink returns true if fi is directory or symlink.
|
||||
func IsDirOrSymlink(fi os.FileInfo) bool {
|
||||
return fi.IsDir() || (fi.Mode()&os.ModeSymlink == os.ModeSymlink)
|
||||
// IsDirOrSymlink returns true if de is directory or symlink.
|
||||
func IsDirOrSymlink(de os.DirEntry) bool {
|
||||
return de.IsDir() || (de.Type()&os.ModeSymlink == os.ModeSymlink)
|
||||
}
|
||||
|
||||
// SymlinkRelative creates relative symlink for srcPath in dstPath.
|
||||
|
|
|
@ -1426,11 +1426,6 @@ func openParts(path string) ([]*partWrapper, error) {
|
|||
return nil, err
|
||||
}
|
||||
fs.MustRemoveTemporaryDirs(path)
|
||||
d, err := os.Open(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot open difrectory: %w", err)
|
||||
}
|
||||
defer fs.MustClose(d)
|
||||
|
||||
// Run remaining transactions and cleanup /txn and /tmp directories.
|
||||
// Snapshots cannot be created yet, so use fakeSnapshotLock.
|
||||
|
@ -1454,17 +1449,17 @@ func openParts(path string) ([]*partWrapper, error) {
|
|||
fs.MustSyncPath(path)
|
||||
|
||||
// Open parts.
|
||||
fis, err := d.Readdir(-1)
|
||||
des, err := os.ReadDir(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot read directory: %w", err)
|
||||
}
|
||||
var pws []*partWrapper
|
||||
for _, fi := range fis {
|
||||
if !fs.IsDirOrSymlink(fi) {
|
||||
for _, de := range des {
|
||||
if !fs.IsDirOrSymlink(de) {
|
||||
// Skip non-directories.
|
||||
continue
|
||||
}
|
||||
fn := fi.Name()
|
||||
fn := de.Name()
|
||||
if isSpecialDir(fn) {
|
||||
// Skip special dirs.
|
||||
continue
|
||||
|
@ -1538,24 +1533,18 @@ func (tb *Table) CreateSnapshotAt(dstDir string, deadline uint64) error {
|
|||
return fmt.Errorf("cannot create snapshot dir %q: %w", dstDir, err)
|
||||
}
|
||||
|
||||
d, err := os.Open(srcDir)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot open difrectory: %w", err)
|
||||
}
|
||||
defer fs.MustClose(d)
|
||||
|
||||
fis, err := d.Readdir(-1)
|
||||
des, err := os.ReadDir(srcDir)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read directory: %w", err)
|
||||
}
|
||||
|
||||
for _, fi := range fis {
|
||||
for _, de := range des {
|
||||
if deadline > 0 && fasttime.UnixTimestamp() > deadline {
|
||||
return fmt.Errorf("cannot create snapshot for %q: timeout exceeded", tb.path)
|
||||
}
|
||||
|
||||
fn := fi.Name()
|
||||
if !fs.IsDirOrSymlink(fi) {
|
||||
fn := de.Name()
|
||||
if !fs.IsDirOrSymlink(de) {
|
||||
// Skip non-directories.
|
||||
continue
|
||||
}
|
||||
|
@ -1586,27 +1575,21 @@ func runTransactions(txnLock *sync.RWMutex, path string) error {
|
|||
defer pendingTxnDeletionsWG.Wait()
|
||||
|
||||
txnDir := path + "/txn"
|
||||
d, err := os.Open(txnDir)
|
||||
des, err := os.ReadDir(txnDir)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("cannot open transaction dir: %w", err)
|
||||
}
|
||||
defer fs.MustClose(d)
|
||||
|
||||
fis, err := d.Readdir(-1)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read directory %q: %w", d.Name(), err)
|
||||
return fmt.Errorf("cannot read transaction dir: %w", err)
|
||||
}
|
||||
|
||||
// Sort transaction files by id, since transactions must be ordered.
|
||||
sort.Slice(fis, func(i, j int) bool {
|
||||
return fis[i].Name() < fis[j].Name()
|
||||
sort.Slice(des, func(i, j int) bool {
|
||||
return des[i].Name() < des[j].Name()
|
||||
})
|
||||
|
||||
for _, fi := range fis {
|
||||
fn := fi.Name()
|
||||
for _, de := range des {
|
||||
fn := de.Name()
|
||||
if fs.IsTemporaryFileName(fn) {
|
||||
// Skip temporary files, which could be left after unclean shutdown.
|
||||
continue
|
||||
|
|
|
@ -1889,11 +1889,6 @@ func openParts(pathPrefix1, pathPrefix2, path string) ([]*partWrapper, error) {
|
|||
return nil, err
|
||||
}
|
||||
fs.MustRemoveTemporaryDirs(path)
|
||||
d, err := os.Open(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot open partition directory: %w", err)
|
||||
}
|
||||
defer fs.MustClose(d)
|
||||
|
||||
// Run remaining transactions and cleanup /txn and /tmp directories.
|
||||
// Snapshots cannot be created yet, so use fakeSnapshotLock.
|
||||
|
@ -1911,17 +1906,17 @@ func openParts(pathPrefix1, pathPrefix2, path string) ([]*partWrapper, error) {
|
|||
}
|
||||
|
||||
// Open parts.
|
||||
fis, err := d.Readdir(-1)
|
||||
des, err := os.ReadDir(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot read directory %q: %w", d.Name(), err)
|
||||
return nil, fmt.Errorf("cannot read partition directory: %w", err)
|
||||
}
|
||||
var pws []*partWrapper
|
||||
for _, fi := range fis {
|
||||
if !fs.IsDirOrSymlink(fi) {
|
||||
for _, de := range des {
|
||||
if !fs.IsDirOrSymlink(de) {
|
||||
// Skip non-directories.
|
||||
continue
|
||||
}
|
||||
fn := fi.Name()
|
||||
fn := de.Name()
|
||||
if fn == "snapshots" {
|
||||
// "snapshots" dir is skipped for backwards compatibility. Now it is unused.
|
||||
continue
|
||||
|
@ -2000,19 +1995,13 @@ func (pt *partition) createSnapshot(srcDir, dstDir string) error {
|
|||
return fmt.Errorf("cannot create snapshot dir %q: %w", dstDir, err)
|
||||
}
|
||||
|
||||
d, err := os.Open(srcDir)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot open partition difrectory: %w", err)
|
||||
}
|
||||
defer fs.MustClose(d)
|
||||
|
||||
fis, err := d.Readdir(-1)
|
||||
des, err := os.ReadDir(srcDir)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read partition directory: %w", err)
|
||||
}
|
||||
for _, fi := range fis {
|
||||
fn := fi.Name()
|
||||
if !fs.IsDirOrSymlink(fi) {
|
||||
for _, de := range des {
|
||||
fn := de.Name()
|
||||
if !fs.IsDirOrSymlink(de) {
|
||||
if fn == "appliedRetention.txt" {
|
||||
// Copy the appliedRetention.txt file to dstDir.
|
||||
// This file can be created by VictoriaMetrics enterprise.
|
||||
|
@ -2052,27 +2041,21 @@ func runTransactions(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, path strin
|
|||
defer pendingTxnDeletionsWG.Wait()
|
||||
|
||||
txnDir := path + "/txn"
|
||||
d, err := os.Open(txnDir)
|
||||
des, err := os.ReadDir(txnDir)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("cannot open transaction directory: %w", err)
|
||||
}
|
||||
defer fs.MustClose(d)
|
||||
|
||||
fis, err := d.Readdir(-1)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read directory %q: %w", d.Name(), err)
|
||||
return fmt.Errorf("cannot read transaction directory: %w", err)
|
||||
}
|
||||
|
||||
// Sort transaction files by id.
|
||||
sort.Slice(fis, func(i, j int) bool {
|
||||
return fis[i].Name() < fis[j].Name()
|
||||
sort.Slice(des, func(i, j int) bool {
|
||||
return des[i].Name() < des[j].Name()
|
||||
})
|
||||
|
||||
for _, fi := range fis {
|
||||
fn := fi.Name()
|
||||
for _, de := range des {
|
||||
fn := de.Name()
|
||||
if fs.IsTemporaryFileName(fn) {
|
||||
// Skip temporary files, which could be left after unclean shutdown.
|
||||
continue
|
||||
|
|
|
@ -2339,25 +2339,19 @@ func (s *Storage) openIndexDBTables(path string) (curr, prev *indexDB, err error
|
|||
}
|
||||
fs.MustRemoveTemporaryDirs(path)
|
||||
|
||||
d, err := os.Open(path)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("cannot open directory: %w", err)
|
||||
}
|
||||
defer fs.MustClose(d)
|
||||
|
||||
// Search for the two most recent tables - the last one is active,
|
||||
// the previous one contains backup data.
|
||||
fis, err := d.Readdir(-1)
|
||||
des, err := os.ReadDir(path)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("cannot read directory: %w", err)
|
||||
}
|
||||
var tableNames []string
|
||||
for _, fi := range fis {
|
||||
if !fs.IsDirOrSymlink(fi) {
|
||||
for _, de := range des {
|
||||
if !fs.IsDirOrSymlink(de) {
|
||||
// Skip non-directories.
|
||||
continue
|
||||
}
|
||||
tableName := fi.Name()
|
||||
tableName := de.Name()
|
||||
if !indexDBTableNameRegexp.MatchString(tableName) {
|
||||
// Skip invalid directories.
|
||||
continue
|
||||
|
|
|
@ -535,22 +535,16 @@ func openPartitions(smallPartitionsPath, bigPartitionsPath string, s *Storage) (
|
|||
}
|
||||
|
||||
func populatePartitionNames(partitionsPath string, ptNames map[string]bool) error {
|
||||
d, err := os.Open(partitionsPath)
|
||||
des, err := os.ReadDir(partitionsPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot open directory with partitions: %w", err)
|
||||
return fmt.Errorf("cannot read directory with partitions: %w", err)
|
||||
}
|
||||
defer fs.MustClose(d)
|
||||
|
||||
fis, err := d.Readdir(-1)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read directory with partitions %q: %w", partitionsPath, err)
|
||||
}
|
||||
for _, fi := range fis {
|
||||
if !fs.IsDirOrSymlink(fi) {
|
||||
for _, de := range des {
|
||||
if !fs.IsDirOrSymlink(de) {
|
||||
// Skip non-directories
|
||||
continue
|
||||
}
|
||||
ptName := fi.Name()
|
||||
ptName := de.Name()
|
||||
if ptName == "snapshots" {
|
||||
// Skip directory with snapshots
|
||||
continue
|
||||
|
|
Loading…
Reference in a new issue