mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/mergeset: dynamically calculate the maximum number of items per part, which can be cached in OS page cache
This commit is contained in:
parent
dc6e4151b0
commit
568ff61dcf
2 changed files with 25 additions and 10 deletions
|
@ -15,6 +15,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg"
|
||||
)
|
||||
|
||||
|
@ -49,7 +50,19 @@ const maxItemsPerPart = 100e9
|
|||
//
|
||||
// Such parts are usually frequently accessed, so it is good to cache their
|
||||
// contents in OS page cache.
|
||||
const maxItemsPerCachedPart = 100e6
|
||||
func maxItemsPerCachedPart() uint64 {
|
||||
mem := memory.Remaining()
|
||||
// Production data shows that each item occupies ~4 bytes in the compressed part.
|
||||
// It is expected no more than defaultPartsToMerge/2 parts exist
|
||||
// in the OS page cache before they are merged into bigger part.
|
||||
// Halft of the remaining RAM must be left for lib/storage parts,
|
||||
// so the maxItems is calculated using the below code:
|
||||
maxItems := uint64(mem) / (4 * defaultPartsToMerge)
|
||||
if maxItems < 1e6 {
|
||||
maxItems = 1e6
|
||||
}
|
||||
return maxItems
|
||||
}
|
||||
|
||||
// The interval for flushing (converting) recent raw items into parts,
|
||||
// so they become visible to search.
|
||||
|
@ -670,13 +683,10 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isOuterP
|
|||
outItemsCount += pw.p.ph.itemsCount
|
||||
}
|
||||
nocache := true
|
||||
if outItemsCount < maxItemsPerCachedPart {
|
||||
if outItemsCount < maxItemsPerCachedPart() {
|
||||
// Cache small (i.e. recent) output parts in OS file cache,
|
||||
// since there is high chance they will be read soon.
|
||||
nocache = false
|
||||
|
||||
// Do not interrupt small merges.
|
||||
stopCh = nil
|
||||
}
|
||||
|
||||
// Prepare blockStreamWriter for destination part.
|
||||
|
|
|
@ -23,13 +23,18 @@ import (
|
|||
|
||||
func maxRowsPerSmallPart() uint64 {
|
||||
// Small parts are cached in the OS page cache,
|
||||
// so limit the number of rows for small part
|
||||
// by the remaining free RAM.
|
||||
// so limit the number of rows for small part by the remaining free RAM.
|
||||
mem := memory.Remaining()
|
||||
if mem <= 0 {
|
||||
return 100e6
|
||||
// Production data shows that each row occupies ~1 byte in the compressed part.
|
||||
// It is expected no more than defaultPartsToMerge/2 parts exist
|
||||
// in the OS page cache before they are merged into bigger part.
|
||||
// Halft of the remaining RAM must be left for lib/mergeset parts,
|
||||
// so the maxItems is calculated using the below code:
|
||||
maxRows := uint64(mem) / defaultPartsToMerge
|
||||
if maxRows < 10e6 {
|
||||
maxRows = 10e6
|
||||
}
|
||||
return uint64(mem) / defaultPartsToMerge
|
||||
return maxRows
|
||||
}
|
||||
|
||||
// The maximum number of rows per big part.
|
||||
|
|
Loading…
Reference in a new issue