lib/{mergeset,storage}: remove transaction files only after the mentioned dirs are really removed

This should fix the issue on NFS where incompletely removed dirs may be left
after an unclean shutdown (OOM, kill -9, hard reset, etc.), while the corresponding transaction
files have already been removed.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/162
This commit is contained in:
Aliaksandr Valialkin 2019-12-02 21:34:35 +02:00
parent 4e22b521c2
commit b9616c017f
4 changed files with 51 additions and 19 deletions

View file

@ -11,12 +11,13 @@ import (
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
func mustRemoveAll(path string) bool { func mustRemoveAll(path string, done func()) bool {
err := os.RemoveAll(path) err := os.RemoveAll(path)
if err == nil { if err == nil {
// Make sure the parent directory doesn't contain references // Make sure the parent directory doesn't contain references
// to the current directory. // to the current directory.
mustSyncParentDirIfExists(path) mustSyncParentDirIfExists(path)
done()
return true return true
} }
if !isTemporaryNFSError(err) { if !isTemporaryNFSError(err) {
@ -26,8 +27,12 @@ func mustRemoveAll(path string) bool {
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 . // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
// Schedule for later directory removal. // Schedule for later directory removal.
nfsDirRemoveFailedAttempts.Inc() nfsDirRemoveFailedAttempts.Inc()
w := &removeDirWork{
path: path,
done: done,
}
select { select {
case removeDirCh <- path: case removeDirCh <- w:
default: default:
logger.Panicf("FATAL: cannot schedule %s for removal, since the removal queue is full (%d entries)", path, cap(removeDirCh)) logger.Panicf("FATAL: cannot schedule %s for removal, since the removal queue is full (%d entries)", path, cap(removeDirCh))
} }
@ -36,16 +41,21 @@ func mustRemoveAll(path string) bool {
var nfsDirRemoveFailedAttempts = metrics.NewCounter(`vm_nfs_dir_remove_failed_attempts_total`) var nfsDirRemoveFailedAttempts = metrics.NewCounter(`vm_nfs_dir_remove_failed_attempts_total`)
var removeDirCh = make(chan string, 1024) type removeDirWork struct {
path string
done func()
}
var removeDirCh = make(chan *removeDirWork, 1024)
func dirRemover() { func dirRemover() {
const minSleepTime = 100 * time.Millisecond const minSleepTime = 100 * time.Millisecond
const maxSleepTime = time.Second const maxSleepTime = time.Second
sleepTime := minSleepTime sleepTime := minSleepTime
for { for {
var path string var w *removeDirWork
select { select {
case path = <-removeDirCh: case w = <-removeDirCh:
default: default:
if atomic.LoadUint64(&stopDirRemover) != 0 { if atomic.LoadUint64(&stopDirRemover) != 0 {
return return
@ -53,7 +63,7 @@ func dirRemover() {
time.Sleep(minSleepTime) time.Sleep(minSleepTime)
continue continue
} }
if mustRemoveAll(path) { if mustRemoveAll(w.path, w.done) {
sleepTime = minSleepTime sleepTime = minSleepTime
continue continue
} }
@ -67,7 +77,7 @@ func dirRemover() {
if sleepTime < maxSleepTime { if sleepTime < maxSleepTime {
sleepTime *= 2 sleepTime *= 2
} else { } else {
logger.Errorf("failed to remove directory %q due to NFS lock; retrying later", path) logger.Errorf("failed to remove directory %q due to NFS lock; retrying later", w.path)
} }
} }
} }

View file

@ -174,7 +174,7 @@ func mkdirSync(path string) error {
return nil return nil
} }
// RemoveDirContents removes all the contents of the given dir it it exists. // RemoveDirContents removes all the contents of the given dir if it exists.
// //
// It doesn't remove the dir itself, so the dir may be mounted // It doesn't remove the dir itself, so the dir may be mounted
// to a separate partition. // to a separate partition.
@ -246,7 +246,17 @@ func mustSyncParentDirIfExists(path string) {
// //
// It properly handles NFS issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 . // It properly handles NFS issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
func MustRemoveAll(path string) { func MustRemoveAll(path string) {
_ = mustRemoveAll(path) _ = mustRemoveAll(path, func() {})
}
// MustRemoveAllWithDoneCallback removes path with all the contents.
//
// done is called after the path is successfully removed.
//
// done may be called after the function returns for NFS path.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61.
func MustRemoveAllWithDoneCallback(path string, done func()) {
_ = mustRemoveAll(path, done)
} }
// HardLinkFiles makes hard links for all the files from srcDir in dstDir. // HardLinkFiles makes hard links for all the files from srcDir in dstDir.

View file

@ -1121,12 +1121,14 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix, txnPath string) error {
} }
// Remove old paths. It is OK if certain paths don't exist. // Remove old paths. It is OK if certain paths don't exist.
var removeWG sync.WaitGroup
for _, path := range rmPaths { for _, path := range rmPaths {
path, err := validatePath(pathPrefix, path) path, err := validatePath(pathPrefix, path)
if err != nil { if err != nil {
return fmt.Errorf("invalid path to remove: %s", err) return fmt.Errorf("invalid path to remove: %s", err)
} }
fs.MustRemoveAll(path) removeWG.Add(1)
fs.MustRemoveAllWithDoneCallback(path, removeWG.Done)
} }
// Move the new part to new directory. // Move the new part to new directory.
@ -1154,10 +1156,14 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix, txnPath string) error {
// Flush pathPrefix directory metadata to the underlying storage. // Flush pathPrefix directory metadata to the underlying storage.
fs.MustSyncPath(pathPrefix) fs.MustSyncPath(pathPrefix)
// Remove the transaction file. go func() {
// Remove the transaction file only after all the source paths are deleted.
// This is required for NFS mounts. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
removeWG.Wait()
if err := os.Remove(txnPath); err != nil { if err := os.Remove(txnPath); err != nil {
return fmt.Errorf("cannot remove transaction file %q: %s", txnPath, err) logger.Errorf("cannot remove transaction file %q: %s", txnPath, err)
} }
}()
return nil return nil
} }

View file

@ -1452,12 +1452,14 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath str
} }
// Remove old paths. It is OK if certain paths don't exist. // Remove old paths. It is OK if certain paths don't exist.
var removeWG sync.WaitGroup
for _, path := range rmPaths { for _, path := range rmPaths {
path, err := validatePath(pathPrefix1, pathPrefix2, path) path, err := validatePath(pathPrefix1, pathPrefix2, path)
if err != nil { if err != nil {
return fmt.Errorf("invalid path to remove: %s", err) return fmt.Errorf("invalid path to remove: %s", err)
} }
fs.MustRemoveAll(path) removeWG.Add(1)
fs.MustRemoveAllWithDoneCallback(path, removeWG.Done)
} }
// Move the new part to new directory. // Move the new part to new directory.
@ -1492,10 +1494,14 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath str
fs.MustSyncPath(pathPrefix1) fs.MustSyncPath(pathPrefix1)
fs.MustSyncPath(pathPrefix2) fs.MustSyncPath(pathPrefix2)
// Remove the transaction file. go func() {
// Remove the transaction file only after all the source paths are deleted.
// This is required for NFS mounts. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
removeWG.Wait()
if err := os.Remove(txnPath); err != nil { if err := os.Remove(txnPath); err != nil {
return fmt.Errorf("cannot remove transaction file: %s", err) logger.Errorf("cannot remove transaction file %q: %s", txnPath, err)
} }
}()
return nil return nil
} }