mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/persistentqueue: flush data to disk every second
Previously small amounts of data may be left unflushed for extended periods of time if vmagent collects small amounts of data. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/687
This commit is contained in:
parent
74e3198281
commit
70c721c01b
2 changed files with 11 additions and 5 deletions
|
@ -47,7 +47,8 @@ func MustOpenFastQueue(path, name string, maxInmemoryBlocks, maxPendingBytes int
|
||||||
}
|
}
|
||||||
fq.cond.L = &fq.mu
|
fq.cond.L = &fq.mu
|
||||||
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()
|
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()
|
||||||
logger.Infof("opened fast persistent queue at %q with maxInmemoryBlocks=%d", path, maxInmemoryBlocks)
|
pendingBytes := fq.GetPendingBytes()
|
||||||
|
logger.Infof("opened fast persistent queue at %q with maxInmemoryBlocks=%d, it contains %d pending bytes", path, maxInmemoryBlocks, pendingBytes)
|
||||||
return fq
|
return fq
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -411,7 +411,8 @@ func (q *Queue) writeBlockLocked(block []byte) error {
|
||||||
if q.writerLocalOffset+q.maxBlockSize+8 > q.chunkFileSize {
|
if q.writerLocalOffset+q.maxBlockSize+8 > q.chunkFileSize {
|
||||||
// Finalize the current chunk and start new one.
|
// Finalize the current chunk and start new one.
|
||||||
q.writer.MustClose()
|
q.writer.MustClose()
|
||||||
fs.MustSyncPath(q.writerPath)
|
// There is no need to do fs.MustSyncPath(q.writerPath) here,
|
||||||
|
// since MustClose already does this.
|
||||||
if n := q.writerOffset % q.chunkFileSize; n > 0 {
|
if n := q.writerOffset % q.chunkFileSize; n > 0 {
|
||||||
q.writerOffset += (q.chunkFileSize - n)
|
q.writerOffset += (q.chunkFileSize - n)
|
||||||
}
|
}
|
||||||
|
@ -445,7 +446,7 @@ func (q *Queue) writeBlockLocked(block []byte) error {
|
||||||
}
|
}
|
||||||
q.blocksWritten.Inc()
|
q.blocksWritten.Inc()
|
||||||
q.bytesWritten.Add(len(block))
|
q.bytesWritten.Add(len(block))
|
||||||
return q.flushMetainfoIfNeededLocked()
|
return q.flushMetainfoIfNeededLocked(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
// MustReadBlock appends the next block from q to dst and returns the result.
|
// MustReadBlock appends the next block from q to dst and returns the result.
|
||||||
|
@ -522,7 +523,7 @@ func (q *Queue) readBlockLocked(dst []byte) ([]byte, error) {
|
||||||
}
|
}
|
||||||
q.blocksRead.Inc()
|
q.blocksRead.Inc()
|
||||||
q.bytesRead.Add(int(blockLen))
|
q.bytesRead.Add(int(blockLen))
|
||||||
if err := q.flushMetainfoIfNeededLocked(); err != nil {
|
if err := q.flushMetainfoIfNeededLocked(false); err != nil {
|
||||||
return dst, err
|
return dst, err
|
||||||
}
|
}
|
||||||
return dst, nil
|
return dst, nil
|
||||||
|
@ -560,11 +561,15 @@ func (q *Queue) readFull(buf []byte) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *Queue) flushMetainfoIfNeededLocked() error {
|
func (q *Queue) flushMetainfoIfNeededLocked(flushData bool) error {
|
||||||
t := fasttime.UnixTimestamp()
|
t := fasttime.UnixTimestamp()
|
||||||
if t == q.lastMetainfoFlushTime {
|
if t == q.lastMetainfoFlushTime {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
if flushData {
|
||||||
|
q.writer.MustFlush()
|
||||||
|
fs.MustSyncPath(q.writerPath)
|
||||||
|
}
|
||||||
if err := q.flushMetainfoLocked(); err != nil {
|
if err := q.flushMetainfoLocked(); err != nil {
|
||||||
return fmt.Errorf("cannot flush metainfo: %w", err)
|
return fmt.Errorf("cannot flush metainfo: %w", err)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue