From 6ab9c98a1ecdeddd136cd95dda32d3c5698ecbf0 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 31 Oct 2019 16:16:53 +0200 Subject: [PATCH] app/vmstorage: add `-bigMergeConcurrency` and `-smallMergeConcurrency` flags for tuning the maximum number of CPU cores used during merges --- app/vmstorage/main.go | 6 ++++++ lib/storage/partition.go | 33 +++++++++++++++++++++++++++++++-- 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 2408266f5d..53af4c0e99 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -25,6 +25,9 @@ var ( vminsertAddr = flag.String("vminsertAddr", ":8400", "TCP address to accept connections from vminsert services") vmselectAddr = flag.String("vmselectAddr", ":8401", "TCP address to accept connections from vmselect services") snapshotAuthKey = flag.String("snapshotAuthKey", "", "authKey, which must be passed in query string to /snapshot* pages") + + bigMergeConcurrency = flag.Int("bigMergeConcurrency", 0, "The maximum number of CPU cores to use for big merges. Default value is used if set to 0") + smallMergeConcurrency = flag.Int("smallMergeConcurrency", 0, "The maximum number of CPU cores to use for small merges. Default value is used if set to 0") ) func main() { @@ -32,6 +35,9 @@ func main() { buildinfo.Init() logger.Init() + storage.SetBigMergeWorkersCount(*bigMergeConcurrency) + storage.SetSmallMergeWorkersCount(*smallMergeConcurrency) + logger.Infof("opening storage at %q with retention period %d months", *storageDataPath, *retentionPeriod) startTime := time.Now() strg, err := storage.OpenStorage(*storageDataPath, *retentionPeriod) diff --git a/lib/storage/partition.go b/lib/storage/partition.go index f2984830bb..50cae5d643 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -742,8 +742,37 @@ var mergeWorkersCount = func() int { return n }() -var bigMergeConcurrencyLimitCh = make(chan struct{}, mergeWorkersCount) -var smallMergeConcurrencyLimitCh = make(chan struct{}, mergeWorkersCount) +var ( + bigMergeWorkersCount = uint64(mergeWorkersCount) + smallMergeWorkersCount = uint64(mergeWorkersCount) +) + +var ( + bigMergeConcurrencyLimitCh = make(chan struct{}, bigMergeWorkersCount) + smallMergeConcurrencyLimitCh = make(chan struct{}, smallMergeWorkersCount) +) + +// SetBigMergeWorkersCount sets the maximum number of concurrent mergers for big blocks. +// +// The function must be called before opening or creating any storage. +func SetBigMergeWorkersCount(n int) { + if n <= 0 { + // Do nothing + return + } + atomic.StoreUint64(&bigMergeWorkersCount, uint64(n)) +} + +// SetSmallMergeWorkersCount sets the maximum number of concurrent mergers for small blocks. +// +// The function must be called before opening or creating any storage. +func SetSmallMergeWorkersCount(n int) { + if n <= 0 { + // Do nothing + return + } + atomic.StoreUint64(&smallMergeWorkersCount, uint64(n)) +} func (pt *partition) startMergeWorkers() { for i := 0; i < mergeWorkersCount; i++ {