diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 4df0b82d1e..2c2eb1166a 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -15,6 +15,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg" ) @@ -49,7 +50,19 @@ const maxItemsPerPart = 100e9 // // Such parts are usually frequently accessed, so it is good to cache their // contents in OS page cache. -const maxItemsPerCachedPart = 100e6 +func maxItemsPerCachedPart() uint64 { + mem := memory.Remaining() + // Production data shows that each item occupies ~4 bytes in the compressed part. + // It is expected no more than defaultPartsToMerge/2 parts exist + // in the OS page cache before they are merged into a bigger part. + // Half of the remaining RAM must be left for lib/storage parts, + // so the maxItems is calculated using the code below: + maxItems := uint64(mem) / (4 * defaultPartsToMerge) + if maxItems < 1e6 { + maxItems = 1e6 + } + return maxItems +} // The interval for flushing (converting) recent raw items into parts, // so they become visible to search. @@ -670,13 +683,10 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isOuterP outItemsCount += pw.p.ph.itemsCount } nocache := true - if outItemsCount < maxItemsPerCachedPart { + if outItemsCount < maxItemsPerCachedPart() { // Cache small (i.e. recent) output parts in OS file cache, // since there is high chance they will be read soon. nocache = false - - // Do not interrupt small merges. - stopCh = nil } // Prepare blockStreamWriter for destination part. 
diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 6f53fc2f4a..3f77782a37 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -23,13 +23,18 @@ import ( func maxRowsPerSmallPart() uint64 { // Small parts are cached in the OS page cache, - // so limit the number of rows for small part - // by the remaining free RAM. + // so limit the number of rows for small part by the remaining free RAM. mem := memory.Remaining() - if mem <= 0 { - return 100e6 + // Production data shows that each row occupies ~1 byte in the compressed part. + // It is expected no more than defaultPartsToMerge/2 parts exist + // in the OS page cache before they are merged into a bigger part. + // Half of the remaining RAM must be left for lib/mergeset parts, + // so the maxRows is calculated using the code below: + maxRows := uint64(mem) / defaultPartsToMerge + if maxRows < 10e6 { + maxRows = 10e6 } - return uint64(mem) / defaultPartsToMerge + return maxRows } // The maximum number of rows per big part.