diff --git a/lib/persistentqueue/fastqueue.go b/lib/persistentqueue/fastqueue.go index 48330aa76..bb5276e9a 100644 --- a/lib/persistentqueue/fastqueue.go +++ b/lib/persistentqueue/fastqueue.go @@ -2,8 +2,10 @@ package persistentqueue import ( "sync" + "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) @@ -26,6 +28,11 @@ type FastQueue struct { pendingInmemoryBytes uint64 + // stopCh is used for stopping background workers such as inmemoryBlocksFlusher. + stopCh chan struct{} + lastInmemoryBlockReadTime uint64 + inmemoryBlocksFlusherWG sync.WaitGroup + mustStop bool } @@ -43,6 +50,14 @@ func MustOpenFastQueue(path, name string, maxInmemoryBlocks, maxPendingBytes int ch: make(chan *bytesutil.ByteBuffer, maxInmemoryBlocks), } fq.cond.L = &fq.mu + + fq.stopCh = make(chan struct{}) + fq.inmemoryBlocksFlusherWG.Add(1) + go func() { + defer fq.inmemoryBlocksFlusherWG.Done() + fq.inmemoryBlocksFlusher() + }() + logger.Infof("opened fast persistent queue at %q with maxInmemoryBlocks=%d", path, maxInmemoryBlocks) return fq } @@ -51,6 +66,10 @@ func MustOpenFastQueue(path, name string, maxInmemoryBlocks, maxPendingBytes int // // It is expected no new writers during and after the call. func (fq *FastQueue) MustClose() { + // Stop background flusher. + close(fq.stopCh) + fq.inmemoryBlocksFlusherWG.Wait() + fq.mu.Lock() defer fq.mu.Unlock() @@ -67,12 +86,38 @@ func (fq *FastQueue) MustClose() { logger.Infof("closed fast persistent queue at %q", fq.pq.dir) } +func (fq *FastQueue) inmemoryBlocksFlusher() { + t := time.NewTicker(time.Second) + defer t.Stop() + for { + select { + case <-fq.stopCh: + return + case <-t.C: + } + fq.flushInmemoryBlocksToFileIfNeeded() + } +} + +func (fq *FastQueue) flushInmemoryBlocksToFileIfNeeded() { + fq.mu.Lock() + defer fq.mu.Unlock() + if len(fq.ch) == 0 { + return + } + if fasttime.UnixTimestamp() < fq.lastInmemoryBlockReadTime+5 { + return + } + fq.flushInmemoryBlocksToFileLocked() +} + func (fq *FastQueue) flushInmemoryBlocksToFileLocked() { // fq.mu must be locked by the caller. for len(fq.ch) > 0 { bb := <-fq.ch fq.pq.MustWriteBlock(bb.B) fq.pendingInmemoryBytes -= uint64(len(bb.B)) + fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp() blockBufPool.Put(bb) } // Unblock all the potentially blocked readers, so they could proceed with reading file-based queue. @@ -143,6 +188,7 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) { } bb := <-fq.ch fq.pendingInmemoryBytes -= uint64(len(bb.B)) + fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp() dst = append(dst, bb.B...) blockBufPool.Put(bb) return dst, true diff --git a/lib/persistentqueue/persistentqueue.go b/lib/persistentqueue/persistentqueue.go index 24e80858a..159f21c33 100644 --- a/lib/persistentqueue/persistentqueue.go +++ b/lib/persistentqueue/persistentqueue.go @@ -12,6 +12,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -52,6 +53,8 @@ type Queue struct { writerLocalOffset uint64 writerFlushedOffset uint64 + lastMetainfoFlushTime uint64 + mustStop bool blocksDropped *metrics.Counter @@ -110,7 +113,7 @@ func (q *Queue) ResetIfEmpty() { } q.reader = r - if err := q.flushMetainfo(); err != nil { + if err := q.flushMetainfoLocked(); err != nil { logger.Panicf("FATAL: cannot flush metainfo: %s", err) } } @@ -236,7 +239,7 @@ func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingB logger.Panicf("BUG: cannot parse hex %q: %s", fname, err) } if offset%q.chunkFileSize != 0 { - logger.Errorf("unexpected offset for chunk file %q: %d; it must divide by %d; removing the file", filepath, offset, q.chunkFileSize) + logger.Errorf("unexpected offset for chunk file %q: %d; it must be multiple of %d; removing the file", filepath, offset, q.chunkFileSize) fs.MustRemoveAll(filepath) continue } @@ -284,10 +287,15 @@ func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingB q.writerLocalOffset = mi.WriterOffset % q.chunkFileSize q.writerFlushedOffset = mi.WriterOffset if fileSize := fs.MustFileSize(q.writerPath); fileSize != q.writerLocalOffset { - logger.Errorf("chunk file %q size doesn't match writer offset; file size %d bytes; writer offset: %d bytes", - q.writerPath, fileSize, q.writerLocalOffset) - fs.MustRemoveAll(q.writerPath) - continue + if fileSize < q.writerLocalOffset { + logger.Errorf("%q size (%d bytes) is smaller than the writer offset (%d bytes); removing the file", + q.writerPath, fileSize, q.writerLocalOffset) + fs.MustRemoveAll(q.writerPath) + continue + } + logger.Warnf("%q size (%d bytes) is bigger than writer offset (%d bytes); "+ + "this may be the case on unclean shutdown (OOM, `kill -9`, hardware reset); trying to fix it by adjusting fileSize to %d", + q.writerPath, fileSize, q.writerLocalOffset, q.writerLocalOffset) } w, err := filestream.OpenWriterAt(q.writerPath, int64(q.writerLocalOffset), false) if err != nil { @@ -331,7 +339,7 @@ func (q *Queue) MustClose() { q.reader = nil // Store metainfo - if err := q.flushMetainfo(); err != nil { + if err := q.flushMetainfoLocked(); err != nil { logger.Panicf("FATAL: cannot flush chunked queue metainfo: %s", err) } } @@ -403,6 +411,7 @@ func (q *Queue) writeBlockLocked(block []byte) error { if q.writerLocalOffset+q.maxBlockSize+8 > q.chunkFileSize { // Finalize the current chunk and start new one. q.writer.MustClose() + fs.MustSyncPath(q.writerPath) if n := q.writerOffset % q.chunkFileSize; n > 0 { q.writerOffset += (q.chunkFileSize - n) } @@ -414,9 +423,10 @@ func (q *Queue) writeBlockLocked(block []byte) error { return fmt.Errorf("cannot create chunk file %q: %w", q.writerPath, err) } q.writer = w - if err := q.flushMetainfo(); err != nil { + if err := q.flushMetainfoLocked(); err != nil { return fmt.Errorf("cannot flush metainfo: %w", err) } + fs.MustSyncPath(q.dir) } // Write block len. @@ -435,7 +445,7 @@ func (q *Queue) writeBlockLocked(block []byte) error { } q.blocksWritten.Inc() q.bytesWritten.Add(len(block)) - return nil + return q.flushMetainfoIfNeededLocked() } // MustReadBlock appends the next block from q to dst and returns the result. @@ -462,6 +472,9 @@ func (q *Queue) MustReadBlock(dst []byte) ([]byte, bool) { data, err := q.readBlockLocked(dst) if err != nil { + // Skip the current chunk, since it may be broken. + q.readerOffset += q.chunkFileSize - q.readerOffset%q.chunkFileSize + _ = q.flushMetainfoLocked() logger.Panicf("FATAL: %s", err) } return data, true @@ -482,9 +495,10 @@ func (q *Queue) readBlockLocked(dst []byte) ([]byte, error) { return dst, fmt.Errorf("cannot open chunk file %q: %w", q.readerPath, err) } q.reader = r - if err := q.flushMetainfo(); err != nil { + if err := q.flushMetainfoLocked(); err != nil { return dst, fmt.Errorf("cannot flush metainfo: %w", err) } + fs.MustSyncPath(q.dir) } // Read block len. @@ -508,6 +522,9 @@ func (q *Queue) readBlockLocked(dst []byte) ([]byte, error) { } q.blocksRead.Inc() q.bytesRead.Add(int(blockLen)) + if err := q.flushMetainfoIfNeededLocked(); err != nil { + return dst, err + } return dst, nil } @@ -543,7 +560,19 @@ func (q *Queue) readFull(buf []byte) error { return nil } -func (q *Queue) flushMetainfo() error { +func (q *Queue) flushMetainfoIfNeededLocked() error { + t := fasttime.UnixTimestamp() + if t == q.lastMetainfoFlushTime { + return nil + } + if err := q.flushMetainfoLocked(); err != nil { + return fmt.Errorf("cannot flush metainfo: %w", err) + } + q.lastMetainfoFlushTime = t + return nil +} + +func (q *Queue) flushMetainfoLocked() error { mi := &metainfo{ Name: q.name, ReaderOffset: q.readerOffset, @@ -577,6 +606,7 @@ func (mi *metainfo) WriteToFile(path string) error { if err := ioutil.WriteFile(path, data, 0600); err != nil { return fmt.Errorf("cannot write persistent queue metainfo to %q: %w", path, err) } + fs.MustSyncPath(path) return nil }