lib/logstorage: assist merging in-memory parts at data ingestion path if their number starts exceeding maxInmemoryPartsPerPartition

This is a follow-up for 9310e9f584, which removed data ingestion pacing.
This can result in uncontrolled growth of in-memory parts under high data ingestion rate,
which, in turn, can result in unbounded RAM usage, OOM crashes and slow query performance.

While at it, consistently reset isInMerge field for parts passed to mergeParts() before returning from this function.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4775
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4828
This commit is contained in:
Aliaksandr Valialkin 2023-10-02 08:20:34 +02:00
parent f55d114785
commit 78e9cda4b1
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1

View file

@@ -228,7 +228,7 @@ func (ddb *datadb) flushInmemoryParts() {
}
err := ddb.mergePartsFinal(partsToFlush)
if err != nil {
logger.Errorf("cannot flush inmemory parts to disk: %s", err)
logger.Panicf("FATAL: cannot flush inmemory parts to disk: %s", err)
}
select {
@@ -256,7 +256,7 @@ func (ddb *datadb) startMergeWorkerLocked() {
err := ddb.mergeExistingParts()
<-globalMergeLimitCh
if err != nil && !errors.Is(err, errReadOnly) {
logger.Errorf("cannot merge parts: %s", err)
logger.Panicf("FATAL: background merge failed: %s", err)
}
ddb.wg.Done()
}()
@@ -307,7 +307,6 @@ func (ddb *datadb) mergeExistingParts() error {
}
err := ddb.mergeParts(pws, false)
ddb.releaseDiskSpace(partsSize)
ddb.releasePartsToMerge(pws)
if err != nil {
return err
}
@@ -356,15 +355,16 @@ var errReadOnly = errors.New("the storage is in read-only mode")
//
// All the parts inside pws must have isInMerge field set to true.
func (ddb *datadb) mergeParts(pws []*partWrapper, isFinal bool) error {
if ddb.IsReadOnly() {
return errReadOnly
}
if len(pws) == 0 {
// Nothing to merge.
return nil
}
if ddb.IsReadOnly() {
return errReadOnly
}
assertIsInMerge(pws)
defer ddb.releasePartsToMerge(pws)
startTime := time.Now()
@@ -436,7 +436,6 @@ func (ddb *datadb) mergeParts(pws []*partWrapper, isFinal bool) error {
fs.MustSyncPath(dstPartPath)
}
if needStop(stopCh) {
ddb.releasePartsToMerge(pws)
// Remove incomplete destination part
if dstPartType == partFile {
fs.MustRemoveAll(dstPartPath)
@@ -548,7 +547,43 @@ func (ddb *datadb) mustAddRows(lr *LogRows) {
if len(ddb.inmemoryParts) > defaultPartsToMerge {
ddb.startMergeWorkerLocked()
}
needAssistedMerge := ddb.needAssistedMergeForInmemoryPartsLocked()
ddb.partsLock.Unlock()
if needAssistedMerge {
ddb.assistedMergeForInmemoryParts()
}
}
func (ddb *datadb) needAssistedMergeForInmemoryPartsLocked() bool {
if ddb.IsReadOnly() {
return false
}
if len(ddb.inmemoryParts) < maxInmemoryPartsPerPartition {
return false
}
n := 0
for _, pw := range ddb.inmemoryParts {
if pw.isInMerge {
n++
}
}
return n >= defaultPartsToMerge
}
func (ddb *datadb) assistedMergeForInmemoryParts() {
ddb.partsLock.Lock()
parts := make([]*partWrapper, 0, len(ddb.inmemoryParts))
parts = appendNotInMergePartsLocked(parts, ddb.inmemoryParts)
pws := appendPartsToMerge(nil, parts, (1<<64)-1)
setInMergeLocked(pws)
ddb.partsLock.Unlock()
err := ddb.mergeParts(pws, false)
if err == nil || errors.Is(err, errReadOnly) {
return
}
logger.Panicf("FATAL: cannot perform assisted merge for in-memory parts: %s", err)
}
// DatadbStats contains various stats for datadb.
@@ -646,18 +681,18 @@ func (ddb *datadb) mergePartsFinal(pws []*partWrapper) error {
if len(pwsChunk) == 0 {
pwsChunk = append(pwsChunk[:0], pws...)
}
err := ddb.mergeParts(pwsChunk, true)
if err != nil {
ddb.releasePartsToMerge(pwsChunk)
return err
}
partsToRemove := partsToMap(pwsChunk)
removedParts := 0
pws, removedParts = removeParts(pws, partsToRemove)
if removedParts != len(pwsChunk) {
logger.Panicf("BUG: unexpected number of parts removed; got %d; want %d", removedParts, len(pwsChunk))
}
err := ddb.mergeParts(pwsChunk, true)
if err != nil {
ddb.releasePartsToMerge(pws)
return err
}
}
return nil
}