mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/persistentqueue: reset chunk file when the persistent queue is empty
This commit is contained in:
parent
6cdc97a53f
commit
c2e602286c
2 changed files with 66 additions and 16 deletions
|
@ -18,8 +18,8 @@ type FastQueue struct {
|
||||||
// or when MustClose is called.
|
// or when MustClose is called.
|
||||||
cond sync.Cond
|
cond sync.Cond
|
||||||
|
|
||||||
// q is file-based queue
|
// pq is file-based queue
|
||||||
q *Queue
|
pq *Queue
|
||||||
|
|
||||||
// ch is in-memory queue
|
// ch is in-memory queue
|
||||||
ch chan *bytesutil.ByteBuffer
|
ch chan *bytesutil.ByteBuffer
|
||||||
|
@ -33,9 +33,9 @@ type FastQueue struct {
|
||||||
//
|
//
|
||||||
// It holds up to maxInmemoryBlocks in memory before falling back to file-based persistence.
|
// It holds up to maxInmemoryBlocks in memory before falling back to file-based persistence.
|
||||||
func MustOpenFastQueue(path, name string, maxInmemoryBlocks int) *FastQueue {
|
func MustOpenFastQueue(path, name string, maxInmemoryBlocks int) *FastQueue {
|
||||||
q := MustOpen(path, name)
|
pq := MustOpen(path, name)
|
||||||
fq := &FastQueue{
|
fq := &FastQueue{
|
||||||
q: q,
|
pq: pq,
|
||||||
ch: make(chan *bytesutil.ByteBuffer, maxInmemoryBlocks),
|
ch: make(chan *bytesutil.ByteBuffer, maxInmemoryBlocks),
|
||||||
}
|
}
|
||||||
fq.cond.L = &fq.mu
|
fq.cond.L = &fq.mu
|
||||||
|
@ -54,20 +54,20 @@ func (fq *FastQueue) MustClose() {
|
||||||
fq.mustStop = true
|
fq.mustStop = true
|
||||||
fq.cond.Broadcast()
|
fq.cond.Broadcast()
|
||||||
|
|
||||||
// flush blocks from fq.ch to fq.q, so they can be persisted
|
// flush blocks from fq.ch to fq.pq, so they can be persisted
|
||||||
fq.flushInmemoryBlocksToFileLocked()
|
fq.flushInmemoryBlocksToFileLocked()
|
||||||
|
|
||||||
// Close fq.q
|
// Close fq.pq
|
||||||
fq.q.MustClose()
|
fq.pq.MustClose()
|
||||||
|
|
||||||
logger.Infof("closed fast persistent queue at %q", fq.q.dir)
|
logger.Infof("closed fast persistent queue at %q", fq.pq.dir)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fq *FastQueue) flushInmemoryBlocksToFileLocked() {
|
func (fq *FastQueue) flushInmemoryBlocksToFileLocked() {
|
||||||
// fq.mu must be locked by the caller.
|
// fq.mu must be locked by the caller.
|
||||||
for len(fq.ch) > 0 {
|
for len(fq.ch) > 0 {
|
||||||
bb := <-fq.ch
|
bb := <-fq.ch
|
||||||
fq.q.MustWriteBlock(bb.B)
|
fq.pq.MustWriteBlock(bb.B)
|
||||||
fq.pendingInmemoryBytes -= uint64(len(bb.B))
|
fq.pendingInmemoryBytes -= uint64(len(bb.B))
|
||||||
blockBufPool.Put(bb)
|
blockBufPool.Put(bb)
|
||||||
}
|
}
|
||||||
|
@ -79,7 +79,7 @@ func (fq *FastQueue) GetPendingBytes() uint64 {
|
||||||
defer fq.mu.Unlock()
|
defer fq.mu.Unlock()
|
||||||
|
|
||||||
n := fq.pendingInmemoryBytes
|
n := fq.pendingInmemoryBytes
|
||||||
n += fq.q.GetPendingBytes()
|
n += fq.pq.GetPendingBytes()
|
||||||
return n
|
return n
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -96,19 +96,19 @@ func (fq *FastQueue) MustWriteBlock(block []byte) {
|
||||||
fq.mu.Lock()
|
fq.mu.Lock()
|
||||||
defer fq.mu.Unlock()
|
defer fq.mu.Unlock()
|
||||||
|
|
||||||
if n := fq.q.GetPendingBytes(); n > 0 {
|
if n := fq.pq.GetPendingBytes(); n > 0 {
|
||||||
// The file-based queue isn't drained yet. This means that in-memory queue cannot be used yet.
|
// The file-based queue isn't drained yet. This means that in-memory queue cannot be used yet.
|
||||||
// So put the block to file-based queue.
|
// So put the block to file-based queue.
|
||||||
if len(fq.ch) > 0 {
|
if len(fq.ch) > 0 {
|
||||||
logger.Panicf("BUG: the in-memory queue must be empty when the file-based queue is non-empty; it contains %d pending bytes", n)
|
logger.Panicf("BUG: the in-memory queue must be empty when the file-based queue is non-empty; it contains %d pending bytes", n)
|
||||||
}
|
}
|
||||||
fq.q.MustWriteBlock(block)
|
fq.pq.MustWriteBlock(block)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if len(fq.ch) == cap(fq.ch) {
|
if len(fq.ch) == cap(fq.ch) {
|
||||||
// There is no space in the in-memory queue. Put the data to file-based queue.
|
// There is no space in the in-memory queue. Put the data to file-based queue.
|
||||||
fq.flushInmemoryBlocksToFileLocked()
|
fq.flushInmemoryBlocksToFileLocked()
|
||||||
fq.q.MustWriteBlock(block)
|
fq.pq.MustWriteBlock(block)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// There is enough space in the in-memory queue.
|
// There is enough space in the in-memory queue.
|
||||||
|
@ -132,7 +132,7 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
|
||||||
return dst, false
|
return dst, false
|
||||||
}
|
}
|
||||||
if len(fq.ch) > 0 {
|
if len(fq.ch) > 0 {
|
||||||
if n := fq.q.GetPendingBytes(); n > 0 {
|
if n := fq.pq.GetPendingBytes(); n > 0 {
|
||||||
logger.Panicf("BUG: the file-based queue must be empty when the inmemory queue is empty; it contains %d pending bytes", n)
|
logger.Panicf("BUG: the file-based queue must be empty when the inmemory queue is empty; it contains %d pending bytes", n)
|
||||||
}
|
}
|
||||||
bb := <-fq.ch
|
bb := <-fq.ch
|
||||||
|
@ -141,11 +141,12 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
|
||||||
blockBufPool.Put(bb)
|
blockBufPool.Put(bb)
|
||||||
return dst, true
|
return dst, true
|
||||||
}
|
}
|
||||||
if n := fq.q.GetPendingBytes(); n > 0 {
|
if n := fq.pq.GetPendingBytes(); n > 0 {
|
||||||
return fq.q.MustReadBlock(dst)
|
return fq.pq.MustReadBlock(dst)
|
||||||
}
|
}
|
||||||
|
|
||||||
// There are no blocks. Wait for new block.
|
// There are no blocks. Wait for new block.
|
||||||
|
fq.pq.ResetIfEmpty()
|
||||||
fq.cond.Wait()
|
fq.cond.Wait()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,6 +53,55 @@ type Queue struct {
|
||||||
mustStop bool
|
mustStop bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ResetIfEmpty resets q if it is empty.
|
||||||
|
//
|
||||||
|
// This is needed in order to remove chunk file associated with empty q.
|
||||||
|
func (q *Queue) ResetIfEmpty() {
|
||||||
|
q.mu.Lock()
|
||||||
|
defer q.mu.Unlock()
|
||||||
|
|
||||||
|
if q.readerOffset != q.writerOffset {
|
||||||
|
// The queue isn't empty.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if q.readerOffset < 16*1024*1024 {
|
||||||
|
// The file is too small to drop. Leave it as is in order to reduce filesystem load.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if q.readerPath != q.writerPath {
|
||||||
|
logger.Panicf("BUG: readerPath=%q doesn't match writerPath=%q", q.readerPath, q.writerPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
q.reader.MustClose()
|
||||||
|
q.writer.MustClose()
|
||||||
|
fs.MustRemoveAll(q.readerPath)
|
||||||
|
|
||||||
|
q.writerOffset = 0
|
||||||
|
q.writerLocalOffset = 0
|
||||||
|
q.writerFlushedOffset = 0
|
||||||
|
|
||||||
|
q.readerOffset = 0
|
||||||
|
q.readerLocalOffset = 0
|
||||||
|
|
||||||
|
q.writerPath = q.chunkFilePath(q.writerOffset)
|
||||||
|
w, err := filestream.Create(q.writerPath, false)
|
||||||
|
if err != nil {
|
||||||
|
logger.Panicf("FATAL: cannot create chunk file %q: %s", q.writerPath, err)
|
||||||
|
}
|
||||||
|
q.writer = w
|
||||||
|
|
||||||
|
q.readerPath = q.writerPath
|
||||||
|
r, err := filestream.Open(q.readerPath, true)
|
||||||
|
if err != nil {
|
||||||
|
logger.Panicf("FATAL: cannot open chunk file %q: %s", q.readerPath, err)
|
||||||
|
}
|
||||||
|
q.reader = r
|
||||||
|
|
||||||
|
if err := q.flushMetainfo(); err != nil {
|
||||||
|
logger.Panicf("FATAL: cannot flush metainfo: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// GetPendingBytes returns the number of pending bytes in the queue.
|
// GetPendingBytes returns the number of pending bytes in the queue.
|
||||||
func (q *Queue) GetPendingBytes() uint64 {
|
func (q *Queue) GetPendingBytes() uint64 {
|
||||||
q.mu.Lock()
|
q.mu.Lock()
|
||||||
|
|
Loading…
Reference in a new issue