From 5b488a339d7c81cf33473235a8ab85420c5a10c7 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin <valyala@victoriametrics.com>
Date: Tue, 13 Sep 2022 15:56:05 +0300
Subject: [PATCH] lib/mergeset: atomically remove part dirs

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3038
---
 lib/fs/dir_remover.go               |  9 ++++++---
 lib/fs/fs.go                        | 21 ---------------------
 lib/mergeset/block_stream_writer.go |  8 ++++----
 lib/mergeset/table.go               | 14 +++++---------
 4 files changed, 15 insertions(+), 37 deletions(-)

diff --git a/lib/fs/dir_remover.go b/lib/fs/dir_remover.go
index ba3e8d52ef..79b3e429d9 100644
--- a/lib/fs/dir_remover.go
+++ b/lib/fs/dir_remover.go
@@ -10,9 +10,13 @@ import (
 	"github.com/VictoriaMetrics/metrics"
 )
 
-func mustRemoveAll(path string, done func()) {
+// MustRemoveAll removes path with all the contents.
+//
+// It properly fsyncs the parent directory after path removal.
+//
+// It properly handles NFS issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
+func MustRemoveAll(path string) {
 	if tryRemoveAll(path) {
-		done()
 		return
 	}
 	select {
@@ -29,7 +33,6 @@ func mustRemoveAll(path string, done func()) {
 		for {
 			time.Sleep(time.Second)
 			if tryRemoveAll(path) {
-				done()
 				return
 			}
 		}
diff --git a/lib/fs/fs.go b/lib/fs/fs.go
index 702cb0c9ec..10b9e0cc97 100644
--- a/lib/fs/fs.go
+++ b/lib/fs/fs.go
@@ -248,27 +248,6 @@ func MustRemoveTemporaryDirs(dir string) {
 	MustSyncPath(dir)
 }
 
-// MustRemoveAll removes path with all the contents.
-//
-// It properly fsyncs the parent directory after path removal.
-//
-// It properly handles NFS issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
-func MustRemoveAll(path string) {
-	mustRemoveAll(path, func() {})
-}
-
-// MustRemoveAllWithDoneCallback removes path with all the contents.
-//
-// It properly fsyncs the parent directory after path removal.
-//
-// done is called after the path is successfully removed.
-//
-// done may be called after the function returns for NFS path.
-// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61.
-func MustRemoveAllWithDoneCallback(path string, done func()) {
-	mustRemoveAll(path, done)
-}
-
 // HardLinkFiles makes hard links for all the files from srcDir in dstDir.
 func HardLinkFiles(srcDir, dstDir string) error {
 	if err := mkdirSync(dstDir); err != nil {
diff --git a/lib/mergeset/block_stream_writer.go b/lib/mergeset/block_stream_writer.go
index 46bac85a65..b25e473257 100644
--- a/lib/mergeset/block_stream_writer.go
+++ b/lib/mergeset/block_stream_writer.go
@@ -94,7 +94,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
 	metaindexPath := path + "/metaindex.bin"
 	metaindexFile, err := filestream.Create(metaindexPath, false)
 	if err != nil {
-		fs.MustRemoveAll(path)
+		fs.MustRemoveDirAtomic(path)
 		return fmt.Errorf("cannot create metaindex file: %w", err)
 	}
 
@@ -102,7 +102,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
 	indexFile, err := filestream.Create(indexPath, nocache)
 	if err != nil {
 		metaindexFile.MustClose()
-		fs.MustRemoveAll(path)
+		fs.MustRemoveDirAtomic(path)
 		return fmt.Errorf("cannot create index file: %w", err)
 	}
 
@@ -111,7 +111,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
 	if err != nil {
 		metaindexFile.MustClose()
 		indexFile.MustClose()
-		fs.MustRemoveAll(path)
+		fs.MustRemoveDirAtomic(path)
 		return fmt.Errorf("cannot create items file: %w", err)
 	}
 
@@ -121,7 +121,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
 		metaindexFile.MustClose()
 		indexFile.MustClose()
 		itemsFile.MustClose()
-		fs.MustRemoveAll(path)
+		fs.MustRemoveDirAtomic(path)
 		return fmt.Errorf("cannot create lens file: %w", err)
 	}
 
diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go
index e614b7ad26..3b3ceb8f92 100644
--- a/lib/mergeset/table.go
+++ b/lib/mergeset/table.go
@@ -1059,6 +1059,7 @@ func openParts(path string) ([]*partWrapper, error) {
 	if err := fs.MkdirAllIfNotExist(path); err != nil {
 		return nil, err
 	}
+	fs.MustRemoveTemporaryDirs(path)
 	d, err := os.Open(path)
 	if err != nil {
 		return nil, fmt.Errorf("cannot open difrectory: %w", err)
@@ -1073,13 +1074,13 @@ func openParts(path string) ([]*partWrapper, error) {
 	}
 
 	txnDir := path + "/txn"
-	fs.MustRemoveAll(txnDir)
+	fs.MustRemoveDirAtomic(txnDir)
 	if err := fs.MkdirAllFailIfExist(txnDir); err != nil {
 		return nil, fmt.Errorf("cannot create %q: %w", txnDir, err)
 	}
 
 	tmpDir := path + "/tmp"
-	fs.MustRemoveAll(tmpDir)
+	fs.MustRemoveDirAtomic(tmpDir)
 	if err := fs.MkdirAllFailIfExist(tmpDir); err != nil {
 		return nil, fmt.Errorf("cannot create %q: %w", tmpDir, err)
 	}
@@ -1106,7 +1107,7 @@ func openParts(path string) ([]*partWrapper, error) {
 		if fs.IsEmptyDir(partPath) {
 			// Remove empty directory, which can be left after unclean shutdown on NFS.
 			// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1142
-			fs.MustRemoveAll(partPath)
+			fs.MustRemoveDirAtomic(partPath)
 			continue
 		}
 		p, err := openFilePart(partPath)
@@ -1277,14 +1278,12 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix, txnPath string) error {
 	}
 
 	// Remove old paths. It is OK if certain paths don't exist.
-	var removeWG sync.WaitGroup
 	for _, path := range rmPaths {
 		path, err := validatePath(pathPrefix, path)
 		if err != nil {
 			return fmt.Errorf("invalid path to remove: %w", err)
 		}
-		removeWG.Add(1)
-		fs.MustRemoveAllWithDoneCallback(path, removeWG.Done)
+		fs.MustRemoveDirAtomic(path)
 	}
 
 	// Move the new part to new directory.
@@ -1316,9 +1315,6 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix, txnPath string) error {
 	pendingTxnDeletionsWG.Add(1)
 	go func() {
 		defer pendingTxnDeletionsWG.Done()
-		// Remove the transaction file only after all the source paths are deleted.
-		// This is required for NFS mounts. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
-		removeWG.Wait()
 		if err := os.Remove(txnPath); err != nil {
 			logger.Errorf("cannot remove transaction file %q: %s", txnPath, err)
 		}