diff --git a/lib/fs/dir_remover.go b/lib/fs/dir_remover.go index 15a9daab8..4f9de27b5 100644 --- a/lib/fs/dir_remover.go +++ b/lib/fs/dir_remover.go @@ -11,12 +11,13 @@ import ( "github.com/VictoriaMetrics/metrics" ) -func mustRemoveAll(path string) bool { +func mustRemoveAll(path string, done func()) bool { err := os.RemoveAll(path) if err == nil { // Make sure the parent directory doesn't contain references // to the current directory. mustSyncParentDirIfExists(path) + done() return true } if !isTemporaryNFSError(err) { @@ -26,8 +27,12 @@ func mustRemoveAll(path string) bool { // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 . // Schedule for later directory removal. nfsDirRemoveFailedAttempts.Inc() + w := &removeDirWork{ + path: path, + done: done, + } select { - case removeDirCh <- path: + case removeDirCh <- w: default: logger.Panicf("FATAL: cannot schedule %s for removal, since the removal queue is full (%d entries)", path, cap(removeDirCh)) } @@ -36,16 +41,21 @@ func mustRemoveAll(path string) bool { var nfsDirRemoveFailedAttempts = metrics.NewCounter(`vm_nfs_dir_remove_failed_attempts_total`) -var removeDirCh = make(chan string, 1024) +type removeDirWork struct { + path string + done func() +} + +var removeDirCh = make(chan *removeDirWork, 1024) func dirRemover() { const minSleepTime = 100 * time.Millisecond const maxSleepTime = time.Second sleepTime := minSleepTime for { - var path string + var w *removeDirWork select { - case path = <-removeDirCh: + case w = <-removeDirCh: default: if atomic.LoadUint64(&stopDirRemover) != 0 { return @@ -53,7 +63,7 @@ func dirRemover() { time.Sleep(minSleepTime) continue } - if mustRemoveAll(path) { + if mustRemoveAll(w.path, w.done) { sleepTime = minSleepTime continue } @@ -67,7 +77,7 @@ func dirRemover() { if sleepTime < maxSleepTime { sleepTime *= 2 } else { - logger.Errorf("failed to remove directory %q due to NFS lock; retrying later", path) + logger.Errorf("failed to remove directory %q due to NFS 
lock; retrying later", w.path) } } } diff --git a/lib/fs/fs.go b/lib/fs/fs.go index 4b0cd3619..69cf96c97 100644 --- a/lib/fs/fs.go +++ b/lib/fs/fs.go @@ -174,7 +174,7 @@ func mkdirSync(path string) error { return nil } -// RemoveDirContents removes all the contents of the given dir it it exists. +// RemoveDirContents removes all the contents of the given dir if it exists. // // It doesn't remove the dir itself, so the dir may be mounted // to a separate partition. @@ -246,7 +246,17 @@ func mustSyncParentDirIfExists(path string) { // // It properly handles NFS issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 . func MustRemoveAll(path string) { - _ = mustRemoveAll(path) + _ = mustRemoveAll(path, func() {}) +} + +// MustRemoveAllWithDoneCallback removes path with all the contents. +// +// done is called after the path is successfully removed. +// +// done may be called after the function returns if path resides on NFS and its removal has to be retried in the background. +// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61. +func MustRemoveAllWithDoneCallback(path string, done func()) { + _ = mustRemoveAll(path, done) } // HardLinkFiles makes hard links for all the files from srcDir in dstDir. diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 8eddea043..10ffdd22d 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -1121,12 +1121,14 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix, txnPath string) error { } // Remove old paths. It is OK if certain paths don't exist. + var removeWG sync.WaitGroup for _, path := range rmPaths { path, err := validatePath(pathPrefix, path) if err != nil { return fmt.Errorf("invalid path to remove: %s", err) } - fs.MustRemoveAll(path) + removeWG.Add(1) + fs.MustRemoveAllWithDoneCallback(path, removeWG.Done) } // Move the new part to new directory. @@ -1154,10 +1156,14 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix, txnPath string) error { // Flush pathPrefix directory metadata to the underying storage.
fs.MustSyncPath(pathPrefix) - // Remove the transaction file. - if err := os.Remove(txnPath); err != nil { - return fmt.Errorf("cannot remove transaction file %q: %s", txnPath, err) - } + go func() { + // Remove the transaction file only after all the source paths are deleted. + // This is required for NFS mounts. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 . + removeWG.Wait() + if err := os.Remove(txnPath); err != nil { + logger.Errorf("cannot remove transaction file %q: %s", txnPath, err) + } + }() return nil } diff --git a/lib/storage/partition.go b/lib/storage/partition.go index d03ecc59b..c6b28c662 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -1452,12 +1452,14 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath str } // Remove old paths. It is OK if certain paths don't exist. + var removeWG sync.WaitGroup for _, path := range rmPaths { path, err := validatePath(pathPrefix1, pathPrefix2, path) if err != nil { return fmt.Errorf("invalid path to remove: %s", err) } - fs.MustRemoveAll(path) + removeWG.Add(1) + fs.MustRemoveAllWithDoneCallback(path, removeWG.Done) } // Move the new part to new directory. @@ -1492,10 +1494,14 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath str fs.MustSyncPath(pathPrefix1) fs.MustSyncPath(pathPrefix2) - // Remove the transaction file. - if err := os.Remove(txnPath); err != nil { - return fmt.Errorf("cannot remove transaction file: %s", err) - } + go func() { + // Remove the transaction file only after all the source paths are deleted. + // This is required for NFS mounts. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 . + removeWG.Wait() + if err := os.Remove(txnPath); err != nil { + logger.Errorf("cannot remove transaction file %q: %s", txnPath, err) + } + }() return nil }