lib/{storage,mergeset}: tune the threshold for assisted merge

The investigation in https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3425#issuecomment-1359117221
reveals that CPU usage for incoming queries may increase significantly when the number
of in-memory parts becomes too big.

This commit reduces the maximum number of in-memory parts allowed before assisted merges
kick in during data ingestion. This should reduce CPU usage for incoming queries,
since they need to inspect fewer in-memory parts.

This should help https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3425
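
For context on why the part count matters for query CPU: every query over the table must inspect each live part. The sketch below is illustrative only; partWrapper, timeRange, block and searchBlocks are hypothetical stand-ins, not code from this commit.

// Illustrative only: why query CPU grows with the part count.
// All types and methods here are hypothetical stand-ins.
type timeRange struct{ minTimestamp, maxTimestamp int64 }
type block struct{}
type partWrapper struct{}

func (pw *partWrapper) searchBlocks(tr timeRange) []block { return nil } // stand-in

// searchAllParts must visit every live part, so each extra in-memory part
// adds another per-query search. Roughly halving maxInmemoryParts therefore
// roughly halves this per-query overhead during heavy ingestion.
func searchAllParts(pws []*partWrapper, tr timeRange) []block {
	var blocks []block
	for _, pw := range pws {
		blocks = append(blocks, pw.searchBlocks(tr)...)
	}
	return blocks
}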
Author: Aliaksandr Valialkin
Date: 2022-12-28 14:32:18 -08:00
Parent: 04d536c15a
Commit: 8dc04a86f6
2 changed files with 25 additions and 11 deletions

lib/mergeset/table.go

@@ -27,7 +27,7 @@ import (
 // This number may be reached when the insertion pace outreaches merger pace.
 // If this number is reached, then assisted merges are performed
 // during data ingestion.
-const maxInmemoryParts = 64
+const maxInmemoryParts = 30
 
 // maxFileParts is the maximum number of file parts in the table.
 //
@@ -765,12 +765,19 @@ func (tb *Table) flushBlocksToParts(ibs []*inmemoryBlock, isFinal bool) {
 var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
 
+func needAssistedMerge(pws []*partWrapper, maxParts int) bool {
+	if len(pws) < maxParts {
+		return false
+	}
+	return getNotInMergePartsCount(pws) >= defaultPartsToMerge
+}
+
 func (tb *Table) assistedMergeForInmemoryParts() {
 	for {
 		tb.partsLock.Lock()
-		ok := getNotInMergePartsCount(tb.inmemoryParts) < maxInmemoryParts
+		needMerge := needAssistedMerge(tb.inmemoryParts, maxInmemoryParts)
 		tb.partsLock.Unlock()
-		if ok {
+		if !needMerge {
 			return
 		}
@@ -792,9 +799,9 @@ func (tb *Table) assistedMergeForInmemoryParts() {
 func (tb *Table) assistedMergeForFileParts() {
 	for {
 		tb.partsLock.Lock()
-		ok := getNotInMergePartsCount(tb.fileParts) < maxFileParts
+		needMerge := needAssistedMerge(tb.fileParts, maxFileParts)
 		tb.partsLock.Unlock()
-		if ok {
+		if !needMerge {
 			return
 		}
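
getNotInMergePartsCount is referenced above but lies outside this diff. A minimal sketch of what such a counter plausibly does, assuming each partWrapper carries an isInMerge flag that a background merge sets while it owns the part; callers are expected to hold partsLock:

// Sketch under an assumed partWrapper layout; not code from this commit.
type partWrapper struct {
	isInMerge bool // set while a background merge owns this part
}

// getNotInMergePartsCount returns the number of parts not currently
// claimed by any merge. The caller must hold the parts lock.
func getNotInMergePartsCount(pws []*partWrapper) int {
	n := 0
	for _, pw := range pws {
		if !pw.isInMerge {
			n++
		}
	}
	return n
}

With such a helper, needAssistedMerge fires only when the part count has reached maxParts and at least defaultPartsToMerge parts sit idle, so ingestion does not assist while already-running merges are draining the backlog.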

lib/storage/partition.go

@@ -33,12 +33,12 @@ const maxBigPartSize = 1e12
 // The maximum number of inmemory parts in the partition.
 //
 // If the number of inmemory parts reaches this value, then assisted merge runs during data ingestion.
-const maxInmemoryPartsPerPartition = 32
+const maxInmemoryPartsPerPartition = 20
 
 // The maximum number of small parts in the partition.
 //
 // If the number of small parts reaches this value, then assisted merge runs during data ingestion.
-const maxSmallPartsPerPartition = 64
+const maxSmallPartsPerPartition = 30
 
 // Default number of parts to merge at once.
 //
@@ -594,12 +594,19 @@ func (pt *partition) flushRowsToParts(rows []rawRow) {
 var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
 
+func needAssistedMerge(pws []*partWrapper, maxParts int) bool {
+	if len(pws) < maxParts {
+		return false
+	}
+	return getNotInMergePartsCount(pws) >= defaultPartsToMerge
+}
+
 func (pt *partition) assistedMergeForInmemoryParts() {
 	for {
 		pt.partsLock.Lock()
-		ok := getNotInMergePartsCount(pt.inmemoryParts) < maxInmemoryPartsPerPartition
+		needMerge := needAssistedMerge(pt.inmemoryParts, maxInmemoryPartsPerPartition)
 		pt.partsLock.Unlock()
-		if ok {
+		if !needMerge {
 			return
 		}
@@ -624,9 +631,9 @@ func (pt *partition) assistedMergeForInmemoryParts() {
 func (pt *partition) assistedMergeForSmallParts() {
 	for {
 		pt.partsLock.Lock()
-		ok := getNotInMergePartsCount(pt.smallParts) < maxSmallPartsPerPartition
+		needMerge := needAssistedMerge(pt.smallParts, maxSmallPartsPerPartition)
 		pt.partsLock.Unlock()
-		if ok {
+		if !needMerge {
 			return
 		}
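
Putting the pieces together, the assisted-merge loop after this change follows the pattern sketched below. This is a self-contained approximation, not the real code: mergeSomeInmemoryParts stands in for the merge step that this diff truncates, the defaultPartsToMerge value is assumed, and the channel capacity is hard-coded where the real code uses cgroup.AvailableCPUs().

package storage

import "sync"

const (
	defaultPartsToMerge          = 15 // assumption; the constant's value is not shown in this diff
	maxInmemoryPartsPerPartition = 20
)

type partWrapper struct {
	isInMerge bool
}

type partition struct {
	partsLock     sync.Mutex
	inmemoryParts []*partWrapper
}

// Limits concurrent assisted merges; the real code sizes this channel
// by cgroup.AvailableCPUs().
var flushConcurrencyCh = make(chan struct{}, 4)

func getNotInMergePartsCount(pws []*partWrapper) int {
	n := 0
	for _, pw := range pws {
		if !pw.isInMerge {
			n++
		}
	}
	return n
}

func needAssistedMerge(pws []*partWrapper, maxParts int) bool {
	if len(pws) < maxParts {
		return false
	}
	return getNotInMergePartsCount(pws) >= defaultPartsToMerge
}

func (pt *partition) assistedMergeForInmemoryParts() {
	for {
		pt.partsLock.Lock()
		needMerge := needAssistedMerge(pt.inmemoryParts, maxInmemoryPartsPerPartition)
		pt.partsLock.Unlock()
		if !needMerge {
			return
		}
		// Throttle to the CPU budget, merge one batch, then re-check.
		flushConcurrencyCh <- struct{}{}
		pt.mergeSomeInmemoryParts()
		<-flushConcurrencyCh
	}
}

// mergeSomeInmemoryParts is a hypothetical stand-in for the real merge step,
// which picks up to defaultPartsToMerge idle parts and merges them into one.
func (pt *partition) mergeSomeInmemoryParts() {}

The loop re-checks the condition after every merge pass, so ingestion goroutines keep assisting only while the backlog stays above the lowered thresholds introduced by this commit.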