diff --git a/lib/storage/partition.go b/lib/storage/partition.go
index 4b511847a..4369ec116 100644
--- a/lib/storage/partition.go
+++ b/lib/storage/partition.go
@@ -1025,24 +1025,47 @@ func (pt *partition) mergeBigParts(isFinal bool) error {
 }
 
 func (pt *partition) mergeSmallParts(isFinal bool) error {
-	maxRows := maxRowsByPath(pt.smallPartsPath)
-	if maxRows > maxRowsPerSmallPart() {
-		// The output part may go to big part,
-		// so make sure it has enough space.
-		maxBigPartRows := maxRowsByPath(pt.bigPartsPath)
-		if maxRows > maxBigPartRows {
-			maxRows = maxBigPartRows
-		}
+	// Try merging small parts to a big part at first.
+	maxBigPartRows := maxRowsByPath(pt.bigPartsPath)
+	pt.partsLock.Lock()
+	pws, needFreeSpace := getPartsToMerge(pt.smallParts, maxBigPartRows, isFinal)
+	pt.partsLock.Unlock()
+	atomicSetBool(&pt.bigMergeNeedFreeDiskSpace, needFreeSpace)
+
+	rowsCount := getRowsCount(pws)
+	if rowsCount > maxRowsPerSmallPart() {
+		// Merge small parts to a big part.
+		return pt.mergeParts(pws, pt.stopCh)
 	}
-	pt.partsLock.Lock()
-	pws, needFreeSpace := getPartsToMerge(pt.smallParts, maxRows, isFinal)
-	pt.partsLock.Unlock()
+
+	// Make sure that the output small part fits small parts storage.
+	maxSmallPartRows := maxRowsByPath(pt.smallPartsPath)
+	if rowsCount <= maxSmallPartRows {
+		// Merge small parts to a small part.
+		return pt.mergeParts(pws, pt.stopCh)
+	}
+
+	// The output small part doesn't fit small parts storage. Try merging small parts according to maxSmallPartRows limit.
+	pt.releasePartsToMerge(pws)
+	pt.partsLock.Lock()
+	pws, needFreeSpace = getPartsToMerge(pt.smallParts, maxSmallPartRows, isFinal)
+	pt.partsLock.Unlock()
 	atomicSetBool(&pt.smallMergeNeedFreeDiskSpace, needFreeSpace)
+
 	return pt.mergeParts(pws, pt.stopCh)
 }
 
+func (pt *partition) releasePartsToMerge(pws []*partWrapper) {
+	pt.partsLock.Lock()
+	for _, pw := range pws {
+		if !pw.isInMerge {
+			logger.Panicf("BUG: missing isInMerge flag on the part %q", pw.p.path)
+		}
+		pw.isInMerge = false
+	}
+	pt.partsLock.Unlock()
+}
+
 var errNothingToMerge = fmt.Errorf("nothing to merge")
 
 func atomicSetBool(p *uint64, b bool) {
@@ -1063,18 +1086,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
 		// Nothing to merge.
 		return errNothingToMerge
 	}
-
-	defer func() {
-		// Remove isInMerge flag from pws.
-		pt.partsLock.Lock()
-		for _, pw := range pws {
-			if !pw.isInMerge {
-				logger.Panicf("BUG: missing isInMerge flag on the part %q", pw.p.path)
-			}
-			pw.isInMerge = false
-		}
-		pt.partsLock.Unlock()
-	}()
+	defer pt.releasePartsToMerge(pws)
 
 	startTime := time.Now()
 
@@ -1365,22 +1377,19 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows ui
 			// since this results in unbalanced merges.
 			continue
 		}
-		rowsSum := uint64(0)
-		for _, pw := range a {
-			rowsSum += pw.p.ph.RowsCount
-		}
-		if rowsSum < 1e6 && len(a) < maxPartsToMerge {
+		rowsCount := getRowsCount(a)
+		if rowsCount < 1e6 && len(a) < maxPartsToMerge {
 			// Do not merge parts with too small number of rows if the number of source parts
 			// isn't equal to maxPartsToMerge. This should reduce CPU usage and disk IO usage
 			// for small parts merge.
 			continue
 		}
-		if rowsSum > maxRows {
+		if rowsCount > maxRows {
 			// There is no need in verifying remaining parts with higher number of rows
 			needFreeSpace = true
 			break
 		}
-		m := float64(rowsSum) / float64(a[len(a)-1].p.ph.RowsCount)
+		m := float64(rowsCount) / float64(a[len(a)-1].p.ph.RowsCount)
 		if m < maxM {
 			continue
 		}
@@ -1400,6 +1409,14 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows ui
 	return append(dst, pws...), needFreeSpace
 }
 
+func getRowsCount(pws []*partWrapper) uint64 {
+	n := uint64(0)
+	for _, pw := range pws {
+		n += pw.p.ph.RowsCount
+	}
+	return n
+}
+
 func openParts(pathPrefix1, pathPrefix2, path string) ([]*partWrapper, error) {
 	// The path can be missing after restoring from backup, so create it if needed.
 	if err := fs.MkdirAllIfNotExist(path); err != nil {
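The reworked `mergeSmallParts` replaces the single row-limit computation with a three-step selection: first try to assemble a merge under the big-part limit; if the candidate set exceeds `maxRowsPerSmallPart()` the merge goes to a big part, if it fits `maxSmallPartRows` it goes to a small part, and otherwise the candidates are released and re-selected under the small-part limit. Below is a minimal, self-contained sketch of that decision flow; `part`, `selectUpTo`, `release` and the hard-coded limits are hypothetical stand-ins, not the real lib/storage code:

```go
package main

import "fmt"

// Hypothetical stand-in for partWrapper; the real type lives in lib/storage.
type part struct {
	rows      uint64
	isInMerge bool
}

func rowsCount(parts []*part) uint64 {
	n := uint64(0)
	for _, p := range parts {
		n += p.rows
	}
	return n
}

// selectUpTo loosely mimics getPartsToMerge: greedily pick unclaimed parts
// whose combined row count stays within maxRows, and mark them claimed.
func selectUpTo(parts []*part, maxRows uint64) []*part {
	var picked []*part
	total := uint64(0)
	for _, p := range parts {
		if p.isInMerge || total+p.rows > maxRows {
			continue
		}
		p.isInMerge = true
		total += p.rows
		picked = append(picked, p)
	}
	return picked
}

// release mirrors releasePartsToMerge from the diff (minus the lock and the
// BUG panic) so that a rejected candidate set becomes selectable again.
func release(parts []*part) {
	for _, p := range parts {
		p.isInMerge = false
	}
}

func main() {
	// Illustrative limits; the real values come from maxRowsPerSmallPart()
	// and maxRowsByPath() and depend on available disk space.
	const maxRowsPerSmallPart = 100
	const maxBigPartRows = 1000
	const maxSmallPartRows = 80 // pretend small parts storage is nearly full

	parts := []*part{{rows: 40}, {rows: 35}, {rows: 20}}

	// Step 1: select under the big-part limit first.
	pws := selectUpTo(parts, maxBigPartRows)
	switch {
	case rowsCount(pws) > maxRowsPerSmallPart:
		fmt.Println("merge to big part:", rowsCount(pws), "rows")
	case rowsCount(pws) <= maxSmallPartRows:
		fmt.Println("merge to small part:", rowsCount(pws), "rows")
	default:
		// Step 3: too big for small parts storage yet not big enough for a
		// big part; release and re-select under the small-part limit.
		release(pws)
		pws = selectUpTo(parts, maxSmallPartRows)
		fmt.Println("re-selected small merge:", rowsCount(pws), "rows") // 75 rows
	}
}
```

Trying the big-part limit first biases selection toward producing fewer, larger parts whenever enough rows are available, instead of capping every merge at the small-part row budget up front.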
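The diff also extracts the deferred `isInMerge` cleanup from `mergeParts` into `releasePartsToMerge`, precisely so that `mergeSmallParts` can discard a candidate set before re-selecting: parts still flagged `isInMerge` are skipped by `getPartsToMerge`, so without the release the second selection pass would never see them. A minimal sketch of that flag protocol, assuming simplified `partition` and `partWrapper` types and a hypothetical `claim` helper in place of `getPartsToMerge`:

```go
package main

import (
	"log"
	"sync"
)

// Simplified types carrying only the fields this diff touches;
// not the real lib/storage definitions.
type partWrapper struct {
	path      string
	isInMerge bool
}

type partition struct {
	partsLock  sync.Mutex
	smallParts []*partWrapper
}

// claim stands in for getPartsToMerge: it only hands out parts that no
// other merge currently owns, and flags whatever it returns.
func (pt *partition) claim() []*partWrapper {
	pt.partsLock.Lock()
	defer pt.partsLock.Unlock()
	var pws []*partWrapper
	for _, pw := range pt.smallParts {
		if pw.isInMerge {
			continue // owned by a concurrent merge
		}
		pw.isInMerge = true
		pws = append(pws, pw)
	}
	return pws
}

// releasePartsToMerge mirrors the helper added in the diff: clearing a flag
// that is not set indicates a double release, which is worth a loud panic.
func (pt *partition) releasePartsToMerge(pws []*partWrapper) {
	pt.partsLock.Lock()
	defer pt.partsLock.Unlock()
	for _, pw := range pws {
		if !pw.isInMerge {
			log.Panicf("BUG: missing isInMerge flag on the part %q", pw.path)
		}
		pw.isInMerge = false
	}
}

func main() {
	pt := &partition{smallParts: []*partWrapper{{path: "a"}, {path: "b"}}}

	pws := pt.claim()
	// Suppose the caller decides not to merge this candidate set. Releasing
	// it makes the parts visible to the next selection pass.
	pt.releasePartsToMerge(pws)

	if again := pt.claim(); len(again) != 2 {
		log.Fatalf("expected both parts to be selectable again, got %d", len(again))
	}
	log.Println("released parts are selectable again")
}
```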