lib/storage: atomically remove parts inside partitions

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3038
This commit is contained in:
Aliaksandr Valialkin 2022-09-13 15:28:01 +03:00
parent db4f0fe6fc
commit 68e32b0764
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1

View file

@ -1394,17 +1394,14 @@ func (pt *partition) removeStaleParts() {
}
// Physically remove stale parts under snapshotLock in order to provide
// consistent snapshots with partition.CreateSnapshot().
// consistent snapshots with table.CreateSnapshot().
pt.snapshotLock.RLock()
var removeWG sync.WaitGroup
for pw := range m {
logger.Infof("removing part %q, since its data is out of the configured retention (%d secs)", pw.p.path, pt.retentionMsecs/1000)
removeWG.Add(1)
fs.MustRemoveAllWithDoneCallback(pw.p.path, removeWG.Done)
fs.MustRemoveDirAtomic(pw.p.path)
}
removeWG.Wait()
// There is no need in calling fs.MustSyncPath() on pt.smallPartsPath and pt.bigPartsPath,
// since they should be automatically called inside fs.MustRemoveAllWithDoneCallback.
// since they should be automatically called inside fs.MustRemoveDirAtomic().
pt.snapshotLock.RUnlock()
@ -1554,6 +1551,7 @@ func openParts(pathPrefix1, pathPrefix2, path string) ([]*partWrapper, error) {
if err := fs.MkdirAllIfNotExist(path); err != nil {
return nil, err
}
fs.MustRemoveTemporaryDirs(path)
d, err := os.Open(path)
if err != nil {
return nil, fmt.Errorf("cannot open directory %q: %w", path, err)
@ -1568,9 +1566,9 @@ func openParts(pathPrefix1, pathPrefix2, path string) ([]*partWrapper, error) {
}
txnDir := path + "/txn"
fs.MustRemoveAll(txnDir)
fs.MustRemoveDirAtomic(txnDir)
tmpDir := path + "/tmp"
fs.MustRemoveAll(tmpDir)
fs.MustRemoveDirAtomic(tmpDir)
if err := createPartitionDirs(path); err != nil {
return nil, fmt.Errorf("cannot create directories for partition %q: %w", path, err)
}
@ -1596,7 +1594,7 @@ func openParts(pathPrefix1, pathPrefix2, path string) ([]*partWrapper, error) {
if fs.IsEmptyDir(partPath) {
// Remove empty directory, which can be left after unclean shutdown on NFS.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1142
fs.MustRemoveAll(partPath)
fs.MustRemoveDirAtomic(partPath)
continue
}
startTime := time.Now()
@ -1761,14 +1759,12 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath str
}
// Remove old paths. It is OK if certain paths don't exist.
var removeWG sync.WaitGroup
for _, path := range rmPaths {
path, err := validatePath(pathPrefix1, pathPrefix2, path)
if err != nil {
return fmt.Errorf("invalid path to remove: %w", err)
}
removeWG.Add(1)
fs.MustRemoveAllWithDoneCallback(path, removeWG.Done)
fs.MustRemoveDirAtomic(path)
}
// Move the new part to new directory.
@ -1797,8 +1793,7 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath str
}
} else {
// Just remove srcPath.
removeWG.Add(1)
fs.MustRemoveAllWithDoneCallback(srcPath, removeWG.Done)
fs.MustRemoveDirAtomic(srcPath)
}
// Flush pathPrefix* directory metadata to the underying storage,
@ -1809,12 +1804,9 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath str
pendingTxnDeletionsWG.Add(1)
go func() {
defer pendingTxnDeletionsWG.Done()
// Remove the transaction file only after all the source paths are deleted.
// This is required for NFS mounts. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
removeWG.Wait()
// There is no need in calling fs.MustSyncPath for pathPrefix* after parts' removal,
// since it is already called by fs.MustRemoveAllWithDoneCallback.
// since it is already called by fs.MustRemoveDirAtomic.
if err := os.Remove(txnPath); err != nil {
logger.Errorf("cannot remove transaction file %q: %s", txnPath, err)