mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/fs: optimize small reads for ReaderAt.MustReadAt
by reading from memory-mapped space instead of reading from file descriptor
This should improve performance when reading many small blocks.
This commit is contained in:
parent
da19fffa08
commit
cb5c39ee70
7 changed files with 199 additions and 81 deletions
|
@ -50,6 +50,7 @@ type tmpBlocksFile struct {
|
|||
buf []byte
|
||||
|
||||
f *os.File
|
||||
r *fs.ReaderAt
|
||||
|
||||
offset uint64
|
||||
}
|
||||
|
@ -68,6 +69,7 @@ func putTmpBlocksFile(tbf *tmpBlocksFile) {
|
|||
tbf.MustClose()
|
||||
tbf.buf = tbf.buf[:0]
|
||||
tbf.f = nil
|
||||
tbf.r = nil
|
||||
tbf.offset = 0
|
||||
tmpBlocksFilePool.Put(tbf)
|
||||
}
|
||||
|
@ -121,17 +123,20 @@ func (tbf *tmpBlocksFile) Finalize() error {
|
|||
if tbf.f == nil {
|
||||
return nil
|
||||
}
|
||||
fname := tbf.f.Name()
|
||||
if _, err := tbf.f.Write(tbf.buf); err != nil {
|
||||
return fmt.Errorf("cannot flush the remaining %d bytes to tmpBlocksFile: %s", len(tbf.buf), err)
|
||||
return fmt.Errorf("cannot write the remaining %d bytes to %q: %s", len(tbf.buf), fname, err)
|
||||
}
|
||||
tbf.buf = tbf.buf[:0]
|
||||
if _, err := tbf.f.Seek(0, 0); err != nil {
|
||||
logger.Panicf("FATAL: cannot seek to the start of file: %s", err)
|
||||
r, err := fs.OpenReaderAt(fname)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot open %q: %s", fname, err)
|
||||
}
|
||||
// Hint the OS that the file is read almost sequentiallly.
|
||||
// This should reduce the number of disk seeks, which is important
|
||||
// for HDDs.
|
||||
fs.MustFadviseSequentialRead(tbf.f, true)
|
||||
r.MustFadviseSequentialRead(true)
|
||||
tbf.r = r
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -143,13 +148,7 @@ func (tbf *tmpBlocksFile) MustReadBlockAt(dst *storage.Block, addr tmpBlockAddr)
|
|||
bb := tmpBufPool.Get()
|
||||
defer tmpBufPool.Put(bb)
|
||||
bb.B = bytesutil.Resize(bb.B, addr.size)
|
||||
n, err := tbf.f.ReadAt(bb.B, int64(addr.offset))
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot read from %q at %s: %s", tbf.f.Name(), addr, err)
|
||||
}
|
||||
if n != len(bb.B) {
|
||||
logger.Panicf("FATAL: too short number of bytes read at %s; got %d; want %d", addr, n, len(bb.B))
|
||||
}
|
||||
tbf.r.MustReadAt(bb.B, int64(addr.offset))
|
||||
buf = bb.B
|
||||
}
|
||||
tail, err := storage.UnmarshalBlock(dst, buf)
|
||||
|
@ -167,6 +166,10 @@ func (tbf *tmpBlocksFile) MustClose() {
|
|||
if tbf.f == nil {
|
||||
return
|
||||
}
|
||||
if tbf.r != nil {
|
||||
// tbf.r could be nil if Finalize wasn't called.
|
||||
tbf.r.MustClose()
|
||||
}
|
||||
fname := tbf.f.Name()
|
||||
|
||||
// Remove the file at first, then close it.
|
||||
|
|
34
lib/fs/copy_mmap_cgo.go
Normal file
34
lib/fs/copy_mmap_cgo.go
Normal file
|
@ -0,0 +1,34 @@
|
|||
// +build cgo
|
||||
|
||||
package fs
|
||||
|
||||
// #cgo CFLAGS: -O3
|
||||
//
|
||||
// #include <stdint.h> // for uintptr_t
|
||||
// #include <string.h> // for memcpy
|
||||
//
|
||||
// // The memcpy_wrapper allows avoiding memory allocations during calls from Go.
|
||||
// // See https://github.com/golang/go/issues/24450 .
|
||||
// static void memcpy_wrapper(uintptr_t dst, uintptr_t src, size_t n) {
|
||||
// memcpy((void*)dst, (void*)src, n);
|
||||
// }
|
||||
import "C"
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
// copyMmap copies len(dst) bytes from src to dst.
|
||||
func copyMmap(dst, src []byte) {
|
||||
// Copy data from mmap'ed src via cgo call in order to protect from goroutine stalls
|
||||
// when the copied data isn't available in RAM, so the OS triggers reading the data from file.
|
||||
// See https://medium.com/@valyala/mmap-in-go-considered-harmful-d92a25cb161d for details.
|
||||
dstPtr := C.uintptr_t(uintptr(unsafe.Pointer(&dst[0])))
|
||||
srcPtr := C.uintptr_t(uintptr(unsafe.Pointer(&src[0])))
|
||||
C.memcpy_wrapper(dstPtr, srcPtr, C.size_t(len(dst)))
|
||||
|
||||
// Prevent from GC'ing src or dst during C.memcpy_wrapper call.
|
||||
runtime.KeepAlive(src)
|
||||
runtime.KeepAlive(dst)
|
||||
}
|
12
lib/fs/copy_mmap_nocgo.go
Normal file
12
lib/fs/copy_mmap_nocgo.go
Normal file
|
@ -0,0 +1,12 @@
|
|||
// +build !cgo
|
||||
|
||||
package fs
|
||||
|
||||
// copyMmap copies len(dst) bytes from src to dst.
|
||||
func copyMmap(dst, src []byte) {
|
||||
// This may lead to goroutines stalls when the copied data isn't available in RAM.
|
||||
// In this case the OS triggers reading the data from file.
|
||||
// See https://medium.com/@valyala/mmap-in-go-considered-harmful-d92a25cb161d for details.
|
||||
// TODO: fix this
|
||||
copy(dst, src)
|
||||
}
|
|
@ -4,9 +4,7 @@ import (
|
|||
"os"
|
||||
)
|
||||
|
||||
// MustFadviseSequentialRead hints the OS that f is read mostly sequentially.
|
||||
//
|
||||
// if prefetch is set, then the OS is hinted to prefetch f data.
|
||||
func MustFadviseSequentialRead(f *os.File, prefetch bool) {
|
||||
func fadviseSequentialRead(f *os.File, prefetch bool) error {
|
||||
// TODO: implement this properly
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -3,22 +3,20 @@
|
|||
package fs
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
// MustFadviseSequentialRead hints the OS that f is read mostly sequentially.
|
||||
//
|
||||
// if prefetch is set, then the OS is hinted to prefetch f data.
|
||||
func MustFadviseSequentialRead(f *os.File, prefetch bool) {
|
||||
func fadviseSequentialRead(f *os.File, prefetch bool) error {
|
||||
fd := int(f.Fd())
|
||||
mode := unix.FADV_SEQUENTIAL
|
||||
if prefetch {
|
||||
mode |= unix.FADV_WILLNEED
|
||||
}
|
||||
if err := unix.Fadvise(int(fd), 0, 0, mode); err != nil {
|
||||
logger.Panicf("FATAL: error returned from unix.Fadvise(%d): %s", mode, err)
|
||||
return fmt.Errorf("error returned from unix.Fadvise(%d): %s", mode, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
60
lib/fs/fs.go
60
lib/fs/fs.go
|
@ -10,69 +10,9 @@ import (
|
|||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
// MustReadAtCloser is rand-access read interface.
|
||||
type MustReadAtCloser interface {
|
||||
// MustReadAt must read len(p) bytes from offset off to p.
|
||||
MustReadAt(p []byte, off int64)
|
||||
|
||||
// MustClose must close the reader.
|
||||
MustClose()
|
||||
}
|
||||
|
||||
// ReaderAt implements rand-access read.
|
||||
type ReaderAt struct {
|
||||
f *os.File
|
||||
}
|
||||
|
||||
// MustReadAt reads len(p) bytes from off to p.
|
||||
func (ra *ReaderAt) MustReadAt(p []byte, off int64) {
|
||||
if len(p) == 0 {
|
||||
return
|
||||
}
|
||||
n, err := ra.f.ReadAt(p, off)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot read %d bytes at offset %d of file %q: %s", len(p), off, ra.f.Name(), err)
|
||||
}
|
||||
if n != len(p) {
|
||||
logger.Panicf("FATAL: unexpected number of bytes read; got %d; want %d", n, len(p))
|
||||
}
|
||||
readCalls.Inc()
|
||||
readBytes.Add(len(p))
|
||||
}
|
||||
|
||||
// MustClose closes ra.
|
||||
func (ra *ReaderAt) MustClose() {
|
||||
if err := ra.f.Close(); err != nil {
|
||||
logger.Panicf("FATAL: cannot close file %q: %s", ra.f.Name(), err)
|
||||
}
|
||||
readersCount.Dec()
|
||||
}
|
||||
|
||||
// OpenReaderAt opens a file on the given path for random-read access.
|
||||
//
|
||||
// The file must be closed with MustClose when no longer needed.
|
||||
func OpenReaderAt(path string) (*ReaderAt, error) {
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
readersCount.Inc()
|
||||
ra := &ReaderAt{
|
||||
f: f,
|
||||
}
|
||||
return ra, nil
|
||||
}
|
||||
|
||||
var (
|
||||
readCalls = metrics.NewCounter(`vm_fs_read_calls_total`)
|
||||
readBytes = metrics.NewCounter(`vm_fs_read_bytes_total`)
|
||||
readersCount = metrics.NewCounter(`vm_fs_readers`)
|
||||
)
|
||||
|
||||
// MustSyncPath syncs contents of the given path.
|
||||
func MustSyncPath(path string) {
|
||||
d, err := os.Open(path)
|
||||
|
|
133
lib/fs/reader_at.go
Normal file
133
lib/fs/reader_at.go
Normal file
|
@ -0,0 +1,133 @@
|
|||
package fs
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
var disableMmap = flag.Bool("fs.disableMmap", false, "Whether to use `pread` instead of `mmap` for reading data files")
|
||||
|
||||
// MustReadAtCloser is rand-access read interface.
|
||||
type MustReadAtCloser interface {
|
||||
// MustReadAt must read len(p) bytes from offset off to p.
|
||||
MustReadAt(p []byte, off int64)
|
||||
|
||||
// MustClose must close the reader.
|
||||
MustClose()
|
||||
}
|
||||
|
||||
type readerAt interface {
|
||||
ReadAt(p []byte, off int64) (int, error)
|
||||
Close() error
|
||||
}
|
||||
|
||||
// ReaderAt implements rand-access reader.
|
||||
type ReaderAt struct {
|
||||
f *os.File
|
||||
mmapData []byte
|
||||
}
|
||||
|
||||
// MustReadAt reads len(p) bytes at off from r.
|
||||
func (r *ReaderAt) MustReadAt(p []byte, off int64) {
|
||||
if len(p) == 0 {
|
||||
return
|
||||
}
|
||||
if len(r.mmapData) == 0 || len(p) > 8*1024*1024 {
|
||||
// Read big blocks directly from file.
|
||||
// This could be faster than reading these blocks from mmap,
|
||||
// since it triggers less page faults.
|
||||
n, err := r.f.ReadAt(p, off)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot read %d bytes at offset %d of file %q: %s", len(p), off, r.f.Name(), err)
|
||||
}
|
||||
if n != len(p) {
|
||||
logger.Panicf("FATAL: unexpected number of bytes read; got %d; want %d", n, len(p))
|
||||
}
|
||||
} else {
|
||||
if off < 0 || off > int64(len(r.mmapData)-len(p)) {
|
||||
logger.Panicf("off=%d is out of allowed range [0...%d] for len(p)=%d", off, len(r.mmapData)-len(p), len(p))
|
||||
}
|
||||
copyMmap(p, r.mmapData[off:])
|
||||
}
|
||||
readCalls.Inc()
|
||||
readBytes.Add(len(p))
|
||||
}
|
||||
|
||||
// MustClose closes r.
|
||||
func (r *ReaderAt) MustClose() {
|
||||
fname := r.f.Name()
|
||||
if len(r.mmapData) > 0 {
|
||||
if err := unix.Munmap(r.mmapData); err != nil {
|
||||
logger.Panicf("FATAL: cannot unmap data for file %q: %s", fname, err)
|
||||
}
|
||||
r.mmapData = nil
|
||||
}
|
||||
MustClose(r.f)
|
||||
r.f = nil
|
||||
readersCount.Dec()
|
||||
}
|
||||
|
||||
// MustFadviseSequentialRead hints the OS that f is read mostly sequentially.
|
||||
//
|
||||
// if prefetch is set, then the OS is hinted to prefetch f data.
|
||||
func (r *ReaderAt) MustFadviseSequentialRead(prefetch bool) {
|
||||
if err := fadviseSequentialRead(r.f, prefetch); err != nil {
|
||||
logger.Panicf("FATAL: error in fadviseSequentialRead(%q, %v): %s", r.f.Name(), prefetch, err)
|
||||
}
|
||||
}
|
||||
|
||||
// OpenReaderAt opens ReaderAt for reading from filename.
|
||||
//
|
||||
// MustClose must be called on the returned ReaderAt when it is no longer needed.
|
||||
func OpenReaderAt(path string) (*ReaderAt, error) {
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot open file %q for reader: %s", path, err)
|
||||
}
|
||||
var r ReaderAt
|
||||
r.f = f
|
||||
if !*disableMmap {
|
||||
data, err := mmapFile(f)
|
||||
if err != nil {
|
||||
MustClose(f)
|
||||
return nil, fmt.Errorf("cannot init reader for %q: %s", path, err)
|
||||
}
|
||||
r.mmapData = data
|
||||
}
|
||||
r.MustFadviseSequentialRead(false)
|
||||
readersCount.Inc()
|
||||
return &r, nil
|
||||
}
|
||||
|
||||
var (
|
||||
readCalls = metrics.NewCounter(`vm_fs_read_calls_total`)
|
||||
readBytes = metrics.NewCounter(`vm_fs_read_bytes_total`)
|
||||
readersCount = metrics.NewCounter(`vm_fs_readers`)
|
||||
)
|
||||
|
||||
func mmapFile(f *os.File) ([]byte, error) {
|
||||
fi, err := f.Stat()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error in stat: %s", err)
|
||||
}
|
||||
size := fi.Size()
|
||||
if size == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
if size < 0 {
|
||||
return nil, fmt.Errorf("got negative file size: %d bytes", size)
|
||||
}
|
||||
if int64(int(size)) != size {
|
||||
return nil, fmt.Errorf("file is too big to be mmap'ed: %d bytes", size)
|
||||
}
|
||||
data, err := unix.Mmap(int(f.Fd()), 0, int(size), unix.PROT_READ, unix.MAP_SHARED)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot mmap file with size %d: %s", size, err)
|
||||
}
|
||||
return data, nil
|
||||
}
|
Loading…
Reference in a new issue