From 2a190f6451ea7b2d1123e46dcd74ff6655360c25 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 12 Dec 2022 16:49:21 -0800 Subject: [PATCH] lib/{mergeset,storage}: do not block small merges by pending big merges - assist with small merges instead Blocked small merges may result into big number of small parts, which, in turn, may result in increased CPU and memory usage during queries, since queries need to inspect all the existing small parts. The issue has been introduced in 8189770c50165b62867327ad388f2c2ef237ab6f Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3337 --- app/vmstorage/main.go | 9 ++++++- docs/CHANGELOG.md | 3 ++- lib/mergeset/table.go | 54 ++++++++++++++++++++++++++++++++++------ lib/storage/partition.go | 52 ++++++++++++++++++++++++++++++++++---- 4 files changed, 104 insertions(+), 14 deletions(-) diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 22b9566e28..d96c90e761 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -467,8 +467,15 @@ func registerStorageMetrics(strg *storage.Storage) { metrics.NewGauge(`vm_assisted_merges_total{type="storage/inmemory"}`, func() float64 { return float64(tm().InmemoryAssistedMerges) }) + metrics.NewGauge(`vm_assisted_merges_total{type="storage/small"}`, func() float64 { + return float64(tm().SmallAssistedMerges) + }) + metrics.NewGauge(`vm_assisted_merges_total{type="indexdb/inmemory"}`, func() float64 { - return float64(idbm().AssistedInmemoryMerges) + return float64(idbm().InmemoryAssistedMerges) + }) + metrics.NewGauge(`vm_assisted_merges_total{type="indexdb/file"}`, func() float64 { + return float64(idbm().FileAssistedMerges) }) metrics.NewGauge(`vm_indexdb_items_added_total`, func() float64 { diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 60217e2380..786e527351 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -14,11 +14,12 @@ The following tip changes can be tested by building VictoriaMetrics components f * [How to build vmctl](https://docs.victoriametrics.com/vmctl.html#how-to-build) ## tip -* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): support `$for` or `.For` template variables in alert's annotations. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3246). +* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): support `$for` or `.For` template variables in alert's annotations. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3246). * BUGFIX: [DataDog protocol parser](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent): do not re-use `host` and `device` fields from the previously parsed messages if these fields are missing in the currently parsed message. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3432). * BUGFIX: reduce CPU usage when the regex-based relabeling rules are applied to more than 100K unique Graphite metrics. See [this issue](https://docs.victoriametrics.com/CHANGELOG.html#v1820). The issue was introduced in [v1.82.0](https://docs.victoriametrics.com/CHANGELOG.html#v1820). +* BUGFIX: do not block [merges](https://docs.victoriametrics.com/#storage) of small parts by merges of big parts on hosts with small number of CPU cores. This issue could result in the increasing number of `storage/small` parts while big merge is in progress. This, in turn, could result in increased CPU usage and memory usage during querying, since queries need to inspect bigger number of small parts. The issue has been introduced in [v1.85.0](https://docs.victoriametrics.com/CHANGELOG.html#v1850). ## [v1.85.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.85.0) diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 08b1e4e378..aa921b9e1f 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -25,8 +25,17 @@ import ( // maxInmemoryParts is the maximum number of inmemory parts in the table. // // This number may be reached when the insertion pace outreaches merger pace. +// If this number is reached, then assisted merges are performed +// during data ingestion. const maxInmemoryParts = 64 +// maxFileParts is the maximum number of file parts in the table. +// +// This number may be reached when the insertion pace outreaches merger pace. +// If this number is reached, then assisted merges are performed +// during data ingestion. +const maxFileParts = 256 + // Default number of parts to merge at once. // // This number has been obtained empirically - it gives the lowest possible overhead. @@ -98,7 +107,8 @@ type Table struct { inmemoryItemsMerged uint64 fileItemsMerged uint64 - assistedInmemoryMerges uint64 + inmemoryAssistedMerges uint64 + fileAssistedMerges uint64 itemsAdded uint64 itemsAddedSizeBytes uint64 @@ -419,7 +429,8 @@ type TableMetrics struct { InmemoryItemsMerged uint64 FileItemsMerged uint64 - AssistedInmemoryMerges uint64 + InmemoryAssistedMerges uint64 + FileAssistedMerges uint64 ItemsAdded uint64 ItemsAddedSizeBytes uint64 @@ -469,7 +480,8 @@ func (tb *Table) UpdateMetrics(m *TableMetrics) { m.InmemoryItemsMerged += atomic.LoadUint64(&tb.inmemoryItemsMerged) m.FileItemsMerged += atomic.LoadUint64(&tb.fileItemsMerged) - m.AssistedInmemoryMerges += atomic.LoadUint64(&tb.assistedInmemoryMerges) + m.InmemoryAssistedMerges += atomic.LoadUint64(&tb.inmemoryAssistedMerges) + m.FileAssistedMerges += atomic.LoadUint64(&tb.fileAssistedMerges) m.ItemsAdded += atomic.LoadUint64(&tb.itemsAdded) m.ItemsAddedSizeBytes += atomic.LoadUint64(&tb.itemsAddedSizeBytes) @@ -739,9 +751,8 @@ func (tb *Table) flushBlocksToParts(ibs []*inmemoryBlock, isFinal bool) { flushConcurrencyCh <- struct{}{} tb.assistedMergeForInmemoryParts() + tb.assistedMergeForFileParts() <-flushConcurrencyCh - // There is no need in assited merge for file parts, - // since the bottleneck is possible only at inmemory parts. if tb.flushCallback != nil { if isFinal { @@ -765,10 +776,10 @@ func (tb *Table) assistedMergeForInmemoryParts() { // Prioritize assisted merges over searches. storagepacelimiter.Search.Inc() + atomic.AddUint64(&tb.inmemoryAssistedMerges, 1) err := tb.mergeInmemoryParts() storagepacelimiter.Search.Dec() if err == nil { - atomic.AddUint64(&tb.assistedInmemoryMerges, 1) continue } if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) { @@ -778,6 +789,30 @@ func (tb *Table) assistedMergeForInmemoryParts() { } } +func (tb *Table) assistedMergeForFileParts() { + for { + tb.partsLock.Lock() + ok := getNotInMergePartsCount(tb.fileParts) < maxFileParts + tb.partsLock.Unlock() + if ok { + return + } + + // Prioritize assisted merges over searches. + storagepacelimiter.Search.Inc() + atomic.AddUint64(&tb.fileAssistedMerges, 1) + err := tb.mergeExistingParts(false) + storagepacelimiter.Search.Dec() + if err == nil { + continue + } + if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) || errors.Is(err, errReadOnlyMode) { + return + } + logger.Panicf("FATAL: cannot assist with merging file parts: %s", err) + } +} + func getNotInMergePartsCount(pws []*partWrapper) int { n := 0 for _, pw := range pws { @@ -866,7 +901,10 @@ func newPartWrapperFromInmemoryPart(mp *inmemoryPart, flushToDiskDeadline time.T } func (tb *Table) startMergeWorkers() { - for i := 0; i < cap(mergeWorkersLimitCh); i++ { + // Start a merge worker per available CPU core. + // The actual number of concurrent merges is limited inside mergeWorker() below. + workersCount := cgroup.AvailableCPUs() + for i := 0; i < workersCount; i++ { tb.wg.Add(1) go func() { tb.mergeWorker() @@ -940,6 +978,8 @@ func (tb *Table) mergeWorker() { isFinal := false t := time.NewTimer(sleepTime) for { + // Limit the number of concurrent calls to mergeExistingParts, since the total number of merge workers + // across tables may exceed the the cap(mergeWorkersLimitCh). mergeWorkersLimitCh <- struct{}{} err := tb.mergeExistingParts(isFinal) <-mergeWorkersLimitCh diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 020e6a3201..77f07a2443 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -31,8 +31,15 @@ import ( const maxBigPartSize = 1e12 // The maximum number of inmemory parts in the partition. +// +// If the number of inmemory parts reaches this value, then assisted merge runs during data ingestion. const maxInmemoryPartsPerPartition = 32 +// The maximum number of small parts in the partition. +// +// If the number of small parts reaches this value, then assisted merge runs during data ingestion. +const maxSmallPartsPerPartition = 128 + // Default number of parts to merge at once. // // This number has been obtained empirically - it gives the lowest possible overhead. @@ -112,6 +119,7 @@ type partition struct { bigRowsDeleted uint64 inmemoryAssistedMerges uint64 + smallAssistedMerges uint64 mergeNeedFreeDiskSpace uint64 @@ -338,6 +346,7 @@ type partitionMetrics struct { BigPartsRefCount uint64 InmemoryAssistedMerges uint64 + SmallAssistedMerges uint64 MergeNeedFreeDiskSpace uint64 } @@ -404,6 +413,7 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) { m.BigRowsDeleted += atomic.LoadUint64(&pt.bigRowsDeleted) m.InmemoryAssistedMerges += atomic.LoadUint64(&pt.inmemoryAssistedMerges) + m.SmallAssistedMerges += atomic.LoadUint64(&pt.smallAssistedMerges) m.MergeNeedFreeDiskSpace += atomic.LoadUint64(&pt.mergeNeedFreeDiskSpace) } @@ -576,6 +586,7 @@ func (pt *partition) flushRowsToParts(rows []rawRow) { flushConcurrencyCh <- struct{}{} pt.assistedMergeForInmemoryParts() + pt.assistedMergeForSmallParts() <-flushConcurrencyCh // There is no need in assisted merges for small and big parts, // since the bottleneck is possible only at inmemory parts. @@ -597,10 +608,10 @@ func (pt *partition) assistedMergeForInmemoryParts() { // Assist with mering inmemory parts. // Prioritize assisted merges over searches. storagepacelimiter.Search.Inc() + atomic.AddUint64(&pt.inmemoryAssistedMerges, 1) err := pt.mergeInmemoryParts() storagepacelimiter.Search.Dec() if err == nil { - atomic.AddUint64(&pt.inmemoryAssistedMerges, 1) continue } if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) { @@ -610,6 +621,33 @@ func (pt *partition) assistedMergeForInmemoryParts() { } } +func (pt *partition) assistedMergeForSmallParts() { + for { + pt.partsLock.Lock() + ok := getNotInMergePartsCount(pt.smallParts) < maxSmallPartsPerPartition + pt.partsLock.Unlock() + if ok { + return + } + + // There are too many unmerged small parts. + // This usually means that the app cannot keep up with the data ingestion rate. + // Assist with mering small parts. + // Prioritize assisted merges over searches. + storagepacelimiter.Search.Inc() + atomic.AddUint64(&pt.smallAssistedMerges, 1) + err := pt.mergeExistingParts(false) + storagepacelimiter.Search.Dec() + if err == nil { + continue + } + if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) || errors.Is(err, errReadOnlyMode) { + return + } + logger.Panicf("FATAL: cannot merge small parts: %s", err) + } +} + func getNotInMergePartsCount(pws []*partWrapper) int { n := 0 for _, pw := range pws { @@ -981,7 +1019,10 @@ func SetMergeWorkersCount(n int) { } func (pt *partition) startMergeWorkers() { - for i := 0; i < cap(mergeWorkersLimitCh); i++ { + // Start a merge worker per available CPU core. + // The actual number of concurrent merges is limited inside mergeWorker() below. + workersCount := cgroup.AvailableCPUs() + for i := 0; i < workersCount; i++ { pt.wg.Add(1) go func() { pt.mergeWorker() @@ -1001,7 +1042,8 @@ func (pt *partition) mergeWorker() { isFinal := false t := time.NewTimer(sleepTime) for { - // Limit the number of concurrent calls to mergeExistingParts, cine the number of merge + // Limit the number of concurrent calls to mergeExistingParts, since the total number of merge workers + // across partitions may exceed the the cap(mergeWorkersLimitCh). mergeWorkersLimitCh <- struct{}{} err := pt.mergeExistingParts(isFinal) <-mergeWorkersLimitCh @@ -1092,10 +1134,10 @@ func (pt *partition) getMaxBigPartSize() uint64 { func getMaxOutBytes(path string, workersCount int) uint64 { n := fs.MustGetFreeSpace(path) - // Do not substract freeDiskSpaceLimitBytes from n before calculating the maxOutBytes, + // Do not subtract freeDiskSpaceLimitBytes from n before calculating the maxOutBytes, // since this will result in sub-optimal merges - e.g. many small parts will be left unmerged. - // Divide free space by the max number concurrent merges. + // Divide free space by the max number of concurrent merges. maxOutBytes := n / uint64(workersCount) if maxOutBytes > maxBigPartSize { maxOutBytes = maxBigPartSize