mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/persistentqueue: make the persistent queue more durable against unclean shutdown (kill -9, OOM, hard reset)
The strategy is: - Periodical flushing of inmemory blocks to files, so they aren't lost on unclean shutdown. - Periodical syncing of metadata for persisted queues, so the metadata remains in sync with the persisted data. - Automatic adjusting of too big chunk size when opening the queue. The chunk size may be bigger than the writer offset after unclean shutdown. - Skipping of broken chunk file if it cannot be read. - Fsyncing finalized chunk files. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/687
This commit is contained in:
parent
ffadf035fa
commit
d455764a6f
2 changed files with 87 additions and 11 deletions
|
@ -2,8 +2,10 @@ package persistentqueue
|
|||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
)
|
||||
|
||||
|
@ -26,6 +28,11 @@ type FastQueue struct {
|
|||
|
||||
pendingInmemoryBytes uint64
|
||||
|
||||
// stopCh is used for stopping background workers such as inmemoryBlocksFlusher.
|
||||
stopCh chan struct{}
|
||||
lastInmemoryBlockReadTime uint64
|
||||
inmemoryBlocksFlusherWG sync.WaitGroup
|
||||
|
||||
mustStop bool
|
||||
}
|
||||
|
||||
|
@ -43,6 +50,14 @@ func MustOpenFastQueue(path, name string, maxInmemoryBlocks, maxPendingBytes int
|
|||
ch: make(chan *bytesutil.ByteBuffer, maxInmemoryBlocks),
|
||||
}
|
||||
fq.cond.L = &fq.mu
|
||||
|
||||
fq.stopCh = make(chan struct{})
|
||||
fq.inmemoryBlocksFlusherWG.Add(1)
|
||||
go func() {
|
||||
defer fq.inmemoryBlocksFlusherWG.Done()
|
||||
fq.inmemoryBlocksFlusher()
|
||||
}()
|
||||
|
||||
logger.Infof("opened fast persistent queue at %q with maxInmemoryBlocks=%d", path, maxInmemoryBlocks)
|
||||
return fq
|
||||
}
|
||||
|
@ -51,6 +66,10 @@ func MustOpenFastQueue(path, name string, maxInmemoryBlocks, maxPendingBytes int
|
|||
//
|
||||
// It is expected no new writers during and after the call.
|
||||
func (fq *FastQueue) MustClose() {
|
||||
// Stop background flusher.
|
||||
close(fq.stopCh)
|
||||
fq.inmemoryBlocksFlusherWG.Wait()
|
||||
|
||||
fq.mu.Lock()
|
||||
defer fq.mu.Unlock()
|
||||
|
||||
|
@ -67,12 +86,38 @@ func (fq *FastQueue) MustClose() {
|
|||
logger.Infof("closed fast persistent queue at %q", fq.pq.dir)
|
||||
}
|
||||
|
||||
func (fq *FastQueue) inmemoryBlocksFlusher() {
|
||||
t := time.NewTicker(time.Second)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-fq.stopCh:
|
||||
return
|
||||
case <-t.C:
|
||||
}
|
||||
fq.flushInmemoryBlocksToFileIfNeeded()
|
||||
}
|
||||
}
|
||||
|
||||
func (fq *FastQueue) flushInmemoryBlocksToFileIfNeeded() {
|
||||
fq.mu.Lock()
|
||||
defer fq.mu.Unlock()
|
||||
if len(fq.ch) == 0 {
|
||||
return
|
||||
}
|
||||
if fasttime.UnixTimestamp() < fq.lastInmemoryBlockReadTime+5 {
|
||||
return
|
||||
}
|
||||
fq.flushInmemoryBlocksToFileLocked()
|
||||
}
|
||||
|
||||
func (fq *FastQueue) flushInmemoryBlocksToFileLocked() {
|
||||
// fq.mu must be locked by the caller.
|
||||
for len(fq.ch) > 0 {
|
||||
bb := <-fq.ch
|
||||
fq.pq.MustWriteBlock(bb.B)
|
||||
fq.pendingInmemoryBytes -= uint64(len(bb.B))
|
||||
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()
|
||||
blockBufPool.Put(bb)
|
||||
}
|
||||
// Unblock all the potentially blocked readers, so they could proceed with reading file-based queue.
|
||||
|
@ -143,6 +188,7 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
|
|||
}
|
||||
bb := <-fq.ch
|
||||
fq.pendingInmemoryBytes -= uint64(len(bb.B))
|
||||
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()
|
||||
dst = append(dst, bb.B...)
|
||||
blockBufPool.Put(bb)
|
||||
return dst, true
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
|
@ -52,6 +53,8 @@ type Queue struct {
|
|||
writerLocalOffset uint64
|
||||
writerFlushedOffset uint64
|
||||
|
||||
lastMetainfoFlushTime uint64
|
||||
|
||||
mustStop bool
|
||||
|
||||
blocksDropped *metrics.Counter
|
||||
|
@ -110,7 +113,7 @@ func (q *Queue) ResetIfEmpty() {
|
|||
}
|
||||
q.reader = r
|
||||
|
||||
if err := q.flushMetainfo(); err != nil {
|
||||
if err := q.flushMetainfoLocked(); err != nil {
|
||||
logger.Panicf("FATAL: cannot flush metainfo: %s", err)
|
||||
}
|
||||
}
|
||||
|
@ -236,7 +239,7 @@ func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingB
|
|||
logger.Panicf("BUG: cannot parse hex %q: %s", fname, err)
|
||||
}
|
||||
if offset%q.chunkFileSize != 0 {
|
||||
logger.Errorf("unexpected offset for chunk file %q: %d; it must divide by %d; removing the file", filepath, offset, q.chunkFileSize)
|
||||
logger.Errorf("unexpected offset for chunk file %q: %d; it must be multiple of %d; removing the file", filepath, offset, q.chunkFileSize)
|
||||
fs.MustRemoveAll(filepath)
|
||||
continue
|
||||
}
|
||||
|
@ -284,10 +287,15 @@ func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingB
|
|||
q.writerLocalOffset = mi.WriterOffset % q.chunkFileSize
|
||||
q.writerFlushedOffset = mi.WriterOffset
|
||||
if fileSize := fs.MustFileSize(q.writerPath); fileSize != q.writerLocalOffset {
|
||||
logger.Errorf("chunk file %q size doesn't match writer offset; file size %d bytes; writer offset: %d bytes",
|
||||
q.writerPath, fileSize, q.writerLocalOffset)
|
||||
fs.MustRemoveAll(q.writerPath)
|
||||
continue
|
||||
if fileSize < q.writerLocalOffset {
|
||||
logger.Errorf("%q size (%d bytes) is smaller than the writer offset (%d bytes); removing the file",
|
||||
q.writerPath, fileSize, q.writerLocalOffset)
|
||||
fs.MustRemoveAll(q.writerPath)
|
||||
continue
|
||||
}
|
||||
logger.Warnf("%q size (%d bytes) is bigger than writer offset (%d bytes); "+
|
||||
"this may be the case on unclean shutdown (OOM, `kill -9`, hardware reset); trying to fix it by adjusting fileSize to %d",
|
||||
q.writerPath, fileSize, q.writerLocalOffset, q.writerLocalOffset)
|
||||
}
|
||||
w, err := filestream.OpenWriterAt(q.writerPath, int64(q.writerLocalOffset), false)
|
||||
if err != nil {
|
||||
|
@ -331,7 +339,7 @@ func (q *Queue) MustClose() {
|
|||
q.reader = nil
|
||||
|
||||
// Store metainfo
|
||||
if err := q.flushMetainfo(); err != nil {
|
||||
if err := q.flushMetainfoLocked(); err != nil {
|
||||
logger.Panicf("FATAL: cannot flush chunked queue metainfo: %s", err)
|
||||
}
|
||||
}
|
||||
|
@ -403,6 +411,7 @@ func (q *Queue) writeBlockLocked(block []byte) error {
|
|||
if q.writerLocalOffset+q.maxBlockSize+8 > q.chunkFileSize {
|
||||
// Finalize the current chunk and start new one.
|
||||
q.writer.MustClose()
|
||||
fs.MustSyncPath(q.writerPath)
|
||||
if n := q.writerOffset % q.chunkFileSize; n > 0 {
|
||||
q.writerOffset += (q.chunkFileSize - n)
|
||||
}
|
||||
|
@ -414,9 +423,10 @@ func (q *Queue) writeBlockLocked(block []byte) error {
|
|||
return fmt.Errorf("cannot create chunk file %q: %w", q.writerPath, err)
|
||||
}
|
||||
q.writer = w
|
||||
if err := q.flushMetainfo(); err != nil {
|
||||
if err := q.flushMetainfoLocked(); err != nil {
|
||||
return fmt.Errorf("cannot flush metainfo: %w", err)
|
||||
}
|
||||
fs.MustSyncPath(q.dir)
|
||||
}
|
||||
|
||||
// Write block len.
|
||||
|
@ -435,7 +445,7 @@ func (q *Queue) writeBlockLocked(block []byte) error {
|
|||
}
|
||||
q.blocksWritten.Inc()
|
||||
q.bytesWritten.Add(len(block))
|
||||
return nil
|
||||
return q.flushMetainfoIfNeededLocked()
|
||||
}
|
||||
|
||||
// MustReadBlock appends the next block from q to dst and returns the result.
|
||||
|
@ -462,6 +472,9 @@ func (q *Queue) MustReadBlock(dst []byte) ([]byte, bool) {
|
|||
|
||||
data, err := q.readBlockLocked(dst)
|
||||
if err != nil {
|
||||
// Skip the current chunk, since it may be broken.
|
||||
q.readerOffset += q.chunkFileSize - q.readerOffset%q.chunkFileSize
|
||||
_ = q.flushMetainfoLocked()
|
||||
logger.Panicf("FATAL: %s", err)
|
||||
}
|
||||
return data, true
|
||||
|
@ -482,9 +495,10 @@ func (q *Queue) readBlockLocked(dst []byte) ([]byte, error) {
|
|||
return dst, fmt.Errorf("cannot open chunk file %q: %w", q.readerPath, err)
|
||||
}
|
||||
q.reader = r
|
||||
if err := q.flushMetainfo(); err != nil {
|
||||
if err := q.flushMetainfoLocked(); err != nil {
|
||||
return dst, fmt.Errorf("cannot flush metainfo: %w", err)
|
||||
}
|
||||
fs.MustSyncPath(q.dir)
|
||||
}
|
||||
|
||||
// Read block len.
|
||||
|
@ -508,6 +522,9 @@ func (q *Queue) readBlockLocked(dst []byte) ([]byte, error) {
|
|||
}
|
||||
q.blocksRead.Inc()
|
||||
q.bytesRead.Add(int(blockLen))
|
||||
if err := q.flushMetainfoIfNeededLocked(); err != nil {
|
||||
return dst, err
|
||||
}
|
||||
return dst, nil
|
||||
}
|
||||
|
||||
|
@ -543,7 +560,19 @@ func (q *Queue) readFull(buf []byte) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (q *Queue) flushMetainfo() error {
|
||||
func (q *Queue) flushMetainfoIfNeededLocked() error {
|
||||
t := fasttime.UnixTimestamp()
|
||||
if t == q.lastMetainfoFlushTime {
|
||||
return nil
|
||||
}
|
||||
if err := q.flushMetainfoLocked(); err != nil {
|
||||
return fmt.Errorf("cannot flush metainfo: %w", err)
|
||||
}
|
||||
q.lastMetainfoFlushTime = t
|
||||
return nil
|
||||
}
|
||||
|
||||
func (q *Queue) flushMetainfoLocked() error {
|
||||
mi := &metainfo{
|
||||
Name: q.name,
|
||||
ReaderOffset: q.readerOffset,
|
||||
|
@ -577,6 +606,7 @@ func (mi *metainfo) WriteToFile(path string) error {
|
|||
if err := ioutil.WriteFile(path, data, 0600); err != nil {
|
||||
return fmt.Errorf("cannot write persistent queue metainfo to %q: %w", path, err)
|
||||
}
|
||||
fs.MustSyncPath(path)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue