diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index f26ce06800..e57a71677b 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -17,6 +17,8 @@ The following tip changes can be tested by building VictoriaMetrics components f * SECURITY: upgrade base docker image (alpine) from 3.17.1 to 3.17.2. See [alpine 3.17.2 release notes](https://alpinelinux.org/posts/Alpine-3.17.2-released.html). +* BUGFIX: prevent possible data ingestion slowdown and query performance slowdown during [background merges of big parts](https://docs.victoriametrics.com/#storage) on systems with a small number of CPU cores (1 or 2 CPU cores). The issue was introduced in [v1.85.0](https://docs.victoriametrics.com/CHANGELOG.html#v1850) when implementing [this feature](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3337). See also [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3790). + ## [v1.87.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.1) Released at 2023-02-09 diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index ef403174e1..9ca34650b4 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -788,7 +788,18 @@ func (tb *Table) notifyBackgroundMergers() bool { } } -var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs()) +var flushConcurrencyLimit = func() int { + n := cgroup.AvailableCPUs() + if n < 2 { + // Allow at least 2 concurrent flushers on systems with a single CPU core + // in order to guarantee that in-memory data flushes and background merges can be continued + // when a single flusher is busy with the long merge. 
+ n = 2 + } + return n +}() + +var flushConcurrencyCh = make(chan struct{}, flushConcurrencyLimit) func needAssistedMerge(pws []*partWrapper, maxParts int) bool { if len(pws) < maxParts { diff --git a/lib/storage/partition.go b/lib/storage/partition.go index cbcaf295dd..15b9e1a0ea 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -615,7 +615,19 @@ func (pt *partition) notifyBackgroundMergers() bool { } } -var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs()) +var flushConcurrencyLimit = func() int { + n := cgroup.AvailableCPUs() + if n < 3 { + // Allow at least 3 concurrent flushers on systems with 1 or 2 CPU cores + // in order to guarantee that in-memory data flushes and background merges can continue + // when one flusher is busy with a long merge of big parts, + // while another flusher is busy with a long merge of small parts. + n = 3 + } + return n +}() + +var flushConcurrencyCh = make(chan struct{}, flushConcurrencyLimit) func needAssistedMerge(pws []*partWrapper, maxParts int) bool { if len(pws) < maxParts { @@ -1007,7 +1019,7 @@ func hasActiveMerges(pws []*partWrapper) bool { return false } -var mergeWorkersLimitCh = make(chan struct{}, getDefaultMergeConcurrency(16)) +var mergeWorkersLimitCh = make(chan struct{}, adjustMergeWorkersLimit(getDefaultMergeConcurrency(16))) var bigMergeWorkersLimitCh = make(chan struct{}, getDefaultMergeConcurrency(4)) @@ -1038,9 +1050,20 @@ func SetMergeWorkersCount(n int) { // Do nothing return } + n = adjustMergeWorkersLimit(n) mergeWorkersLimitCh = make(chan struct{}, n) } +func adjustMergeWorkersLimit(n int) int { + if n < 2 { + // Allow at least 2 merge workers even when a smaller limit is requested (e.g. on a single CPU core) + // in order to guarantee that background merges can continue + // when one worker is busy with a long merge of big parts. + return 2 + } + return n +} + func (pt *partition) startMergeWorkers() { // Start a merge worker per available CPU core. 
// The actual number of concurrent merges is limited inside mergeWorker() below.