docs/CHANGELOG.md: run at least 4 background mergers on systems with less than 4 CPU cores

This reduces the probability of a sudden spike in the number of small parts when all the background mergers
are busy with big merges.
Aliaksandr Valialkin 2023-04-13 23:36:06 -07:00
parent a1c4d00750
commit 0ec44497ea
3 changed files with 21 additions and 13 deletions
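
For context, the sizing rule introduced by this commit boils down to "one merge worker per available CPU core, but never fewer than 4". Below is a minimal standalone sketch of that rule; `availableCPUs` is a stand-in for VictoriaMetrics' `cgroup.AvailableCPUs()`, which additionally honors cgroup CPU quotas:

```go
package main

import (
	"fmt"
	"runtime"
)

// availableCPUs is a stand-in for cgroup.AvailableCPUs(); the real helper
// also takes cgroup CPU limits into account, runtime.NumCPU() does not.
func availableCPUs() int {
	return runtime.NumCPU()
}

// mergeWorkersCount mirrors the rule added by this commit:
// one merge worker per CPU core, but never fewer than 4.
func mergeWorkersCount() int {
	n := availableCPUs()
	if n < 4 {
		n = 4
	}
	return n
}

func main() {
	// Prints 4 on 1- and 2-core hosts, 16 on a 16-core host, etc.
	fmt.Println(mergeWorkersCount())
}
```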

docs/CHANGELOG.md

```diff
@@ -15,6 +15,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
 
 ## v1.87.x long-time support release (LTS)
 
+* BUGFIX: reduce the probability of sudden increase in the number of small parts on systems with small number of CPU cores.
 * BUGFIX: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html): suppress `series after dedup` error message in logs when `-remoteWrite.streamAggr.dedupInterval` command-line flag is set at [vmagent](https://docs.victoriametrics.com/vmgent.html) or when `-streamAggr.dedupInterval` command-line flag is set at [single-node VictoriaMetrics](https://docs.victoriametrics.com/).
 
 ## [v1.87.5](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.5)
```

lib/mergeset/table.go

```diff
@@ -923,10 +923,8 @@ func newPartWrapperFromInmemoryPart(mp *inmemoryPart, flushToDiskDeadline time.T
 }
 
 func (tb *Table) startMergeWorkers() {
-	// Start a merge worker per available CPU core.
 	// The actual number of concurrent merges is limited inside mergeWorker() below.
-	workersCount := cgroup.AvailableCPUs()
-	for i := 0; i < workersCount; i++ {
+	for i := 0; i < cap(mergeWorkersLimitCh); i++ {
 		tb.wg.Add(1)
 		go func() {
 			tb.mergeWorker()
```
```diff
@@ -1403,7 +1401,18 @@ func (tb *Table) nextMergeIdx() uint64 {
 	return atomic.AddUint64(&tb.mergeIdx, 1)
 }
 
-var mergeWorkersLimitCh = make(chan struct{}, cgroup.AvailableCPUs())
+var mergeWorkersLimitCh = make(chan struct{}, getWorkersCount())
+
+func getWorkersCount() int {
+	n := cgroup.AvailableCPUs()
+	if n < 4 {
+		// Allow at least 4 merge workers on systems with small CPUs count
+		// in order to guarantee that background merges can be continued
+		// when multiple workers are busy with big merges.
+		n = 4
+	}
+	return n
+}
 
 func openParts(path string) ([]*partWrapper, error) {
 	// The path can be missing after restoring from backup, so create it if needed.
```
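
The comment "the actual number of concurrent merges is limited inside mergeWorker() below" refers to the buffered channel acting as a counting semaphore: the table starts one goroutine per channel slot, and each merge has to acquire a slot before running. The sketch below illustrates that general pattern only; it is not the actual mergeWorker() code, and `runMergeWork` is a made-up placeholder:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// mergeWorkersLimitCh acts as a counting semaphore: its capacity is the
// maximum number of merges that may run at the same time.
var mergeWorkersLimitCh = make(chan struct{}, 4)

// runMergeWork is a placeholder for the real merge logic.
func runMergeWork(id int) {
	fmt.Printf("worker %d: merging\n", id)
	time.Sleep(10 * time.Millisecond)
}

func main() {
	var wg sync.WaitGroup
	// Start exactly cap(mergeWorkersLimitCh) workers, so the pool size
	// always matches the concurrency limit (this is what the new
	// `for i := 0; i < cap(mergeWorkersLimitCh); i++` loop guarantees).
	for i := 0; i < cap(mergeWorkersLimitCh); i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// Acquire a slot before merging and release it afterwards;
			// this is the "limited inside mergeWorker()" part.
			mergeWorkersLimitCh <- struct{}{}
			runMergeWork(id)
			<-mergeWorkersLimitCh
		}(i)
	}
	wg.Wait()
}
```

Looping up to cap(mergeWorkersLimitCh) instead of cgroup.AvailableCPUs() is what lets the raised limit take effect: the worker pool is always exactly as large as the number of merge slots, so the 4-worker floor is honored even on 1- and 2-core hosts.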

lib/storage/partition.go

```diff
@@ -1004,7 +1004,7 @@ func hasActiveMerges(pws []*partWrapper) bool {
 	return false
 }
 
-var mergeWorkersLimitCh = make(chan struct{}, adjustMergeWorkersLimit(getDefaultMergeConcurrency(16)))
+var mergeWorkersLimitCh = make(chan struct{}, getDefaultMergeConcurrency(16))
 
 var bigMergeWorkersLimitCh = make(chan struct{}, getDefaultMergeConcurrency(4))
 
```
```diff
@@ -1013,7 +1013,7 @@ func getDefaultMergeConcurrency(max int) int {
 	if v > max {
 		v = max
 	}
-	return v
+	return adjustMergeWorkersLimit(v)
 }
 
 // SetBigMergeWorkersCount sets the maximum number of concurrent mergers for big blocks.
```
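
Moving the adjustMergeWorkersLimit() call from the mergeWorkersLimitCh initializer into getDefaultMergeConcurrency() means the floor now applies to both limiters: the small-merge one (capped at 16) and the big-merge one (capped at 4), which previously got no floor at all. Below is a rough sketch of the resulting arithmetic; the `(cpus + 1) / 2` derivation is an assumption about the part of getDefaultMergeConcurrency() that is outside this hunk, and the CPU count is passed in explicitly instead of calling cgroup.AvailableCPUs():

```go
package main

import "fmt"

// adjustMergeWorkersLimit reproduces the floor shown in the next hunk.
func adjustMergeWorkersLimit(n int) int {
	if n < 4 {
		n = 4
	}
	return n
}

// getDefaultMergeConcurrency takes the CPU count as a parameter here;
// the halving step is assumed rather than visible in this diff.
func getDefaultMergeConcurrency(cpus, max int) int {
	v := (cpus + 1) / 2
	if v > max {
		v = max
	}
	return adjustMergeWorkersLimit(v)
}

func main() {
	// On a 2-core host both limits become 4 (under the old code they would
	// have been 2 for small merges and 1 for big merges).
	fmt.Println(getDefaultMergeConcurrency(2, 16), getDefaultMergeConcurrency(2, 4))
	// The upper caps still apply on large hosts: 16 and 4 on 32 cores.
	fmt.Println(getDefaultMergeConcurrency(32, 16), getDefaultMergeConcurrency(32, 4))
}
```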
```diff
@@ -1040,20 +1040,18 @@ func SetMergeWorkersCount(n int) {
 }
 
 func adjustMergeWorkersLimit(n int) int {
-	if n < 2 {
-		// Allow at least 2 merge workers on systems with a single CPU core
+	if n < 4 {
+		// Allow at least 4 merge workers on systems with small CPUs count
 		// in order to guarantee that background merges can be continued
-		// when a single worker is busy with the long merge of big parts.
-		return 2
+		// when multiple workers are busy with big merges.
+		n = 4
 	}
 	return n
 }
 
 func (pt *partition) startMergeWorkers() {
-	// Start a merge worker per available CPU core.
 	// The actual number of concurrent merges is limited inside mergeWorker() below.
-	workersCount := cgroup.AvailableCPUs()
-	for i := 0; i < workersCount; i++ {
+	for i := 0; i < cap(mergeWorkersLimitCh); i++ {
 		pt.wg.Add(1)
 		go func() {
 			pt.mergeWorker()
```