From c5ef0e632790dfa30f905e79ad0074906ea5304c Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 23 Sep 2020 02:17:28 +0300 Subject: [PATCH] lib/persistentqueue: protect from multiple concurrent opening for the same persistent queue --- lib/persistentqueue/persistentqueue.go | 29 ++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/lib/persistentqueue/persistentqueue.go b/lib/persistentqueue/persistentqueue.go index c003a3507..87e4b1a55 100644 --- a/lib/persistentqueue/persistentqueue.go +++ b/lib/persistentqueue/persistentqueue.go @@ -35,6 +35,8 @@ type Queue struct { dir string name string + flockF *os.File + // mu protects all the fields below. mu sync.Mutex @@ -156,7 +158,16 @@ func mustOpen(path, name string, chunkFileSize, maxBlockSize, maxPendingBytes ui return q } +func mustCreateFlockFile(path string) *os.File { + f, err := fs.CreateFlockFile(path) + if err != nil { + logger.Panicf("FATAL: %s", err) + } + return f +} + func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingBytes uint64) (*Queue, error) { + // Protect from concurrent opens. var q Queue q.chunkFileSize = chunkFileSize q.maxBlockSize = maxBlockSize @@ -187,6 +198,13 @@ func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingB if err := fs.MkdirAllIfNotExist(path); err != nil { return nil, fmt.Errorf("cannot create directory %q: %w", path, err) } + q.flockF = mustCreateFlockFile(path) + mustCloseFlockF := true + defer func() { + if mustCloseFlockF { + _ = q.flockF.Close() + } + }() // Read metainfo. var mi metainfo @@ -230,6 +248,10 @@ func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingB // skip metainfo file continue } + if fname == "flock.lock" { + // skip flock file + continue + } if !chunkFileNameRegex.MatchString(fname) { logger.Errorf("skipping unknown file %q", filepath) continue @@ -314,6 +336,7 @@ func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingB cleanOnError() return nil, fmt.Errorf("couldn't find chunk file for writing in %q", q.dir) } + mustCloseFlockF = false return &q, nil } @@ -342,6 +365,12 @@ func (q *Queue) MustClose() { if err := q.flushMetainfoLocked(); err != nil { logger.Panicf("FATAL: cannot flush chunked queue metainfo: %s", err) } + + // Close flockF + if err := q.flockF.Close(); err != nil { + logger.Panicf("FATAL: cannot close flock file: %s", err) + } + q.flockF = nil } func (q *Queue) chunkFilePath(offset uint64) string {