mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/fs: optimize queries that read recent samples for a big number of time series
Use standard copy() func instead of mmap-aware copy func for reading recently touched mmap-ed data. This improves read performance by up to 4x.
This commit is contained in:
parent
4cb3af1a36
commit
d120197676
3 changed files with 150 additions and 12 deletions
|
@ -4,6 +4,9 @@ import (
|
|||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
@ -28,6 +31,18 @@ type MustReadAtCloser interface {
|
|||
type ReaderAt struct {
|
||||
f *os.File
|
||||
mmapData []byte
|
||||
|
||||
// pageCacheBitmap holds a bitmap for recently touched pages in mmapData.
|
||||
// This bitmap allows using simple copy() instead of copyMmap() for reading recently touched pages,
|
||||
// which is up to 4x faster when reading small chunks of data via MustReadAt.
|
||||
pageCacheBitmap atomic.Value
|
||||
pageCacheBitmapWG sync.WaitGroup
|
||||
|
||||
stopCh chan struct{}
|
||||
}
|
||||
|
||||
type pageCacheBitmap struct {
|
||||
m []uint64
|
||||
}
|
||||
|
||||
// MustReadAt reads len(p) bytes at off from r.
|
||||
|
@ -35,7 +50,11 @@ func (r *ReaderAt) MustReadAt(p []byte, off int64) {
|
|||
if len(p) == 0 {
|
||||
return
|
||||
}
|
||||
if len(r.mmapData) == 0 || len(p) > 8*1024 {
|
||||
if off < 0 {
|
||||
logger.Panicf("off=%d cannot be negative", off)
|
||||
}
|
||||
end := off + int64(len(p))
|
||||
if len(r.mmapData) == 0 || (len(p) > 8*1024 && !r.isInPageCache(off, end)) {
|
||||
// Read big blocks directly from file.
|
||||
// This could be faster than reading these blocks from mmap,
|
||||
// since it triggers less page faults.
|
||||
|
@ -46,18 +65,72 @@ func (r *ReaderAt) MustReadAt(p []byte, off int64) {
|
|||
if n != len(p) {
|
||||
logger.Panicf("FATAL: unexpected number of bytes read; got %d; want %d", n, len(p))
|
||||
}
|
||||
if len(r.mmapData) > 0 {
|
||||
r.markInPageCache(off, end)
|
||||
}
|
||||
} else {
|
||||
if off < 0 || off > int64(len(r.mmapData)-len(p)) {
|
||||
if off > int64(len(r.mmapData)-len(p)) {
|
||||
logger.Panicf("off=%d is out of allowed range [0...%d] for len(p)=%d", off, len(r.mmapData)-len(p), len(p))
|
||||
}
|
||||
copyMmap(p, r.mmapData[off:])
|
||||
src := r.mmapData[off:]
|
||||
if r.isInPageCache(off, end) {
|
||||
// It is safe copying the data with copy(), since it is likely it is in the page cache.
|
||||
// This is up to 4x faster than copyMmap() below.
|
||||
copy(p, src)
|
||||
} else {
|
||||
// The data may be missing in the page cache, so it is better to copy it via cgo trick
|
||||
// in order to avoid P stalls in Go runtime.
|
||||
// See https://medium.com/@valyala/mmap-in-go-considered-harmful-d92a25cb161d for details.
|
||||
copyMmap(p, src)
|
||||
r.markInPageCache(off, end)
|
||||
}
|
||||
}
|
||||
readCalls.Inc()
|
||||
readBytes.Add(len(p))
|
||||
}
|
||||
|
||||
func (r *ReaderAt) isInPageCache(start, end int64) bool {
|
||||
startBit := uint64(start) / pageSize
|
||||
endBit := uint64(end) / pageSize
|
||||
m := r.pageCacheBitmap.Load().(*pageCacheBitmap).m
|
||||
for startBit <= endBit {
|
||||
idx := startBit / 64
|
||||
off := startBit % 64
|
||||
if idx >= uint64(len(m)) {
|
||||
return true
|
||||
}
|
||||
n := atomic.LoadUint64(&m[idx])
|
||||
if (n>>off)&1 != 1 {
|
||||
return false
|
||||
}
|
||||
startBit++
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (r *ReaderAt) markInPageCache(start, end int64) {
|
||||
startBit := uint64(start) / pageSize
|
||||
endBit := uint64(end) / pageSize
|
||||
m := r.pageCacheBitmap.Load().(*pageCacheBitmap).m
|
||||
for startBit <= endBit {
|
||||
idx := startBit / 64
|
||||
off := startBit % 64
|
||||
n := atomic.LoadUint64(&m[idx])
|
||||
n |= 1 << off
|
||||
// It is OK if multiple concurrent goroutines store the same m[idx].
|
||||
atomic.StoreUint64(&m[idx], n)
|
||||
startBit++
|
||||
}
|
||||
}
|
||||
|
||||
// Assume page size is 4KB
|
||||
const pageSize = 4 * 1024
|
||||
|
||||
// MustClose closes r.
|
||||
func (r *ReaderAt) MustClose() {
|
||||
close(r.stopCh)
|
||||
r.pageCacheBitmapWG.Wait()
|
||||
|
||||
fname := r.f.Name()
|
||||
if len(r.mmapData) > 0 {
|
||||
if err := unix.Munmap(r.mmapData); err != nil {
|
||||
|
@ -80,8 +153,24 @@ func OpenReaderAt(path string) (*ReaderAt, error) {
|
|||
}
|
||||
var r ReaderAt
|
||||
r.f = f
|
||||
r.stopCh = make(chan struct{})
|
||||
if !*disableMmap {
|
||||
data, err := mmapFile(f)
|
||||
fi, err := f.Stat()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error in stat: %s", err)
|
||||
}
|
||||
size := fi.Size()
|
||||
bm := &pageCacheBitmap{
|
||||
m: make([]uint64, 1+size/pageSize/64),
|
||||
}
|
||||
r.pageCacheBitmap.Store(bm)
|
||||
r.pageCacheBitmapWG.Add(1)
|
||||
go func() {
|
||||
defer r.pageCacheBitmapWG.Done()
|
||||
pageCacheBitmapCleaner(&r.pageCacheBitmap, r.stopCh)
|
||||
}()
|
||||
|
||||
data, err := mmapFile(f, size)
|
||||
if err != nil {
|
||||
MustClose(f)
|
||||
return nil, fmt.Errorf("cannot init reader for %q: %s", path, err)
|
||||
|
@ -92,18 +181,30 @@ func OpenReaderAt(path string) (*ReaderAt, error) {
|
|||
return &r, nil
|
||||
}
|
||||
|
||||
func pageCacheBitmapCleaner(pcbm *atomic.Value, stopCh <-chan struct{}) {
|
||||
t := time.NewTimer(time.Minute)
|
||||
for {
|
||||
select {
|
||||
case <-stopCh:
|
||||
t.Stop()
|
||||
return
|
||||
case <-t.C:
|
||||
}
|
||||
bmOld := pcbm.Load().(*pageCacheBitmap)
|
||||
bm := &pageCacheBitmap{
|
||||
m: make([]uint64, len(bmOld.m)),
|
||||
}
|
||||
pcbm.Store(bm)
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
readCalls = metrics.NewCounter(`vm_fs_read_calls_total`)
|
||||
readBytes = metrics.NewCounter(`vm_fs_read_bytes_total`)
|
||||
readersCount = metrics.NewCounter(`vm_fs_readers`)
|
||||
)
|
||||
|
||||
func mmapFile(f *os.File) ([]byte, error) {
|
||||
fi, err := f.Stat()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error in stat: %s", err)
|
||||
}
|
||||
size := fi.Size()
|
||||
func mmapFile(f *os.File, size int64) ([]byte, error) {
|
||||
if size == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
|
37
lib/fs/reader_at_test.go
Normal file
37
lib/fs/reader_at_test.go
Normal file
|
@ -0,0 +1,37 @@
|
|||
package fs
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestReaderAt(t *testing.T) {
|
||||
for _, bufSize := range []int{1, 1e1, 1e2, 1e3, 1e4, 1e5} {
|
||||
t.Run(fmt.Sprintf("%d", bufSize), func(t *testing.T) {
|
||||
testReaderAt(t, bufSize)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func testReaderAt(t *testing.T, bufSize int) {
|
||||
path := "TestReaderAt"
|
||||
const fileSize = 8 * 1024 * 1024
|
||||
data := make([]byte, fileSize)
|
||||
if err := ioutil.WriteFile(path, data, 0600); err != nil {
|
||||
t.Fatalf("cannot create %q: %s", path, err)
|
||||
}
|
||||
defer MustRemoveAll(path)
|
||||
r, err := OpenReaderAt(path)
|
||||
if err != nil {
|
||||
t.Fatalf("error in OpenReaderAt(%q): %s", path, err)
|
||||
}
|
||||
defer r.MustClose()
|
||||
|
||||
buf := make([]byte, bufSize)
|
||||
for i := 0; i < fileSize-bufSize; i += bufSize {
|
||||
offset := int64(i)
|
||||
r.MustReadAt(buf[:0], offset)
|
||||
r.MustReadAt(buf, offset)
|
||||
}
|
||||
}
|
|
@ -23,7 +23,7 @@ func benchmarkReaderAtMustReadAt(b *testing.B, isMmap bool) {
|
|||
}()
|
||||
|
||||
path := "BenchmarkReaderAtMustReadAt"
|
||||
const fileSize = 1024 * 1024
|
||||
const fileSize = 8 * 1024 * 1024
|
||||
data := make([]byte, fileSize)
|
||||
if err := ioutil.WriteFile(path, data, 0600); err != nil {
|
||||
b.Fatalf("cannot create %q: %s", path, err)
|
||||
|
@ -36,7 +36,7 @@ func benchmarkReaderAtMustReadAt(b *testing.B, isMmap bool) {
|
|||
defer r.MustClose()
|
||||
|
||||
b.ResetTimer()
|
||||
for _, bufSize := range []int{1, 10, 100, 1000, 10000} {
|
||||
for _, bufSize := range []int{1, 1e1, 1e2, 1e3, 1e4, 1e5} {
|
||||
b.Run(fmt.Sprintf("%d", bufSize), func(b *testing.B) {
|
||||
b.ReportAllocs()
|
||||
b.SetBytes(int64(bufSize))
|
||||
|
|
Loading…
Reference in a new issue