diff --git a/lib/fs/dir_remover.go b/lib/fs/dir_remover.go index ed966dab1..72f1c9484 100644 --- a/lib/fs/dir_remover.go +++ b/lib/fs/dir_remover.go @@ -3,16 +3,40 @@ package fs import ( "os" "strings" - "sync" - "sync/atomic" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg" "github.com/VictoriaMetrics/metrics" ) -func mustRemoveAll(path string, done func()) bool { +func mustRemoveAll(path string, done func()) { + if tryRemoveAll(path, done) { + return + } + select { + case removeDirConcurrencyCh <- struct{}{}: + default: + logger.Panicf("FATAL: cannot schedule %s for removal, since the removal queue is full (%d entries)", path, cap(removeDirConcurrencyCh)) + } + dirRemoverWG.Add(1) + go func() { + defer func() { + dirRemoverWG.Done() + <-removeDirConcurrencyCh + }() + for { + time.Sleep(time.Second) + if tryRemoveAll(path, done) { + return + } + } + }() +} + +var dirRemoverWG syncwg.WaitGroup + +func tryRemoveAll(path string, done func()) bool { err := os.RemoveAll(path) if err == nil || isStaleNFSFileHandleError(err) { // Make sure the parent directory doesn't contain references @@ -28,74 +52,17 @@ func mustRemoveAll(path string, done func()) bool { // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 . // Schedule for later directory removal. nfsDirRemoveFailedAttempts.Inc() - w := &removeDirWork{ - path: path, - done: done, - } - select { - case removeDirCh <- w: - default: - // Wait for a while in the hope files are removed from removeDirCh. 
- // This can be the case on highly loaded system with high ingestion rate - // as described at https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1313 - t := timerpool.Get(10 * time.Second) - select { - case removeDirCh <- w: - timerpool.Put(t) - case <-t.C: - logger.Panicf("FATAL: cannot schedule %s for removal, since the removal queue is full (%d entries)", path, cap(removeDirCh)) - } - } return false } var ( nfsDirRemoveFailedAttempts = metrics.NewCounter(`vm_nfs_dir_remove_failed_attempts_total`) _ = metrics.NewGauge(`vm_nfs_pending_dirs_to_remove`, func() float64 { - return float64(len(removeDirCh)) + return float64(len(removeDirConcurrencyCh)) }) ) -type removeDirWork struct { - path string - done func() -} - -var removeDirCh = make(chan *removeDirWork, 1024) - -func dirRemover() { - const minSleepTime = 100 * time.Millisecond - const maxSleepTime = time.Second - sleepTime := minSleepTime - for { - var w *removeDirWork - select { - case w = <-removeDirCh: - default: - if atomic.LoadUint64(&stopDirRemover) != 0 { - return - } - time.Sleep(minSleepTime) - continue - } - if mustRemoveAll(w.path, w.done) { - sleepTime = minSleepTime - continue - } - - // Couldn't remove the directory at the path because of NFS lock. - // Sleep for a while and try again. - // Do not limit the amount of time required for deleting the directory, - // since this may break on laggy NFS. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/162 . 
- time.Sleep(sleepTime) - if sleepTime < maxSleepTime { - sleepTime *= 2 - } else if sleepTime > 5*time.Second { - logger.Warnf("failed to remove directory %q due to NFS lock; retrying later in %.3f seconds", w.path, sleepTime.Seconds()) - } - } -} +var removeDirConcurrencyCh = make(chan struct{}, 1024) func isStaleNFSFileHandleError(err error) bool { errStr := err.Error() @@ -108,24 +75,11 @@ func isTemporaryNFSError(err error) bool { return strings.Contains(errStr, "directory not empty") || strings.Contains(errStr, "device or resource busy") } -var dirRemoverWG sync.WaitGroup -var stopDirRemover uint64 - -func init() { - dirRemoverWG.Add(1) - go func() { - defer dirRemoverWG.Done() - dirRemover() - }() -} - // MustStopDirRemover must be called in the end of graceful shutdown -// in order to wait for removing the remaining directories from removeDirCh. +// in order to wait for the background goroutines, which are still retrying directory removal, to finish. // -// It is expected that nobody calls MustRemoveAll when MustStopDirRemover -// is called. +// It is expected that nobody calls MustRemoveAll when MustStopDirRemover is called. func MustStopDirRemover() { - atomic.StoreUint64(&stopDirRemover, 1) doneCh := make(chan struct{}) go func() { dirRemoverWG.Wait() diff --git a/lib/fs/fs.go b/lib/fs/fs.go index f2227f2db..b7a50dd77 100644 --- a/lib/fs/fs.go +++ b/lib/fs/fs.go @@ -198,7 +198,7 @@ func IsEmptyDir(path string) bool { // // It properly handles NFS issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 . func MustRemoveAll(path string) { - _ = mustRemoveAll(path, func() {}) + mustRemoveAll(path, func() {}) } // MustRemoveAllWithDoneCallback removes path with all the contents. @@ -210,7 +210,7 @@ func MustRemoveAll(path string) { // done may be called after the function returns for NFS path. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61. 
func MustRemoveAllWithDoneCallback(path string, done func()) { - _ = mustRemoveAll(path, done) + mustRemoveAll(path, done) } // HardLinkFiles makes hard links for all the files from srcDir in dstDir.