From e05dd475f0df41cd81f1e9b0359990380d9ac0ee Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Fri, 21 May 2021 17:55:14 +0300
Subject: [PATCH] lib/fs: concurrently remove up to 1024 blocked NFS directories

Previously, the blocked directories were removed sequentially by a single
goroutine. This may not be enough for a highly loaded VictoriaMetrics
instance that accepts millions of samples per second, where a large number
of LSM parts is created and removed at a high rate.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1313
---
 lib/fs/dir_remover.go | 110 ++++++++++++------------------------------
 lib/fs/fs.go          |   4 +-
 2 files changed, 34 insertions(+), 80 deletions(-)

diff --git a/lib/fs/dir_remover.go b/lib/fs/dir_remover.go
index ed966dab1..72f1c9484 100644
--- a/lib/fs/dir_remover.go
+++ b/lib/fs/dir_remover.go
@@ -3,16 +3,40 @@ package fs
 import (
 	"os"
 	"strings"
-	"sync"
-	"sync/atomic"
 	"time"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg"
 	"github.com/VictoriaMetrics/metrics"
 )
 
-func mustRemoveAll(path string, done func()) bool {
+func mustRemoveAll(path string, done func()) {
+	if tryRemoveAll(path, done) {
+		return
+	}
+	select {
+	case removeDirConcurrencyCh <- struct{}{}:
+	default:
+		logger.Panicf("FATAL: cannot schedule %s for removal, since the removal queue is full (%d entries)", path, cap(removeDirConcurrencyCh))
+	}
+	dirRemoverWG.Add(1)
+	go func() {
+		defer func() {
+			dirRemoverWG.Done()
+			<-removeDirConcurrencyCh
+		}()
+		for {
+			time.Sleep(time.Second)
+			if tryRemoveAll(path, done) {
+				return
+			}
+		}
+	}()
+}
+
+var dirRemoverWG syncwg.WaitGroup
+
+func tryRemoveAll(path string, done func()) bool {
 	err := os.RemoveAll(path)
 	if err == nil || isStaleNFSFileHandleError(err) {
 		// Make sure the parent directory doesn't contain references
@@ -28,74 +52,17 @@
 	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
 	// Schedule for later directory removal.
 	nfsDirRemoveFailedAttempts.Inc()
-	w := &removeDirWork{
-		path: path,
-		done: done,
-	}
-	select {
-	case removeDirCh <- w:
-	default:
-		// Wait for a while in the hope files are removed from removeDirCh.
-		// This can be the case on highly loaded system with high ingestion rate
-		// as described at https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1313
-		t := timerpool.Get(10 * time.Second)
-		select {
-		case removeDirCh <- w:
-			timerpool.Put(t)
-		case <-t.C:
-			logger.Panicf("FATAL: cannot schedule %s for removal, since the removal queue is full (%d entries)", path, cap(removeDirCh))
-		}
-	}
 	return false
 }
 
 var (
 	nfsDirRemoveFailedAttempts = metrics.NewCounter(`vm_nfs_dir_remove_failed_attempts_total`)
 	_ = metrics.NewGauge(`vm_nfs_pending_dirs_to_remove`, func() float64 {
-		return float64(len(removeDirCh))
+		return float64(len(removeDirConcurrencyCh))
 	})
 )
 
-type removeDirWork struct {
-	path string
-	done func()
-}
-
-var removeDirCh = make(chan *removeDirWork, 1024)
-
-func dirRemover() {
-	const minSleepTime = 100 * time.Millisecond
-	const maxSleepTime = time.Second
-	sleepTime := minSleepTime
-	for {
-		var w *removeDirWork
-		select {
-		case w = <-removeDirCh:
-		default:
-			if atomic.LoadUint64(&stopDirRemover) != 0 {
-				return
-			}
-			time.Sleep(minSleepTime)
-			continue
-		}
-		if mustRemoveAll(w.path, w.done) {
-			sleepTime = minSleepTime
-			continue
-		}
-
-		// Couldn't remove the directory at the path because of NFS lock.
-		// Sleep for a while and try again.
-		// Do not limit the amount of time required for deleting the directory,
-		// since this may break on laggy NFS.
-		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/162 .
-		time.Sleep(sleepTime)
-		if sleepTime < maxSleepTime {
-			sleepTime *= 2
-		} else if sleepTime > 5*time.Second {
-			logger.Warnf("failed to remove directory %q due to NFS lock; retrying later in %.3f seconds", w.path, sleepTime.Seconds())
-		}
-	}
-}
+var removeDirConcurrencyCh = make(chan struct{}, 1024)
 
 func isStaleNFSFileHandleError(err error) bool {
 	errStr := err.Error()
@@ -108,24 +75,11 @@ func isTemporaryNFSError(err error) bool {
 	return strings.Contains(errStr, "directory not empty") || strings.Contains(errStr, "device or resource busy")
 }
 
-var dirRemoverWG sync.WaitGroup
-var stopDirRemover uint64
-
-func init() {
-	dirRemoverWG.Add(1)
-	go func() {
-		defer dirRemoverWG.Done()
-		dirRemover()
-	}()
-}
-
 // MustStopDirRemover must be called in the end of graceful shutdown
-// in order to wait for removing the remaining directories from removeDirCh.
+// in order to wait for removing the remaining directories from removeDirConcurrencyCh.
 //
-// It is expected that nobody calls MustRemoveAll when MustStopDirRemover
-// is called.
+// It is expected that nobody calls MustRemoveAll when MustStopDirRemover is called.
 func MustStopDirRemover() {
-	atomic.StoreUint64(&stopDirRemover, 1)
 	doneCh := make(chan struct{})
 	go func() {
 		dirRemoverWG.Wait()
diff --git a/lib/fs/fs.go b/lib/fs/fs.go
index f2227f2db..b7a50dd77 100644
--- a/lib/fs/fs.go
+++ b/lib/fs/fs.go
@@ -198,7 +198,7 @@ func IsEmptyDir(path string) bool {
 //
 // It properly handles NFS issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
 func MustRemoveAll(path string) {
-	_ = mustRemoveAll(path, func() {})
+	mustRemoveAll(path, func() {})
 }
 
 // MustRemoveAllWithDoneCallback removes path with all the contents.
@@ -210,7 +210,7 @@ func MustRemoveAll(path string) {
 // done may be called after the function returns for NFS path.
 // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61.
 func MustRemoveAllWithDoneCallback(path string, done func()) {
-	_ = mustRemoveAll(path, done)
+	mustRemoveAll(path, done)
 }
 
 // HardLinkFiles makes hard links for all the files from srcDir in dstDir.
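
Note (illustration, not part of the patch): the standalone Go sketch below shows the bounded-concurrency retry pattern this change introduces. A buffered channel of empty structs acts as a semaphore that caps the number of background goroutines retrying removal of NFS-locked directories, and a wait group lets graceful shutdown wait for them. Identifiers such as retryConcurrencyCh, retryWG and removeWithRetries are hypothetical names used only in this sketch; the patch itself uses removeDirConcurrencyCh, dirRemoverWG (a lib/syncwg.WaitGroup, which allows Add to run concurrently with Wait) and tryRemoveAll, which also treats stale NFS file handle errors as success.

package main

import (
	"log"
	"os"
	"sync"
	"time"
)

// retryConcurrencyCh is a semaphore that caps the number of concurrent
// background removers, mirroring the 1024-entry removeDirConcurrencyCh
// introduced by the patch.
var retryConcurrencyCh = make(chan struct{}, 1024)

// retryWG tracks the background removers so shutdown can wait for them.
// The patch uses lib/syncwg.WaitGroup instead; plain sync.WaitGroup is
// used here only to keep the sketch self-contained.
var retryWG sync.WaitGroup

// removeWithRetries tries to remove path once; on failure it acquires a
// semaphore slot and keeps retrying in a background goroutine until the
// removal succeeds.
func removeWithRetries(path string) {
	if err := os.RemoveAll(path); err == nil {
		return
	}
	select {
	case retryConcurrencyCh <- struct{}{}:
	default:
		// Mirrors logger.Panicf in the patch: fail loudly rather than
		// block the caller when the retry queue is full.
		log.Fatalf("cannot schedule %s for removal: the retry queue is full (%d entries)", path, cap(retryConcurrencyCh))
	}
	retryWG.Add(1)
	go func() {
		defer func() {
			retryWG.Done()
			<-retryConcurrencyCh // release the semaphore slot
		}()
		for {
			time.Sleep(time.Second)
			if err := os.RemoveAll(path); err == nil {
				return
			}
		}
	}()
}

func main() {
	removeWithRetries("/tmp/blocked-dir") // hypothetical path
	retryWG.Wait()                        // analogous to MustStopDirRemover on shutdown
}

Using the buffered channel both as the concurrency limiter and as the source of the vm_nfs_pending_dirs_to_remove gauge (via len(removeDirConcurrencyCh)) is what lets the patch drop the dedicated dirRemover goroutine and its removeDirCh work queue.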