From f55d114785b9a5a07dc49af48346a0dddb68c615 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin <valyala@victoriametrics.com>
Date: Mon, 2 Oct 2023 08:04:59 +0200
Subject: [PATCH] lib/{mergeset,storage}: consistently reset isInMerge field in
 parts passed to mergeParts() before returning from the function

While at it consistently check that the isInMerge field is set in all the parts passed to mergeParts()
---
 lib/mergeset/table.go    | 13 ++++++++++++-
 lib/storage/partition.go | 13 ++++++++++++-
 2 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go
index e6f840571d..a5420df3c3 100644
--- a/lib/mergeset/table.go
+++ b/lib/mergeset/table.go
@@ -1018,6 +1018,14 @@ func SetFinalMergeDelay(delay time.Duration) {
 
 var errNothingToMerge = fmt.Errorf("nothing to merge")
 
+func assertIsInMerge(pws []*partWrapper) {
+	for _, pw := range pws {
+		if !pw.isInMerge {
+			logger.Panicf("BUG: partWrapper.isInMerge unexpectedly set to false")
+		}
+	}
+}
+
 func (tb *Table) releasePartsToMerge(pws []*partWrapper) {
 	tb.partsLock.Lock()
 	for _, pw := range pws {
@@ -1036,12 +1044,16 @@ func (tb *Table) releasePartsToMerge(pws []*partWrapper) {
 // If isFinal is set, then the resulting part will be stored to disk.
 //
 // All the parts inside pws must have isInMerge field set to true.
+// The isInMerge field inside pws parts is set to false before returning from the function.
 func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal bool) error {
 	if len(pws) == 0 {
 		// Nothing to merge.
 		return errNothingToMerge
 	}
 
+	assertIsInMerge(pws)
+	defer tb.releasePartsToMerge(pws)
+
 	startTime := time.Now()
 
 	// Initialize destination paths.
@@ -1091,7 +1103,6 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal
 		putBlockStreamReader(bsr)
 	}
 	if err != nil {
-		tb.releasePartsToMerge(pws)
 		return err
 	}
 	if mpNew != nil {
diff --git a/lib/storage/partition.go b/lib/storage/partition.go
index d0c0a29ef6..ca91d71e3d 100644
--- a/lib/storage/partition.go
+++ b/lib/storage/partition.go
@@ -1163,6 +1163,14 @@ func (pt *partition) mergeExistingParts(isFinal bool) error {
 	return pt.mergeParts(pws, pt.stopCh, isFinal)
 }
 
+func assertIsInMerge(pws []*partWrapper) {
+	for _, pw := range pws {
+		if !pw.isInMerge {
+			logger.Panicf("BUG: partWrapper.isInMerge unexpectedly set to false")
+		}
+	}
+}
+
 func (pt *partition) releasePartsToMerge(pws []*partWrapper) {
 	pt.partsLock.Lock()
 	for _, pw := range pws {
@@ -1222,12 +1230,16 @@ func getMinDedupInterval(pws []*partWrapper) int64 {
 // if isFinal is set, then the resulting part will be saved to disk.
 //
 // All the parts inside pws must have isInMerge field set to true.
+// The isInMerge field inside pws parts is set to false before returning from the function.
 func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal bool) error {
 	if len(pws) == 0 {
 		// Nothing to merge.
 		return errNothingToMerge
 	}
 
+	assertIsInMerge(pws)
+	defer pt.releasePartsToMerge(pws)
+
 	startTime := time.Now()
 
 	// Initialize destination paths.
@@ -1278,7 +1290,6 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi
 		putBlockStreamReader(bsr)
 	}
 	if err != nil {
-		pt.releasePartsToMerge(pws)
 		return err
 	}
 	if mpNew != nil {