mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/{storage,mergeset}: wake up background merges as soon as there is a potential work for them
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3647
This commit is contained in:
parent
70b5a6fb28
commit
c5e858461c
2 changed files with 53 additions and 32 deletions
|
@ -137,6 +137,10 @@ type Table struct {
|
|||
// fileParts contains file-backed parts.
|
||||
fileParts []*partWrapper
|
||||
|
||||
// This channel is used for signaling the background mergers that there are parts,
|
||||
// which may need to be merged.
|
||||
needMergeCh chan struct{}
|
||||
|
||||
snapshotLock sync.RWMutex
|
||||
|
||||
flockF *os.File
|
||||
|
@ -340,12 +344,16 @@ func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallb
|
|||
isReadOnly: isReadOnly,
|
||||
fileParts: pws,
|
||||
mergeIdx: uint64(time.Now().UnixNano()),
|
||||
needMergeCh: make(chan struct{}, 1),
|
||||
flockF: flockF,
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
tb.rawItems.init()
|
||||
tb.startBackgroundWorkers()
|
||||
|
||||
// Wake up a single background merger, so it could start merging parts if needed.
|
||||
tb.notifyBackgroundMergers()
|
||||
|
||||
var m TableMetrics
|
||||
tb.UpdateMetrics(&m)
|
||||
logger.Infof("table %q has been opened in %.3f seconds; partsCount: %d; blocksCount: %d, itemsCount: %d; sizeBytes: %d",
|
||||
|
@ -755,6 +763,11 @@ func (tb *Table) flushBlocksToParts(ibs []*inmemoryBlock, isFinal bool) {
|
|||
|
||||
tb.partsLock.Lock()
|
||||
tb.inmemoryParts = append(tb.inmemoryParts, pws...)
|
||||
for range pws {
|
||||
if !tb.notifyBackgroundMergers() {
|
||||
break
|
||||
}
|
||||
}
|
||||
tb.partsLock.Unlock()
|
||||
|
||||
if tb.flushCallback != nil {
|
||||
|
@ -766,6 +779,15 @@ func (tb *Table) flushBlocksToParts(ibs []*inmemoryBlock, isFinal bool) {
|
|||
}
|
||||
}
|
||||
|
||||
func (tb *Table) notifyBackgroundMergers() bool {
|
||||
select {
|
||||
case tb.needMergeCh <- struct{}{}:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
|
||||
|
||||
func needAssistedMerge(pws []*partWrapper, maxParts int) bool {
|
||||
|
@ -971,16 +993,9 @@ func (tb *Table) mergeExistingParts(isFinal bool) error {
|
|||
return tb.mergeParts(pws, tb.stopCh, isFinal)
|
||||
}
|
||||
|
||||
const (
|
||||
minMergeSleepTime = 10 * time.Millisecond
|
||||
maxMergeSleepTime = 10 * time.Second
|
||||
)
|
||||
|
||||
func (tb *Table) mergeWorker() {
|
||||
sleepTime := minMergeSleepTime
|
||||
var lastMergeTime uint64
|
||||
isFinal := false
|
||||
t := time.NewTimer(sleepTime)
|
||||
for {
|
||||
// Limit the number of concurrent calls to mergeExistingParts, since the total number of merge workers
|
||||
// across tables may exceed the the cap(mergeWorkersLimitCh).
|
||||
|
@ -989,7 +1004,6 @@ func (tb *Table) mergeWorker() {
|
|||
<-mergeWorkersLimitCh
|
||||
if err == nil {
|
||||
// Try merging additional parts.
|
||||
sleepTime = minMergeSleepTime
|
||||
lastMergeTime = fasttime.UnixTimestamp()
|
||||
isFinal = false
|
||||
continue
|
||||
|
@ -1010,16 +1024,11 @@ func (tb *Table) mergeWorker() {
|
|||
continue
|
||||
}
|
||||
|
||||
// Nothing to merge. Sleep for a while and try again.
|
||||
sleepTime *= 2
|
||||
if sleepTime > maxMergeSleepTime {
|
||||
sleepTime = maxMergeSleepTime
|
||||
}
|
||||
// Nothing to merge. Wait for the notification of new merge.
|
||||
select {
|
||||
case <-tb.stopCh:
|
||||
return
|
||||
case <-t.C:
|
||||
t.Reset(sleepTime)
|
||||
case <-tb.needMergeCh:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1340,6 +1349,7 @@ func (tb *Table) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, dst
|
|||
default:
|
||||
logger.Panicf("BUG: unknown partType=%d", dstPartType)
|
||||
}
|
||||
tb.notifyBackgroundMergers()
|
||||
}
|
||||
tb.partsLock.Unlock()
|
||||
|
||||
|
|
|
@ -154,6 +154,10 @@ type partition struct {
|
|||
// Contains file-based parts with big number of items.
|
||||
bigParts []*partWrapper
|
||||
|
||||
// This channel is used for signaling the background mergers that there are parts,
|
||||
// which may need to be merged.
|
||||
needMergeCh chan struct{}
|
||||
|
||||
snapshotLock sync.RWMutex
|
||||
|
||||
stopCh chan struct{}
|
||||
|
@ -280,6 +284,9 @@ func openPartition(smallPartsPath, bigPartsPath string, s *Storage) (*partition,
|
|||
}
|
||||
pt.startBackgroundWorkers()
|
||||
|
||||
// Wake up a single background merger, so it could start merging parts if needed.
|
||||
pt.notifyBackgroundMergers()
|
||||
|
||||
return pt, nil
|
||||
}
|
||||
|
||||
|
@ -292,6 +299,8 @@ func newPartition(name, smallPartsPath, bigPartsPath string, s *Storage) *partit
|
|||
s: s,
|
||||
|
||||
mergeIdx: uint64(time.Now().UnixNano()),
|
||||
needMergeCh: make(chan struct{}, cgroup.AvailableCPUs()),
|
||||
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
p.rawRows.init()
|
||||
|
@ -589,9 +598,23 @@ func (pt *partition) flushRowsToParts(rows []rawRow) {
|
|||
|
||||
pt.partsLock.Lock()
|
||||
pt.inmemoryParts = append(pt.inmemoryParts, pws...)
|
||||
for range pws {
|
||||
if !pt.notifyBackgroundMergers() {
|
||||
break
|
||||
}
|
||||
}
|
||||
pt.partsLock.Unlock()
|
||||
}
|
||||
|
||||
func (pt *partition) notifyBackgroundMergers() bool {
|
||||
select {
|
||||
case pt.needMergeCh <- struct{}{}:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
|
||||
|
||||
func needAssistedMerge(pws []*partWrapper, maxParts int) bool {
|
||||
|
@ -1026,16 +1049,9 @@ func (pt *partition) startMergeWorkers() {
|
|||
}
|
||||
}
|
||||
|
||||
const (
|
||||
minMergeSleepTime = 10 * time.Millisecond
|
||||
maxMergeSleepTime = 10 * time.Second
|
||||
)
|
||||
|
||||
func (pt *partition) mergeWorker() {
|
||||
sleepTime := minMergeSleepTime
|
||||
var lastMergeTime uint64
|
||||
isFinal := false
|
||||
t := time.NewTimer(sleepTime)
|
||||
for {
|
||||
// Limit the number of concurrent calls to mergeExistingParts, since the total number of merge workers
|
||||
// across partitions may exceed the the cap(mergeWorkersLimitCh).
|
||||
|
@ -1044,7 +1060,6 @@ func (pt *partition) mergeWorker() {
|
|||
<-mergeWorkersLimitCh
|
||||
if err == nil {
|
||||
// Try merging additional parts.
|
||||
sleepTime = minMergeSleepTime
|
||||
lastMergeTime = fasttime.UnixTimestamp()
|
||||
isFinal = false
|
||||
continue
|
||||
|
@ -1065,16 +1080,11 @@ func (pt *partition) mergeWorker() {
|
|||
continue
|
||||
}
|
||||
|
||||
// Nothing to merge. Sleep for a while and try again.
|
||||
sleepTime *= 2
|
||||
if sleepTime > maxMergeSleepTime {
|
||||
sleepTime = maxMergeSleepTime
|
||||
}
|
||||
// Nothing to merge. Wait for the notification of new merge.
|
||||
select {
|
||||
case <-pt.stopCh:
|
||||
return
|
||||
case <-t.C:
|
||||
t.Reset(sleepTime)
|
||||
case <-pt.needMergeCh:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1566,6 +1576,7 @@ func (pt *partition) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper,
|
|||
default:
|
||||
logger.Panicf("BUG: unknown partType=%d", dstPartType)
|
||||
}
|
||||
pt.notifyBackgroundMergers()
|
||||
}
|
||||
pt.partsLock.Unlock()
|
||||
|
||||
|
|
Loading…
Reference in a new issue