From cb5c39ee709bcc2eb03661c432e1182f48ae6378 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 30 Jan 2020 15:03:24 +0200 Subject: [PATCH] lib/fs: optimize small reads for `ReaderAt.MustReadAt` by reading from memory-mapped space instead of reading from file descriptor This should improve performance when reading many small blocks. --- app/vmselect/netstorage/tmp_blocks_file.go | 25 ++-- lib/fs/copy_mmap_cgo.go | 34 ++++++ lib/fs/copy_mmap_nocgo.go | 12 ++ lib/fs/fadvise_darwin.go | 6 +- lib/fs/fadvise_unix.go | 10 +- lib/fs/fs.go | 60 ---------- lib/fs/reader_at.go | 133 +++++++++++++++++++++ 7 files changed, 199 insertions(+), 81 deletions(-) create mode 100644 lib/fs/copy_mmap_cgo.go create mode 100644 lib/fs/copy_mmap_nocgo.go create mode 100644 lib/fs/reader_at.go diff --git a/app/vmselect/netstorage/tmp_blocks_file.go b/app/vmselect/netstorage/tmp_blocks_file.go index 8e5542f12..7a278898f 100644 --- a/app/vmselect/netstorage/tmp_blocks_file.go +++ b/app/vmselect/netstorage/tmp_blocks_file.go @@ -50,6 +50,7 @@ type tmpBlocksFile struct { buf []byte f *os.File + r *fs.ReaderAt offset uint64 } @@ -68,6 +69,7 @@ func putTmpBlocksFile(tbf *tmpBlocksFile) { tbf.MustClose() tbf.buf = tbf.buf[:0] tbf.f = nil + tbf.r = nil tbf.offset = 0 tmpBlocksFilePool.Put(tbf) } @@ -121,17 +123,20 @@ func (tbf *tmpBlocksFile) Finalize() error { if tbf.f == nil { return nil } + fname := tbf.f.Name() if _, err := tbf.f.Write(tbf.buf); err != nil { - return fmt.Errorf("cannot flush the remaining %d bytes to tmpBlocksFile: %s", len(tbf.buf), err) + return fmt.Errorf("cannot write the remaining %d bytes to %q: %s", len(tbf.buf), fname, err) } tbf.buf = tbf.buf[:0] - if _, err := tbf.f.Seek(0, 0); err != nil { - logger.Panicf("FATAL: cannot seek to the start of file: %s", err) + r, err := fs.OpenReaderAt(fname) + if err != nil { + logger.Panicf("FATAL: cannot open %q: %s", fname, err) } // Hint the OS that the file is read almost sequentiallly. // This should reduce the number of disk seeks, which is important // for HDDs. - fs.MustFadviseSequentialRead(tbf.f, true) + r.MustFadviseSequentialRead(true) + tbf.r = r return nil } @@ -143,13 +148,7 @@ func (tbf *tmpBlocksFile) MustReadBlockAt(dst *storage.Block, addr tmpBlockAddr) bb := tmpBufPool.Get() defer tmpBufPool.Put(bb) bb.B = bytesutil.Resize(bb.B, addr.size) - n, err := tbf.f.ReadAt(bb.B, int64(addr.offset)) - if err != nil { - logger.Panicf("FATAL: cannot read from %q at %s: %s", tbf.f.Name(), addr, err) - } - if n != len(bb.B) { - logger.Panicf("FATAL: too short number of bytes read at %s; got %d; want %d", addr, n, len(bb.B)) - } + tbf.r.MustReadAt(bb.B, int64(addr.offset)) buf = bb.B } tail, err := storage.UnmarshalBlock(dst, buf) @@ -167,6 +166,10 @@ func (tbf *tmpBlocksFile) MustClose() { if tbf.f == nil { return } + if tbf.r != nil { + // tbf.r could be nil if Finalize wasn't called. + tbf.r.MustClose() + } fname := tbf.f.Name() // Remove the file at first, then close it. diff --git a/lib/fs/copy_mmap_cgo.go b/lib/fs/copy_mmap_cgo.go new file mode 100644 index 000000000..a62c61e12 --- /dev/null +++ b/lib/fs/copy_mmap_cgo.go @@ -0,0 +1,34 @@ +// +build cgo + +package fs + +// #cgo CFLAGS: -O3 +// +// #include // for uintptr_t +// #include // for memcpy +// +// // The memcpy_wrapper allows avoiding memory allocations during calls from Go. +// // See https://github.com/golang/go/issues/24450 . +// static void memcpy_wrapper(uintptr_t dst, uintptr_t src, size_t n) { +// memcpy((void*)dst, (void*)src, n); +// } +import "C" + +import ( + "runtime" + "unsafe" +) + +// copyMmap copies len(dst) bytes from src to dst. +func copyMmap(dst, src []byte) { + // Copy data from mmap'ed src via cgo call in order to protect from goroutine stalls + // when the copied data isn't available in RAM, so the OS triggers reading the data from file. + // See https://medium.com/@valyala/mmap-in-go-considered-harmful-d92a25cb161d for details. + dstPtr := C.uintptr_t(uintptr(unsafe.Pointer(&dst[0]))) + srcPtr := C.uintptr_t(uintptr(unsafe.Pointer(&src[0]))) + C.memcpy_wrapper(dstPtr, srcPtr, C.size_t(len(dst))) + + // Prevent from GC'ing src or dst during C.memcpy_wrapper call. + runtime.KeepAlive(src) + runtime.KeepAlive(dst) +} diff --git a/lib/fs/copy_mmap_nocgo.go b/lib/fs/copy_mmap_nocgo.go new file mode 100644 index 000000000..b0772220f --- /dev/null +++ b/lib/fs/copy_mmap_nocgo.go @@ -0,0 +1,12 @@ +// +build !cgo + +package fs + +// copyMmap copies len(dst) bytes from src to dst. +func copyMmap(dst, src []byte) { + // This may lead to goroutines stalls when the copied data isn't available in RAM. + // In this case the OS triggers reading the data from file. + // See https://medium.com/@valyala/mmap-in-go-considered-harmful-d92a25cb161d for details. + // TODO: fix this + copy(dst, src) +} diff --git a/lib/fs/fadvise_darwin.go b/lib/fs/fadvise_darwin.go index feeb8a721..73cfe81a7 100644 --- a/lib/fs/fadvise_darwin.go +++ b/lib/fs/fadvise_darwin.go @@ -4,9 +4,7 @@ import ( "os" ) -// MustFadviseSequentialRead hints the OS that f is read mostly sequentially. -// -// if prefetch is set, then the OS is hinted to prefetch f data. -func MustFadviseSequentialRead(f *os.File, prefetch bool) { +func fadviseSequentialRead(f *os.File, prefetch bool) error { // TODO: implement this properly + return nil } diff --git a/lib/fs/fadvise_unix.go b/lib/fs/fadvise_unix.go index e578a4aad..21018796e 100644 --- a/lib/fs/fadvise_unix.go +++ b/lib/fs/fadvise_unix.go @@ -3,22 +3,20 @@ package fs import ( + "fmt" "os" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "golang.org/x/sys/unix" ) -// MustFadviseSequentialRead hints the OS that f is read mostly sequentially. -// -// if prefetch is set, then the OS is hinted to prefetch f data. -func MustFadviseSequentialRead(f *os.File, prefetch bool) { +func fadviseSequentialRead(f *os.File, prefetch bool) error { fd := int(f.Fd()) mode := unix.FADV_SEQUENTIAL if prefetch { mode |= unix.FADV_WILLNEED } if err := unix.Fadvise(int(fd), 0, 0, mode); err != nil { - logger.Panicf("FATAL: error returned from unix.Fadvise(%d): %s", mode, err) + return fmt.Errorf("error returned from unix.Fadvise(%d): %s", mode, err) } + return nil } diff --git a/lib/fs/fs.go b/lib/fs/fs.go index 20c4a0b6f..c8ef483b4 100644 --- a/lib/fs/fs.go +++ b/lib/fs/fs.go @@ -10,69 +10,9 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/metrics" "golang.org/x/sys/unix" ) -// MustReadAtCloser is rand-access read interface. -type MustReadAtCloser interface { - // MustReadAt must read len(p) bytes from offset off to p. - MustReadAt(p []byte, off int64) - - // MustClose must close the reader. - MustClose() -} - -// ReaderAt implements rand-access read. -type ReaderAt struct { - f *os.File -} - -// MustReadAt reads len(p) bytes from off to p. -func (ra *ReaderAt) MustReadAt(p []byte, off int64) { - if len(p) == 0 { - return - } - n, err := ra.f.ReadAt(p, off) - if err != nil { - logger.Panicf("FATAL: cannot read %d bytes at offset %d of file %q: %s", len(p), off, ra.f.Name(), err) - } - if n != len(p) { - logger.Panicf("FATAL: unexpected number of bytes read; got %d; want %d", n, len(p)) - } - readCalls.Inc() - readBytes.Add(len(p)) -} - -// MustClose closes ra. -func (ra *ReaderAt) MustClose() { - if err := ra.f.Close(); err != nil { - logger.Panicf("FATAL: cannot close file %q: %s", ra.f.Name(), err) - } - readersCount.Dec() -} - -// OpenReaderAt opens a file on the given path for random-read access. -// -// The file must be closed with MustClose when no longer needed. -func OpenReaderAt(path string) (*ReaderAt, error) { - f, err := os.Open(path) - if err != nil { - return nil, err - } - readersCount.Inc() - ra := &ReaderAt{ - f: f, - } - return ra, nil -} - -var ( - readCalls = metrics.NewCounter(`vm_fs_read_calls_total`) - readBytes = metrics.NewCounter(`vm_fs_read_bytes_total`) - readersCount = metrics.NewCounter(`vm_fs_readers`) -) - // MustSyncPath syncs contents of the given path. func MustSyncPath(path string) { d, err := os.Open(path) diff --git a/lib/fs/reader_at.go b/lib/fs/reader_at.go new file mode 100644 index 000000000..85e24eb5c --- /dev/null +++ b/lib/fs/reader_at.go @@ -0,0 +1,133 @@ +package fs + +import ( + "flag" + "fmt" + "os" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/metrics" + "golang.org/x/sys/unix" +) + +var disableMmap = flag.Bool("fs.disableMmap", false, "Whether to use `pread` instead of `mmap` for reading data files") + +// MustReadAtCloser is rand-access read interface. +type MustReadAtCloser interface { + // MustReadAt must read len(p) bytes from offset off to p. + MustReadAt(p []byte, off int64) + + // MustClose must close the reader. + MustClose() +} + +type readerAt interface { + ReadAt(p []byte, off int64) (int, error) + Close() error +} + +// ReaderAt implements rand-access reader. +type ReaderAt struct { + f *os.File + mmapData []byte +} + +// MustReadAt reads len(p) bytes at off from r. +func (r *ReaderAt) MustReadAt(p []byte, off int64) { + if len(p) == 0 { + return + } + if len(r.mmapData) == 0 || len(p) > 8*1024*1024 { + // Read big blocks directly from file. + // This could be faster than reading these blocks from mmap, + // since it triggers less page faults. + n, err := r.f.ReadAt(p, off) + if err != nil { + logger.Panicf("FATAL: cannot read %d bytes at offset %d of file %q: %s", len(p), off, r.f.Name(), err) + } + if n != len(p) { + logger.Panicf("FATAL: unexpected number of bytes read; got %d; want %d", n, len(p)) + } + } else { + if off < 0 || off > int64(len(r.mmapData)-len(p)) { + logger.Panicf("off=%d is out of allowed range [0...%d] for len(p)=%d", off, len(r.mmapData)-len(p), len(p)) + } + copyMmap(p, r.mmapData[off:]) + } + readCalls.Inc() + readBytes.Add(len(p)) +} + +// MustClose closes r. +func (r *ReaderAt) MustClose() { + fname := r.f.Name() + if len(r.mmapData) > 0 { + if err := unix.Munmap(r.mmapData); err != nil { + logger.Panicf("FATAL: cannot unmap data for file %q: %s", fname, err) + } + r.mmapData = nil + } + MustClose(r.f) + r.f = nil + readersCount.Dec() +} + +// MustFadviseSequentialRead hints the OS that f is read mostly sequentially. +// +// if prefetch is set, then the OS is hinted to prefetch f data. +func (r *ReaderAt) MustFadviseSequentialRead(prefetch bool) { + if err := fadviseSequentialRead(r.f, prefetch); err != nil { + logger.Panicf("FATAL: error in fadviseSequentialRead(%q, %v): %s", r.f.Name(), prefetch, err) + } +} + +// OpenReaderAt opens ReaderAt for reading from filename. +// +// MustClose must be called on the returned ReaderAt when it is no longer needed. +func OpenReaderAt(path string) (*ReaderAt, error) { + f, err := os.Open(path) + if err != nil { + return nil, fmt.Errorf("cannot open file %q for reader: %s", path, err) + } + var r ReaderAt + r.f = f + if !*disableMmap { + data, err := mmapFile(f) + if err != nil { + MustClose(f) + return nil, fmt.Errorf("cannot init reader for %q: %s", path, err) + } + r.mmapData = data + } + r.MustFadviseSequentialRead(false) + readersCount.Inc() + return &r, nil +} + +var ( + readCalls = metrics.NewCounter(`vm_fs_read_calls_total`) + readBytes = metrics.NewCounter(`vm_fs_read_bytes_total`) + readersCount = metrics.NewCounter(`vm_fs_readers`) +) + +func mmapFile(f *os.File) ([]byte, error) { + fi, err := f.Stat() + if err != nil { + return nil, fmt.Errorf("error in stat: %s", err) + } + size := fi.Size() + if size == 0 { + return nil, nil + } + if size < 0 { + return nil, fmt.Errorf("got negative file size: %d bytes", size) + } + if int64(int(size)) != size { + return nil, fmt.Errorf("file is too big to be mmap'ed: %d bytes", size) + } + data, err := unix.Mmap(int(f.Fd()), 0, int(size), unix.PROT_READ, unix.MAP_SHARED) + if err != nil { + return nil, fmt.Errorf("cannot mmap file with size %d: %s", size, err) + } + return data, nil +}