diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 10ffdd22dd..edbefa7645 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -1062,6 +1062,12 @@ func (tb *Table) CreateSnapshotAt(dstDir string) error { } func runTransactions(txnLock *sync.RWMutex, path string) error { + // Wait until all the previous pending transaction deletions are finished. + pendingTxnDeletionsWG.Wait() + + // Make sure all the current transaction deletions are finished before exiting. + defer pendingTxnDeletionsWG.Wait() + txnDir := path + "/txn" d, err := os.Open(txnDir) if err != nil { @@ -1156,7 +1162,9 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix, txnPath string) error { // Flush pathPrefix directory metadata to the underying storage. fs.MustSyncPath(pathPrefix) + pendingTxnDeletionsWG.Add(1) go func() { + defer pendingTxnDeletionsWG.Done() // Remove the transaction file only after all the source paths are deleted. // This is required for NFS mounts. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 . removeWG.Wait() @@ -1168,6 +1176,8 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix, txnPath string) error { return nil } +var pendingTxnDeletionsWG syncwg.WaitGroup + func validatePath(pathPrefix, path string) (string, error) { var err error diff --git a/lib/storage/partition.go b/lib/storage/partition.go index c6b28c6625..39ab6af08e 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -19,6 +19,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg" "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" ) @@ -1393,6 +1394,12 @@ func (pt *partition) createSnapshot(srcDir, dstDir string) error { } func runTransactions(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, path string) error { + // Wait until all the previous pending transaction deletions are finished. + pendingTxnDeletionsWG.Wait() + + // Make sure all the current transaction deletions are finished before exiting. + defer pendingTxnDeletionsWG.Wait() + txnDir := path + "/txn" d, err := os.Open(txnDir) if err != nil { @@ -1494,7 +1501,9 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath str fs.MustSyncPath(pathPrefix1) fs.MustSyncPath(pathPrefix2) + pendingTxnDeletionsWG.Add(1) go func() { + defer pendingTxnDeletionsWG.Done() // Remove the transaction file only after all the source paths are deleted. // This is required for NFS mounts. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 . removeWG.Wait() @@ -1506,6 +1515,8 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath str return nil } +var pendingTxnDeletionsWG syncwg.WaitGroup + func validatePath(pathPrefix1, pathPrefix2, path string) (string, error) { var err error