diff --git a/lib/fs/reader_at.go b/lib/fs/reader_at.go index 3a9c5f439..7dcdbbd04 100644 --- a/lib/fs/reader_at.go +++ b/lib/fs/reader_at.go @@ -4,6 +4,9 @@ import ( "flag" "fmt" "os" + "sync" + "sync/atomic" + "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/metrics" @@ -28,6 +31,18 @@ type MustReadAtCloser interface { type ReaderAt struct { f *os.File mmapData []byte + + // pageCacheBitmap holds a bitmap for recently touched pages in mmapData. + // This bitmap allows using simple copy() instead of copyMmap() for reading recently touched pages, + // which is up to 4x faster when reading small chunks of data via MustReadAt. + pageCacheBitmap atomic.Value + pageCacheBitmapWG sync.WaitGroup + + stopCh chan struct{} +} + +type pageCacheBitmap struct { + m []uint64 } // MustReadAt reads len(p) bytes at off from r. @@ -35,7 +50,11 @@ func (r *ReaderAt) MustReadAt(p []byte, off int64) { if len(p) == 0 { return } - if len(r.mmapData) == 0 || len(p) > 8*1024 { + if off < 0 { + logger.Panicf("off=%d cannot be negative", off) + } + end := off + int64(len(p)) + if len(r.mmapData) == 0 || (len(p) > 8*1024 && !r.isInPageCache(off, end)) { // Read big blocks directly from file. // This could be faster than reading these blocks from mmap, // since it triggers less page faults. @@ -46,18 +65,72 @@ func (r *ReaderAt) MustReadAt(p []byte, off int64) { if n != len(p) { logger.Panicf("FATAL: unexpected number of bytes read; got %d; want %d", n, len(p)) } + if len(r.mmapData) > 0 { + r.markInPageCache(off, end) + } } else { - if off < 0 || off > int64(len(r.mmapData)-len(p)) { + if off > int64(len(r.mmapData)-len(p)) { logger.Panicf("off=%d is out of allowed range [0...%d] for len(p)=%d", off, len(r.mmapData)-len(p), len(p)) } - copyMmap(p, r.mmapData[off:]) + src := r.mmapData[off:] + if r.isInPageCache(off, end) { + // It is safe copying the data with copy(), since it is likely it is in the page cache. + // This is up to 4x faster than copyMmap() below. + copy(p, src) + } else { + // The data may be missing in the page cache, so it is better to copy it via cgo trick + // in order to avoid P stalls in Go runtime. + // See https://medium.com/@valyala/mmap-in-go-considered-harmful-d92a25cb161d for details. + copyMmap(p, src) + r.markInPageCache(off, end) + } } readCalls.Inc() readBytes.Add(len(p)) } +func (r *ReaderAt) isInPageCache(start, end int64) bool { + startBit := uint64(start) / pageSize + endBit := uint64(end) / pageSize + m := r.pageCacheBitmap.Load().(*pageCacheBitmap).m + for startBit <= endBit { + idx := startBit / 64 + off := startBit % 64 + if idx >= uint64(len(m)) { + return true + } + n := atomic.LoadUint64(&m[idx]) + if (n>>off)&1 != 1 { + return false + } + startBit++ + } + return true +} + +func (r *ReaderAt) markInPageCache(start, end int64) { + startBit := uint64(start) / pageSize + endBit := uint64(end) / pageSize + m := r.pageCacheBitmap.Load().(*pageCacheBitmap).m + for startBit <= endBit { + idx := startBit / 64 + off := startBit % 64 + n := atomic.LoadUint64(&m[idx]) + n |= 1 << off + // It is OK if multiple concurrent goroutines store the same m[idx]. + atomic.StoreUint64(&m[idx], n) + startBit++ + } +} + +// Assume page size is 4KB +const pageSize = 4 * 1024 + // MustClose closes r. func (r *ReaderAt) MustClose() { + close(r.stopCh) + r.pageCacheBitmapWG.Wait() + fname := r.f.Name() if len(r.mmapData) > 0 { if err := unix.Munmap(r.mmapData); err != nil { @@ -80,8 +153,24 @@ func OpenReaderAt(path string) (*ReaderAt, error) { } var r ReaderAt r.f = f + r.stopCh = make(chan struct{}) if !*disableMmap { - data, err := mmapFile(f) + fi, err := f.Stat() + if err != nil { + return nil, fmt.Errorf("error in stat: %s", err) + } + size := fi.Size() + bm := &pageCacheBitmap{ + m: make([]uint64, 1+size/pageSize/64), + } + r.pageCacheBitmap.Store(bm) + r.pageCacheBitmapWG.Add(1) + go func() { + defer r.pageCacheBitmapWG.Done() + pageCacheBitmapCleaner(&r.pageCacheBitmap, r.stopCh) + }() + + data, err := mmapFile(f, size) if err != nil { MustClose(f) return nil, fmt.Errorf("cannot init reader for %q: %s", path, err) @@ -92,18 +181,30 @@ func OpenReaderAt(path string) (*ReaderAt, error) { return &r, nil } +func pageCacheBitmapCleaner(pcbm *atomic.Value, stopCh <-chan struct{}) { + t := time.NewTimer(time.Minute) + for { + select { + case <-stopCh: + t.Stop() + return + case <-t.C: + } + bmOld := pcbm.Load().(*pageCacheBitmap) + bm := &pageCacheBitmap{ + m: make([]uint64, len(bmOld.m)), + } + pcbm.Store(bm) + } +} + var ( readCalls = metrics.NewCounter(`vm_fs_read_calls_total`) readBytes = metrics.NewCounter(`vm_fs_read_bytes_total`) readersCount = metrics.NewCounter(`vm_fs_readers`) ) -func mmapFile(f *os.File) ([]byte, error) { - fi, err := f.Stat() - if err != nil { - return nil, fmt.Errorf("error in stat: %s", err) - } - size := fi.Size() +func mmapFile(f *os.File, size int64) ([]byte, error) { if size == 0 { return nil, nil } diff --git a/lib/fs/reader_at_test.go b/lib/fs/reader_at_test.go new file mode 100644 index 000000000..10d84c738 --- /dev/null +++ b/lib/fs/reader_at_test.go @@ -0,0 +1,37 @@ +package fs + +import ( + "fmt" + "io/ioutil" + "testing" +) + +func TestReaderAt(t *testing.T) { + for _, bufSize := range []int{1, 1e1, 1e2, 1e3, 1e4, 1e5} { + t.Run(fmt.Sprintf("%d", bufSize), func(t *testing.T) { + testReaderAt(t, bufSize) + }) + } +} + +func testReaderAt(t *testing.T, bufSize int) { + path := "TestReaderAt" + const fileSize = 8 * 1024 * 1024 + data := make([]byte, fileSize) + if err := ioutil.WriteFile(path, data, 0600); err != nil { + t.Fatalf("cannot create %q: %s", path, err) + } + defer MustRemoveAll(path) + r, err := OpenReaderAt(path) + if err != nil { + t.Fatalf("error in OpenReaderAt(%q): %s", path, err) + } + defer r.MustClose() + + buf := make([]byte, bufSize) + for i := 0; i < fileSize-bufSize; i += bufSize { + offset := int64(i) + r.MustReadAt(buf[:0], offset) + r.MustReadAt(buf, offset) + } +} diff --git a/lib/fs/reader_at_timing_test.go b/lib/fs/reader_at_timing_test.go index e28b45395..84eb1e65a 100644 --- a/lib/fs/reader_at_timing_test.go +++ b/lib/fs/reader_at_timing_test.go @@ -23,7 +23,7 @@ func benchmarkReaderAtMustReadAt(b *testing.B, isMmap bool) { }() path := "BenchmarkReaderAtMustReadAt" - const fileSize = 1024 * 1024 + const fileSize = 8 * 1024 * 1024 data := make([]byte, fileSize) if err := ioutil.WriteFile(path, data, 0600); err != nil { b.Fatalf("cannot create %q: %s", path, err) @@ -36,7 +36,7 @@ func benchmarkReaderAtMustReadAt(b *testing.B, isMmap bool) { defer r.MustClose() b.ResetTimer() - for _, bufSize := range []int{1, 10, 100, 1000, 10000} { + for _, bufSize := range []int{1, 1e1, 1e2, 1e3, 1e4, 1e5} { b.Run(fmt.Sprintf("%d", bufSize), func(b *testing.B) { b.ReportAllocs() b.SetBytes(int64(bufSize))