diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 158b96cd4c..4d0923981d 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -254,6 +254,14 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) [][]byte { tb.flushBlocksToParts(ibsToFlush, false) + if len(ibsToFlush) > 0 { + // Run assisted merges if needed. + flushConcurrencyCh <- struct{}{} + tb.assistedMergeForInmemoryParts() + tb.assistedMergeForFileParts() + <-flushConcurrencyCh + } + return tailItems } @@ -749,11 +757,6 @@ func (tb *Table) flushBlocksToParts(ibs []*inmemoryBlock, isFinal bool) { tb.inmemoryParts = append(tb.inmemoryParts, pws...) tb.partsLock.Unlock() - flushConcurrencyCh <- struct{}{} - tb.assistedMergeForInmemoryParts() - tb.assistedMergeForFileParts() - <-flushConcurrencyCh - if tb.flushCallback != nil { if isFinal { tb.flushCallback() diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 65616ee6b6..22815a77d7 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -515,6 +515,14 @@ func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) []rawRow { if rrb != nil { pt.flushRowsToParts(rrb.rows) putRawRowsBlock(rrb) + + // Run assisted merges if needed. + flushConcurrencyCh <- struct{}{} + pt.assistedMergeForInmemoryParts() + pt.assistedMergeForSmallParts() + // There is no need in assisted merges for big parts, + // since the bottleneck is possible only at inmemory and small parts. + <-flushConcurrencyCh } return rows @@ -582,13 +590,6 @@ func (pt *partition) flushRowsToParts(rows []rawRow) { pt.partsLock.Lock() pt.inmemoryParts = append(pt.inmemoryParts, pws...) pt.partsLock.Unlock() - - flushConcurrencyCh <- struct{}{} - pt.assistedMergeForInmemoryParts() - pt.assistedMergeForSmallParts() - <-flushConcurrencyCh - // There is no need in assisted merges for small and big parts, - // since the bottleneck is possible only at inmemory parts. } var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())