From e9636b4c697024ffcf84c6692da8dabc8d980686 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 30 Nov 2022 19:53:02 -0800 Subject: [PATCH] lib/{mergeset,storage}: re-use the code for removing isInMerge flag at parts Move the common code into releasePartsToMerge() method and consistently use it throughout the code. --- lib/mergeset/table.go | 40 ++++++++++++++++------------------------ lib/storage/partition.go | 16 ++++------------ 2 files changed, 20 insertions(+), 36 deletions(-) diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index d4d5f3620..aa4b595cc 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -586,21 +586,13 @@ func (tb *Table) convertToV1280() { } func (tb *Table) mergePartsOptimal(pws []*partWrapper, stopCh <-chan struct{}) error { - defer func() { - // Remove isInMerge flag from pws. - tb.partsLock.Lock() - for _, pw := range pws { - // Do not check for pws.isInMerge set to false, - // since it may be set to false in mergeParts below. - pw.isInMerge = false - } - tb.partsLock.Unlock() - }() for len(pws) > defaultPartsToMerge { - if err := tb.mergeParts(pws[:defaultPartsToMerge], stopCh, false); err != nil { + pwsChunk := pws[:defaultPartsToMerge] + pws = pws[defaultPartsToMerge:] + if err := tb.mergeParts(pwsChunk, stopCh, false); err != nil { + tb.releasePartsToMerge(pws) return fmt.Errorf("cannot merge %d parts: %w", defaultPartsToMerge, err) } - pws = pws[defaultPartsToMerge:] } if len(pws) == 0 { return nil @@ -874,6 +866,17 @@ func (tb *Table) partMerger() error { var errNothingToMerge = fmt.Errorf("nothing to merge") +func (tb *Table) releasePartsToMerge(pws []*partWrapper) { + tb.partsLock.Lock() + for _, pw := range pws { + if !pw.isInMerge { + logger.Panicf("BUG: missing isInMerge flag on the part %q", pw.p.path) + } + pw.isInMerge = false + } + tb.partsLock.Unlock() +} + // mergeParts merges pws. // // Merging is immediately stopped if stopCh is closed. @@ -884,6 +887,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isOuterP // Nothing to merge. return errNothingToMerge } + defer tb.releasePartsToMerge(pws) atomic.AddUint64(&tb.mergesCount, 1) atomic.AddUint64(&tb.activeMerges, 1) @@ -891,18 +895,6 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isOuterP startTime := time.Now() - defer func() { - // Remove isInMerge flag from pws. - tb.partsLock.Lock() - for _, pw := range pws { - if !pw.isInMerge { - logger.Panicf("BUG: missing isInMerge flag on the part %q", pw.p.path) - } - pw.isInMerge = false - } - tb.partsLock.Unlock() - }() - // Prepare blockStreamReaders for source parts. bsrs := make([]*blockStreamReader, 0, len(pws)) defer func() { diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 797c57bdb..6f91cba1c 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -800,21 +800,13 @@ func (pt *partition) flushInmemoryParts(dstPws []*partWrapper, force bool) ([]*p } func (pt *partition) mergePartsOptimal(pws []*partWrapper, stopCh <-chan struct{}) error { - defer func() { - // Remove isInMerge flag from pws. - pt.partsLock.Lock() - for _, pw := range pws { - // Do not check for pws.isInMerge set to false, - // since it may be set to false in mergeParts below. - pw.isInMerge = false - } - pt.partsLock.Unlock() - }() for len(pws) > defaultPartsToMerge { - if err := pt.mergeParts(pws[:defaultPartsToMerge], stopCh); err != nil { + pwsChunk := pws[:defaultPartsToMerge] + pws = pws[defaultPartsToMerge:] + if err := pt.mergeParts(pwsChunk, stopCh); err != nil { + pt.releasePartsToMerge(pws) return fmt.Errorf("cannot merge %d parts: %w", defaultPartsToMerge, err) } - pws = pws[defaultPartsToMerge:] } if len(pws) == 0 { return nil