From a27e034a40165d4938f4ad1a9358dadc9e15b233 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 30 Oct 2019 00:07:31 +0200 Subject: [PATCH] lib/storage: get parts to merge after applying the limit on the number of concurrent merges This should reduce write amplification under high ingestion rate. --- lib/storage/partition.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 676a0fd66b..f2984830bb 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -868,6 +868,11 @@ type freeSpaceEntry struct { } func (pt *partition) mergeBigParts(isFinal bool) error { + bigMergeConcurrencyLimitCh <- struct{}{} + defer func() { + <-bigMergeConcurrencyLimitCh + }() + maxRows := maxRowsByPath(pt.bigPartsPath) pt.partsLock.Lock() @@ -880,15 +885,18 @@ func (pt *partition) mergeBigParts(isFinal bool) error { atomic.AddUint64(&pt.bigMergesCount, 1) atomic.AddUint64(&pt.activeBigMerges, 1) - bigMergeConcurrencyLimitCh <- struct{}{} err := pt.mergeParts(pws, pt.stopCh) - <-bigMergeConcurrencyLimitCh atomic.AddUint64(&pt.activeBigMerges, ^uint64(0)) return err } func (pt *partition) mergeSmallParts(isFinal bool) error { + smallMergeConcurrencyLimitCh <- struct{}{} + defer func() { + <-smallMergeConcurrencyLimitCh + }() + maxRows := maxRowsByPath(pt.smallPartsPath) if maxRows > maxRowsPerSmallPart() { // The output part may go to big part, @@ -909,9 +917,7 @@ func (pt *partition) mergeSmallParts(isFinal bool) error { atomic.AddUint64(&pt.smallMergesCount, 1) atomic.AddUint64(&pt.activeSmallMerges, 1) - smallMergeConcurrencyLimitCh <- struct{}{} err := pt.mergeParts(pws, pt.stopCh) - <-smallMergeConcurrencyLimitCh atomic.AddUint64(&pt.activeSmallMerges, ^uint64(0)) return err