lib/{storage,mergeset}: do not run assisted merges when flushing pending samples to parts

Assisted merges are intended to be performed by the goroutines that accept the incoming samples,
in order to limit the data ingestion rate.

The worker that converts pending samples to parts shouldn't be penalized by assisted merges,
since this may result in an increased number of pending rows, as seen at https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3647#issuecomment-1385039142
when the assisted merge takes too much time.
This commit is contained in:
Aliaksandr Valialkin 2023-01-18 00:20:56 -08:00
parent 1ac025bbc9
commit b8409d6600
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
2 changed files with 16 additions and 12 deletions

View file

@ -254,6 +254,14 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) [][]byte {
tb.flushBlocksToParts(ibsToFlush, false)
if len(ibsToFlush) > 0 {
// Run assisted merges if needed.
flushConcurrencyCh <- struct{}{}
tb.assistedMergeForInmemoryParts()
tb.assistedMergeForFileParts()
<-flushConcurrencyCh
}
return tailItems
}
@ -749,11 +757,6 @@ func (tb *Table) flushBlocksToParts(ibs []*inmemoryBlock, isFinal bool) {
tb.inmemoryParts = append(tb.inmemoryParts, pws...)
tb.partsLock.Unlock()
flushConcurrencyCh <- struct{}{}
tb.assistedMergeForInmemoryParts()
tb.assistedMergeForFileParts()
<-flushConcurrencyCh
if tb.flushCallback != nil {
if isFinal {
tb.flushCallback()

View file

@ -515,6 +515,14 @@ func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) []rawRow {
if rrb != nil {
pt.flushRowsToParts(rrb.rows)
putRawRowsBlock(rrb)
// Run assisted merges if needed.
flushConcurrencyCh <- struct{}{}
pt.assistedMergeForInmemoryParts()
pt.assistedMergeForSmallParts()
// There is no need in assisted merges for big parts,
// since the bottleneck is possible only at inmemory and small parts.
<-flushConcurrencyCh
}
return rows
@ -582,13 +590,6 @@ func (pt *partition) flushRowsToParts(rows []rawRow) {
pt.partsLock.Lock()
pt.inmemoryParts = append(pt.inmemoryParts, pws...)
pt.partsLock.Unlock()
flushConcurrencyCh <- struct{}{}
pt.assistedMergeForInmemoryParts()
pt.assistedMergeForSmallParts()
<-flushConcurrencyCh
// There is no need in assisted merges for small and big parts,
// since the bottleneck is possible only at inmemory parts.
}
var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())