lib/persistentqueue: make the persistent queue more durable against unclean shutdown (kill -9, OOM, hard reset)

The strategy is:

- Periodic flushing of in-memory blocks to files, so they aren't lost on unclean shutdown.
- Periodic syncing of metadata for persisted queues, so the metadata remains in sync with the persisted data.
- Automatic adjustment of a too-big chunk file when opening the queue: the chunk file size may exceed the writer offset after an unclean shutdown.
- Skipping a broken chunk file if it cannot be read.
- Fsyncing finalized chunk files (a minimal sketch of the fsync pattern follows this list).
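
The last two points rely on the usual POSIX durability rule: fsync the file so its contents reach stable storage, then fsync the parent directory so the directory entry itself survives a crash. Below is a minimal, illustrative Go sketch of that pattern, assuming it mirrors what the fs.MustSyncPath calls in the diff achieve; the helper name syncFileAndDir is hypothetical and not part of the commit.

package main

import (
	"io/ioutil"
	"log"
	"os"
	"path/filepath"
)

// syncFileAndDir is a hypothetical helper illustrating the durability pattern:
// fsync the file so its data reaches stable storage, then fsync the parent
// directory so the directory entry survives kill -9, OOM or a hard reset.
func syncFileAndDir(path string) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	if err := f.Sync(); err != nil {
		_ = f.Close()
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}
	d, err := os.Open(filepath.Dir(path))
	if err != nil {
		return err
	}
	defer d.Close()
	return d.Sync()
}

func main() {
	path := "/tmp/persistentqueue-example/0000000000000000"
	if err := os.MkdirAll(filepath.Dir(path), 0700); err != nil {
		log.Fatalf("cannot create dir: %s", err)
	}
	if err := ioutil.WriteFile(path, []byte("finalized chunk data"), 0600); err != nil {
		log.Fatalf("cannot write chunk: %s", err)
	}
	if err := syncFileAndDir(path); err != nil {
		log.Fatalf("cannot sync: %s", err)
	}
}

In the diff, the same idea shows up as fs.MustSyncPath(q.writerPath) after a chunk file is finalized, fs.MustSyncPath(q.dir) after a new chunk file is created or opened, and fs.MustSyncPath(path) after the metainfo file is written.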

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/687
Aliaksandr Valialkin 2020-09-16 17:30:04 +03:00
parent 6ce52e3702
commit eee6f1e56d
2 changed files with 87 additions and 11 deletions


@@ -2,8 +2,10 @@ package persistentqueue
import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
@@ -26,6 +28,11 @@ type FastQueue struct {
pendingInmemoryBytes uint64
// stopCh is used for stopping background workers such as inmemoryBlocksFlusher.
stopCh chan struct{}
lastInmemoryBlockReadTime uint64
inmemoryBlocksFlusherWG sync.WaitGroup
mustStop bool
}
@@ -43,6 +50,14 @@ func MustOpenFastQueue(path, name string, maxInmemoryBlocks, maxPendingBytes int
ch: make(chan *bytesutil.ByteBuffer, maxInmemoryBlocks),
}
fq.cond.L = &fq.mu
fq.stopCh = make(chan struct{})
fq.inmemoryBlocksFlusherWG.Add(1)
go func() {
defer fq.inmemoryBlocksFlusherWG.Done()
fq.inmemoryBlocksFlusher()
}()
logger.Infof("opened fast persistent queue at %q with maxInmemoryBlocks=%d", path, maxInmemoryBlocks)
return fq
}
@@ -51,6 +66,10 @@ func MustOpenFastQueue(path, name string, maxInmemoryBlocks, maxPendingBytes int
//
// It is expected no new writers during and after the call.
func (fq *FastQueue) MustClose() {
// Stop background flusher.
close(fq.stopCh)
fq.inmemoryBlocksFlusherWG.Wait()
fq.mu.Lock()
defer fq.mu.Unlock()
@@ -67,12 +86,38 @@ func (fq *FastQueue) MustClose() {
logger.Infof("closed fast persistent queue at %q", fq.pq.dir)
}
func (fq *FastQueue) inmemoryBlocksFlusher() {
t := time.NewTicker(time.Second)
defer t.Stop()
for {
select {
case <-fq.stopCh:
return
case <-t.C:
}
fq.flushInmemoryBlocksToFileIfNeeded()
}
}
func (fq *FastQueue) flushInmemoryBlocksToFileIfNeeded() {
fq.mu.Lock()
defer fq.mu.Unlock()
if len(fq.ch) == 0 {
return
}
if fasttime.UnixTimestamp() < fq.lastInmemoryBlockReadTime+5 {
return
}
fq.flushInmemoryBlocksToFileLocked()
}
func (fq *FastQueue) flushInmemoryBlocksToFileLocked() {
// fq.mu must be locked by the caller.
for len(fq.ch) > 0 {
bb := <-fq.ch
fq.pq.MustWriteBlock(bb.B)
fq.pendingInmemoryBytes -= uint64(len(bb.B))
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()
blockBufPool.Put(bb)
}
// Unblock all the potentially blocked readers, so they could proceed with reading file-based queue.
@@ -143,6 +188,7 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
}
bb := <-fq.ch
fq.pendingInmemoryBytes -= uint64(len(bb.B))
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()
dst = append(dst, bb.B...)
blockBufPool.Put(bb)
return dst, true

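A hypothetical usage sketch of FastQueue, assuming it exposes a MustWriteBlock method in addition to the MustOpenFastQueue, MustReadBlock and MustClose methods visible in the diff: blocks written this way first land in the in-memory channel, and after this change the background flusher persists them to chunk files if they stay unread for roughly 5 seconds, so they are no longer lost on kill -9. This is an illustration, not code from the commit.

package main

import (
	"fmt"
	"log"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
)

func main() {
	// Arguments follow the MustOpenFastQueue signature from the diff:
	// path, name, maxInmemoryBlocks, maxPendingBytes.
	// maxPendingBytes=0 is assumed here to mean no limit on the file-based queue size.
	fq := persistentqueue.MustOpenFastQueue("/tmp/fastqueue-example", "example", 16, 0)
	defer fq.MustClose()

	// The block is buffered in memory; the background flusher started in
	// MustOpenFastQueue writes it to a chunk file if nobody reads it soon.
	fq.MustWriteBlock([]byte("sample block"))

	// Reading drains the in-memory channel, or the file-based queue if the
	// block was already flushed to disk.
	data, ok := fq.MustReadBlock(nil)
	if !ok {
		log.Fatalf("unexpected end of queue")
	}
	fmt.Printf("got %q\n", string(data))
}
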

@@ -12,6 +12,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@@ -52,6 +53,8 @@ type Queue struct {
writerLocalOffset uint64
writerFlushedOffset uint64
lastMetainfoFlushTime uint64
mustStop bool
blocksDropped *metrics.Counter
@@ -110,7 +113,7 @@ func (q *Queue) ResetIfEmpty() {
}
q.reader = r
if err := q.flushMetainfo(); err != nil {
if err := q.flushMetainfoLocked(); err != nil {
logger.Panicf("FATAL: cannot flush metainfo: %s", err)
}
}
@@ -236,7 +239,7 @@ func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingB
logger.Panicf("BUG: cannot parse hex %q: %s", fname, err)
}
if offset%q.chunkFileSize != 0 {
logger.Errorf("unexpected offset for chunk file %q: %d; it must divide by %d; removing the file", filepath, offset, q.chunkFileSize)
logger.Errorf("unexpected offset for chunk file %q: %d; it must be multiple of %d; removing the file", filepath, offset, q.chunkFileSize)
fs.MustRemoveAll(filepath)
continue
}
@@ -284,10 +287,15 @@ func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingB
q.writerLocalOffset = mi.WriterOffset % q.chunkFileSize
q.writerFlushedOffset = mi.WriterOffset
if fileSize := fs.MustFileSize(q.writerPath); fileSize != q.writerLocalOffset {
logger.Errorf("chunk file %q size doesn't match writer offset; file size %d bytes; writer offset: %d bytes",
q.writerPath, fileSize, q.writerLocalOffset)
fs.MustRemoveAll(q.writerPath)
continue
if fileSize < q.writerLocalOffset {
logger.Errorf("%q size (%d bytes) is smaller than the writer offset (%d bytes); removing the file",
q.writerPath, fileSize, q.writerLocalOffset)
fs.MustRemoveAll(q.writerPath)
continue
}
logger.Warnf("%q size (%d bytes) is bigger than writer offset (%d bytes); "+
"this may be the case on unclean shutdown (OOM, `kill -9`, hardware reset); trying to fix it by adjusting fileSize to %d",
q.writerPath, fileSize, q.writerLocalOffset, q.writerLocalOffset)
}
w, err := filestream.OpenWriterAt(q.writerPath, int64(q.writerLocalOffset), false)
if err != nil {
@@ -331,7 +339,7 @@ func (q *Queue) MustClose() {
q.reader = nil
// Store metainfo
if err := q.flushMetainfo(); err != nil {
if err := q.flushMetainfoLocked(); err != nil {
logger.Panicf("FATAL: cannot flush chunked queue metainfo: %s", err)
}
}
@@ -403,6 +411,7 @@ func (q *Queue) writeBlockLocked(block []byte) error {
if q.writerLocalOffset+q.maxBlockSize+8 > q.chunkFileSize {
// Finalize the current chunk and start new one.
q.writer.MustClose()
fs.MustSyncPath(q.writerPath)
if n := q.writerOffset % q.chunkFileSize; n > 0 {
q.writerOffset += (q.chunkFileSize - n)
}
@@ -414,9 +423,10 @@ func (q *Queue) writeBlockLocked(block []byte) error {
return fmt.Errorf("cannot create chunk file %q: %w", q.writerPath, err)
}
q.writer = w
if err := q.flushMetainfo(); err != nil {
if err := q.flushMetainfoLocked(); err != nil {
return fmt.Errorf("cannot flush metainfo: %w", err)
}
fs.MustSyncPath(q.dir)
}
// Write block len.
@@ -435,7 +445,7 @@ func (q *Queue) writeBlockLocked(block []byte) error {
}
q.blocksWritten.Inc()
q.bytesWritten.Add(len(block))
return nil
return q.flushMetainfoIfNeededLocked()
}
// MustReadBlock appends the next block from q to dst and returns the result.
@@ -462,6 +472,9 @@ func (q *Queue) MustReadBlock(dst []byte) ([]byte, bool) {
data, err := q.readBlockLocked(dst)
if err != nil {
// Skip the current chunk, since it may be broken.
q.readerOffset += q.chunkFileSize - q.readerOffset%q.chunkFileSize
_ = q.flushMetainfoLocked()
logger.Panicf("FATAL: %s", err)
}
return data, true
@@ -482,9 +495,10 @@ func (q *Queue) readBlockLocked(dst []byte) ([]byte, error) {
return dst, fmt.Errorf("cannot open chunk file %q: %w", q.readerPath, err)
}
q.reader = r
if err := q.flushMetainfo(); err != nil {
if err := q.flushMetainfoLocked(); err != nil {
return dst, fmt.Errorf("cannot flush metainfo: %w", err)
}
fs.MustSyncPath(q.dir)
}
// Read block len.
@@ -508,6 +522,9 @@ func (q *Queue) readBlockLocked(dst []byte) ([]byte, error) {
}
q.blocksRead.Inc()
q.bytesRead.Add(int(blockLen))
if err := q.flushMetainfoIfNeededLocked(); err != nil {
return dst, err
}
return dst, nil
}
@@ -543,7 +560,19 @@ func (q *Queue) readFull(buf []byte) error {
return nil
}
func (q *Queue) flushMetainfo() error {
func (q *Queue) flushMetainfoIfNeededLocked() error {
t := fasttime.UnixTimestamp()
if t == q.lastMetainfoFlushTime {
return nil
}
if err := q.flushMetainfoLocked(); err != nil {
return fmt.Errorf("cannot flush metainfo: %w", err)
}
q.lastMetainfoFlushTime = t
return nil
}
func (q *Queue) flushMetainfoLocked() error {
mi := &metainfo{
Name: q.name,
ReaderOffset: q.readerOffset,
@@ -577,6 +606,7 @@ func (mi *metainfo) WriteToFile(path string) error {
if err := ioutil.WriteFile(path, data, 0600); err != nil {
return fmt.Errorf("cannot write persistent queue metainfo to %q: %w", path, err)
}
fs.MustSyncPath(path)
return nil
}