app/vmstorage: add -finalMergeDelay command-line flag for configuring the delay before final merge for per-month partitions after no new data is ingested to it

This commit is contained in:
Aliaksandr Valialkin 2020-10-07 17:35:42 +03:00
parent 5d5076c4a2
commit b51fa16177
2 changed files with 18 additions and 1 deletions

View file

@ -30,6 +30,9 @@ var (
snapshotAuthKey = flag.String("snapshotAuthKey", "", "authKey, which must be passed in query string to /snapshot* pages") snapshotAuthKey = flag.String("snapshotAuthKey", "", "authKey, which must be passed in query string to /snapshot* pages")
forceMergeAuthKey = flag.String("forceMergeAuthKey", "", "authKey, which must be passed in query string to /internal/force_merge pages") forceMergeAuthKey = flag.String("forceMergeAuthKey", "", "authKey, which must be passed in query string to /internal/force_merge pages")
finalMergeDelay = flag.Duration("finalMergeDelay", 30*time.Second, "The delay before starting final merge for per-month partition after no new data is ingested into it. "+
"Query speed and disk space usage is usually reduced after the final merge is complete. Too low delay for final merge may result in increased "+
"disk IO usage and CPU usage")
bigMergeConcurrency = flag.Int("bigMergeConcurrency", 0, "The maximum number of CPU cores to use for big merges. Default value is used if set to 0") bigMergeConcurrency = flag.Int("bigMergeConcurrency", 0, "The maximum number of CPU cores to use for big merges. Default value is used if set to 0")
smallMergeConcurrency = flag.Int("smallMergeConcurrency", 0, "The maximum number of CPU cores to use for small merges. Default value is used if set to 0") smallMergeConcurrency = flag.Int("smallMergeConcurrency", 0, "The maximum number of CPU cores to use for small merges. Default value is used if set to 0")
minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "Remove superflouos samples from time series if they are located closer to each other than this duration. "+ minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "Remove superflouos samples from time series if they are located closer to each other than this duration. "+
@ -46,6 +49,7 @@ func main() {
cgroup.UpdateGOMAXPROCSToCPUQuota() cgroup.UpdateGOMAXPROCSToCPUQuota()
storage.SetMinScrapeIntervalForDeduplication(*minScrapeInterval) storage.SetMinScrapeIntervalForDeduplication(*minScrapeInterval)
storage.SetFinalMergeDelay(*finalMergeDelay)
storage.SetBigMergeWorkersCount(*bigMergeConcurrency) storage.SetBigMergeWorkersCount(*bigMergeConcurrency)
storage.SetSmallMergeWorkersCount(*smallMergeConcurrency) storage.SetSmallMergeWorkersCount(*smallMergeConcurrency)

View file

@ -958,7 +958,7 @@ func (pt *partition) partsMerger(mergerFunc func(isFinal bool) error) error {
if !errors.Is(err, errNothingToMerge) { if !errors.Is(err, errNothingToMerge) {
return err return err
} }
if fasttime.UnixTimestamp()-lastMergeTime > 30 { if fasttime.UnixTimestamp()-lastMergeTime > finalMergeDelaySeconds {
// We have free time for merging into bigger parts. // We have free time for merging into bigger parts.
// This should improve select performance. // This should improve select performance.
lastMergeTime = fasttime.UnixTimestamp() lastMergeTime = fasttime.UnixTimestamp()
@ -980,6 +980,19 @@ func (pt *partition) partsMerger(mergerFunc func(isFinal bool) error) error {
} }
} }
var finalMergeDelaySeconds = uint64(30)
// SetFinalMergeDelay sets the delay before doing final merge for partitions without newly ingested data.
//
// This function may be called only before Storage initialization.
func SetFinalMergeDelay(delay time.Duration) {
delaySeconds := int(delay.Seconds() + 0.5)
if delaySeconds <= 0 {
return
}
finalMergeDelaySeconds = uint64(delaySeconds)
}
func maxRowsByPath(path string) uint64 { func maxRowsByPath(path string) uint64 {
freeSpace := fs.MustGetFreeSpace(path) freeSpace := fs.MustGetFreeSpace(path)