mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
48656dcc38
While this may increase CPU and disk IO usage needed for background merge,
this also recudes CPU usage during queries in production. This is because
such queries tend to read recently added data and it is better to have lower number
of parts for such data in order to reduce CPU usage.
This partially reverts ebf8da3730
160 lines
4.8 KiB
Go
160 lines
4.8 KiB
Go
package storage
|
|
|
|
import (
|
|
"math/rand"
|
|
"reflect"
|
|
"testing"
|
|
)
|
|
|
|
func TestPartitionMaxRowsByPath(t *testing.T) {
|
|
n := maxRowsByPath(".")
|
|
if n < 1e3 {
|
|
t.Fatalf("too small number of rows can be created in the current directory: %d", n)
|
|
}
|
|
}
|
|
|
|
func TestAppendPartsToMerge(t *testing.T) {
|
|
testAppendPartsToMerge(t, 2, []uint64{}, nil)
|
|
testAppendPartsToMerge(t, 2, []uint64{123}, nil)
|
|
testAppendPartsToMerge(t, 2, []uint64{4, 2}, nil)
|
|
testAppendPartsToMerge(t, 2, []uint64{128, 64, 32, 16, 8, 4, 2, 1}, nil)
|
|
testAppendPartsToMerge(t, 4, []uint64{128, 64, 32, 10, 9, 7, 3, 1}, []uint64{3, 7, 9, 10})
|
|
testAppendPartsToMerge(t, 2, []uint64{128, 64, 32, 16, 8, 4, 2, 2}, []uint64{2, 2})
|
|
testAppendPartsToMerge(t, 4, []uint64{128, 64, 32, 16, 8, 4, 2, 2}, []uint64{2, 2, 4, 8})
|
|
testAppendPartsToMerge(t, 2, []uint64{1, 1}, []uint64{1, 1})
|
|
testAppendPartsToMerge(t, 2, []uint64{2, 2, 2}, []uint64{2, 2})
|
|
testAppendPartsToMerge(t, 2, []uint64{4, 2, 4}, []uint64{4, 4})
|
|
testAppendPartsToMerge(t, 2, []uint64{1, 3, 7, 2}, nil)
|
|
testAppendPartsToMerge(t, 3, []uint64{1, 3, 7, 2}, []uint64{1, 2, 3})
|
|
testAppendPartsToMerge(t, 4, []uint64{1, 3, 7, 2}, []uint64{1, 2, 3})
|
|
testAppendPartsToMerge(t, 5, []uint64{1, 3, 7, 2}, nil)
|
|
testAppendPartsToMerge(t, 4, []uint64{1e6, 3e6, 7e6, 2e6}, []uint64{1e6, 2e6, 3e6})
|
|
testAppendPartsToMerge(t, 4, []uint64{2, 3, 7, 2}, []uint64{2, 2, 3})
|
|
testAppendPartsToMerge(t, 5, []uint64{2, 3, 7, 2}, nil)
|
|
testAppendPartsToMerge(t, 3, []uint64{11, 1, 10, 100, 10}, []uint64{10, 10, 11})
|
|
}
|
|
|
|
func TestAppendPartsToMergeManyParts(t *testing.T) {
|
|
// Verify that big number of parts are merged into minimal number of parts
|
|
// using minimum merges.
|
|
var a []uint64
|
|
maxOutPartRows := uint64(0)
|
|
r := rand.New(rand.NewSource(1))
|
|
for i := 0; i < 1024; i++ {
|
|
n := uint64(uint32(r.NormFloat64() * 1e9))
|
|
if n < 0 {
|
|
n = -n
|
|
}
|
|
n++
|
|
maxOutPartRows += n
|
|
a = append(a, n)
|
|
}
|
|
pws := newTestPartWrappersForRowsCount(a)
|
|
|
|
iterationsCount := 0
|
|
rowsMerged := uint64(0)
|
|
for {
|
|
pms, _ := appendPartsToMerge(nil, pws, defaultPartsToMerge, maxOutPartRows)
|
|
if len(pms) == 0 {
|
|
break
|
|
}
|
|
m := make(map[*partWrapper]bool)
|
|
for _, pw := range pms {
|
|
m[pw] = true
|
|
}
|
|
var pwsNew []*partWrapper
|
|
rowsCount := uint64(0)
|
|
for _, pw := range pws {
|
|
if m[pw] {
|
|
rowsCount += pw.p.ph.RowsCount
|
|
} else {
|
|
pwsNew = append(pwsNew, pw)
|
|
}
|
|
}
|
|
pw := &partWrapper{
|
|
p: &part{},
|
|
}
|
|
pw.p.ph = partHeader{
|
|
RowsCount: rowsCount,
|
|
}
|
|
rowsMerged += rowsCount
|
|
pwsNew = append(pwsNew, pw)
|
|
pws = pwsNew
|
|
iterationsCount++
|
|
}
|
|
rowsCount := newTestRowsCountFromPartWrappers(pws)
|
|
rowsTotal := uint64(0)
|
|
for _, rc := range rowsCount {
|
|
rowsTotal += uint64(rc)
|
|
}
|
|
overhead := float64(rowsMerged) / float64(rowsTotal)
|
|
if overhead > 2.1 {
|
|
t.Fatalf("too big overhead; rowsCount=%d, iterationsCount=%d, rowsTotal=%d, rowsMerged=%d, overhead=%f",
|
|
rowsCount, iterationsCount, rowsTotal, rowsMerged, overhead)
|
|
}
|
|
if len(rowsCount) > 18 {
|
|
t.Fatalf("too many rowsCount %d; rowsCount=%d, iterationsCount=%d, rowsTotal=%d, rowsMerged=%d, overhead=%f",
|
|
len(rowsCount), rowsCount, iterationsCount, rowsTotal, rowsMerged, overhead)
|
|
}
|
|
}
|
|
|
|
func testAppendPartsToMerge(t *testing.T, maxPartsToMerge int, initialRowsCount, expectedRowsCount []uint64) {
|
|
t.Helper()
|
|
|
|
pws := newTestPartWrappersForRowsCount(initialRowsCount)
|
|
|
|
// Verify appending to nil.
|
|
pms, _ := appendPartsToMerge(nil, pws, maxPartsToMerge, 1e9)
|
|
rowsCount := newTestRowsCountFromPartWrappers(pms)
|
|
if !reflect.DeepEqual(rowsCount, expectedRowsCount) {
|
|
t.Fatalf("unexpected rowsCount for maxPartsToMerge=%d, initialRowsCount=%d; got\n%d; want\n%d",
|
|
maxPartsToMerge, initialRowsCount, rowsCount, expectedRowsCount)
|
|
}
|
|
|
|
// Verify appending to prefix
|
|
prefix := []*partWrapper{
|
|
{
|
|
p: &part{
|
|
ph: partHeader{
|
|
RowsCount: 1234,
|
|
},
|
|
},
|
|
},
|
|
{},
|
|
{},
|
|
}
|
|
pms, _ = appendPartsToMerge(prefix, pws, maxPartsToMerge, 1e9)
|
|
if !reflect.DeepEqual(pms[:len(prefix)], prefix) {
|
|
t.Fatalf("unexpected prefix for maxPartsToMerge=%d, initialRowsCount=%d; got\n%+v; want\n%+v",
|
|
maxPartsToMerge, initialRowsCount, pms[:len(prefix)], prefix)
|
|
}
|
|
|
|
rowsCount = newTestRowsCountFromPartWrappers(pms[len(prefix):])
|
|
if !reflect.DeepEqual(rowsCount, expectedRowsCount) {
|
|
t.Fatalf("unexpected prefixed rowsCount for maxPartsToMerge=%d, initialRowsCount=%d; got\n%d; want\n%d",
|
|
maxPartsToMerge, initialRowsCount, rowsCount, expectedRowsCount)
|
|
}
|
|
}
|
|
|
|
func newTestRowsCountFromPartWrappers(pws []*partWrapper) []uint64 {
|
|
var rowsCount []uint64
|
|
for _, pw := range pws {
|
|
rowsCount = append(rowsCount, pw.p.ph.RowsCount)
|
|
}
|
|
return rowsCount
|
|
}
|
|
|
|
func newTestPartWrappersForRowsCount(rowsCount []uint64) []*partWrapper {
|
|
var pws []*partWrapper
|
|
for _, rc := range rowsCount {
|
|
pw := &partWrapper{
|
|
p: &part{
|
|
ph: partHeader{
|
|
RowsCount: rc,
|
|
},
|
|
},
|
|
}
|
|
pws = append(pws, pw)
|
|
}
|
|
return pws
|
|
}
|