mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-02-19 15:30:17 +00:00
lib/storage: physically remove stale parts
Previously they were removed from partition struct, but the corresponding directories weren't removed.
This is a follow-up for 46dba00756
This commit is contained in:
parent
b480585905
commit
c0511144e3
1 changed files with 19 additions and 5 deletions
|
@ -1347,11 +1347,25 @@ func (pt *partition) removeStaleParts() {
|
||||||
logger.Panicf("BUG: unexpected number of stale parts removed; got %d, want %d", removedSmallParts+removedBigParts, len(m))
|
logger.Panicf("BUG: unexpected number of stale parts removed; got %d, want %d", removedSmallParts+removedBigParts, len(m))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove partition references from removed parts, so they are eventually deleted when nobody reads from them.
|
// Physically remove stale parts under snapshotLock in order to provide
|
||||||
|
// consistent snapshots with partition.CreateSnapshot().
|
||||||
|
pt.snapshotLock.RLock()
|
||||||
|
var removeWG sync.WaitGroup
|
||||||
for pw := range m {
|
for pw := range m {
|
||||||
|
removeWG.Add(1)
|
||||||
logger.Infof("removing part %q, since its data is out of the configured retention (%d secs)", pw.p.path, retentionDeadline/1000)
|
logger.Infof("removing part %q, since its data is out of the configured retention (%d secs)", pw.p.path, retentionDeadline/1000)
|
||||||
|
fs.MustRemoveAllWithDoneCallback(pw.p.path, removeWG.Done)
|
||||||
|
}
|
||||||
|
removeWG.Wait()
|
||||||
|
fs.MustSyncPath(pt.smallPartsPath)
|
||||||
|
fs.MustSyncPath(pt.bigPartsPath)
|
||||||
|
pt.snapshotLock.RUnlock()
|
||||||
|
|
||||||
|
// Remove partition references from removed parts.
|
||||||
|
for pw := range m {
|
||||||
pw.decRef()
|
pw.decRef()
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// getPartsToMerge returns optimal parts to merge from pws.
|
// getPartsToMerge returns optimal parts to merge from pws.
|
||||||
|
@ -1727,16 +1741,16 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath str
|
||||||
fs.MustRemoveAll(srcPath)
|
fs.MustRemoveAll(srcPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Flush pathPrefix* directory metadata to the underying storage.
|
|
||||||
fs.MustSyncPath(pathPrefix1)
|
|
||||||
fs.MustSyncPath(pathPrefix2)
|
|
||||||
|
|
||||||
pendingTxnDeletionsWG.Add(1)
|
pendingTxnDeletionsWG.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer pendingTxnDeletionsWG.Done()
|
defer pendingTxnDeletionsWG.Done()
|
||||||
// Remove the transaction file only after all the source paths are deleted.
|
// Remove the transaction file only after all the source paths are deleted.
|
||||||
// This is required for NFS mounts. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
|
// This is required for NFS mounts. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
|
||||||
removeWG.Wait()
|
removeWG.Wait()
|
||||||
|
|
||||||
|
// Flush pathPrefix* directory metadata to the underying storage.
|
||||||
|
fs.MustSyncPath(pathPrefix1)
|
||||||
|
fs.MustSyncPath(pathPrefix2)
|
||||||
if err := os.Remove(txnPath); err != nil {
|
if err := os.Remove(txnPath); err != nil {
|
||||||
logger.Errorf("cannot remove transaction file %q: %s", txnPath, err)
|
logger.Errorf("cannot remove transaction file %q: %s", txnPath, err)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue