mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/fs: concurrently remove up to 1024 blocked NFS directories
Previously the blocked directories were removed sequentially by a single goroutine. This can be not enough for highly loaded VictoriaMetrics that accepts millions of sample per second, when big number of LSM parts are created and removed at high rate. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1313
This commit is contained in:
parent
8e2985b53d
commit
e05dd475f0
2 changed files with 34 additions and 80 deletions
|
@ -3,16 +3,40 @@ package fs
|
|||
import (
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
func mustRemoveAll(path string, done func()) bool {
|
||||
func mustRemoveAll(path string, done func()) {
|
||||
if tryRemoveAll(path, done) {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case removeDirConcurrencyCh <- struct{}{}:
|
||||
default:
|
||||
logger.Panicf("FATAL: cannot schedule %s for removal, since the removal queue is full (%d entries)", path, cap(removeDirConcurrencyCh))
|
||||
}
|
||||
dirRemoverWG.Add(1)
|
||||
go func() {
|
||||
defer func() {
|
||||
dirRemoverWG.Done()
|
||||
<-removeDirConcurrencyCh
|
||||
}()
|
||||
for {
|
||||
time.Sleep(time.Second)
|
||||
if tryRemoveAll(path, done) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
var dirRemoverWG syncwg.WaitGroup
|
||||
|
||||
func tryRemoveAll(path string, done func()) bool {
|
||||
err := os.RemoveAll(path)
|
||||
if err == nil || isStaleNFSFileHandleError(err) {
|
||||
// Make sure the parent directory doesn't contain references
|
||||
|
@ -28,74 +52,17 @@ func mustRemoveAll(path string, done func()) bool {
|
|||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
|
||||
// Schedule for later directory removal.
|
||||
nfsDirRemoveFailedAttempts.Inc()
|
||||
w := &removeDirWork{
|
||||
path: path,
|
||||
done: done,
|
||||
}
|
||||
select {
|
||||
case removeDirCh <- w:
|
||||
default:
|
||||
// Wait for a while in the hope files are removed from removeDirCh.
|
||||
// This can be the case on highly loaded system with high ingestion rate
|
||||
// as described at https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1313
|
||||
t := timerpool.Get(10 * time.Second)
|
||||
select {
|
||||
case removeDirCh <- w:
|
||||
timerpool.Put(t)
|
||||
case <-t.C:
|
||||
logger.Panicf("FATAL: cannot schedule %s for removal, since the removal queue is full (%d entries)", path, cap(removeDirCh))
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
var (
|
||||
nfsDirRemoveFailedAttempts = metrics.NewCounter(`vm_nfs_dir_remove_failed_attempts_total`)
|
||||
_ = metrics.NewGauge(`vm_nfs_pending_dirs_to_remove`, func() float64 {
|
||||
return float64(len(removeDirCh))
|
||||
return float64(len(removeDirConcurrencyCh))
|
||||
})
|
||||
)
|
||||
|
||||
type removeDirWork struct {
|
||||
path string
|
||||
done func()
|
||||
}
|
||||
|
||||
var removeDirCh = make(chan *removeDirWork, 1024)
|
||||
|
||||
func dirRemover() {
|
||||
const minSleepTime = 100 * time.Millisecond
|
||||
const maxSleepTime = time.Second
|
||||
sleepTime := minSleepTime
|
||||
for {
|
||||
var w *removeDirWork
|
||||
select {
|
||||
case w = <-removeDirCh:
|
||||
default:
|
||||
if atomic.LoadUint64(&stopDirRemover) != 0 {
|
||||
return
|
||||
}
|
||||
time.Sleep(minSleepTime)
|
||||
continue
|
||||
}
|
||||
if mustRemoveAll(w.path, w.done) {
|
||||
sleepTime = minSleepTime
|
||||
continue
|
||||
}
|
||||
|
||||
// Couldn't remove the directory at the path because of NFS lock.
|
||||
// Sleep for a while and try again.
|
||||
// Do not limit the amount of time required for deleting the directory,
|
||||
// since this may break on laggy NFS.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/162 .
|
||||
time.Sleep(sleepTime)
|
||||
if sleepTime < maxSleepTime {
|
||||
sleepTime *= 2
|
||||
} else if sleepTime > 5*time.Second {
|
||||
logger.Warnf("failed to remove directory %q due to NFS lock; retrying later in %.3f seconds", w.path, sleepTime.Seconds())
|
||||
}
|
||||
}
|
||||
}
|
||||
var removeDirConcurrencyCh = make(chan struct{}, 1024)
|
||||
|
||||
func isStaleNFSFileHandleError(err error) bool {
|
||||
errStr := err.Error()
|
||||
|
@ -108,24 +75,11 @@ func isTemporaryNFSError(err error) bool {
|
|||
return strings.Contains(errStr, "directory not empty") || strings.Contains(errStr, "device or resource busy")
|
||||
}
|
||||
|
||||
var dirRemoverWG sync.WaitGroup
|
||||
var stopDirRemover uint64
|
||||
|
||||
func init() {
|
||||
dirRemoverWG.Add(1)
|
||||
go func() {
|
||||
defer dirRemoverWG.Done()
|
||||
dirRemover()
|
||||
}()
|
||||
}
|
||||
|
||||
// MustStopDirRemover must be called in the end of graceful shutdown
|
||||
// in order to wait for removing the remaining directories from removeDirCh.
|
||||
// in order to wait for removing the remaining directories from removeDirConcurrencyCh.
|
||||
//
|
||||
// It is expected that nobody calls MustRemoveAll when MustStopDirRemover
|
||||
// is called.
|
||||
// It is expected that nobody calls MustRemoveAll when MustStopDirRemover is called.
|
||||
func MustStopDirRemover() {
|
||||
atomic.StoreUint64(&stopDirRemover, 1)
|
||||
doneCh := make(chan struct{})
|
||||
go func() {
|
||||
dirRemoverWG.Wait()
|
||||
|
|
|
@ -198,7 +198,7 @@ func IsEmptyDir(path string) bool {
|
|||
//
|
||||
// It properly handles NFS issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
|
||||
func MustRemoveAll(path string) {
|
||||
_ = mustRemoveAll(path, func() {})
|
||||
mustRemoveAll(path, func() {})
|
||||
}
|
||||
|
||||
// MustRemoveAllWithDoneCallback removes path with all the contents.
|
||||
|
@ -210,7 +210,7 @@ func MustRemoveAll(path string) {
|
|||
// done may be called after the function returns for NFS path.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61.
|
||||
func MustRemoveAllWithDoneCallback(path string, done func()) {
|
||||
_ = mustRemoveAll(path, done)
|
||||
mustRemoveAll(path, done)
|
||||
}
|
||||
|
||||
// HardLinkFiles makes hard links for all the files from srcDir in dstDir.
|
||||
|
|
Loading…
Reference in a new issue