app/vmselect: reduce memory usage when a query touches a big number of time series

Aliaksandr Valialkin 2020-11-04 16:46:10 +02:00
parent ae91a6883c
commit caeb74f068
9 changed files with 320 additions and 11 deletions

View file

@@ -1,6 +1,9 @@
# tip
+* FEATURE: reduce memory usage when a query touches a big number of time series.
# [v1.45.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.45.0)
* FEATURE: allow setting `-retentionPeriod` smaller than one month. I.e. `-retentionPeriod=3d`, `-retentionPeriod=2w`, etc. is supported now.

View file

@@ -10,10 +10,12 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/graphite"
+"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
+"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
@@ -45,6 +47,9 @@ func getDefaultMaxConcurrentRequests() int {
// Init initializes vmselect
func Init() {
+tmpDirPath := *vmstorage.DataPath + "/tmp"
+fs.RemoveDirContents(tmpDirPath)
+netstorage.InitTmpBlocksDir(tmpDirPath)
promql.InitRollupResultCache(*vmstorage.DataPath + "/cache/rollupResult")
concurrencyCh = make(chan struct{}, *maxConcurrentRequests)
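
For orientation, here is a minimal standalone sketch of where temporary search results end up on disk after this change, assuming an example -storageDataPath value (the path components come from the Init code above and from netstorage.InitTmpBlocksDir in the new tmp_blocks_file.go further down):

package main

import "fmt"

func main() {
	// Assumed example value for -storageDataPath; not taken from the commit.
	dataPath := "/victoria-metrics-data"
	tmpDirPath := dataPath + "/tmp"               // wiped via fs.RemoveDirContents at vmselect startup
	tmpBlocksDir := tmpDirPath + "/searchResults" // created by netstorage.InitTmpBlocksDir
	fmt.Println(tmpBlocksDir)                     // prints /victoria-metrics-data/tmp/searchResults
}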

View file

@@ -57,6 +57,7 @@ type Results struct {
packedTimeseries []packedTimeseries
sr *storage.Search
+tbf *tmpBlocksFile
}

// Len returns the number of results in rss.
@@ -72,6 +73,8 @@ func (rss *Results) Cancel() {
func (rss *Results) mustClose() {
putStorageSearch(rss.sr)
rss.sr = nil
+putTmpBlocksFile(rss.tbf)
+rss.tbf = nil
}

var timeseriesWorkCh = make(chan *timeseriesWork, gomaxprocs*16)
@@ -105,7 +108,7 @@ func timeseriesWorker(workerID uint) {
tsw.doneCh <- nil
continue
}
-if err := tsw.pts.Unpack(&rs, rss.tr, rss.fetchData); err != nil {
+if err := tsw.pts.Unpack(&rs, rss.tbf, rss.tr, rss.fetchData); err != nil {
tsw.doneCh <- fmt.Errorf("error during time series unpacking: %w", err)
continue
}
@@ -179,27 +182,29 @@ var gomaxprocs = runtime.GOMAXPROCS(-1)
type packedTimeseries struct {
metricName string
-brs []storage.BlockRef
+brs []blockRef
}

var unpackWorkCh = make(chan *unpackWork, gomaxprocs*128)

type unpackWorkItem struct {
-br storage.BlockRef
+br blockRef
tr storage.TimeRange
}

type unpackWork struct {
+tbf *tmpBlocksFile
ws []unpackWorkItem
sbs []*sortBlock
doneCh chan error
}

func (upw *unpackWork) reset() {
+upw.tbf = nil
ws := upw.ws
for i := range ws {
w := &ws[i]
-w.br = storage.BlockRef{}
+w.br = blockRef{}
w.tr = storage.TimeRange{}
}
upw.ws = upw.ws[:0]
@@ -216,7 +221,7 @@ func (upw *unpackWork) reset() {
func (upw *unpackWork) unpack(tmpBlock *storage.Block) {
for _, w := range upw.ws {
sb := getSortBlock()
-if err := sb.unpackFrom(tmpBlock, w.br, w.tr); err != nil {
+if err := sb.unpackFrom(tmpBlock, upw.tbf, w.br, w.tr); err != nil {
putSortBlock(sb)
upw.doneCh <- fmt.Errorf("cannot unpack block: %w", err)
return
@@ -262,7 +267,7 @@ func unpackWorker() {
var unpackBatchSize = 8 * runtime.GOMAXPROCS(-1)

// Unpack unpacks pts to dst.
-func (pts *packedTimeseries) Unpack(dst *Result, tr storage.TimeRange, fetchData bool) error {
+func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.TimeRange, fetchData bool) error {
dst.reset()
if err := dst.MetricName.Unmarshal(bytesutil.ToUnsafeBytes(pts.metricName)); err != nil {
return fmt.Errorf("cannot unmarshal metricName %q: %w", pts.metricName, err)
@@ -276,11 +281,13 @@ func (pts *packedTimeseries) Unpack(dst *Result, tr storage.TimeRange, fetchData
brsLen := len(pts.brs)
upws := make([]*unpackWork, 0, 1+brsLen/unpackBatchSize)
upw := getUnpackWork()
+upw.tbf = tbf
for _, br := range pts.brs {
if len(upw.ws) >= unpackBatchSize {
unpackWorkCh <- upw
upws = append(upws, upw)
upw = getUnpackWork()
+upw.tbf = tbf
}
upw.ws = append(upw.ws, unpackWorkItem{
br: br,
@@ -397,9 +404,10 @@ func (sb *sortBlock) reset() {
sb.NextIdx = 0
}

-func (sb *sortBlock) unpackFrom(tmpBlock *storage.Block, br storage.BlockRef, tr storage.TimeRange) error {
+func (sb *sortBlock) unpackFrom(tmpBlock *storage.Block, tbf *tmpBlocksFile, br blockRef, tr storage.TimeRange) error {
tmpBlock.Reset()
-br.MustReadBlock(tmpBlock, true)
+brReal := tbf.MustReadBlockRefAt(br.partRef, br.addr)
+brReal.MustReadBlock(tmpBlock, true)
if err := tmpBlock.UnmarshalData(); err != nil {
return fmt.Errorf("cannot unmarshal block: %w", err)
}
@@ -709,19 +717,31 @@ func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline search
sr := getStorageSearch()
maxSeriesCount := sr.Init(vmstorage.Storage, tfss, tr, *maxMetricsPerSearch, deadline.Deadline())
-m := make(map[string][]storage.BlockRef, maxSeriesCount)
+m := make(map[string][]blockRef, maxSeriesCount)
orderedMetricNames := make([]string, 0, maxSeriesCount)
blocksRead := 0
+tbf := getTmpBlocksFile()
+var buf []byte
for sr.NextMetricBlock() {
blocksRead++
if deadline.Exceeded() {
+putTmpBlocksFile(tbf)
putStorageSearch(sr)
return nil, fmt.Errorf("timeout exceeded while fetching data block #%d from storage: %s", blocksRead, deadline.String())
}
+buf = sr.MetricBlockRef.BlockRef.Marshal(buf[:0])
+addr, err := tbf.WriteBlockRefData(buf)
+if err != nil {
+putTmpBlocksFile(tbf)
+putStorageSearch(sr)
+return nil, fmt.Errorf("cannot write %d bytes to temporary file: %w", len(buf), err)
+}
metricName := sr.MetricBlockRef.MetricName
brs := m[string(metricName)]
-brs = append(brs, *sr.MetricBlockRef.BlockRef)
+brs = append(brs, blockRef{
+partRef: sr.MetricBlockRef.BlockRef.PartRef(),
+addr: addr,
+})
if len(brs) > 1 {
// An optimization: do not allocate a string for already existing metricName key in m
m[string(metricName)] = brs
@@ -733,12 +753,18 @@ func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline search
}
}
if err := sr.Error(); err != nil {
+putTmpBlocksFile(tbf)
putStorageSearch(sr)
if errors.Is(err, storage.ErrDeadlineExceeded) {
return nil, fmt.Errorf("timeout exceeded during the query: %s", deadline.String())
}
return nil, fmt.Errorf("search error after reading %d data blocks: %w", blocksRead, err)
}
+if err := tbf.Finalize(); err != nil {
+putTmpBlocksFile(tbf)
+putStorageSearch(sr)
+return nil, fmt.Errorf("cannot finalize temporary file: %w", err)
+}

var rss Results
rss.tr = tr
@@ -753,9 +779,15 @@ func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline search
}
rss.packedTimeseries = pts
rss.sr = sr
+rss.tbf = tbf
return &rss, nil
}

+type blockRef struct {
+partRef storage.PartRef
+addr tmpBlockAddr
+}
+
func setupTfss(tagFilterss [][]storage.TagFilter) ([]*storage.TagFilters, error) {
tfss := make([]*storage.TagFilters, 0, len(tagFilterss))
for _, tagFilters := range tagFilterss {
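
The netstorage.go changes above are the heart of the commit: instead of keeping a full storage.BlockRef per found block in memory, ProcessSearchQuery now marshals each block header into a shared temporary file via tbf.WriteBlockRefData and remembers only a tiny blockRef{partRef, addr}; unpackFrom later calls tbf.MustReadBlockRefAt to restore the real storage.BlockRef on demand. The following self-contained sketch uses hypothetical names (spillBuffer, spillAddr) to illustrate that spill-and-address pattern; it is a simplification, not the actual tmpBlocksFile code, which additionally pools objects, exports metrics and fadvises the file for sequential reads:

package main

import (
	"fmt"
	"io/ioutil"
	"os"
)

// spillAddr mirrors the idea behind tmpBlockAddr: a stored byte blob is
// identified only by its offset and size inside one shared buffer/file.
type spillAddr struct {
	offset int64
	size   int
}

// spillBuffer is a hypothetical, simplified stand-in for tmpBlocksFile:
// writes go to an in-memory buffer until maxInmemory is exceeded, after
// which everything is spilled to a single temporary file.
type spillBuffer struct {
	maxInmemory int
	buf         []byte
	f           *os.File
	offset      int64
}

func (sb *spillBuffer) write(b []byte) (spillAddr, error) {
	addr := spillAddr{offset: sb.offset, size: len(b)}
	sb.offset += int64(len(b))
	if sb.f == nil && len(sb.buf)+len(b) <= sb.maxInmemory {
		// Fast path: the data still fits in memory.
		sb.buf = append(sb.buf, b...)
		return addr, nil
	}
	if sb.f == nil {
		f, err := ioutil.TempFile("", "spill")
		if err != nil {
			return addr, err
		}
		sb.f = f
	}
	// Slow path: flush buffered bytes plus the new blob to the file.
	if _, err := sb.f.Write(append(sb.buf, b...)); err != nil {
		return addr, err
	}
	sb.buf = sb.buf[:0]
	return addr, nil
}

func (sb *spillBuffer) read(addr spillAddr) ([]byte, error) {
	b := make([]byte, addr.size)
	if sb.f == nil {
		copy(b, sb.buf[addr.offset:addr.offset+int64(addr.size)])
		return b, nil
	}
	_, err := sb.f.ReadAt(b, addr.offset)
	return b, err
}

func main() {
	sb := &spillBuffer{maxInmemory: 16}
	a1, _ := sb.write([]byte("marshaled header #1"))
	a2, _ := sb.write([]byte("marshaled header #2"))
	blob, _ := sb.read(a2)
	fmt.Println(a1, a2, string(blob))
	if sb.f != nil {
		os.Remove(sb.f.Name())
		sb.f.Close()
	}
}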

View file

@@ -0,0 +1,188 @@
package netstorage
import (
"fmt"
"io/ioutil"
"os"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
)
// InitTmpBlocksDir initializes the directory for storing temporary search results.
//
// It stores data in the system-defined temporary directory if tmpDirPath is empty.
func InitTmpBlocksDir(tmpDirPath string) {
if len(tmpDirPath) == 0 {
tmpDirPath = os.TempDir()
}
tmpBlocksDir = tmpDirPath + "/searchResults"
fs.MustRemoveAll(tmpBlocksDir)
if err := fs.MkdirAllIfNotExist(tmpBlocksDir); err != nil {
logger.Panicf("FATAL: cannot create %q: %s", tmpBlocksDir, err)
}
}
var tmpBlocksDir string
func maxInmemoryTmpBlocksFile() int {
mem := memory.Allowed()
maxLen := mem / 1024
if maxLen < 64*1024 {
return 64 * 1024
}
if maxLen > 4*1024*1024 {
return 4 * 1024 * 1024
}
return maxLen
}
var _ = metrics.NewGauge(`vm_tmp_blocks_max_inmemory_file_size_bytes`, func() float64 {
return float64(maxInmemoryTmpBlocksFile())
})
type tmpBlocksFile struct {
buf []byte
f *os.File
r *fs.ReaderAt
offset uint64
}
func getTmpBlocksFile() *tmpBlocksFile {
v := tmpBlocksFilePool.Get()
if v == nil {
return &tmpBlocksFile{
buf: make([]byte, 0, maxInmemoryTmpBlocksFile()),
}
}
return v.(*tmpBlocksFile)
}
func putTmpBlocksFile(tbf *tmpBlocksFile) {
tbf.MustClose()
tbf.buf = tbf.buf[:0]
tbf.f = nil
tbf.r = nil
tbf.offset = 0
tmpBlocksFilePool.Put(tbf)
}
var tmpBlocksFilePool sync.Pool
type tmpBlockAddr struct {
offset uint64
size int
}
func (addr tmpBlockAddr) String() string {
return fmt.Sprintf("offset %d, size %d", addr.offset, addr.size)
}
var (
tmpBlocksFilesCreated = metrics.NewCounter(`vm_tmp_blocks_files_created_total`)
_ = metrics.NewGauge(`vm_tmp_blocks_files_directory_free_bytes`, func() float64 {
return float64(fs.MustGetFreeSpace(tmpBlocksDir))
})
)
// WriteBlockRefData writes the marshaled BlockRef data b to tbf.
//
// It returns an error, since the write may fail on space shortage
// and the caller must handle this.
func (tbf *tmpBlocksFile) WriteBlockRefData(b []byte) (tmpBlockAddr, error) {
var addr tmpBlockAddr
addr.offset = tbf.offset
addr.size = len(b)
tbf.offset += uint64(addr.size)
if len(tbf.buf)+len(b) <= cap(tbf.buf) {
// Fast path - the data fits tbf.buf
tbf.buf = append(tbf.buf, b...)
return addr, nil
}
// Slow path: flush the data from tbf.buf to file.
if tbf.f == nil {
f, err := ioutil.TempFile(tmpBlocksDir, "")
if err != nil {
return addr, err
}
tbf.f = f
tmpBlocksFilesCreated.Inc()
}
_, err := tbf.f.Write(tbf.buf)
tbf.buf = append(tbf.buf[:0], b...)
if err != nil {
return addr, fmt.Errorf("cannot write block to %q: %w", tbf.f.Name(), err)
}
return addr, nil
}
func (tbf *tmpBlocksFile) Finalize() error {
if tbf.f == nil {
return nil
}
fname := tbf.f.Name()
if _, err := tbf.f.Write(tbf.buf); err != nil {
return fmt.Errorf("cannot write the remaining %d bytes to %q: %w", len(tbf.buf), fname, err)
}
tbf.buf = tbf.buf[:0]
r, err := fs.OpenReaderAt(fname)
if err != nil {
logger.Panicf("FATAL: cannot open %q: %s", fname, err)
}
// Hint the OS that the file is read almost sequentially.
// This should reduce the number of disk seeks, which is important
// for HDDs.
r.MustFadviseSequentialRead(true)
tbf.r = r
return nil
}
func (tbf *tmpBlocksFile) MustReadBlockRefAt(partRef storage.PartRef, addr tmpBlockAddr) storage.BlockRef {
var buf []byte
if tbf.f == nil {
buf = tbf.buf[addr.offset : addr.offset+uint64(addr.size)]
} else {
bb := tmpBufPool.Get()
defer tmpBufPool.Put(bb)
bb.B = bytesutil.Resize(bb.B, addr.size)
tbf.r.MustReadAt(bb.B, int64(addr.offset))
buf = bb.B
}
var br storage.BlockRef
if err := br.Init(partRef, buf); err != nil {
logger.Panicf("FATAL: cannot initialize BlockRef: %s", err)
}
return br
}
var tmpBufPool bytesutil.ByteBufferPool
func (tbf *tmpBlocksFile) MustClose() {
if tbf.f == nil {
return
}
if tbf.r != nil {
// tbf.r could be nil if Finalize wasn't called.
tbf.r.MustClose()
}
fname := tbf.f.Name()
// Remove the file first, then close it.
// This way the OS shouldn't try to flush file contents to storage
// on close.
if err := os.Remove(fname); err != nil {
logger.Panicf("FATAL: cannot remove %q: %s", fname, err)
}
if err := tbf.f.Close(); err != nil {
logger.Panicf("FATAL: cannot close %q: %s", fname, err)
}
tbf.f = nil
}
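
One number worth a worked example: maxInmemoryTmpBlocksFile above keeps the per-search buffer at allowed-memory/1024, clamped to the 64 KiB .. 4 MiB range. The small standalone program below simply restates that rule and evaluates it for a few assumed memory allowances:

package main

import "fmt"

// clamp restates the logic of maxInmemoryTmpBlocksFile: allowed memory
// divided by 1024, clamped to [64 KiB, 4 MiB].
func clamp(allowedMem int) int {
	maxLen := allowedMem / 1024
	if maxLen < 64*1024 {
		return 64 * 1024
	}
	if maxLen > 4*1024*1024 {
		return 4 * 1024 * 1024
	}
	return maxLen
}

func main() {
	// 32 MiB -> 64 KiB floor, 512 MiB -> 512 KiB, 16 GiB -> 4 MiB ceiling.
	for _, mem := range []int{32 << 20, 512 << 20, 16 << 30} {
		fmt.Printf("allowed=%d bytes -> in-memory buffer=%d bytes\n", mem, clamp(mem))
	}
}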

lib/fs/fadvise_darwin.go (new file, +10 lines)
View file

@@ -0,0 +1,10 @@
package fs
import (
"os"
)
func fadviseSequentialRead(f *os.File, prefetch bool) error {
// TODO: implement this properly
return nil
}

lib/fs/fadvise_openbsd.go (new file, +10 lines)
View file

@@ -0,0 +1,10 @@
package fs
import (
"os"
)
func fadviseSequentialRead(f *os.File, prefetch bool) error {
// TODO: implement this properly
return nil
}

lib/fs/fadvise_unix.go (new file, +22 lines)
View file

@@ -0,0 +1,22 @@
// +build linux freebsd
package fs
import (
"fmt"
"os"
"golang.org/x/sys/unix"
)
func fadviseSequentialRead(f *os.File, prefetch bool) error {
fd := int(f.Fd())
mode := unix.FADV_SEQUENTIAL
if prefetch {
mode |= unix.FADV_WILLNEED
}
if err := unix.Fadvise(int(fd), 0, 0, mode); err != nil {
return fmt.Errorf("error returned from unix.Fadvise(%d): %w", mode, err)
}
return nil
}

View file

@@ -149,6 +149,15 @@ func (r *ReaderAt) MustClose() {
readersCount.Dec()
}

+// MustFadviseSequentialRead hints the OS that f is read mostly sequentially.
+//
+// if prefetch is set, then the OS is hinted to prefetch f data.
+func (r *ReaderAt) MustFadviseSequentialRead(prefetch bool) {
+if err := fadviseSequentialRead(r.f, prefetch); err != nil {
+logger.Panicf("FATAL: error in fadviseSequentialRead(%q, %v): %s", r.f.Name(), prefetch, err)
+}
+}
+
// OpenReaderAt opens ReaderAt for reading from filename.
//
// MustClose must be called on the returned ReaderAt when it is no longer needed.
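
A minimal sketch of how a caller inside this repository might combine the existing fs.OpenReaderAt with the new hint, mirroring what tmpBlocksFile.Finalize does above; the helper name and package are illustrative, not part of the commit:

package example

import "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"

// openForSequentialScan opens path for ReadAt access and hints the OS that
// the file will be read almost sequentially, optionally prefetching it.
func openForSequentialScan(path string, prefetch bool) (*fs.ReaderAt, error) {
	r, err := fs.OpenReaderAt(path)
	if err != nil {
		return nil, err
	}
	r.MustFadviseSequentialRead(prefetch)
	return r, nil
}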

View file

@@ -30,6 +30,36 @@ func (br *BlockRef) init(p *part, bh *blockHeader) {
br.bh = *bh
}

+// Init initializes br from pr and data
+func (br *BlockRef) Init(pr PartRef, data []byte) error {
+br.p = pr.p
+tail, err := br.bh.Unmarshal(data)
+if err != nil {
+return err
+}
+if len(tail) > 0 {
+return fmt.Errorf("unexpected non-empty tail left after unmarshaling blockHeader; len(tail)=%d; tail=%q", len(tail), tail)
+}
+return nil
+}
+
+// Marshal marshals br to dst.
+func (br *BlockRef) Marshal(dst []byte) []byte {
+return br.bh.Marshal(dst)
+}
+
+// PartRef returns PartRef from br.
+func (br *BlockRef) PartRef() PartRef {
+return PartRef{
+p: br.p,
+}
+}
+
+// PartRef is Part reference.
+type PartRef struct {
+p *part
+}
+
// MustReadBlock reads block from br to dst.
//
// if fetchData is false, then only block header is read, otherwise all the data is read.
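
The three new methods above are what allow vmselect to push block references through a temporary file: Marshal serializes only the block header, PartRef captures the owning part, and Init glues the two back together. A short sketch of that round trip; the helper below is illustrative, not part of the commit:

package example

import "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"

// roundTrip shows how a BlockRef survives serialization: the marshaled block
// header travels as plain bytes while the part is carried as a PartRef.
// This is the split that tmpBlocksFile relies on in app/vmselect/netstorage.
func roundTrip(br *storage.BlockRef) (storage.BlockRef, error) {
	data := br.Marshal(nil) // block header only
	pr := br.PartRef()      // reference to the owning part
	var restored storage.BlockRef
	err := restored.Init(pr, data) // reattach the header to its part
	return restored, err
}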