mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/persistentqueue: protect from multiple concurrent opening for the same persistent queue
This commit is contained in:
parent
bed25e3c24
commit
c5ef0e6327
1 changed files with 29 additions and 0 deletions
|
@ -35,6 +35,8 @@ type Queue struct {
|
|||
dir string
|
||||
name string
|
||||
|
||||
flockF *os.File
|
||||
|
||||
// mu protects all the fields below.
|
||||
mu sync.Mutex
|
||||
|
||||
|
@ -156,7 +158,16 @@ func mustOpen(path, name string, chunkFileSize, maxBlockSize, maxPendingBytes ui
|
|||
return q
|
||||
}
|
||||
|
||||
func mustCreateFlockFile(path string) *os.File {
|
||||
f, err := fs.CreateFlockFile(path)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: %s", err)
|
||||
}
|
||||
return f
|
||||
}
|
||||
|
||||
func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingBytes uint64) (*Queue, error) {
|
||||
// Protect from concurrent opens.
|
||||
var q Queue
|
||||
q.chunkFileSize = chunkFileSize
|
||||
q.maxBlockSize = maxBlockSize
|
||||
|
@ -187,6 +198,13 @@ func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingB
|
|||
if err := fs.MkdirAllIfNotExist(path); err != nil {
|
||||
return nil, fmt.Errorf("cannot create directory %q: %w", path, err)
|
||||
}
|
||||
q.flockF = mustCreateFlockFile(path)
|
||||
mustCloseFlockF := true
|
||||
defer func() {
|
||||
if mustCloseFlockF {
|
||||
_ = q.flockF.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
// Read metainfo.
|
||||
var mi metainfo
|
||||
|
@ -230,6 +248,10 @@ func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingB
|
|||
// skip metainfo file
|
||||
continue
|
||||
}
|
||||
if fname == "flock.lock" {
|
||||
// skip flock file
|
||||
continue
|
||||
}
|
||||
if !chunkFileNameRegex.MatchString(fname) {
|
||||
logger.Errorf("skipping unknown file %q", filepath)
|
||||
continue
|
||||
|
@ -314,6 +336,7 @@ func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingB
|
|||
cleanOnError()
|
||||
return nil, fmt.Errorf("couldn't find chunk file for writing in %q", q.dir)
|
||||
}
|
||||
mustCloseFlockF = false
|
||||
return &q, nil
|
||||
}
|
||||
|
||||
|
@ -342,6 +365,12 @@ func (q *Queue) MustClose() {
|
|||
if err := q.flushMetainfoLocked(); err != nil {
|
||||
logger.Panicf("FATAL: cannot flush chunked queue metainfo: %s", err)
|
||||
}
|
||||
|
||||
// Close flockF
|
||||
if err := q.flockF.Close(); err != nil {
|
||||
logger.Panicf("FATAL: cannot close flock file: %s", err)
|
||||
}
|
||||
q.flockF = nil
|
||||
}
|
||||
|
||||
func (q *Queue) chunkFilePath(offset uint64) string {
|
||||
|
|
Loading…
Reference in a new issue