lib/storage: wait for pending transactions before closing and dropping the partition

This deflakes `make test-full-386` test
This commit is contained in:
Aliaksandr Valialkin 2020-12-25 11:45:47 +02:00
parent 932e53522d
commit 490c69c64e
2 changed files with 24 additions and 7 deletions

View file

@ -186,6 +186,8 @@ func mustSyncParentDirIfExists(path string) {
// MustRemoveAll removes path with all the contents.
//
// It properly fsyncs the parent directory after path removal.
//
// It properly handles NFS issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
func MustRemoveAll(path string) {
_ = mustRemoveAll(path, func() {})
@ -193,6 +195,8 @@ func MustRemoveAll(path string) {
// MustRemoveAllWithDoneCallback removes path with all the contents.
//
// It properly fsyncs the parent directory after path removal.
//
// done is called after the path is successfully removed.
//
// done may be called after the function returns for NFS path.

View file

@ -240,6 +240,9 @@ func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath str
// The pt must be detached from table before calling pt.Drop.
func (pt *partition) Drop() {
logger.Infof("dropping partition %q at smallPartsPath=%q, bigPartsPath=%q", pt.name, pt.smallPartsPath, pt.bigPartsPath)
// Wait until all the pending transaction deletions are finished before removing partition directories.
pendingTxnDeletionsWG.Wait()
fs.MustRemoveAll(pt.smallPartsPath)
fs.MustRemoveAll(pt.bigPartsPath)
logger.Infof("partition %q has been dropped", pt.name)
@ -643,6 +646,9 @@ func (pt *partition) PutParts(pws []*partWrapper) {
func (pt *partition) MustClose() {
close(pt.stopCh)
// Wait until all the pending transaction deletions are finished.
pendingTxnDeletionsWG.Wait()
logger.Infof("waiting for stale parts remover to stop on %q...", pt.smallPartsPath)
startTime := time.Now()
pt.stalePartsRemoverWG.Wait()
@ -1352,13 +1358,14 @@ func (pt *partition) removeStaleParts() {
pt.snapshotLock.RLock()
var removeWG sync.WaitGroup
for pw := range m {
removeWG.Add(1)
logger.Infof("removing part %q, since its data is out of the configured retention (%d secs)", pw.p.path, retentionDeadline/1000)
removeWG.Add(1)
fs.MustRemoveAllWithDoneCallback(pw.p.path, removeWG.Done)
}
removeWG.Wait()
fs.MustSyncPath(pt.smallPartsPath)
fs.MustSyncPath(pt.bigPartsPath)
// There is no need in calling fs.MustSyncPath() on pt.smallPartsPath and pt.bigPartsPath,
// since they should be automatically called inside fs.MustRemoveAllWithDoneCallback.
pt.snapshotLock.RUnlock()
// Remove partition references from removed parts.
@ -1738,9 +1745,15 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath str
}
} else {
// Just remove srcPath.
fs.MustRemoveAll(srcPath)
removeWG.Add(1)
fs.MustRemoveAllWithDoneCallback(srcPath, removeWG.Done)
}
// Flush pathPrefix* directory metadata to the underying storage,
// so the moved files become visible there.
fs.MustSyncPath(pathPrefix1)
fs.MustSyncPath(pathPrefix2)
pendingTxnDeletionsWG.Add(1)
go func() {
defer pendingTxnDeletionsWG.Done()
@ -1748,9 +1761,9 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath str
// This is required for NFS mounts. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
removeWG.Wait()
// Flush pathPrefix* directory metadata to the underying storage.
fs.MustSyncPath(pathPrefix1)
fs.MustSyncPath(pathPrefix2)
// There is no need in calling fs.MustSyncPath for pathPrefix* after parts' removal,
// since it is already called by fs.MustRemoveAllWithDoneCallback.
if err := os.Remove(txnPath); err != nil {
logger.Errorf("cannot remove transaction file %q: %s", txnPath, err)
}