lib/{storage,mergeset}: use a single sync.WaitGroup for all background workers

This simplifies the code
This commit is contained in:
Aliaksandr Valialkin 2022-12-03 23:03:05 -08:00
parent 28e6d9e1ff
commit cb44976716
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
2 changed files with 22 additions and 56 deletions

View file

@ -107,10 +107,7 @@ type Table struct {
stopCh chan struct{}
// Use syncwg instead of sync, since Add/Wait may be called from concurrent goroutines.
partMergersWG syncwg.WaitGroup
rawItemsFlusherWG sync.WaitGroup
wg sync.WaitGroup
// Use syncwg instead of sync, since Add/Wait may be called from concurrent goroutines.
rawItemsPendingFlushesWG syncwg.WaitGroup
@ -332,15 +329,10 @@ func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallb
func (tb *Table) MustClose() {
close(tb.stopCh)
logger.Infof("waiting for raw items flusher to stop on %q...", tb.path)
logger.Infof("waiting for background workers to stop on %q...", tb.path)
startTime := time.Now()
tb.rawItemsFlusherWG.Wait()
logger.Infof("raw items flusher stopped in %.3f seconds on %q", time.Since(startTime).Seconds(), tb.path)
logger.Infof("waiting for part mergers to stop on %q...", tb.path)
startTime = time.Now()
tb.partMergersWG.Wait()
logger.Infof("part mergers stopped in %.3f seconds on %q", time.Since(startTime).Seconds(), tb.path)
tb.wg.Wait()
logger.Infof("background workers stopped in %.3f seconds on %q", time.Since(startTime).Seconds(), tb.path)
logger.Infof("flushing inmemory parts to files on %q...", tb.path)
startTime = time.Now()
@ -500,10 +492,10 @@ func (tb *Table) putParts(pws []*partWrapper) {
}
func (tb *Table) startRawItemsFlusher() {
tb.rawItemsFlusherWG.Add(1)
tb.wg.Add(1)
go func() {
tb.rawItemsFlusher()
tb.rawItemsFlusherWG.Done()
tb.wg.Done()
}()
}
@ -592,8 +584,6 @@ func (tb *Table) mergeRawItemsBlocks(ibs []*inmemoryBlock, isFinal bool) {
if len(ibs) == 0 {
return
}
tb.partMergersWG.Add(1)
defer tb.partMergersWG.Done()
pws := make([]*partWrapper, 0, (len(ibs)+defaultPartsToMerge-1)/defaultPartsToMerge)
var pwsLock sync.Mutex
@ -720,12 +710,12 @@ func (tb *Table) mergeInmemoryBlocks(ibs []*inmemoryBlock) *partWrapper {
func (tb *Table) startPartMergers() {
for i := 0; i < mergeWorkersCount; i++ {
tb.partMergersWG.Add(1)
tb.wg.Add(1)
go func() {
if err := tb.partMerger(); err != nil {
logger.Panicf("FATAL: unrecoverable error when merging parts in %q: %s", tb.path, err)
}
tb.partMergersWG.Done()
tb.wg.Done()
}()
}
}

View file

@ -145,11 +145,7 @@ type partition struct {
stopCh chan struct{}
smallPartsMergerWG sync.WaitGroup
bigPartsMergerWG sync.WaitGroup
rawRowsFlusherWG sync.WaitGroup
inmemoryPartsFlusherWG sync.WaitGroup
stalePartsRemoverWG sync.WaitGroup
wg sync.WaitGroup
}
// partWrapper is a wrapper for the part.
@ -620,30 +616,10 @@ func (pt *partition) MustClose() {
// Wait until all the pending transaction deletions are finished.
pendingTxnDeletionsWG.Wait()
logger.Infof("waiting for stale parts remover to stop on %q...", pt.smallPartsPath)
logger.Infof("waiting for service workers to stop on %q...", pt.smallPartsPath)
startTime := time.Now()
pt.stalePartsRemoverWG.Wait()
logger.Infof("stale parts remover stopped in %.3f seconds on %q", time.Since(startTime).Seconds(), pt.smallPartsPath)
logger.Infof("waiting for inmemory parts flusher to stop on %q...", pt.smallPartsPath)
startTime = time.Now()
pt.inmemoryPartsFlusherWG.Wait()
logger.Infof("inmemory parts flusher stopped in %.3f seconds on %q", time.Since(startTime).Seconds(), pt.smallPartsPath)
logger.Infof("waiting for raw rows flusher to stop on %q...", pt.smallPartsPath)
startTime = time.Now()
pt.rawRowsFlusherWG.Wait()
logger.Infof("raw rows flusher stopped in %.3f seconds on %q", time.Since(startTime).Seconds(), pt.smallPartsPath)
logger.Infof("waiting for small part mergers to stop on %q...", pt.smallPartsPath)
startTime = time.Now()
pt.smallPartsMergerWG.Wait()
logger.Infof("small part mergers stopped in %.3f seconds on %q", time.Since(startTime).Seconds(), pt.smallPartsPath)
logger.Infof("waiting for big part mergers to stop on %q...", pt.bigPartsPath)
startTime = time.Now()
pt.bigPartsMergerWG.Wait()
logger.Infof("big part mergers stopped in %.3f seconds on %q", time.Since(startTime).Seconds(), pt.bigPartsPath)
pt.wg.Wait()
logger.Infof("service workers stopped in %.3f seconds on %q", time.Since(startTime).Seconds(), pt.smallPartsPath)
logger.Infof("flushing inmemory parts to files on %q...", pt.smallPartsPath)
startTime = time.Now()
@ -695,10 +671,10 @@ func (pt *partition) MustClose() {
}
func (pt *partition) startRawRowsFlusher() {
pt.rawRowsFlusherWG.Add(1)
pt.wg.Add(1)
go func() {
pt.rawRowsFlusher()
pt.rawRowsFlusherWG.Done()
pt.wg.Done()
}()
}
@ -748,10 +724,10 @@ func (rrs *rawRowsShard) appendRawRowsToFlush(dst []rawRow, pt *partition, isFin
}
func (pt *partition) startInmemoryPartsFlusher() {
pt.inmemoryPartsFlusherWG.Add(1)
pt.wg.Add(1)
go func() {
pt.inmemoryPartsFlusher()
pt.inmemoryPartsFlusherWG.Done()
pt.wg.Done()
}()
}
@ -909,17 +885,17 @@ func SetSmallMergeWorkersCount(n int) {
func (pt *partition) startMergeWorkers() {
for i := 0; i < smallMergeWorkersCount; i++ {
pt.smallPartsMergerWG.Add(1)
pt.wg.Add(1)
go func() {
pt.smallPartsMerger()
pt.smallPartsMergerWG.Done()
pt.wg.Done()
}()
}
for i := 0; i < bigMergeWorkersCount; i++ {
pt.bigPartsMergerWG.Add(1)
pt.wg.Add(1)
go func() {
pt.bigPartsMerger()
pt.bigPartsMergerWG.Done()
pt.wg.Done()
}()
}
}
@ -1346,10 +1322,10 @@ func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]bool, isBig
}
func (pt *partition) startStalePartsRemover() {
pt.stalePartsRemoverWG.Add(1)
pt.wg.Add(1)
go func() {
pt.stalePartsRemover()
pt.stalePartsRemoverWG.Done()
pt.wg.Done()
}()
}