mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/{mergeset,storage}: improve the detection of the needed free space for background merge
This should prevent possible out-of-disk-space crashes during big merges. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1560
This commit is contained in:
parent
a287a48634
commit
ffc0ab1774
5 changed files with 153 additions and 170 deletions
|
@ -12,6 +12,7 @@ sort: 15
|
|||
* FEATURE: update Go builder from v1.16.7 to v1.17.0. This improves data ingestion and query performance by up to 5% according to benchmarks. See [the release post for Go1.17](https://go.dev/blog/go1.17).
|
||||
|
||||
* BUGFIX: rename `sign` function to `sgn` in order to be consistent with PromQL. See [this pull request from Prometheus](https://github.com/prometheus/prometheus/pull/8457).
|
||||
* BUGFIX: improve the detection of the needed free space for background merge operation. This should prevent possible out-of-disk-space crashes during big merges. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1560).
|
||||
|
||||
|
||||
## [v1.64.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.64.1)
|
||||
|
|
|
@ -50,13 +50,11 @@ const defaultPartsToMerge = 15
|
|||
// write amplification.
|
||||
const finalPartsToMerge = 2
|
||||
|
||||
// maxItemsPerPart is the absolute maximum number of items per part.
|
||||
// maxPartSize is the maximum part size in bytes.
|
||||
//
|
||||
// This number should be limited by the amount of time required to merge
|
||||
// such number of items. The required time shouldn't exceed a day.
|
||||
//
|
||||
// TODO: adjust this number using production stats.
|
||||
const maxItemsPerPart = 100e9
|
||||
// This number should be limited by the amount of time required to merge parts of this summary size.
|
||||
// The required time shouldn't exceed a day.
|
||||
const maxPartSize = 400e9
|
||||
|
||||
// maxItemsPerCachedPart is the maximum items per created part by the merge,
|
||||
// which must be cached in the OS page cache.
|
||||
|
@ -790,13 +788,15 @@ func (tb *Table) startPartMergers() {
|
|||
}
|
||||
|
||||
func (tb *Table) mergeExistingParts(isFinal bool) error {
|
||||
maxItems := tb.maxOutPartItems()
|
||||
if maxItems > maxItemsPerPart {
|
||||
maxItems = maxItemsPerPart
|
||||
n := fs.MustGetFreeSpace(tb.path)
|
||||
// Divide free space by the max number of concurrent merges.
|
||||
maxOutBytes := n / uint64(mergeWorkersCount)
|
||||
if maxOutBytes > maxPartSize {
|
||||
maxOutBytes = maxPartSize
|
||||
}
|
||||
|
||||
tb.partsLock.Lock()
|
||||
pws := getPartsToMerge(tb.parts, maxItems, isFinal)
|
||||
pws := getPartsToMerge(tb.parts, maxOutBytes, isFinal)
|
||||
tb.partsLock.Unlock()
|
||||
|
||||
return tb.mergeParts(pws, tb.stopCh, false)
|
||||
|
@ -1045,33 +1045,6 @@ func (tb *Table) nextMergeIdx() uint64 {
|
|||
return atomic.AddUint64(&tb.mergeIdx, 1)
|
||||
}
|
||||
|
||||
var (
|
||||
maxOutPartItemsLock sync.Mutex
|
||||
maxOutPartItemsDeadline uint64
|
||||
lastMaxOutPartItems uint64
|
||||
)
|
||||
|
||||
func (tb *Table) maxOutPartItems() uint64 {
|
||||
maxOutPartItemsLock.Lock()
|
||||
if maxOutPartItemsDeadline < fasttime.UnixTimestamp() {
|
||||
lastMaxOutPartItems = tb.maxOutPartItemsSlow()
|
||||
maxOutPartItemsDeadline = fasttime.UnixTimestamp() + 2
|
||||
}
|
||||
n := lastMaxOutPartItems
|
||||
maxOutPartItemsLock.Unlock()
|
||||
return n
|
||||
}
|
||||
|
||||
func (tb *Table) maxOutPartItemsSlow() uint64 {
|
||||
freeSpace := fs.MustGetFreeSpace(tb.path)
|
||||
|
||||
// Calculate the maximum number of items in the output merge part
|
||||
// by dividing the freeSpace by 4 and by the number of concurrent
|
||||
// mergeWorkersCount.
|
||||
// This assumes each item is compressed into 4 bytes.
|
||||
return freeSpace / uint64(mergeWorkersCount) / 4
|
||||
}
|
||||
|
||||
var mergeWorkersCount = cgroup.AvailableCPUs()
|
||||
|
||||
func openParts(path string) ([]*partWrapper, error) {
|
||||
|
@ -1371,8 +1344,8 @@ func validatePath(pathPrefix, path string) (string, error) {
|
|||
//
|
||||
// if isFinal is set, then merge harder.
|
||||
//
|
||||
// The returned parts will contain less than maxItems items.
|
||||
func getPartsToMerge(pws []*partWrapper, maxItems uint64, isFinal bool) []*partWrapper {
|
||||
// The summary size of the returned parts must be smaller than the maxOutBytes.
|
||||
func getPartsToMerge(pws []*partWrapper, maxOutBytes uint64, isFinal bool) []*partWrapper {
|
||||
pwsRemaining := make([]*partWrapper, 0, len(pws))
|
||||
for _, pw := range pws {
|
||||
if !pw.isInMerge {
|
||||
|
@ -1383,11 +1356,11 @@ func getPartsToMerge(pws []*partWrapper, maxItems uint64, isFinal bool) []*partW
|
|||
var dst []*partWrapper
|
||||
if isFinal {
|
||||
for len(dst) == 0 && maxPartsToMerge >= finalPartsToMerge {
|
||||
dst = appendPartsToMerge(dst[:0], pwsRemaining, maxPartsToMerge, maxItems)
|
||||
dst = appendPartsToMerge(dst[:0], pwsRemaining, maxPartsToMerge, maxOutBytes)
|
||||
maxPartsToMerge--
|
||||
}
|
||||
} else {
|
||||
dst = appendPartsToMerge(dst[:0], pwsRemaining, maxPartsToMerge, maxItems)
|
||||
dst = appendPartsToMerge(dst[:0], pwsRemaining, maxPartsToMerge, maxOutBytes)
|
||||
}
|
||||
for _, pw := range dst {
|
||||
if pw.isInMerge {
|
||||
|
@ -1398,9 +1371,17 @@ func getPartsToMerge(pws []*partWrapper, maxItems uint64, isFinal bool) []*partW
|
|||
return dst
|
||||
}
|
||||
|
||||
// minMergeMultiplier is the minimum multiplier for the size of the output part
|
||||
// compared to the size of the maximum input part for the merge.
|
||||
//
|
||||
// Higher value reduces write amplification (disk write IO induced by the merge),
|
||||
// while increases the number of unmerged parts.
|
||||
// The 1.7 is good enough for production workloads.
|
||||
const minMergeMultiplier = 1.7
|
||||
|
||||
// appendPartsToMerge finds optimal parts to merge from src, appends
|
||||
// them to dst and returns the result.
|
||||
func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxItems uint64) []*partWrapper {
|
||||
func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxOutBytes uint64) []*partWrapper {
|
||||
if len(src) < 2 {
|
||||
// There is no need in merging zero or one part :)
|
||||
return dst
|
||||
|
@ -1411,18 +1392,18 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxItems u
|
|||
|
||||
// Filter out too big parts.
|
||||
// This should reduce N for O(n^2) algorithm below.
|
||||
maxInPartItems := maxItems / 2
|
||||
maxInPartBytes := uint64(float64(maxOutBytes) / minMergeMultiplier)
|
||||
tmp := make([]*partWrapper, 0, len(src))
|
||||
for _, pw := range src {
|
||||
if pw.p.ph.itemsCount > maxInPartItems {
|
||||
if pw.p.size > maxInPartBytes {
|
||||
continue
|
||||
}
|
||||
tmp = append(tmp, pw)
|
||||
}
|
||||
src = tmp
|
||||
|
||||
// Sort src parts by itemsCount.
|
||||
sort.Slice(src, func(i, j int) bool { return src[i].p.ph.itemsCount < src[j].p.ph.itemsCount })
|
||||
// Sort src parts by size.
|
||||
sort.Slice(src, func(i, j int) bool { return src[i].p.size < src[j].p.size })
|
||||
|
||||
maxSrcParts := maxPartsToMerge
|
||||
if maxSrcParts > len(src) {
|
||||
|
@ -1439,20 +1420,20 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxItems u
|
|||
for i := minSrcParts; i <= maxSrcParts; i++ {
|
||||
for j := 0; j <= len(src)-i; j++ {
|
||||
a := src[j : j+i]
|
||||
if a[0].p.ph.itemsCount*uint64(len(a)) < a[len(a)-1].p.ph.itemsCount {
|
||||
// Do not merge parts with too big difference in items count,
|
||||
if a[0].p.size*uint64(len(a)) < a[len(a)-1].p.size {
|
||||
// Do not merge parts with too big difference in size,
|
||||
// since this results in unbalanced merges.
|
||||
continue
|
||||
}
|
||||
itemsSum := uint64(0)
|
||||
outBytes := uint64(0)
|
||||
for _, pw := range a {
|
||||
itemsSum += pw.p.ph.itemsCount
|
||||
outBytes += pw.p.size
|
||||
}
|
||||
if itemsSum > maxItems {
|
||||
if outBytes > maxOutBytes {
|
||||
// There is no sense in checking the remaining bigger parts.
|
||||
break
|
||||
}
|
||||
m := float64(itemsSum) / float64(a[len(a)-1].p.ph.itemsCount)
|
||||
m := float64(outBytes) / float64(a[len(a)-1].p.size)
|
||||
if m < maxM {
|
||||
continue
|
||||
}
|
||||
|
@ -1462,11 +1443,12 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxItems u
|
|||
}
|
||||
|
||||
minM := float64(maxPartsToMerge) / 2
|
||||
if minM < 1.7 {
|
||||
minM = 1.7
|
||||
if minM < minMergeMultiplier {
|
||||
minM = minMergeMultiplier
|
||||
}
|
||||
if maxM < minM {
|
||||
// There is no sense in merging parts with too small m.
|
||||
// There is no sense in merging parts with too small m,
|
||||
// since this leads to high disk write IO.
|
||||
return dst
|
||||
}
|
||||
return append(dst, pws...)
|
||||
|
|
|
@ -133,7 +133,7 @@ func (p *part) MustClose() {
|
|||
p.valuesFile.MustClose()
|
||||
p.indexFile.MustClose()
|
||||
|
||||
isBig := p.ph.RowsCount > maxRowsPerSmallPart()
|
||||
isBig := p.size > maxSmallPartSize()
|
||||
p.ibCache.MustClose(isBig)
|
||||
}
|
||||
|
||||
|
|
|
@ -35,27 +35,26 @@ var (
|
|||
historicalSmallIndexBlocksCacheMisses uint64
|
||||
)
|
||||
|
||||
func maxRowsPerSmallPart() uint64 {
|
||||
func maxSmallPartSize() uint64 {
|
||||
// Small parts are cached in the OS page cache,
|
||||
// so limit the number of rows for small part by the remaining free RAM.
|
||||
// so limit their size by the remaining free RAM.
|
||||
mem := memory.Remaining()
|
||||
// Production data shows that each row occupies ~1 byte in the compressed part.
|
||||
// It is expected no more than defaultPartsToMerge/2 parts exist
|
||||
// in the OS page cache before they are merged into bigger part.
|
||||
// Half of the remaining RAM must be left for lib/mergeset parts,
|
||||
// so the maxItems is calculated using the below code:
|
||||
maxRows := uint64(mem) / defaultPartsToMerge
|
||||
if maxRows < 10e6 {
|
||||
maxRows = 10e6
|
||||
maxSize := uint64(mem) / defaultPartsToMerge
|
||||
if maxSize < 10e6 {
|
||||
maxSize = 10e6
|
||||
}
|
||||
return maxRows
|
||||
return maxSize
|
||||
}
|
||||
|
||||
// The maximum number of rows per big part.
|
||||
// The maximum size of big part.
|
||||
//
|
||||
// This number limits the maximum time required for building big part.
|
||||
// This time shouldn't exceed a few days.
|
||||
const maxRowsPerBigPart = 1e12
|
||||
const maxBigPartSize = 1e12
|
||||
|
||||
// The maximum number of small parts in the partition.
|
||||
const maxSmallPartsPerPartition = 256
|
||||
|
@ -977,26 +976,21 @@ func SetFinalMergeDelay(delay time.Duration) {
|
|||
finalMergeDelaySeconds = uint64(delay.Seconds() + 1)
|
||||
}
|
||||
|
||||
func maxRowsByPath(path string) uint64 {
|
||||
freeSpace := fs.MustGetFreeSpace(path)
|
||||
|
||||
// Calculate the maximum number of rows in the output merge part
|
||||
// by dividing the freeSpace by the maximum number of concurrent
|
||||
// workers for big parts.
|
||||
// This assumes each row is compressed into 1 byte
|
||||
// according to production data.
|
||||
maxRows := freeSpace / uint64(bigMergeWorkersCount)
|
||||
if maxRows > maxRowsPerBigPart {
|
||||
maxRows = maxRowsPerBigPart
|
||||
func getMaxOutBytes(path string, workersCount int) uint64 {
|
||||
n := fs.MustGetFreeSpace(path)
|
||||
// Divide free space by the max number of concurrent merges.
|
||||
maxOutBytes := n / uint64(workersCount)
|
||||
if maxOutBytes > maxBigPartSize {
|
||||
maxOutBytes = maxBigPartSize
|
||||
}
|
||||
return maxRows
|
||||
return maxOutBytes
|
||||
}
|
||||
|
||||
func (pt *partition) mergeBigParts(isFinal bool) error {
|
||||
maxRows := maxRowsByPath(pt.bigPartsPath)
|
||||
maxOutBytes := getMaxOutBytes(pt.bigPartsPath, bigMergeWorkersCount)
|
||||
|
||||
pt.partsLock.Lock()
|
||||
pws, needFreeSpace := getPartsToMerge(pt.bigParts, maxRows, isFinal)
|
||||
pws, needFreeSpace := getPartsToMerge(pt.bigParts, maxOutBytes, isFinal)
|
||||
pt.partsLock.Unlock()
|
||||
|
||||
atomicSetBool(&pt.bigMergeNeedFreeDiskSpace, needFreeSpace)
|
||||
|
@ -1005,29 +999,29 @@ func (pt *partition) mergeBigParts(isFinal bool) error {
|
|||
|
||||
func (pt *partition) mergeSmallParts(isFinal bool) error {
|
||||
// Try merging small parts to a big part at first.
|
||||
maxBigPartRows := maxRowsByPath(pt.bigPartsPath)
|
||||
maxBigPartOutBytes := getMaxOutBytes(pt.bigPartsPath, bigMergeWorkersCount)
|
||||
pt.partsLock.Lock()
|
||||
pws, needFreeSpace := getPartsToMerge(pt.smallParts, maxBigPartRows, isFinal)
|
||||
pws, needFreeSpace := getPartsToMerge(pt.smallParts, maxBigPartOutBytes, isFinal)
|
||||
pt.partsLock.Unlock()
|
||||
atomicSetBool(&pt.bigMergeNeedFreeDiskSpace, needFreeSpace)
|
||||
|
||||
rowsCount := getRowsCount(pws)
|
||||
if rowsCount > maxRowsPerSmallPart() {
|
||||
outSize := getPartsSize(pws)
|
||||
if outSize > maxSmallPartSize() {
|
||||
// Merge small parts to a big part.
|
||||
return pt.mergeParts(pws, pt.stopCh)
|
||||
}
|
||||
|
||||
// Make sure that the output small part fits small parts storage.
|
||||
maxSmallPartRows := maxRowsByPath(pt.smallPartsPath)
|
||||
if rowsCount <= maxSmallPartRows {
|
||||
maxSmallPartOutBytes := getMaxOutBytes(pt.smallPartsPath, smallMergeWorkersCount)
|
||||
if outSize <= maxSmallPartOutBytes {
|
||||
// Merge small parts to a small part.
|
||||
return pt.mergeParts(pws, pt.stopCh)
|
||||
}
|
||||
|
||||
// The output small part doesn't fit small parts storage. Try merging small parts according to maxSmallPartRows limit.
|
||||
// The output small part doesn't fit small parts storage. Try merging small parts according to maxSmallPartOutBytes limit.
|
||||
pt.releasePartsToMerge(pws)
|
||||
pt.partsLock.Lock()
|
||||
pws, needFreeSpace = getPartsToMerge(pt.smallParts, maxSmallPartRows, isFinal)
|
||||
pws, needFreeSpace = getPartsToMerge(pt.smallParts, maxSmallPartOutBytes, isFinal)
|
||||
pt.partsLock.Unlock()
|
||||
atomicSetBool(&pt.smallMergeNeedFreeDiskSpace, needFreeSpace)
|
||||
|
||||
|
@ -1088,13 +1082,15 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
|
|||
bsrs = append(bsrs, bsr)
|
||||
}
|
||||
|
||||
outSize := uint64(0)
|
||||
outRowsCount := uint64(0)
|
||||
outBlocksCount := uint64(0)
|
||||
for _, pw := range pws {
|
||||
outSize += pw.p.size
|
||||
outRowsCount += pw.p.ph.RowsCount
|
||||
outBlocksCount += pw.p.ph.BlocksCount
|
||||
}
|
||||
isBigPart := outRowsCount > maxRowsPerSmallPart()
|
||||
isBigPart := outSize > maxSmallPartSize()
|
||||
nocache := isBigPart
|
||||
|
||||
// Prepare BlockStreamWriter for destination part.
|
||||
|
@ -1343,9 +1339,9 @@ func (pt *partition) removeStaleParts() {
|
|||
|
||||
// getPartsToMerge returns optimal parts to merge from pws.
|
||||
//
|
||||
// The returned parts will contain less than maxRows rows.
|
||||
// The function returns true if pws contains parts, which cannot be merged because of maxRows limit.
|
||||
func getPartsToMerge(pws []*partWrapper, maxRows uint64, isFinal bool) ([]*partWrapper, bool) {
|
||||
// The summary size of the returned parts must be smaller than maxOutBytes.
|
||||
// The function returns true if pws contains parts, which cannot be merged because of maxOutBytes limit.
|
||||
func getPartsToMerge(pws []*partWrapper, maxOutBytes uint64, isFinal bool) ([]*partWrapper, bool) {
|
||||
pwsRemaining := make([]*partWrapper, 0, len(pws))
|
||||
for _, pw := range pws {
|
||||
if !pw.isInMerge {
|
||||
|
@ -1357,11 +1353,11 @@ func getPartsToMerge(pws []*partWrapper, maxRows uint64, isFinal bool) ([]*partW
|
|||
needFreeSpace := false
|
||||
if isFinal {
|
||||
for len(pms) == 0 && maxPartsToMerge >= finalPartsToMerge {
|
||||
pms, needFreeSpace = appendPartsToMerge(pms[:0], pwsRemaining, maxPartsToMerge, maxRows)
|
||||
pms, needFreeSpace = appendPartsToMerge(pms[:0], pwsRemaining, maxPartsToMerge, maxOutBytes)
|
||||
maxPartsToMerge--
|
||||
}
|
||||
} else {
|
||||
pms, needFreeSpace = appendPartsToMerge(pms[:0], pwsRemaining, maxPartsToMerge, maxRows)
|
||||
pms, needFreeSpace = appendPartsToMerge(pms[:0], pwsRemaining, maxPartsToMerge, maxOutBytes)
|
||||
}
|
||||
for _, pw := range pms {
|
||||
if pw.isInMerge {
|
||||
|
@ -1372,10 +1368,18 @@ func getPartsToMerge(pws []*partWrapper, maxRows uint64, isFinal bool) ([]*partW
|
|||
return pms, needFreeSpace
|
||||
}
|
||||
|
||||
// minMergeMultiplier is the minimum multiplier for the size of the output part
|
||||
// compared to the size of the maximum input part for the merge.
|
||||
//
|
||||
// Higher value reduces write amplification (disk write IO induced by the merge),
|
||||
// while increases the number of unmerged parts.
|
||||
// The 1.7 is good enough for production workloads.
|
||||
const minMergeMultiplier = 1.7
|
||||
|
||||
// appendPartsToMerge finds optimal parts to merge from src, appends
|
||||
// them to dst and returns the result.
|
||||
// The function returns true if src contains parts, which cannot be merged because of maxRows limit.
|
||||
func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows uint64) ([]*partWrapper, bool) {
|
||||
// The function returns true if src contains parts, which cannot be merged because of maxOutBytes limit.
|
||||
func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxOutBytes uint64) ([]*partWrapper, bool) {
|
||||
if len(src) < 2 {
|
||||
// There is no need in merging zero or one part :)
|
||||
return dst, false
|
||||
|
@ -1387,10 +1391,10 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows ui
|
|||
// Filter out too big parts.
|
||||
// This should reduce N for O(N^2) algorithm below.
|
||||
skippedBigParts := 0
|
||||
maxInPartRows := maxRows / 2
|
||||
maxInPartBytes := uint64(float64(maxOutBytes) / minMergeMultiplier)
|
||||
tmp := make([]*partWrapper, 0, len(src))
|
||||
for _, pw := range src {
|
||||
if pw.p.ph.RowsCount > maxInPartRows {
|
||||
if pw.p.size > maxInPartBytes {
|
||||
skippedBigParts++
|
||||
continue
|
||||
}
|
||||
|
@ -1399,15 +1403,15 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows ui
|
|||
src = tmp
|
||||
needFreeSpace := skippedBigParts > 1
|
||||
|
||||
// Sort src parts by rows count and backwards timestamp.
|
||||
// Sort src parts by size and backwards timestamp.
|
||||
// This should improve adjacent points' locality in the merged parts.
|
||||
sort.Slice(src, func(i, j int) bool {
|
||||
a := &src[i].p.ph
|
||||
b := &src[j].p.ph
|
||||
if a.RowsCount == b.RowsCount {
|
||||
return a.MinTimestamp > b.MinTimestamp
|
||||
a := src[i].p
|
||||
b := src[j].p
|
||||
if a.size == b.size {
|
||||
return a.ph.MinTimestamp > b.ph.MinTimestamp
|
||||
}
|
||||
return a.RowsCount < b.RowsCount
|
||||
return a.size < b.size
|
||||
})
|
||||
|
||||
maxSrcParts := maxPartsToMerge
|
||||
|
@ -1425,20 +1429,20 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows ui
|
|||
for i := minSrcParts; i <= maxSrcParts; i++ {
|
||||
for j := 0; j <= len(src)-i; j++ {
|
||||
a := src[j : j+i]
|
||||
rowsCount := getRowsCount(a)
|
||||
if rowsCount > maxRows {
|
||||
outSize := getPartsSize(a)
|
||||
if outSize > maxOutBytes {
|
||||
needFreeSpace = true
|
||||
}
|
||||
if a[0].p.ph.RowsCount*uint64(len(a)) < a[len(a)-1].p.ph.RowsCount {
|
||||
// Do not merge parts with too big difference in rows count,
|
||||
if a[0].p.size*uint64(len(a)) < a[len(a)-1].p.size {
|
||||
// Do not merge parts with too big difference in size,
|
||||
// since this results in unbalanced merges.
|
||||
continue
|
||||
}
|
||||
if rowsCount > maxRows {
|
||||
// There is no need in verifying remaining parts with higher number of rows
|
||||
if outSize > maxOutBytes {
|
||||
// There is no need in verifying remaining parts with bigger sizes.
|
||||
break
|
||||
}
|
||||
m := float64(rowsCount) / float64(a[len(a)-1].p.ph.RowsCount)
|
||||
m := float64(outSize) / float64(a[len(a)-1].p.size)
|
||||
if m < maxM {
|
||||
continue
|
||||
}
|
||||
|
@ -1448,20 +1452,21 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows ui
|
|||
}
|
||||
|
||||
minM := float64(maxPartsToMerge) / 2
|
||||
if minM < 1.7 {
|
||||
minM = 1.7
|
||||
if minM < minMergeMultiplier {
|
||||
minM = minMergeMultiplier
|
||||
}
|
||||
if maxM < minM {
|
||||
// There is no sense in merging parts with too small m.
|
||||
// There is no sense in merging parts with too small m,
|
||||
// since this leads to high disk write IO.
|
||||
return dst, needFreeSpace
|
||||
}
|
||||
return append(dst, pws...), needFreeSpace
|
||||
}
|
||||
|
||||
func getRowsCount(pws []*partWrapper) uint64 {
|
||||
func getPartsSize(pws []*partWrapper) uint64 {
|
||||
n := uint64(0)
|
||||
for _, pw := range pws {
|
||||
n += pw.p.ph.RowsCount
|
||||
n += pw.p.size
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
|
|
@ -6,10 +6,10 @@ import (
|
|||
"testing"
|
||||
)
|
||||
|
||||
func TestPartitionMaxRowsByPath(t *testing.T) {
|
||||
n := maxRowsByPath(".")
|
||||
func TestPartitionGetMaxOutBytes(t *testing.T) {
|
||||
n := getMaxOutBytes(".", 1)
|
||||
if n < 1e3 {
|
||||
t.Fatalf("too small number of rows can be created in the current directory: %d", n)
|
||||
t.Fatalf("too small free space remained in the current directory: %d", n)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -35,10 +35,10 @@ func TestAppendPartsToMerge(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestAppendPartsToMergeNeedFreeSpace(t *testing.T) {
|
||||
f := func(a []uint64, maxItems int, expectedNeedFreeSpace bool) {
|
||||
f := func(sizes []uint64, maxOutBytes int, expectedNeedFreeSpace bool) {
|
||||
t.Helper()
|
||||
pws := newTestPartWrappersForRowsCount(a)
|
||||
_, needFreeSpace := appendPartsToMerge(nil, pws, defaultPartsToMerge, uint64(maxItems))
|
||||
pws := newTestPartWrappersForSizes(sizes)
|
||||
_, needFreeSpace := appendPartsToMerge(nil, pws, defaultPartsToMerge, uint64(maxOutBytes))
|
||||
if needFreeSpace != expectedNeedFreeSpace {
|
||||
t.Fatalf("unexpected needFreeSpace; got %v; want %v", needFreeSpace, expectedNeedFreeSpace)
|
||||
}
|
||||
|
@ -46,7 +46,7 @@ func TestAppendPartsToMergeNeedFreeSpace(t *testing.T) {
|
|||
f(nil, 1000, false)
|
||||
f([]uint64{1000}, 100, false)
|
||||
f([]uint64{1000}, 1100, false)
|
||||
f([]uint64{100, 200}, 180, true)
|
||||
f([]uint64{120, 200}, 180, true)
|
||||
f([]uint64{100, 200}, 310, false)
|
||||
f([]uint64{100, 110, 109, 1}, 300, true)
|
||||
f([]uint64{100, 110, 109, 1}, 330, false)
|
||||
|
@ -55,8 +55,8 @@ func TestAppendPartsToMergeNeedFreeSpace(t *testing.T) {
|
|||
func TestAppendPartsToMergeManyParts(t *testing.T) {
|
||||
// Verify that big number of parts are merged into minimal number of parts
|
||||
// using minimum merges.
|
||||
var a []uint64
|
||||
maxOutPartRows := uint64(0)
|
||||
var sizes []uint64
|
||||
maxOutSize := uint64(0)
|
||||
r := rand.New(rand.NewSource(1))
|
||||
for i := 0; i < 1024; i++ {
|
||||
n := uint64(uint32(r.NormFloat64() * 1e9))
|
||||
|
@ -64,15 +64,15 @@ func TestAppendPartsToMergeManyParts(t *testing.T) {
|
|||
n = -n
|
||||
}
|
||||
n++
|
||||
maxOutPartRows += n
|
||||
a = append(a, n)
|
||||
maxOutSize += n
|
||||
sizes = append(sizes, n)
|
||||
}
|
||||
pws := newTestPartWrappersForRowsCount(a)
|
||||
pws := newTestPartWrappersForSizes(sizes)
|
||||
|
||||
iterationsCount := 0
|
||||
rowsMerged := uint64(0)
|
||||
sizeMergedTotal := uint64(0)
|
||||
for {
|
||||
pms, _ := appendPartsToMerge(nil, pws, defaultPartsToMerge, maxOutPartRows)
|
||||
pms, _ := appendPartsToMerge(nil, pws, defaultPartsToMerge, maxOutSize)
|
||||
if len(pms) == 0 {
|
||||
break
|
||||
}
|
||||
|
@ -81,61 +81,58 @@ func TestAppendPartsToMergeManyParts(t *testing.T) {
|
|||
m[pw] = true
|
||||
}
|
||||
var pwsNew []*partWrapper
|
||||
rowsCount := uint64(0)
|
||||
size := uint64(0)
|
||||
for _, pw := range pws {
|
||||
if m[pw] {
|
||||
rowsCount += pw.p.ph.RowsCount
|
||||
size += pw.p.size
|
||||
} else {
|
||||
pwsNew = append(pwsNew, pw)
|
||||
}
|
||||
}
|
||||
pw := &partWrapper{
|
||||
p: &part{},
|
||||
p: &part{
|
||||
size: size,
|
||||
},
|
||||
}
|
||||
pw.p.ph = partHeader{
|
||||
RowsCount: rowsCount,
|
||||
}
|
||||
rowsMerged += rowsCount
|
||||
sizeMergedTotal += size
|
||||
pwsNew = append(pwsNew, pw)
|
||||
pws = pwsNew
|
||||
iterationsCount++
|
||||
}
|
||||
rowsCount := newTestRowsCountFromPartWrappers(pws)
|
||||
rowsTotal := uint64(0)
|
||||
for _, rc := range rowsCount {
|
||||
rowsTotal += uint64(rc)
|
||||
sizes = newTestSizesFromPartWrappers(pws)
|
||||
sizeTotal := uint64(0)
|
||||
for _, size := range sizes {
|
||||
sizeTotal += uint64(size)
|
||||
}
|
||||
overhead := float64(rowsMerged) / float64(rowsTotal)
|
||||
overhead := float64(sizeMergedTotal) / float64(sizeTotal)
|
||||
if overhead > 2.1 {
|
||||
t.Fatalf("too big overhead; rowsCount=%d, iterationsCount=%d, rowsTotal=%d, rowsMerged=%d, overhead=%f",
|
||||
rowsCount, iterationsCount, rowsTotal, rowsMerged, overhead)
|
||||
t.Fatalf("too big overhead; sizes=%d, iterationsCount=%d, sizeTotal=%d, sizeMergedTotal=%d, overhead=%f",
|
||||
sizes, iterationsCount, sizeTotal, sizeMergedTotal, overhead)
|
||||
}
|
||||
if len(rowsCount) > 18 {
|
||||
t.Fatalf("too many rowsCount %d; rowsCount=%d, iterationsCount=%d, rowsTotal=%d, rowsMerged=%d, overhead=%f",
|
||||
len(rowsCount), rowsCount, iterationsCount, rowsTotal, rowsMerged, overhead)
|
||||
if len(sizes) > 18 {
|
||||
t.Fatalf("too many sizes %d; sizes=%d, iterationsCount=%d, sizeTotal=%d, sizeMergedTotal=%d, overhead=%f",
|
||||
len(sizes), sizes, iterationsCount, sizeTotal, sizeMergedTotal, overhead)
|
||||
}
|
||||
}
|
||||
|
||||
func testAppendPartsToMerge(t *testing.T, maxPartsToMerge int, initialRowsCount, expectedRowsCount []uint64) {
|
||||
func testAppendPartsToMerge(t *testing.T, maxPartsToMerge int, initialSizes, expectedSizes []uint64) {
|
||||
t.Helper()
|
||||
|
||||
pws := newTestPartWrappersForRowsCount(initialRowsCount)
|
||||
pws := newTestPartWrappersForSizes(initialSizes)
|
||||
|
||||
// Verify appending to nil.
|
||||
pms, _ := appendPartsToMerge(nil, pws, maxPartsToMerge, 1e9)
|
||||
rowsCount := newTestRowsCountFromPartWrappers(pms)
|
||||
if !reflect.DeepEqual(rowsCount, expectedRowsCount) {
|
||||
t.Fatalf("unexpected rowsCount for maxPartsToMerge=%d, initialRowsCount=%d; got\n%d; want\n%d",
|
||||
maxPartsToMerge, initialRowsCount, rowsCount, expectedRowsCount)
|
||||
sizes := newTestSizesFromPartWrappers(pms)
|
||||
if !reflect.DeepEqual(sizes, expectedSizes) {
|
||||
t.Fatalf("unexpected size for maxPartsToMerge=%d, initialSizes=%d; got\n%d; want\n%d",
|
||||
maxPartsToMerge, initialSizes, sizes, expectedSizes)
|
||||
}
|
||||
|
||||
// Verify appending to prefix
|
||||
prefix := []*partWrapper{
|
||||
{
|
||||
p: &part{
|
||||
ph: partHeader{
|
||||
RowsCount: 1234,
|
||||
},
|
||||
size: 1234,
|
||||
},
|
||||
},
|
||||
{},
|
||||
|
@ -143,33 +140,31 @@ func testAppendPartsToMerge(t *testing.T, maxPartsToMerge int, initialRowsCount,
|
|||
}
|
||||
pms, _ = appendPartsToMerge(prefix, pws, maxPartsToMerge, 1e9)
|
||||
if !reflect.DeepEqual(pms[:len(prefix)], prefix) {
|
||||
t.Fatalf("unexpected prefix for maxPartsToMerge=%d, initialRowsCount=%d; got\n%+v; want\n%+v",
|
||||
maxPartsToMerge, initialRowsCount, pms[:len(prefix)], prefix)
|
||||
t.Fatalf("unexpected prefix for maxPartsToMerge=%d, initialSizes=%d; got\n%+v; want\n%+v",
|
||||
maxPartsToMerge, initialSizes, pms[:len(prefix)], prefix)
|
||||
}
|
||||
|
||||
rowsCount = newTestRowsCountFromPartWrappers(pms[len(prefix):])
|
||||
if !reflect.DeepEqual(rowsCount, expectedRowsCount) {
|
||||
t.Fatalf("unexpected prefixed rowsCount for maxPartsToMerge=%d, initialRowsCount=%d; got\n%d; want\n%d",
|
||||
maxPartsToMerge, initialRowsCount, rowsCount, expectedRowsCount)
|
||||
sizes = newTestSizesFromPartWrappers(pms[len(prefix):])
|
||||
if !reflect.DeepEqual(sizes, expectedSizes) {
|
||||
t.Fatalf("unexpected prefixed sizes for maxPartsToMerge=%d, initialSizes=%d; got\n%d; want\n%d",
|
||||
maxPartsToMerge, initialSizes, sizes, expectedSizes)
|
||||
}
|
||||
}
|
||||
|
||||
func newTestRowsCountFromPartWrappers(pws []*partWrapper) []uint64 {
|
||||
var rowsCount []uint64
|
||||
func newTestSizesFromPartWrappers(pws []*partWrapper) []uint64 {
|
||||
var sizes []uint64
|
||||
for _, pw := range pws {
|
||||
rowsCount = append(rowsCount, pw.p.ph.RowsCount)
|
||||
sizes = append(sizes, pw.p.size)
|
||||
}
|
||||
return rowsCount
|
||||
return sizes
|
||||
}
|
||||
|
||||
func newTestPartWrappersForRowsCount(rowsCount []uint64) []*partWrapper {
|
||||
func newTestPartWrappersForSizes(sizes []uint64) []*partWrapper {
|
||||
var pws []*partWrapper
|
||||
for _, rc := range rowsCount {
|
||||
for _, size := range sizes {
|
||||
pw := &partWrapper{
|
||||
p: &part{
|
||||
ph: partHeader{
|
||||
RowsCount: rc,
|
||||
},
|
||||
size: size,
|
||||
},
|
||||
}
|
||||
pws = append(pws, pw)
|
||||
|
|
Loading…
Reference in a new issue