lib/{mergeset,storage}: improve the detection of the needed free space for background merge

This should prevent possible out-of-disk-space crashes during big merges.

See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1560
Author: Aliaksandr Valialkin
Date:   2021-08-25 09:35:03 +03:00
Parent: ae8ec78c63
Commit: b885bd9b7d
5 changed files with 153 additions and 170 deletions
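
Before the diff, here is a minimal, self-contained Go sketch of the sizing logic this commit introduces. The helper names maxOutBytes and maxInPartBytes, the package main wrapper and the example numbers are illustrative only and are not part of the repository; the real logic lives in mergeExistingParts, getMaxOutBytes and appendPartsToMerge in the diff below, and the constants mirror the ones added there (maxPartSize = 400e9 in lib/mergeset, minMergeMultiplier = 1.7):

package main

import "fmt"

// Values taken from the diff below: the hard cap on a merged part in
// lib/mergeset and the minimum output/input size ratio used by both packages.
const (
    maxPartSize        = 400e9
    minMergeMultiplier = 1.7
)

// maxOutBytes is a simplified stand-in for mergeExistingParts/getMaxOutBytes:
// a single merge may produce at most the free disk space divided by the
// number of concurrent merge workers, capped by maxPartSize.
func maxOutBytes(freeSpace uint64, mergeWorkersCount int) uint64 {
    n := freeSpace / uint64(mergeWorkersCount)
    if n > maxPartSize {
        n = maxPartSize
    }
    return n
}

// maxInPartBytes mirrors the input filter in appendPartsToMerge: a part may
// join the merge only if the output budget exceeds its size by at least
// minMergeMultiplier, which keeps write amplification bounded.
func maxInPartBytes(outBytes uint64) uint64 {
    return uint64(float64(outBytes) / minMergeMultiplier)
}

func main() {
    freeSpace := uint64(100e9) // pretend 100GB are free on the partition
    workers := 4               // pretend 4 concurrent merge workers
    out := maxOutBytes(freeSpace, workers)
    fmt.Printf("per-merge output budget: %d bytes\n", out)
    fmt.Printf("largest input part accepted: %d bytes\n", maxInPartBytes(out))
}

Dividing the free space by the number of concurrent merge workers keeps the workers from jointly exhausting the disk, and skipping input parts larger than the budget divided by 1.7 bounds the write amplification of each merge.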

--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -12,6 +12,7 @@ sort: 15
 * FEATURE: update Go builder from v1.16.7 to v1.17.0. This improves data ingestion and query performance by up to 5% according to benchmarks. See [the release post for Go1.17](https://go.dev/blog/go1.17).
 * BUGFIX: rename `sign` function to `sgn` in order to be consistent with PromQL. See [this pull request from Prometheus](https://github.com/prometheus/prometheus/pull/8457).
+* BUGFIX: improve the detection of the needed free space for background merge operation. This should prevent from possible out of disk space crashes during big merges. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1560).
 
 ## [v1.64.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.64.1)

--- a/lib/mergeset/table.go
+++ b/lib/mergeset/table.go
@@ -50,13 +50,11 @@ const defaultPartsToMerge = 15
 // write amplification.
 const finalPartsToMerge = 2
 
-// maxItemsPerPart is the absolute maximum number of items per part.
+// maxPartSize is the maximum part size in bytes.
 //
-// This number should be limited by the amount of time required to merge
-// such number of items. The required time shouldn't exceed a day.
-//
-// TODO: adjust this number using production stats.
-const maxItemsPerPart = 100e9
+// This number should be limited by the amount of time required to merge parts of this summary size.
+// The required time shouldn't exceed a day.
+const maxPartSize = 400e9
 
 // maxItemsPerCachedPart is the maximum items per created part by the merge,
 // which must be cached in the OS page cache.
@@ -790,13 +788,15 @@ func (tb *Table) startPartMergers() {
 }
 
 func (tb *Table) mergeExistingParts(isFinal bool) error {
-    maxItems := tb.maxOutPartItems()
-    if maxItems > maxItemsPerPart {
-        maxItems = maxItemsPerPart
+    n := fs.MustGetFreeSpace(tb.path)
+    // Divide free space by the max number of concurrent merges.
+    maxOutBytes := n / uint64(mergeWorkersCount)
+    if maxOutBytes > maxPartSize {
+        maxOutBytes = maxPartSize
     }
 
     tb.partsLock.Lock()
-    pws := getPartsToMerge(tb.parts, maxItems, isFinal)
+    pws := getPartsToMerge(tb.parts, maxOutBytes, isFinal)
     tb.partsLock.Unlock()
 
     return tb.mergeParts(pws, tb.stopCh, false)
@@ -1045,33 +1045,6 @@ func (tb *Table) nextMergeIdx() uint64 {
     return atomic.AddUint64(&tb.mergeIdx, 1)
 }
 
-var (
-    maxOutPartItemsLock     sync.Mutex
-    maxOutPartItemsDeadline uint64
-    lastMaxOutPartItems     uint64
-)
-
-func (tb *Table) maxOutPartItems() uint64 {
-    maxOutPartItemsLock.Lock()
-    if maxOutPartItemsDeadline < fasttime.UnixTimestamp() {
-        lastMaxOutPartItems = tb.maxOutPartItemsSlow()
-        maxOutPartItemsDeadline = fasttime.UnixTimestamp() + 2
-    }
-    n := lastMaxOutPartItems
-    maxOutPartItemsLock.Unlock()
-    return n
-}
-
-func (tb *Table) maxOutPartItemsSlow() uint64 {
-    freeSpace := fs.MustGetFreeSpace(tb.path)
-
-    // Calculate the maximum number of items in the output merge part
-    // by dividing the freeSpace by 4 and by the number of concurrent
-    // mergeWorkersCount.
-    // This assumes each item is compressed into 4 bytes.
-    return freeSpace / uint64(mergeWorkersCount) / 4
-}
-
 var mergeWorkersCount = cgroup.AvailableCPUs()
 
 func openParts(path string) ([]*partWrapper, error) {
@@ -1371,8 +1344,8 @@ func validatePath(pathPrefix, path string) (string, error) {
 //
 // if isFinal is set, then merge harder.
 //
-// The returned parts will contain less than maxItems items.
-func getPartsToMerge(pws []*partWrapper, maxItems uint64, isFinal bool) []*partWrapper {
+// The summary size of the returned parts must be smaller than the maxOutBytes.
+func getPartsToMerge(pws []*partWrapper, maxOutBytes uint64, isFinal bool) []*partWrapper {
     pwsRemaining := make([]*partWrapper, 0, len(pws))
     for _, pw := range pws {
         if !pw.isInMerge {
@@ -1383,11 +1356,11 @@ func getPartsToMerge(pws []*partWrapper, maxItems uint64, isFinal bool) []*partWrapper {
     var dst []*partWrapper
     if isFinal {
         for len(dst) == 0 && maxPartsToMerge >= finalPartsToMerge {
-            dst = appendPartsToMerge(dst[:0], pwsRemaining, maxPartsToMerge, maxItems)
+            dst = appendPartsToMerge(dst[:0], pwsRemaining, maxPartsToMerge, maxOutBytes)
             maxPartsToMerge--
         }
     } else {
-        dst = appendPartsToMerge(dst[:0], pwsRemaining, maxPartsToMerge, maxItems)
+        dst = appendPartsToMerge(dst[:0], pwsRemaining, maxPartsToMerge, maxOutBytes)
     }
     for _, pw := range dst {
         if pw.isInMerge {
@@ -1398,9 +1371,17 @@ func getPartsToMerge(pws []*partWrapper, maxItems uint64, isFinal bool) []*partWrapper {
     return dst
 }
 
+// minMergeMultiplier is the minimum multiplier for the size of the output part
+// compared to the size of the maximum input part for the merge.
+//
+// Higher value reduces write amplification (disk write IO induced by the merge),
+// while increases the number of unmerged parts.
+// The 1.7 is good enough for production workloads.
+const minMergeMultiplier = 1.7
+
 // appendPartsToMerge finds optimal parts to merge from src, appends
 // them to dst and returns the result.
-func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxItems uint64) []*partWrapper {
+func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxOutBytes uint64) []*partWrapper {
     if len(src) < 2 {
         // There is no need in merging zero or one part :)
         return dst
@@ -1411,18 +1392,18 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxItems uint64) []*partWrapper {
     // Filter out too big parts.
     // This should reduce N for O(n^2) algorithm below.
-    maxInPartItems := maxItems / 2
+    maxInPartBytes := uint64(float64(maxOutBytes) / minMergeMultiplier)
     tmp := make([]*partWrapper, 0, len(src))
     for _, pw := range src {
-        if pw.p.ph.itemsCount > maxInPartItems {
+        if pw.p.size > maxInPartBytes {
             continue
         }
         tmp = append(tmp, pw)
     }
     src = tmp
 
-    // Sort src parts by itemsCount.
-    sort.Slice(src, func(i, j int) bool { return src[i].p.ph.itemsCount < src[j].p.ph.itemsCount })
+    // Sort src parts by size.
+    sort.Slice(src, func(i, j int) bool { return src[i].p.size < src[j].p.size })
 
     maxSrcParts := maxPartsToMerge
     if maxSrcParts > len(src) {
@@ -1439,20 +1420,20 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxItems uint64) []*partWrapper {
     for i := minSrcParts; i <= maxSrcParts; i++ {
         for j := 0; j <= len(src)-i; j++ {
             a := src[j : j+i]
-            if a[0].p.ph.itemsCount*uint64(len(a)) < a[len(a)-1].p.ph.itemsCount {
-                // Do not merge parts with too big difference in items count,
+            if a[0].p.size*uint64(len(a)) < a[len(a)-1].p.size {
+                // Do not merge parts with too big difference in size,
                 // since this results in unbalanced merges.
                 continue
             }
-            itemsSum := uint64(0)
+            outBytes := uint64(0)
             for _, pw := range a {
-                itemsSum += pw.p.ph.itemsCount
+                outBytes += pw.p.size
             }
-            if itemsSum > maxItems {
+            if outBytes > maxOutBytes {
                 // There is no sense in checking the remaining bigger parts.
                 break
             }
-            m := float64(itemsSum) / float64(a[len(a)-1].p.ph.itemsCount)
+            m := float64(outBytes) / float64(a[len(a)-1].p.size)
             if m < maxM {
                 continue
             }
@@ -1462,11 +1443,12 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxItems uint64) []*partWrapper {
     }
 
     minM := float64(maxPartsToMerge) / 2
-    if minM < 1.7 {
-        minM = 1.7
+    if minM < minMergeMultiplier {
+        minM = minMergeMultiplier
     }
     if maxM < minM {
-        // There is no sense in merging parts with too small m.
+        // There is no sense in merging parts with too small m,
+        // since this leads to high disk write IO.
         return dst
     }
     return append(dst, pws...)

--- a/lib/storage/part.go
+++ b/lib/storage/part.go
@@ -133,7 +133,7 @@ func (p *part) MustClose() {
     p.valuesFile.MustClose()
     p.indexFile.MustClose()
 
-    isBig := p.ph.RowsCount > maxRowsPerSmallPart()
+    isBig := p.size > maxSmallPartSize()
     p.ibCache.MustClose(isBig)
 }

--- a/lib/storage/partition.go
+++ b/lib/storage/partition.go
@@ -35,27 +35,26 @@ var (
     historicalSmallIndexBlocksCacheMisses uint64
 )
 
-func maxRowsPerSmallPart() uint64 {
+func maxSmallPartSize() uint64 {
     // Small parts are cached in the OS page cache,
-    // so limit the number of rows for small part by the remaining free RAM.
+    // so limit their size by the remaining free RAM.
     mem := memory.Remaining()
-    // Production data shows that each row occupies ~1 byte in the compressed part.
     // It is expected no more than defaultPartsToMerge/2 parts exist
     // in the OS page cache before they are merged into bigger part.
     // Half of the remaining RAM must be left for lib/mergeset parts,
     // so the maxItems is calculated using the below code:
-    maxRows := uint64(mem) / defaultPartsToMerge
-    if maxRows < 10e6 {
-        maxRows = 10e6
+    maxSize := uint64(mem) / defaultPartsToMerge
+    if maxSize < 10e6 {
+        maxSize = 10e6
     }
-    return maxRows
+    return maxSize
 }
 
-// The maximum number of rows per big part.
+// The maximum size of big part.
 //
 // This number limits the maximum time required for building big part.
 // This time shouldn't exceed a few days.
-const maxRowsPerBigPart = 1e12
+const maxBigPartSize = 1e12
 
 // The maximum number of small parts in the partition.
 const maxSmallPartsPerPartition = 256
@@ -977,26 +976,21 @@ func SetFinalMergeDelay(delay time.Duration) {
     finalMergeDelaySeconds = uint64(delay.Seconds() + 1)
 }
 
-func maxRowsByPath(path string) uint64 {
-    freeSpace := fs.MustGetFreeSpace(path)
-
-    // Calculate the maximum number of rows in the output merge part
-    // by dividing the freeSpace by the maximum number of concurrent
-    // workers for big parts.
-    // This assumes each row is compressed into 1 byte
-    // according to production data.
-    maxRows := freeSpace / uint64(bigMergeWorkersCount)
-    if maxRows > maxRowsPerBigPart {
-        maxRows = maxRowsPerBigPart
+func getMaxOutBytes(path string, workersCount int) uint64 {
+    n := fs.MustGetFreeSpace(path)
+    // Divide free space by the max number concurrent merges.
+    maxOutBytes := n / uint64(workersCount)
+    if maxOutBytes > maxBigPartSize {
+        maxOutBytes = maxBigPartSize
     }
-    return maxRows
+    return maxOutBytes
 }
 
 func (pt *partition) mergeBigParts(isFinal bool) error {
-    maxRows := maxRowsByPath(pt.bigPartsPath)
+    maxOutBytes := getMaxOutBytes(pt.bigPartsPath, bigMergeWorkersCount)
 
     pt.partsLock.Lock()
-    pws, needFreeSpace := getPartsToMerge(pt.bigParts, maxRows, isFinal)
+    pws, needFreeSpace := getPartsToMerge(pt.bigParts, maxOutBytes, isFinal)
     pt.partsLock.Unlock()
 
     atomicSetBool(&pt.bigMergeNeedFreeDiskSpace, needFreeSpace)
@@ -1005,29 +999,29 @@ func (pt *partition) mergeBigParts(isFinal bool) error {
 func (pt *partition) mergeSmallParts(isFinal bool) error {
     // Try merging small parts to a big part at first.
-    maxBigPartRows := maxRowsByPath(pt.bigPartsPath)
+    maxBigPartOutBytes := getMaxOutBytes(pt.bigPartsPath, bigMergeWorkersCount)
     pt.partsLock.Lock()
-    pws, needFreeSpace := getPartsToMerge(pt.smallParts, maxBigPartRows, isFinal)
+    pws, needFreeSpace := getPartsToMerge(pt.smallParts, maxBigPartOutBytes, isFinal)
     pt.partsLock.Unlock()
     atomicSetBool(&pt.bigMergeNeedFreeDiskSpace, needFreeSpace)
 
-    rowsCount := getRowsCount(pws)
-    if rowsCount > maxRowsPerSmallPart() {
+    outSize := getPartsSize(pws)
+    if outSize > maxSmallPartSize() {
         // Merge small parts to a big part.
         return pt.mergeParts(pws, pt.stopCh)
     }
 
     // Make sure that the output small part fits small parts storage.
-    maxSmallPartRows := maxRowsByPath(pt.smallPartsPath)
-    if rowsCount <= maxSmallPartRows {
+    maxSmallPartOutBytes := getMaxOutBytes(pt.smallPartsPath, smallMergeWorkersCount)
+    if outSize <= maxSmallPartOutBytes {
         // Merge small parts to a small part.
         return pt.mergeParts(pws, pt.stopCh)
     }
 
-    // The output small part doesn't fit small parts storage. Try merging small parts according to maxSmallPartRows limit.
+    // The output small part doesn't fit small parts storage. Try merging small parts according to maxSmallPartOutBytes limit.
     pt.releasePartsToMerge(pws)
     pt.partsLock.Lock()
-    pws, needFreeSpace = getPartsToMerge(pt.smallParts, maxSmallPartRows, isFinal)
+    pws, needFreeSpace = getPartsToMerge(pt.smallParts, maxSmallPartOutBytes, isFinal)
     pt.partsLock.Unlock()
     atomicSetBool(&pt.smallMergeNeedFreeDiskSpace, needFreeSpace)
@@ -1088,13 +1082,15 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) error {
         bsrs = append(bsrs, bsr)
     }
 
+    outSize := uint64(0)
     outRowsCount := uint64(0)
     outBlocksCount := uint64(0)
     for _, pw := range pws {
+        outSize += pw.p.size
         outRowsCount += pw.p.ph.RowsCount
         outBlocksCount += pw.p.ph.BlocksCount
     }
-    isBigPart := outRowsCount > maxRowsPerSmallPart()
+    isBigPart := outSize > maxSmallPartSize()
     nocache := isBigPart
 
     // Prepare BlockStreamWriter for destination part.
@@ -1343,9 +1339,9 @@ func (pt *partition) removeStaleParts() {
 // getPartsToMerge returns optimal parts to merge from pws.
 //
-// The returned parts will contain less than maxRows rows.
-// The function returns true if pws contains parts, which cannot be merged because of maxRows limit.
-func getPartsToMerge(pws []*partWrapper, maxRows uint64, isFinal bool) ([]*partWrapper, bool) {
+// The summary size of the returned parts must be smaller than maxOutBytes.
+// The function returns true if pws contains parts, which cannot be merged because of maxOutBytes limit.
+func getPartsToMerge(pws []*partWrapper, maxOutBytes uint64, isFinal bool) ([]*partWrapper, bool) {
     pwsRemaining := make([]*partWrapper, 0, len(pws))
     for _, pw := range pws {
         if !pw.isInMerge {
@@ -1357,11 +1353,11 @@ func getPartsToMerge(pws []*partWrapper, maxRows uint64, isFinal bool) ([]*partWrapper, bool) {
     needFreeSpace := false
     if isFinal {
         for len(pms) == 0 && maxPartsToMerge >= finalPartsToMerge {
-            pms, needFreeSpace = appendPartsToMerge(pms[:0], pwsRemaining, maxPartsToMerge, maxRows)
+            pms, needFreeSpace = appendPartsToMerge(pms[:0], pwsRemaining, maxPartsToMerge, maxOutBytes)
             maxPartsToMerge--
         }
     } else {
-        pms, needFreeSpace = appendPartsToMerge(pms[:0], pwsRemaining, maxPartsToMerge, maxRows)
+        pms, needFreeSpace = appendPartsToMerge(pms[:0], pwsRemaining, maxPartsToMerge, maxOutBytes)
     }
     for _, pw := range pms {
         if pw.isInMerge {
@@ -1372,10 +1368,18 @@ func getPartsToMerge(pws []*partWrapper, maxRows uint64, isFinal bool) ([]*partWrapper, bool) {
     return pms, needFreeSpace
 }
 
+// minMergeMultiplier is the minimum multiplier for the size of the output part
+// compared to the size of the maximum input part for the merge.
+//
+// Higher value reduces write amplification (disk write IO induced by the merge),
+// while increases the number of unmerged parts.
+// The 1.7 is good enough for production workloads.
+const minMergeMultiplier = 1.7
+
 // appendPartsToMerge finds optimal parts to merge from src, appends
 // them to dst and returns the result.
-// The function returns true if src contains parts, which cannot be merged because of maxRows limit.
-func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows uint64) ([]*partWrapper, bool) {
+// The function returns true if src contains parts, which cannot be merged because of maxOutBytes limit.
+func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxOutBytes uint64) ([]*partWrapper, bool) {
     if len(src) < 2 {
         // There is no need in merging zero or one part :)
         return dst, false
@@ -1387,10 +1391,10 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows uint64) ([]*partWrapper, bool) {
     // Filter out too big parts.
     // This should reduce N for O(N^2) algorithm below.
     skippedBigParts := 0
-    maxInPartRows := maxRows / 2
+    maxInPartBytes := uint64(float64(maxOutBytes) / minMergeMultiplier)
     tmp := make([]*partWrapper, 0, len(src))
     for _, pw := range src {
-        if pw.p.ph.RowsCount > maxInPartRows {
+        if pw.p.size > maxInPartBytes {
             skippedBigParts++
             continue
         }
@@ -1399,15 +1403,15 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows uint64) ([]*partWrapper, bool) {
     src = tmp
     needFreeSpace := skippedBigParts > 1
 
-    // Sort src parts by rows count and backwards timestamp.
+    // Sort src parts by size and backwards timestamp.
     // This should improve adjanced points' locality in the merged parts.
     sort.Slice(src, func(i, j int) bool {
-        a := &src[i].p.ph
-        b := &src[j].p.ph
-        if a.RowsCount == b.RowsCount {
-            return a.MinTimestamp > b.MinTimestamp
+        a := src[i].p
+        b := src[j].p
+        if a.size == b.size {
+            return a.ph.MinTimestamp > b.ph.MinTimestamp
         }
-        return a.RowsCount < b.RowsCount
+        return a.size < b.size
     })
 
     maxSrcParts := maxPartsToMerge
@@ -1425,20 +1429,20 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows uint64) ([]*partWrapper, bool) {
     for i := minSrcParts; i <= maxSrcParts; i++ {
         for j := 0; j <= len(src)-i; j++ {
             a := src[j : j+i]
-            rowsCount := getRowsCount(a)
-            if rowsCount > maxRows {
+            outSize := getPartsSize(a)
+            if outSize > maxOutBytes {
                 needFreeSpace = true
             }
-            if a[0].p.ph.RowsCount*uint64(len(a)) < a[len(a)-1].p.ph.RowsCount {
-                // Do not merge parts with too big difference in rows count,
+            if a[0].p.size*uint64(len(a)) < a[len(a)-1].p.size {
+                // Do not merge parts with too big difference in size,
                 // since this results in unbalanced merges.
                 continue
             }
-            if rowsCount > maxRows {
-                // There is no need in verifying remaining parts with higher number of rows
+            if outSize > maxOutBytes {
+                // There is no need in verifying remaining parts with bigger sizes.
                 break
             }
-            m := float64(rowsCount) / float64(a[len(a)-1].p.ph.RowsCount)
+            m := float64(outSize) / float64(a[len(a)-1].p.size)
             if m < maxM {
                 continue
             }
@@ -1448,20 +1452,21 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows uint64) ([]*partWrapper, bool) {
     }
 
     minM := float64(maxPartsToMerge) / 2
-    if minM < 1.7 {
-        minM = 1.7
+    if minM < minMergeMultiplier {
+        minM = minMergeMultiplier
     }
     if maxM < minM {
-        // There is no sense in merging parts with too small m.
+        // There is no sense in merging parts with too small m,
+        // since this leads to high disk write IO.
         return dst, needFreeSpace
     }
     return append(dst, pws...), needFreeSpace
 }
 
-func getRowsCount(pws []*partWrapper) uint64 {
+func getPartsSize(pws []*partWrapper) uint64 {
     n := uint64(0)
     for _, pw := range pws {
-        n += pw.p.ph.RowsCount
+        n += pw.p.size
     }
     return n
 }

--- a/lib/storage/partition_test.go
+++ b/lib/storage/partition_test.go
@@ -6,10 +6,10 @@ import (
     "testing"
 )
 
-func TestPartitionMaxRowsByPath(t *testing.T) {
-    n := maxRowsByPath(".")
+func TestPartitionGetMaxOutBytes(t *testing.T) {
+    n := getMaxOutBytes(".", 1)
     if n < 1e3 {
-        t.Fatalf("too small number of rows can be created in the current directory: %d", n)
+        t.Fatalf("too small free space remained in the current directory: %d", n)
     }
 }
@@ -35,10 +35,10 @@ func TestAppendPartsToMerge(t *testing.T) {
 }
 
 func TestAppendPartsToMergeNeedFreeSpace(t *testing.T) {
-    f := func(a []uint64, maxItems int, expectedNeedFreeSpace bool) {
+    f := func(sizes []uint64, maxOutBytes int, expectedNeedFreeSpace bool) {
         t.Helper()
-        pws := newTestPartWrappersForRowsCount(a)
-        _, needFreeSpace := appendPartsToMerge(nil, pws, defaultPartsToMerge, uint64(maxItems))
+        pws := newTestPartWrappersForSizes(sizes)
+        _, needFreeSpace := appendPartsToMerge(nil, pws, defaultPartsToMerge, uint64(maxOutBytes))
         if needFreeSpace != expectedNeedFreeSpace {
             t.Fatalf("unexpected needFreeSpace; got %v; want %v", needFreeSpace, expectedNeedFreeSpace)
         }
@@ -46,7 +46,7 @@ func TestAppendPartsToMergeNeedFreeSpace(t *testing.T) {
     f(nil, 1000, false)
     f([]uint64{1000}, 100, false)
     f([]uint64{1000}, 1100, false)
-    f([]uint64{100, 200}, 180, true)
+    f([]uint64{120, 200}, 180, true)
     f([]uint64{100, 200}, 310, false)
     f([]uint64{100, 110, 109, 1}, 300, true)
     f([]uint64{100, 110, 109, 1}, 330, false)
@@ -55,8 +55,8 @@ func TestAppendPartsToMergeNeedFreeSpace(t *testing.T) {
 func TestAppendPartsToMergeManyParts(t *testing.T) {
     // Verify that big number of parts are merged into minimal number of parts
     // using minimum merges.
-    var a []uint64
-    maxOutPartRows := uint64(0)
+    var sizes []uint64
+    maxOutSize := uint64(0)
     r := rand.New(rand.NewSource(1))
     for i := 0; i < 1024; i++ {
         n := uint64(uint32(r.NormFloat64() * 1e9))
@@ -64,15 +64,15 @@ func TestAppendPartsToMergeManyParts(t *testing.T) {
             n = -n
         }
         n++
-        maxOutPartRows += n
-        a = append(a, n)
+        maxOutSize += n
+        sizes = append(sizes, n)
     }
-    pws := newTestPartWrappersForRowsCount(a)
+    pws := newTestPartWrappersForSizes(sizes)
 
     iterationsCount := 0
-    rowsMerged := uint64(0)
+    sizeMergedTotal := uint64(0)
     for {
-        pms, _ := appendPartsToMerge(nil, pws, defaultPartsToMerge, maxOutPartRows)
+        pms, _ := appendPartsToMerge(nil, pws, defaultPartsToMerge, maxOutSize)
         if len(pms) == 0 {
             break
         }
@@ -81,61 +81,58 @@ func TestAppendPartsToMergeManyParts(t *testing.T) {
             m[pw] = true
         }
         var pwsNew []*partWrapper
-        rowsCount := uint64(0)
+        size := uint64(0)
         for _, pw := range pws {
             if m[pw] {
-                rowsCount += pw.p.ph.RowsCount
+                size += pw.p.size
             } else {
                 pwsNew = append(pwsNew, pw)
             }
         }
         pw := &partWrapper{
-            p: &part{},
+            p: &part{
+                size: size,
+            },
         }
-        pw.p.ph = partHeader{
-            RowsCount: rowsCount,
-        }
-        rowsMerged += rowsCount
+        sizeMergedTotal += size
         pwsNew = append(pwsNew, pw)
         pws = pwsNew
         iterationsCount++
     }
-    rowsCount := newTestRowsCountFromPartWrappers(pws)
-    rowsTotal := uint64(0)
-    for _, rc := range rowsCount {
-        rowsTotal += uint64(rc)
+    sizes = newTestSizesFromPartWrappers(pws)
+    sizeTotal := uint64(0)
+    for _, size := range sizes {
+        sizeTotal += uint64(size)
    }
-    overhead := float64(rowsMerged) / float64(rowsTotal)
+    overhead := float64(sizeMergedTotal) / float64(sizeTotal)
     if overhead > 2.1 {
-        t.Fatalf("too big overhead; rowsCount=%d, iterationsCount=%d, rowsTotal=%d, rowsMerged=%d, overhead=%f",
-            rowsCount, iterationsCount, rowsTotal, rowsMerged, overhead)
+        t.Fatalf("too big overhead; sizes=%d, iterationsCount=%d, sizeTotal=%d, sizeMergedTotal=%d, overhead=%f",
+            sizes, iterationsCount, sizeTotal, sizeMergedTotal, overhead)
     }
-    if len(rowsCount) > 18 {
-        t.Fatalf("too many rowsCount %d; rowsCount=%d, iterationsCount=%d, rowsTotal=%d, rowsMerged=%d, overhead=%f",
-            len(rowsCount), rowsCount, iterationsCount, rowsTotal, rowsMerged, overhead)
+    if len(sizes) > 18 {
+        t.Fatalf("too many sizes %d; sizes=%d, iterationsCount=%d, sizeTotal=%d, sizeMergedTotal=%d, overhead=%f",
+            len(sizes), sizes, iterationsCount, sizeTotal, sizeMergedTotal, overhead)
     }
 }
 
-func testAppendPartsToMerge(t *testing.T, maxPartsToMerge int, initialRowsCount, expectedRowsCount []uint64) {
+func testAppendPartsToMerge(t *testing.T, maxPartsToMerge int, initialSizes, expectedSizes []uint64) {
     t.Helper()
 
-    pws := newTestPartWrappersForRowsCount(initialRowsCount)
+    pws := newTestPartWrappersForSizes(initialSizes)
 
     // Verify appending to nil.
     pms, _ := appendPartsToMerge(nil, pws, maxPartsToMerge, 1e9)
-    rowsCount := newTestRowsCountFromPartWrappers(pms)
-    if !reflect.DeepEqual(rowsCount, expectedRowsCount) {
-        t.Fatalf("unexpected rowsCount for maxPartsToMerge=%d, initialRowsCount=%d; got\n%d; want\n%d",
-            maxPartsToMerge, initialRowsCount, rowsCount, expectedRowsCount)
+    sizes := newTestSizesFromPartWrappers(pms)
+    if !reflect.DeepEqual(sizes, expectedSizes) {
+        t.Fatalf("unexpected size for maxPartsToMerge=%d, initialSizes=%d; got\n%d; want\n%d",
+            maxPartsToMerge, initialSizes, sizes, expectedSizes)
     }
 
     // Verify appending to prefix
     prefix := []*partWrapper{
         {
             p: &part{
-                ph: partHeader{
-                    RowsCount: 1234,
-                },
+                size: 1234,
             },
         },
         {},
@@ -143,33 +140,31 @@ func testAppendPartsToMerge(t *testing.T, maxPartsToMerge int, initialRowsCount, expectedRowsCount []uint64) {
     }
     pms, _ = appendPartsToMerge(prefix, pws, maxPartsToMerge, 1e9)
     if !reflect.DeepEqual(pms[:len(prefix)], prefix) {
-        t.Fatalf("unexpected prefix for maxPartsToMerge=%d, initialRowsCount=%d; got\n%+v; want\n%+v",
-            maxPartsToMerge, initialRowsCount, pms[:len(prefix)], prefix)
+        t.Fatalf("unexpected prefix for maxPartsToMerge=%d, initialSizes=%d; got\n%+v; want\n%+v",
+            maxPartsToMerge, initialSizes, pms[:len(prefix)], prefix)
     }
 
-    rowsCount = newTestRowsCountFromPartWrappers(pms[len(prefix):])
-    if !reflect.DeepEqual(rowsCount, expectedRowsCount) {
-        t.Fatalf("unexpected prefixed rowsCount for maxPartsToMerge=%d, initialRowsCount=%d; got\n%d; want\n%d",
-            maxPartsToMerge, initialRowsCount, rowsCount, expectedRowsCount)
+    sizes = newTestSizesFromPartWrappers(pms[len(prefix):])
+    if !reflect.DeepEqual(sizes, expectedSizes) {
+        t.Fatalf("unexpected prefixed sizes for maxPartsToMerge=%d, initialSizes=%d; got\n%d; want\n%d",
+            maxPartsToMerge, initialSizes, sizes, expectedSizes)
     }
 }
 
-func newTestRowsCountFromPartWrappers(pws []*partWrapper) []uint64 {
-    var rowsCount []uint64
+func newTestSizesFromPartWrappers(pws []*partWrapper) []uint64 {
+    var sizes []uint64
     for _, pw := range pws {
-        rowsCount = append(rowsCount, pw.p.ph.RowsCount)
+        sizes = append(sizes, pw.p.size)
     }
-    return rowsCount
+    return sizes
 }
 
-func newTestPartWrappersForRowsCount(rowsCount []uint64) []*partWrapper {
+func newTestPartWrappersForSizes(sizes []uint64) []*partWrapper {
     var pws []*partWrapper
-    for _, rc := range rowsCount {
+    for _, size := range sizes {
         pw := &partWrapper{
             p: &part{
-                ph: partHeader{
-                    RowsCount: rc,
-                },
+                size: size,
             },
         }
         pws = append(pws, pw)