From b51fa16177df12402b88e23d98f5e2f7154105e6 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 7 Oct 2020 17:35:42 +0300 Subject: [PATCH] app/vmstorage: add `-finalMergeDelay` command-line flag for configuring the delay before the final merge of a per-month partition after no new data is ingested into it --- app/vmstorage/main.go | 4 ++++ lib/storage/partition.go | 15 ++++++++++++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 17ccf7fc8..6b2d7f340 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -30,6 +30,9 @@ var ( snapshotAuthKey = flag.String("snapshotAuthKey", "", "authKey, which must be passed in query string to /snapshot* pages") forceMergeAuthKey = flag.String("forceMergeAuthKey", "", "authKey, which must be passed in query string to /internal/force_merge pages") + finalMergeDelay = flag.Duration("finalMergeDelay", 30*time.Second, "The delay before starting final merge for per-month partition after no new data is ingested into it. "+ + "Query speed and disk space usage is usually reduced after the final merge is complete. Too low delay for final merge may result in increased "+ + "disk IO usage and CPU usage") bigMergeConcurrency = flag.Int("bigMergeConcurrency", 0, "The maximum number of CPU cores to use for big merges. Default value is used if set to 0") smallMergeConcurrency = flag.Int("smallMergeConcurrency", 0, "The maximum number of CPU cores to use for small merges. Default value is used if set to 0") minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "Remove superflouos samples from time series if they are located closer to each other than this duration. 
"+ @@ -46,6 +49,7 @@ func main() { cgroup.UpdateGOMAXPROCSToCPUQuota() storage.SetMinScrapeIntervalForDeduplication(*minScrapeInterval) + storage.SetFinalMergeDelay(*finalMergeDelay) storage.SetBigMergeWorkersCount(*bigMergeConcurrency) storage.SetSmallMergeWorkersCount(*smallMergeConcurrency) diff --git a/lib/storage/partition.go b/lib/storage/partition.go index cefa70c70..f159c0b98 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -958,7 +958,7 @@ func (pt *partition) partsMerger(mergerFunc func(isFinal bool) error) error { if !errors.Is(err, errNothingToMerge) { return err } - if fasttime.UnixTimestamp()-lastMergeTime > 30 { + if fasttime.UnixTimestamp()-lastMergeTime > finalMergeDelaySeconds { // We have free time for merging into bigger parts. // This should improve select performance. lastMergeTime = fasttime.UnixTimestamp() @@ -980,6 +980,19 @@ func (pt *partition) partsMerger(mergerFunc func(isFinal bool) error) error { } } +var finalMergeDelaySeconds = uint64(30) + +// SetFinalMergeDelay sets the delay before doing final merge for partitions without newly ingested data. The delay is rounded to the nearest whole second; a delay that rounds to zero seconds or less (including any negative delay) is ignored and the current value is left unchanged. +// +// This function may be called only before Storage initialization. +func SetFinalMergeDelay(delay time.Duration) { + delaySeconds := int(delay.Seconds() + 0.5) + if delaySeconds <= 0 { + return + } + finalMergeDelaySeconds = uint64(delaySeconds) +} + func maxRowsByPath(path string) uint64 { freeSpace := fs.MustGetFreeSpace(path)