From 3ca6fea8580dfdae0dca754abb530bf11bed77b7 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sun, 1 Oct 2023 22:17:38 +0200 Subject: [PATCH] lib/{mergeset,storage}: perform at most one assisted merge per each call to addRows/addItems This should reduce tail latency during data ingestion. This shouldn't slow down data ingestion in the worst case, since assisted merges are spread among distinct addRows/addItems calls after this change. --- lib/mergeset/table.go | 64 +++++++++++++++++++--------------------- lib/storage/partition.go | 64 +++++++++++++++++++--------------------- 2 files changed, 60 insertions(+), 68 deletions(-) diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index bf7274a8ee..e6f840571d 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -773,45 +773,41 @@ func needAssistedMerge(pws []*partWrapper, maxParts int) bool { } func (tb *Table) assistedMergeForInmemoryParts() { - for { - tb.partsLock.Lock() - needMerge := needAssistedMerge(tb.inmemoryParts, maxInmemoryParts) - tb.partsLock.Unlock() - if !needMerge { - return - } - - atomic.AddUint64(&tb.inmemoryAssistedMerges, 1) - err := tb.mergeInmemoryParts() - if err == nil { - continue - } - if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) { - return - } - logger.Panicf("FATAL: cannot assist with merging inmemory parts: %s", err) + tb.partsLock.Lock() + needMerge := needAssistedMerge(tb.inmemoryParts, maxInmemoryParts) + tb.partsLock.Unlock() + if !needMerge { + return } + + atomic.AddUint64(&tb.inmemoryAssistedMerges, 1) + err := tb.mergeInmemoryParts() + if err == nil { + return + } + if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) { + return + } + logger.Panicf("FATAL: cannot assist with merging inmemory parts: %s", err) } func (tb *Table) assistedMergeForFileParts() { - for { - tb.partsLock.Lock() - needMerge := needAssistedMerge(tb.fileParts, maxFileParts) - tb.partsLock.Unlock() - if !needMerge { - return - } - - atomic.AddUint64(&tb.fileAssistedMerges, 1) - err := tb.mergeExistingParts(false) - if err == nil { - continue - } - if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) || errors.Is(err, errReadOnlyMode) { - return - } - logger.Panicf("FATAL: cannot assist with merging file parts: %s", err) + tb.partsLock.Lock() + needMerge := needAssistedMerge(tb.fileParts, maxFileParts) + tb.partsLock.Unlock() + if !needMerge { + return } + + atomic.AddUint64(&tb.fileAssistedMerges, 1) + err := tb.mergeExistingParts(false) + if err == nil { + return + } + if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) || errors.Is(err, errReadOnlyMode) { + return + } + logger.Panicf("FATAL: cannot assist with merging file parts: %s", err) } func getNotInMergePartsCount(pws []*partWrapper) int { diff --git a/lib/storage/partition.go b/lib/storage/partition.go index c9d7860c12..d0c0a29ef6 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -634,45 +634,41 @@ func needAssistedMerge(pws []*partWrapper, maxParts int) bool { } func (pt *partition) assistedMergeForInmemoryParts() { - for { - pt.partsLock.Lock() - needMerge := needAssistedMerge(pt.inmemoryParts, maxInmemoryPartsPerPartition) - pt.partsLock.Unlock() - if !needMerge { - return - } - - atomic.AddUint64(&pt.inmemoryAssistedMerges, 1) - err := pt.mergeInmemoryParts() - if err == nil { - continue - } - if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) { - return - } - logger.Panicf("FATAL: cannot merge inmemory parts: %s", err) + pt.partsLock.Lock() + needMerge := needAssistedMerge(pt.inmemoryParts, maxInmemoryPartsPerPartition) + pt.partsLock.Unlock() + if !needMerge { + return } + + atomic.AddUint64(&pt.inmemoryAssistedMerges, 1) + err := pt.mergeInmemoryParts() + if err == nil { + return + } + if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) { + return + } + logger.Panicf("FATAL: cannot merge inmemory parts: %s", err) } func (pt *partition) assistedMergeForSmallParts() { - for { - pt.partsLock.Lock() - needMerge := needAssistedMerge(pt.smallParts, maxSmallPartsPerPartition) - pt.partsLock.Unlock() - if !needMerge { - return - } - - atomic.AddUint64(&pt.smallAssistedMerges, 1) - err := pt.mergeExistingParts(false) - if err == nil { - continue - } - if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) || errors.Is(err, errReadOnlyMode) { - return - } - logger.Panicf("FATAL: cannot merge small parts: %s", err) + pt.partsLock.Lock() + needMerge := needAssistedMerge(pt.smallParts, maxSmallPartsPerPartition) + pt.partsLock.Unlock() + if !needMerge { + return } + + atomic.AddUint64(&pt.smallAssistedMerges, 1) + err := pt.mergeExistingParts(false) + if err == nil { + return + } + if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) || errors.Is(err, errReadOnlyMode) { + return + } + logger.Panicf("FATAL: cannot merge small parts: %s", err) } func getNotInMergePartsCount(pws []*partWrapper) int {