From 9705ac5d7a087847375b35eca7970d5843e0d336 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 16 Sep 2020 21:13:27 +0300 Subject: [PATCH] lib/persistentqueue: code simplification after d455764a6f0351645803f630e51122b900a6051d --- lib/persistentqueue/fastqueue.go | 35 +++----------------------------- 1 file changed, 3 insertions(+), 32 deletions(-) diff --git a/lib/persistentqueue/fastqueue.go b/lib/persistentqueue/fastqueue.go index bb5276e9a..18aba7fbc 100644 --- a/lib/persistentqueue/fastqueue.go +++ b/lib/persistentqueue/fastqueue.go @@ -2,7 +2,6 @@ package persistentqueue import ( "sync" - "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -28,10 +27,7 @@ type FastQueue struct { pendingInmemoryBytes uint64 - // stopCh is used for stopping background workers such as inmemoryBlocksFlusher. - stopCh chan struct{} lastInmemoryBlockReadTime uint64 - inmemoryBlocksFlusherWG sync.WaitGroup mustStop bool } @@ -50,14 +46,7 @@ func MustOpenFastQueue(path, name string, maxInmemoryBlocks, maxPendingBytes int ch: make(chan *bytesutil.ByteBuffer, maxInmemoryBlocks), } fq.cond.L = &fq.mu - - fq.stopCh = make(chan struct{}) - fq.inmemoryBlocksFlusherWG.Add(1) - go func() { - defer fq.inmemoryBlocksFlusherWG.Done() - fq.inmemoryBlocksFlusher() - }() - + fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp() logger.Infof("opened fast persistent queue at %q with maxInmemoryBlocks=%d", path, maxInmemoryBlocks) return fq } @@ -66,10 +55,6 @@ func MustOpenFastQueue(path, name string, maxInmemoryBlocks, maxPendingBytes int // // It is expected no new writers during and after the call. func (fq *FastQueue) MustClose() { - // Stop background flusher. - close(fq.stopCh) - fq.inmemoryBlocksFlusherWG.Wait() - fq.mu.Lock() defer fq.mu.Unlock() @@ -86,22 +71,7 @@ func (fq *FastQueue) MustClose() { logger.Infof("closed fast persistent queue at %q", fq.pq.dir) } -func (fq *FastQueue) inmemoryBlocksFlusher() { - t := time.NewTicker(time.Second) - defer t.Stop() - for { - select { - case <-fq.stopCh: - return - case <-t.C: - } - fq.flushInmemoryBlocksToFileIfNeeded() - } -} - -func (fq *FastQueue) flushInmemoryBlocksToFileIfNeeded() { - fq.mu.Lock() - defer fq.mu.Unlock() +func (fq *FastQueue) flushInmemoryBlocksToFileIfNeededLocked() { if len(fq.ch) == 0 { return } @@ -147,6 +117,7 @@ func (fq *FastQueue) MustWriteBlock(block []byte) { fq.mu.Lock() defer fq.mu.Unlock() + fq.flushInmemoryBlocksToFileIfNeededLocked() if n := fq.pq.GetPendingBytes(); n > 0 { // The file-based queue isn't drained yet. This means that in-memory queue cannot be used yet. // So put the block to file-based queue.