From fa566c68a6ccf7385a05f649aee7e5f5a38afb15 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 24 Jan 2024 03:27:49 +0200 Subject: [PATCH] lib/mergeset: really limit the number of in-memory parts to 15 It has been appeared that the registration of new time series slows down linearly with the number of indexdb parts, since VictoriaMetrics needs to check every indexdb part when it searches for TSID by newly ingested metric name. The number of in-memory parts grows when new time series are registered at high rate. The number of in-memory parts grows faster on systems with big number of CPU cores, because the mergeset maintains per-CPU buffers with newly added entries for the indexdb, and every such entry is transformed eventually into a separate in-memory part. The solution has been suggested in https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5212 by @misutoth - to limit the number of in-memory parts with buffered channel. This solution is implemented in this commit. Additionally, this commit merges per-CPU parts into a single part before adding it to the list of in-memory parts. This reduces CPU load when searching for TSID by newly ingested metric name. The https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5212 recommends setting the limit on the number of in-memory parts to 100, but my internal testing shows that much lower limit 15 works with the same efficiency on a system with 16 CPU cores while reducing memory usage for `indexdb/dataBlocks` cache by up to 50%. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5190 --- docs/CHANGELOG.md | 1 + lib/mergeset/table.go | 131 ++++++++++++++++++++++++++---------------- 2 files changed, 84 insertions(+), 48 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 2b36f335a..8c1b4dca3 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -30,6 +30,7 @@ The sandbox cluster installation is running under the constant load generated by * SECURITY: upgrade Go builder from Go1.21.5 to Go1.21.6. See [the list of issues addressed in Go1.21.6](https://github.com/golang/go/issues?q=milestone%3AGo1.21.6+label%3ACherryPickApproved). +* FEATURE: improve new [time series](https://docs.victoriametrics.com/keyConcepts.html#time-series) registration speed on systems with high number of CPU cores. Thanks to @misutoth for the initial idea and [implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5212). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for discovering [Hetzner Cloud](https://www.hetzner.com/cloud) and [Hetzner Robot](https://docs.hetzner.com/robot) scrape targets. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3154) and [these docs](https://docs.victoriametrics.com/sd_configs.html#hetzner_sd_configs). * FEATURE: [graphite](https://docs.victoriametrics.com/#graphite-render-api-usage): add support for negative index in `groupByNode` and `aliasByNode` functions. Thanks to @rbizos for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5581). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [DataDog v2 data ingestion protocol](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4451). diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index ab82d123c..b0680ed9f 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -24,17 +24,20 @@ import ( // maxInmemoryParts is the maximum number of inmemory parts in the table. // +// This limit allows reducing CPU usage under high ingestion rate. +// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5212 +// // This number may be reached when the insertion pace outreaches merger pace. // If this number is reached, then assisted merges are performed // during data ingestion. -const maxInmemoryParts = 30 +const maxInmemoryParts = 15 // maxFileParts is the maximum number of file parts in the table. // // This number may be reached when the insertion pace outreaches merger pace. // If this number is reached, then assisted merges are performed // during data ingestion. -const maxFileParts = 64 +const maxFileParts = 30 // Default number of parts to merge at once. // @@ -135,6 +138,10 @@ type Table struct { // inmemoryParts contains inmemory parts. inmemoryParts []*partWrapper + // inmemoryPartsLimitCh limits the number of inmemory parts + // in order to prevent from data ingestion slowdown as described at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5212 + inmemoryPartsLimitCh chan struct{} + // fileParts contains file-backed parts. fileParts []*partWrapper @@ -255,14 +262,6 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) [][]byte { tb.flushBlocksToParts(ibsToFlush, false) - if len(ibsToFlush) > 0 { - // Run assisted merges if needed. - flushConcurrencyCh <- struct{}{} - tb.assistedMergeForInmemoryParts() - tb.assistedMergeForFileParts() - <-flushConcurrencyCh - } - return tailItems } @@ -334,14 +333,15 @@ func MustOpenTable(path string, flushCallback func(), prepareBlock PrepareBlockC pws := mustOpenParts(path) tb := &Table{ - path: path, - flushCallback: flushCallback, - prepareBlock: prepareBlock, - isReadOnly: isReadOnly, - fileParts: pws, - mergeIdx: uint64(time.Now().UnixNano()), - needMergeCh: make(chan struct{}, 1), - stopCh: make(chan struct{}), + path: path, + flushCallback: flushCallback, + prepareBlock: prepareBlock, + isReadOnly: isReadOnly, + inmemoryPartsLimitCh: make(chan struct{}, maxInmemoryParts), + fileParts: pws, + mergeIdx: uint64(time.Now().UnixNano()), + needMergeCh: make(chan struct{}, 1), + stopCh: make(chan struct{}), } tb.rawItems.init() tb.startBackgroundWorkers() @@ -728,13 +728,25 @@ func (tb *Table) flushBlocksToParts(ibs []*inmemoryBlock, isFinal bool) { wg.Wait() putWaitGroup(wg) - tb.partsLock.Lock() - tb.inmemoryParts = append(tb.inmemoryParts, pws...) - for range pws { - if !tb.notifyBackgroundMergers() { - break - } + flushConcurrencyCh <- struct{}{} + pw := tb.mustMergeInmemoryParts(pws) + <-flushConcurrencyCh + + select { + case tb.inmemoryPartsLimitCh <- struct{}{}: + default: + // Too many in-memory parts. Try assist merging them before adding pw to tb.inmemoryParts. + flushConcurrencyCh <- struct{}{} + tb.assistedMergeForInmemoryParts() + tb.assistedMergeForFileParts() + <-flushConcurrencyCh + + tb.inmemoryPartsLimitCh <- struct{}{} } + + tb.partsLock.Lock() + tb.inmemoryParts = append(tb.inmemoryParts, pw) + tb.notifyBackgroundMergers() tb.partsLock.Unlock() if tb.flushCallback != nil { @@ -772,23 +784,23 @@ var flushConcurrencyLimit = func() int { var flushConcurrencyCh = make(chan struct{}, flushConcurrencyLimit) -func needAssistedMerge(pws []*partWrapper, maxParts int) bool { - if len(pws) < maxParts { - return false - } - return getNotInMergePartsCount(pws) >= defaultPartsToMerge -} - func (tb *Table) assistedMergeForInmemoryParts() { tb.partsLock.Lock() - needMerge := needAssistedMerge(tb.inmemoryParts, maxInmemoryParts) + needMerge := getNotInMergePartsCount(tb.inmemoryParts) >= defaultPartsToMerge tb.partsLock.Unlock() if !needMerge { return } atomic.AddUint64(&tb.inmemoryAssistedMerges, 1) - err := tb.mergeInmemoryParts() + + maxOutBytes := tb.getMaxFilePartSize() + + tb.partsLock.Lock() + pws := getPartsToMerge(tb.inmemoryParts, maxOutBytes, true) + tb.partsLock.Unlock() + + err := tb.mergeParts(pws, tb.stopCh, true) if err == nil { return } @@ -800,7 +812,7 @@ func (tb *Table) assistedMergeForInmemoryParts() { func (tb *Table) assistedMergeForFileParts() { tb.partsLock.Lock() - needMerge := needAssistedMerge(tb.fileParts, maxFileParts) + needMerge := getNotInMergePartsCount(tb.fileParts) >= defaultPartsToMerge tb.partsLock.Unlock() if !needMerge { return @@ -841,12 +853,27 @@ func putWaitGroup(wg *sync.WaitGroup) { var wgPool sync.Pool -func (tb *Table) createInmemoryPart(ibs []*inmemoryBlock) *partWrapper { - outItemsCount := uint64(0) - for _, ib := range ibs { - outItemsCount += uint64(ib.Len()) +func (tb *Table) mustMergeInmemoryParts(pws []*partWrapper) *partWrapper { + if len(pws) == 1 { + // Nothing to merge + return pws[0] } + bsrs := make([]*blockStreamReader, 0, len(pws)) + for _, pw := range pws { + if pw.mp == nil { + logger.Panicf("BUG: unexpected file part") + } + bsr := getBlockStreamReader() + bsr.MustInitFromInmemoryPart(pw.mp) + bsrs = append(bsrs, bsr) + } + + flushToDiskDeadline := getFlushToDiskDeadline(pws) + return tb.mustMergeIntoInmemoryPart(bsrs, flushToDiskDeadline) +} + +func (tb *Table) createInmemoryPart(ibs []*inmemoryBlock) *partWrapper { // Prepare blockStreamReaders for source blocks. bsrs := make([]*blockStreamReader, 0, len(ibs)) for _, ib := range ibs { @@ -861,6 +888,7 @@ func (tb *Table) createInmemoryPart(ibs []*inmemoryBlock) *partWrapper { if len(bsrs) == 0 { return nil } + flushToDiskDeadline := time.Now().Add(dataFlushInterval) if len(bsrs) == 1 { // Nothing to merge. Just return a single inmemory part. @@ -871,7 +899,15 @@ func (tb *Table) createInmemoryPart(ibs []*inmemoryBlock) *partWrapper { return newPartWrapperFromInmemoryPart(mp, flushToDiskDeadline) } + return tb.mustMergeIntoInmemoryPart(bsrs, flushToDiskDeadline) +} + +func (tb *Table) mustMergeIntoInmemoryPart(bsrs []*blockStreamReader, flushToDiskDeadline time.Time) *partWrapper { // Prepare blockStreamWriter for destination part. + outItemsCount := uint64(0) + for _, bsr := range bsrs { + outItemsCount += bsr.ph.itemsCount + } compressLevel := getCompressLevel(outItemsCount) bsw := getBlockStreamWriter() mpDst := &inmemoryPart{} @@ -891,6 +927,7 @@ func (tb *Table) createInmemoryPart(ibs []*inmemoryBlock) *partWrapper { for _, bsr := range bsrs { putBlockStreamReader(bsr) } + return newPartWrapperFromInmemoryPart(mpDst, flushToDiskDeadline) } @@ -940,16 +977,6 @@ func (tb *Table) canBackgroundMerge() bool { var errReadOnlyMode = fmt.Errorf("storage is in readonly mode") -func (tb *Table) mergeInmemoryParts() error { - maxOutBytes := tb.getMaxFilePartSize() - - tb.partsLock.Lock() - pws := getPartsToMerge(tb.inmemoryParts, maxOutBytes, false) - tb.partsLock.Unlock() - - return tb.mergeParts(pws, tb.stopCh, false) -} - func (tb *Table) mergeExistingParts(isFinal bool) error { if !tb.canBackgroundMerge() { // Do not perform background merge in read-only mode @@ -1278,6 +1305,14 @@ func (tb *Table) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, dst tb.partsLock.Unlock() + // Update inmemoryPartsLimitCh accordingly to the number of the remaining in-memory parts. + for i := 0; i < removedInmemoryParts; i++ { + <-tb.inmemoryPartsLimitCh + } + if dstPartType == partInmemory { + tb.inmemoryPartsLimitCh <- struct{}{} + } + removedParts := removedInmemoryParts + removedFileParts if removedParts != len(m) { logger.Panicf("BUG: unexpected number of parts removed; got %d, want %d", removedParts, len(m))