lib/{mergeset,storage}: add start background workers via startBackgroundWorkers() function

This commit is contained in:
Aliaksandr Valialkin 2022-12-04 00:01:04 -08:00
parent 33dda2809b
commit 544ea89f91
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
2 changed files with 15 additions and 10 deletions

View file

@ -291,8 +291,7 @@ func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallb
stopCh: make(chan struct{}),
}
tb.rawItems.init()
tb.startPartMergers()
tb.startRawItemsFlusher()
tb.startBackgroundWorkers()
var m TableMetrics
tb.UpdateMetrics(&m)
@ -323,6 +322,11 @@ func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallb
return tb, nil
}
func (tb *Table) startBackgroundWorkers() {
tb.startPartMergers()
tb.startRawItemsFlusher()
}
// MustClose closes the table.
func (tb *Table) MustClose() {
close(tb.stopCh)

View file

@ -204,16 +204,20 @@ func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath str
pt := newPartition(name, smallPartsPath, bigPartsPath, s)
pt.tr.fromPartitionTimestamp(timestamp)
pt.startMergeWorkers()
pt.startRawRowsFlusher()
pt.startInmemoryPartsFlusher()
pt.startStalePartsRemover()
pt.startBackgroundWorkers()
logger.Infof("partition %q has been created", name)
return pt, nil
}
func (pt *partition) startBackgroundWorkers() {
pt.startMergeWorkers()
pt.startRawRowsFlusher()
pt.startInmemoryPartsFlusher()
pt.startStalePartsRemover()
}
// Drop drops all the data on the storage for the given pt.
//
// The pt must be detached from table before calling pt.Drop.
@ -258,10 +262,7 @@ func openPartition(smallPartsPath, bigPartsPath string, s *Storage) (*partition,
if err := pt.tr.fromPartitionName(name); err != nil {
return nil, fmt.Errorf("cannot obtain partition time range from smallPartsPath %q: %w", smallPartsPath, err)
}
pt.startMergeWorkers()
pt.startRawRowsFlusher()
pt.startInmemoryPartsFlusher()
pt.startStalePartsRemover()
pt.startBackgroundWorkers()
return pt, nil
}