diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 676a0fd66b..f2984830bb 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -868,6 +868,11 @@ type freeSpaceEntry struct { } func (pt *partition) mergeBigParts(isFinal bool) error { + bigMergeConcurrencyLimitCh <- struct{}{} + defer func() { + <-bigMergeConcurrencyLimitCh + }() + maxRows := maxRowsByPath(pt.bigPartsPath) pt.partsLock.Lock() @@ -880,15 +885,18 @@ func (pt *partition) mergeBigParts(isFinal bool) error { atomic.AddUint64(&pt.bigMergesCount, 1) atomic.AddUint64(&pt.activeBigMerges, 1) - bigMergeConcurrencyLimitCh <- struct{}{} err := pt.mergeParts(pws, pt.stopCh) - <-bigMergeConcurrencyLimitCh atomic.AddUint64(&pt.activeBigMerges, ^uint64(0)) return err } func (pt *partition) mergeSmallParts(isFinal bool) error { + smallMergeConcurrencyLimitCh <- struct{}{} + defer func() { + <-smallMergeConcurrencyLimitCh + }() + maxRows := maxRowsByPath(pt.smallPartsPath) if maxRows > maxRowsPerSmallPart() { // The output part may go to big part, @@ -909,9 +917,7 @@ func (pt *partition) mergeSmallParts(isFinal bool) error { atomic.AddUint64(&pt.smallMergesCount, 1) atomic.AddUint64(&pt.activeSmallMerges, 1) - smallMergeConcurrencyLimitCh <- struct{}{} err := pt.mergeParts(pws, pt.stopCh) - <-smallMergeConcurrencyLimitCh atomic.AddUint64(&pt.activeSmallMerges, ^uint64(0)) return err