lib/mergeset: use deterministic random generator in tests

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3683
Aliaksandr Valialkin 2023-01-23 19:43:39 -08:00
parent f8dcbe4abd
commit 4c7062b408
7 changed files with 64 additions and 51 deletions
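
The change follows one pattern across all seven test files: drop the shared package-level math/rand state and give every test (and every worker goroutine) its own explicitly seeded *rand.Rand, so the generated test data is reproducible from run to run. Below is a minimal sketch of that pattern; the test name is hypothetical and not part of the diff.

package mergeset

import (
	"math/rand"
	"testing"
)

// TestDeterministicRandSketch is a hypothetical example, not part of the diff:
// a fixed seed makes the sequence of pseudo-random values identical on every
// run, so failures are reproducible and tests stay independent of each other.
func TestDeterministicRandSketch(t *testing.T) {
	r := rand.New(rand.NewSource(1)) // deterministic, test-local generator
	for i := 0; i < 10; i++ {
		_ = r.Intn(1e9) // use r.Intn / r.Shuffle instead of rand.Intn / rand.Shuffle
	}
}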


@@ -2,16 +2,18 @@ package mergeset
import (
"fmt"
"math/rand"
"sort"
"testing"
"time"
)
func TestBlockStreamReaderReadFromInmemoryPart(t *testing.T) {
r := rand.New(rand.NewSource(1))
var items []string
var ib inmemoryBlock
for i := 0; i < 100; i++ {
item := getRandomBytes()
item := getRandomBytes(r)
if !ib.Add(item) {
break
}


@@ -4,7 +4,6 @@ import (
"math/rand"
"reflect"
"sort"
"sync"
"testing"
"testing/quick"
@@ -33,6 +32,8 @@ func TestCommonPrefixLen(t *testing.T) {
}
func TestInmemoryBlockAdd(t *testing.T) {
r := rand.New(rand.NewSource(1))
var ib inmemoryBlock
for i := 0; i < 30; i++ {
@@ -42,7 +43,7 @@ func TestInmemoryBlockAdd(t *testing.T) {
// Fill ib.
for j := 0; j < i*100+1; j++ {
s := getRandomBytes()
s := getRandomBytes(r)
if !ib.Add(s) {
// ib is full.
break
@@ -69,6 +70,7 @@ func TestInmemoryBlockAdd(t *testing.T) {
}
func TestInmemoryBlockSort(t *testing.T) {
r := rand.New(rand.NewSource(1))
var ib inmemoryBlock
for i := 0; i < 100; i++ {
@@ -77,8 +79,8 @@ func TestInmemoryBlockSort(t *testing.T) {
ib.Reset()
// Fill ib.
for j := 0; j < rand.Intn(1500); j++ {
s := getRandomBytes()
for j := 0; j < r.Intn(1500); j++ {
s := getRandomBytes(r)
if !ib.Add(s) {
// ib is full.
break
@@ -109,6 +111,7 @@ func TestInmemoryBlockSort(t *testing.T) {
}
func TestInmemoryBlockMarshalUnmarshal(t *testing.T) {
r := rand.New(rand.NewSource(1))
var ib, ib2 inmemoryBlock
var sb storageBlock
var firstItem, commonPrefix []byte
@@ -121,9 +124,9 @@ func TestInmemoryBlockMarshalUnmarshal(t *testing.T) {
ib.Reset()
// Fill ib.
itemsCount := 2 * (rand.Intn(i+1) + 1)
itemsCount := 2 * (r.Intn(i+1) + 1)
for j := 0; j < itemsCount/2; j++ {
s := getRandomBytes()
s := getRandomBytes(r)
s = []byte("prefix " + string(s))
if !ib.Add(s) {
// ib is full.
@@ -132,7 +135,7 @@ func TestInmemoryBlockMarshalUnmarshal(t *testing.T) {
items = append(items, string(s))
totalLen += len(s)
s = getRandomBytes()
s = getRandomBytes(r)
if !ib.Add(s) {
// ib is full
break
@@ -186,10 +189,8 @@ func TestInmemoryBlockMarshalUnmarshal(t *testing.T) {
}
}
func getRandomBytes() []byte {
rndLock.Lock()
iv, ok := quick.Value(bytesType, rnd)
rndLock.Unlock()
func getRandomBytes(r *rand.Rand) []byte {
iv, ok := quick.Value(bytesType, r)
if !ok {
logger.Panicf("error in quick.Value when generating random string")
}
@@ -197,8 +198,3 @@ func getRandomBytes() []byte {
}
var bytesType = reflect.TypeOf([]byte(nil))
var (
rnd = rand.New(rand.NewSource(1))
rndLock sync.Mutex
)


@@ -23,8 +23,10 @@ func TestMergeBlockStreams(t *testing.T) {
}
func TestMultilevelMerge(t *testing.T) {
r := rand.New(rand.NewSource(1))
// Prepare blocks to merge.
bsrs, items := newTestInmemoryBlockStreamReaders(10, 4000)
bsrs, items := newTestInmemoryBlockStreamReaders(r, 10, 4000)
var itemsMerged uint64
// First level merge
@@ -70,7 +72,8 @@ func TestMultilevelMerge(t *testing.T) {
}
func TestMergeForciblyStop(t *testing.T) {
bsrs, _ := newTestInmemoryBlockStreamReaders(20, 4000)
r := rand.New(rand.NewSource(1))
bsrs, _ := newTestInmemoryBlockStreamReaders(r, 20, 4000)
var dstIP inmemoryPart
var bsw blockStreamWriter
bsw.InitFromInmemoryPart(&dstIP, 1)
@@ -88,16 +91,18 @@ func TestMergeForciblyStop(t *testing.T) {
func testMergeBlockStreams(t *testing.T, blocksToMerge, maxItemsPerBlock int) {
t.Helper()
if err := testMergeBlockStreamsSerial(blocksToMerge, maxItemsPerBlock); err != nil {
r := rand.New(rand.NewSource(1))
if err := testMergeBlockStreamsSerial(r, blocksToMerge, maxItemsPerBlock); err != nil {
t.Fatalf("unexpected error in serial test: %s", err)
}
const concurrency = 3
ch := make(chan error, concurrency)
for i := 0; i < concurrency; i++ {
go func() {
ch <- testMergeBlockStreamsSerial(blocksToMerge, maxItemsPerBlock)
}()
go func(n int) {
rLocal := rand.New(rand.NewSource(int64(n)))
ch <- testMergeBlockStreamsSerial(rLocal, blocksToMerge, maxItemsPerBlock)
}(i)
}
for i := 0; i < concurrency; i++ {
@@ -112,9 +117,9 @@ func testMergeBlockStreams(t *testing.T, blocksToMerge, maxItemsPerBlock int) {
}
}
func testMergeBlockStreamsSerial(blocksToMerge, maxItemsPerBlock int) error {
func testMergeBlockStreamsSerial(r *rand.Rand, blocksToMerge, maxItemsPerBlock int) error {
// Prepare blocks to merge.
bsrs, items := newTestInmemoryBlockStreamReaders(blocksToMerge, maxItemsPerBlock)
bsrs, items := newTestInmemoryBlockStreamReaders(r, blocksToMerge, maxItemsPerBlock)
// Merge blocks.
var itemsMerged uint64
@@ -175,14 +180,14 @@ func testCheckItems(dstIP *inmemoryPart, items []string) error {
return nil
}
func newTestInmemoryBlockStreamReaders(blocksCount, maxItemsPerBlock int) ([]*blockStreamReader, []string) {
func newTestInmemoryBlockStreamReaders(r *rand.Rand, blocksCount, maxItemsPerBlock int) ([]*blockStreamReader, []string) {
var items []string
var bsrs []*blockStreamReader
for i := 0; i < blocksCount; i++ {
var ib inmemoryBlock
itemsPerBlock := rand.Intn(maxItemsPerBlock) + 1
itemsPerBlock := r.Intn(maxItemsPerBlock) + 1
for j := 0; j < itemsPerBlock; j++ {
item := getRandomBytes()
item := getRandomBytes(r)
if !ib.Add(item) {
break
}
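
For the concurrent helpers, the generator cannot simply be shared: a *rand.Rand is not safe for concurrent use by multiple goroutines, so each worker builds its own instance, seeded with the goroutine index to keep the run deterministic without every worker drawing the identical stream. A minimal sketch of that pattern follows; doWork and the test name are hypothetical stand-ins for helpers such as testMergeBlockStreamsSerial.

package mergeset

import (
	"math/rand"
	"testing"
)

// doWork is a hypothetical stand-in for helpers such as
// testMergeBlockStreamsSerial that now take an explicit *rand.Rand.
func doWork(r *rand.Rand) error {
	_ = r.Intn(1500)
	return nil
}

// TestConcurrentDeterministicRandSketch is a hypothetical example, not part of
// the diff: each goroutine gets its own generator, seeded with the goroutine
// index, so runs stay reproducible and no *rand.Rand is shared across goroutines.
func TestConcurrentDeterministicRandSketch(t *testing.T) {
	const concurrency = 3
	ch := make(chan error, concurrency)
	for i := 0; i < concurrency; i++ {
		go func(n int) {
			rLocal := rand.New(rand.NewSource(int64(n)))
			ch <- doWork(rLocal)
		}(i)
	}
	for i := 0; i < concurrency; i++ {
		if err := <-ch; err != nil {
			t.Fatalf("unexpected error: %s", err)
		}
	}
}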


@@ -9,13 +9,14 @@ import (
)
func TestPartSearch(t *testing.T) {
p, items, err := newTestPart(10, 4000)
r := rand.New(rand.NewSource(1))
p, items, err := newTestPart(r, 10, 4000)
if err != nil {
t.Fatalf("cannot create test part: %s", err)
}
t.Run("serial", func(t *testing.T) {
if err := testPartSearchSerial(p, items); err != nil {
if err := testPartSearchSerial(r, p, items); err != nil {
t.Fatalf("error in serial part search test: %s", err)
}
})
@@ -31,9 +32,10 @@ func testPartSearchConcurrent(p *part, items []string) error {
const goroutinesCount = 5
ch := make(chan error, goroutinesCount)
for i := 0; i < goroutinesCount; i++ {
go func() {
ch <- testPartSearchSerial(p, items)
}()
go func(n int) {
rLocal := rand.New(rand.NewSource(int64(n)))
ch <- testPartSearchSerial(rLocal, p, items)
}(i)
}
for i := 0; i < goroutinesCount; i++ {
select {
@@ -48,7 +50,7 @@ func testPartSearchConcurrent(p *part, items []string) error {
return nil
}
func testPartSearchSerial(p *part, items []string) error {
func testPartSearchSerial(r *rand.Rand, p *part, items []string) error {
var ps partSearch
ps.Init(p)
@@ -88,7 +90,7 @@ func testPartSearchSerial(p *part, items []string) error {
// Search for inner items
for loop := 0; loop < 100; loop++ {
idx := rand.Intn(len(items))
idx := r.Intn(len(items))
k = append(k[:0], items[idx]...)
ps.Seek(k)
n := sort.Search(len(items), func(i int) bool {
@@ -143,8 +145,8 @@ func testPartSearchSerial(p *part, items []string) error {
return nil
}
func newTestPart(blocksCount, maxItemsPerBlock int) (*part, []string, error) {
bsrs, items := newTestInmemoryBlockStreamReaders(blocksCount, maxItemsPerBlock)
func newTestPart(r *rand.Rand, blocksCount, maxItemsPerBlock int) (*part, []string, error) {
bsrs, items := newTestInmemoryBlockStreamReaders(r, blocksCount, maxItemsPerBlock)
var itemsMerged uint64
var ip inmemoryPart


@@ -27,7 +27,8 @@ func TestTableSearchSerial(t *testing.T) {
const itemsCount = 1e5
items := func() []string {
tb, items, err := newTestTable(path, itemsCount)
r := rand.New(rand.NewSource(1))
tb, items, err := newTestTable(r, path, itemsCount)
if err != nil {
t.Fatalf("cannot create test table: %s", err)
}
@@ -63,7 +64,8 @@ func TestTableSearchConcurrent(t *testing.T) {
const itemsCount = 1e5
items := func() []string {
tb, items, err := newTestTable(path, itemsCount)
r := rand.New(rand.NewSource(2))
tb, items, err := newTestTable(r, path, itemsCount)
if err != nil {
t.Fatalf("cannot create test table: %s", err)
}
@@ -148,7 +150,7 @@ func testTableSearchSerial(tb *Table, items []string) error {
return nil
}
func newTestTable(path string, itemsCount int) (*Table, []string, error) {
func newTestTable(r *rand.Rand, path string, itemsCount int) (*Table, []string, error) {
var flushes uint64
flushCallback := func() {
atomic.AddUint64(&flushes, 1)
@@ -160,7 +162,7 @@ func newTestTable(path string, itemsCount int) (*Table, []string, error) {
}
items := make([]string, itemsCount)
for i := 0; i < itemsCount; i++ {
item := fmt.Sprintf("%d:%d", rand.Intn(1e9), i)
item := fmt.Sprintf("%d:%d", r.Intn(1e9), i)
tb.AddItems([][]byte{[]byte(item)})
items[i] = item
}


@@ -17,6 +17,8 @@ func BenchmarkTableSearch(b *testing.B) {
}
func benchmarkTableSearch(b *testing.B, itemsCount int) {
r := rand.New(rand.NewSource(1))
path := fmt.Sprintf("BenchmarkTableSearch-%d", itemsCount)
if err := os.RemoveAll(path); err != nil {
b.Fatalf("cannot remove %q: %s", path, err)
@@ -25,7 +27,7 @@ func benchmarkTableSearch(b *testing.B, itemsCount int) {
_ = os.RemoveAll(path)
}()
tb, items, err := newTestTable(path, itemsCount)
tb, items, err := newTestTable(r, path, itemsCount)
if err != nil {
panic(fmt.Errorf("cannot create test table at %q with %d items: %w", path, itemsCount, err))
}
@@ -52,7 +54,7 @@ func benchmarkTableSearch(b *testing.B, itemsCount int) {
})
randKeys := append([][]byte{}, keys...)
rand.Shuffle(len(randKeys), func(i, j int) {
r.Shuffle(len(randKeys), func(i, j int) {
randKeys[i], randKeys[j] = randKeys[j], randKeys[i]
})
b.Run("random-keys-exact", func(b *testing.B) {
@@ -81,11 +83,12 @@ func benchmarkTableSearchKeysExt(b *testing.B, tb *Table, keys [][]byte, stripSu
b.ReportAllocs()
b.SetBytes(int64(searchKeysCount * rowsToScan))
b.RunParallel(func(pb *testing.PB) {
r := rand.New(rand.NewSource(1))
var ts TableSearch
ts.Init(tb)
defer ts.MustClose()
for pb.Next() {
startIdx := rand.Intn(len(keys) - searchKeysCount)
startIdx := r.Intn(len(keys) - searchKeysCount)
searchKeys := keys[startIdx : startIdx+searchKeysCount]
for i, key := range searchKeys {
searchKey := key
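
The benchmark applies the same idea inside b.RunParallel: the closure runs once per worker goroutine, so a generator declared there is private to that worker. Unlike the test helpers, every worker here reuses seed 1, which keeps the selected keys deterministic even though the workers draw identical sequences. A minimal sketch with a hypothetical benchmark name:

package mergeset

import (
	"math/rand"
	"testing"
)

// BenchmarkParallelDeterministicRandSketch is a hypothetical example, not part
// of the diff: each RunParallel worker builds its own seeded *rand.Rand inside
// the closure, so no generator is shared between goroutines.
func BenchmarkParallelDeterministicRandSketch(b *testing.B) {
	keys := make([]int, 1000)
	b.RunParallel(func(pb *testing.PB) {
		r := rand.New(rand.NewSource(1)) // per-worker, deterministically seeded
		for pb.Next() {
			_ = keys[r.Intn(len(keys))]
		}
	})
}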


@@ -3,6 +3,7 @@ package mergeset
import (
"bytes"
"fmt"
"math/rand"
"os"
"sync"
"sync/atomic"
@@ -61,6 +62,7 @@ func TestTableOpenMultipleTimes(t *testing.T) {
}
func TestTableAddItemsSerial(t *testing.T) {
r := rand.New(rand.NewSource(1))
const path = "TestTableAddItemsSerial"
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
@@ -80,7 +82,7 @@ func TestTableAddItemsSerial(t *testing.T) {
}
const itemsCount = 10e3
testAddItemsSerial(tb, itemsCount)
testAddItemsSerial(r, tb, itemsCount)
// Verify items count after pending items flush.
tb.DebugFlush()
@@ -105,16 +107,16 @@ func TestTableAddItemsSerial(t *testing.T) {
t.Fatalf("cannot open %q: %s", path, err)
}
const moreItemsCount = itemsCount * 3
testAddItemsSerial(tb, moreItemsCount)
testAddItemsSerial(r, tb, moreItemsCount)
tb.MustClose()
// Re-open the table and verify itemsCount again.
testReopenTable(t, path, itemsCount+moreItemsCount)
}
func testAddItemsSerial(tb *Table, itemsCount int) {
func testAddItemsSerial(r *rand.Rand, tb *Table, itemsCount int) {
for i := 0; i < itemsCount; i++ {
item := getRandomBytes()
item := getRandomBytes(r)
if len(item) > maxInmemoryBlockSize {
item = item[:maxInmemoryBlockSize]
}
@@ -263,16 +265,17 @@ func testAddItemsConcurrent(tb *Table, itemsCount int) {
var wg sync.WaitGroup
for i := 0; i < goroutinesCount; i++ {
wg.Add(1)
go func() {
go func(n int) {
defer wg.Done()
r := rand.New(rand.NewSource(int64(n)))
for range workCh {
item := getRandomBytes()
item := getRandomBytes(r)
if len(item) > maxInmemoryBlockSize {
item = item[:maxInmemoryBlockSize]
}
tb.AddItems([][]byte{item})
}
}()
}(i)
}
for i := 0; i < itemsCount; i++ {
workCh <- i