lib/storage: properly limit the number of output rows in small and big parts storage

Previously small parts storage didn't take into account the available disk space for big parts.
This commit is contained in:
Aliaksandr Valialkin 2019-08-25 14:10:43 +03:00
parent 3308279c4e
commit 1402a6b981
2 changed files with 19 additions and 13 deletions

View file

@ -808,8 +808,8 @@ func (pt *partition) partsMerger(mergerFunc func(isFinal bool) error) error {
}
}
func (pt *partition) maxOutPartRows() uint64 {
freeSpace := mustGetFreeDiskSpace(pt.bigPartsPath)
func maxRowsByPath(path string) uint64 {
freeSpace := mustGetFreeDiskSpace(path)
// Calculate the maximum number of rows in the output merge part
// by dividing the freeSpace by the number of concurrent
@ -817,7 +817,11 @@ func (pt *partition) maxOutPartRows() uint64 {
// This assumes each row is compressed into 1 byte. Production
// simulation shows that each row usually occupies up to 0.5 bytes,
// so this is quite a safe assumption.
return freeSpace / uint64(mergeWorkers)
maxRows := freeSpace / uint64(mergeWorkers)
if maxRows > maxRowsPerBigPart {
maxRows = maxRowsPerBigPart
}
return maxRows
}
func mustGetFreeDiskSpace(path string) uint64 {
@ -861,10 +865,7 @@ type freeSpaceEntry struct {
}
func (pt *partition) mergeBigParts(isFinal bool) error {
maxRows := pt.maxOutPartRows()
if maxRows > maxRowsPerBigPart {
maxRows = maxRowsPerBigPart
}
maxRows := maxRowsByPath(pt.bigPartsPath)
pt.partsLock.Lock()
pws := getPartsToMerge(pt.bigParts, maxRows, isFinal)
@ -883,7 +884,15 @@ func (pt *partition) mergeBigParts(isFinal bool) error {
}
func (pt *partition) mergeSmallParts(isFinal bool) error {
maxRows := uint64(maxRowsPerSmallPart * defaultPartsToMerge)
maxRows := maxRowsByPath(pt.smallPartsPath)
if maxRows > maxRowsPerSmallPart {
// The output part may go to big part,
// so make sure it has enough space.
maxBigPartRows := maxRowsByPath(pt.bigPartsPath)
if maxRows > maxBigPartRows {
maxRows = maxBigPartRows
}
}
pt.partsLock.Lock()
pws := getPartsToMerge(pt.smallParts, maxRows, isFinal)

View file

@ -6,11 +6,8 @@ import (
"testing"
)
func TestPartitionMaxOutPartRows(t *testing.T) {
pt := &partition{
bigPartsPath: ".",
}
n := pt.maxOutPartRows()
func TestPartitionMaxRowsByPath(t *testing.T) {
n := maxRowsByPath(".")
if n < 1e3 {
t.Fatalf("too small number of rows can be created in the current directory: %d", n)
}