app/vmstorage: add -bigMergeConcurrency and -smallMergeConcurrency flags for tuning the maximum number of CPU cores used during merges

This commit is contained in:
Aliaksandr Valialkin 2019-10-31 16:16:53 +02:00
parent 6a22727676
commit 6ab9c98a1e
2 changed files with 37 additions and 2 deletions

View file

@ -25,6 +25,9 @@ var (
vminsertAddr = flag.String("vminsertAddr", ":8400", "TCP address to accept connections from vminsert services")
vmselectAddr = flag.String("vmselectAddr", ":8401", "TCP address to accept connections from vmselect services")
snapshotAuthKey = flag.String("snapshotAuthKey", "", "authKey, which must be passed in query string to /snapshot* pages")
bigMergeConcurrency = flag.Int("bigMergeConcurrency", 0, "The maximum number of CPU cores to use for big merges. Default value is used if set to 0")
smallMergeConcurrency = flag.Int("smallMergeConcurrency", 0, "The maximum number of CPU cores to use for small merges. Default value is used if set to 0")
)
func main() {
@ -32,6 +35,9 @@ func main() {
buildinfo.Init()
logger.Init()
storage.SetBigMergeWorkersCount(*bigMergeConcurrency)
storage.SetSmallMergeWorkersCount(*smallMergeConcurrency)
logger.Infof("opening storage at %q with retention period %d months", *storageDataPath, *retentionPeriod)
startTime := time.Now()
strg, err := storage.OpenStorage(*storageDataPath, *retentionPeriod)

View file

@ -742,8 +742,37 @@ var mergeWorkersCount = func() int {
return n
}()
var bigMergeConcurrencyLimitCh = make(chan struct{}, mergeWorkersCount)
var smallMergeConcurrencyLimitCh = make(chan struct{}, mergeWorkersCount)
var (
bigMergeWorkersCount = uint64(mergeWorkersCount)
smallMergeWorkersCount = uint64(mergeWorkersCount)
)
var (
bigMergeConcurrencyLimitCh = make(chan struct{}, bigMergeWorkersCount)
smallMergeConcurrencyLimitCh = make(chan struct{}, smallMergeWorkersCount)
)
// SetBigMergeWorkersCount sets the maximum number of concurrent mergers for big blocks.
//
// The function must be called before opening or creating any storage.
func SetBigMergeWorkersCount(n int) {
if n <= 0 {
// Do nothing
return
}
atomic.StoreUint64(&bigMergeWorkersCount, uint64(n))
}
// SetSmallMergeWorkersCount sets the maximum number of concurrent mergers for small blocks.
//
// The function must be called before opening or creating any storage.
func SetSmallMergeWorkersCount(n int) {
if n <= 0 {
// Do nothing
return
}
atomic.StoreUint64(&smallMergeWorkersCount, uint64(n))
}
func (pt *partition) startMergeWorkers() {
for i := 0; i < mergeWorkersCount; i++ {