diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index bf7274a8e..e6f840571 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -773,45 +773,41 @@ func needAssistedMerge(pws []*partWrapper, maxParts int) bool { } func (tb *Table) assistedMergeForInmemoryParts() { - for { - tb.partsLock.Lock() - needMerge := needAssistedMerge(tb.inmemoryParts, maxInmemoryParts) - tb.partsLock.Unlock() - if !needMerge { - return - } - - atomic.AddUint64(&tb.inmemoryAssistedMerges, 1) - err := tb.mergeInmemoryParts() - if err == nil { - continue - } - if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) { - return - } - logger.Panicf("FATAL: cannot assist with merging inmemory parts: %s", err) + tb.partsLock.Lock() + needMerge := needAssistedMerge(tb.inmemoryParts, maxInmemoryParts) + tb.partsLock.Unlock() + if !needMerge { + return } + + atomic.AddUint64(&tb.inmemoryAssistedMerges, 1) + err := tb.mergeInmemoryParts() + if err == nil { + return + } + if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) { + return + } + logger.Panicf("FATAL: cannot assist with merging inmemory parts: %s", err) } func (tb *Table) assistedMergeForFileParts() { - for { - tb.partsLock.Lock() - needMerge := needAssistedMerge(tb.fileParts, maxFileParts) - tb.partsLock.Unlock() - if !needMerge { - return - } - - atomic.AddUint64(&tb.fileAssistedMerges, 1) - err := tb.mergeExistingParts(false) - if err == nil { - continue - } - if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) || errors.Is(err, errReadOnlyMode) { - return - } - logger.Panicf("FATAL: cannot assist with merging file parts: %s", err) + tb.partsLock.Lock() + needMerge := needAssistedMerge(tb.fileParts, maxFileParts) + tb.partsLock.Unlock() + if !needMerge { + return } + + atomic.AddUint64(&tb.fileAssistedMerges, 1) + err := tb.mergeExistingParts(false) + if err == nil { + return + } + if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) || errors.Is(err, errReadOnlyMode) { + return + } + logger.Panicf("FATAL: cannot assist with merging file parts: %s", err) } func getNotInMergePartsCount(pws []*partWrapper) int { diff --git a/lib/storage/partition.go b/lib/storage/partition.go index c9d7860c1..d0c0a29ef 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -634,45 +634,41 @@ func needAssistedMerge(pws []*partWrapper, maxParts int) bool { } func (pt *partition) assistedMergeForInmemoryParts() { - for { - pt.partsLock.Lock() - needMerge := needAssistedMerge(pt.inmemoryParts, maxInmemoryPartsPerPartition) - pt.partsLock.Unlock() - if !needMerge { - return - } - - atomic.AddUint64(&pt.inmemoryAssistedMerges, 1) - err := pt.mergeInmemoryParts() - if err == nil { - continue - } - if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) { - return - } - logger.Panicf("FATAL: cannot merge inmemory parts: %s", err) + pt.partsLock.Lock() + needMerge := needAssistedMerge(pt.inmemoryParts, maxInmemoryPartsPerPartition) + pt.partsLock.Unlock() + if !needMerge { + return } + + atomic.AddUint64(&pt.inmemoryAssistedMerges, 1) + err := pt.mergeInmemoryParts() + if err == nil { + return + } + if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) { + return + } + logger.Panicf("FATAL: cannot merge inmemory parts: %s", err) } func (pt *partition) assistedMergeForSmallParts() { - for { - pt.partsLock.Lock() - needMerge := needAssistedMerge(pt.smallParts, maxSmallPartsPerPartition) - pt.partsLock.Unlock() - if !needMerge { - return - } - - atomic.AddUint64(&pt.smallAssistedMerges, 1) - err := pt.mergeExistingParts(false) - if err == nil { - continue - } - if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) || errors.Is(err, errReadOnlyMode) { - return - } - logger.Panicf("FATAL: cannot merge small parts: %s", err) + pt.partsLock.Lock() + needMerge := needAssistedMerge(pt.smallParts, maxSmallPartsPerPartition) + pt.partsLock.Unlock() + if !needMerge { + return } + + atomic.AddUint64(&pt.smallAssistedMerges, 1) + err := pt.mergeExistingParts(false) + if err == nil { + return + } + if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) || errors.Is(err, errReadOnlyMode) { + return + } + logger.Panicf("FATAL: cannot merge small parts: %s", err) } func getNotInMergePartsCount(pws []*partWrapper) int {