This commit is contained in:
Aliaksandr Valialkin 2020-05-16 09:31:46 +03:00
parent 8cb35974af
commit 4e850cd6a7
2 changed files with 11 additions and 9 deletions

View file

@ -75,6 +75,8 @@ func (fq *FastQueue) flushInmemoryBlocksToFileLocked() {
fq.pendingInmemoryBytes -= uint64(len(bb.B))
blockBufPool.Put(bb)
}
// Unblock all the potentially blocked readers, so they could proceed with reading file-based queue.
fq.cond.Broadcast()
}
// GetPendingBytes returns the number of pending bytes in the fq.
@ -120,10 +122,10 @@ func (fq *FastQueue) MustWriteBlock(block []byte) {
bb.B = append(bb.B[:0], block...)
fq.ch <- bb
fq.pendingInmemoryBytes += uint64(len(block))
if len(fq.ch) >= 1 {
// Notify potentially blocked reader
fq.cond.Signal()
}
// Notify potentially blocked reader.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/484 for the context.
fq.cond.Signal()
}
// MustReadBlock reads the next block from fq to dst and returns it.
@ -137,7 +139,7 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
}
if len(fq.ch) > 0 {
if n := fq.pq.GetPendingBytes(); n > 0 {
logger.Panicf("BUG: the file-based queue must be empty when the inmemory queue is empty; it contains %d pending bytes", n)
logger.Panicf("BUG: the file-based queue must be empty when the inmemory queue is non-empty; it contains %d pending bytes", n)
}
bb := <-fq.ch
fq.pendingInmemoryBytes -= uint64(len(bb.B))

View file

@ -383,13 +383,13 @@ func (q *Queue) MustWriteBlock(block []byte) {
return
}
}
mustNotifyReader := q.readerOffset == q.writerOffset
if err := q.writeBlockLocked(block); err != nil {
logger.Panicf("FATAL: %s", err)
}
if mustNotifyReader {
q.cond.Signal()
}
// Notify blocked reader if any.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/484 for details.
q.cond.Signal()
}
var blockBufPool bytesutil.ByteBufferPool