From 4e850cd6a7a556471ec956aa370fabceda4cb77c Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 16 May 2020 09:31:46 +0300 Subject: [PATCH] lib/persistentqueue: a follow-up for https://github.com/VictoriaMetrics/VictoriaMetrics/pull/484 --- lib/persistentqueue/fastqueue.go | 12 +++++++----- lib/persistentqueue/persistentqueue.go | 8 ++++---- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/lib/persistentqueue/fastqueue.go b/lib/persistentqueue/fastqueue.go index 5d7f464d0..48330aa76 100644 --- a/lib/persistentqueue/fastqueue.go +++ b/lib/persistentqueue/fastqueue.go @@ -75,6 +75,8 @@ func (fq *FastQueue) flushInmemoryBlocksToFileLocked() { fq.pendingInmemoryBytes -= uint64(len(bb.B)) blockBufPool.Put(bb) } + // Unblock all the potentially blocked readers, so they could proceed with reading file-based queue. + fq.cond.Broadcast() } // GetPendingBytes returns the number of pending bytes in the fq. @@ -120,10 +122,10 @@ func (fq *FastQueue) MustWriteBlock(block []byte) { bb.B = append(bb.B[:0], block...) fq.ch <- bb fq.pendingInmemoryBytes += uint64(len(block)) - if len(fq.ch) >= 1 { - // Notify potentially blocked reader - fq.cond.Signal() - } + + // Notify potentially blocked reader. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/484 for the context. + fq.cond.Signal() } // MustReadBlock reads the next block from fq to dst and returns it. @@ -137,7 +139,7 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) { } if len(fq.ch) > 0 { if n := fq.pq.GetPendingBytes(); n > 0 { - logger.Panicf("BUG: the file-based queue must be empty when the inmemory queue is empty; it contains %d pending bytes", n) + logger.Panicf("BUG: the file-based queue must be empty when the inmemory queue is non-empty; it contains %d pending bytes", n) } bb := <-fq.ch fq.pendingInmemoryBytes -= uint64(len(bb.B)) diff --git a/lib/persistentqueue/persistentqueue.go b/lib/persistentqueue/persistentqueue.go index 56e13aada..a6db60ee9 100644 --- a/lib/persistentqueue/persistentqueue.go +++ b/lib/persistentqueue/persistentqueue.go @@ -383,13 +383,13 @@ func (q *Queue) MustWriteBlock(block []byte) { return } } - mustNotifyReader := q.readerOffset == q.writerOffset if err := q.writeBlockLocked(block); err != nil { logger.Panicf("FATAL: %s", err) } - if mustNotifyReader { - q.cond.Signal() - } + + // Notify blocked reader if any. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/484 for details. + q.cond.Signal() } var blockBufPool bytesutil.ByteBufferPool