mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-11 15:34:56 +00:00
lib/{mergeset,storage}: make sure pending transaction deletions are finished before and after runTransactions
call.
`runTransactions` call issues async deletions for transaction files. The previously issued transaction deletions can race with the next call to `runTransactions`. Prevent this by waiting until all the pending transaction deletions are funished in the beginning of `runTransactions`. Also make sure that all the pending transaction deletions are finished before returning from `runTransactions`.
This commit is contained in:
parent
7c0dd85a7c
commit
639967db59
2 changed files with 21 additions and 0 deletions
|
@ -1062,6 +1062,12 @@ func (tb *Table) CreateSnapshotAt(dstDir string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func runTransactions(txnLock *sync.RWMutex, path string) error {
|
func runTransactions(txnLock *sync.RWMutex, path string) error {
|
||||||
|
// Wait until all the previous pending transaction deletions are finished.
|
||||||
|
pendingTxnDeletionsWG.Wait()
|
||||||
|
|
||||||
|
// Make sure all the current transaction deletions are finished before exiting.
|
||||||
|
defer pendingTxnDeletionsWG.Wait()
|
||||||
|
|
||||||
txnDir := path + "/txn"
|
txnDir := path + "/txn"
|
||||||
d, err := os.Open(txnDir)
|
d, err := os.Open(txnDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1156,7 +1162,9 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix, txnPath string) error {
|
||||||
// Flush pathPrefix directory metadata to the underying storage.
|
// Flush pathPrefix directory metadata to the underying storage.
|
||||||
fs.MustSyncPath(pathPrefix)
|
fs.MustSyncPath(pathPrefix)
|
||||||
|
|
||||||
|
pendingTxnDeletionsWG.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
|
defer pendingTxnDeletionsWG.Done()
|
||||||
// Remove the transaction file only after all the source paths are deleted.
|
// Remove the transaction file only after all the source paths are deleted.
|
||||||
// This is required for NFS mounts. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
|
// This is required for NFS mounts. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
|
||||||
removeWG.Wait()
|
removeWG.Wait()
|
||||||
|
@ -1168,6 +1176,8 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix, txnPath string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var pendingTxnDeletionsWG syncwg.WaitGroup
|
||||||
|
|
||||||
func validatePath(pathPrefix, path string) (string, error) {
|
func validatePath(pathPrefix, path string) (string, error) {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@ import (
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -1393,6 +1394,12 @@ func (pt *partition) createSnapshot(srcDir, dstDir string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func runTransactions(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, path string) error {
|
func runTransactions(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, path string) error {
|
||||||
|
// Wait until all the previous pending transaction deletions are finished.
|
||||||
|
pendingTxnDeletionsWG.Wait()
|
||||||
|
|
||||||
|
// Make sure all the current transaction deletions are finished before exiting.
|
||||||
|
defer pendingTxnDeletionsWG.Wait()
|
||||||
|
|
||||||
txnDir := path + "/txn"
|
txnDir := path + "/txn"
|
||||||
d, err := os.Open(txnDir)
|
d, err := os.Open(txnDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1494,7 +1501,9 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath str
|
||||||
fs.MustSyncPath(pathPrefix1)
|
fs.MustSyncPath(pathPrefix1)
|
||||||
fs.MustSyncPath(pathPrefix2)
|
fs.MustSyncPath(pathPrefix2)
|
||||||
|
|
||||||
|
pendingTxnDeletionsWG.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
|
defer pendingTxnDeletionsWG.Done()
|
||||||
// Remove the transaction file only after all the source paths are deleted.
|
// Remove the transaction file only after all the source paths are deleted.
|
||||||
// This is required for NFS mounts. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
|
// This is required for NFS mounts. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
|
||||||
removeWG.Wait()
|
removeWG.Wait()
|
||||||
|
@ -1506,6 +1515,8 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath str
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var pendingTxnDeletionsWG syncwg.WaitGroup
|
||||||
|
|
||||||
func validatePath(pathPrefix1, pathPrefix2, path string) (string, error) {
|
func validatePath(pathPrefix1, pathPrefix2, path string) (string, error) {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue