lib/storage: reduce the maximum number of concurrent merge workers to GOMAXPROCS/2

Previously the limit was raised to GOMAXPROCS, but it turned out that this
increases query latencies, since more CPUs are busy with merges.

While at it, replace the `*MergeConcurrencyLimitCh` channels with simple integer limits.
This commit is contained in:
Aliaksandr Valialkin 2020-07-31 13:48:35 +03:00
parent d01f3c1943
commit 5e71fab8a6

View file

@ -814,11 +814,9 @@ func (pt *partition) mergePartsOptimal(pws []*partWrapper) error {
return nil
}
var mergeWorkersCount = runtime.GOMAXPROCS(-1)
var (
bigMergeConcurrencyLimitCh = make(chan struct{}, mergeWorkersCount)
smallMergeConcurrencyLimitCh = make(chan struct{}, mergeWorkersCount)
bigMergeWorkersCount = (runtime.GOMAXPROCS(-1) + 1) / 2
smallMergeWorkersCount = (runtime.GOMAXPROCS(-1) + 1) / 2
)
// SetBigMergeWorkersCount sets the maximum number of concurrent mergers for big blocks.
@ -829,7 +827,7 @@ func SetBigMergeWorkersCount(n int) {
// Do nothing
return
}
bigMergeConcurrencyLimitCh = make(chan struct{}, n)
bigMergeWorkersCount = n
}
// SetSmallMergeWorkersCount sets the maximum number of concurrent mergers for small blocks.
@ -840,18 +838,18 @@ func SetSmallMergeWorkersCount(n int) {
// Do nothing
return
}
smallMergeConcurrencyLimitCh = make(chan struct{}, n)
smallMergeWorkersCount = n
}
func (pt *partition) startMergeWorkers() {
for i := 0; i < mergeWorkersCount; i++ {
for i := 0; i < smallMergeWorkersCount; i++ {
pt.smallPartsMergerWG.Add(1)
go func() {
pt.smallPartsMerger()
pt.smallPartsMergerWG.Done()
}()
}
for i := 0; i < mergeWorkersCount; i++ {
for i := 0; i < bigMergeWorkersCount; i++ {
pt.bigPartsMergerWG.Add(1)
go func() {
pt.bigPartsMerger()
@ -924,11 +922,11 @@ func maxRowsByPath(path string) uint64 {
freeSpace := fs.MustGetFreeSpace(path)
// Calculate the maximum number of rows in the output merge part
// by dividing the freeSpace by the number of concurrent
// mergeWorkersCount for big parts.
// This assumes each row is compressed into 0.5 bytes
// by dividing the freeSpace by the maximum number of concurrent
// workers for big parts.
// This assumes each row is compressed into 1 byte
// according to production data.
maxRows := 2 * (freeSpace / uint64(mergeWorkersCount))
maxRows := freeSpace / uint64(bigMergeWorkersCount)
if maxRows > maxRowsPerBigPart {
maxRows = maxRowsPerBigPart
}
@ -936,11 +934,6 @@ func maxRowsByPath(path string) uint64 {
}
func (pt *partition) mergeBigParts(isFinal bool) error {
bigMergeConcurrencyLimitCh <- struct{}{}
defer func() {
<-bigMergeConcurrencyLimitCh
}()
maxRows := maxRowsByPath(pt.bigPartsPath)
pt.partsLock.Lock()
@ -954,11 +947,6 @@ func (pt *partition) mergeBigParts(isFinal bool) error {
}
func (pt *partition) mergeSmallParts(isFinal bool) error {
smallMergeConcurrencyLimitCh <- struct{}{}
defer func() {
<-smallMergeConcurrencyLimitCh
}()
maxRows := maxRowsByPath(pt.smallPartsPath)
if maxRows > maxRowsPerSmallPart() {
// The output part may go to big part,
@ -1244,7 +1232,7 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows ui
}
// Filter out too big parts.
// This should reduce N for O(n^2) algorithm below.
// This should reduce N for O(N^2) algorithm below.
maxInPartRows := maxRows / 2
tmp := make([]*partWrapper, 0, len(src))
for _, pw := range src {