diff --git a/lib/fs/fs.go b/lib/fs/fs.go index fb05fa876..9e0cefb13 100644 --- a/lib/fs/fs.go +++ b/lib/fs/fs.go @@ -186,6 +186,8 @@ func mustSyncParentDirIfExists(path string) { // MustRemoveAll removes path with all the contents. // +// It properly fsyncs the parent directory after path removal. +// // It properly handles NFS issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 . func MustRemoveAll(path string) { _ = mustRemoveAll(path, func() {}) @@ -193,6 +195,8 @@ func MustRemoveAll(path string) { // MustRemoveAllWithDoneCallback removes path with all the contents. // +// It properly fsyncs the parent directory after path removal. +// // done is called after the path is successfully removed. // // done may be called after the function returns for NFS path. diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 7ed6ca89f..472c532c8 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -240,6 +240,9 @@ func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath str // The pt must be detached from table before calling pt.Drop. func (pt *partition) Drop() { logger.Infof("dropping partition %q at smallPartsPath=%q, bigPartsPath=%q", pt.name, pt.smallPartsPath, pt.bigPartsPath) + // Wait until all the pending transaction deletions are finished before removing partition directories. + pendingTxnDeletionsWG.Wait() + fs.MustRemoveAll(pt.smallPartsPath) fs.MustRemoveAll(pt.bigPartsPath) logger.Infof("partition %q has been dropped", pt.name) @@ -643,6 +646,9 @@ func (pt *partition) PutParts(pws []*partWrapper) { func (pt *partition) MustClose() { close(pt.stopCh) + // Wait until all the pending transaction deletions are finished. + pendingTxnDeletionsWG.Wait() + logger.Infof("waiting for stale parts remover to stop on %q...", pt.smallPartsPath) startTime := time.Now() pt.stalePartsRemoverWG.Wait() @@ -1352,13 +1358,14 @@ func (pt *partition) removeStaleParts() { pt.snapshotLock.RLock() var removeWG sync.WaitGroup for pw := range m { - removeWG.Add(1) logger.Infof("removing part %q, since its data is out of the configured retention (%d secs)", pw.p.path, retentionDeadline/1000) + removeWG.Add(1) fs.MustRemoveAllWithDoneCallback(pw.p.path, removeWG.Done) } removeWG.Wait() - fs.MustSyncPath(pt.smallPartsPath) - fs.MustSyncPath(pt.bigPartsPath) + // There is no need in calling fs.MustSyncPath() on pt.smallPartsPath and pt.bigPartsPath, + // since they should be automatically called inside fs.MustRemoveAllWithDoneCallback. + pt.snapshotLock.RUnlock() // Remove partition references from removed parts. @@ -1738,9 +1745,15 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath str } } else { // Just remove srcPath. - fs.MustRemoveAll(srcPath) + removeWG.Add(1) + fs.MustRemoveAllWithDoneCallback(srcPath, removeWG.Done) } + // Flush pathPrefix* directory metadata to the underying storage, + // so the moved files become visible there. + fs.MustSyncPath(pathPrefix1) + fs.MustSyncPath(pathPrefix2) + pendingTxnDeletionsWG.Add(1) go func() { defer pendingTxnDeletionsWG.Done() @@ -1748,9 +1761,9 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath str // This is required for NFS mounts. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 . removeWG.Wait() - // Flush pathPrefix* directory metadata to the underying storage. - fs.MustSyncPath(pathPrefix1) - fs.MustSyncPath(pathPrefix2) + // There is no need in calling fs.MustSyncPath for pathPrefix* after parts' removal, + // since it is already called by fs.MustRemoveAllWithDoneCallback. + if err := os.Remove(txnPath); err != nil { logger.Errorf("cannot remove transaction file %q: %s", txnPath, err) }