lib/fs: lazily open the file at ReaderAt on the first access

This should significantly reduce the number of open ReaderAt files
on VictoriaMetrics and VictoriaLogs startup.

The open files can be tracked via vm_fs_readers metric
This commit is contained in:
Aliaksandr Valialkin 2024-02-06 20:40:10 +02:00
parent 28f9fe5f65
commit e159cc30df
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB

View file

@ -4,6 +4,7 @@ import (
"flag" "flag"
"fmt" "fmt"
"os" "os"
"sync"
"sync/atomic" "sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -34,15 +35,19 @@ type ReaderAt struct {
readCalls uint64 readCalls uint64
readBytes uint64 readBytes uint64
f *os.File // path contains the path to the file for reading
mmapData []byte path string
// mr is used for lazy opening of the file at path on the first access.
mr atomic.Pointer[mmapReader]
mrLock sync.Mutex
useLocalStats bool useLocalStats bool
} }
// Path returns path to r. // Path returns path to r.
func (r *ReaderAt) Path() string { func (r *ReaderAt) Path() string {
return r.f.Name() return r.path
} }
// MustReadAt reads len(p) bytes at off from r. // MustReadAt reads len(p) bytes at off from r.
@ -53,19 +58,24 @@ func (r *ReaderAt) MustReadAt(p []byte, off int64) {
if off < 0 { if off < 0 {
logger.Panicf("BUG: off=%d cannot be negative", off) logger.Panicf("BUG: off=%d cannot be negative", off)
} }
if len(r.mmapData) == 0 {
n, err := r.f.ReadAt(p, off) // Lazily open the file at r.path on the first access
mr := r.getMmapReader()
// Read len(p) bytes at offset off to p.
if len(mr.mmapData) == 0 {
n, err := mr.f.ReadAt(p, off)
if err != nil { if err != nil {
logger.Panicf("FATAL: cannot read %d bytes at offset %d of file %q: %s", len(p), off, r.Path(), err) logger.Panicf("FATAL: cannot read %d bytes at offset %d of file %q: %s", len(p), off, r.path, err)
} }
if n != len(p) { if n != len(p) {
logger.Panicf("FATAL: unexpected number of bytes read from file %q; got %d; want %d", r.Path(), n, len(p)) logger.Panicf("FATAL: unexpected number of bytes read from file %q; got %d; want %d", r.path, n, len(p))
} }
} else { } else {
if off > int64(len(r.mmapData)-len(p)) { if off > int64(len(mr.mmapData)-len(p)) {
logger.Panicf("BUG: off=%d is out of allowed range [0...%d] for len(p)=%d", off, len(r.mmapData)-len(p), len(p)) logger.Panicf("BUG: off=%d is out of allowed range [0...%d] for len(p)=%d", off, len(mr.mmapData)-len(p), len(p))
} }
src := r.mmapData[off:] src := mr.mmapData[off:]
// The copy() below may result in thread block as described at https://valyala.medium.com/mmap-in-go-considered-harmful-d92a25cb161d . // The copy() below may result in thread block as described at https://valyala.medium.com/mmap-in-go-considered-harmful-d92a25cb161d .
// But production workload proved this is OK in most cases, so use it without fear :) // But production workload proved this is OK in most cases, so use it without fear :)
copy(p, src) copy(p, src)
@ -79,17 +89,34 @@ func (r *ReaderAt) MustReadAt(p []byte, off int64) {
} }
} }
func (r *ReaderAt) getMmapReader() *mmapReader {
mr := r.mr.Load()
if mr != nil {
return mr
}
r.mrLock.Lock()
mr = r.mr.Load()
if mr == nil {
mr = newMmapReaderFromPath(r.path)
r.mr.Store(mr)
}
r.mrLock.Unlock()
return mr
}
var (
readCalls = metrics.NewCounter(`vm_fs_read_calls_total`)
readBytes = metrics.NewCounter(`vm_fs_read_bytes_total`)
readersCount = metrics.NewCounter(`vm_fs_readers`)
)
// MustClose closes r. // MustClose closes r.
func (r *ReaderAt) MustClose() { func (r *ReaderAt) MustClose() {
fname := r.Path() mr := r.mr.Load()
if len(r.mmapData) > 0 { if mr != nil {
if err := mUnmap(r.mmapData[:cap(r.mmapData)]); err != nil { mr.mustClose()
logger.Panicf("FATAL: cannot unmap data for file %q: %s", fname, err) r.mr.Store(nil)
}
r.mmapData = nil
} }
MustClose(r.f)
r.f = nil
if r.useLocalStats { if r.useLocalStats {
readCalls.Add(int(r.readCalls)) readCalls.Add(int(r.readCalls))
@ -98,7 +125,6 @@ func (r *ReaderAt) MustClose() {
r.readBytes = 0 r.readBytes = 0
r.useLocalStats = false r.useLocalStats = false
} }
readersCount.Dec()
} }
// SetUseLocalStats switches to local stats collection instead of global stats collection. // SetUseLocalStats switches to local stats collection instead of global stats collection.
@ -116,8 +142,9 @@ func (r *ReaderAt) SetUseLocalStats() {
// //
// if prefetch is set, then the OS is hinted to prefetch f data. // if prefetch is set, then the OS is hinted to prefetch f data.
func (r *ReaderAt) MustFadviseSequentialRead(prefetch bool) { func (r *ReaderAt) MustFadviseSequentialRead(prefetch bool) {
if err := fadviseSequentialRead(r.f, prefetch); err != nil { mr := r.getMmapReader()
logger.Panicf("FATAL: error in fadviseSequentialRead(%q, %v): %s", r.Path(), prefetch, err) if err := fadviseSequentialRead(mr.f, prefetch); err != nil {
logger.Panicf("FATAL: error in fadviseSequentialRead(%q, %v): %s", r.path, prefetch, err)
} }
} }
@ -125,11 +152,9 @@ func (r *ReaderAt) MustFadviseSequentialRead(prefetch bool) {
// //
// MustClose must be called on the returned ReaderAt when it is no longer needed. // MustClose must be called on the returned ReaderAt when it is no longer needed.
func MustOpenReaderAt(path string) *ReaderAt { func MustOpenReaderAt(path string) *ReaderAt {
f, err := os.Open(path) var r ReaderAt
if err != nil { r.path = path
logger.Panicf("FATAL: cannot open file for reading: %s", err) return &r
}
return NewReaderAt(f)
} }
// NewReaderAt returns ReaderAt for reading from f. // NewReaderAt returns ReaderAt for reading from f.
@ -138,8 +163,28 @@ func MustOpenReaderAt(path string) *ReaderAt {
// //
// MustClose must be called on the returned ReaderAt when it is no longer needed. // MustClose must be called on the returned ReaderAt when it is no longer needed.
func NewReaderAt(f *os.File) *ReaderAt { func NewReaderAt(f *os.File) *ReaderAt {
mr := newMmapReaderFromFile(f)
var r ReaderAt var r ReaderAt
r.f = f r.path = f.Name()
r.mr.Store(mr)
return &r
}
type mmapReader struct {
f *os.File
mmapData []byte
}
func newMmapReaderFromPath(path string) *mmapReader {
f, err := os.Open(path)
if err != nil {
logger.Panicf("FATAL: cannot open file for reading: %s", err)
}
return newMmapReaderFromFile(f)
}
func newMmapReaderFromFile(f *os.File) *mmapReader {
var mmapData []byte
if !*disableMmap { if !*disableMmap {
fi, err := f.Stat() fi, err := f.Stat()
if err != nil { if err != nil {
@ -154,17 +199,28 @@ func NewReaderAt(f *os.File) *ReaderAt {
MustClose(f) MustClose(f)
logger.Panicf("FATAL: cannot mmap %q: %s", path, err) logger.Panicf("FATAL: cannot mmap %q: %s", path, err)
} }
r.mmapData = data mmapData = data
} }
readersCount.Inc() readersCount.Inc()
return &r return &mmapReader{
f: f,
mmapData: mmapData,
}
} }
var ( func (mr *mmapReader) mustClose() {
readCalls = metrics.NewCounter(`vm_fs_read_calls_total`) fname := mr.f.Name()
readBytes = metrics.NewCounter(`vm_fs_read_bytes_total`) if len(mr.mmapData) > 0 {
readersCount = metrics.NewCounter(`vm_fs_readers`) if err := mUnmap(mr.mmapData[:cap(mr.mmapData)]); err != nil {
) logger.Panicf("FATAL: cannot unmap data for file %q: %s", fname, err)
}
mr.mmapData = nil
}
MustClose(mr.f)
mr.f = nil
readersCount.Dec()
}
func mmapFile(f *os.File, size int64) ([]byte, error) { func mmapFile(f *os.File, size int64) ([]byte, error) {
if size == 0 { if size == 0 {