diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 5ee2cc1e6..d7af8a17a 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -1394,17 +1394,14 @@ func (pt *partition) removeStaleParts() { } // Physically remove stale parts under snapshotLock in order to provide - // consistent snapshots with partition.CreateSnapshot(). + // consistent snapshots with table.CreateSnapshot(). pt.snapshotLock.RLock() - var removeWG sync.WaitGroup for pw := range m { logger.Infof("removing part %q, since its data is out of the configured retention (%d secs)", pw.p.path, pt.retentionMsecs/1000) - removeWG.Add(1) - fs.MustRemoveAllWithDoneCallback(pw.p.path, removeWG.Done) + fs.MustRemoveDirAtomic(pw.p.path) } - removeWG.Wait() // There is no need in calling fs.MustSyncPath() on pt.smallPartsPath and pt.bigPartsPath, - // since they should be automatically called inside fs.MustRemoveAllWithDoneCallback. + // since they should be automatically called inside fs.MustRemoveDirAtomic(). pt.snapshotLock.RUnlock() @@ -1554,6 +1551,7 @@ func openParts(pathPrefix1, pathPrefix2, path string) ([]*partWrapper, error) { if err := fs.MkdirAllIfNotExist(path); err != nil { return nil, err } + fs.MustRemoveTemporaryDirs(path) d, err := os.Open(path) if err != nil { return nil, fmt.Errorf("cannot open directory %q: %w", path, err) @@ -1568,9 +1566,9 @@ func openParts(pathPrefix1, pathPrefix2, path string) ([]*partWrapper, error) { } txnDir := path + "/txn" - fs.MustRemoveAll(txnDir) + fs.MustRemoveDirAtomic(txnDir) tmpDir := path + "/tmp" - fs.MustRemoveAll(tmpDir) + fs.MustRemoveDirAtomic(tmpDir) if err := createPartitionDirs(path); err != nil { return nil, fmt.Errorf("cannot create directories for partition %q: %w", path, err) } @@ -1596,7 +1594,7 @@ func openParts(pathPrefix1, pathPrefix2, path string) ([]*partWrapper, error) { if fs.IsEmptyDir(partPath) { // Remove empty directory, which can be left after unclean shutdown on NFS. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1142 - fs.MustRemoveAll(partPath) + fs.MustRemoveDirAtomic(partPath) continue } startTime := time.Now() @@ -1761,14 +1759,12 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath str } // Remove old paths. It is OK if certain paths don't exist. - var removeWG sync.WaitGroup for _, path := range rmPaths { path, err := validatePath(pathPrefix1, pathPrefix2, path) if err != nil { return fmt.Errorf("invalid path to remove: %w", err) } - removeWG.Add(1) - fs.MustRemoveAllWithDoneCallback(path, removeWG.Done) + fs.MustRemoveDirAtomic(path) } // Move the new part to new directory. @@ -1797,8 +1793,7 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath str } } else { // Just remove srcPath. - removeWG.Add(1) - fs.MustRemoveAllWithDoneCallback(srcPath, removeWG.Done) + fs.MustRemoveDirAtomic(srcPath) } // Flush pathPrefix* directory metadata to the underying storage, @@ -1809,12 +1804,9 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath str pendingTxnDeletionsWG.Add(1) go func() { defer pendingTxnDeletionsWG.Done() - // Remove the transaction file only after all the source paths are deleted. - // This is required for NFS mounts. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 . - removeWG.Wait() // There is no need in calling fs.MustSyncPath for pathPrefix* after parts' removal, - // since it is already called by fs.MustRemoveAllWithDoneCallback. + // since it is already called by fs.MustRemoveDirAtomic. if err := os.Remove(txnPath); err != nil { logger.Errorf("cannot remove transaction file %q: %s", txnPath, err)