diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 1d16570de..b554dc5ce 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -12,6 +12,7 @@ sort: 15

 * FEATURE: update Go builder from v1.16.7 to v1.17.0. This improves data ingestion and query performance by up to 5% according to benchmarks. See [the release post for Go1.17](https://go.dev/blog/go1.17).
 * BUGFIX: rename `sign` function to `sgn` in order to be consistent with PromQL. See [this pull request from Prometheus](https://github.com/prometheus/prometheus/pull/8457).
+* BUGFIX: improve detection of the free space needed for background merge operations. This should prevent possible out-of-disk-space crashes during big merges. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1560).

 ## [v1.64.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.64.1)

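The core idea behind this fix is visible in the hunks that follow: instead of limiting a merge by an estimated row or item count, each merge output is limited by the currently available disk space divided by the number of concurrent merge workers. Below is a minimal, self-contained sketch of that arithmetic; `estimateMaxOutBytes` and its parameters are illustrative names, not VictoriaMetrics identifiers.

```go
package main

import "fmt"

// estimateMaxOutBytes mirrors the limit computed in the diff below: every one
// of the concurrent merge workers gets an equal share of the free disk space,
// capped by the maximum allowed part size. Names here are illustrative only.
func estimateMaxOutBytes(freeSpace uint64, workers int, maxPartSize uint64) uint64 {
	maxOutBytes := freeSpace / uint64(workers)
	if maxOutBytes > maxPartSize {
		maxOutBytes = maxPartSize
	}
	return maxOutBytes
}

func main() {
	// 100 GiB free and 4 merge workers: each merge may write at most 25 GiB,
	// so even if every worker produces its largest possible output part at
	// the same time, the merges cannot exhaust the space that was free when
	// they started.
	fmt.Println(estimateMaxOutBytes(100<<30, 4, 400e9))
}
```

Because every worker gets an equal share of the free space, the worst case of all merges writing their maximum-size outputs at once still stays within the free space measured up front.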
diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go
index d0ebe237b..2d3ccfe74 100644
--- a/lib/mergeset/table.go
+++ b/lib/mergeset/table.go
@@ -50,13 +50,11 @@ const defaultPartsToMerge = 15
 // write amplification.
 const finalPartsToMerge = 2

-// maxItemsPerPart is the absolute maximum number of items per part.
+// maxPartSize is the maximum part size in bytes.
 //
-// This number should be limited by the amount of time required to merge
-// such number of items. The required time shouldn't exceed a day.
-//
-// TODO: adjust this number using production stats.
-const maxItemsPerPart = 100e9
+// This number should be limited by the amount of time required to merge parts of this total size.
+// The required time shouldn't exceed a day.
+const maxPartSize = 400e9

 // maxItemsPerCachedPart is the maximum items per created part by the merge,
 // which must be cached in the OS page cache.
@@ -790,13 +788,15 @@ func (tb *Table) startPartMergers() {
 }

 func (tb *Table) mergeExistingParts(isFinal bool) error {
-    maxItems := tb.maxOutPartItems()
-    if maxItems > maxItemsPerPart {
-        maxItems = maxItemsPerPart
+    n := fs.MustGetFreeSpace(tb.path)
+    // Divide free space by the max number of concurrent merges.
+    maxOutBytes := n / uint64(mergeWorkersCount)
+    if maxOutBytes > maxPartSize {
+        maxOutBytes = maxPartSize
     }

     tb.partsLock.Lock()
-    pws := getPartsToMerge(tb.parts, maxItems, isFinal)
+    pws := getPartsToMerge(tb.parts, maxOutBytes, isFinal)
     tb.partsLock.Unlock()

     return tb.mergeParts(pws, tb.stopCh, false)
@@ -1045,33 +1045,6 @@ func (tb *Table) nextMergeIdx() uint64 {
     return atomic.AddUint64(&tb.mergeIdx, 1)
 }

-var (
-    maxOutPartItemsLock     sync.Mutex
-    maxOutPartItemsDeadline uint64
-    lastMaxOutPartItems     uint64
-)
-
-func (tb *Table) maxOutPartItems() uint64 {
-    maxOutPartItemsLock.Lock()
-    if maxOutPartItemsDeadline < fasttime.UnixTimestamp() {
-        lastMaxOutPartItems = tb.maxOutPartItemsSlow()
-        maxOutPartItemsDeadline = fasttime.UnixTimestamp() + 2
-    }
-    n := lastMaxOutPartItems
-    maxOutPartItemsLock.Unlock()
-    return n
-}
-
-func (tb *Table) maxOutPartItemsSlow() uint64 {
-    freeSpace := fs.MustGetFreeSpace(tb.path)
-
-    // Calculate the maximum number of items in the output merge part
-    // by dividing the freeSpace by 4 and by the number of concurrent
-    // mergeWorkersCount.
-    // This assumes each item is compressed into 4 bytes.
-    return freeSpace / uint64(mergeWorkersCount) / 4
-}
-
 var mergeWorkersCount = cgroup.AvailableCPUs()

 func openParts(path string) ([]*partWrapper, error) {
@@ -1371,8 +1344,8 @@ func validatePath(pathPrefix, path string) (string, error) {
 //
 // if isFinal is set, then merge harder.
 //
-// The returned parts will contain less than maxItems items.
-func getPartsToMerge(pws []*partWrapper, maxItems uint64, isFinal bool) []*partWrapper {
+// The total size of the returned parts must be smaller than maxOutBytes.
+func getPartsToMerge(pws []*partWrapper, maxOutBytes uint64, isFinal bool) []*partWrapper {
     pwsRemaining := make([]*partWrapper, 0, len(pws))
     for _, pw := range pws {
         if !pw.isInMerge {
@@ -1383,11 +1356,11 @@ func getPartsToMerge(pws []*partWrapper, maxItems uint64, isFinal bool) []*partW
     var dst []*partWrapper
     if isFinal {
         for len(dst) == 0 && maxPartsToMerge >= finalPartsToMerge {
-            dst = appendPartsToMerge(dst[:0], pwsRemaining, maxPartsToMerge, maxItems)
+            dst = appendPartsToMerge(dst[:0], pwsRemaining, maxPartsToMerge, maxOutBytes)
             maxPartsToMerge--
         }
     } else {
-        dst = appendPartsToMerge(dst[:0], pwsRemaining, maxPartsToMerge, maxItems)
+        dst = appendPartsToMerge(dst[:0], pwsRemaining, maxPartsToMerge, maxOutBytes)
     }
     for _, pw := range dst {
         if pw.isInMerge {
@@ -1398,9 +1371,17 @@ func getPartsToMerge(pws []*partWrapper, maxItems uint64, isFinal bool) []*partW
     return dst
 }

+// minMergeMultiplier is the minimum multiplier for the size of the output part
+// compared to the size of the largest input part for the merge.
+//
+// A higher value reduces write amplification (disk write IO induced by the merge),
+// but increases the number of unmerged parts.
+// A value of 1.7 is good enough for production workloads.
+const minMergeMultiplier = 1.7
+
 // appendPartsToMerge finds optimal parts to merge from src, appends
 // them to dst and returns the result.
-func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxItems uint64) []*partWrapper {
+func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxOutBytes uint64) []*partWrapper {
     if len(src) < 2 {
         // There is no need in merging zero or one part :)
         return dst
@@ -1411,18 +1392,18 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxItems u

     // Filter out too big parts.
     // This should reduce N for O(n^2) algorithm below.
-    maxInPartItems := maxItems / 2
+    maxInPartBytes := uint64(float64(maxOutBytes) / minMergeMultiplier)
     tmp := make([]*partWrapper, 0, len(src))
     for _, pw := range src {
-        if pw.p.ph.itemsCount > maxInPartItems {
+        if pw.p.size > maxInPartBytes {
             continue
         }
         tmp = append(tmp, pw)
     }
     src = tmp

-    // Sort src parts by itemsCount.
-    sort.Slice(src, func(i, j int) bool { return src[i].p.ph.itemsCount < src[j].p.ph.itemsCount })
+    // Sort src parts by size.
+    sort.Slice(src, func(i, j int) bool { return src[i].p.size < src[j].p.size })

     maxSrcParts := maxPartsToMerge
     if maxSrcParts > len(src) {
@@ -1439,20 +1420,20 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxItems u
     for i := minSrcParts; i <= maxSrcParts; i++ {
         for j := 0; j <= len(src)-i; j++ {
             a := src[j : j+i]
-            if a[0].p.ph.itemsCount*uint64(len(a)) < a[len(a)-1].p.ph.itemsCount {
-                // Do not merge parts with too big difference in items count,
+            if a[0].p.size*uint64(len(a)) < a[len(a)-1].p.size {
+                // Do not merge parts with too big a difference in size,
                 // since this results in unbalanced merges.
                 continue
             }
-            itemsSum := uint64(0)
+            outBytes := uint64(0)
             for _, pw := range a {
-                itemsSum += pw.p.ph.itemsCount
+                outBytes += pw.p.size
             }
-            if itemsSum > maxItems {
+            if outBytes > maxOutBytes {
                 // There is no sense in checking the remaining bigger parts.
                 break
             }
-            m := float64(itemsSum) / float64(a[len(a)-1].p.ph.itemsCount)
+            m := float64(outBytes) / float64(a[len(a)-1].p.size)
             if m < maxM {
                 continue
             }
@@ -1462,11 +1443,12 @@
     }

     minM := float64(maxPartsToMerge) / 2
-    if minM < 1.7 {
-        minM = 1.7
+    if minM < minMergeMultiplier {
+        minM = minMergeMultiplier
     }
     if maxM < minM {
-        // There is no sense in merging parts with too small m.
+        // There is no sense in merging parts with too small m,
+        // since this leads to high disk write IO.
         return dst
     }
     return append(dst, pws...)
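The `maxInPartBytes = maxOutBytes / minMergeMultiplier` filter above replaces the old `maxItems / 2` heuristic. The hypothetical helper below isolates just that filter to show which parts survive it under an assumed size budget; `filterMergeable` is not a real function in the codebase.

```go
package main

import "fmt"

// minMergeMultiplier matches the constant introduced in the diff above.
const minMergeMultiplier = 1.7

// filterMergeable is a hypothetical helper: parts bigger than
// maxOutBytes/minMergeMultiplier cannot belong to any merge whose output fits
// into maxOutBytes while still shrinking the part count enough, so they are
// skipped before the O(n^2) selection loop runs.
func filterMergeable(sizes []uint64, maxOutBytes uint64) []uint64 {
	maxInPartBytes := uint64(float64(maxOutBytes) / minMergeMultiplier)
	var out []uint64
	for _, size := range sizes {
		if size > maxInPartBytes {
			continue // too big for a merge bounded by maxOutBytes
		}
		out = append(out, size)
	}
	return out
}

func main() {
	// With a 100-byte budget, parts bigger than ~58 bytes are filtered out.
	fmt.Println(filterMergeable([]uint64{10, 40, 60, 90}, 100)) // [10 40]
}
```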
diff --git a/lib/storage/part.go b/lib/storage/part.go
index 9638df758..d7a68ea18 100644
--- a/lib/storage/part.go
+++ b/lib/storage/part.go
@@ -133,7 +133,7 @@ func (p *part) MustClose() {
     p.valuesFile.MustClose()
     p.indexFile.MustClose()

-    isBig := p.ph.RowsCount > maxRowsPerSmallPart()
+    isBig := p.size > maxSmallPartSize()
     p.ibCache.MustClose(isBig)
 }

diff --git a/lib/storage/partition.go b/lib/storage/partition.go
index f69967f92..ff3c8d30f 100644
--- a/lib/storage/partition.go
+++ b/lib/storage/partition.go
@@ -35,27 +35,26 @@ var (
     historicalSmallIndexBlocksCacheMisses uint64
 )

-func maxRowsPerSmallPart() uint64 {
+func maxSmallPartSize() uint64 {
     // Small parts are cached in the OS page cache,
-    // so limit the number of rows for small part by the remaining free RAM.
+    // so limit their size by the remaining free RAM.
     mem := memory.Remaining()
-    // Production data shows that each row occupies ~1 byte in the compressed part.
     // It is expected no more than defaultPartsToMerge/2 parts exist
     // in the OS page cache before they are merged into bigger part.
     // Half of the remaining RAM must be left for lib/mergeset parts,
     // so the maxItems is calculated using the below code:
-    maxRows := uint64(mem) / defaultPartsToMerge
-    if maxRows < 10e6 {
-        maxRows = 10e6
+    maxSize := uint64(mem) / defaultPartsToMerge
+    if maxSize < 10e6 {
+        maxSize = 10e6
     }
-    return maxRows
+    return maxSize
 }

-// The maximum number of rows per big part.
+// The maximum size of a big part.
 //
 // This number limits the maximum time required for building big part.
 // This time shouldn't exceed a few days.
-const maxRowsPerBigPart = 1e12
+const maxBigPartSize = 1e12

 // The maximum number of small parts in the partition.
 const maxSmallPartsPerPartition = 256
@@ -977,26 +976,21 @@ func SetFinalMergeDelay(delay time.Duration) {
     finalMergeDelaySeconds = uint64(delay.Seconds() + 1)
 }

-func maxRowsByPath(path string) uint64 {
-    freeSpace := fs.MustGetFreeSpace(path)
-
-    // Calculate the maximum number of rows in the output merge part
-    // by dividing the freeSpace by the maximum number of concurrent
-    // workers for big parts.
-    // This assumes each row is compressed into 1 byte
-    // according to production data.
-    maxRows := freeSpace / uint64(bigMergeWorkersCount)
-    if maxRows > maxRowsPerBigPart {
-        maxRows = maxRowsPerBigPart
+func getMaxOutBytes(path string, workersCount int) uint64 {
+    n := fs.MustGetFreeSpace(path)
+    // Divide free space by the max number of concurrent merges.
+    maxOutBytes := n / uint64(workersCount)
+    if maxOutBytes > maxBigPartSize {
+        maxOutBytes = maxBigPartSize
     }
-    return maxRows
+    return maxOutBytes
 }

 func (pt *partition) mergeBigParts(isFinal bool) error {
-    maxRows := maxRowsByPath(pt.bigPartsPath)
+    maxOutBytes := getMaxOutBytes(pt.bigPartsPath, bigMergeWorkersCount)

     pt.partsLock.Lock()
-    pws, needFreeSpace := getPartsToMerge(pt.bigParts, maxRows, isFinal)
+    pws, needFreeSpace := getPartsToMerge(pt.bigParts, maxOutBytes, isFinal)
     pt.partsLock.Unlock()

     atomicSetBool(&pt.bigMergeNeedFreeDiskSpace, needFreeSpace)
@@ -1005,29 +999,29 @@ func (pt *partition) mergeBigParts(isFinal bool) error {

 func (pt *partition) mergeSmallParts(isFinal bool) error {
     // Try merging small parts to a big part at first.
-    maxBigPartRows := maxRowsByPath(pt.bigPartsPath)
+    maxBigPartOutBytes := getMaxOutBytes(pt.bigPartsPath, bigMergeWorkersCount)
     pt.partsLock.Lock()
-    pws, needFreeSpace := getPartsToMerge(pt.smallParts, maxBigPartRows, isFinal)
+    pws, needFreeSpace := getPartsToMerge(pt.smallParts, maxBigPartOutBytes, isFinal)
     pt.partsLock.Unlock()
     atomicSetBool(&pt.bigMergeNeedFreeDiskSpace, needFreeSpace)

-    rowsCount := getRowsCount(pws)
-    if rowsCount > maxRowsPerSmallPart() {
+    outSize := getPartsSize(pws)
+    if outSize > maxSmallPartSize() {
         // Merge small parts to a big part.
         return pt.mergeParts(pws, pt.stopCh)
     }

     // Make sure that the output small part fits small parts storage.
-    maxSmallPartRows := maxRowsByPath(pt.smallPartsPath)
-    if rowsCount <= maxSmallPartRows {
+    maxSmallPartOutBytes := getMaxOutBytes(pt.smallPartsPath, smallMergeWorkersCount)
+    if outSize <= maxSmallPartOutBytes {
         // Merge small parts to a small part.
         return pt.mergeParts(pws, pt.stopCh)
     }

-    // The output small part doesn't fit small parts storage. Try merging small parts according to maxSmallPartRows limit.
+    // The output small part doesn't fit small parts storage. Try merging small parts according to the maxSmallPartOutBytes limit.
     pt.releasePartsToMerge(pws)
     pt.partsLock.Lock()
-    pws, needFreeSpace = getPartsToMerge(pt.smallParts, maxSmallPartRows, isFinal)
+    pws, needFreeSpace = getPartsToMerge(pt.smallParts, maxSmallPartOutBytes, isFinal)
     pt.partsLock.Unlock()
     atomicSetBool(&pt.smallMergeNeedFreeDiskSpace, needFreeSpace)

@@ -1088,13 +1082,15 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
         bsrs = append(bsrs, bsr)
     }

+    outSize := uint64(0)
     outRowsCount := uint64(0)
     outBlocksCount := uint64(0)
     for _, pw := range pws {
+        outSize += pw.p.size
         outRowsCount += pw.p.ph.RowsCount
         outBlocksCount += pw.p.ph.BlocksCount
     }
-    isBigPart := outRowsCount > maxRowsPerSmallPart()
+    isBigPart := outSize > maxSmallPartSize()
     nocache := isBigPart

     // Prepare BlockStreamWriter for destination part.
@@ -1343,9 +1339,9 @@ func (pt *partition) removeStaleParts() {

 // getPartsToMerge returns optimal parts to merge from pws.
 //
-// The returned parts will contain less than maxRows rows.
-// The function returns true if pws contains parts, which cannot be merged because of maxRows limit.
-func getPartsToMerge(pws []*partWrapper, maxRows uint64, isFinal bool) ([]*partWrapper, bool) {
+// The total size of the returned parts must be smaller than maxOutBytes.
+// The function returns true if pws contains parts which cannot be merged because of the maxOutBytes limit.
+func getPartsToMerge(pws []*partWrapper, maxOutBytes uint64, isFinal bool) ([]*partWrapper, bool) {
     pwsRemaining := make([]*partWrapper, 0, len(pws))
     for _, pw := range pws {
         if !pw.isInMerge {
@@ -1357,11 +1353,11 @@ func getPartsToMerge(pws []*partWrapper, maxRows uint64, isFinal bool) ([]*partW
     needFreeSpace := false
     if isFinal {
         for len(pms) == 0 && maxPartsToMerge >= finalPartsToMerge {
-            pms, needFreeSpace = appendPartsToMerge(pms[:0], pwsRemaining, maxPartsToMerge, maxRows)
+            pms, needFreeSpace = appendPartsToMerge(pms[:0], pwsRemaining, maxPartsToMerge, maxOutBytes)
             maxPartsToMerge--
         }
     } else {
-        pms, needFreeSpace = appendPartsToMerge(pms[:0], pwsRemaining, maxPartsToMerge, maxRows)
+        pms, needFreeSpace = appendPartsToMerge(pms[:0], pwsRemaining, maxPartsToMerge, maxOutBytes)
     }
     for _, pw := range pms {
         if pw.isInMerge {
@@ -1372,10 +1368,18 @@ func getPartsToMerge(pws []*partWrapper, maxRows uint64, isFinal bool) ([]*partW
     return pms, needFreeSpace
 }

+// minMergeMultiplier is the minimum multiplier for the size of the output part
+// compared to the size of the largest input part for the merge.
+//
+// A higher value reduces write amplification (disk write IO induced by the merge),
+// but increases the number of unmerged parts.
+// A value of 1.7 is good enough for production workloads.
+const minMergeMultiplier = 1.7
+
 // appendPartsToMerge finds optimal parts to merge from src, appends
 // them to dst and returns the result.
-// The function returns true if src contains parts, which cannot be merged because of maxRows limit.
-func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows uint64) ([]*partWrapper, bool) {
+// The function returns true if src contains parts which cannot be merged because of the maxOutBytes limit.
+func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxOutBytes uint64) ([]*partWrapper, bool) {
     if len(src) < 2 {
         // There is no need in merging zero or one part :)
         return dst, false
@@ -1387,10 +1391,10 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows ui
     // Filter out too big parts.
     // This should reduce N for O(N^2) algorithm below.
     skippedBigParts := 0
-    maxInPartRows := maxRows / 2
+    maxInPartBytes := uint64(float64(maxOutBytes) / minMergeMultiplier)
     tmp := make([]*partWrapper, 0, len(src))
     for _, pw := range src {
-        if pw.p.ph.RowsCount > maxInPartRows {
+        if pw.p.size > maxInPartBytes {
             skippedBigParts++
             continue
         }
@@ -1399,15 +1403,15 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows ui
     src = tmp
     needFreeSpace := skippedBigParts > 1

-    // Sort src parts by rows count and backwards timestamp.
+    // Sort src parts by size and backwards timestamp.
     // This should improve adjanced points' locality in the merged parts.
     sort.Slice(src, func(i, j int) bool {
-        a := &src[i].p.ph
-        b := &src[j].p.ph
-        if a.RowsCount == b.RowsCount {
-            return a.MinTimestamp > b.MinTimestamp
+        a := src[i].p
+        b := src[j].p
+        if a.size == b.size {
+            return a.ph.MinTimestamp > b.ph.MinTimestamp
         }
-        return a.RowsCount < b.RowsCount
+        return a.size < b.size
     })

     maxSrcParts := maxPartsToMerge
@@ -1425,20 +1429,20 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows ui
     for i := minSrcParts; i <= maxSrcParts; i++ {
         for j := 0; j <= len(src)-i; j++ {
             a := src[j : j+i]
-            rowsCount := getRowsCount(a)
-            if rowsCount > maxRows {
+            outSize := getPartsSize(a)
+            if outSize > maxOutBytes {
                 needFreeSpace = true
             }
-            if a[0].p.ph.RowsCount*uint64(len(a)) < a[len(a)-1].p.ph.RowsCount {
-                // Do not merge parts with too big difference in rows count,
+            if a[0].p.size*uint64(len(a)) < a[len(a)-1].p.size {
+                // Do not merge parts with too big a difference in size,
                 // since this results in unbalanced merges.
                 continue
             }
-            if rowsCount > maxRows {
-                // There is no need in verifying remaining parts with higher number of rows
+            if outSize > maxOutBytes {
+                // There is no need in verifying remaining parts with bigger sizes.
                 break
             }
-            m := float64(rowsCount) / float64(a[len(a)-1].p.ph.RowsCount)
+            m := float64(outSize) / float64(a[len(a)-1].p.size)
             if m < maxM {
                 continue
             }
@@ -1448,20 +1452,21 @@
     }

     minM := float64(maxPartsToMerge) / 2
-    if minM < 1.7 {
-        minM = 1.7
+    if minM < minMergeMultiplier {
+        minM = minMergeMultiplier
     }
     if maxM < minM {
-        // There is no sense in merging parts with too small m.
+        // There is no sense in merging parts with too small m,
+        // since this leads to high disk write IO.
         return dst, needFreeSpace
     }
     return append(dst, pws...), needFreeSpace
 }

-func getRowsCount(pws []*partWrapper) uint64 {
+func getPartsSize(pws []*partWrapper) uint64 {
     n := uint64(0)
     for _, pw := range pws {
-        n += pw.p.ph.RowsCount
+        n += pw.p.size
     }
     return n
 }
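For readers who want the selection heuristic without the `partWrapper` plumbing, here is a hedged, simplified restatement of the loop in `appendPartsToMerge`: it scans windows of similarly sized parts, rejects any window whose total exceeds `maxOutBytes`, and keeps the window with the highest ratio `m` of output size to largest input size, provided `m` clears the `minMergeMultiplier`-derived threshold. `pickPartsToMerge` is an illustrative name; the sketch omits the unbalanced-merge check, timestamp ordering, `isInMerge` bookkeeping and the `needFreeSpace` flag.

```go
package main

import (
	"fmt"
	"sort"
)

// minMergeMultiplier matches the constant introduced in the diff above.
const minMergeMultiplier = 1.7

// pickPartsToMerge is an illustrative, simplified restatement of the
// selection loop: it works on plain sizes instead of *partWrapper values.
func pickPartsToMerge(sizes []uint64, maxPartsToMerge int, maxOutBytes uint64) []uint64 {
	sort.Slice(sizes, func(i, j int) bool { return sizes[i] < sizes[j] })

	// A merge is only worthwhile if the output is at least minM times bigger
	// than its largest input; otherwise write amplification is too high.
	minM := float64(maxPartsToMerge) / 2
	if minM < minMergeMultiplier {
		minM = minMergeMultiplier
	}

	var best []uint64
	bestM := minM
	for i := 2; i <= maxPartsToMerge && i <= len(sizes); i++ {
		for j := 0; j+i <= len(sizes); j++ {
			window := sizes[j : j+i]
			outBytes := uint64(0)
			for _, size := range window {
				outBytes += size
			}
			if outBytes > maxOutBytes {
				// Windows further to the right contain bigger parts,
				// so their sums only grow; stop scanning this width.
				break
			}
			// m is the size-reduction factor: how much bigger the output is
			// than the largest input. Bigger m means less write amplification.
			m := float64(outBytes) / float64(window[len(window)-1])
			if m < bestM {
				continue
			}
			bestM = m
			best = append(best[:0], window...)
		}
	}
	return best
}

func main() {
	sizes := []uint64{10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10}
	// All 15 equally-sized parts are picked: merging them gives m = 15.
	fmt.Println(pickPartsToMerge(sizes, 15, 1000))
}
```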
diff --git a/lib/storage/partition_test.go b/lib/storage/partition_test.go
index 7e3506f98..88cf9cb68 100644
--- a/lib/storage/partition_test.go
+++ b/lib/storage/partition_test.go
@@ -6,10 +6,10 @@ import (
     "testing"
 )

-func TestPartitionMaxRowsByPath(t *testing.T) {
-    n := maxRowsByPath(".")
+func TestPartitionGetMaxOutBytes(t *testing.T) {
+    n := getMaxOutBytes(".", 1)
     if n < 1e3 {
-        t.Fatalf("too small number of rows can be created in the current directory: %d", n)
+        t.Fatalf("too small free space remained in the current directory: %d", n)
     }
 }

@@ -35,10 +35,10 @@ func TestAppendPartsToMerge(t *testing.T) {
 }

 func TestAppendPartsToMergeNeedFreeSpace(t *testing.T) {
-    f := func(a []uint64, maxItems int, expectedNeedFreeSpace bool) {
+    f := func(sizes []uint64, maxOutBytes int, expectedNeedFreeSpace bool) {
         t.Helper()
-        pws := newTestPartWrappersForRowsCount(a)
-        _, needFreeSpace := appendPartsToMerge(nil, pws, defaultPartsToMerge, uint64(maxItems))
+        pws := newTestPartWrappersForSizes(sizes)
+        _, needFreeSpace := appendPartsToMerge(nil, pws, defaultPartsToMerge, uint64(maxOutBytes))
         if needFreeSpace != expectedNeedFreeSpace {
             t.Fatalf("unexpected needFreeSpace; got %v; want %v", needFreeSpace, expectedNeedFreeSpace)
         }
@@ -46,7 +46,7 @@ func TestAppendPartsToMergeNeedFreeSpace(t *testing.T) {
     f(nil, 1000, false)
     f([]uint64{1000}, 100, false)
     f([]uint64{1000}, 1100, false)
-    f([]uint64{100, 200}, 180, true)
+    f([]uint64{120, 200}, 180, true)
     f([]uint64{100, 200}, 310, false)
     f([]uint64{100, 110, 109, 1}, 300, true)
     f([]uint64{100, 110, 109, 1}, 330, false)
@@ -55,8 +55,8 @@ func TestAppendPartsToMergeNeedFreeSpace(t *testing.T) {
 func TestAppendPartsToMergeManyParts(t *testing.T) {
     // Verify that big number of parts are merged into minimal number of parts
     // using minimum merges.
-    var a []uint64
-    maxOutPartRows := uint64(0)
+    var sizes []uint64
+    maxOutSize := uint64(0)
     r := rand.New(rand.NewSource(1))
     for i := 0; i < 1024; i++ {
         n := uint64(uint32(r.NormFloat64() * 1e9))
@@ -64,15 +64,15 @@ func TestAppendPartsToMergeManyParts(t *testing.T) {
             n = -n
         }
         n++
-        maxOutPartRows += n
-        a = append(a, n)
+        maxOutSize += n
+        sizes = append(sizes, n)
     }
-    pws := newTestPartWrappersForRowsCount(a)
+    pws := newTestPartWrappersForSizes(sizes)

     iterationsCount := 0
-    rowsMerged := uint64(0)
+    sizeMergedTotal := uint64(0)
     for {
-        pms, _ := appendPartsToMerge(nil, pws, defaultPartsToMerge, maxOutPartRows)
+        pms, _ := appendPartsToMerge(nil, pws, defaultPartsToMerge, maxOutSize)
         if len(pms) == 0 {
             break
         }
@@ -81,61 +81,58 @@ func TestAppendPartsToMergeManyParts(t *testing.T) {
             m[pw] = true
         }
         var pwsNew []*partWrapper
-        rowsCount := uint64(0)
+        size := uint64(0)
         for _, pw := range pws {
             if m[pw] {
-                rowsCount += pw.p.ph.RowsCount
+                size += pw.p.size
             } else {
                 pwsNew = append(pwsNew, pw)
             }
         }
         pw := &partWrapper{
-            p: &part{},
+            p: &part{
+                size: size,
+            },
         }
-        pw.p.ph = partHeader{
-            RowsCount: rowsCount,
-        }
-        rowsMerged += rowsCount
+        sizeMergedTotal += size
         pwsNew = append(pwsNew, pw)
         pws = pwsNew
         iterationsCount++
     }
-    rowsCount := newTestRowsCountFromPartWrappers(pws)
-    rowsTotal := uint64(0)
-    for _, rc := range rowsCount {
-        rowsTotal += uint64(rc)
+    sizes = newTestSizesFromPartWrappers(pws)
+    sizeTotal := uint64(0)
+    for _, size := range sizes {
+        sizeTotal += uint64(size)
     }
-    overhead := float64(rowsMerged) / float64(rowsTotal)
+    overhead := float64(sizeMergedTotal) / float64(sizeTotal)
     if overhead > 2.1 {
-        t.Fatalf("too big overhead; rowsCount=%d, iterationsCount=%d, rowsTotal=%d, rowsMerged=%d, overhead=%f",
-            rowsCount, iterationsCount, rowsTotal, rowsMerged, overhead)
+        t.Fatalf("too big overhead; sizes=%d, iterationsCount=%d, sizeTotal=%d, sizeMergedTotal=%d, overhead=%f",
+            sizes, iterationsCount, sizeTotal, sizeMergedTotal, overhead)
     }
-    if len(rowsCount) > 18 {
-        t.Fatalf("too many rowsCount %d; rowsCount=%d, iterationsCount=%d, rowsTotal=%d, rowsMerged=%d, overhead=%f",
-            len(rowsCount), rowsCount, iterationsCount, rowsTotal, rowsMerged, overhead)
+    if len(sizes) > 18 {
+        t.Fatalf("too many sizes %d; sizes=%d, iterationsCount=%d, sizeTotal=%d, sizeMergedTotal=%d, overhead=%f",
+            len(sizes), sizes, iterationsCount, sizeTotal, sizeMergedTotal, overhead)
     }
 }

-func testAppendPartsToMerge(t *testing.T, maxPartsToMerge int, initialRowsCount, expectedRowsCount []uint64) {
+func testAppendPartsToMerge(t *testing.T, maxPartsToMerge int, initialSizes, expectedSizes []uint64) {
     t.Helper()

-    pws := newTestPartWrappersForRowsCount(initialRowsCount)
+    pws := newTestPartWrappersForSizes(initialSizes)

     // Verify appending to nil.
     pms, _ := appendPartsToMerge(nil, pws, maxPartsToMerge, 1e9)
-    rowsCount := newTestRowsCountFromPartWrappers(pms)
-    if !reflect.DeepEqual(rowsCount, expectedRowsCount) {
-        t.Fatalf("unexpected rowsCount for maxPartsToMerge=%d, initialRowsCount=%d; got\n%d; want\n%d",
-            maxPartsToMerge, initialRowsCount, rowsCount, expectedRowsCount)
+    sizes := newTestSizesFromPartWrappers(pms)
+    if !reflect.DeepEqual(sizes, expectedSizes) {
+        t.Fatalf("unexpected size for maxPartsToMerge=%d, initialSizes=%d; got\n%d; want\n%d",
+            maxPartsToMerge, initialSizes, sizes, expectedSizes)
     }

     // Verify appending to prefix
     prefix := []*partWrapper{
         {
             p: &part{
-                ph: partHeader{
-                    RowsCount: 1234,
-                },
+                size: 1234,
             },
         },
         {},
@@ -143,33 +140,31 @@ func testAppendPartsToMerge(t *testing.T, maxPartsToMerge int, initialRowsCount,
     }
     pms, _ = appendPartsToMerge(prefix, pws, maxPartsToMerge, 1e9)
     if !reflect.DeepEqual(pms[:len(prefix)], prefix) {
-        t.Fatalf("unexpected prefix for maxPartsToMerge=%d, initialRowsCount=%d; got\n%+v; want\n%+v",
-            maxPartsToMerge, initialRowsCount, pms[:len(prefix)], prefix)
+        t.Fatalf("unexpected prefix for maxPartsToMerge=%d, initialSizes=%d; got\n%+v; want\n%+v",
+            maxPartsToMerge, initialSizes, pms[:len(prefix)], prefix)
     }
-    rowsCount = newTestRowsCountFromPartWrappers(pms[len(prefix):])
-    if !reflect.DeepEqual(rowsCount, expectedRowsCount) {
-        t.Fatalf("unexpected prefixed rowsCount for maxPartsToMerge=%d, initialRowsCount=%d; got\n%d; want\n%d",
-            maxPartsToMerge, initialRowsCount, rowsCount, expectedRowsCount)
+    sizes = newTestSizesFromPartWrappers(pms[len(prefix):])
+    if !reflect.DeepEqual(sizes, expectedSizes) {
+        t.Fatalf("unexpected prefixed sizes for maxPartsToMerge=%d, initialSizes=%d; got\n%d; want\n%d",
+            maxPartsToMerge, initialSizes, sizes, expectedSizes)
     }
 }

-func newTestRowsCountFromPartWrappers(pws []*partWrapper) []uint64 {
-    var rowsCount []uint64
+func newTestSizesFromPartWrappers(pws []*partWrapper) []uint64 {
+    var sizes []uint64
     for _, pw := range pws {
-        rowsCount = append(rowsCount, pw.p.ph.RowsCount)
+        sizes = append(sizes, pw.p.size)
     }
-    return rowsCount
+    return sizes
 }

-func newTestPartWrappersForRowsCount(rowsCount []uint64) []*partWrapper {
+func newTestPartWrappersForSizes(sizes []uint64) []*partWrapper {
     var pws []*partWrapper
-    for _, rc := range rowsCount {
+    for _, size := range sizes {
         pw := &partWrapper{
             p: &part{
-                ph: partHeader{
-                    RowsCount: rc,
-                },
+                size: size,
             },
         }
         pws = append(pws, pw)
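A note on the updated test input `f([]uint64{120, 200}, 180, true)`: under the new filter `maxInPartBytes = maxOutBytes/1.7 ≈ 105`, a 100-byte part would no longer count as "too big", so only the 200-byte part would be skipped and a single skipped part does not raise `needFreeSpace`. The sketch below mirrors only that skipping rule (the real `appendPartsToMerge` can also set the flag inside its selection loop); `needFreeSpace` here is a standalone illustrative helper, not the production code.

```go
package main

import "fmt"

// minMergeMultiplier matches the constant introduced in the diff above.
const minMergeMultiplier = 1.7

// needFreeSpace reproduces only the "skipped big parts" rule from
// appendPartsToMerge: more than one part exceeding maxOutBytes/1.7 means
// merges are being blocked by the size limit, i.e. by lack of free space.
func needFreeSpace(sizes []uint64, maxOutBytes uint64) bool {
	maxInPartBytes := uint64(float64(maxOutBytes) / minMergeMultiplier)
	skippedBigParts := 0
	for _, size := range sizes {
		if size > maxInPartBytes {
			skippedBigParts++
		}
	}
	return skippedBigParts > 1
}

func main() {
	fmt.Println(needFreeSpace([]uint64{100, 200}, 180)) // false under the new rule
	fmt.Println(needFreeSpace([]uint64{120, 200}, 180)) // true
}
```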