mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-01 14:47:38 +00:00
lib/fs: do not postpone directory removal on NFS error
Continue trying to remove NFS directory on temporary errors for up to a minute. The previous async removal process breaks in the following case during VictoriaMetrics start - VictoriaMetrics opens index, finds incomplete merge transactions and starts replaying them. - The transaction instructs removing old directories for parts, which were already merged into bigger part. - VictoriaMetrics removes these directories, but their removal is delayed due to NFS errors. - VictoriaMetrics scans partition directory after all the incomplete merge transactions are finished and finds directories, which should be removed, but weren't still removed due to NFS errors. - VictoriaMetrics panics when it finds unexpected empty directory. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/162
This commit is contained in:
parent
9ea2bd822e
commit
5f52eb7653
5 changed files with 42 additions and 120 deletions
|
@ -16,7 +16,6 @@ import (
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
||||||
|
@ -89,8 +88,6 @@ func main() {
|
||||||
netstorage.Stop()
|
netstorage.Stop()
|
||||||
logger.Infof("successfully stopped netstorage in %s", time.Since(startTime))
|
logger.Infof("successfully stopped netstorage in %s", time.Since(startTime))
|
||||||
|
|
||||||
fs.MustStopDirRemover()
|
|
||||||
|
|
||||||
logger.Infof("the vminsert has been stopped")
|
logger.Infof("the vminsert has been stopped")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -76,8 +76,6 @@ func main() {
|
||||||
}
|
}
|
||||||
logger.Infof("successfully stopped netstorage in %s", time.Since(startTime))
|
logger.Infof("successfully stopped netstorage in %s", time.Since(startTime))
|
||||||
|
|
||||||
fs.MustStopDirRemover()
|
|
||||||
|
|
||||||
logger.Infof("the vmselect has been stopped")
|
logger.Infof("the vmselect has been stopped")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -10,7 +10,6 @@ import (
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage/transport"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage/transport"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
||||||
|
@ -83,8 +82,6 @@ func main() {
|
||||||
strg.MustClose()
|
strg.MustClose()
|
||||||
logger.Infof("successfully closed the storage in %s", time.Since(startTime))
|
logger.Infof("successfully closed the storage in %s", time.Since(startTime))
|
||||||
|
|
||||||
fs.MustStopDirRemover()
|
|
||||||
|
|
||||||
logger.Infof("the vmstorage has been stopped")
|
logger.Infof("the vmstorage has been stopped")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,111 +0,0 @@
|
||||||
package fs
|
|
||||||
|
|
||||||
import (
|
|
||||||
"os"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
||||||
"github.com/VictoriaMetrics/metrics"
|
|
||||||
)
|
|
||||||
|
|
||||||
func mustRemoveAll(path string) bool {
|
|
||||||
err := os.RemoveAll(path)
|
|
||||||
if err == nil {
|
|
||||||
// Make sure the parent directory doesn't contain references
|
|
||||||
// to the current directory.
|
|
||||||
mustSyncParentDirIfExists(path)
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
if !isTemporaryNFSError(err) {
|
|
||||||
logger.Panicf("FATAL: cannot remove %q: %s", path, err)
|
|
||||||
}
|
|
||||||
// NFS prevents from removing directories with open files.
|
|
||||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
|
|
||||||
// Schedule for later directory removal.
|
|
||||||
nfsDirRemoveFailedAttempts.Inc()
|
|
||||||
select {
|
|
||||||
case removeDirCh <- path:
|
|
||||||
default:
|
|
||||||
logger.Panicf("FATAL: cannot schedule %s for removal, since the removal queue is full (%d entries)", path, cap(removeDirCh))
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
var nfsDirRemoveFailedAttempts = metrics.NewCounter(`vm_nfs_dir_remove_failed_attempts_total`)
|
|
||||||
|
|
||||||
var removeDirCh = make(chan string, 1024)
|
|
||||||
|
|
||||||
func dirRemover() {
|
|
||||||
const minSleepTime = 100 * time.Millisecond
|
|
||||||
const maxSleepTime = time.Second
|
|
||||||
sleepTime := minSleepTime
|
|
||||||
for {
|
|
||||||
var path string
|
|
||||||
select {
|
|
||||||
case path = <-removeDirCh:
|
|
||||||
default:
|
|
||||||
if atomic.LoadUint64(&stopDirRemover) != 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
time.Sleep(minSleepTime)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if mustRemoveAll(path) {
|
|
||||||
sleepTime = minSleepTime
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Couldn't remove the directory at the path because of NFS lock.
|
|
||||||
// Sleep for a while and try again.
|
|
||||||
// Do not limit the amount of time required for deleting the directory,
|
|
||||||
// since this may break on laggy NFS.
|
|
||||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/162 .
|
|
||||||
time.Sleep(sleepTime)
|
|
||||||
if sleepTime < maxSleepTime {
|
|
||||||
sleepTime *= 2
|
|
||||||
} else {
|
|
||||||
logger.Errorf("failed to remove directory %q due to NFS lock; retrying later", path)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func isTemporaryNFSError(err error) bool {
|
|
||||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 for details.
|
|
||||||
errStr := err.Error()
|
|
||||||
return strings.Contains(errStr, "directory not empty") || strings.Contains(errStr, "device or resource busy")
|
|
||||||
}
|
|
||||||
|
|
||||||
var dirRemoverWG sync.WaitGroup
|
|
||||||
var stopDirRemover uint64
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
dirRemoverWG.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer dirRemoverWG.Done()
|
|
||||||
dirRemover()
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
// MustStopDirRemover must be called in the end of graceful shutdown
|
|
||||||
// in order to wait for removing the remaining directories from removeDirCh.
|
|
||||||
//
|
|
||||||
// It is expected that nobody calls MustRemoveAll when MustStopDirRemover
|
|
||||||
// is called.
|
|
||||||
func MustStopDirRemover() {
|
|
||||||
atomic.StoreUint64(&stopDirRemover, 1)
|
|
||||||
doneCh := make(chan struct{})
|
|
||||||
go func() {
|
|
||||||
dirRemoverWG.Wait()
|
|
||||||
close(doneCh)
|
|
||||||
}()
|
|
||||||
const maxWaitTime = 5 * time.Second
|
|
||||||
select {
|
|
||||||
case <-doneCh:
|
|
||||||
return
|
|
||||||
case <-time.After(maxWaitTime):
|
|
||||||
logger.Panicf("FATAL: cannot stop dirRemover in %s", maxWaitTime)
|
|
||||||
}
|
|
||||||
}
|
|
43
lib/fs/fs.go
43
lib/fs/fs.go
|
@ -6,7 +6,9 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"regexp"
|
"regexp"
|
||||||
|
"strings"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
|
@ -246,7 +248,46 @@ func mustSyncParentDirIfExists(path string) {
|
||||||
//
|
//
|
||||||
// It properly handles NFS issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
|
// It properly handles NFS issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
|
||||||
func MustRemoveAll(path string) {
|
func MustRemoveAll(path string) {
|
||||||
_ = mustRemoveAll(path)
|
startTime := time.Now()
|
||||||
|
sleepTime := 100 * time.Millisecond
|
||||||
|
again:
|
||||||
|
err := os.RemoveAll(path)
|
||||||
|
if err == nil {
|
||||||
|
// Make sure the parent directory doesn't contain references
|
||||||
|
// to the current directory.
|
||||||
|
mustSyncParentDirIfExists(path)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !isTemporaryNFSError(err) {
|
||||||
|
logger.Panicf("FATAL: cannot remove %q: %s", path, err)
|
||||||
|
}
|
||||||
|
// NFS prevents from removing directories with open files.
|
||||||
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
|
||||||
|
// Continuously try removing the directory for up to a minute before giving up.
|
||||||
|
//
|
||||||
|
// Do not postpone directory removal, since it breaks in the following case:
|
||||||
|
// - Remove the directory (the removal is postponed)
|
||||||
|
// - Scan for exsiting directories and open them. The scan finds
|
||||||
|
// the `removed` directory, but its contents may be already broken.
|
||||||
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/162 .
|
||||||
|
nfsDirRemoveFailedAttempts.Inc()
|
||||||
|
if time.Since(startTime) > time.Minute {
|
||||||
|
logger.Panicf("FATAL: couldn't remove NFS directory %q in %s", path, time.Minute)
|
||||||
|
}
|
||||||
|
time.Sleep(sleepTime)
|
||||||
|
sleepTime *= 2
|
||||||
|
if sleepTime > time.Second {
|
||||||
|
sleepTime = time.Second
|
||||||
|
}
|
||||||
|
goto again
|
||||||
|
}
|
||||||
|
|
||||||
|
var nfsDirRemoveFailedAttempts = metrics.NewCounter(`vm_nfs_dir_remove_failed_attempts_total`)
|
||||||
|
|
||||||
|
func isTemporaryNFSError(err error) bool {
|
||||||
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 for details.
|
||||||
|
errStr := err.Error()
|
||||||
|
return strings.Contains(errStr, "directory not empty") || strings.Contains(errStr, "device or resource busy")
|
||||||
}
|
}
|
||||||
|
|
||||||
// HardLinkFiles makes hard links for all the files from srcDir in dstDir.
|
// HardLinkFiles makes hard links for all the files from srcDir in dstDir.
|
||||||
|
|
Loading…
Reference in a new issue