From b8409d66001edae68939b9777160751812254220 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Wed, 18 Jan 2023 00:20:56 -0800
Subject: [PATCH] lib/{storage,mergeset}: do not run assisted merges when
 flushing pending samples to parts

Assisted merges are intended to be performed by the goroutines that accept
incoming samples, in order to limit the data ingestion rate.

The worker that converts pending samples to parts shouldn't be penalized
with assisted merges, since this may result in an increased number of
pending rows when an assisted merge takes too much time, as seen at
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3647#issuecomment-1385039142
---
 lib/mergeset/table.go    | 13 ++++++++-----
 lib/storage/partition.go | 15 ++++++++-------
 2 files changed, 16 insertions(+), 12 deletions(-)

diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go
index 158b96cd4..4d0923981 100644
--- a/lib/mergeset/table.go
+++ b/lib/mergeset/table.go
@@ -254,6 +254,14 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) [][]byte {
 
 	tb.flushBlocksToParts(ibsToFlush, false)
 
+	if len(ibsToFlush) > 0 {
+		// Run assisted merges if needed.
+		flushConcurrencyCh <- struct{}{}
+		tb.assistedMergeForInmemoryParts()
+		tb.assistedMergeForFileParts()
+		<-flushConcurrencyCh
+	}
+
 	return tailItems
 }
 
@@ -749,11 +757,6 @@ func (tb *Table) flushBlocksToParts(ibs []*inmemoryBlock, isFinal bool) {
 	tb.inmemoryParts = append(tb.inmemoryParts, pws...)
 	tb.partsLock.Unlock()
 
-	flushConcurrencyCh <- struct{}{}
-	tb.assistedMergeForInmemoryParts()
-	tb.assistedMergeForFileParts()
-	<-flushConcurrencyCh
-
 	if tb.flushCallback != nil {
 		if isFinal {
 			tb.flushCallback()
diff --git a/lib/storage/partition.go b/lib/storage/partition.go
index 65616ee6b..22815a77d 100644
--- a/lib/storage/partition.go
+++ b/lib/storage/partition.go
@@ -515,6 +515,14 @@ func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) []rawRow {
 	if rrb != nil {
 		pt.flushRowsToParts(rrb.rows)
 		putRawRowsBlock(rrb)
+
+		// Run assisted merges if needed.
+		flushConcurrencyCh <- struct{}{}
+		pt.assistedMergeForInmemoryParts()
+		pt.assistedMergeForSmallParts()
+		// There is no need in assisted merges for big parts,
+		// since the bottleneck is possible only at inmemory and small parts.
+		<-flushConcurrencyCh
 	}
 
 	return rows
@@ -582,13 +590,6 @@ func (pt *partition) flushRowsToParts(rows []rawRow) {
 	pt.partsLock.Lock()
 	pt.inmemoryParts = append(pt.inmemoryParts, pws...)
 	pt.partsLock.Unlock()
-
-	flushConcurrencyCh <- struct{}{}
-	pt.assistedMergeForInmemoryParts()
-	pt.assistedMergeForSmallParts()
-	<-flushConcurrencyCh
-	// There is no need in assisted merges for small and big parts,
-	// since the bottleneck is possible only at inmemory parts.
 }
 
 var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
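
Note (not part of the patch): the sketch below illustrates the counting-semaphore pattern behind flushConcurrencyCh, where a buffered channel sized to the available CPUs caps how many goroutines may run assisted merges at once. It is a standalone sketch, not VictoriaMetrics code: assistedMerge is a hypothetical stand-in for the assistedMergeFor*Parts methods, and runtime.NumCPU() stands in for cgroup.AvailableCPUs().

// semaphore_sketch.go: minimal illustration of the flushConcurrencyCh pattern.
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// concurrencyCh acts as a counting semaphore that limits how many goroutines
// may perform assisted merges concurrently (capacity = number of CPUs).
var concurrencyCh = make(chan struct{}, runtime.NumCPU())

// assistedMerge acquires a semaphore slot, does the (simulated) merge work
// and releases the slot; callers block while all slots are taken.
func assistedMerge(id int) {
	concurrencyCh <- struct{}{}        // acquire a slot
	defer func() { <-concurrencyCh }() // release it when done
	fmt.Printf("goroutine %d: running assisted merge\n", id)
}

func main() {
	var wg sync.WaitGroup
	// Emulate many ingestion goroutines that each trigger an assisted merge
	// after flushing their pending data, as the patched addItems/addRows do.
	for i := 0; i < 16; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			assistedMerge(id)
		}(i)
	}
	wg.Wait()
}

Placing the acquire/merge/release sequence in the ingestion path (addItems/addRows) rather than in flushBlocksToParts/flushRowsToParts is exactly what the patch does: the goroutines accepting samples pay for the merge work and are throttled, while the background worker that converts pending samples to parts is no longer delayed by it.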