From 70c721c01b2771f45877ca377c7ae856420d35b6 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 18 Sep 2020 13:03:39 +0300 Subject: [PATCH] lib/persistentqueue: flush data to disk every second Previously small amounts of data may be left unflushed for extended periods of time if vmagent collects small amounts of data. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/687 --- lib/persistentqueue/fastqueue.go | 3 ++- lib/persistentqueue/persistentqueue.go | 13 +++++++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/lib/persistentqueue/fastqueue.go b/lib/persistentqueue/fastqueue.go index 18aba7fbc..5a3dd33e2 100644 --- a/lib/persistentqueue/fastqueue.go +++ b/lib/persistentqueue/fastqueue.go @@ -47,7 +47,8 @@ func MustOpenFastQueue(path, name string, maxInmemoryBlocks, maxPendingBytes int } fq.cond.L = &fq.mu fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp() - logger.Infof("opened fast persistent queue at %q with maxInmemoryBlocks=%d", path, maxInmemoryBlocks) + pendingBytes := fq.GetPendingBytes() + logger.Infof("opened fast persistent queue at %q with maxInmemoryBlocks=%d, it contains %d pending bytes", path, maxInmemoryBlocks, pendingBytes) return fq } diff --git a/lib/persistentqueue/persistentqueue.go b/lib/persistentqueue/persistentqueue.go index 159f21c33..72b63ab40 100644 --- a/lib/persistentqueue/persistentqueue.go +++ b/lib/persistentqueue/persistentqueue.go @@ -411,7 +411,8 @@ func (q *Queue) writeBlockLocked(block []byte) error { if q.writerLocalOffset+q.maxBlockSize+8 > q.chunkFileSize { // Finalize the current chunk and start new one. q.writer.MustClose() - fs.MustSyncPath(q.writerPath) + // There is no need to do fs.MustSyncPath(q.writerPath) here, + // since MustClose already does this. if n := q.writerOffset % q.chunkFileSize; n > 0 { q.writerOffset += (q.chunkFileSize - n) } @@ -445,7 +446,7 @@ func (q *Queue) writeBlockLocked(block []byte) error { } q.blocksWritten.Inc() q.bytesWritten.Add(len(block)) - return q.flushMetainfoIfNeededLocked() + return q.flushMetainfoIfNeededLocked(true) } // MustReadBlock appends the next block from q to dst and returns the result. @@ -522,7 +523,7 @@ func (q *Queue) readBlockLocked(dst []byte) ([]byte, error) { } q.blocksRead.Inc() q.bytesRead.Add(int(blockLen)) - if err := q.flushMetainfoIfNeededLocked(); err != nil { + if err := q.flushMetainfoIfNeededLocked(false); err != nil { return dst, err } return dst, nil @@ -560,11 +561,15 @@ func (q *Queue) readFull(buf []byte) error { return nil } -func (q *Queue) flushMetainfoIfNeededLocked() error { +func (q *Queue) flushMetainfoIfNeededLocked(flushData bool) error { t := fasttime.UnixTimestamp() if t == q.lastMetainfoFlushTime { return nil } + if flushData { + q.writer.MustFlush() + fs.MustSyncPath(q.writerPath) + } if err := q.flushMetainfoLocked(); err != nil { return fmt.Errorf("cannot flush metainfo: %w", err) }