mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/persistentqueue: code simplification after d455764a6f
This commit is contained in:
parent
d455764a6f
commit
39dee12ed7
1 changed files with 3 additions and 32 deletions
|
@ -2,7 +2,6 @@ package persistentqueue
|
|||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
|
@ -28,10 +27,7 @@ type FastQueue struct {
|
|||
|
||||
pendingInmemoryBytes uint64
|
||||
|
||||
// stopCh is used for stopping background workers such as inmemoryBlocksFlusher.
|
||||
stopCh chan struct{}
|
||||
lastInmemoryBlockReadTime uint64
|
||||
inmemoryBlocksFlusherWG sync.WaitGroup
|
||||
|
||||
mustStop bool
|
||||
}
|
||||
|
@ -50,14 +46,7 @@ func MustOpenFastQueue(path, name string, maxInmemoryBlocks, maxPendingBytes int
|
|||
ch: make(chan *bytesutil.ByteBuffer, maxInmemoryBlocks),
|
||||
}
|
||||
fq.cond.L = &fq.mu
|
||||
|
||||
fq.stopCh = make(chan struct{})
|
||||
fq.inmemoryBlocksFlusherWG.Add(1)
|
||||
go func() {
|
||||
defer fq.inmemoryBlocksFlusherWG.Done()
|
||||
fq.inmemoryBlocksFlusher()
|
||||
}()
|
||||
|
||||
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()
|
||||
logger.Infof("opened fast persistent queue at %q with maxInmemoryBlocks=%d", path, maxInmemoryBlocks)
|
||||
return fq
|
||||
}
|
||||
|
@ -66,10 +55,6 @@ func MustOpenFastQueue(path, name string, maxInmemoryBlocks, maxPendingBytes int
|
|||
//
|
||||
// It is expected no new writers during and after the call.
|
||||
func (fq *FastQueue) MustClose() {
|
||||
// Stop background flusher.
|
||||
close(fq.stopCh)
|
||||
fq.inmemoryBlocksFlusherWG.Wait()
|
||||
|
||||
fq.mu.Lock()
|
||||
defer fq.mu.Unlock()
|
||||
|
||||
|
@ -86,22 +71,7 @@ func (fq *FastQueue) MustClose() {
|
|||
logger.Infof("closed fast persistent queue at %q", fq.pq.dir)
|
||||
}
|
||||
|
||||
func (fq *FastQueue) inmemoryBlocksFlusher() {
|
||||
t := time.NewTicker(time.Second)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-fq.stopCh:
|
||||
return
|
||||
case <-t.C:
|
||||
}
|
||||
fq.flushInmemoryBlocksToFileIfNeeded()
|
||||
}
|
||||
}
|
||||
|
||||
func (fq *FastQueue) flushInmemoryBlocksToFileIfNeeded() {
|
||||
fq.mu.Lock()
|
||||
defer fq.mu.Unlock()
|
||||
func (fq *FastQueue) flushInmemoryBlocksToFileIfNeededLocked() {
|
||||
if len(fq.ch) == 0 {
|
||||
return
|
||||
}
|
||||
|
@ -147,6 +117,7 @@ func (fq *FastQueue) MustWriteBlock(block []byte) {
|
|||
fq.mu.Lock()
|
||||
defer fq.mu.Unlock()
|
||||
|
||||
fq.flushInmemoryBlocksToFileIfNeededLocked()
|
||||
if n := fq.pq.GetPendingBytes(); n > 0 {
|
||||
// The file-based queue isn't drained yet. This means that in-memory queue cannot be used yet.
|
||||
// So put the block to file-based queue.
|
||||
|
|
Loading…
Reference in a new issue