diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go
index c6d11ddd18..973ca36652 100644
--- a/lib/mergeset/table.go
+++ b/lib/mergeset/table.go
@@ -633,7 +633,7 @@ func (tb *Table) mergeInmemoryBlocks(blocksToMerge []*inmemoryBlock) *partWrappe
 }
 
 func (tb *Table) startPartMergers() {
-	for i := 0; i < mergeWorkers; i++ {
+	for i := 0; i < mergeWorkersCount; i++ {
 		tb.partMergersWG.Add(1)
 		go func() {
 			if err := tb.partMerger(); err != nil {
@@ -683,7 +683,7 @@ func (tb *Table) partMerger() error {
 		if err != errNothingToMerge {
 			return err
 		}
-		if time.Since(lastMergeTime) > 10*time.Second {
+		if time.Since(lastMergeTime) > 30*time.Second {
 			// We have free time for merging into bigger parts.
 			// This should improve select performance.
 			lastMergeTime = time.Now()
@@ -901,12 +901,12 @@ func (tb *Table) maxOutPartItemsSlow() uint64 {
 
 	// Calculate the maximum number of items in the output merge part
 	// by dividing the freeSpace by 4 and by the number of concurrent
-	// mergeWorkers.
+	// mergeWorkersCount.
 	// This assumes each item is compressed into 4 bytes.
-	return freeSpace / uint64(mergeWorkers) / 4
+	return freeSpace / uint64(mergeWorkersCount) / 4
 }
 
-var mergeWorkers = func() int {
+var mergeWorkersCount = func() int {
 	return runtime.GOMAXPROCS(-1)
 }()
@@ -1246,30 +1246,31 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxItems u
 	for i := 2; i <= n; i++ {
 		for j := 0; j <= len(src)-i; j++ {
 			itemsSum := uint64(0)
-			for _, pw := range src[j : j+i] {
+			a := src[j : j+i]
+			for _, pw := range a {
 				itemsSum += pw.p.ph.itemsCount
 			}
 			if itemsSum > maxItems {
-				continue
+				// There is no sense in checking the remaining bigger parts.
+				break
 			}
-			m := float64(itemsSum) / float64(src[j+i-1].p.ph.itemsCount)
+			m := float64(itemsSum) / float64(a[len(a)-1].p.ph.itemsCount)
 			if m < maxM {
 				continue
 			}
 			maxM = m
-			pws = src[j : j+i]
+			pws = a
 		}
 	}
 
-	minM := float64(maxPartsToMerge / 2)
-	if minM < 2 {
-		minM = 2
+	minM := float64(maxPartsToMerge) / 2
+	if minM < 1.7 {
+		minM = 1.7
 	}
 	if maxM < minM {
 		// There is no sense in merging parts with too small m.
 		return dst
 	}
-
 	return append(dst, pws...)
 }
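The mergeset appendPartsToMerge hunk above is easier to follow outside the diff. Below is a minimal, standalone sketch of the selection heuristic, with parts reduced to plain item counts; pickPartsToMerge and its parameters are illustrative names, not identifiers from the repository (the real code walks []*partWrapper and reads pw.p.ph.itemsCount). It shows why replacing continue with break is safe — the parts are ordered by size, so once a window of a given width overflows maxItems, every later window of that width overflows too — and it uses the new floating-point minM floor.

```go
package main

import (
	"fmt"
	"sort"
)

// pickPartsToMerge mimics the selection heuristic from appendPartsToMerge:
// parts are sorted by size, every contiguous window of 2..maxPartsToMerge
// parts is scored with m = sum(window) / largest(window), and the window
// with the highest m wins, provided the best m clears a minimum floor.
func pickPartsToMerge(itemCounts []uint64, maxPartsToMerge int, maxItems uint64) []uint64 {
	sort.Slice(itemCounts, func(i, j int) bool { return itemCounts[i] < itemCounts[j] })

	n := maxPartsToMerge
	if n > len(itemCounts) {
		n = len(itemCounts)
	}
	var best []uint64
	maxM := float64(0)
	for i := 2; i <= n; i++ {
		for j := 0; j <= len(itemCounts)-i; j++ {
			a := itemCounts[j : j+i]
			sum := uint64(0)
			for _, v := range a {
				sum += v
			}
			if sum > maxItems {
				// The slice is sorted, so every later window of this width is even bigger.
				break
			}
			// a[len(a)-1] is the largest part in the window.
			m := float64(sum) / float64(a[len(a)-1])
			if m < maxM {
				continue
			}
			maxM = m
			best = a
		}
	}

	// New behavior from the diff: floating-point division and a 1.7 floor.
	// The old float64(maxPartsToMerge / 2) truncated before converting.
	minM := float64(maxPartsToMerge) / 2
	if minM < 1.7 {
		minM = 1.7
	}
	if maxM < minM {
		// Merging such a lopsided window would mostly rewrite its biggest part.
		return nil
	}
	return best
}

func main() {
	fmt.Println(pickPartsToMerge([]uint64{400, 15, 10, 12}, 3, 1000)) // [10 12 15]
}
```

With this sample input the three small parts win with m = 37/15 ≈ 2.47, while any window containing the 400-item part scores only m ≈ 1.04–1.07 and is rejected: merging it would mostly be a rewrite of the big part.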
diff --git a/lib/storage/partition.go b/lib/storage/partition.go
index 53d8abe48b..676a0fd66b 100644
--- a/lib/storage/partition.go
+++ b/lib/storage/partition.go
@@ -58,7 +58,7 @@ const defaultPartsToMerge = 15
 // It must be smaller than defaultPartsToMerge.
 // Lower value improves select performance at the cost of increased
 // write amplification.
-const finalPartsToMerge = 3
+const finalPartsToMerge = 2
 
 // getMaxRawRowsPerPartition returns the maximum number of rows that haven't been converted into parts yet.
 func getMaxRawRowsPerPartition() int {
@@ -734,7 +734,7 @@ func (pt *partition) mergePartsOptimal(pws []*partWrapper) error {
 	return nil
 }
 
-var mergeWorkers = func() int {
+var mergeWorkersCount = func() int {
 	n := runtime.GOMAXPROCS(-1) / 2
 	if n <= 0 {
 		n = 1
@@ -742,16 +742,18 @@
 	return n
 }()
 
+var bigMergeConcurrencyLimitCh = make(chan struct{}, mergeWorkersCount)
+var smallMergeConcurrencyLimitCh = make(chan struct{}, mergeWorkersCount)
+
 func (pt *partition) startMergeWorkers() {
-	for i := 0; i < mergeWorkers; i++ {
+	for i := 0; i < mergeWorkersCount; i++ {
 		pt.smallPartsMergerWG.Add(1)
 		go func() {
 			pt.smallPartsMerger()
 			pt.smallPartsMergerWG.Done()
 		}()
 	}
-
-	for i := 0; i < mergeWorkers; i++ {
+	for i := 0; i < mergeWorkersCount; i++ {
 		pt.bigPartsMergerWG.Add(1)
 		go func() {
 			pt.bigPartsMerger()
@@ -798,7 +800,7 @@ func (pt *partition) partsMerger(mergerFunc func(isFinal bool) error) error {
 		if err != errNothingToMerge {
 			return err
 		}
-		if time.Since(lastMergeTime) > 10*time.Second {
+		if time.Since(lastMergeTime) > 30*time.Second {
 			// We have free time for merging into bigger parts.
 			// This should improve select performance.
 			lastMergeTime = time.Now()
@@ -825,11 +827,11 @@ func maxRowsByPath(path string) uint64 {
 
 	// Calculate the maximum number of rows in the output merge part
 	// by dividing the freeSpace by the number of concurrent
-	// mergeWorkers for big parts.
+	// mergeWorkersCount for big parts.
 	// This assumes each row is compressed into 1 byte. Production
 	// simulation shows that each row usually occupies up to 0.5 bytes,
 	// so this is quite safe assumption.
-	maxRows := freeSpace / uint64(mergeWorkers)
+	maxRows := freeSpace / uint64(mergeWorkersCount)
 	if maxRows > maxRowsPerBigPart {
 		maxRows = maxRowsPerBigPart
 	}
@@ -878,7 +880,9 @@ func (pt *partition) mergeBigParts(isFinal bool) error {
 
 	atomic.AddUint64(&pt.bigMergesCount, 1)
 	atomic.AddUint64(&pt.activeBigMerges, 1)
+	bigMergeConcurrencyLimitCh <- struct{}{}
 	err := pt.mergeParts(pws, pt.stopCh)
+	<-bigMergeConcurrencyLimitCh
 	atomic.AddUint64(&pt.activeBigMerges, ^uint64(0))
 
 	return err
@@ -905,7 +909,9 @@ func (pt *partition) mergeSmallParts(isFinal bool) error {
 
 	atomic.AddUint64(&pt.smallMergesCount, 1)
 	atomic.AddUint64(&pt.activeSmallMerges, 1)
+	smallMergeConcurrencyLimitCh <- struct{}{}
 	err := pt.mergeParts(pws, pt.stopCh)
+	<-smallMergeConcurrencyLimitCh
 	atomic.AddUint64(&pt.activeSmallMerges, ^uint64(0))
 
 	return err
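The two channels added above, bigMergeConcurrencyLimitCh and smallMergeConcurrencyLimitCh, are buffered channels used as counting semaphores: a send blocks once mergeWorkersCount merges of that class are already running, and the receive after mergeParts returns frees the slot. Here is a minimal, runnable sketch of the pattern; limitCh, mergeWithLimit, and the capacity of 2 are illustrative stand-ins, not names from the diff.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// A buffered channel used as a counting semaphore: at most cap(limitCh)
// merges run at once; extra callers block on the send until a running
// merge receives from the channel and frees its slot.
var limitCh = make(chan struct{}, 2) // 2 stands in for mergeWorkersCount

func mergeWithLimit(id int) {
	limitCh <- struct{}{}        // acquire a slot; blocks while 2 merges are in flight
	defer func() { <-limitCh }() // release the slot when the merge finishes

	fmt.Printf("merge %d started\n", id)
	time.Sleep(100 * time.Millisecond) // stand-in for the actual merge work
	fmt.Printf("merge %d done\n", id)
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			mergeWithLimit(id)
		}(i)
	}
	wg.Wait()
}
```

With a capacity of 2, at most two of the five goroutines are inside the merge section at any moment; the diff applies the same idea with a capacity of mergeWorkersCount per merge class.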
@@ -1165,13 +1171,10 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows ui
 	sort.Slice(src, func(i, j int) bool {
 		a := &src[i].p.ph
 		b := &src[j].p.ph
-		if a.RowsCount < b.RowsCount {
-			return true
+		if a.RowsCount == b.RowsCount {
+			return a.MinTimestamp > b.MinTimestamp
 		}
-		if a.RowsCount > b.RowsCount {
-			return false
-		}
-		return a.MinTimestamp > b.MinTimestamp
+		return a.RowsCount < b.RowsCount
 	})
 
 	n := maxPartsToMerge
@@ -1185,31 +1188,32 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows ui
 	maxM := float64(0)
 	for i := 2; i <= n; i++ {
 		for j := 0; j <= len(src)-i; j++ {
+			a := src[j : j+i]
 			rowsSum := uint64(0)
-			for _, pw := range src[j : j+i] {
+			for _, pw := range a {
 				rowsSum += pw.p.ph.RowsCount
 			}
 			if rowsSum > maxRows {
-				continue
+				// There is no need to check the remaining parts with higher row counts.
+				break
 			}
-			m := float64(rowsSum) / float64(src[j+i-1].p.ph.RowsCount)
+			m := float64(rowsSum) / float64(a[len(a)-1].p.ph.RowsCount)
 			if m < maxM {
 				continue
 			}
 			maxM = m
-			pws = src[j : j+i]
+			pws = a
 		}
 	}
 
-	minM := float64(maxPartsToMerge / 2)
-	if minM < 2 {
-		minM = 2
+	minM := float64(maxPartsToMerge) / 2
+	if minM < 1.7 {
+		minM = 1.7
 	}
 	if maxM < minM {
 		// There is no sense in merging parts with too small m.
 		return dst
 	}
-
 	return append(dst, pws...)
 }
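Both files receive the same two-part fix to the minM floor. The old float64(maxPartsToMerge / 2) divided integers before converting, so the half was truncated away, and the lower clamp of 2 rejected moderately lopsided pairs: two parts of 100 and 120 rows give m = 220/120 ≈ 1.83, which the old floor refuses and the new floor of 1.7 accepts. A self-contained comparison, using only maxPartsToMerge values that occur in the diff (the old and new finalPartsToMerge, and defaultPartsToMerge):

```go
package main

import "fmt"

func main() {
	for _, maxPartsToMerge := range []int{2, 3, 15} {
		oldMinM := float64(maxPartsToMerge / 2) // integer division happens before the conversion
		if oldMinM < 2 {
			oldMinM = 2
		}
		newMinM := float64(maxPartsToMerge) / 2 // true floating-point half
		if newMinM < 1.7 {
			newMinM = 1.7
		}
		fmt.Printf("maxPartsToMerge=%2d  old minM=%.1f  new minM=%.1f\n", maxPartsToMerge, oldMinM, newMinM)
	}
}
```

The output shows both effects: maxPartsToMerge values of 2 and 3 now clamp to 1.7 instead of 2, while defaultPartsToMerge = 15 yields 7.5 instead of 7.0.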