diff --git a/app/vmselect/netstorage/tmp_blocks_file.go b/app/vmselect/netstorage/tmp_blocks_file.go index 5a1153f6b..6763bcbda 100644 --- a/app/vmselect/netstorage/tmp_blocks_file.go +++ b/app/vmselect/netstorage/tmp_blocks_file.go @@ -22,7 +22,7 @@ func InitTmpBlocksDir(tmpDirPath string) { tmpDirPath = os.TempDir() } tmpBlocksDir = tmpDirPath + "/searchResults" - if err := os.RemoveAll(tmpBlocksDir); err != nil { + if err := fs.RemoveAllHard(tmpBlocksDir); err != nil { logger.Panicf("FATAL: cannot remove %q: %s", tmpBlocksDir, err) } if err := fs.MkdirAllIfNotExist(tmpBlocksDir); err != nil { diff --git a/lib/fs/fs.go b/lib/fs/fs.go index 233afa514..8a5ba51d8 100644 --- a/lib/fs/fs.go +++ b/lib/fs/fs.go @@ -5,6 +5,8 @@ import ( "io" "os" "path/filepath" + "strings" + "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -159,7 +161,7 @@ func RemoveDirContents(dir string) { continue } fullPath := dir + "/" + name - if err := os.RemoveAll(fullPath); err != nil { + if err := RemoveAllHard(fullPath); err != nil { logger.Panicf("FATAL: cannot remove %q: %s", fullPath, err) } } @@ -188,10 +190,66 @@ func IsPathExist(path string) bool { // MustRemoveAllSynced removes path with all the contents // and syncs the parent directory, so it no longer contains the path. func MustRemoveAllSynced(path string) { - if err := os.RemoveAll(path); err != nil { + MustRemoveAll(path) + SyncPath(filepath.Dir(path)) +} + +// MustRemoveAll removes path with all the contents. +func MustRemoveAll(path string) { + if err := RemoveAllHard(path); err != nil { logger.Panicf("FATAL: cannot remove %q: %s", path, err) } - SyncPath(filepath.Dir(path)) +} + +// RemoveAllHard removes path with all the contents. +// +// It properly handles NFS issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 . +func RemoveAllHard(path string) error { + err := os.RemoveAll(path) + if err == nil { + return nil + } + if !strings.Contains(err.Error(), "directory not empty") { + return err + } + // This may be NFS-related issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 . + // Schedule for later directory removal. + select { + case removeDirCh <- path: + default: + return fmt.Errorf("cannot schedule %s for removal, since the removal queue is full (%d entries)", path, cap(removeDirCh)) + } + return nil +} + +var removeDirCh = make(chan string, 1024) + +func dirRemover() { + for path := range removeDirCh { + attempts := 0 + for { + err := os.RemoveAll(path) + if err == nil { + break + } + if !strings.Contains(err.Error(), "directory not empty") { + logger.Errorf("cannot remove %q: %s", path, err) + break + } + // NFS-related issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 . + // Sleep for a while and try again. + attempts++ + if attempts > 50 { + logger.Errorf("cannot remove %q in %d attempts: %s", path, attempts, err) + break + } + time.Sleep(10 * time.Millisecond) + } + } +} + +func init() { + go dirRemover() } // HardLinkFiles makes hard links for all the files from srcDir in dstDir. diff --git a/lib/mergeset/block_stream_writer.go b/lib/mergeset/block_stream_writer.go index 6f1ec4985..b6682a686 100644 --- a/lib/mergeset/block_stream_writer.go +++ b/lib/mergeset/block_stream_writer.go @@ -2,7 +2,6 @@ package mergeset import ( "fmt" - "os" "path/filepath" "sync" @@ -92,7 +91,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre metaindexPath := path + "/metaindex.bin" metaindexFile, err := filestream.Create(metaindexPath, false) if err != nil { - _ = os.RemoveAll(path) + fs.MustRemoveAll(path) return fmt.Errorf("cannot create metaindex file: %s", err) } @@ -100,7 +99,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre indexFile, err := filestream.Create(indexPath, nocache) if err != nil { metaindexFile.MustClose() - _ = os.RemoveAll(path) + fs.MustRemoveAll(path) return fmt.Errorf("cannot create index file: %s", err) } @@ -109,7 +108,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre if err != nil { metaindexFile.MustClose() indexFile.MustClose() - _ = os.RemoveAll(path) + fs.MustRemoveAll(path) return fmt.Errorf("cannot create items file: %s", err) } @@ -119,7 +118,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre metaindexFile.MustClose() indexFile.MustClose() itemsFile.MustClose() - _ = os.RemoveAll(path) + fs.MustRemoveAll(path) return fmt.Errorf("cannot create lens file: %s", err) } diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 3450acaba..d1b295e73 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -846,7 +846,7 @@ func openParts(path string) ([]*partWrapper, error) { } txnDir := path + "/txn" - if err := os.RemoveAll(txnDir); err != nil { + if err := fs.RemoveAllHard(txnDir); err != nil { return nil, fmt.Errorf("cannot remove %q: %s", txnDir, err) } if err := fs.MkdirAllFailIfExist(txnDir); err != nil { @@ -854,7 +854,7 @@ func openParts(path string) ([]*partWrapper, error) { } tmpDir := path + "/tmp" - if err := os.RemoveAll(tmpDir); err != nil { + if err := fs.RemoveAllHard(tmpDir); err != nil { return nil, fmt.Errorf("cannot remove %q: %s", tmpDir, err) } if err := fs.MkdirAllFailIfExist(tmpDir); err != nil { @@ -1033,7 +1033,7 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix, txnPath string) error { if err != nil { return fmt.Errorf("invalid path to remove: %s", err) } - if err := os.RemoveAll(path); err != nil { + if err := fs.RemoveAllHard(path); err != nil { return fmt.Errorf("cannot remove %q: %s", path, err) } } diff --git a/lib/storage/block_stream_writer.go b/lib/storage/block_stream_writer.go index 6c00e1400..89e098d7d 100644 --- a/lib/storage/block_stream_writer.go +++ b/lib/storage/block_stream_writer.go @@ -2,7 +2,6 @@ package storage import ( "fmt" - "os" "path/filepath" "sync" "sync/atomic" @@ -85,7 +84,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre timestampsPath := path + "/timestamps.bin" timestampsFile, err := filestream.Create(timestampsPath, nocache) if err != nil { - _ = os.RemoveAll(path) + fs.MustRemoveAll(path) return fmt.Errorf("cannot create timestamps file: %s", err) } @@ -93,7 +92,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre valuesFile, err := filestream.Create(valuesPath, nocache) if err != nil { timestampsFile.MustClose() - _ = os.RemoveAll(path) + fs.MustRemoveAll(path) return fmt.Errorf("cannot create values file: %s", err) } @@ -102,7 +101,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre if err != nil { timestampsFile.MustClose() valuesFile.MustClose() - _ = os.RemoveAll(path) + fs.MustRemoveAll(path) return fmt.Errorf("cannot create index file: %s", err) } @@ -114,7 +113,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre timestampsFile.MustClose() valuesFile.MustClose() indexFile.MustClose() - _ = os.RemoveAll(path) + fs.MustRemoveAll(path) return fmt.Errorf("cannot create metaindex file: %s", err) } diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 511c98eca..f8dcdd76b 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "io" - "os" "path/filepath" "sort" "sync" @@ -15,6 +14,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset" @@ -266,7 +266,7 @@ func (db *indexDB) decRef() { } logger.Infof("dropping indexDB %q", tbPath) - if err := os.RemoveAll(tbPath); err != nil { + if err := fs.RemoveAllHard(tbPath); err != nil { logger.Panicf("FATAL: cannot remove %q: %s", tbPath, err) } logger.Infof("indexDB %q has been dropped", tbPath) diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 8dc827d06..224b4ccbf 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -213,10 +213,10 @@ func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath str func (pt *partition) Drop() { logger.Infof("dropping partition %q at smallPartsPath=%q, bigPartsPath=%q", pt.name, pt.smallPartsPath, pt.bigPartsPath) - if err := os.RemoveAll(pt.smallPartsPath); err != nil { + if err := fs.RemoveAllHard(pt.smallPartsPath); err != nil { logger.Panicf("FATAL: cannot remove small parts directory %q: %s", pt.smallPartsPath, err) } - if err := os.RemoveAll(pt.bigPartsPath); err != nil { + if err := fs.RemoveAllHard(pt.bigPartsPath); err != nil { logger.Panicf("FATAL: cannot remove big parts directory %q: %s", pt.bigPartsPath, err) } @@ -1223,11 +1223,11 @@ func openParts(pathPrefix1, pathPrefix2, path string) ([]*partWrapper, error) { } txnDir := path + "/txn" - if err := os.RemoveAll(txnDir); err != nil { + if err := fs.RemoveAllHard(txnDir); err != nil { return nil, fmt.Errorf("cannot delete transaction directory %q: %s", txnDir, err) } tmpDir := path + "/tmp" - if err := os.RemoveAll(tmpDir); err != nil { + if err := fs.RemoveAllHard(tmpDir); err != nil { return nil, fmt.Errorf("cannot remove temporary directory %q: %s", tmpDir, err) } if err := createPartitionDirs(path); err != nil { @@ -1408,7 +1408,7 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath str if err != nil { return fmt.Errorf("invalid path to remove: %s", err) } - if err := os.RemoveAll(path); err != nil { + if err := fs.RemoveAllHard(path); err != nil { return fmt.Errorf("cannot remove %q: %s", path, err) } } @@ -1438,7 +1438,7 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath str } } else { // Just remove srcPath. - if err := os.RemoveAll(srcPath); err != nil { + if err := fs.RemoveAllHard(srcPath); err != nil { return fmt.Errorf("cannot remove %q: %s", srcPath, err) } } diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 4b8e77e8f..e643444d6 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -893,7 +893,7 @@ func openIndexDBTables(path string, metricIDCache, metricNameCache *fastcache.Ca for _, tn := range tableNames[:len(tableNames)-2] { pathToRemove := path + "/" + tn logger.Infof("removing obsolete indexdb dir %q...", pathToRemove) - if err := os.RemoveAll(pathToRemove); err != nil { + if err := fs.RemoveAllHard(pathToRemove); err != nil { return nil, nil, fmt.Errorf("cannot remove obsolete indexdb dir %q: %s", pathToRemove, err) } logger.Infof("removed obsolete indexdb dir %q", pathToRemove)