mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-21 15:45:01 +00:00
lib/storage: get parts to merge after applying the limit on the number of concurrent merges
This should reduce write amplification under high ingestion rate.
This commit is contained in:
parent
5d2276dbf7
commit
a27e034a40
1 changed files with 10 additions and 4 deletions
|
@ -868,6 +868,11 @@ type freeSpaceEntry struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pt *partition) mergeBigParts(isFinal bool) error {
|
func (pt *partition) mergeBigParts(isFinal bool) error {
|
||||||
|
bigMergeConcurrencyLimitCh <- struct{}{}
|
||||||
|
defer func() {
|
||||||
|
<-bigMergeConcurrencyLimitCh
|
||||||
|
}()
|
||||||
|
|
||||||
maxRows := maxRowsByPath(pt.bigPartsPath)
|
maxRows := maxRowsByPath(pt.bigPartsPath)
|
||||||
|
|
||||||
pt.partsLock.Lock()
|
pt.partsLock.Lock()
|
||||||
|
@ -880,15 +885,18 @@ func (pt *partition) mergeBigParts(isFinal bool) error {
|
||||||
|
|
||||||
atomic.AddUint64(&pt.bigMergesCount, 1)
|
atomic.AddUint64(&pt.bigMergesCount, 1)
|
||||||
atomic.AddUint64(&pt.activeBigMerges, 1)
|
atomic.AddUint64(&pt.activeBigMerges, 1)
|
||||||
bigMergeConcurrencyLimitCh <- struct{}{}
|
|
||||||
err := pt.mergeParts(pws, pt.stopCh)
|
err := pt.mergeParts(pws, pt.stopCh)
|
||||||
<-bigMergeConcurrencyLimitCh
|
|
||||||
atomic.AddUint64(&pt.activeBigMerges, ^uint64(0))
|
atomic.AddUint64(&pt.activeBigMerges, ^uint64(0))
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pt *partition) mergeSmallParts(isFinal bool) error {
|
func (pt *partition) mergeSmallParts(isFinal bool) error {
|
||||||
|
smallMergeConcurrencyLimitCh <- struct{}{}
|
||||||
|
defer func() {
|
||||||
|
<-smallMergeConcurrencyLimitCh
|
||||||
|
}()
|
||||||
|
|
||||||
maxRows := maxRowsByPath(pt.smallPartsPath)
|
maxRows := maxRowsByPath(pt.smallPartsPath)
|
||||||
if maxRows > maxRowsPerSmallPart() {
|
if maxRows > maxRowsPerSmallPart() {
|
||||||
// The output part may go to big part,
|
// The output part may go to big part,
|
||||||
|
@ -909,9 +917,7 @@ func (pt *partition) mergeSmallParts(isFinal bool) error {
|
||||||
|
|
||||||
atomic.AddUint64(&pt.smallMergesCount, 1)
|
atomic.AddUint64(&pt.smallMergesCount, 1)
|
||||||
atomic.AddUint64(&pt.activeSmallMerges, 1)
|
atomic.AddUint64(&pt.activeSmallMerges, 1)
|
||||||
smallMergeConcurrencyLimitCh <- struct{}{}
|
|
||||||
err := pt.mergeParts(pws, pt.stopCh)
|
err := pt.mergeParts(pws, pt.stopCh)
|
||||||
<-smallMergeConcurrencyLimitCh
|
|
||||||
atomic.AddUint64(&pt.activeSmallMerges, ^uint64(0))
|
atomic.AddUint64(&pt.activeSmallMerges, ^uint64(0))
|
||||||
|
|
||||||
return err
|
return err
|
||||||
|
|
Loading…
Reference in a new issue