mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-11 15:34:56 +00:00
lib/fs: add MustStopDirRemover for waiting until pending directories are removed on graceful shutdown
This patch is mainly required for laggy NFS. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/162
This commit is contained in:
parent
9eb5de334f
commit
88f8670ede
4 changed files with 123 additions and 70 deletions
|
@ -9,6 +9,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
||||
|
@ -43,6 +44,8 @@ func main() {
|
|||
vmstorage.Stop()
|
||||
vmselect.Stop()
|
||||
|
||||
fs.MustStopDirRemover()
|
||||
|
||||
logger.Infof("the VictoriaMetrics has been stopped in %s", time.Since(startTime))
|
||||
}
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
)
|
||||
|
@ -92,7 +93,7 @@ func setUp() {
|
|||
|
||||
func processFlags() {
|
||||
flag.Parse()
|
||||
for _, fs := range []struct {
|
||||
for _, fv := range []struct {
|
||||
flag string
|
||||
value string
|
||||
}{
|
||||
|
@ -103,8 +104,8 @@ func processFlags() {
|
|||
{flag: "loggerLevel", value: testLogLevel},
|
||||
} {
|
||||
// panics if flag doesn't exist
|
||||
if err := flag.Lookup(fs.flag).Value.Set(fs.value); err != nil {
|
||||
log.Fatalf("unable to set %q with value %q, err: %v", fs.flag, fs.value, err)
|
||||
if err := flag.Lookup(fv.flag).Value.Set(fv.value); err != nil {
|
||||
log.Fatalf("unable to set %q with value %q, err: %v", fv.flag, fv.value, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -121,13 +122,14 @@ func waitFor(timeout time.Duration, f func() bool) error {
|
|||
}
|
||||
|
||||
func tearDown() {
|
||||
vminsert.Stop()
|
||||
vmstorage.Stop()
|
||||
vmselect.Stop()
|
||||
if err := httpserver.Stop(*httpListenAddr); err != nil {
|
||||
log.Fatalf("cannot stop the webservice: %s", err)
|
||||
}
|
||||
os.RemoveAll(storagePath)
|
||||
vminsert.Stop()
|
||||
vmstorage.Stop()
|
||||
vmselect.Stop()
|
||||
fs.MustRemoveAll(storagePath)
|
||||
fs.MustStopDirRemover()
|
||||
}
|
||||
|
||||
func TestWriteRead(t *testing.T) {
|
||||
|
|
111
lib/fs/dir_remover.go
Normal file
111
lib/fs/dir_remover.go
Normal file
|
@ -0,0 +1,111 @@
|
|||
package fs
|
||||
|
||||
import (
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
func mustRemoveAll(path string) bool {
|
||||
err := os.RemoveAll(path)
|
||||
if err == nil {
|
||||
// Make sure the parent directory doesn't contain references
|
||||
// to the current directory.
|
||||
mustSyncParentDirIfExists(path)
|
||||
return true
|
||||
}
|
||||
if !isTemporaryNFSError(err) {
|
||||
logger.Panicf("FATAL: cannot remove %q: %s", path, err)
|
||||
}
|
||||
// NFS prevents from removing directories with open files.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
|
||||
// Schedule for later directory removal.
|
||||
nfsDirRemoveFailedAttempts.Inc()
|
||||
select {
|
||||
case removeDirCh <- path:
|
||||
default:
|
||||
logger.Panicf("FATAL: cannot schedule %s for removal, since the removal queue is full (%d entries)", path, cap(removeDirCh))
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
var nfsDirRemoveFailedAttempts = metrics.NewCounter(`vm_nfs_dir_remove_failed_attempts_total`)
|
||||
|
||||
var removeDirCh = make(chan string, 1024)
|
||||
|
||||
func dirRemover() {
|
||||
const minSleepTime = 100 * time.Millisecond
|
||||
const maxSleepTime = time.Second
|
||||
sleepTime := minSleepTime
|
||||
for {
|
||||
var path string
|
||||
select {
|
||||
case path = <-removeDirCh:
|
||||
default:
|
||||
if atomic.LoadUint64(&stopDirRemover) != 0 {
|
||||
return
|
||||
}
|
||||
time.Sleep(minSleepTime)
|
||||
continue
|
||||
}
|
||||
if mustRemoveAll(path) {
|
||||
sleepTime = minSleepTime
|
||||
continue
|
||||
}
|
||||
|
||||
// Couldn't remove the directory at the path because of NFS lock.
|
||||
// Sleep for a while and try again.
|
||||
// Do not limit the amount of time required for deleting the directory,
|
||||
// since this may break on laggy NFS.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/162 .
|
||||
time.Sleep(sleepTime)
|
||||
if sleepTime < maxSleepTime {
|
||||
sleepTime *= 2
|
||||
} else {
|
||||
logger.Errorf("failed to remove directory %q due to NFS lock; retrying later", path)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func isTemporaryNFSError(err error) bool {
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 for details.
|
||||
errStr := err.Error()
|
||||
return strings.Contains(errStr, "directory not empty") || strings.Contains(errStr, "device or resource busy")
|
||||
}
|
||||
|
||||
var dirRemoverWG sync.WaitGroup
|
||||
var stopDirRemover uint64
|
||||
|
||||
func init() {
|
||||
dirRemoverWG.Add(1)
|
||||
go func() {
|
||||
defer dirRemoverWG.Done()
|
||||
dirRemover()
|
||||
}()
|
||||
}
|
||||
|
||||
// MustStopDirRemover must be called in the end of graceful shutdown
|
||||
// in order to wait for removing the remaining directories from removeDirCh.
|
||||
//
|
||||
// It is expected that nobody calls MustRemoveAll when MustStopDirRemover
|
||||
// is called.
|
||||
func MustStopDirRemover() {
|
||||
atomic.StoreUint64(&stopDirRemover, 1)
|
||||
doneCh := make(chan struct{})
|
||||
go func() {
|
||||
dirRemoverWG.Wait()
|
||||
close(doneCh)
|
||||
}()
|
||||
const maxWaitTime = 5 * time.Second
|
||||
select {
|
||||
case <-doneCh:
|
||||
return
|
||||
case <-time.After(maxWaitTime):
|
||||
logger.Panicf("FATAL: cannot stop dirRemover in %s", maxWaitTime)
|
||||
}
|
||||
}
|
63
lib/fs/fs.go
63
lib/fs/fs.go
|
@ -6,9 +6,7 @@ import (
|
|||
"os"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
|
@ -251,67 +249,6 @@ func MustRemoveAll(path string) {
|
|||
_ = mustRemoveAll(path)
|
||||
}
|
||||
|
||||
func mustRemoveAll(path string) bool {
|
||||
err := os.RemoveAll(path)
|
||||
if err == nil {
|
||||
// Make sure the parent directory doesn't contain references
|
||||
// to the current directory.
|
||||
mustSyncParentDirIfExists(path)
|
||||
return true
|
||||
}
|
||||
if !isTemporaryNFSError(err) {
|
||||
logger.Panicf("FATAL: cannot remove %q: %s", path, err)
|
||||
}
|
||||
// NFS prevents from removing directories with open files.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
|
||||
// Schedule for later directory removal.
|
||||
nfsDirRemoveFailedAttempts.Inc()
|
||||
select {
|
||||
case removeDirCh <- path:
|
||||
default:
|
||||
logger.Panicf("FATAL: cannot schedule %s for removal, since the removal queue is full (%d entries)", path, cap(removeDirCh))
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
var nfsDirRemoveFailedAttempts = metrics.NewCounter(`vm_nfs_dir_remove_failed_attempts_total`)
|
||||
|
||||
var removeDirCh = make(chan string, 1024)
|
||||
|
||||
func dirRemover() {
|
||||
const minSleepTime = 100 * time.Millisecond
|
||||
const maxSleepTime = time.Second
|
||||
sleepTime := minSleepTime
|
||||
for path := range removeDirCh {
|
||||
if mustRemoveAll(path) {
|
||||
sleepTime = minSleepTime
|
||||
continue
|
||||
}
|
||||
|
||||
// Couldn't remove the directory at the path because of NFS lock.
|
||||
// Sleep for a while and try again.
|
||||
// Do not limit the amount of time required for deleting the directory,
|
||||
// since this may break on laggy NFS.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/162 .
|
||||
time.Sleep(sleepTime)
|
||||
if sleepTime < maxSleepTime {
|
||||
sleepTime *= 2
|
||||
} else {
|
||||
logger.Errorf("failed to remove directory %q due to NFS lock; retrying later", path)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func isTemporaryNFSError(err error) bool {
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 for details.
|
||||
errStr := err.Error()
|
||||
return strings.Contains(errStr, "directory not empty") || strings.Contains(errStr, "device or resource busy")
|
||||
}
|
||||
|
||||
func init() {
|
||||
go dirRemover()
|
||||
}
|
||||
|
||||
// HardLinkFiles makes hard links for all the files from srcDir in dstDir.
|
||||
func HardLinkFiles(srcDir, dstDir string) error {
|
||||
if err := mkdirSync(dstDir); err != nil {
|
||||
|
|
Loading…
Reference in a new issue