lib/persistentqueue: sync data to file inside filestream.Writer.MustFlush

This commit is contained in:
Aliaksandr Valialkin 2020-09-19 12:51:32 +03:00
parent 26115891db
commit f00e0e0103
2 changed files with 10 additions and 5 deletions

View file

@ -254,11 +254,17 @@ func (w *Writer) Write(p []byte) (int, error) {
} }
// MustFlush flushes all the buffered data to file. // MustFlush flushes all the buffered data to file.
func (w *Writer) MustFlush() { //
// if isSync is true, then the flushed data is fsynced to the underlying storage.
func (w *Writer) MustFlush(isSync bool) {
if err := w.bw.Flush(); err != nil { if err := w.bw.Flush(); err != nil {
logger.Panicf("FATAL: cannot flush buffered data to file %q: %s", w.f.Name(), err) logger.Panicf("FATAL: cannot flush buffered data to file %q: %s", w.f.Name(), err)
} }
// Do not call w.f.Sync() for performance reasons. if isSync {
if err := w.f.Sync(); err != nil {
logger.Panicf("FATAL: cannot fsync data to the underlying storage for file %q: %s", w.f.Name(), err)
}
}
} }
type statWriter struct { type statWriter struct {

View file

@ -546,7 +546,7 @@ func (q *Queue) write(buf []byte) error {
func (q *Queue) readFull(buf []byte) error { func (q *Queue) readFull(buf []byte) error {
bufLen := uint64(len(buf)) bufLen := uint64(len(buf))
if q.readerOffset+bufLen > q.writerFlushedOffset { if q.readerOffset+bufLen > q.writerFlushedOffset {
q.writer.MustFlush() q.writer.MustFlush(false)
q.writerFlushedOffset = q.writerOffset q.writerFlushedOffset = q.writerOffset
} }
n, err := io.ReadFull(q.reader, buf) n, err := io.ReadFull(q.reader, buf)
@ -567,8 +567,7 @@ func (q *Queue) flushMetainfoIfNeededLocked(flushData bool) error {
return nil return nil
} }
if flushData { if flushData {
q.writer.MustFlush() q.writer.MustFlush(true)
fs.MustSyncPath(q.writerPath)
} }
if err := q.flushMetainfoLocked(); err != nil { if err := q.flushMetainfoLocked(); err != nil {
return fmt.Errorf("cannot flush metainfo: %w", err) return fmt.Errorf("cannot flush metainfo: %w", err)