lib/storage: substitute remaining calls to fs.MustRemoveAll with fs.MustRemoveDirAtomic

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3038
This commit is contained in:
Aliaksandr Valialkin 2022-09-13 15:48:20 +03:00
parent 68e32b0764
commit 042a532f70
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
4 changed files with 13 additions and 9 deletions

View file

@ -204,22 +204,26 @@ func IsEmptyDir(path string) bool {
// 2. Remove the "<dir>.must-remove.XYZ" in background. // 2. Remove the "<dir>.must-remove.XYZ" in background.
// //
// If the process crashes after the step 1, then the directory must be removed // If the process crashes after the step 1, then the directory must be removed
// on the next process start by calling MustRemoveTemporaryDirs. // on the next process start by calling MustRemoveTemporaryDirs on the parent directory.
func MustRemoveDirAtomic(dir string) { func MustRemoveDirAtomic(dir string) {
if !IsPathExist(dir) {
return
}
n := atomic.AddUint64(&atomicDirRemoveCounter, 1) n := atomic.AddUint64(&atomicDirRemoveCounter, 1)
tmpDir := fmt.Sprintf("%s.must-remove.%d", dir, n) tmpDir := fmt.Sprintf("%s.must-remove.%d", dir, n)
if err := os.Rename(dir, tmpDir); err != nil { if err := os.Rename(dir, tmpDir); err != nil {
logger.Panicf("FATAL: cannot move %s to %s: %s", dir, tmpDir, err) logger.Panicf("FATAL: cannot move %s to %s: %s", dir, tmpDir, err)
} }
MustSyncPath(dir)
MustRemoveAll(tmpDir) MustRemoveAll(tmpDir)
parentDir := filepath.Dir(dir)
MustSyncPath(parentDir)
} }
var atomicDirRemoveCounter = uint64(time.Now().UnixNano()) var atomicDirRemoveCounter = uint64(time.Now().UnixNano())
// MustRemoveTemporaryDirs removes all the subdirectories with ".must-remove.<XYZ>" suffix. // MustRemoveTemporaryDirs removes all the subdirectories with ".must-remove.<XYZ>" suffix.
// //
// Such directories may be left on unclean shutdown during MustRemoveDirAtomic. // Such directories may be left on unclean shutdown during MustRemoveDirAtomic call.
func MustRemoveTemporaryDirs(dir string) { func MustRemoveTemporaryDirs(dir string) {
d, err := os.Open(dir) d, err := os.Open(dir)
if err != nil { if err != nil {

View file

@ -110,7 +110,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
timestampsPath := path + "/timestamps.bin" timestampsPath := path + "/timestamps.bin"
timestampsFile, err := filestream.Create(timestampsPath, nocache) timestampsFile, err := filestream.Create(timestampsPath, nocache)
if err != nil { if err != nil {
fs.MustRemoveAll(path) fs.MustRemoveDirAtomic(path)
return fmt.Errorf("cannot create timestamps file: %w", err) return fmt.Errorf("cannot create timestamps file: %w", err)
} }
@ -118,7 +118,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
valuesFile, err := filestream.Create(valuesPath, nocache) valuesFile, err := filestream.Create(valuesPath, nocache)
if err != nil { if err != nil {
timestampsFile.MustClose() timestampsFile.MustClose()
fs.MustRemoveAll(path) fs.MustRemoveDirAtomic(path)
return fmt.Errorf("cannot create values file: %w", err) return fmt.Errorf("cannot create values file: %w", err)
} }
@ -127,7 +127,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
if err != nil { if err != nil {
timestampsFile.MustClose() timestampsFile.MustClose()
valuesFile.MustClose() valuesFile.MustClose()
fs.MustRemoveAll(path) fs.MustRemoveDirAtomic(path)
return fmt.Errorf("cannot create index file: %w", err) return fmt.Errorf("cannot create index file: %w", err)
} }
@ -139,7 +139,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
timestampsFile.MustClose() timestampsFile.MustClose()
valuesFile.MustClose() valuesFile.MustClose()
indexFile.MustClose() indexFile.MustClose()
fs.MustRemoveAll(path) fs.MustRemoveDirAtomic(path)
return fmt.Errorf("cannot create metaindex file: %w", err) return fmt.Errorf("cannot create metaindex file: %w", err)
} }

View file

@ -308,7 +308,7 @@ func (db *indexDB) decRef() {
} }
logger.Infof("dropping indexDB %q", tbPath) logger.Infof("dropping indexDB %q", tbPath)
fs.MustRemoveAll(tbPath) fs.MustRemoveDirAtomic(tbPath)
logger.Infof("indexDB %q has been dropped", tbPath) logger.Infof("indexDB %q has been dropped", tbPath)
} }

View file

@ -2080,5 +2080,5 @@ func stopTestStorage(s *Storage) {
s.metricIDCache.Stop() s.metricIDCache.Stop()
s.metricNameCache.Stop() s.metricNameCache.Stop()
s.tsidCache.Stop() s.tsidCache.Stop()
fs.MustRemoveAll(s.cachePath) fs.MustRemoveDirAtomic(s.cachePath)
} }