From 0ec44497ea69a2d12f4051c4ae8c7788dfa24268 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Thu, 13 Apr 2023 23:36:06 -0700
Subject: [PATCH] docs/CHANGELOG.md: run at least 4 background mergers on systems with less than 4 CPU cores

This reduces the probability of a sudden spike in the number of small parts
when all the background mergers are busy with big merges.
---
 docs/CHANGELOG.md        |  1 +
 lib/mergeset/table.go    | 17 +++++++++++++----
 lib/storage/partition.go | 16 +++++++---------
 3 files changed, 21 insertions(+), 13 deletions(-)

diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 1751b8a0c..7c6476805 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -15,6 +15,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
 
 ## v1.87.x long-time support release (LTS)
 
+* BUGFIX: reduce the probability of sudden increase in the number of small parts on systems with small number of CPU cores.
 * BUGFIX: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html): suppress `series after dedup` error message in logs when `-remoteWrite.streamAggr.dedupInterval` command-line flag is set at [vmagent](https://docs.victoriametrics.com/vmgent.html) or when `-streamAggr.dedupInterval` command-line flag is set at [single-node VictoriaMetrics](https://docs.victoriametrics.com/).
 
 ## [v1.87.5](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.5)
diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go
index 5ce62829b..513597434 100644
--- a/lib/mergeset/table.go
+++ b/lib/mergeset/table.go
@@ -923,10 +923,8 @@ func newPartWrapperFromInmemoryPart(mp *inmemoryPart, flushToDiskDeadline time.T
 }
 
 func (tb *Table) startMergeWorkers() {
-	// Start a merge worker per available CPU core.
 	// The actual number of concurrent merges is limited inside mergeWorker() below.
-	workersCount := cgroup.AvailableCPUs()
-	for i := 0; i < workersCount; i++ {
+	for i := 0; i < cap(mergeWorkersLimitCh); i++ {
 		tb.wg.Add(1)
 		go func() {
 			tb.mergeWorker()
@@ -1403,7 +1401,18 @@ func (tb *Table) nextMergeIdx() uint64 {
 	return atomic.AddUint64(&tb.mergeIdx, 1)
 }
 
-var mergeWorkersLimitCh = make(chan struct{}, cgroup.AvailableCPUs())
+var mergeWorkersLimitCh = make(chan struct{}, getWorkersCount())
+
+func getWorkersCount() int {
+	n := cgroup.AvailableCPUs()
+	if n < 4 {
+		// Allow at least 4 merge workers on systems with small CPUs count
+		// in order to guarantee that background merges can be continued
+		// when multiple workers are busy with big merges.
+		n = 4
+	}
+	return n
+}
 
 func openParts(path string) ([]*partWrapper, error) {
 	// The path can be missing after restoring from backup, so create it if needed.
diff --git a/lib/storage/partition.go b/lib/storage/partition.go
index 8127943df..f9d9f8b51 100644
--- a/lib/storage/partition.go
+++ b/lib/storage/partition.go
@@ -1004,7 +1004,7 @@ func hasActiveMerges(pws []*partWrapper) bool {
 	return false
 }
 
-var mergeWorkersLimitCh = make(chan struct{}, adjustMergeWorkersLimit(getDefaultMergeConcurrency(16)))
+var mergeWorkersLimitCh = make(chan struct{}, getDefaultMergeConcurrency(16))
 
 var bigMergeWorkersLimitCh = make(chan struct{}, getDefaultMergeConcurrency(4))
 
@@ -1013,7 +1013,7 @@ func getDefaultMergeConcurrency(max int) int {
 	if v > max {
 		v = max
 	}
-	return v
+	return adjustMergeWorkersLimit(v)
 }
 
 // SetBigMergeWorkersCount sets the maximum number of concurrent mergers for big blocks.
@@ -1040,20 +1040,18 @@ func SetMergeWorkersCount(n int) {
 }
 
 func adjustMergeWorkersLimit(n int) int {
-	if n < 2 {
-		// Allow at least 2 merge workers on systems with a single CPU core
+	if n < 4 {
+		// Allow at least 4 merge workers on systems with small CPUs count
 		// in order to guarantee that background merges can be continued
-		// when a single worker is busy with the long merge of big parts.
-		return 2
+		// when multiple workers are busy with big merges.
+		n = 4
 	}
 	return n
 }
 
 func (pt *partition) startMergeWorkers() {
-	// Start a merge worker per available CPU core.
 	// The actual number of concurrent merges is limited inside mergeWorker() below.
-	workersCount := cgroup.AvailableCPUs()
-	for i := 0; i < workersCount; i++ {
+	for i := 0; i < cap(mergeWorkersLimitCh); i++ {
 		pt.wg.Add(1)
 		go func() {
 			pt.mergeWorker()
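
For reference, the clamping logic introduced by this patch can be illustrated with the following standalone sketch. It is not part of the diff: runtime.NumCPU() stands in for cgroup.AvailableCPUs(), and the surrounding main() is only there to make the snippet runnable; the other names mirror the patched code.

package main

import (
	"fmt"
	"runtime"
)

// getWorkersCount mirrors the helper added to lib/mergeset/table.go:
// one merge worker per available CPU core, but never fewer than 4,
// so that merges of small parts can continue while other workers
// are busy with long-running merges of big parts.
// runtime.NumCPU() is used here instead of cgroup.AvailableCPUs().
func getWorkersCount() int {
	n := runtime.NumCPU()
	if n < 4 {
		n = 4
	}
	return n
}

func main() {
	// The channel capacity bounds the number of concurrent merges;
	// startMergeWorkers() now spawns cap(mergeWorkersLimitCh) goroutines
	// instead of one goroutine per CPU core.
	mergeWorkersLimitCh := make(chan struct{}, getWorkersCount())
	fmt.Printf("merge workers: %d\n", cap(mergeWorkersLimitCh))
}

On a machine with 1 or 2 available cores this yields 4 workers, so small-part merges can still make progress while other workers are occupied by big merges.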