From cb449767162a5b1ed1bf19ee58d3419236d75188 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 3 Dec 2022 23:03:05 -0800 Subject: [PATCH] lib/{storage,mergeset}: use a single sync.WaitGroup for all background workers This simplifies the code --- lib/mergeset/table.go | 26 +++++++------------- lib/storage/partition.go | 52 +++++++++++----------------------------- 2 files changed, 22 insertions(+), 56 deletions(-) diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 25fc20cc41..f0203eabd2 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -107,10 +107,7 @@ type Table struct { stopCh chan struct{} - // Use syncwg instead of sync, since Add/Wait may be called from concurrent goroutines. - partMergersWG syncwg.WaitGroup - - rawItemsFlusherWG sync.WaitGroup + wg sync.WaitGroup // Use syncwg instead of sync, since Add/Wait may be called from concurrent goroutines. rawItemsPendingFlushesWG syncwg.WaitGroup @@ -332,15 +329,10 @@ func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallb func (tb *Table) MustClose() { close(tb.stopCh) - logger.Infof("waiting for raw items flusher to stop on %q...", tb.path) + logger.Infof("waiting for background workers to stop on %q...", tb.path) startTime := time.Now() - tb.rawItemsFlusherWG.Wait() - logger.Infof("raw items flusher stopped in %.3f seconds on %q", time.Since(startTime).Seconds(), tb.path) - - logger.Infof("waiting for part mergers to stop on %q...", tb.path) - startTime = time.Now() - tb.partMergersWG.Wait() - logger.Infof("part mergers stopped in %.3f seconds on %q", time.Since(startTime).Seconds(), tb.path) + tb.wg.Wait() + logger.Infof("background workers stopped in %.3f seconds on %q", time.Since(startTime).Seconds(), tb.path) logger.Infof("flushing inmemory parts to files on %q...", tb.path) startTime = time.Now() @@ -500,10 +492,10 @@ func (tb *Table) putParts(pws []*partWrapper) { } func (tb *Table) startRawItemsFlusher() { - tb.rawItemsFlusherWG.Add(1) + tb.wg.Add(1) go func() { tb.rawItemsFlusher() - tb.rawItemsFlusherWG.Done() + tb.wg.Done() }() } @@ -592,8 +584,6 @@ func (tb *Table) mergeRawItemsBlocks(ibs []*inmemoryBlock, isFinal bool) { if len(ibs) == 0 { return } - tb.partMergersWG.Add(1) - defer tb.partMergersWG.Done() pws := make([]*partWrapper, 0, (len(ibs)+defaultPartsToMerge-1)/defaultPartsToMerge) var pwsLock sync.Mutex @@ -720,12 +710,12 @@ func (tb *Table) mergeInmemoryBlocks(ibs []*inmemoryBlock) *partWrapper { func (tb *Table) startPartMergers() { for i := 0; i < mergeWorkersCount; i++ { - tb.partMergersWG.Add(1) + tb.wg.Add(1) go func() { if err := tb.partMerger(); err != nil { logger.Panicf("FATAL: unrecoverable error when merging parts in %q: %s", tb.path, err) } - tb.partMergersWG.Done() + tb.wg.Done() }() } } diff --git a/lib/storage/partition.go b/lib/storage/partition.go index d73a69837d..57354802b9 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -145,11 +145,7 @@ type partition struct { stopCh chan struct{} - smallPartsMergerWG sync.WaitGroup - bigPartsMergerWG sync.WaitGroup - rawRowsFlusherWG sync.WaitGroup - inmemoryPartsFlusherWG sync.WaitGroup - stalePartsRemoverWG sync.WaitGroup + wg sync.WaitGroup } // partWrapper is a wrapper for the part. @@ -620,30 +616,10 @@ func (pt *partition) MustClose() { // Wait until all the pending transaction deletions are finished. pendingTxnDeletionsWG.Wait() - logger.Infof("waiting for stale parts remover to stop on %q...", pt.smallPartsPath) + logger.Infof("waiting for service workers to stop on %q...", pt.smallPartsPath) startTime := time.Now() - pt.stalePartsRemoverWG.Wait() - logger.Infof("stale parts remover stopped in %.3f seconds on %q", time.Since(startTime).Seconds(), pt.smallPartsPath) - - logger.Infof("waiting for inmemory parts flusher to stop on %q...", pt.smallPartsPath) - startTime = time.Now() - pt.inmemoryPartsFlusherWG.Wait() - logger.Infof("inmemory parts flusher stopped in %.3f seconds on %q", time.Since(startTime).Seconds(), pt.smallPartsPath) - - logger.Infof("waiting for raw rows flusher to stop on %q...", pt.smallPartsPath) - startTime = time.Now() - pt.rawRowsFlusherWG.Wait() - logger.Infof("raw rows flusher stopped in %.3f seconds on %q", time.Since(startTime).Seconds(), pt.smallPartsPath) - - logger.Infof("waiting for small part mergers to stop on %q...", pt.smallPartsPath) - startTime = time.Now() - pt.smallPartsMergerWG.Wait() - logger.Infof("small part mergers stopped in %.3f seconds on %q", time.Since(startTime).Seconds(), pt.smallPartsPath) - - logger.Infof("waiting for big part mergers to stop on %q...", pt.bigPartsPath) - startTime = time.Now() - pt.bigPartsMergerWG.Wait() - logger.Infof("big part mergers stopped in %.3f seconds on %q", time.Since(startTime).Seconds(), pt.bigPartsPath) + pt.wg.Wait() + logger.Infof("service workers stopped in %.3f seconds on %q", time.Since(startTime).Seconds(), pt.smallPartsPath) logger.Infof("flushing inmemory parts to files on %q...", pt.smallPartsPath) startTime = time.Now() @@ -695,10 +671,10 @@ func (pt *partition) MustClose() { } func (pt *partition) startRawRowsFlusher() { - pt.rawRowsFlusherWG.Add(1) + pt.wg.Add(1) go func() { pt.rawRowsFlusher() - pt.rawRowsFlusherWG.Done() + pt.wg.Done() }() } @@ -748,10 +724,10 @@ func (rrs *rawRowsShard) appendRawRowsToFlush(dst []rawRow, pt *partition, isFin } func (pt *partition) startInmemoryPartsFlusher() { - pt.inmemoryPartsFlusherWG.Add(1) + pt.wg.Add(1) go func() { pt.inmemoryPartsFlusher() - pt.inmemoryPartsFlusherWG.Done() + pt.wg.Done() }() } @@ -909,17 +885,17 @@ func SetSmallMergeWorkersCount(n int) { func (pt *partition) startMergeWorkers() { for i := 0; i < smallMergeWorkersCount; i++ { - pt.smallPartsMergerWG.Add(1) + pt.wg.Add(1) go func() { pt.smallPartsMerger() - pt.smallPartsMergerWG.Done() + pt.wg.Done() }() } for i := 0; i < bigMergeWorkersCount; i++ { - pt.bigPartsMergerWG.Add(1) + pt.wg.Add(1) go func() { pt.bigPartsMerger() - pt.bigPartsMergerWG.Done() + pt.wg.Done() }() } } @@ -1346,10 +1322,10 @@ func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]bool, isBig } func (pt *partition) startStalePartsRemover() { - pt.stalePartsRemoverWG.Add(1) + pt.wg.Add(1) go func() { pt.stalePartsRemover() - pt.stalePartsRemoverWG.Done() + pt.wg.Done() }() }