mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/{mergeset,storage}: do not block small merges by pending big merges - assist with small merges instead
Blocked small merges may result into big number of small parts, which, in turn,
may result in increased CPU and memory usage during queries, since queries need to inspect
all the existing small parts.
The issue has been introduced in 8189770c50
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3337
This commit is contained in:
parent
3b18931050
commit
d1af6046c7
4 changed files with 104 additions and 14 deletions
|
@ -558,8 +558,15 @@ func registerStorageMetrics(strg *storage.Storage) {
|
|||
metrics.NewGauge(`vm_assisted_merges_total{type="storage/inmemory"}`, func() float64 {
|
||||
return float64(tm().InmemoryAssistedMerges)
|
||||
})
|
||||
metrics.NewGauge(`vm_assisted_merges_total{type="storage/small"}`, func() float64 {
|
||||
return float64(tm().SmallAssistedMerges)
|
||||
})
|
||||
|
||||
metrics.NewGauge(`vm_assisted_merges_total{type="indexdb/inmemory"}`, func() float64 {
|
||||
return float64(idbm().AssistedInmemoryMerges)
|
||||
return float64(idbm().InmemoryAssistedMerges)
|
||||
})
|
||||
metrics.NewGauge(`vm_assisted_merges_total{type="indexdb/file"}`, func() float64 {
|
||||
return float64(idbm().FileAssistedMerges)
|
||||
})
|
||||
|
||||
metrics.NewGauge(`vm_indexdb_items_added_total`, func() float64 {
|
||||
|
|
|
@ -14,11 +14,12 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
|||
* [How to build vmctl](https://docs.victoriametrics.com/vmctl.html#how-to-build)
|
||||
|
||||
## tip
|
||||
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): support `$for` or `.For` template variables in alert's annotations. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3246).
|
||||
|
||||
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): support `$for` or `.For` template variables in alert's annotations. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3246).
|
||||
|
||||
* BUGFIX: [DataDog protocol parser](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent): do not re-use `host` and `device` fields from the previously parsed messages if these fields are missing in the currently parsed message. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3432).
|
||||
* BUGFIX: reduce CPU usage when the regex-based relabeling rules are applied to more than 100K unique Graphite metrics. See [this issue](https://docs.victoriametrics.com/CHANGELOG.html#v1820). The issue was introduced in [v1.82.0](https://docs.victoriametrics.com/CHANGELOG.html#v1820).
|
||||
* BUGFIX: do not block [merges](https://docs.victoriametrics.com/#storage) of small parts by merges of big parts on hosts with small number of CPU cores. This issue could result in the increasing number of `storage/small` parts while big merge is in progress. This, in turn, could result in increased CPU usage and memory usage during querying, since queries need to inspect bigger number of small parts. The issue has been introduced in [v1.85.0](https://docs.victoriametrics.com/CHANGELOG.html#v1850).
|
||||
|
||||
|
||||
## [v1.85.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.85.0)
|
||||
|
|
|
@ -25,8 +25,17 @@ import (
|
|||
// maxInmemoryParts is the maximum number of inmemory parts in the table.
|
||||
//
|
||||
// This number may be reached when the insertion pace outreaches merger pace.
|
||||
// If this number is reached, then assisted merges are performed
|
||||
// during data ingestion.
|
||||
const maxInmemoryParts = 64
|
||||
|
||||
// maxFileParts is the maximum number of file parts in the table.
|
||||
//
|
||||
// This number may be reached when the insertion pace outreaches merger pace.
|
||||
// If this number is reached, then assisted merges are performed
|
||||
// during data ingestion.
|
||||
const maxFileParts = 256
|
||||
|
||||
// Default number of parts to merge at once.
|
||||
//
|
||||
// This number has been obtained empirically - it gives the lowest possible overhead.
|
||||
|
@ -98,7 +107,8 @@ type Table struct {
|
|||
inmemoryItemsMerged uint64
|
||||
fileItemsMerged uint64
|
||||
|
||||
assistedInmemoryMerges uint64
|
||||
inmemoryAssistedMerges uint64
|
||||
fileAssistedMerges uint64
|
||||
|
||||
itemsAdded uint64
|
||||
itemsAddedSizeBytes uint64
|
||||
|
@ -419,7 +429,8 @@ type TableMetrics struct {
|
|||
InmemoryItemsMerged uint64
|
||||
FileItemsMerged uint64
|
||||
|
||||
AssistedInmemoryMerges uint64
|
||||
InmemoryAssistedMerges uint64
|
||||
FileAssistedMerges uint64
|
||||
|
||||
ItemsAdded uint64
|
||||
ItemsAddedSizeBytes uint64
|
||||
|
@ -469,7 +480,8 @@ func (tb *Table) UpdateMetrics(m *TableMetrics) {
|
|||
m.InmemoryItemsMerged += atomic.LoadUint64(&tb.inmemoryItemsMerged)
|
||||
m.FileItemsMerged += atomic.LoadUint64(&tb.fileItemsMerged)
|
||||
|
||||
m.AssistedInmemoryMerges += atomic.LoadUint64(&tb.assistedInmemoryMerges)
|
||||
m.InmemoryAssistedMerges += atomic.LoadUint64(&tb.inmemoryAssistedMerges)
|
||||
m.FileAssistedMerges += atomic.LoadUint64(&tb.fileAssistedMerges)
|
||||
|
||||
m.ItemsAdded += atomic.LoadUint64(&tb.itemsAdded)
|
||||
m.ItemsAddedSizeBytes += atomic.LoadUint64(&tb.itemsAddedSizeBytes)
|
||||
|
@ -739,9 +751,8 @@ func (tb *Table) flushBlocksToParts(ibs []*inmemoryBlock, isFinal bool) {
|
|||
|
||||
flushConcurrencyCh <- struct{}{}
|
||||
tb.assistedMergeForInmemoryParts()
|
||||
tb.assistedMergeForFileParts()
|
||||
<-flushConcurrencyCh
|
||||
// There is no need in assited merge for file parts,
|
||||
// since the bottleneck is possible only at inmemory parts.
|
||||
|
||||
if tb.flushCallback != nil {
|
||||
if isFinal {
|
||||
|
@ -765,10 +776,10 @@ func (tb *Table) assistedMergeForInmemoryParts() {
|
|||
|
||||
// Prioritize assisted merges over searches.
|
||||
storagepacelimiter.Search.Inc()
|
||||
atomic.AddUint64(&tb.inmemoryAssistedMerges, 1)
|
||||
err := tb.mergeInmemoryParts()
|
||||
storagepacelimiter.Search.Dec()
|
||||
if err == nil {
|
||||
atomic.AddUint64(&tb.assistedInmemoryMerges, 1)
|
||||
continue
|
||||
}
|
||||
if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) {
|
||||
|
@ -778,6 +789,30 @@ func (tb *Table) assistedMergeForInmemoryParts() {
|
|||
}
|
||||
}
|
||||
|
||||
func (tb *Table) assistedMergeForFileParts() {
|
||||
for {
|
||||
tb.partsLock.Lock()
|
||||
ok := getNotInMergePartsCount(tb.fileParts) < maxFileParts
|
||||
tb.partsLock.Unlock()
|
||||
if ok {
|
||||
return
|
||||
}
|
||||
|
||||
// Prioritize assisted merges over searches.
|
||||
storagepacelimiter.Search.Inc()
|
||||
atomic.AddUint64(&tb.fileAssistedMerges, 1)
|
||||
err := tb.mergeExistingParts(false)
|
||||
storagepacelimiter.Search.Dec()
|
||||
if err == nil {
|
||||
continue
|
||||
}
|
||||
if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) || errors.Is(err, errReadOnlyMode) {
|
||||
return
|
||||
}
|
||||
logger.Panicf("FATAL: cannot assist with merging file parts: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func getNotInMergePartsCount(pws []*partWrapper) int {
|
||||
n := 0
|
||||
for _, pw := range pws {
|
||||
|
@ -866,7 +901,10 @@ func newPartWrapperFromInmemoryPart(mp *inmemoryPart, flushToDiskDeadline time.T
|
|||
}
|
||||
|
||||
func (tb *Table) startMergeWorkers() {
|
||||
for i := 0; i < cap(mergeWorkersLimitCh); i++ {
|
||||
// Start a merge worker per available CPU core.
|
||||
// The actual number of concurrent merges is limited inside mergeWorker() below.
|
||||
workersCount := cgroup.AvailableCPUs()
|
||||
for i := 0; i < workersCount; i++ {
|
||||
tb.wg.Add(1)
|
||||
go func() {
|
||||
tb.mergeWorker()
|
||||
|
@ -940,6 +978,8 @@ func (tb *Table) mergeWorker() {
|
|||
isFinal := false
|
||||
t := time.NewTimer(sleepTime)
|
||||
for {
|
||||
// Limit the number of concurrent calls to mergeExistingParts, since the total number of merge workers
|
||||
// across tables may exceed the the cap(mergeWorkersLimitCh).
|
||||
mergeWorkersLimitCh <- struct{}{}
|
||||
err := tb.mergeExistingParts(isFinal)
|
||||
<-mergeWorkersLimitCh
|
||||
|
|
|
@ -31,8 +31,15 @@ import (
|
|||
const maxBigPartSize = 1e12
|
||||
|
||||
// The maximum number of inmemory parts in the partition.
|
||||
//
|
||||
// If the number of inmemory parts reaches this value, then assisted merge runs during data ingestion.
|
||||
const maxInmemoryPartsPerPartition = 32
|
||||
|
||||
// The maximum number of small parts in the partition.
|
||||
//
|
||||
// If the number of small parts reaches this value, then assisted merge runs during data ingestion.
|
||||
const maxSmallPartsPerPartition = 128
|
||||
|
||||
// Default number of parts to merge at once.
|
||||
//
|
||||
// This number has been obtained empirically - it gives the lowest possible overhead.
|
||||
|
@ -112,6 +119,7 @@ type partition struct {
|
|||
bigRowsDeleted uint64
|
||||
|
||||
inmemoryAssistedMerges uint64
|
||||
smallAssistedMerges uint64
|
||||
|
||||
mergeNeedFreeDiskSpace uint64
|
||||
|
||||
|
@ -338,6 +346,7 @@ type partitionMetrics struct {
|
|||
BigPartsRefCount uint64
|
||||
|
||||
InmemoryAssistedMerges uint64
|
||||
SmallAssistedMerges uint64
|
||||
|
||||
MergeNeedFreeDiskSpace uint64
|
||||
}
|
||||
|
@ -404,6 +413,7 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) {
|
|||
m.BigRowsDeleted += atomic.LoadUint64(&pt.bigRowsDeleted)
|
||||
|
||||
m.InmemoryAssistedMerges += atomic.LoadUint64(&pt.inmemoryAssistedMerges)
|
||||
m.SmallAssistedMerges += atomic.LoadUint64(&pt.smallAssistedMerges)
|
||||
|
||||
m.MergeNeedFreeDiskSpace += atomic.LoadUint64(&pt.mergeNeedFreeDiskSpace)
|
||||
}
|
||||
|
@ -576,6 +586,7 @@ func (pt *partition) flushRowsToParts(rows []rawRow) {
|
|||
|
||||
flushConcurrencyCh <- struct{}{}
|
||||
pt.assistedMergeForInmemoryParts()
|
||||
pt.assistedMergeForSmallParts()
|
||||
<-flushConcurrencyCh
|
||||
// There is no need in assisted merges for small and big parts,
|
||||
// since the bottleneck is possible only at inmemory parts.
|
||||
|
@ -597,10 +608,10 @@ func (pt *partition) assistedMergeForInmemoryParts() {
|
|||
// Assist with mering inmemory parts.
|
||||
// Prioritize assisted merges over searches.
|
||||
storagepacelimiter.Search.Inc()
|
||||
atomic.AddUint64(&pt.inmemoryAssistedMerges, 1)
|
||||
err := pt.mergeInmemoryParts()
|
||||
storagepacelimiter.Search.Dec()
|
||||
if err == nil {
|
||||
atomic.AddUint64(&pt.inmemoryAssistedMerges, 1)
|
||||
continue
|
||||
}
|
||||
if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) {
|
||||
|
@ -610,6 +621,33 @@ func (pt *partition) assistedMergeForInmemoryParts() {
|
|||
}
|
||||
}
|
||||
|
||||
func (pt *partition) assistedMergeForSmallParts() {
|
||||
for {
|
||||
pt.partsLock.Lock()
|
||||
ok := getNotInMergePartsCount(pt.smallParts) < maxSmallPartsPerPartition
|
||||
pt.partsLock.Unlock()
|
||||
if ok {
|
||||
return
|
||||
}
|
||||
|
||||
// There are too many unmerged small parts.
|
||||
// This usually means that the app cannot keep up with the data ingestion rate.
|
||||
// Assist with mering small parts.
|
||||
// Prioritize assisted merges over searches.
|
||||
storagepacelimiter.Search.Inc()
|
||||
atomic.AddUint64(&pt.smallAssistedMerges, 1)
|
||||
err := pt.mergeExistingParts(false)
|
||||
storagepacelimiter.Search.Dec()
|
||||
if err == nil {
|
||||
continue
|
||||
}
|
||||
if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) || errors.Is(err, errReadOnlyMode) {
|
||||
return
|
||||
}
|
||||
logger.Panicf("FATAL: cannot merge small parts: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func getNotInMergePartsCount(pws []*partWrapper) int {
|
||||
n := 0
|
||||
for _, pw := range pws {
|
||||
|
@ -981,7 +1019,10 @@ func SetMergeWorkersCount(n int) {
|
|||
}
|
||||
|
||||
func (pt *partition) startMergeWorkers() {
|
||||
for i := 0; i < cap(mergeWorkersLimitCh); i++ {
|
||||
// Start a merge worker per available CPU core.
|
||||
// The actual number of concurrent merges is limited inside mergeWorker() below.
|
||||
workersCount := cgroup.AvailableCPUs()
|
||||
for i := 0; i < workersCount; i++ {
|
||||
pt.wg.Add(1)
|
||||
go func() {
|
||||
pt.mergeWorker()
|
||||
|
@ -1001,7 +1042,8 @@ func (pt *partition) mergeWorker() {
|
|||
isFinal := false
|
||||
t := time.NewTimer(sleepTime)
|
||||
for {
|
||||
// Limit the number of concurrent calls to mergeExistingParts, cine the number of merge
|
||||
// Limit the number of concurrent calls to mergeExistingParts, since the total number of merge workers
|
||||
// across partitions may exceed the the cap(mergeWorkersLimitCh).
|
||||
mergeWorkersLimitCh <- struct{}{}
|
||||
err := pt.mergeExistingParts(isFinal)
|
||||
<-mergeWorkersLimitCh
|
||||
|
@ -1092,10 +1134,10 @@ func (pt *partition) getMaxBigPartSize() uint64 {
|
|||
|
||||
func getMaxOutBytes(path string, workersCount int) uint64 {
|
||||
n := fs.MustGetFreeSpace(path)
|
||||
// Do not substract freeDiskSpaceLimitBytes from n before calculating the maxOutBytes,
|
||||
// Do not subtract freeDiskSpaceLimitBytes from n before calculating the maxOutBytes,
|
||||
// since this will result in sub-optimal merges - e.g. many small parts will be left unmerged.
|
||||
|
||||
// Divide free space by the max number concurrent merges.
|
||||
// Divide free space by the max number of concurrent merges.
|
||||
maxOutBytes := n / uint64(workersCount)
|
||||
if maxOutBytes > maxBigPartSize {
|
||||
maxOutBytes = maxBigPartSize
|
||||
|
|
Loading…
Reference in a new issue