From e159cc30df0887444f9c9ced98c6d0fcfff85e17 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 6 Feb 2024 20:40:10 +0200 Subject: [PATCH] lib/fs: lazily open the file at ReaderAt on the first access This should significantly reduce the number of open ReaderAt files on VictoriaMetrics and VictoriaLogs startup. The open files can be tracked via vm_fs_readers metric --- lib/fs/reader_at.go | 124 ++++++++++++++++++++++++++++++++------------ 1 file changed, 90 insertions(+), 34 deletions(-) diff --git a/lib/fs/reader_at.go b/lib/fs/reader_at.go index 3fdd85bd14..379c05a9e1 100644 --- a/lib/fs/reader_at.go +++ b/lib/fs/reader_at.go @@ -4,6 +4,7 @@ import ( "flag" "fmt" "os" + "sync" "sync/atomic" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -34,15 +35,19 @@ type ReaderAt struct { readCalls uint64 readBytes uint64 - f *os.File - mmapData []byte + // path contains the path to the file for reading + path string + + // mr is used for lazy opening of the file at path on the first access. + mr atomic.Pointer[mmapReader] + mrLock sync.Mutex useLocalStats bool } // Path returns path to r. func (r *ReaderAt) Path() string { - return r.f.Name() + return r.path } // MustReadAt reads len(p) bytes at off from r. @@ -53,19 +58,24 @@ func (r *ReaderAt) MustReadAt(p []byte, off int64) { if off < 0 { logger.Panicf("BUG: off=%d cannot be negative", off) } - if len(r.mmapData) == 0 { - n, err := r.f.ReadAt(p, off) + + // Lazily open the file at r.path on the first access + mr := r.getMmapReader() + + // Read len(p) bytes at offset off to p. + if len(mr.mmapData) == 0 { + n, err := mr.f.ReadAt(p, off) if err != nil { - logger.Panicf("FATAL: cannot read %d bytes at offset %d of file %q: %s", len(p), off, r.Path(), err) + logger.Panicf("FATAL: cannot read %d bytes at offset %d of file %q: %s", len(p), off, r.path, err) } if n != len(p) { - logger.Panicf("FATAL: unexpected number of bytes read from file %q; got %d; want %d", r.Path(), n, len(p)) + logger.Panicf("FATAL: unexpected number of bytes read from file %q; got %d; want %d", r.path, n, len(p)) } } else { - if off > int64(len(r.mmapData)-len(p)) { - logger.Panicf("BUG: off=%d is out of allowed range [0...%d] for len(p)=%d", off, len(r.mmapData)-len(p), len(p)) + if off > int64(len(mr.mmapData)-len(p)) { + logger.Panicf("BUG: off=%d is out of allowed range [0...%d] for len(p)=%d", off, len(mr.mmapData)-len(p), len(p)) } - src := r.mmapData[off:] + src := mr.mmapData[off:] // The copy() below may result in thread block as described at https://valyala.medium.com/mmap-in-go-considered-harmful-d92a25cb161d . // But production workload proved this is OK in most cases, so use it without fear :) copy(p, src) @@ -79,17 +89,34 @@ func (r *ReaderAt) MustReadAt(p []byte, off int64) { } } +func (r *ReaderAt) getMmapReader() *mmapReader { + mr := r.mr.Load() + if mr != nil { + return mr + } + r.mrLock.Lock() + mr = r.mr.Load() + if mr == nil { + mr = newMmapReaderFromPath(r.path) + r.mr.Store(mr) + } + r.mrLock.Unlock() + return mr +} + +var ( + readCalls = metrics.NewCounter(`vm_fs_read_calls_total`) + readBytes = metrics.NewCounter(`vm_fs_read_bytes_total`) + readersCount = metrics.NewCounter(`vm_fs_readers`) +) + // MustClose closes r. func (r *ReaderAt) MustClose() { - fname := r.Path() - if len(r.mmapData) > 0 { - if err := mUnmap(r.mmapData[:cap(r.mmapData)]); err != nil { - logger.Panicf("FATAL: cannot unmap data for file %q: %s", fname, err) - } - r.mmapData = nil + mr := r.mr.Load() + if mr != nil { + mr.mustClose() + r.mr.Store(nil) } - MustClose(r.f) - r.f = nil if r.useLocalStats { readCalls.Add(int(r.readCalls)) @@ -98,7 +125,6 @@ func (r *ReaderAt) MustClose() { r.readBytes = 0 r.useLocalStats = false } - readersCount.Dec() } // SetUseLocalStats switches to local stats collection instead of global stats collection. @@ -116,8 +142,9 @@ func (r *ReaderAt) SetUseLocalStats() { // // if prefetch is set, then the OS is hinted to prefetch f data. func (r *ReaderAt) MustFadviseSequentialRead(prefetch bool) { - if err := fadviseSequentialRead(r.f, prefetch); err != nil { - logger.Panicf("FATAL: error in fadviseSequentialRead(%q, %v): %s", r.Path(), prefetch, err) + mr := r.getMmapReader() + if err := fadviseSequentialRead(mr.f, prefetch); err != nil { + logger.Panicf("FATAL: error in fadviseSequentialRead(%q, %v): %s", r.path, prefetch, err) } } @@ -125,11 +152,9 @@ func (r *ReaderAt) MustFadviseSequentialRead(prefetch bool) { // // MustClose must be called on the returned ReaderAt when it is no longer needed. func MustOpenReaderAt(path string) *ReaderAt { - f, err := os.Open(path) - if err != nil { - logger.Panicf("FATAL: cannot open file for reading: %s", err) - } - return NewReaderAt(f) + var r ReaderAt + r.path = path + return &r } // NewReaderAt returns ReaderAt for reading from f. @@ -138,8 +163,28 @@ func MustOpenReaderAt(path string) *ReaderAt { // // MustClose must be called on the returned ReaderAt when it is no longer needed. func NewReaderAt(f *os.File) *ReaderAt { + mr := newMmapReaderFromFile(f) var r ReaderAt - r.f = f + r.path = f.Name() + r.mr.Store(mr) + return &r +} + +type mmapReader struct { + f *os.File + mmapData []byte +} + +func newMmapReaderFromPath(path string) *mmapReader { + f, err := os.Open(path) + if err != nil { + logger.Panicf("FATAL: cannot open file for reading: %s", err) + } + return newMmapReaderFromFile(f) +} + +func newMmapReaderFromFile(f *os.File) *mmapReader { + var mmapData []byte if !*disableMmap { fi, err := f.Stat() if err != nil { @@ -154,17 +199,28 @@ func NewReaderAt(f *os.File) *ReaderAt { MustClose(f) logger.Panicf("FATAL: cannot mmap %q: %s", path, err) } - r.mmapData = data + mmapData = data } readersCount.Inc() - return &r + return &mmapReader{ + f: f, + mmapData: mmapData, + } } -var ( - readCalls = metrics.NewCounter(`vm_fs_read_calls_total`) - readBytes = metrics.NewCounter(`vm_fs_read_bytes_total`) - readersCount = metrics.NewCounter(`vm_fs_readers`) -) +func (mr *mmapReader) mustClose() { + fname := mr.f.Name() + if len(mr.mmapData) > 0 { + if err := mUnmap(mr.mmapData[:cap(mr.mmapData)]); err != nil { + logger.Panicf("FATAL: cannot unmap data for file %q: %s", fname, err) + } + mr.mmapData = nil + } + MustClose(mr.f) + mr.f = nil + + readersCount.Dec() +} func mmapFile(f *os.File, size int64) ([]byte, error) { if size == 0 {