mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/storage: calculate the maximum number of rows per small part from -memory.allowedPercent
This should improve query speed over recent data on machines with big amounts of RAM
This commit is contained in:
parent
1402a6b981
commit
4b688fffee
3 changed files with 42 additions and 27 deletions
|
@ -10,15 +10,14 @@ import (
|
|||
|
||||
var allowedMemPercent = flag.Float64("memory.allowedPercent", 60, "Allowed percent of system memory VictoriaMetrics caches may occupy")
|
||||
|
||||
var allowedMemory int
|
||||
var (
|
||||
allowedMemory int
|
||||
remainingMemory int
|
||||
)
|
||||
|
||||
var once sync.Once
|
||||
|
||||
// Allowed returns the amount of system memory allowed to use by the app.
|
||||
//
|
||||
// The function must be called only after flag.Parse is called.
|
||||
func Allowed() int {
|
||||
once.Do(func() {
|
||||
func initOnce() {
|
||||
if !flag.Parsed() {
|
||||
// Do not use logger.Panicf here, since logger may be uninitialized yet.
|
||||
panic(fmt.Errorf("BUG: memory.Allowed must be called only after flag.Parse call"))
|
||||
|
@ -30,7 +29,22 @@ func Allowed() int {
|
|||
|
||||
mem := sysTotalMemory()
|
||||
allowedMemory = int(float64(mem) * percent)
|
||||
logger.Infof("limiting caches to %d bytes of RAM according to -memory.allowedPercent=%g", allowedMemory, *allowedMemPercent)
|
||||
})
|
||||
remainingMemory = mem - allowedMemory
|
||||
logger.Infof("limiting caches to %d bytes, leaving %d bytes to the OS according to -memory.allowedPercent=%g", allowedMemory, remainingMemory, *allowedMemPercent)
|
||||
}
|
||||
|
||||
// Allowed returns the amount of system memory allowed to use by the app.
|
||||
//
|
||||
// The function must be called only after flag.Parse is called.
|
||||
func Allowed() int {
|
||||
once.Do(initOnce)
|
||||
return allowedMemory
|
||||
}
|
||||
|
||||
// Remaining returns the amount of memory remaining to the OS.
|
||||
//
|
||||
// This function must be called only after flag.Parse is called.
|
||||
func Remaining() int {
|
||||
once.Do(initOnce)
|
||||
return remainingMemory
|
||||
}
|
||||
|
|
|
@ -144,7 +144,7 @@ func (p *part) MustClose() {
|
|||
p.valuesFile.MustClose()
|
||||
p.indexFile.MustClose()
|
||||
|
||||
isBig := p.ph.RowsCount > maxRowsPerSmallPart
|
||||
isBig := p.ph.RowsCount > maxRowsPerSmallPart()
|
||||
p.ibCache.Reset(isBig)
|
||||
}
|
||||
|
||||
|
|
|
@ -22,15 +22,16 @@ import (
|
|||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
// The maximum number of rows in a small part.
|
||||
//
|
||||
// This number limits the maximum size of small parts storage.
|
||||
// Production simultation shows that the required size of the storage
|
||||
// may be estimated as:
|
||||
//
|
||||
// maxRowsPerSmallPart * 2 * defaultPartsToMerge * mergeWorkers
|
||||
//
|
||||
const maxRowsPerSmallPart = 300e6
|
||||
func maxRowsPerSmallPart() uint64 {
|
||||
// Small parts are cached in the OS page cache,
|
||||
// so limit the number of rows for small part
|
||||
// by the remaining free RAM.
|
||||
mem := memory.Remaining()
|
||||
if mem <= 0 {
|
||||
return 100e6
|
||||
}
|
||||
return uint64(mem) / defaultPartsToMerge
|
||||
}
|
||||
|
||||
// The maximum number of rows per big part.
|
||||
//
|
||||
|
@ -885,7 +886,7 @@ func (pt *partition) mergeBigParts(isFinal bool) error {
|
|||
|
||||
func (pt *partition) mergeSmallParts(isFinal bool) error {
|
||||
maxRows := maxRowsByPath(pt.smallPartsPath)
|
||||
if maxRows > maxRowsPerSmallPart {
|
||||
if maxRows > maxRowsPerSmallPart() {
|
||||
// The output part may go to big part,
|
||||
// so make sure it as enough space.
|
||||
maxBigPartRows := maxRowsByPath(pt.bigPartsPath)
|
||||
|
@ -955,7 +956,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
|
|||
for _, pw := range pws {
|
||||
outRowsCount += pw.p.ph.RowsCount
|
||||
}
|
||||
isBigPart := outRowsCount > maxRowsPerSmallPart
|
||||
isBigPart := outRowsCount > maxRowsPerSmallPart()
|
||||
nocache := isBigPart
|
||||
|
||||
// Prepare BlockStreamWriter for destination part.
|
||||
|
|
Loading…
Reference in a new issue