From caeb74f0686a5da4cbfd3bbfc1affab0784dc3e4 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Wed, 4 Nov 2020 16:46:10 +0200
Subject: [PATCH] app/vmselect: reduce memory usage when a query touches a large number of time series

---
 CHANGELOG.md                               |   3 +
 app/vmselect/main.go                       |   5 +
 app/vmselect/netstorage/netstorage.go      |  54 ++++--
 app/vmselect/netstorage/tmp_blocks_file.go | 188 +++++++++++++++++++++
 lib/fs/fadvise_darwin.go                   |  10 ++
 lib/fs/fadvise_openbsd.go                  |  10 ++
 lib/fs/fadvise_unix.go                     |  22 +++
 lib/fs/reader_at.go                        |   9 +
 lib/storage/search.go                      |  30 ++++
 9 files changed, 320 insertions(+), 11 deletions(-)
 create mode 100644 app/vmselect/netstorage/tmp_blocks_file.go
 create mode 100644 lib/fs/fadvise_darwin.go
 create mode 100644 lib/fs/fadvise_openbsd.go
 create mode 100644 lib/fs/fadvise_unix.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index d2d765e3d..dcc365d3a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,9 @@
 # tip
 
+* FEATURE: reduce memory usage when a query touches a large number of time series.
+
+
 # [v1.45.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.45.0)
 
 * FEATURE: allow setting `-retentionPeriod` smaller than one month. I.e. `-retentionPeriod=3d`, `-retentionPeriod=2w`, etc. is supported now.
diff --git a/app/vmselect/main.go b/app/vmselect/main.go
index 761e75f39..d184eba77 100644
--- a/app/vmselect/main.go
+++ b/app/vmselect/main.go
@@ -10,10 +10,12 @@ import (
 	"time"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/graphite"
+	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/prometheus"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
@@ -45,6 +47,9 @@ func getDefaultMaxConcurrentRequests() int {
 
 // Init initializes vmselect
 func Init() {
+	tmpDirPath := *vmstorage.DataPath + "/tmp"
+	fs.RemoveDirContents(tmpDirPath)
+	netstorage.InitTmpBlocksDir(tmpDirPath)
 	promql.InitRollupResultCache(*vmstorage.DataPath + "/cache/rollupResult")
 
 	concurrencyCh = make(chan struct{}, *maxConcurrentRequests)
diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go
index 240e282b3..dd452770e 100644
--- a/app/vmselect/netstorage/netstorage.go
+++ b/app/vmselect/netstorage/netstorage.go
@@ -57,6 +57,7 @@ type Results struct {
 
 	packedTimeseries []packedTimeseries
 	sr               *storage.Search
+	tbf              *tmpBlocksFile
 }
 
 // Len returns the number of results in rss.
@@ -72,6 +73,8 @@ func (rss *Results) Cancel() { func (rss *Results) mustClose() { putStorageSearch(rss.sr) rss.sr = nil + putTmpBlocksFile(rss.tbf) + rss.tbf = nil } var timeseriesWorkCh = make(chan *timeseriesWork, gomaxprocs*16) @@ -105,7 +108,7 @@ func timeseriesWorker(workerID uint) { tsw.doneCh <- nil continue } - if err := tsw.pts.Unpack(&rs, rss.tr, rss.fetchData); err != nil { + if err := tsw.pts.Unpack(&rs, rss.tbf, rss.tr, rss.fetchData); err != nil { tsw.doneCh <- fmt.Errorf("error during time series unpacking: %w", err) continue } @@ -179,27 +182,29 @@ var gomaxprocs = runtime.GOMAXPROCS(-1) type packedTimeseries struct { metricName string - brs []storage.BlockRef + brs []blockRef } var unpackWorkCh = make(chan *unpackWork, gomaxprocs*128) type unpackWorkItem struct { - br storage.BlockRef + br blockRef tr storage.TimeRange } type unpackWork struct { + tbf *tmpBlocksFile ws []unpackWorkItem sbs []*sortBlock doneCh chan error } func (upw *unpackWork) reset() { + upw.tbf = nil ws := upw.ws for i := range ws { w := &ws[i] - w.br = storage.BlockRef{} + w.br = blockRef{} w.tr = storage.TimeRange{} } upw.ws = upw.ws[:0] @@ -216,7 +221,7 @@ func (upw *unpackWork) reset() { func (upw *unpackWork) unpack(tmpBlock *storage.Block) { for _, w := range upw.ws { sb := getSortBlock() - if err := sb.unpackFrom(tmpBlock, w.br, w.tr); err != nil { + if err := sb.unpackFrom(tmpBlock, upw.tbf, w.br, w.tr); err != nil { putSortBlock(sb) upw.doneCh <- fmt.Errorf("cannot unpack block: %w", err) return @@ -262,7 +267,7 @@ func unpackWorker() { var unpackBatchSize = 8 * runtime.GOMAXPROCS(-1) // Unpack unpacks pts to dst. -func (pts *packedTimeseries) Unpack(dst *Result, tr storage.TimeRange, fetchData bool) error { +func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.TimeRange, fetchData bool) error { dst.reset() if err := dst.MetricName.Unmarshal(bytesutil.ToUnsafeBytes(pts.metricName)); err != nil { return fmt.Errorf("cannot unmarshal metricName %q: %w", pts.metricName, err) @@ -276,11 +281,13 @@ func (pts *packedTimeseries) Unpack(dst *Result, tr storage.TimeRange, fetchData brsLen := len(pts.brs) upws := make([]*unpackWork, 0, 1+brsLen/unpackBatchSize) upw := getUnpackWork() + upw.tbf = tbf for _, br := range pts.brs { if len(upw.ws) >= unpackBatchSize { unpackWorkCh <- upw upws = append(upws, upw) upw = getUnpackWork() + upw.tbf = tbf } upw.ws = append(upw.ws, unpackWorkItem{ br: br, @@ -397,9 +404,10 @@ func (sb *sortBlock) reset() { sb.NextIdx = 0 } -func (sb *sortBlock) unpackFrom(tmpBlock *storage.Block, br storage.BlockRef, tr storage.TimeRange) error { +func (sb *sortBlock) unpackFrom(tmpBlock *storage.Block, tbf *tmpBlocksFile, br blockRef, tr storage.TimeRange) error { tmpBlock.Reset() - br.MustReadBlock(tmpBlock, true) + brReal := tbf.MustReadBlockRefAt(br.partRef, br.addr) + brReal.MustReadBlock(tmpBlock, true) if err := tmpBlock.UnmarshalData(); err != nil { return fmt.Errorf("cannot unmarshal block: %w", err) } @@ -709,19 +717,31 @@ func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline search sr := getStorageSearch() maxSeriesCount := sr.Init(vmstorage.Storage, tfss, tr, *maxMetricsPerSearch, deadline.Deadline()) - - m := make(map[string][]storage.BlockRef, maxSeriesCount) + m := make(map[string][]blockRef, maxSeriesCount) orderedMetricNames := make([]string, 0, maxSeriesCount) blocksRead := 0 + tbf := getTmpBlocksFile() + var buf []byte for sr.NextMetricBlock() { blocksRead++ if deadline.Exceeded() { + putTmpBlocksFile(tbf) 
putStorageSearch(sr) return nil, fmt.Errorf("timeout exceeded while fetching data block #%d from storage: %s", blocksRead, deadline.String()) } + buf = sr.MetricBlockRef.BlockRef.Marshal(buf[:0]) + addr, err := tbf.WriteBlockRefData(buf) + if err != nil { + putTmpBlocksFile(tbf) + putStorageSearch(sr) + return nil, fmt.Errorf("cannot write %d bytes to temporary file: %w", len(buf), err) + } metricName := sr.MetricBlockRef.MetricName brs := m[string(metricName)] - brs = append(brs, *sr.MetricBlockRef.BlockRef) + brs = append(brs, blockRef{ + partRef: sr.MetricBlockRef.BlockRef.PartRef(), + addr: addr, + }) if len(brs) > 1 { // An optimization: do not allocate a string for already existing metricName key in m m[string(metricName)] = brs @@ -733,12 +753,18 @@ func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline search } } if err := sr.Error(); err != nil { + putTmpBlocksFile(tbf) putStorageSearch(sr) if errors.Is(err, storage.ErrDeadlineExceeded) { return nil, fmt.Errorf("timeout exceeded during the query: %s", deadline.String()) } return nil, fmt.Errorf("search error after reading %d data blocks: %w", blocksRead, err) } + if err := tbf.Finalize(); err != nil { + putTmpBlocksFile(tbf) + putStorageSearch(sr) + return nil, fmt.Errorf("cannot finalize temporary file: %w", err) + } var rss Results rss.tr = tr @@ -753,9 +779,15 @@ func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline search } rss.packedTimeseries = pts rss.sr = sr + rss.tbf = tbf return &rss, nil } +type blockRef struct { + partRef storage.PartRef + addr tmpBlockAddr +} + func setupTfss(tagFilterss [][]storage.TagFilter) ([]*storage.TagFilters, error) { tfss := make([]*storage.TagFilters, 0, len(tagFilterss)) for _, tagFilters := range tagFilterss { diff --git a/app/vmselect/netstorage/tmp_blocks_file.go b/app/vmselect/netstorage/tmp_blocks_file.go new file mode 100644 index 000000000..71bd7561c --- /dev/null +++ b/app/vmselect/netstorage/tmp_blocks_file.go @@ -0,0 +1,188 @@ +package netstorage + +import ( + "fmt" + "io/ioutil" + "os" + "sync" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" + "github.com/VictoriaMetrics/metrics" +) + +// InitTmpBlocksDir initializes directory to store temporary search results. +// +// It stores data in system-defined temporary directory if tmpDirPath is empty. 
+func InitTmpBlocksDir(tmpDirPath string) {
+	if len(tmpDirPath) == 0 {
+		tmpDirPath = os.TempDir()
+	}
+	tmpBlocksDir = tmpDirPath + "/searchResults"
+	fs.MustRemoveAll(tmpBlocksDir)
+	if err := fs.MkdirAllIfNotExist(tmpBlocksDir); err != nil {
+		logger.Panicf("FATAL: cannot create %q: %s", tmpBlocksDir, err)
+	}
+}
+
+var tmpBlocksDir string
+
+func maxInmemoryTmpBlocksFile() int {
+	mem := memory.Allowed()
+	maxLen := mem / 1024
+	if maxLen < 64*1024 {
+		return 64 * 1024
+	}
+	if maxLen > 4*1024*1024 {
+		return 4 * 1024 * 1024
+	}
+	return maxLen
+}
+
+var _ = metrics.NewGauge(`vm_tmp_blocks_max_inmemory_file_size_bytes`, func() float64 {
+	return float64(maxInmemoryTmpBlocksFile())
+})
+
+type tmpBlocksFile struct {
+	buf []byte
+
+	f *os.File
+	r *fs.ReaderAt
+
+	offset uint64
+}
+
+func getTmpBlocksFile() *tmpBlocksFile {
+	v := tmpBlocksFilePool.Get()
+	if v == nil {
+		return &tmpBlocksFile{
+			buf: make([]byte, 0, maxInmemoryTmpBlocksFile()),
+		}
+	}
+	return v.(*tmpBlocksFile)
+}
+
+func putTmpBlocksFile(tbf *tmpBlocksFile) {
+	tbf.MustClose()
+	tbf.buf = tbf.buf[:0]
+	tbf.f = nil
+	tbf.r = nil
+	tbf.offset = 0
+	tmpBlocksFilePool.Put(tbf)
+}
+
+var tmpBlocksFilePool sync.Pool
+
+type tmpBlockAddr struct {
+	offset uint64
+	size   int
+}
+
+func (addr tmpBlockAddr) String() string {
+	return fmt.Sprintf("offset %d, size %d", addr.offset, addr.size)
+}
+
+var (
+	tmpBlocksFilesCreated = metrics.NewCounter(`vm_tmp_blocks_files_created_total`)
+	_ = metrics.NewGauge(`vm_tmp_blocks_files_directory_free_bytes`, func() float64 {
+		return float64(fs.MustGetFreeSpace(tmpBlocksDir))
+	})
+)
+
+// WriteBlockRefData writes b to tbf.
+//
+// It returns an error, since writing may fail on disk space shortage,
+// and the caller must handle that error.
+func (tbf *tmpBlocksFile) WriteBlockRefData(b []byte) (tmpBlockAddr, error) {
+	var addr tmpBlockAddr
+	addr.offset = tbf.offset
+	addr.size = len(b)
+	tbf.offset += uint64(addr.size)
+	if len(tbf.buf)+len(b) <= cap(tbf.buf) {
+		// Fast path - the data fits tbf.buf
+		tbf.buf = append(tbf.buf, b...)
+		return addr, nil
+	}
+
+	// Slow path: flush the data from tbf.buf to file.
+	if tbf.f == nil {
+		f, err := ioutil.TempFile(tmpBlocksDir, "")
+		if err != nil {
+			return addr, err
+		}
+		tbf.f = f
+		tmpBlocksFilesCreated.Inc()
+	}
+	_, err := tbf.f.Write(tbf.buf)
+	tbf.buf = append(tbf.buf[:0], b...)
+	if err != nil {
+		return addr, fmt.Errorf("cannot write block to %q: %w", tbf.f.Name(), err)
+	}
+	return addr, nil
+}
+
+func (tbf *tmpBlocksFile) Finalize() error {
+	if tbf.f == nil {
+		return nil
+	}
+	fname := tbf.f.Name()
+	if _, err := tbf.f.Write(tbf.buf); err != nil {
+		return fmt.Errorf("cannot write the remaining %d bytes to %q: %w", len(tbf.buf), fname, err)
+	}
+	tbf.buf = tbf.buf[:0]
+	r, err := fs.OpenReaderAt(fname)
+	if err != nil {
+		logger.Panicf("FATAL: cannot open %q: %s", fname, err)
+	}
+	// Hint the OS that the file is read almost sequentially.
+	// This should reduce the number of disk seeks, which is important
+	// for HDDs.
+ r.MustFadviseSequentialRead(true) + tbf.r = r + return nil +} + +func (tbf *tmpBlocksFile) MustReadBlockRefAt(partRef storage.PartRef, addr tmpBlockAddr) storage.BlockRef { + var buf []byte + if tbf.f == nil { + buf = tbf.buf[addr.offset : addr.offset+uint64(addr.size)] + } else { + bb := tmpBufPool.Get() + defer tmpBufPool.Put(bb) + bb.B = bytesutil.Resize(bb.B, addr.size) + tbf.r.MustReadAt(bb.B, int64(addr.offset)) + buf = bb.B + } + var br storage.BlockRef + if err := br.Init(partRef, buf); err != nil { + logger.Panicf("FATAL: cannot initialize BlockRef: %s", err) + } + return br +} + +var tmpBufPool bytesutil.ByteBufferPool + +func (tbf *tmpBlocksFile) MustClose() { + if tbf.f == nil { + return + } + if tbf.r != nil { + // tbf.r could be nil if Finalize wasn't called. + tbf.r.MustClose() + } + fname := tbf.f.Name() + + // Remove the file at first, then close it. + // This way the OS shouldn't try to flush file contents to storage + // on close. + if err := os.Remove(fname); err != nil { + logger.Panicf("FATAL: cannot remove %q: %s", fname, err) + } + if err := tbf.f.Close(); err != nil { + logger.Panicf("FATAL: cannot close %q: %s", fname, err) + } + tbf.f = nil +} diff --git a/lib/fs/fadvise_darwin.go b/lib/fs/fadvise_darwin.go new file mode 100644 index 000000000..73cfe81a7 --- /dev/null +++ b/lib/fs/fadvise_darwin.go @@ -0,0 +1,10 @@ +package fs + +import ( + "os" +) + +func fadviseSequentialRead(f *os.File, prefetch bool) error { + // TODO: implement this properly + return nil +} diff --git a/lib/fs/fadvise_openbsd.go b/lib/fs/fadvise_openbsd.go new file mode 100644 index 000000000..73cfe81a7 --- /dev/null +++ b/lib/fs/fadvise_openbsd.go @@ -0,0 +1,10 @@ +package fs + +import ( + "os" +) + +func fadviseSequentialRead(f *os.File, prefetch bool) error { + // TODO: implement this properly + return nil +} diff --git a/lib/fs/fadvise_unix.go b/lib/fs/fadvise_unix.go new file mode 100644 index 000000000..1af313b04 --- /dev/null +++ b/lib/fs/fadvise_unix.go @@ -0,0 +1,22 @@ +// +build linux freebsd + +package fs + +import ( + "fmt" + "os" + + "golang.org/x/sys/unix" +) + +func fadviseSequentialRead(f *os.File, prefetch bool) error { + fd := int(f.Fd()) + mode := unix.FADV_SEQUENTIAL + if prefetch { + mode |= unix.FADV_WILLNEED + } + if err := unix.Fadvise(int(fd), 0, 0, mode); err != nil { + return fmt.Errorf("error returned from unix.Fadvise(%d): %w", mode, err) + } + return nil +} diff --git a/lib/fs/reader_at.go b/lib/fs/reader_at.go index ef30eb019..175292cc0 100644 --- a/lib/fs/reader_at.go +++ b/lib/fs/reader_at.go @@ -149,6 +149,15 @@ func (r *ReaderAt) MustClose() { readersCount.Dec() } +// MustFadviseSequentialRead hints the OS that f is read mostly sequentially. +// +// if prefetch is set, then the OS is hinted to prefetch f data. +func (r *ReaderAt) MustFadviseSequentialRead(prefetch bool) { + if err := fadviseSequentialRead(r.f, prefetch); err != nil { + logger.Panicf("FATAL: error in fadviseSequentialRead(%q, %v): %s", r.f.Name(), prefetch, err) + } +} + // OpenReaderAt opens ReaderAt for reading from filename. // // MustClose must be called on the returned ReaderAt when it is no longer needed. 
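
For reference, here is a minimal sketch of how the new ReaderAt fadvise hint is meant to be used by a caller such as tmpBlocksFile.Finalize above. The sketch is not part of the patch: the file path and the 64-byte read at offset 0 are illustrative assumptions, while fs.OpenReaderAt, MustFadviseSequentialRead, MustReadAt and MustClose are the APIs shown in the diffs.

package main

import (
	"log"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
)

func main() {
	// Open a file that will be read mostly sequentially, e.g. a finalized
	// temporary search-results file. The path below is a placeholder.
	r, err := fs.OpenReaderAt("/victoria-metrics-data/tmp/searchResults/example")
	if err != nil {
		log.Fatalf("cannot open reader: %s", err)
	}
	defer r.MustClose()

	// Hint the OS that the file will be read sequentially and may be prefetched.
	// On linux/freebsd this maps to unix.Fadvise with FADV_SEQUENTIAL|FADV_WILLNEED;
	// on darwin/openbsd it is currently a no-op (see the fadvise_*.go stubs above).
	r.MustFadviseSequentialRead(true)

	// Read data back at a previously recorded offset, as
	// tmpBlocksFile.MustReadBlockRefAt does with a tmpBlockAddr.
	// This assumes the file holds at least 64 bytes.
	buf := make([]byte, 64)
	r.MustReadAt(buf, 0)
	log.Printf("read %d bytes", len(buf))
}

Keeping the hint a no-op on darwin/openbsd leaves the call site portable while still reducing disk seeks on Linux/FreeBSD installations backed by HDDs.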
diff --git a/lib/storage/search.go b/lib/storage/search.go
index 60c209906..0ce480a95 100644
--- a/lib/storage/search.go
+++ b/lib/storage/search.go
@@ -30,6 +30,36 @@ func (br *BlockRef) init(p *part, bh *blockHeader) {
 	br.bh = *bh
 }
 
+// Init initializes br from pr and data.
+func (br *BlockRef) Init(pr PartRef, data []byte) error {
+	br.p = pr.p
+	tail, err := br.bh.Unmarshal(data)
+	if err != nil {
+		return err
+	}
+	if len(tail) > 0 {
+		return fmt.Errorf("unexpected non-empty tail left after unmarshaling blockHeader; len(tail)=%d; tail=%q", len(tail), tail)
+	}
+	return nil
+}
+
+// Marshal marshals br to dst.
+func (br *BlockRef) Marshal(dst []byte) []byte {
+	return br.bh.Marshal(dst)
+}
+
+// PartRef returns the PartRef for br.
+func (br *BlockRef) PartRef() PartRef {
+	return PartRef{
+		p: br.p,
+	}
+}
+
+// PartRef is a reference to a part.
+type PartRef struct {
+	p *part
+}
+
 // MustReadBlock reads block from br to dst.
 //
 // if fetchData is false, then only block header is read, otherwise all the data is read.
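
To tie the netstorage and lib/storage pieces together, here is a minimal sketch, not part of the patch, of the spill/reload round-trip that ProcessSearchQuery and sortBlock.unpackFrom now perform. It assumes the code sits inside package netstorage, where the unexported tmpBlocksFile and blockRef types are visible; spillBlockRef and loadBlockRef are hypothetical helper names used only for illustration.

package netstorage

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
)

// spillBlockRef marshals the BlockRef found by the search into tbf and returns
// the small in-memory blockRef{partRef, addr} that replaces the full
// storage.BlockRef in the per-metric map. buf is reused between calls.
func spillBlockRef(tbf *tmpBlocksFile, src *storage.BlockRef, buf []byte) (blockRef, []byte, error) {
	buf = src.Marshal(buf[:0])
	addr, err := tbf.WriteBlockRefData(buf)
	if err != nil {
		return blockRef{}, buf, fmt.Errorf("cannot write %d bytes to temporary file: %w", len(buf), err)
	}
	br := blockRef{
		partRef: src.PartRef(),
		addr:    addr,
	}
	return br, buf, nil
}

// loadBlockRef is the reverse operation used while unpacking: the marshaled
// block header is read back from tbf (from its in-memory buffer or from the
// spilled file) and a storage.BlockRef is rebuilt via BlockRef.Init.
// tbf.Finalize must have been called before the first load.
func loadBlockRef(tbf *tmpBlocksFile, br blockRef) storage.BlockRef {
	return tbf.MustReadBlockRefAt(br.partRef, br.addr)
}

This round-trip is also why Results now carries tbf alongside sr: the temporary file must outlive unpacking, and putTmpBlocksFile only returns it to the pool in Results.mustClose.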