From b8409d66001edae68939b9777160751812254220 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Wed, 18 Jan 2023 00:20:56 -0800
Subject: [PATCH 1/2] lib/{storage,mergeset}: do not run assisted merges when
 flushing pending samples to parts

Assisted merges are intended to be performed by the goroutines that accept
incoming samples, in order to limit the data ingestion rate.

The worker that converts pending samples to parts shouldn't be penalized by
assisted merges, since this may result in an increased number of pending rows
when an assisted merge takes too much time, as seen at
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3647#issuecomment-1385039142
---
 lib/mergeset/table.go    | 13 ++++++++-----
 lib/storage/partition.go | 15 ++++++++-------
 2 files changed, 16 insertions(+), 12 deletions(-)

diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go
index 158b96cd4..4d0923981 100644
--- a/lib/mergeset/table.go
+++ b/lib/mergeset/table.go
@@ -254,6 +254,14 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) [][]byte {
 
 	tb.flushBlocksToParts(ibsToFlush, false)
 
+	if len(ibsToFlush) > 0 {
+		// Run assisted merges if needed.
+		flushConcurrencyCh <- struct{}{}
+		tb.assistedMergeForInmemoryParts()
+		tb.assistedMergeForFileParts()
+		<-flushConcurrencyCh
+	}
+
 	return tailItems
 }
 
@@ -749,11 +757,6 @@ func (tb *Table) flushBlocksToParts(ibs []*inmemoryBlock, isFinal bool) {
 	tb.inmemoryParts = append(tb.inmemoryParts, pws...)
 	tb.partsLock.Unlock()
 
-	flushConcurrencyCh <- struct{}{}
-	tb.assistedMergeForInmemoryParts()
-	tb.assistedMergeForFileParts()
-	<-flushConcurrencyCh
-
 	if tb.flushCallback != nil {
 		if isFinal {
 			tb.flushCallback()
diff --git a/lib/storage/partition.go b/lib/storage/partition.go
index 65616ee6b..22815a77d 100644
--- a/lib/storage/partition.go
+++ b/lib/storage/partition.go
@@ -515,6 +515,14 @@ func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) []rawRow {
 	if rrb != nil {
 		pt.flushRowsToParts(rrb.rows)
 		putRawRowsBlock(rrb)
+
+		// Run assisted merges if needed.
+		flushConcurrencyCh <- struct{}{}
+		pt.assistedMergeForInmemoryParts()
+		pt.assistedMergeForSmallParts()
+		// There is no need in assisted merges for big parts,
+		// since the bottleneck is possible only at inmemory and small parts.
+		<-flushConcurrencyCh
 	}
 
 	return rows
@@ -582,13 +590,6 @@ func (pt *partition) flushRowsToParts(rows []rawRow) {
 	pt.partsLock.Lock()
 	pt.inmemoryParts = append(pt.inmemoryParts, pws...)
 	pt.partsLock.Unlock()
-
-	flushConcurrencyCh <- struct{}{}
-	pt.assistedMergeForInmemoryParts()
-	pt.assistedMergeForSmallParts()
-	<-flushConcurrencyCh
-	// There is no need in assisted merges for small and big parts,
-	// since the bottleneck is possible only at inmemory parts.
 }
 
 var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())

From 2ac530eb28e4cbbe47da28b875a652afd5cd1417 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Wed, 18 Jan 2023 01:09:03 -0800
Subject: [PATCH 2/2] lib/{storage,mergeset}: wake up background merges as soon
 as there is potential work for them

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3647
---
 lib/mergeset/table.go    | 40 +++++++++++++++++++++--------------
 lib/storage/partition.go | 45 +++++++++++++++++++++++++---------------
 2 files changed, 53 insertions(+), 32 deletions(-)

diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go
index 4d0923981..ef403174e 100644
--- a/lib/mergeset/table.go
+++ b/lib/mergeset/table.go
@@ -137,6 +137,10 @@ type Table struct {
 	// fileParts contains file-backed parts.
 	fileParts []*partWrapper
 
+	// This channel is used for signaling the background mergers that there are parts,
+	// which may need to be merged.
+	needMergeCh chan struct{}
+
 	snapshotLock sync.RWMutex
 
 	flockF *os.File
@@ -340,12 +344,16 @@ func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallb
 		isReadOnly:  isReadOnly,
 		fileParts:   pws,
 		mergeIdx:    uint64(time.Now().UnixNano()),
+		needMergeCh: make(chan struct{}, 1),
 		flockF:      flockF,
 		stopCh:      make(chan struct{}),
 	}
 	tb.rawItems.init()
 	tb.startBackgroundWorkers()
 
+	// Wake up a single background merger, so it could start merging parts if needed.
+	tb.notifyBackgroundMergers()
+
 	var m TableMetrics
 	tb.UpdateMetrics(&m)
 	logger.Infof("table %q has been opened in %.3f seconds; partsCount: %d; blocksCount: %d, itemsCount: %d; sizeBytes: %d",
@@ -755,6 +763,11 @@ func (tb *Table) flushBlocksToParts(ibs []*inmemoryBlock, isFinal bool) {
 
 	tb.partsLock.Lock()
 	tb.inmemoryParts = append(tb.inmemoryParts, pws...)
+	for range pws {
+		if !tb.notifyBackgroundMergers() {
+			break
+		}
+	}
 	tb.partsLock.Unlock()
 
 	if tb.flushCallback != nil {
@@ -766,6 +779,15 @@ func (tb *Table) flushBlocksToParts(ibs []*inmemoryBlock, isFinal bool) {
 	}
 }
 
+func (tb *Table) notifyBackgroundMergers() bool {
+	select {
+	case tb.needMergeCh <- struct{}{}:
+		return true
+	default:
+		return false
+	}
+}
+
 var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
 
 func needAssistedMerge(pws []*partWrapper, maxParts int) bool {
@@ -971,16 +993,9 @@ func (tb *Table) mergeExistingParts(isFinal bool) error {
 	return tb.mergeParts(pws, tb.stopCh, isFinal)
 }
 
-const (
-	minMergeSleepTime = 10 * time.Millisecond
-	maxMergeSleepTime = 10 * time.Second
-)
-
 func (tb *Table) mergeWorker() {
-	sleepTime := minMergeSleepTime
 	var lastMergeTime uint64
 	isFinal := false
-	t := time.NewTimer(sleepTime)
 	for {
 		// Limit the number of concurrent calls to mergeExistingParts, since the total number of merge workers
 		// across tables may exceed the the cap(mergeWorkersLimitCh).
@@ -989,7 +1004,6 @@ func (tb *Table) mergeWorker() {
 		<-mergeWorkersLimitCh
 		if err == nil {
 			// Try merging additional parts.
-			sleepTime = minMergeSleepTime
 			lastMergeTime = fasttime.UnixTimestamp()
 			isFinal = false
 			continue
@@ -1010,16 +1024,11 @@ func (tb *Table) mergeWorker() {
 			continue
 		}
 
-		// Nothing to merge. Sleep for a while and try again.
-		sleepTime *= 2
-		if sleepTime > maxMergeSleepTime {
-			sleepTime = maxMergeSleepTime
-		}
+		// Nothing to merge. Wait for the notification of new merge.
 		select {
 		case <-tb.stopCh:
 			return
-		case <-t.C:
-			t.Reset(sleepTime)
+		case <-tb.needMergeCh:
 		}
 	}
 }
@@ -1340,6 +1349,7 @@ func (tb *Table) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, dst
 		default:
 			logger.Panicf("BUG: unknown partType=%d", dstPartType)
 		}
+		tb.notifyBackgroundMergers()
 	}
 	tb.partsLock.Unlock()
 
diff --git a/lib/storage/partition.go b/lib/storage/partition.go
index 22815a77d..1e0a7532b 100644
--- a/lib/storage/partition.go
+++ b/lib/storage/partition.go
@@ -154,6 +154,10 @@ type partition struct {
 	// Contains file-based parts with big number of items.
 	bigParts []*partWrapper
 
+	// This channel is used for signaling the background mergers that there are parts,
+	// which may need to be merged.
+	needMergeCh chan struct{}
+
 	snapshotLock sync.RWMutex
 
 	stopCh chan struct{}
@@ -280,6 +284,9 @@ func openPartition(smallPartsPath, bigPartsPath string, s *Storage) (*partition,
 	}
 	pt.startBackgroundWorkers()
 
+	// Wake up a single background merger, so it could start merging parts if needed.
+	pt.notifyBackgroundMergers()
+
 	return pt, nil
 }
 
@@ -291,8 +298,10 @@ func newPartition(name, smallPartsPath, bigPartsPath string, s *Storage) *partit
 
 		s: s,
 
-		mergeIdx: uint64(time.Now().UnixNano()),
-		stopCh:   make(chan struct{}),
+		mergeIdx:    uint64(time.Now().UnixNano()),
+		needMergeCh: make(chan struct{}, cgroup.AvailableCPUs()),
+
+		stopCh: make(chan struct{}),
 	}
 	p.rawRows.init()
 	return p
@@ -589,9 +598,23 @@ func (pt *partition) flushRowsToParts(rows []rawRow) {
 
 	pt.partsLock.Lock()
 	pt.inmemoryParts = append(pt.inmemoryParts, pws...)
+	for range pws {
+		if !pt.notifyBackgroundMergers() {
+			break
+		}
+	}
 	pt.partsLock.Unlock()
 }
 
+func (pt *partition) notifyBackgroundMergers() bool {
+	select {
+	case pt.needMergeCh <- struct{}{}:
+		return true
+	default:
+		return false
+	}
+}
+
 var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
 
 func needAssistedMerge(pws []*partWrapper, maxParts int) bool {
@@ -1026,16 +1049,9 @@ func (pt *partition) startMergeWorkers() {
 	}
 }
 
-const (
-	minMergeSleepTime = 10 * time.Millisecond
-	maxMergeSleepTime = 10 * time.Second
-)
-
 func (pt *partition) mergeWorker() {
-	sleepTime := minMergeSleepTime
 	var lastMergeTime uint64
 	isFinal := false
-	t := time.NewTimer(sleepTime)
 	for {
 		// Limit the number of concurrent calls to mergeExistingParts, since the total number of merge workers
 		// across partitions may exceed the the cap(mergeWorkersLimitCh).
@@ -1044,7 +1060,6 @@ func (pt *partition) mergeWorker() {
 		<-mergeWorkersLimitCh
 		if err == nil {
 			// Try merging additional parts.
-			sleepTime = minMergeSleepTime
 			lastMergeTime = fasttime.UnixTimestamp()
 			isFinal = false
 			continue
@@ -1065,16 +1080,11 @@ func (pt *partition) mergeWorker() {
 			continue
 		}
 
-		// Nothing to merge. Sleep for a while and try again.
-		sleepTime *= 2
-		if sleepTime > maxMergeSleepTime {
-			sleepTime = maxMergeSleepTime
-		}
+		// Nothing to merge. Wait for the notification of new merge.
 		select {
 		case <-pt.stopCh:
 			return
-		case <-t.C:
-			t.Reset(sleepTime)
+		case <-pt.needMergeCh:
 		}
 	}
 }
@@ -1566,6 +1576,7 @@ func (pt *partition) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper,
 		default:
 			logger.Panicf("BUG: unknown partType=%d", dstPartType)
 		}
+		pt.notifyBackgroundMergers()
 	}
 	pt.partsLock.Unlock()
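
The first patch moves assisted merges onto the ingestion path: the goroutines that accept incoming samples acquire a slot in flushConcurrencyCh (a semaphore sized to the number of available CPUs) and run the assisted merges themselves, so ingestion slows down instead of the flush worker. Below is a minimal standalone sketch of that semaphore pattern, not VictoriaMetrics code; the names addItems and assistedMerge, and the use of runtime.NumCPU() in place of cgroup.AvailableCPUs(), are illustrative assumptions.

package main

import (
	"fmt"
	"runtime"
	"sync"
)

// flushConcurrencyCh plays the role of the semaphore from the patch:
// it caps how many goroutines run assisted merges concurrently.
// The real code sizes it with cgroup.AvailableCPUs(); runtime.NumCPU()
// is used here only to keep the sketch self-contained.
var flushConcurrencyCh = make(chan struct{}, runtime.NumCPU())

// assistedMerge stands in for assistedMergeForInmemoryParts and friends.
func assistedMerge(id int) {
	fmt.Printf("goroutine %d: running assisted merge\n", id)
}

// addItems sketches the ingestion path after the patch: the goroutine that
// accepted the data pays for the assisted merge, throttling its own ingestion
// rate, while the background flush worker is no longer penalized.
func addItems(id int, flushedSomething bool) {
	if !flushedSomething {
		return // nothing was flushed, so no assisted merge is needed
	}
	flushConcurrencyCh <- struct{}{} // acquire a slot
	assistedMerge(id)
	<-flushConcurrencyCh // release the slot
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			addItems(id, true)
		}(i)
	}
	wg.Wait()
}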
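
The second patch replaces timer-based polling in mergeWorker (an exponential sleep between 10ms and 10s) with a wake-up channel: producers call notifyBackgroundMergers, which performs a non-blocking send into the buffered needMergeCh, and an idle worker blocks on that channel (or stopCh) instead of sleeping. The sketch below reproduces just that pattern under assumed names (mergeScheduler, doMerge); it illustrates the technique and is not the actual Table/partition implementation.

package main

import (
	"fmt"
	"sync"
	"time"
)

// mergeScheduler is an illustrative stand-in for Table/partition: a buffered
// needMergeCh wakes the background merge worker as soon as there may be work.
type mergeScheduler struct {
	needMergeCh chan struct{}
	stopCh      chan struct{}
	wg          sync.WaitGroup
}

func newMergeScheduler() *mergeScheduler {
	return &mergeScheduler{
		// A small buffer is enough: a pending signal already means "re-check parts".
		needMergeCh: make(chan struct{}, 1),
		stopCh:      make(chan struct{}),
	}
}

// notifyBackgroundMergers mirrors the patch: a non-blocking send, so callers
// (which may hold partsLock) are never delayed when a signal is already pending.
func (ms *mergeScheduler) notifyBackgroundMergers() bool {
	select {
	case ms.needMergeCh <- struct{}{}:
		return true
	default:
		return false
	}
}

// mergeWorker waits for a notification instead of polling with a timer.
func (ms *mergeScheduler) mergeWorker() {
	defer ms.wg.Done()
	for {
		select {
		case <-ms.stopCh:
			return
		case <-ms.needMergeCh:
			ms.doMerge() // the real code calls mergeExistingParts here
		}
	}
}

func (ms *mergeScheduler) doMerge() {
	fmt.Println("merging parts...")
}

func main() {
	ms := newMergeScheduler()
	ms.wg.Add(1)
	go ms.mergeWorker()

	// Flushing new parts notifies the worker; redundant notifications are dropped.
	for i := 0; i < 3; i++ {
		ms.notifyBackgroundMergers()
	}
	time.Sleep(100 * time.Millisecond)
	close(ms.stopCh)
	ms.wg.Wait()
}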