lib/persistentqueue: code simplification after d455764a6f

This commit is contained in:
Aliaksandr Valialkin 2020-09-16 21:13:27 +03:00
parent a9205fe308
commit 9705ac5d7a

View file

@ -2,7 +2,6 @@ package persistentqueue
import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -28,10 +27,7 @@ type FastQueue struct {
pendingInmemoryBytes uint64
// stopCh is used for stopping background workers such as inmemoryBlocksFlusher.
stopCh chan struct{}
lastInmemoryBlockReadTime uint64
inmemoryBlocksFlusherWG sync.WaitGroup
mustStop bool
}
@ -50,14 +46,7 @@ func MustOpenFastQueue(path, name string, maxInmemoryBlocks, maxPendingBytes int
ch: make(chan *bytesutil.ByteBuffer, maxInmemoryBlocks),
}
fq.cond.L = &fq.mu
fq.stopCh = make(chan struct{})
fq.inmemoryBlocksFlusherWG.Add(1)
go func() {
defer fq.inmemoryBlocksFlusherWG.Done()
fq.inmemoryBlocksFlusher()
}()
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()
logger.Infof("opened fast persistent queue at %q with maxInmemoryBlocks=%d", path, maxInmemoryBlocks)
return fq
}
@ -66,10 +55,6 @@ func MustOpenFastQueue(path, name string, maxInmemoryBlocks, maxPendingBytes int
//
// It is expected no new writers during and after the call.
func (fq *FastQueue) MustClose() {
// Stop background flusher.
close(fq.stopCh)
fq.inmemoryBlocksFlusherWG.Wait()
fq.mu.Lock()
defer fq.mu.Unlock()
@ -86,22 +71,7 @@ func (fq *FastQueue) MustClose() {
logger.Infof("closed fast persistent queue at %q", fq.pq.dir)
}
func (fq *FastQueue) inmemoryBlocksFlusher() {
t := time.NewTicker(time.Second)
defer t.Stop()
for {
select {
case <-fq.stopCh:
return
case <-t.C:
}
fq.flushInmemoryBlocksToFileIfNeeded()
}
}
func (fq *FastQueue) flushInmemoryBlocksToFileIfNeeded() {
fq.mu.Lock()
defer fq.mu.Unlock()
func (fq *FastQueue) flushInmemoryBlocksToFileIfNeededLocked() {
if len(fq.ch) == 0 {
return
}
@ -147,6 +117,7 @@ func (fq *FastQueue) MustWriteBlock(block []byte) {
fq.mu.Lock()
defer fq.mu.Unlock()
fq.flushInmemoryBlocksToFileIfNeededLocked()
if n := fq.pq.GetPendingBytes(); n > 0 {
// The file-based queue isn't drained yet. This means that in-memory queue cannot be used yet.
// So put the block to file-based queue.