diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go
index 158b96cd4..ef403174e 100644
--- a/lib/mergeset/table.go
+++ b/lib/mergeset/table.go
@@ -137,6 +137,10 @@ type Table struct {
 	// fileParts contains file-backed parts.
 	fileParts []*partWrapper
 
+	// This channel is used for signaling the background mergers that there are parts,
+	// which may need to be merged.
+	needMergeCh chan struct{}
+
 	snapshotLock sync.RWMutex
 
 	flockF *os.File
@@ -254,6 +258,14 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) [][]byte {
 
 	tb.flushBlocksToParts(ibsToFlush, false)
 
+	if len(ibsToFlush) > 0 {
+		// Run assisted merges if needed.
+		flushConcurrencyCh <- struct{}{}
+		tb.assistedMergeForInmemoryParts()
+		tb.assistedMergeForFileParts()
+		<-flushConcurrencyCh
+	}
+
 	return tailItems
 }
 
@@ -332,12 +344,16 @@ func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallb
 		isReadOnly:  isReadOnly,
 		fileParts:   pws,
 		mergeIdx:    uint64(time.Now().UnixNano()),
+		needMergeCh: make(chan struct{}, 1),
 		flockF:      flockF,
 		stopCh:      make(chan struct{}),
 	}
 	tb.rawItems.init()
 	tb.startBackgroundWorkers()
 
+	// Wake up a single background merger, so it could start merging parts if needed.
+	tb.notifyBackgroundMergers()
+
 	var m TableMetrics
 	tb.UpdateMetrics(&m)
 	logger.Infof("table %q has been opened in %.3f seconds; partsCount: %d; blocksCount: %d, itemsCount: %d; sizeBytes: %d",
@@ -747,13 +763,13 @@ func (tb *Table) flushBlocksToParts(ibs []*inmemoryBlock, isFinal bool) {
 
 	tb.partsLock.Lock()
 	tb.inmemoryParts = append(tb.inmemoryParts, pws...)
+	for range pws {
+		if !tb.notifyBackgroundMergers() {
+			break
+		}
+	}
 	tb.partsLock.Unlock()
 
-	flushConcurrencyCh <- struct{}{}
-	tb.assistedMergeForInmemoryParts()
-	tb.assistedMergeForFileParts()
-	<-flushConcurrencyCh
-
 	if tb.flushCallback != nil {
 		if isFinal {
 			tb.flushCallback()
@@ -763,6 +779,15 @@ func (tb *Table) flushBlocksToParts(ibs []*inmemoryBlock, isFinal bool) {
 	}
 }
 
+func (tb *Table) notifyBackgroundMergers() bool {
+	select {
+	case tb.needMergeCh <- struct{}{}:
+		return true
+	default:
+		return false
+	}
+}
+
 var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
 
 func needAssistedMerge(pws []*partWrapper, maxParts int) bool {
@@ -968,16 +993,9 @@ func (tb *Table) mergeExistingParts(isFinal bool) error {
 	return tb.mergeParts(pws, tb.stopCh, isFinal)
 }
 
-const (
-	minMergeSleepTime = 10 * time.Millisecond
-	maxMergeSleepTime = 10 * time.Second
-)
-
 func (tb *Table) mergeWorker() {
-	sleepTime := minMergeSleepTime
 	var lastMergeTime uint64
 	isFinal := false
-	t := time.NewTimer(sleepTime)
 	for {
 		// Limit the number of concurrent calls to mergeExistingParts, since the total number of merge workers
 		// across tables may exceed the the cap(mergeWorkersLimitCh).
@@ -986,7 +1004,6 @@ func (tb *Table) mergeWorker() {
 		<-mergeWorkersLimitCh
 		if err == nil {
 			// Try merging additional parts.
-			sleepTime = minMergeSleepTime
 			lastMergeTime = fasttime.UnixTimestamp()
 			isFinal = false
 			continue
@@ -1007,16 +1024,11 @@ func (tb *Table) mergeWorker() {
 			continue
 		}
 
-		// Nothing to merge. Sleep for a while and try again.
-		sleepTime *= 2
-		if sleepTime > maxMergeSleepTime {
-			sleepTime = maxMergeSleepTime
-		}
+		// Nothing to merge. Wait for the notification of new merge.
 		select {
 		case <-tb.stopCh:
 			return
-		case <-t.C:
-			t.Reset(sleepTime)
+		case <-tb.needMergeCh:
 		}
 	}
 }
@@ -1337,6 +1349,7 @@ func (tb *Table) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, dst
 		default:
 			logger.Panicf("BUG: unknown partType=%d", dstPartType)
 		}
+		tb.notifyBackgroundMergers()
 	}
 	tb.partsLock.Unlock()
 
diff --git a/lib/storage/partition.go b/lib/storage/partition.go
index 76ccdd9cf..e2076baa4 100644
--- a/lib/storage/partition.go
+++ b/lib/storage/partition.go
@@ -154,6 +154,10 @@ type partition struct {
 	// Contains file-based parts with big number of items.
 	bigParts []*partWrapper
 
+	// This channel is used for signaling the background mergers that there are parts,
+	// which may need to be merged.
+	needMergeCh chan struct{}
+
 	snapshotLock sync.RWMutex
 
 	stopCh chan struct{}
@@ -280,6 +284,9 @@ func openPartition(smallPartsPath, bigPartsPath string, s *Storage) (*partition,
 	}
 	pt.startBackgroundWorkers()
 
+	// Wake up a single background merger, so it could start merging parts if needed.
+	pt.notifyBackgroundMergers()
+
 	return pt, nil
 }
 
@@ -291,8 +298,10 @@ func newPartition(name, smallPartsPath, bigPartsPath string, s *Storage) *partit
 
 		s: s,
 
-		mergeIdx: uint64(time.Now().UnixNano()),
-		stopCh:   make(chan struct{}),
+		mergeIdx:    uint64(time.Now().UnixNano()),
+		needMergeCh: make(chan struct{}, cgroup.AvailableCPUs()),
+
+		stopCh: make(chan struct{}),
 	}
 	p.rawRows.init()
 	return p
@@ -515,6 +524,14 @@ func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) []rawRow {
 	if rrb != nil {
 		pt.flushRowsToParts(rrb.rows)
 		putRawRowsBlock(rrb)
+
+		// Run assisted merges if needed.
+		flushConcurrencyCh <- struct{}{}
+		pt.assistedMergeForInmemoryParts()
+		pt.assistedMergeForSmallParts()
+		// There is no need in assisted merges for big parts,
+		// since the bottleneck is possible only at inmemory and small parts.
+		<-flushConcurrencyCh
 	}
 
 	return rows
@@ -581,14 +598,21 @@ func (pt *partition) flushRowsToParts(rows []rawRow) {
 
 	pt.partsLock.Lock()
 	pt.inmemoryParts = append(pt.inmemoryParts, pws...)
+	for range pws {
+		if !pt.notifyBackgroundMergers() {
+			break
+		}
+	}
 	pt.partsLock.Unlock()
+}
 
-	flushConcurrencyCh <- struct{}{}
-	pt.assistedMergeForInmemoryParts()
-	pt.assistedMergeForSmallParts()
-	<-flushConcurrencyCh
-	// There is no need in assisted merges for small and big parts,
-	// since the bottleneck is possible only at inmemory parts.
+func (pt *partition) notifyBackgroundMergers() bool {
+	select {
+	case pt.needMergeCh <- struct{}{}:
+		return true
+	default:
+		return false
+	}
 }
 
 var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
@@ -1025,16 +1049,9 @@ func (pt *partition) startMergeWorkers() {
 	}
 }
 
-const (
-	minMergeSleepTime = 10 * time.Millisecond
-	maxMergeSleepTime = 10 * time.Second
-)
-
 func (pt *partition) mergeWorker() {
-	sleepTime := minMergeSleepTime
 	var lastMergeTime uint64
 	isFinal := false
-	t := time.NewTimer(sleepTime)
 	for {
 		// Limit the number of concurrent calls to mergeExistingParts, since the total number of merge workers
 		// across partitions may exceed the the cap(mergeWorkersLimitCh).
@@ -1043,7 +1060,6 @@ func (pt *partition) mergeWorker() {
 		<-mergeWorkersLimitCh
 		if err == nil {
 			// Try merging additional parts.
-			sleepTime = minMergeSleepTime
 			lastMergeTime = fasttime.UnixTimestamp()
 			isFinal = false
 			continue
@@ -1064,16 +1080,11 @@ func (pt *partition) mergeWorker() {
 			continue
 		}
 
-		// Nothing to merge. Sleep for a while and try again.
-		sleepTime *= 2
-		if sleepTime > maxMergeSleepTime {
-			sleepTime = maxMergeSleepTime
-		}
+		// Nothing to merge. Wait for the notification of new merge.
 		select {
 		case <-pt.stopCh:
 			return
-		case <-t.C:
-			t.Reset(sleepTime)
+		case <-pt.needMergeCh:
 		}
 	}
 }
@@ -1565,6 +1576,7 @@ func (pt *partition) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper,
 		default:
 			logger.Panicf("BUG: unknown partType=%d", dstPartType)
 		}
+		pt.notifyBackgroundMergers()
 	}
 	pt.partsLock.Unlock()
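
The patch replaces the exponential-backoff polling in mergeWorker() (10ms to 10s via the removed minMergeSleepTime/maxMergeSleepTime constants) with channel-based wake-ups: flushBlocksToParts()/flushRowsToParts(), OpenTable()/openPartition() and swapSrcWithDstParts() perform a non-blocking send into the buffered needMergeCh, and an idle merge worker blocks on that channel (or on stopCh) instead of sleeping. The boolean result of notifyBackgroundMergers() lets the flushers stop notifying as soon as the channel is full, since further sends would be dropped anyway. Below is a minimal, self-contained sketch of this wake-up pattern, assuming only the Go standard library; the names demoTable and mergeWorker and the timings are illustrative and are not part of the patch.

package main

import (
	"fmt"
	"sync"
	"time"
)

// demoTable is a stand-in for Table/partition: it owns a buffered wake-up
// channel and a stop channel, mirroring needMergeCh and stopCh.
type demoTable struct {
	needMergeCh chan struct{}
	stopCh      chan struct{}
	wg          sync.WaitGroup
}

// notifyBackgroundMergers mirrors the helper added by the patch: a
// non-blocking send, so callers never stall when a notification is
// already pending.
func (t *demoTable) notifyBackgroundMergers() bool {
	select {
	case t.needMergeCh <- struct{}{}:
		return true
	default:
		return false
	}
}

// mergeWorker mirrors the reworked worker loop: instead of polling with a
// timer, it blocks until it is notified about new parts or asked to stop.
func (t *demoTable) mergeWorker(id int) {
	defer t.wg.Done()
	for {
		select {
		case <-t.stopCh:
			return
		case <-t.needMergeCh:
			// The real code calls mergeExistingParts() here.
			fmt.Printf("worker %d: woke up to merge parts\n", id)
		}
	}
}

func main() {
	t := &demoTable{
		// Capacity 1 (lib/mergeset) or one slot per CPU (lib/storage)
		// bounds the number of queued wake-ups; extra notifications are
		// simply dropped by the non-blocking send.
		needMergeCh: make(chan struct{}, 1),
		stopCh:      make(chan struct{}),
	}
	t.wg.Add(1)
	go t.mergeWorker(1)

	// A flush that registered new in-memory parts wakes up a sleeping worker.
	t.notifyBackgroundMergers()
	time.Sleep(100 * time.Millisecond)

	close(t.stopCh)
	t.wg.Wait()
}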
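
The assisted merges that moved into rawItemsShard.addItems() and rawRowsShard.addRows() remain throttled by flushConcurrencyCh, a buffered channel sized to cgroup.AvailableCPUs() that acts as a counting semaphore: a goroutine sends to acquire a slot before assisting and receives to release it afterwards. A small sketch of that gating follows, with illustrative names (assistLimitCh, assistMerge) and runtime.NumCPU() standing in for cgroup.AvailableCPUs().

package main

import (
	"fmt"
	"runtime"
	"sync"
)

// assistLimitCh plays the role of flushConcurrencyCh: its capacity bounds
// how many goroutines may run assisted merges at the same time.
var assistLimitCh = make(chan struct{}, runtime.NumCPU())

// assistMerge acquires a slot, does the work, then releases the slot.
func assistMerge(id int) {
	assistLimitCh <- struct{}{}        // blocks while all slots are busy
	defer func() { <-assistLimitCh }() // release the slot on return
	fmt.Printf("goroutine %d: running assisted merge\n", id)
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 16; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			assistMerge(id)
		}(i)
	}
	wg.Wait()
}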