From 78e9cda4b163067d0c0b9b001854c5f5b77a8874 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin <valyala@victoriametrics.com>
Date: Mon, 2 Oct 2023 08:20:34 +0200
Subject: [PATCH] lib/logstorage: assist merging in-memory parts at data
 ingestion path if their number starts exceeding maxInmemoryPartsPerPartition

This is a follow-up for 9310e9f5847fa4d0fa49d1f081879014a33a4dc9, which removed data ingestion pacing.
Without pacing, the number of in-memory parts can grow without bound under high data ingestion rate,
which, in turn, can result in unbounded RAM usage, OOM crashes and slow query performance.

While at it, consistently reset isInMerge field for parts passed to mergeParts() before returning from this function.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4775
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4828
---
 lib/logstorage/datadb.go | 63 +++++++++++++++++++++++++++++++---------
 1 file changed, 49 insertions(+), 14 deletions(-)

diff --git a/lib/logstorage/datadb.go b/lib/logstorage/datadb.go
index d2fd8d5c57..81d3ff322e 100644
--- a/lib/logstorage/datadb.go
+++ b/lib/logstorage/datadb.go
@@ -228,7 +228,7 @@ func (ddb *datadb) flushInmemoryParts() {
 		}
 		err := ddb.mergePartsFinal(partsToFlush)
 		if err != nil {
-			logger.Errorf("cannot flush inmemory parts to disk: %s", err)
+			logger.Panicf("FATAL: cannot flush inmemory parts to disk: %s", err)
 		}
 
 		select {
@@ -256,7 +256,7 @@ func (ddb *datadb) startMergeWorkerLocked() {
 		err := ddb.mergeExistingParts()
 		<-globalMergeLimitCh
 		if err != nil && !errors.Is(err, errReadOnly) {
-			logger.Errorf("cannot merge parts: %s", err)
+			logger.Panicf("FATAL: background merge failed: %s", err)
 		}
 		ddb.wg.Done()
 	}()
@@ -307,7 +307,6 @@ func (ddb *datadb) mergeExistingParts() error {
 		}
 		err := ddb.mergeParts(pws, false)
 		ddb.releaseDiskSpace(partsSize)
-		ddb.releasePartsToMerge(pws)
 		if err != nil {
 			return err
 		}
@@ -356,15 +355,16 @@ var errReadOnly = errors.New("the storage is in read-only mode")
 //
 // All the parts inside pws must have isInMerge field set to true.
 func (ddb *datadb) mergeParts(pws []*partWrapper, isFinal bool) error {
-	if ddb.IsReadOnly() {
-		return errReadOnly
-	}
-
 	if len(pws) == 0 {
 		// Nothing to merge.
 		return nil
 	}
+
+	if ddb.IsReadOnly() {
+		return errReadOnly
+	}
 	assertIsInMerge(pws)
+	defer ddb.releasePartsToMerge(pws)
 
 	startTime := time.Now()
 
@@ -436,7 +436,6 @@ func (ddb *datadb) mergeParts(pws []*partWrapper, isFinal bool) error {
 		fs.MustSyncPath(dstPartPath)
 	}
 	if needStop(stopCh) {
-		ddb.releasePartsToMerge(pws)
 		// Remove incomplete destination part
 		if dstPartType == partFile {
 			fs.MustRemoveAll(dstPartPath)
@@ -548,7 +547,43 @@ func (ddb *datadb) mustAddRows(lr *LogRows) {
 	if len(ddb.inmemoryParts) > defaultPartsToMerge {
 		ddb.startMergeWorkerLocked()
 	}
+	needAssistedMerge := ddb.needAssistedMergeForInmemoryPartsLocked()
 	ddb.partsLock.Unlock()
+
+	if needAssistedMerge {
+		ddb.assistedMergeForInmemoryParts()
+	}
+}
+
+func (ddb *datadb) needAssistedMergeForInmemoryPartsLocked() bool { // reports whether the ingestion path should run assistedMergeForInmemoryParts; called with ddb.partsLock held
+	if ddb.IsReadOnly() {
+		return false // mergeParts returns errReadOnly in read-only mode, so an assisted merge could not make progress
+	}
+	if len(ddb.inmemoryParts) < maxInmemoryPartsPerPartition {
+		return false
+	}
+	n := 0
+	for _, pw := range ddb.inmemoryParts {
+		if !pw.isInMerge { // count only parts that are free to be picked up; assistedMergeForInmemoryParts merges not-in-merge parts only
+			n++
+		}
+	}
+	return n >= defaultPartsToMerge
+}
+
+func (ddb *datadb) assistedMergeForInmemoryParts() { // merges a batch of in-memory parts on the caller's goroutine; panics on any merge error other than errReadOnly
+	ddb.partsLock.Lock() // held while selecting the parts and marking them as in merge
+	parts := make([]*partWrapper, 0, len(ddb.inmemoryParts))
+	parts = appendNotInMergePartsLocked(parts, ddb.inmemoryParts)
+	pws := appendPartsToMerge(nil, parts, (1<<64)-1) // NOTE(review): max uint64 presumably means "no size limit" - confirm against appendPartsToMerge
+	setInMergeLocked(pws)
+	ddb.partsLock.Unlock()
+
+	err := ddb.mergeParts(pws, false) // runs after partsLock is released; false is the isFinal argument
+	if err == nil || errors.Is(err, errReadOnly) { // read-only storage is tolerated rather than treated as a failure
+		return
+	}
+	logger.Panicf("FATAL: cannot perform assisted merge for in-memory parts: %s", err)
 }
 
 // DatadbStats contains various stats for datadb.
@@ -646,18 +681,18 @@ func (ddb *datadb) mergePartsFinal(pws []*partWrapper) error {
 		if len(pwsChunk) == 0 {
 			pwsChunk = append(pwsChunk[:0], pws...)
 		}
-		err := ddb.mergeParts(pwsChunk, true)
-		if err != nil {
-			ddb.releasePartsToMerge(pwsChunk)
-			return err
-		}
-
 		partsToRemove := partsToMap(pwsChunk)
 		removedParts := 0
 		pws, removedParts = removeParts(pws, partsToRemove)
 		if removedParts != len(pwsChunk) {
 			logger.Panicf("BUG: unexpected number of parts removed; got %d; want %d", removedParts, len(pwsChunk))
 		}
+
+		err := ddb.mergeParts(pwsChunk, true)
+		if err != nil {
+			ddb.releasePartsToMerge(pws)
+			return err
+		}
 	}
 	return nil
 }