mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/{mergeset,storage}: re-use the code for removing isInMerge flag at parts
Move the common code into releasePartsToMerge() method and consistently use it throughout the code.
This commit is contained in:
parent
50e6a137bb
commit
d8d4d21d7a
2 changed files with 20 additions and 36 deletions
|
@ -586,21 +586,13 @@ func (tb *Table) convertToV1280() {
|
|||
}
|
||||
|
||||
func (tb *Table) mergePartsOptimal(pws []*partWrapper, stopCh <-chan struct{}) error {
|
||||
defer func() {
|
||||
// Remove isInMerge flag from pws.
|
||||
tb.partsLock.Lock()
|
||||
for _, pw := range pws {
|
||||
// Do not check for pws.isInMerge set to false,
|
||||
// since it may be set to false in mergeParts below.
|
||||
pw.isInMerge = false
|
||||
}
|
||||
tb.partsLock.Unlock()
|
||||
}()
|
||||
for len(pws) > defaultPartsToMerge {
|
||||
if err := tb.mergeParts(pws[:defaultPartsToMerge], stopCh, false); err != nil {
|
||||
pwsChunk := pws[:defaultPartsToMerge]
|
||||
pws = pws[defaultPartsToMerge:]
|
||||
if err := tb.mergeParts(pwsChunk, stopCh, false); err != nil {
|
||||
tb.releasePartsToMerge(pws)
|
||||
return fmt.Errorf("cannot merge %d parts: %w", defaultPartsToMerge, err)
|
||||
}
|
||||
pws = pws[defaultPartsToMerge:]
|
||||
}
|
||||
if len(pws) == 0 {
|
||||
return nil
|
||||
|
@ -874,6 +866,17 @@ func (tb *Table) partMerger() error {
|
|||
|
||||
var errNothingToMerge = fmt.Errorf("nothing to merge")
|
||||
|
||||
func (tb *Table) releasePartsToMerge(pws []*partWrapper) {
|
||||
tb.partsLock.Lock()
|
||||
for _, pw := range pws {
|
||||
if !pw.isInMerge {
|
||||
logger.Panicf("BUG: missing isInMerge flag on the part %q", pw.p.path)
|
||||
}
|
||||
pw.isInMerge = false
|
||||
}
|
||||
tb.partsLock.Unlock()
|
||||
}
|
||||
|
||||
// mergeParts merges pws.
|
||||
//
|
||||
// Merging is immediately stopped if stopCh is closed.
|
||||
|
@ -884,6 +887,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isOuterP
|
|||
// Nothing to merge.
|
||||
return errNothingToMerge
|
||||
}
|
||||
defer tb.releasePartsToMerge(pws)
|
||||
|
||||
atomic.AddUint64(&tb.mergesCount, 1)
|
||||
atomic.AddUint64(&tb.activeMerges, 1)
|
||||
|
@ -891,18 +895,6 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isOuterP
|
|||
|
||||
startTime := time.Now()
|
||||
|
||||
defer func() {
|
||||
// Remove isInMerge flag from pws.
|
||||
tb.partsLock.Lock()
|
||||
for _, pw := range pws {
|
||||
if !pw.isInMerge {
|
||||
logger.Panicf("BUG: missing isInMerge flag on the part %q", pw.p.path)
|
||||
}
|
||||
pw.isInMerge = false
|
||||
}
|
||||
tb.partsLock.Unlock()
|
||||
}()
|
||||
|
||||
// Prepare blockStreamReaders for source parts.
|
||||
bsrs := make([]*blockStreamReader, 0, len(pws))
|
||||
defer func() {
|
||||
|
|
|
@ -800,21 +800,13 @@ func (pt *partition) flushInmemoryParts(dstPws []*partWrapper, force bool) ([]*p
|
|||
}
|
||||
|
||||
func (pt *partition) mergePartsOptimal(pws []*partWrapper, stopCh <-chan struct{}) error {
|
||||
defer func() {
|
||||
// Remove isInMerge flag from pws.
|
||||
pt.partsLock.Lock()
|
||||
for _, pw := range pws {
|
||||
// Do not check for pws.isInMerge set to false,
|
||||
// since it may be set to false in mergeParts below.
|
||||
pw.isInMerge = false
|
||||
}
|
||||
pt.partsLock.Unlock()
|
||||
}()
|
||||
for len(pws) > defaultPartsToMerge {
|
||||
if err := pt.mergeParts(pws[:defaultPartsToMerge], stopCh); err != nil {
|
||||
pwsChunk := pws[:defaultPartsToMerge]
|
||||
pws = pws[defaultPartsToMerge:]
|
||||
if err := pt.mergeParts(pwsChunk, stopCh); err != nil {
|
||||
pt.releasePartsToMerge(pws)
|
||||
return fmt.Errorf("cannot merge %d parts: %w", defaultPartsToMerge, err)
|
||||
}
|
||||
pws = pws[defaultPartsToMerge:]
|
||||
}
|
||||
if len(pws) == 0 {
|
||||
return nil
|
||||
|
|
Loading…
Reference in a new issue