docs/CHANGELOG.md: run at least 4 background mergers on systems with less than 4 CPU cores
This reduces the probability of a sudden spike in the number of small parts when all the background mergers are busy with big merges.
parent 550d5c7ea4
commit 9f8209d593
3 changed files with 22 additions and 13 deletions
docs/CHANGELOG.md
@@ -21,6 +21,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
 * FEATURE: [vmbackupmanager](https://docs.victoriametrics.com/vmbackupmanager.html): add `created_at` field to the output of `/api/v1/backups` API and `vmbackupmanager backup list` command. See this [doc](https://docs.victoriametrics.com/vmbackupmanager.html#api-methods) for data format details.
 * FEATURE: deprecate `-bigMergeConcurrency` command-line flag, since improper configuration for this flag frequently led to uncontrolled growth of unmerged parts, which, in turn, could lead to queries slowdown and increased CPU usage. The concurrency for [background merges](https://docs.victoriametrics.com/#storage) can be controlled via `-smallMergeConcurrency` command-line flag, though it isn't recommended to do in general case.
+* BUGFIX: reduce the probability of sudden increase in the number of small parts on systems with small number of CPU cores.
 
 ## [v1.90.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.90.0)
 
 Released at 2023-04-06
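
The change itself is small: in both Go files below, the number of background merge workers is clamped so that it never drops below 4, even on hosts with 1-3 CPU cores. Here is a minimal standalone sketch of that clamp (not the actual VictoriaMetrics code; it uses runtime.NumCPU() as a stand-in for the cgroup-aware cgroup.AvailableCPUs() helper seen in the diff):

```go
package main

import (
	"fmt"
	"runtime"
)

// mergeWorkersCount returns the number of background merge workers to start.
// It mirrors the clamp introduced by this commit: at least 4 workers are
// allowed even on hosts with fewer than 4 CPU cores, so that small-part
// merges can proceed while other workers are busy with big merges.
// Note: the real code derives the CPU count from cgroup limits
// (cgroup.AvailableCPUs()), not from runtime.NumCPU().
func mergeWorkersCount() int {
	n := runtime.NumCPU()
	if n < 4 {
		n = 4
	}
	return n
}

func main() {
	fmt.Printf("starting %d merge workers\n", mergeWorkersCount())
}
```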

lib/mergeset/table.go
@@ -929,10 +929,8 @@ func newPartWrapperFromInmemoryPart(mp *inmemoryPart, flushToDiskDeadline time.T
 }
 
 func (tb *Table) startMergeWorkers() {
-	// Start a merge worker per available CPU core.
 	// The actual number of concurrent merges is limited inside mergeWorker() below.
-	workersCount := cgroup.AvailableCPUs()
-	for i := 0; i < workersCount; i++ {
+	for i := 0; i < cap(mergeWorkersLimitCh); i++ {
 		tb.wg.Add(1)
 		go func() {
 			tb.mergeWorker()
@@ -1365,7 +1363,18 @@ func (tb *Table) nextMergeIdx() uint64 {
 	return atomic.AddUint64(&tb.mergeIdx, 1)
 }
 
-var mergeWorkersLimitCh = make(chan struct{}, cgroup.AvailableCPUs())
+var mergeWorkersLimitCh = make(chan struct{}, getWorkersCount())
+
+func getWorkersCount() int {
+	n := cgroup.AvailableCPUs()
+	if n < 4 {
+		// Allow at least 4 merge workers on systems with small CPUs count
+		// in order to guarantee that background merges can be continued
+		// when multiple workers are busy with big merges.
+		n = 4
+	}
+	return n
+}
 
 func openParts(path string) ([]*partWrapper, error) {
 	// The path can be missing after restoring from backup, so create it if needed.
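
Note that the worker loops now iterate up to cap(mergeWorkersLimitCh) instead of the CPU count, so the size of the worker pool always matches the capacity of the limiter channel. The sketch below illustrates this buffered-channel-as-semaphore pattern in isolation; limitCh, worker and doMerge are illustrative names, not part of the VictoriaMetrics codebase:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// limitCh is a buffered channel used as a counting semaphore: its capacity
// is both the maximum number of concurrent merges and the number of workers.
var limitCh = make(chan struct{}, 4)

func worker(id int, jobs <-chan string) {
	for job := range jobs {
		limitCh <- struct{}{} // acquire a merge slot
		doMerge(id, job)
		<-limitCh // release the slot
	}
}

func doMerge(id int, job string) {
	time.Sleep(10 * time.Millisecond) // stand-in for real merge work
	fmt.Printf("worker %d merged %s\n", id, job)
}

func main() {
	jobs := make(chan string)
	var wg sync.WaitGroup
	// Start exactly cap(limitCh) workers, mirroring
	// `for i := 0; i < cap(mergeWorkersLimitCh); i++` in the diff.
	for i := 0; i < cap(limitCh); i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			worker(id, jobs)
		}(i)
	}
	for _, p := range []string{"part-1", "part-2", "part-3", "part-4", "part-5"} {
		jobs <- p
	}
	close(jobs)
	wg.Wait()
}
```

Because the pool is sized with cap(), any later change to the channel capacity (such as the getWorkersCount() floor introduced above) automatically carries over to the number of goroutines that get started.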

lib/storage/partition.go
@@ -996,14 +996,14 @@ func hasActiveMerges(pws []*partWrapper) bool {
 	return false
 }
 
-var mergeWorkersLimitCh = make(chan struct{}, adjustMergeWorkersLimit(getDefaultMergeConcurrency(16)))
+var mergeWorkersLimitCh = make(chan struct{}, getDefaultMergeConcurrency(16))
 
 func getDefaultMergeConcurrency(max int) int {
 	v := (cgroup.AvailableCPUs() + 1) / 2
 	if v > max {
 		v = max
 	}
-	return v
+	return adjustMergeWorkersLimit(v)
 }
 
 // SetMergeWorkersCount sets the maximum number of concurrent mergers for parts.
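
With this hunk, the default merge concurrency in the storage package becomes (AvailableCPUs + 1) / 2, capped at 16 and then passed through adjustMergeWorkersLimit (shown in the next hunk), which raises it to at least 4. A short sketch of that arithmetic for a few CPU counts (illustrative only; the real code reads the CPU count from cgroup limits via cgroup.AvailableCPUs()):

```go
package main

import "fmt"

// defaultMergeConcurrency reproduces the arithmetic from the diff:
// half the CPUs (rounded up), capped at max, then raised to at least 4.
func defaultMergeConcurrency(cpus, max int) int {
	v := (cpus + 1) / 2
	if v > max {
		v = max
	}
	if v < 4 {
		v = 4
	}
	return v
}

func main() {
	for _, cpus := range []int{1, 2, 4, 8, 16, 64} {
		// 1 CPU -> 4, 2 -> 4, 4 -> 4, 8 -> 4, 16 -> 8, 64 -> 16
		fmt.Printf("%2d CPUs -> %d merge workers\n", cpus, defaultMergeConcurrency(cpus, 16))
	}
}
```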

@@ -1019,20 +1019,18 @@ func SetMergeWorkersCount(n int) {
 }
 
 func adjustMergeWorkersLimit(n int) int {
-	if n < 2 {
-		// Allow at least 2 merge workers on systems with a single CPU core
+	if n < 4 {
+		// Allow at least 4 merge workers on systems with small CPUs count
 		// in order to guarantee that background merges can be continued
-		// when a single worker is busy with the long merge of big parts.
-		return 2
+		// when multiple workers are busy with big merges.
+		n = 4
 	}
 	return n
 }
 
 func (pt *partition) startMergeWorkers() {
-	// Start a merge worker per available CPU core.
 	// The actual number of concurrent merges is limited inside mergeWorker() below.
-	workersCount := cgroup.AvailableCPUs()
-	for i := 0; i < workersCount; i++ {
+	for i := 0; i < cap(mergeWorkersLimitCh); i++ {
 		pt.wg.Add(1)
 		go func() {
 			pt.mergeWorker()