Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files

This commit is contained in:
Aliaksandr Valialkin 2023-01-18 01:14:19 -08:00
commit 30453af768
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
2 changed files with 68 additions and 43 deletions

View file

@ -137,6 +137,10 @@ type Table struct {
// fileParts contains file-backed parts. // fileParts contains file-backed parts.
fileParts []*partWrapper fileParts []*partWrapper
// This channel is used for signaling the background mergers that there are parts,
// which may need to be merged.
needMergeCh chan struct{}
snapshotLock sync.RWMutex snapshotLock sync.RWMutex
flockF *os.File flockF *os.File
@ -254,6 +258,14 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) [][]byte {
tb.flushBlocksToParts(ibsToFlush, false) tb.flushBlocksToParts(ibsToFlush, false)
if len(ibsToFlush) > 0 {
// Run assisted merges if needed.
flushConcurrencyCh <- struct{}{}
tb.assistedMergeForInmemoryParts()
tb.assistedMergeForFileParts()
<-flushConcurrencyCh
}
return tailItems return tailItems
} }
@ -332,12 +344,16 @@ func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallb
isReadOnly: isReadOnly, isReadOnly: isReadOnly,
fileParts: pws, fileParts: pws,
mergeIdx: uint64(time.Now().UnixNano()), mergeIdx: uint64(time.Now().UnixNano()),
needMergeCh: make(chan struct{}, 1),
flockF: flockF, flockF: flockF,
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
} }
tb.rawItems.init() tb.rawItems.init()
tb.startBackgroundWorkers() tb.startBackgroundWorkers()
// Wake up a single background merger, so it could start merging parts if needed.
tb.notifyBackgroundMergers()
var m TableMetrics var m TableMetrics
tb.UpdateMetrics(&m) tb.UpdateMetrics(&m)
logger.Infof("table %q has been opened in %.3f seconds; partsCount: %d; blocksCount: %d, itemsCount: %d; sizeBytes: %d", logger.Infof("table %q has been opened in %.3f seconds; partsCount: %d; blocksCount: %d, itemsCount: %d; sizeBytes: %d",
@ -747,13 +763,13 @@ func (tb *Table) flushBlocksToParts(ibs []*inmemoryBlock, isFinal bool) {
tb.partsLock.Lock() tb.partsLock.Lock()
tb.inmemoryParts = append(tb.inmemoryParts, pws...) tb.inmemoryParts = append(tb.inmemoryParts, pws...)
for range pws {
if !tb.notifyBackgroundMergers() {
break
}
}
tb.partsLock.Unlock() tb.partsLock.Unlock()
flushConcurrencyCh <- struct{}{}
tb.assistedMergeForInmemoryParts()
tb.assistedMergeForFileParts()
<-flushConcurrencyCh
if tb.flushCallback != nil { if tb.flushCallback != nil {
if isFinal { if isFinal {
tb.flushCallback() tb.flushCallback()
@ -763,6 +779,15 @@ func (tb *Table) flushBlocksToParts(ibs []*inmemoryBlock, isFinal bool) {
} }
} }
func (tb *Table) notifyBackgroundMergers() bool {
select {
case tb.needMergeCh <- struct{}{}:
return true
default:
return false
}
}
var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs()) var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
func needAssistedMerge(pws []*partWrapper, maxParts int) bool { func needAssistedMerge(pws []*partWrapper, maxParts int) bool {
@ -968,16 +993,9 @@ func (tb *Table) mergeExistingParts(isFinal bool) error {
return tb.mergeParts(pws, tb.stopCh, isFinal) return tb.mergeParts(pws, tb.stopCh, isFinal)
} }
const (
minMergeSleepTime = 10 * time.Millisecond
maxMergeSleepTime = 10 * time.Second
)
func (tb *Table) mergeWorker() { func (tb *Table) mergeWorker() {
sleepTime := minMergeSleepTime
var lastMergeTime uint64 var lastMergeTime uint64
isFinal := false isFinal := false
t := time.NewTimer(sleepTime)
for { for {
// Limit the number of concurrent calls to mergeExistingParts, since the total number of merge workers // Limit the number of concurrent calls to mergeExistingParts, since the total number of merge workers
// across tables may exceed the the cap(mergeWorkersLimitCh). // across tables may exceed the the cap(mergeWorkersLimitCh).
@ -986,7 +1004,6 @@ func (tb *Table) mergeWorker() {
<-mergeWorkersLimitCh <-mergeWorkersLimitCh
if err == nil { if err == nil {
// Try merging additional parts. // Try merging additional parts.
sleepTime = minMergeSleepTime
lastMergeTime = fasttime.UnixTimestamp() lastMergeTime = fasttime.UnixTimestamp()
isFinal = false isFinal = false
continue continue
@ -1007,16 +1024,11 @@ func (tb *Table) mergeWorker() {
continue continue
} }
// Nothing to merge. Sleep for a while and try again. // Nothing to merge. Wait for the notification of new merge.
sleepTime *= 2
if sleepTime > maxMergeSleepTime {
sleepTime = maxMergeSleepTime
}
select { select {
case <-tb.stopCh: case <-tb.stopCh:
return return
case <-t.C: case <-tb.needMergeCh:
t.Reset(sleepTime)
} }
} }
} }
@ -1337,6 +1349,7 @@ func (tb *Table) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, dst
default: default:
logger.Panicf("BUG: unknown partType=%d", dstPartType) logger.Panicf("BUG: unknown partType=%d", dstPartType)
} }
tb.notifyBackgroundMergers()
} }
tb.partsLock.Unlock() tb.partsLock.Unlock()

View file

@ -154,6 +154,10 @@ type partition struct {
// Contains file-based parts with big number of items. // Contains file-based parts with big number of items.
bigParts []*partWrapper bigParts []*partWrapper
// This channel is used for signaling the background mergers that there are parts,
// which may need to be merged.
needMergeCh chan struct{}
snapshotLock sync.RWMutex snapshotLock sync.RWMutex
stopCh chan struct{} stopCh chan struct{}
@ -280,6 +284,9 @@ func openPartition(smallPartsPath, bigPartsPath string, s *Storage) (*partition,
} }
pt.startBackgroundWorkers() pt.startBackgroundWorkers()
// Wake up a single background merger, so it could start merging parts if needed.
pt.notifyBackgroundMergers()
return pt, nil return pt, nil
} }
@ -292,6 +299,8 @@ func newPartition(name, smallPartsPath, bigPartsPath string, s *Storage) *partit
s: s, s: s,
mergeIdx: uint64(time.Now().UnixNano()), mergeIdx: uint64(time.Now().UnixNano()),
needMergeCh: make(chan struct{}, cgroup.AvailableCPUs()),
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
} }
p.rawRows.init() p.rawRows.init()
@ -515,6 +524,14 @@ func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) []rawRow {
if rrb != nil { if rrb != nil {
pt.flushRowsToParts(rrb.rows) pt.flushRowsToParts(rrb.rows)
putRawRowsBlock(rrb) putRawRowsBlock(rrb)
// Run assisted merges if needed.
flushConcurrencyCh <- struct{}{}
pt.assistedMergeForInmemoryParts()
pt.assistedMergeForSmallParts()
// There is no need in assisted merges for big parts,
// since the bottleneck is possible only at inmemory and small parts.
<-flushConcurrencyCh
} }
return rows return rows
@ -581,14 +598,21 @@ func (pt *partition) flushRowsToParts(rows []rawRow) {
pt.partsLock.Lock() pt.partsLock.Lock()
pt.inmemoryParts = append(pt.inmemoryParts, pws...) pt.inmemoryParts = append(pt.inmemoryParts, pws...)
for range pws {
if !pt.notifyBackgroundMergers() {
break
}
}
pt.partsLock.Unlock() pt.partsLock.Unlock()
}
flushConcurrencyCh <- struct{}{} func (pt *partition) notifyBackgroundMergers() bool {
pt.assistedMergeForInmemoryParts() select {
pt.assistedMergeForSmallParts() case pt.needMergeCh <- struct{}{}:
<-flushConcurrencyCh return true
// There is no need in assisted merges for small and big parts, default:
// since the bottleneck is possible only at inmemory parts. return false
}
} }
var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs()) var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
@ -1025,16 +1049,9 @@ func (pt *partition) startMergeWorkers() {
} }
} }
const (
minMergeSleepTime = 10 * time.Millisecond
maxMergeSleepTime = 10 * time.Second
)
func (pt *partition) mergeWorker() { func (pt *partition) mergeWorker() {
sleepTime := minMergeSleepTime
var lastMergeTime uint64 var lastMergeTime uint64
isFinal := false isFinal := false
t := time.NewTimer(sleepTime)
for { for {
// Limit the number of concurrent calls to mergeExistingParts, since the total number of merge workers // Limit the number of concurrent calls to mergeExistingParts, since the total number of merge workers
// across partitions may exceed the the cap(mergeWorkersLimitCh). // across partitions may exceed the the cap(mergeWorkersLimitCh).
@ -1043,7 +1060,6 @@ func (pt *partition) mergeWorker() {
<-mergeWorkersLimitCh <-mergeWorkersLimitCh
if err == nil { if err == nil {
// Try merging additional parts. // Try merging additional parts.
sleepTime = minMergeSleepTime
lastMergeTime = fasttime.UnixTimestamp() lastMergeTime = fasttime.UnixTimestamp()
isFinal = false isFinal = false
continue continue
@ -1064,16 +1080,11 @@ func (pt *partition) mergeWorker() {
continue continue
} }
// Nothing to merge. Sleep for a while and try again. // Nothing to merge. Wait for the notification of new merge.
sleepTime *= 2
if sleepTime > maxMergeSleepTime {
sleepTime = maxMergeSleepTime
}
select { select {
case <-pt.stopCh: case <-pt.stopCh:
return return
case <-t.C: case <-pt.needMergeCh:
t.Reset(sleepTime)
} }
} }
} }
@ -1565,6 +1576,7 @@ func (pt *partition) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper,
default: default:
logger.Panicf("BUG: unknown partType=%d", dstPartType) logger.Panicf("BUG: unknown partType=%d", dstPartType)
} }
pt.notifyBackgroundMergers()
} }
pt.partsLock.Unlock() pt.partsLock.Unlock()