mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/persistentqueue: flush data to disk every second
Previously small amounts of data may be left unflushed for extended periods of time if vmagent collects small amounts of data. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/687
This commit is contained in:
parent
8fd791d399
commit
adb4d3b75c
2 changed files with 11 additions and 5 deletions
|
@ -47,7 +47,8 @@ func MustOpenFastQueue(path, name string, maxInmemoryBlocks, maxPendingBytes int
|
|||
}
|
||||
fq.cond.L = &fq.mu
|
||||
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()
|
||||
logger.Infof("opened fast persistent queue at %q with maxInmemoryBlocks=%d", path, maxInmemoryBlocks)
|
||||
pendingBytes := fq.GetPendingBytes()
|
||||
logger.Infof("opened fast persistent queue at %q with maxInmemoryBlocks=%d, it contains %d pending bytes", path, maxInmemoryBlocks, pendingBytes)
|
||||
return fq
|
||||
}
|
||||
|
||||
|
|
|
@ -411,7 +411,8 @@ func (q *Queue) writeBlockLocked(block []byte) error {
|
|||
if q.writerLocalOffset+q.maxBlockSize+8 > q.chunkFileSize {
|
||||
// Finalize the current chunk and start new one.
|
||||
q.writer.MustClose()
|
||||
fs.MustSyncPath(q.writerPath)
|
||||
// There is no need to do fs.MustSyncPath(q.writerPath) here,
|
||||
// since MustClose already does this.
|
||||
if n := q.writerOffset % q.chunkFileSize; n > 0 {
|
||||
q.writerOffset += (q.chunkFileSize - n)
|
||||
}
|
||||
|
@ -445,7 +446,7 @@ func (q *Queue) writeBlockLocked(block []byte) error {
|
|||
}
|
||||
q.blocksWritten.Inc()
|
||||
q.bytesWritten.Add(len(block))
|
||||
return q.flushMetainfoIfNeededLocked()
|
||||
return q.flushMetainfoIfNeededLocked(true)
|
||||
}
|
||||
|
||||
// MustReadBlock appends the next block from q to dst and returns the result.
|
||||
|
@ -522,7 +523,7 @@ func (q *Queue) readBlockLocked(dst []byte) ([]byte, error) {
|
|||
}
|
||||
q.blocksRead.Inc()
|
||||
q.bytesRead.Add(int(blockLen))
|
||||
if err := q.flushMetainfoIfNeededLocked(); err != nil {
|
||||
if err := q.flushMetainfoIfNeededLocked(false); err != nil {
|
||||
return dst, err
|
||||
}
|
||||
return dst, nil
|
||||
|
@ -560,11 +561,15 @@ func (q *Queue) readFull(buf []byte) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (q *Queue) flushMetainfoIfNeededLocked() error {
|
||||
func (q *Queue) flushMetainfoIfNeededLocked(flushData bool) error {
|
||||
t := fasttime.UnixTimestamp()
|
||||
if t == q.lastMetainfoFlushTime {
|
||||
return nil
|
||||
}
|
||||
if flushData {
|
||||
q.writer.MustFlush()
|
||||
fs.MustSyncPath(q.writerPath)
|
||||
}
|
||||
if err := q.flushMetainfoLocked(); err != nil {
|
||||
return fmt.Errorf("cannot flush metainfo: %w", err)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue