mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/{mergeset,storage}: consistently reset isInMerge field in parts passed to mergeParts() before returning from the function
While at it consistently check that the isInMerge field is set in all the parts passed to mergeParts()
This commit is contained in:
parent
bf6ebc86fd
commit
d41841c0c9
2 changed files with 24 additions and 2 deletions
|
@ -1018,6 +1018,14 @@ func SetFinalMergeDelay(delay time.Duration) {
|
|||
|
||||
var errNothingToMerge = fmt.Errorf("nothing to merge")
|
||||
|
||||
func assertIsInMerge(pws []*partWrapper) {
|
||||
for _, pw := range pws {
|
||||
if !pw.isInMerge {
|
||||
logger.Panicf("BUG: partWrapper.isInMerge unexpectedly set to false")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (tb *Table) releasePartsToMerge(pws []*partWrapper) {
|
||||
tb.partsLock.Lock()
|
||||
for _, pw := range pws {
|
||||
|
@ -1036,12 +1044,16 @@ func (tb *Table) releasePartsToMerge(pws []*partWrapper) {
|
|||
// If isFinal is set, then the resulting part will be stored to disk.
|
||||
//
|
||||
// All the parts inside pws must have isInMerge field set to true.
|
||||
// The isInMerge field inside pws parts is set to false before returning from the function.
|
||||
func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal bool) error {
|
||||
if len(pws) == 0 {
|
||||
// Nothing to merge.
|
||||
return errNothingToMerge
|
||||
}
|
||||
|
||||
assertIsInMerge(pws)
|
||||
defer tb.releasePartsToMerge(pws)
|
||||
|
||||
startTime := time.Now()
|
||||
|
||||
// Initialize destination paths.
|
||||
|
@ -1091,7 +1103,6 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal
|
|||
putBlockStreamReader(bsr)
|
||||
}
|
||||
if err != nil {
|
||||
tb.releasePartsToMerge(pws)
|
||||
return err
|
||||
}
|
||||
if mpNew != nil {
|
||||
|
|
|
@ -1163,6 +1163,14 @@ func (pt *partition) mergeExistingParts(isFinal bool) error {
|
|||
return pt.mergeParts(pws, pt.stopCh, isFinal)
|
||||
}
|
||||
|
||||
func assertIsInMerge(pws []*partWrapper) {
|
||||
for _, pw := range pws {
|
||||
if !pw.isInMerge {
|
||||
logger.Panicf("BUG: partWrapper.isInMerge unexpectedly set to false")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (pt *partition) releasePartsToMerge(pws []*partWrapper) {
|
||||
pt.partsLock.Lock()
|
||||
for _, pw := range pws {
|
||||
|
@ -1222,12 +1230,16 @@ func getMinDedupInterval(pws []*partWrapper) int64 {
|
|||
// if isFinal is set, then the resulting part will be saved to disk.
|
||||
//
|
||||
// All the parts inside pws must have isInMerge field set to true.
|
||||
// The isInMerge field inside pws parts is set to false before returning from the function.
|
||||
func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal bool) error {
|
||||
if len(pws) == 0 {
|
||||
// Nothing to merge.
|
||||
return errNothingToMerge
|
||||
}
|
||||
|
||||
assertIsInMerge(pws)
|
||||
defer pt.releasePartsToMerge(pws)
|
||||
|
||||
startTime := time.Now()
|
||||
|
||||
// Initialize destination paths.
|
||||
|
@ -1278,7 +1290,6 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi
|
|||
putBlockStreamReader(bsr)
|
||||
}
|
||||
if err != nil {
|
||||
pt.releasePartsToMerge(pws)
|
||||
return err
|
||||
}
|
||||
if mpNew != nil {
|
||||
|
|
Loading…
Reference in a new issue