diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 9b08c5b55..1d259ede2 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -23,6 +23,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
 * FEATURE: [vmalert enterprise](https://docs.victoriametrics.com/vmalert.html): add ability to read alerting and recording rules from S3, GCS or S3-compatible object storage. See [these docs](https://docs.victoriametrics.com/vmalert.html#reading-rules-from-object-storage).
 * FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add `mad_over_time(m[d])` function for calculating the [median absolute deviation](https://en.wikipedia.org/wiki/Median_absolute_deviation) over raw samples on the lookbehind window `d`. See [this feature request](https://github.com/prometheus/prometheus/issues/5514).
+* BUGFIX: prevent possible data ingestion and query performance slowdown during [background merges of big parts](https://docs.victoriametrics.com/#storage) on systems with a small number of CPU cores (1 or 2). The issue was introduced in [v1.85.0](https://docs.victoriametrics.com/CHANGELOG.html#v1850) when implementing [this feature](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3337). See also [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3790).
+
 
 ## [v1.87.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.1)
 
 Released at 2023-02-09
diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go
index ef403174e..9ca34650b 100644
--- a/lib/mergeset/table.go
+++ b/lib/mergeset/table.go
@@ -788,7 +788,18 @@ func (tb *Table) notifyBackgroundMergers() bool {
 	}
 }
 
-var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
+var flushConcurrencyLimit = func() int {
+	n := cgroup.AvailableCPUs()
+	if n < 2 {
+		// Allow at least 2 concurrent flushers on systems with a single CPU core
+		// in order to guarantee that in-memory data flushes and background merges can be continued
+		// when a single flusher is busy with a long merge.
+		n = 2
+	}
+	return n
+}()
+
+var flushConcurrencyCh = make(chan struct{}, flushConcurrencyLimit)
 
 func needAssistedMerge(pws []*partWrapper, maxParts int) bool {
 	if len(pws) < maxParts {
diff --git a/lib/storage/partition.go b/lib/storage/partition.go
index cbcaf295d..15b9e1a0e 100644
--- a/lib/storage/partition.go
+++ b/lib/storage/partition.go
@@ -615,7 +615,19 @@ func (pt *partition) notifyBackgroundMergers() bool {
 	}
 }
 
-var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
+var flushConcurrencyLimit = func() int {
+	n := cgroup.AvailableCPUs()
+	if n < 3 {
+		// Allow at least 3 concurrent flushers on systems with fewer than 3 CPU cores
+		// in order to guarantee that in-memory data flushes and background merges can be continued
+		// when a single flusher is busy with a long merge of big parts,
+		// while another flusher is busy with a long merge of small parts.
+		n = 3
+	}
+	return n
+}()
+
+var flushConcurrencyCh = make(chan struct{}, flushConcurrencyLimit)
 
 func needAssistedMerge(pws []*partWrapper, maxParts int) bool {
 	if len(pws) < maxParts {
@@ -1007,7 +1019,7 @@ func hasActiveMerges(pws []*partWrapper) bool {
 	return false
 }
 
-var mergeWorkersLimitCh = make(chan struct{}, getDefaultMergeConcurrency(16))
+var mergeWorkersLimitCh = make(chan struct{}, adjustMergeWorkersLimit(getDefaultMergeConcurrency(16)))
 
 var bigMergeWorkersLimitCh = make(chan struct{}, getDefaultMergeConcurrency(4))
 
@@ -1038,9 +1050,20 @@ func SetMergeWorkersCount(n int) {
 		// Do nothing
 		return
 	}
+	n = adjustMergeWorkersLimit(n)
 	mergeWorkersLimitCh = make(chan struct{}, n)
 }
 
+func adjustMergeWorkersLimit(n int) int {
+	if n < 2 {
+		// Allow at least 2 merge workers on systems with a single CPU core
+		// in order to guarantee that background merges can be continued
+		// when a single worker is busy with a long merge of big parts.
+		return 2
+	}
+	return n
+}
+
 func (pt *partition) startMergeWorkers() {
 	// Start a merge worker per available CPU core.
 	// The actual number of concurrent merges is limited inside mergeWorker() below.
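
For reviewers unfamiliar with the pattern the patch tunes: `flushConcurrencyCh` and `mergeWorkersLimitCh` are buffered channels used as counting semaphores, so their capacity is the maximum number of flushes or merges that may run at once. The sketch below is a minimal, self-contained illustration of that pattern and of why a floor on the capacity matters; the names `limitCh`, `flushConcurrencyLimit` and `flush`, and the use of `runtime.NumCPU()` instead of `cgroup.AvailableCPUs()`, are assumptions for illustration only, not the actual VictoriaMetrics code.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// limitCh acts as a counting semaphore: its capacity bounds how many
// flush/merge operations may run concurrently.
// NOTE: illustrative sketch only, not the VictoriaMetrics implementation.
var limitCh = make(chan struct{}, flushConcurrencyLimit())

// flushConcurrencyLimit mirrors the patch: fall back to at least 2 slots
// on a single-core machine so a long merge cannot starve short flushes.
func flushConcurrencyLimit() int {
	n := runtime.NumCPU() // the real code uses cgroup.AvailableCPUs()
	if n < 2 {
		n = 2
	}
	return n
}

// flush simulates a flush or background merge guarded by the semaphore.
func flush(name string, d time.Duration) {
	limitCh <- struct{}{}        // acquire a slot; blocks when the limit is reached
	defer func() { <-limitCh }() // release the slot
	fmt.Printf("%s started\n", name)
	time.Sleep(d)
	fmt.Printf("%s finished\n", name)
}

func main() {
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); flush("long merge of big parts", 2*time.Second) }()
	go func() { defer wg.Done(); flush("in-memory data flush", 10*time.Millisecond) }()
	wg.Wait()
}
```

With a capacity of at least 2, the short flush finishes while the long merge is still running; shrinking the buffer to 1 reproduces the pre-patch behavior on a 1-CPU system, where every flush queues behind the long merge, which is the ingestion and query slowdown described in the CHANGELOG entry above.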