VictoriaMetrics/lib/storage/partition_test.go
Aliaksandr Valialkin b8a5ee2e93 lib/{mergeset,storage}: allow merging smaller number of small parts
While this may increase CPU and disk IO usage needed for background merge,
this also reduces CPU usage during queries in production. This is because
such queries tend to read recently added data and it is better to have a lower number
of parts for such data in order to reduce CPU usage.

This partially reverts ebf8da3730
2021-02-21 21:43:37 +02:00

160 lines
4.8 KiB
Go

package storage
import (
"math/rand"
"reflect"
"testing"
)
// TestPartitionMaxRowsByPath checks that maxRowsByPath reports at least 1e3 rows
// for the current directory.
func TestPartitionMaxRowsByPath(t *testing.T) {
	const minRows = 1e3
	if rows := maxRowsByPath("."); rows < minRows {
		t.Fatalf("too small number of rows can be created in the current directory: %d", rows)
	}
}
// TestAppendPartsToMerge runs testAppendPartsToMerge over a fixed table of cases.
//
// Every case lists the maxPartsToMerge limit, the rows counts of the source parts
// and the rows counts of the parts expected to be selected for the merge.
// A nil expectation means that no parts are expected to be selected.
func TestAppendPartsToMerge(t *testing.T) {
	cases := []struct {
		maxParts int
		initial  []uint64
		expected []uint64
	}{
		{2, []uint64{}, nil},
		{2, []uint64{123}, nil},
		{2, []uint64{4, 2}, nil},
		{2, []uint64{128, 64, 32, 16, 8, 4, 2, 1}, nil},
		{4, []uint64{128, 64, 32, 10, 9, 7, 3, 1}, []uint64{3, 7, 9, 10}},
		{2, []uint64{128, 64, 32, 16, 8, 4, 2, 2}, []uint64{2, 2}},
		{4, []uint64{128, 64, 32, 16, 8, 4, 2, 2}, []uint64{2, 2, 4, 8}},
		{2, []uint64{1, 1}, []uint64{1, 1}},
		{2, []uint64{2, 2, 2}, []uint64{2, 2}},
		{2, []uint64{4, 2, 4}, []uint64{4, 4}},
		{2, []uint64{1, 3, 7, 2}, nil},
		{3, []uint64{1, 3, 7, 2}, []uint64{1, 2, 3}},
		{4, []uint64{1, 3, 7, 2}, []uint64{1, 2, 3}},
		{5, []uint64{1, 3, 7, 2}, nil},
		{4, []uint64{1e6, 3e6, 7e6, 2e6}, []uint64{1e6, 2e6, 3e6}},
		{4, []uint64{2, 3, 7, 2}, []uint64{2, 2, 3}},
		{5, []uint64{2, 3, 7, 2}, nil},
		{3, []uint64{11, 1, 10, 100, 10}, []uint64{10, 10, 11}},
	}
	// The cases are executed in order; the first failure aborts the test.
	for _, tc := range cases {
		testAppendPartsToMerge(t, tc.maxParts, tc.initial, tc.expected)
	}
}
// TestAppendPartsToMergeManyParts verifies that a big number of parts is merged
// into a small number of parts using a small number of merges.
//
// The test simulates repeated merges over 1024 parts with pseudo-random rows counts
// generated from a fixed seed. On every iteration the parts selected by
// appendPartsToMerge are replaced with a single synthetic part holding the sum
// of their rows. The simulation stops when appendPartsToMerge selects nothing.
// Then the test bounds the merge overhead (rows merged / total rows)
// and the number of parts left.
func TestAppendPartsToMergeManyParts(t *testing.T) {
	const partsCount = 1024

	a := make([]uint64, 0, partsCount)
	maxOutPartRows := uint64(0)
	r := rand.New(rand.NewSource(1))
	for i := 0; i < partsCount; i++ {
		// n is unsigned, so it needs no sign handling after the conversion.
		//
		// NOTE: the float64 -> uint32 conversion is implementation-dependent
		// for samples outside the uint32 range (e.g. negative ones) according to the Go spec,
		// so the generated rows counts may differ across architectures.
		n := uint64(uint32(r.NormFloat64() * 1e9))
		// Make sure every part contains at least one row.
		n++
		// The limit passed to appendPartsToMerge below equals the total number of rows.
		maxOutPartRows += n
		a = append(a, n)
	}
	pws := newTestPartWrappersForRowsCount(a)

	iterationsCount := 0
	rowsMerged := uint64(0)
	for {
		pms, _ := appendPartsToMerge(nil, pws, defaultPartsToMerge, maxOutPartRows)
		if len(pms) == 0 {
			// Nothing left to merge.
			break
		}
		m := make(map[*partWrapper]bool, len(pms))
		for _, pw := range pms {
			m[pw] = true
		}

		// Drop the selected parts from pws, while summing up their rows.
		var pwsNew []*partWrapper
		rowsCount := uint64(0)
		for _, pw := range pws {
			if m[pw] {
				rowsCount += pw.p.ph.RowsCount
			} else {
				pwsNew = append(pwsNew, pw)
			}
		}

		// Add the synthetic part, which represents the result of the merge.
		pw := &partWrapper{
			p: &part{},
		}
		pw.p.ph = partHeader{
			RowsCount: rowsCount,
		}
		rowsMerged += rowsCount
		pwsNew = append(pwsNew, pw)
		pws = pwsNew
		iterationsCount++
	}

	rowsCount := newTestRowsCountFromPartWrappers(pws)
	rowsTotal := uint64(0)
	for _, rc := range rowsCount {
		rowsTotal += rc
	}
	// overhead shows how many times on average every row has been re-written by merges.
	overhead := float64(rowsMerged) / float64(rowsTotal)
	if overhead > 2.1 {
		t.Fatalf("too big overhead; rowsCount=%d, iterationsCount=%d, rowsTotal=%d, rowsMerged=%d, overhead=%f",
			rowsCount, iterationsCount, rowsTotal, rowsMerged, overhead)
	}
	if len(rowsCount) > 18 {
		t.Fatalf("too many rowsCount %d; rowsCount=%d, iterationsCount=%d, rowsTotal=%d, rowsMerged=%d, overhead=%f",
			len(rowsCount), rowsCount, iterationsCount, rowsTotal, rowsMerged, overhead)
	}
}
// testAppendPartsToMerge checks which parts appendPartsToMerge selects out of parts
// with the given initialRowsCount when at most maxPartsToMerge parts may be merged.
//
// The rows counts of the selected parts must match expectedRowsCount.
// The check is performed twice: when appending to a nil slice and when appending
// to a non-empty prefix, which must remain unchanged.
func testAppendPartsToMerge(t *testing.T, maxPartsToMerge int, initialRowsCount, expectedRowsCount []uint64) {
	t.Helper()

	const maxRows = 1e9
	src := newTestPartWrappersForRowsCount(initialRowsCount)

	// Case 1: the selected parts are appended to nil.
	selected, _ := appendPartsToMerge(nil, src, maxPartsToMerge, maxRows)
	if got := newTestRowsCountFromPartWrappers(selected); !reflect.DeepEqual(got, expectedRowsCount) {
		t.Fatalf("unexpected rowsCount for maxPartsToMerge=%d, initialRowsCount=%d; got\n%d; want\n%d",
			maxPartsToMerge, initialRowsCount, got, expectedRowsCount)
	}

	// Case 2: the selected parts are appended to an existing prefix.
	head := []*partWrapper{
		{p: &part{ph: partHeader{RowsCount: 1234}}},
		{},
		{},
	}
	selected, _ = appendPartsToMerge(head, src, maxPartsToMerge, maxRows)
	if gotHead := selected[:len(head)]; !reflect.DeepEqual(gotHead, head) {
		t.Fatalf("unexpected prefix for maxPartsToMerge=%d, initialRowsCount=%d; got\n%+v; want\n%+v",
			maxPartsToMerge, initialRowsCount, gotHead, head)
	}
	if got := newTestRowsCountFromPartWrappers(selected[len(head):]); !reflect.DeepEqual(got, expectedRowsCount) {
		t.Fatalf("unexpected prefixed rowsCount for maxPartsToMerge=%d, initialRowsCount=%d; got\n%d; want\n%d",
			maxPartsToMerge, initialRowsCount, got, expectedRowsCount)
	}
}
// newTestRowsCountFromPartWrappers returns the RowsCount of every part in pws,
// in the order of pws.
//
// It returns nil for empty pws, so the result can be compared to a nil
// expectation with reflect.DeepEqual.
func newTestRowsCountFromPartWrappers(pws []*partWrapper) []uint64 {
	if len(pws) == 0 {
		return nil
	}
	counts := make([]uint64, len(pws))
	for i := range pws {
		counts[i] = pws[i].p.ph.RowsCount
	}
	return counts
}
// newTestPartWrappersForRowsCount creates one partWrapper per entry in rowsCount.
//
// Only the RowsCount field of the part header is set on the created parts.
// It returns nil for empty rowsCount.
func newTestPartWrappersForRowsCount(rowsCount []uint64) []*partWrapper {
	if len(rowsCount) == 0 {
		return nil
	}
	wrappers := make([]*partWrapper, len(rowsCount))
	for i := range rowsCount {
		header := partHeader{RowsCount: rowsCount[i]}
		wrappers[i] = &partWrapper{p: &part{ph: header}}
	}
	return wrappers
}