lib/{mergeset,storage}: do not block small merges by pending big merges - assist with small merges instead

Blocked small merges may result in a big number of small parts, which, in turn,
may result in increased CPU and memory usage during queries, since queries need to inspect
all the existing small parts.

The issue has been introduced in 8189770c50

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3337
Aliaksandr Valialkin 2022-12-12 16:49:21 -08:00
parent f3e5c9c246
commit 2a190f6451
4 changed files with 104 additions and 14 deletions
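The heart of the change is that the data-ingestion path now assists with merging file (mergeset) and small (storage) parts when too many of them accumulate, instead of waiting for background merge workers that may be busy with big merges. Below is a minimal sketch of that pattern, stripped of the pace limiter, read-only mode and forcible-stop handling shown in the hunks further down; the types and the stubbed `mergeExistingParts` are simplified stand-ins, not the actual implementation:

```go
package mergeset

import (
	"errors"
	"sync"
	"sync/atomic"
)

// maxFileParts mirrors the constant added by this commit: once this many
// file parts pile up, ingestion starts assisting with merges.
const maxFileParts = 256

var errNothingToMerge = errors.New("nothing to merge")

type partWrapper struct {
	isInMerge bool
}

type table struct {
	partsLock          sync.Mutex
	fileParts          []*partWrapper
	fileAssistedMerges uint64
}

// assistedMergeForFileParts runs merges synchronously on the ingestion path
// while the number of file parts not currently being merged exceeds the limit.
func (tb *table) assistedMergeForFileParts() {
	for {
		tb.partsLock.Lock()
		ok := getNotInMergePartsCount(tb.fileParts) < maxFileParts
		tb.partsLock.Unlock()
		if ok {
			return
		}
		atomic.AddUint64(&tb.fileAssistedMerges, 1)
		if err := tb.mergeExistingParts(); err != nil {
			if errors.Is(err, errNothingToMerge) {
				return
			}
			panic(err)
		}
	}
}

// getNotInMergePartsCount counts the parts that no merge currently owns.
func getNotInMergePartsCount(pws []*partWrapper) int {
	n := 0
	for _, pw := range pws {
		if !pw.isInMerge {
			n++
		}
	}
	return n
}

// mergeExistingParts is a stub standing in for the real merge routine.
func (tb *table) mergeExistingParts() error {
	return errNothingToMerge
}
```

The same loop shape, with `maxSmallPartsPerPartition` instead of `maxFileParts`, is what `assistedMergeForSmallParts()` adds in `lib/storage` below.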


@ -467,8 +467,15 @@ func registerStorageMetrics(strg *storage.Storage) {
metrics.NewGauge(`vm_assisted_merges_total{type="storage/inmemory"}`, func() float64 {
return float64(tm().InmemoryAssistedMerges)
})
metrics.NewGauge(`vm_assisted_merges_total{type="storage/small"}`, func() float64 {
return float64(tm().SmallAssistedMerges)
})
metrics.NewGauge(`vm_assisted_merges_total{type="indexdb/inmemory"}`, func() float64 {
return float64(idbm().AssistedInmemoryMerges)
return float64(idbm().InmemoryAssistedMerges)
})
metrics.NewGauge(`vm_assisted_merges_total{type="indexdb/file"}`, func() float64 {
return float64(idbm().FileAssistedMerges)
})
metrics.NewGauge(`vm_indexdb_items_added_total`, func() float64 {

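For reference, the gauges above come from the github.com/VictoriaMetrics/metrics package: each one reads the current counter value via a callback on every scrape. A standalone sketch of that wiring follows; the `smallAssistedMerges` variable and the `/metrics` handler here are illustrative stand-ins, not vmstorage's actual code:

```go
package main

import (
	"net/http"
	"sync/atomic"

	"github.com/VictoriaMetrics/metrics"
)

// Stand-in for the SmallAssistedMerges counter exposed via table metrics.
var smallAssistedMerges uint64

func main() {
	// The callback is invoked on every scrape, so the gauge always reports
	// the latest counter value.
	metrics.NewGauge(`vm_assisted_merges_total{type="storage/small"}`, func() float64 {
		return float64(atomic.LoadUint64(&smallAssistedMerges))
	})
	http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
		metrics.WritePrometheus(w, true)
	})
	_ = http.ListenAndServe(":8428", nil)
}
```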

@ -14,11 +14,12 @@ The following tip changes can be tested by building VictoriaMetrics components f
* [How to build vmctl](https://docs.victoriametrics.com/vmctl.html#how-to-build)
## tip
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): support `$for` or `.For` template variables in alert's annotations. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3246).
* BUGFIX: [DataDog protocol parser](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent): do not re-use `host` and `device` fields from the previously parsed messages if these fields are missing in the currently parsed message. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3432).
* BUGFIX: reduce CPU usage when the regex-based relabeling rules are applied to more than 100K unique Graphite metrics. See [this issue](https://docs.victoriametrics.com/CHANGELOG.html#v1820). The issue was introduced in [v1.82.0](https://docs.victoriametrics.com/CHANGELOG.html#v1820).
* BUGFIX: do not block [merges](https://docs.victoriametrics.com/#storage) of small parts by merges of big parts on hosts with a small number of CPU cores. This issue could result in an increasing number of `storage/small` parts while a big merge is in progress. This, in turn, could result in increased CPU and memory usage during querying, since queries need to inspect a bigger number of small parts. The issue has been introduced in [v1.85.0](https://docs.victoriametrics.com/CHANGELOG.html#v1850).
## [v1.85.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.85.0)


@ -25,8 +25,17 @@ import (
// maxInmemoryParts is the maximum number of inmemory parts in the table.
//
// This number may be reached when the insertion pace outreaches merger pace.
// If this number is reached, then assisted merges are performed
// during data ingestion.
const maxInmemoryParts = 64
// maxFileParts is the maximum number of file parts in the table.
//
// This number may be reached when the insertion pace outreaches merger pace.
// If this number is reached, then assisted merges are performed
// during data ingestion.
const maxFileParts = 256
// Default number of parts to merge at once.
//
// This number has been obtained empirically - it gives the lowest possible overhead.
@ -98,7 +107,8 @@ type Table struct {
inmemoryItemsMerged uint64
fileItemsMerged uint64
assistedInmemoryMerges uint64
inmemoryAssistedMerges uint64
fileAssistedMerges uint64
itemsAdded uint64
itemsAddedSizeBytes uint64
@ -419,7 +429,8 @@ type TableMetrics struct {
InmemoryItemsMerged uint64
FileItemsMerged uint64
AssistedInmemoryMerges uint64
InmemoryAssistedMerges uint64
FileAssistedMerges uint64
ItemsAdded uint64
ItemsAddedSizeBytes uint64
@ -469,7 +480,8 @@ func (tb *Table) UpdateMetrics(m *TableMetrics) {
m.InmemoryItemsMerged += atomic.LoadUint64(&tb.inmemoryItemsMerged)
m.FileItemsMerged += atomic.LoadUint64(&tb.fileItemsMerged)
m.AssistedInmemoryMerges += atomic.LoadUint64(&tb.assistedInmemoryMerges)
m.InmemoryAssistedMerges += atomic.LoadUint64(&tb.inmemoryAssistedMerges)
m.FileAssistedMerges += atomic.LoadUint64(&tb.fileAssistedMerges)
m.ItemsAdded += atomic.LoadUint64(&tb.itemsAdded)
m.ItemsAddedSizeBytes += atomic.LoadUint64(&tb.itemsAddedSizeBytes)
@ -739,9 +751,8 @@ func (tb *Table) flushBlocksToParts(ibs []*inmemoryBlock, isFinal bool) {
flushConcurrencyCh <- struct{}{}
tb.assistedMergeForInmemoryParts()
tb.assistedMergeForFileParts()
<-flushConcurrencyCh
// There is no need in assited merge for file parts,
// since the bottleneck is possible only at inmemory parts.
if tb.flushCallback != nil {
if isFinal {
@ -765,10 +776,10 @@ func (tb *Table) assistedMergeForInmemoryParts() {
// Prioritize assisted merges over searches.
storagepacelimiter.Search.Inc()
atomic.AddUint64(&tb.inmemoryAssistedMerges, 1)
err := tb.mergeInmemoryParts()
storagepacelimiter.Search.Dec()
if err == nil {
atomic.AddUint64(&tb.assistedInmemoryMerges, 1)
continue
}
if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) {
@ -778,6 +789,30 @@ func (tb *Table) assistedMergeForInmemoryParts() {
}
}
func (tb *Table) assistedMergeForFileParts() {
for {
tb.partsLock.Lock()
ok := getNotInMergePartsCount(tb.fileParts) < maxFileParts
tb.partsLock.Unlock()
if ok {
return
}
// Prioritize assisted merges over searches.
storagepacelimiter.Search.Inc()
atomic.AddUint64(&tb.fileAssistedMerges, 1)
err := tb.mergeExistingParts(false)
storagepacelimiter.Search.Dec()
if err == nil {
continue
}
if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) || errors.Is(err, errReadOnlyMode) {
return
}
logger.Panicf("FATAL: cannot assist with merging file parts: %s", err)
}
}
func getNotInMergePartsCount(pws []*partWrapper) int {
n := 0
for _, pw := range pws {
@ -866,7 +901,10 @@ func newPartWrapperFromInmemoryPart(mp *inmemoryPart, flushToDiskDeadline time.T
}
func (tb *Table) startMergeWorkers() {
for i := 0; i < cap(mergeWorkersLimitCh); i++ {
// Start a merge worker per available CPU core.
// The actual number of concurrent merges is limited inside mergeWorker() below.
workersCount := cgroup.AvailableCPUs()
for i := 0; i < workersCount; i++ {
tb.wg.Add(1)
go func() {
tb.mergeWorker()
@ -940,6 +978,8 @@ func (tb *Table) mergeWorker() {
isFinal := false
t := time.NewTimer(sleepTime)
for {
// Limit the number of concurrent calls to mergeExistingParts, since the total number of merge workers
// across tables may exceed the cap(mergeWorkersLimitCh).
mergeWorkersLimitCh <- struct{}{}
err := tb.mergeExistingParts(isFinal)
<-mergeWorkersLimitCh

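The worker changes in this file decouple the number of merge goroutines started per table (one per available CPU core) from the number of merges allowed to run concurrently, which stays capped by the shared `mergeWorkersLimitCh`; the extra goroutines simply block on the channel. A minimal runnable sketch of that semaphore pattern (worker counts and the sleep are illustrative, not the real merge code):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Shared semaphore: at most 4 merges run at once, regardless of how many
// merge workers exist across tables and partitions.
var mergeWorkersLimitCh = make(chan struct{}, 4)

func mergeWorker(id int, wg *sync.WaitGroup) {
	defer wg.Done()
	for i := 0; i < 3; i++ {
		mergeWorkersLimitCh <- struct{}{} // acquire a slot before merging
		fmt.Printf("worker %d: merging\n", id)
		time.Sleep(10 * time.Millisecond) // stand-in for mergeExistingParts()
		<-mergeWorkersLimitCh // release the slot
	}
}

func main() {
	var wg sync.WaitGroup
	// Start more workers than there are slots; the extra workers block on
	// the channel instead of increasing merge concurrency.
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go mergeWorker(i, &wg)
	}
	wg.Wait()
}
```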

@ -31,8 +31,15 @@ import (
const maxBigPartSize = 1e12
// The maximum number of inmemory parts in the partition.
//
// If the number of inmemory parts reaches this value, then assisted merge runs during data ingestion.
const maxInmemoryPartsPerPartition = 32
// The maximum number of small parts in the partition.
//
// If the number of small parts reaches this value, then assisted merge runs during data ingestion.
const maxSmallPartsPerPartition = 128
// Default number of parts to merge at once.
//
// This number has been obtained empirically - it gives the lowest possible overhead.
@ -112,6 +119,7 @@ type partition struct {
bigRowsDeleted uint64
inmemoryAssistedMerges uint64
smallAssistedMerges uint64
mergeNeedFreeDiskSpace uint64
@ -338,6 +346,7 @@ type partitionMetrics struct {
BigPartsRefCount uint64
InmemoryAssistedMerges uint64
SmallAssistedMerges uint64
MergeNeedFreeDiskSpace uint64
}
@ -404,6 +413,7 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) {
m.BigRowsDeleted += atomic.LoadUint64(&pt.bigRowsDeleted)
m.InmemoryAssistedMerges += atomic.LoadUint64(&pt.inmemoryAssistedMerges)
m.SmallAssistedMerges += atomic.LoadUint64(&pt.smallAssistedMerges)
m.MergeNeedFreeDiskSpace += atomic.LoadUint64(&pt.mergeNeedFreeDiskSpace)
}
@ -576,6 +586,7 @@ func (pt *partition) flushRowsToParts(rows []rawRow) {
flushConcurrencyCh <- struct{}{}
pt.assistedMergeForInmemoryParts()
pt.assistedMergeForSmallParts()
<-flushConcurrencyCh
// There is no need in assisted merges for small and big parts,
// since the bottleneck is possible only at inmemory parts.
@ -597,10 +608,10 @@ func (pt *partition) assistedMergeForInmemoryParts() {
// Assist with merging inmemory parts.
// Prioritize assisted merges over searches.
storagepacelimiter.Search.Inc()
atomic.AddUint64(&pt.inmemoryAssistedMerges, 1)
err := pt.mergeInmemoryParts()
storagepacelimiter.Search.Dec()
if err == nil {
atomic.AddUint64(&pt.inmemoryAssistedMerges, 1)
continue
}
if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) {
@ -610,6 +621,33 @@ func (pt *partition) assistedMergeForInmemoryParts() {
}
}
func (pt *partition) assistedMergeForSmallParts() {
for {
pt.partsLock.Lock()
ok := getNotInMergePartsCount(pt.smallParts) < maxSmallPartsPerPartition
pt.partsLock.Unlock()
if ok {
return
}
// There are too many unmerged small parts.
// This usually means that the app cannot keep up with the data ingestion rate.
// Assist with merging small parts.
// Prioritize assisted merges over searches.
storagepacelimiter.Search.Inc()
atomic.AddUint64(&pt.smallAssistedMerges, 1)
err := pt.mergeExistingParts(false)
storagepacelimiter.Search.Dec()
if err == nil {
continue
}
if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) || errors.Is(err, errReadOnlyMode) {
return
}
logger.Panicf("FATAL: cannot merge small parts: %s", err)
}
}
func getNotInMergePartsCount(pws []*partWrapper) int {
n := 0
for _, pw := range pws {
@ -981,7 +1019,10 @@ func SetMergeWorkersCount(n int) {
}
func (pt *partition) startMergeWorkers() {
for i := 0; i < cap(mergeWorkersLimitCh); i++ {
// Start a merge worker per available CPU core.
// The actual number of concurrent merges is limited inside mergeWorker() below.
workersCount := cgroup.AvailableCPUs()
for i := 0; i < workersCount; i++ {
pt.wg.Add(1)
go func() {
pt.mergeWorker()
@ -1001,7 +1042,8 @@ func (pt *partition) mergeWorker() {
isFinal := false
t := time.NewTimer(sleepTime)
for {
// Limit the number of concurrent calls to mergeExistingParts, cine the number of merge
// Limit the number of concurrent calls to mergeExistingParts, since the total number of merge workers
// across partitions may exceed the cap(mergeWorkersLimitCh).
mergeWorkersLimitCh <- struct{}{}
err := pt.mergeExistingParts(isFinal)
<-mergeWorkersLimitCh
@ -1092,10 +1134,10 @@ func (pt *partition) getMaxBigPartSize() uint64 {
func getMaxOutBytes(path string, workersCount int) uint64 {
n := fs.MustGetFreeSpace(path)
// Do not substract freeDiskSpaceLimitBytes from n before calculating the maxOutBytes,
// Do not subtract freeDiskSpaceLimitBytes from n before calculating the maxOutBytes,
// since this will result in sub-optimal merges - e.g. many small parts will be left unmerged.
// Divide free space by the max number concurrent merges.
// Divide free space by the max number of concurrent merges.
maxOutBytes := n / uint64(workersCount)
if maxOutBytes > maxBigPartSize {
maxOutBytes = maxBigPartSize
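The last hunk fixes comment typos in getMaxOutBytes, which computes the upper bound on a single merge's output: free disk space divided by the number of concurrent merge workers, capped at maxBigPartSize. A small self-contained sketch of that arithmetic; `freeSpace` is passed in directly instead of calling fs.MustGetFreeSpace:

```go
package main

import "fmt"

// maxBigPartSize matches the constant from the diff above (1e12 bytes).
const maxBigPartSize = 1e12

// getMaxOutBytes mirrors the logic from the hunk above.
func getMaxOutBytes(freeSpace uint64, workersCount int) uint64 {
	// Divide free space by the max number of concurrent merges.
	maxOutBytes := freeSpace / uint64(workersCount)
	if maxOutBytes > maxBigPartSize {
		maxOutBytes = maxBigPartSize
	}
	return maxOutBytes
}

func main() {
	// 8 TB free and 4 workers: 2 TB per merge gets capped at maxBigPartSize.
	fmt.Println(getMaxOutBytes(8e12, 4)) // 1000000000000
}
```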