mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/{mergeset,storage}: limit the maximum number of concurrent merges; leave smaller number of parts during final merge
This commit is contained in:
parent
78166cc478
commit
5d2276dbf7
2 changed files with 40 additions and 35 deletions
|
@ -633,7 +633,7 @@ func (tb *Table) mergeInmemoryBlocks(blocksToMerge []*inmemoryBlock) *partWrappe
|
|||
}
|
||||
|
||||
func (tb *Table) startPartMergers() {
|
||||
for i := 0; i < mergeWorkers; i++ {
|
||||
for i := 0; i < mergeWorkersCount; i++ {
|
||||
tb.partMergersWG.Add(1)
|
||||
go func() {
|
||||
if err := tb.partMerger(); err != nil {
|
||||
|
@ -683,7 +683,7 @@ func (tb *Table) partMerger() error {
|
|||
if err != errNothingToMerge {
|
||||
return err
|
||||
}
|
||||
if time.Since(lastMergeTime) > 10*time.Second {
|
||||
if time.Since(lastMergeTime) > 30*time.Second {
|
||||
// We have free time for merging into bigger parts.
|
||||
// This should improve select performance.
|
||||
lastMergeTime = time.Now()
|
||||
|
@ -901,12 +901,12 @@ func (tb *Table) maxOutPartItemsSlow() uint64 {
|
|||
|
||||
// Calculate the maximum number of items in the output merge part
|
||||
// by dividing the freeSpace by 4 and by the number of concurrent
|
||||
// mergeWorkers.
|
||||
// mergeWorkersCount.
|
||||
// This assumes each item is compressed into 4 bytes.
|
||||
return freeSpace / uint64(mergeWorkers) / 4
|
||||
return freeSpace / uint64(mergeWorkersCount) / 4
|
||||
}
|
||||
|
||||
var mergeWorkers = func() int {
|
||||
var mergeWorkersCount = func() int {
|
||||
return runtime.GOMAXPROCS(-1)
|
||||
}()
|
||||
|
||||
|
@ -1246,30 +1246,31 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxItems u
|
|||
for i := 2; i <= n; i++ {
|
||||
for j := 0; j <= len(src)-i; j++ {
|
||||
itemsSum := uint64(0)
|
||||
for _, pw := range src[j : j+i] {
|
||||
a := src[j : j+i]
|
||||
for _, pw := range a {
|
||||
itemsSum += pw.p.ph.itemsCount
|
||||
}
|
||||
if itemsSum > maxItems {
|
||||
continue
|
||||
// There is no sense in checking the remaining bigger parts.
|
||||
break
|
||||
}
|
||||
m := float64(itemsSum) / float64(src[j+i-1].p.ph.itemsCount)
|
||||
m := float64(itemsSum) / float64(a[len(a)-1].p.ph.itemsCount)
|
||||
if m < maxM {
|
||||
continue
|
||||
}
|
||||
maxM = m
|
||||
pws = src[j : j+i]
|
||||
pws = a
|
||||
}
|
||||
}
|
||||
|
||||
minM := float64(maxPartsToMerge / 2)
|
||||
if minM < 2 {
|
||||
minM = 2
|
||||
minM := float64(maxPartsToMerge) / 2
|
||||
if minM < 1.7 {
|
||||
minM = 1.7
|
||||
}
|
||||
if maxM < minM {
|
||||
// There is no sense in merging parts with too small m.
|
||||
return dst
|
||||
}
|
||||
|
||||
return append(dst, pws...)
|
||||
}
|
||||
|
||||
|
|
|
@ -58,7 +58,7 @@ const defaultPartsToMerge = 15
|
|||
// It must be smaller than defaultPartsToMerge.
|
||||
// Lower value improves select performance at the cost of increased
|
||||
// write amplification.
|
||||
const finalPartsToMerge = 3
|
||||
const finalPartsToMerge = 2
|
||||
|
||||
// getMaxRowsPerPartition returns the maximum number of rows that haven't been converted into parts yet.
|
||||
func getMaxRawRowsPerPartition() int {
|
||||
|
@ -734,7 +734,7 @@ func (pt *partition) mergePartsOptimal(pws []*partWrapper) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
var mergeWorkers = func() int {
|
||||
var mergeWorkersCount = func() int {
|
||||
n := runtime.GOMAXPROCS(-1) / 2
|
||||
if n <= 0 {
|
||||
n = 1
|
||||
|
@ -742,16 +742,18 @@ var mergeWorkers = func() int {
|
|||
return n
|
||||
}()
|
||||
|
||||
var bigMergeConcurrencyLimitCh = make(chan struct{}, mergeWorkersCount)
|
||||
var smallMergeConcurrencyLimitCh = make(chan struct{}, mergeWorkersCount)
|
||||
|
||||
func (pt *partition) startMergeWorkers() {
|
||||
for i := 0; i < mergeWorkers; i++ {
|
||||
for i := 0; i < mergeWorkersCount; i++ {
|
||||
pt.smallPartsMergerWG.Add(1)
|
||||
go func() {
|
||||
pt.smallPartsMerger()
|
||||
pt.smallPartsMergerWG.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
for i := 0; i < mergeWorkers; i++ {
|
||||
for i := 0; i < mergeWorkersCount; i++ {
|
||||
pt.bigPartsMergerWG.Add(1)
|
||||
go func() {
|
||||
pt.bigPartsMerger()
|
||||
|
@ -798,7 +800,7 @@ func (pt *partition) partsMerger(mergerFunc func(isFinal bool) error) error {
|
|||
if err != errNothingToMerge {
|
||||
return err
|
||||
}
|
||||
if time.Since(lastMergeTime) > 10*time.Second {
|
||||
if time.Since(lastMergeTime) > 30*time.Second {
|
||||
// We have free time for merging into bigger parts.
|
||||
// This should improve select performance.
|
||||
lastMergeTime = time.Now()
|
||||
|
@ -825,11 +827,11 @@ func maxRowsByPath(path string) uint64 {
|
|||
|
||||
// Calculate the maximum number of rows in the output merge part
|
||||
// by dividing the freeSpace by the number of concurrent
|
||||
// mergeWorkers for big parts.
|
||||
// mergeWorkersCount for big parts.
|
||||
// This assumes each row is compressed into 1 byte. Production
|
||||
// simulation shows that each row usually occupies up to 0.5 bytes,
|
||||
// so this is quite safe assumption.
|
||||
maxRows := freeSpace / uint64(mergeWorkers)
|
||||
maxRows := freeSpace / uint64(mergeWorkersCount)
|
||||
if maxRows > maxRowsPerBigPart {
|
||||
maxRows = maxRowsPerBigPart
|
||||
}
|
||||
|
@ -878,7 +880,9 @@ func (pt *partition) mergeBigParts(isFinal bool) error {
|
|||
|
||||
atomic.AddUint64(&pt.bigMergesCount, 1)
|
||||
atomic.AddUint64(&pt.activeBigMerges, 1)
|
||||
bigMergeConcurrencyLimitCh <- struct{}{}
|
||||
err := pt.mergeParts(pws, pt.stopCh)
|
||||
<-bigMergeConcurrencyLimitCh
|
||||
atomic.AddUint64(&pt.activeBigMerges, ^uint64(0))
|
||||
|
||||
return err
|
||||
|
@ -905,7 +909,9 @@ func (pt *partition) mergeSmallParts(isFinal bool) error {
|
|||
|
||||
atomic.AddUint64(&pt.smallMergesCount, 1)
|
||||
atomic.AddUint64(&pt.activeSmallMerges, 1)
|
||||
smallMergeConcurrencyLimitCh <- struct{}{}
|
||||
err := pt.mergeParts(pws, pt.stopCh)
|
||||
<-smallMergeConcurrencyLimitCh
|
||||
atomic.AddUint64(&pt.activeSmallMerges, ^uint64(0))
|
||||
|
||||
return err
|
||||
|
@ -1165,13 +1171,10 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows ui
|
|||
sort.Slice(src, func(i, j int) bool {
|
||||
a := &src[i].p.ph
|
||||
b := &src[j].p.ph
|
||||
if a.RowsCount < b.RowsCount {
|
||||
return true
|
||||
if a.RowsCount == b.RowsCount {
|
||||
return a.MinTimestamp > b.MinTimestamp
|
||||
}
|
||||
if a.RowsCount > b.RowsCount {
|
||||
return false
|
||||
}
|
||||
return a.MinTimestamp > b.MinTimestamp
|
||||
return a.RowsCount < b.RowsCount
|
||||
})
|
||||
|
||||
n := maxPartsToMerge
|
||||
|
@ -1185,31 +1188,32 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows ui
|
|||
maxM := float64(0)
|
||||
for i := 2; i <= n; i++ {
|
||||
for j := 0; j <= len(src)-i; j++ {
|
||||
a := src[j : j+i]
|
||||
rowsSum := uint64(0)
|
||||
for _, pw := range src[j : j+i] {
|
||||
for _, pw := range a {
|
||||
rowsSum += pw.p.ph.RowsCount
|
||||
}
|
||||
if rowsSum > maxRows {
|
||||
continue
|
||||
// There is no need in verifying remaining parts with higher number of rows
|
||||
break
|
||||
}
|
||||
m := float64(rowsSum) / float64(src[j+i-1].p.ph.RowsCount)
|
||||
m := float64(rowsSum) / float64(a[len(a)-1].p.ph.RowsCount)
|
||||
if m < maxM {
|
||||
continue
|
||||
}
|
||||
maxM = m
|
||||
pws = src[j : j+i]
|
||||
pws = a
|
||||
}
|
||||
}
|
||||
|
||||
minM := float64(maxPartsToMerge / 2)
|
||||
if minM < 2 {
|
||||
minM = 2
|
||||
minM := float64(maxPartsToMerge) / 2
|
||||
if minM < 1.7 {
|
||||
minM = 1.7
|
||||
}
|
||||
if maxM < minM {
|
||||
// There is no sense in merging parts with too small m.
|
||||
return dst
|
||||
}
|
||||
|
||||
return append(dst, pws...)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue