From e22fdc1073e1220f513b7be1d25d0f51961b91eb Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 28 Feb 2020 19:57:39 +0200 Subject: [PATCH] lib/persistentqueue: reset chunk file when the persistent queue is empty --- lib/persistentqueue/fastqueue.go | 33 ++++++++--------- lib/persistentqueue/persistentqueue.go | 49 ++++++++++++++++++++++++++ 2 files changed, 66 insertions(+), 16 deletions(-) diff --git a/lib/persistentqueue/fastqueue.go b/lib/persistentqueue/fastqueue.go index 9cd217eff..7f578d7ce 100644 --- a/lib/persistentqueue/fastqueue.go +++ b/lib/persistentqueue/fastqueue.go @@ -18,8 +18,8 @@ type FastQueue struct { // or when MustClose is called. cond sync.Cond - // q is file-based queue - q *Queue + // pq is file-based queue + pq *Queue // ch is in-memory queue ch chan *bytesutil.ByteBuffer @@ -33,9 +33,9 @@ type FastQueue struct { // // It holds up to maxInmemoryBlocks in memory before falling back to file-based persistence. func MustOpenFastQueue(path, name string, maxInmemoryBlocks int) *FastQueue { - q := MustOpen(path, name) + pq := MustOpen(path, name) fq := &FastQueue{ - q: q, + pq: pq, ch: make(chan *bytesutil.ByteBuffer, maxInmemoryBlocks), } fq.cond.L = &fq.mu @@ -54,20 +54,20 @@ func (fq *FastQueue) MustClose() { fq.mustStop = true fq.cond.Broadcast() - // flush blocks from fq.ch to fq.q, so they can be persisted + // flush blocks from fq.ch to fq.pq, so they can be persisted fq.flushInmemoryBlocksToFileLocked() - // Close fq.q - fq.q.MustClose() + // Close fq.pq + fq.pq.MustClose() - logger.Infof("closed fast persistent queue at %q", fq.q.dir) + logger.Infof("closed fast persistent queue at %q", fq.pq.dir) } func (fq *FastQueue) flushInmemoryBlocksToFileLocked() { // fq.mu must be locked by the caller. for len(fq.ch) > 0 { bb := <-fq.ch - fq.q.MustWriteBlock(bb.B) + fq.pq.MustWriteBlock(bb.B) fq.pendingInmemoryBytes -= uint64(len(bb.B)) blockBufPool.Put(bb) } @@ -79,7 +79,7 @@ func (fq *FastQueue) GetPendingBytes() uint64 { defer fq.mu.Unlock() n := fq.pendingInmemoryBytes - n += fq.q.GetPendingBytes() + n += fq.pq.GetPendingBytes() return n } @@ -96,19 +96,19 @@ func (fq *FastQueue) MustWriteBlock(block []byte) { fq.mu.Lock() defer fq.mu.Unlock() - if n := fq.q.GetPendingBytes(); n > 0 { + if n := fq.pq.GetPendingBytes(); n > 0 { // The file-based queue isn't drained yet. This means that in-memory queue cannot be used yet. // So put the block to file-based queue. if len(fq.ch) > 0 { logger.Panicf("BUG: the in-memory queue must be empty when the file-based queue is non-empty; it contains %d pending bytes", n) } - fq.q.MustWriteBlock(block) + fq.pq.MustWriteBlock(block) return } if len(fq.ch) == cap(fq.ch) { // There is no space in the in-memory queue. Put the data to file-based queue. fq.flushInmemoryBlocksToFileLocked() - fq.q.MustWriteBlock(block) + fq.pq.MustWriteBlock(block) return } // There is enough space in the in-memory queue. @@ -132,7 +132,7 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) { return dst, false } if len(fq.ch) > 0 { - if n := fq.q.GetPendingBytes(); n > 0 { + if n := fq.pq.GetPendingBytes(); n > 0 { logger.Panicf("BUG: the file-based queue must be empty when the inmemory queue is empty; it contains %d pending bytes", n) } bb := <-fq.ch @@ -141,11 +141,12 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) { blockBufPool.Put(bb) return dst, true } - if n := fq.q.GetPendingBytes(); n > 0 { - return fq.q.MustReadBlock(dst) + if n := fq.pq.GetPendingBytes(); n > 0 { + return fq.pq.MustReadBlock(dst) } // There are no blocks. Wait for new block. + fq.pq.ResetIfEmpty() fq.cond.Wait() } } diff --git a/lib/persistentqueue/persistentqueue.go b/lib/persistentqueue/persistentqueue.go index bb41f4563..e54f99802 100644 --- a/lib/persistentqueue/persistentqueue.go +++ b/lib/persistentqueue/persistentqueue.go @@ -53,6 +53,55 @@ type Queue struct { mustStop bool } +// ResetIfEmpty resets q if it is empty. +// +// This is needed in order to remove chunk file associated with empty q. +func (q *Queue) ResetIfEmpty() { + q.mu.Lock() + defer q.mu.Unlock() + + if q.readerOffset != q.writerOffset { + // The queue isn't empty. + return + } + if q.readerOffset < 16*1024*1024 { + // The file is too small to drop. Leave it as is in order to reduce filesystem load. + return + } + if q.readerPath != q.writerPath { + logger.Panicf("BUG: readerPath=%q doesn't match writerPath=%q", q.readerPath, q.writerPath) + } + + q.reader.MustClose() + q.writer.MustClose() + fs.MustRemoveAll(q.readerPath) + + q.writerOffset = 0 + q.writerLocalOffset = 0 + q.writerFlushedOffset = 0 + + q.readerOffset = 0 + q.readerLocalOffset = 0 + + q.writerPath = q.chunkFilePath(q.writerOffset) + w, err := filestream.Create(q.writerPath, false) + if err != nil { + logger.Panicf("FATAL: cannot create chunk file %q: %s", q.writerPath, err) + } + q.writer = w + + q.readerPath = q.writerPath + r, err := filestream.Open(q.readerPath, true) + if err != nil { + logger.Panicf("FATAL: cannot open chunk file %q: %s", q.readerPath, err) + } + q.reader = r + + if err := q.flushMetainfo(); err != nil { + logger.Panicf("FATAL: cannot flush metainfo: %s", err) + } +} + // GetPendingBytes returns the number of pending bytes in the queue. func (q *Queue) GetPendingBytes() uint64 { q.mu.Lock()