mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-11 15:34:56 +00:00
lib/fs: add MustStopDirRemover for waiting until pending directories are removed on graceful shutdown
This patch is mainly required for laggy NFS. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/162
This commit is contained in:
parent
d0953e9f02
commit
2c654258ef
5 changed files with 119 additions and 63 deletions
|
@ -16,6 +16,7 @@ import (
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
||||||
|
@ -88,6 +89,8 @@ func main() {
|
||||||
netstorage.Stop()
|
netstorage.Stop()
|
||||||
logger.Infof("successfully stopped netstorage in %s", time.Since(startTime))
|
logger.Infof("successfully stopped netstorage in %s", time.Since(startTime))
|
||||||
|
|
||||||
|
fs.MustStopDirRemover()
|
||||||
|
|
||||||
logger.Infof("the vminsert has been stopped")
|
logger.Infof("the vminsert has been stopped")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -76,6 +76,8 @@ func main() {
|
||||||
}
|
}
|
||||||
logger.Infof("successfully stopped netstorage in %s", time.Since(startTime))
|
logger.Infof("successfully stopped netstorage in %s", time.Since(startTime))
|
||||||
|
|
||||||
|
fs.MustStopDirRemover()
|
||||||
|
|
||||||
logger.Infof("the vmselect has been stopped")
|
logger.Infof("the vmselect has been stopped")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage/transport"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage/transport"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
||||||
|
@ -76,6 +77,8 @@ func main() {
|
||||||
strg.MustClose()
|
strg.MustClose()
|
||||||
logger.Infof("successfully closed the storage in %s", time.Since(startTime))
|
logger.Infof("successfully closed the storage in %s", time.Since(startTime))
|
||||||
|
|
||||||
|
fs.MustStopDirRemover()
|
||||||
|
|
||||||
logger.Infof("the vmstorage has been stopped")
|
logger.Infof("the vmstorage has been stopped")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
111
lib/fs/dir_remover.go
Normal file
111
lib/fs/dir_remover.go
Normal file
|
@ -0,0 +1,111 @@
|
||||||
|
package fs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
|
"github.com/VictoriaMetrics/metrics"
|
||||||
|
)
|
||||||
|
|
||||||
|
func mustRemoveAll(path string) bool {
|
||||||
|
err := os.RemoveAll(path)
|
||||||
|
if err == nil {
|
||||||
|
// Make sure the parent directory doesn't contain references
|
||||||
|
// to the current directory.
|
||||||
|
mustSyncParentDirIfExists(path)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if !isTemporaryNFSError(err) {
|
||||||
|
logger.Panicf("FATAL: cannot remove %q: %s", path, err)
|
||||||
|
}
|
||||||
|
// NFS prevents from removing directories with open files.
|
||||||
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
|
||||||
|
// Schedule for later directory removal.
|
||||||
|
nfsDirRemoveFailedAttempts.Inc()
|
||||||
|
select {
|
||||||
|
case removeDirCh <- path:
|
||||||
|
default:
|
||||||
|
logger.Panicf("FATAL: cannot schedule %s for removal, since the removal queue is full (%d entries)", path, cap(removeDirCh))
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// nfsDirRemoveFailedAttempts counts directory removal attempts that failed
// with an error recognized by isTemporaryNFSError (see mustRemoveAll).
var nfsDirRemoveFailedAttempts = metrics.NewCounter(`vm_nfs_dir_remove_failed_attempts_total`)
|
||||||
|
|
||||||
|
// removeDirCh is the queue of directory paths whose removal must be retried.
//
// mustRemoveAll puts paths here, while dirRemover reads them.
// The queue holds up to 1024 entries.
var removeDirCh = make(chan string, 1024)
|
||||||
|
|
||||||
|
func dirRemover() {
|
||||||
|
const minSleepTime = 100 * time.Millisecond
|
||||||
|
const maxSleepTime = time.Second
|
||||||
|
sleepTime := minSleepTime
|
||||||
|
for {
|
||||||
|
var path string
|
||||||
|
select {
|
||||||
|
case path = <-removeDirCh:
|
||||||
|
default:
|
||||||
|
if atomic.LoadUint64(&stopDirRemover) != 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
time.Sleep(minSleepTime)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if mustRemoveAll(path) {
|
||||||
|
sleepTime = minSleepTime
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Couldn't remove the directory at the path because of NFS lock.
|
||||||
|
// Sleep for a while and try again.
|
||||||
|
// Do not limit the amount of time required for deleting the directory,
|
||||||
|
// since this may break on laggy NFS.
|
||||||
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/162 .
|
||||||
|
time.Sleep(sleepTime)
|
||||||
|
if sleepTime < maxSleepTime {
|
||||||
|
sleepTime *= 2
|
||||||
|
} else {
|
||||||
|
logger.Errorf("failed to remove directory %q due to NFS lock; retrying later", path)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// isTemporaryNFSError reports whether the text of err contains one of
// the messages treated as a temporary failure to remove a directory on NFS.
//
// The match is a case-sensitive substring search. err must be non-nil.
//
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 for details.
func isTemporaryNFSError(err error) bool {
	msg := err.Error()
	for _, marker := range []string{"directory not empty", "device or resource busy"} {
		if strings.Contains(msg, marker) {
			return true
		}
	}
	return false
}
|
||||||
|
|
||||||
|
// dirRemoverWG tracks the goroutine running dirRemover, which is started in init.
// MustStopDirRemover waits on it.
var dirRemoverWG sync.WaitGroup
|
||||||
|
// stopDirRemover is set to 1 by MustStopDirRemover in order to ask dirRemover
// to return once removeDirCh is empty. It is read and written atomically.
var stopDirRemover uint64
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
dirRemoverWG.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer dirRemoverWG.Done()
|
||||||
|
dirRemover()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// MustStopDirRemover must be called in the end of graceful shutdown
|
||||||
|
// in order to wait for removing the remaining directories from removeDirCh.
|
||||||
|
//
|
||||||
|
// It is expected that nobody calls MustRemoveAll when MustStopDirRemover
|
||||||
|
// is called.
|
||||||
|
func MustStopDirRemover() {
|
||||||
|
atomic.StoreUint64(&stopDirRemover, 1)
|
||||||
|
doneCh := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
dirRemoverWG.Wait()
|
||||||
|
close(doneCh)
|
||||||
|
}()
|
||||||
|
const maxWaitTime = 5 * time.Second
|
||||||
|
select {
|
||||||
|
case <-doneCh:
|
||||||
|
return
|
||||||
|
case <-time.After(maxWaitTime):
|
||||||
|
logger.Panicf("FATAL: cannot stop dirRemover in %s", maxWaitTime)
|
||||||
|
}
|
||||||
|
}
|
63
lib/fs/fs.go
63
lib/fs/fs.go
|
@ -6,9 +6,7 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"regexp"
|
"regexp"
|
||||||
"strings"
|
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
|
@ -251,67 +249,6 @@ func MustRemoveAll(path string) {
|
||||||
_ = mustRemoveAll(path)
|
_ = mustRemoveAll(path)
|
||||||
}
|
}
|
||||||
|
|
||||||
func mustRemoveAll(path string) bool {
|
|
||||||
err := os.RemoveAll(path)
|
|
||||||
if err == nil {
|
|
||||||
// Make sure the parent directory doesn't contain references
|
|
||||||
// to the current directory.
|
|
||||||
mustSyncParentDirIfExists(path)
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
if !isTemporaryNFSError(err) {
|
|
||||||
logger.Panicf("FATAL: cannot remove %q: %s", path, err)
|
|
||||||
}
|
|
||||||
// NFS prevents from removing directories with open files.
|
|
||||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
|
|
||||||
// Schedule for later directory removal.
|
|
||||||
nfsDirRemoveFailedAttempts.Inc()
|
|
||||||
select {
|
|
||||||
case removeDirCh <- path:
|
|
||||||
default:
|
|
||||||
logger.Panicf("FATAL: cannot schedule %s for removal, since the removal queue is full (%d entries)", path, cap(removeDirCh))
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
var nfsDirRemoveFailedAttempts = metrics.NewCounter(`vm_nfs_dir_remove_failed_attempts_total`)
|
|
||||||
|
|
||||||
var removeDirCh = make(chan string, 1024)
|
|
||||||
|
|
||||||
func dirRemover() {
|
|
||||||
const minSleepTime = 100 * time.Millisecond
|
|
||||||
const maxSleepTime = time.Second
|
|
||||||
sleepTime := minSleepTime
|
|
||||||
for path := range removeDirCh {
|
|
||||||
if mustRemoveAll(path) {
|
|
||||||
sleepTime = minSleepTime
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Couldn't remove the directory at the path because of NFS lock.
|
|
||||||
// Sleep for a while and try again.
|
|
||||||
// Do not limit the amount of time required for deleting the directory,
|
|
||||||
// since this may break on laggy NFS.
|
|
||||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/162 .
|
|
||||||
time.Sleep(sleepTime)
|
|
||||||
if sleepTime < maxSleepTime {
|
|
||||||
sleepTime *= 2
|
|
||||||
} else {
|
|
||||||
logger.Errorf("failed to remove directory %q due to NFS lock; retrying later", path)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func isTemporaryNFSError(err error) bool {
|
|
||||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 for details.
|
|
||||||
errStr := err.Error()
|
|
||||||
return strings.Contains(errStr, "directory not empty") || strings.Contains(errStr, "device or resource busy")
|
|
||||||
}
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
go dirRemover()
|
|
||||||
}
|
|
||||||
|
|
||||||
// HardLinkFiles makes hard links for all the files from srcDir in dstDir.
|
// HardLinkFiles makes hard links for all the files from srcDir in dstDir.
|
||||||
func HardLinkFiles(srcDir, dstDir string) error {
|
func HardLinkFiles(srcDir, dstDir string) error {
|
||||||
if err := mkdirSync(dstDir); err != nil {
|
if err := mkdirSync(dstDir); err != nil {
|
||||||
|
|
Loading…
Reference in a new issue