diff --git a/lib/mergeset/block_stream_reader_test.go b/lib/mergeset/block_stream_reader_test.go index c9175549d..acc7ec80c 100644 --- a/lib/mergeset/block_stream_reader_test.go +++ b/lib/mergeset/block_stream_reader_test.go @@ -2,16 +2,18 @@ package mergeset import ( "fmt" + "math/rand" "sort" "testing" "time" ) func TestBlockStreamReaderReadFromInmemoryPart(t *testing.T) { + r := rand.New(rand.NewSource(1)) var items []string var ib inmemoryBlock for i := 0; i < 100; i++ { - item := getRandomBytes() + item := getRandomBytes(r) if !ib.Add(item) { break } diff --git a/lib/mergeset/encoding_test.go b/lib/mergeset/encoding_test.go index 8e6ecffd0..fc658ab5b 100644 --- a/lib/mergeset/encoding_test.go +++ b/lib/mergeset/encoding_test.go @@ -4,7 +4,6 @@ import ( "math/rand" "reflect" "sort" - "sync" "testing" "testing/quick" @@ -33,6 +32,8 @@ func TestCommonPrefixLen(t *testing.T) { } func TestInmemoryBlockAdd(t *testing.T) { + r := rand.New(rand.NewSource(1)) + var ib inmemoryBlock for i := 0; i < 30; i++ { @@ -42,7 +43,7 @@ func TestInmemoryBlockAdd(t *testing.T) { // Fill ib. for j := 0; j < i*100+1; j++ { - s := getRandomBytes() + s := getRandomBytes(r) if !ib.Add(s) { // ib is full. break @@ -69,6 +70,7 @@ func TestInmemoryBlockAdd(t *testing.T) { } func TestInmemoryBlockSort(t *testing.T) { + r := rand.New(rand.NewSource(1)) var ib inmemoryBlock for i := 0; i < 100; i++ { @@ -77,8 +79,8 @@ func TestInmemoryBlockSort(t *testing.T) { ib.Reset() // Fill ib. - for j := 0; j < rand.Intn(1500); j++ { - s := getRandomBytes() + for j := 0; j < r.Intn(1500); j++ { + s := getRandomBytes(r) if !ib.Add(s) { // ib is full. break @@ -109,6 +111,7 @@ func TestInmemoryBlockSort(t *testing.T) { } func TestInmemoryBlockMarshalUnmarshal(t *testing.T) { + r := rand.New(rand.NewSource(1)) var ib, ib2 inmemoryBlock var sb storageBlock var firstItem, commonPrefix []byte @@ -121,9 +124,9 @@ func TestInmemoryBlockMarshalUnmarshal(t *testing.T) { ib.Reset() // Fill ib. - itemsCount := 2 * (rand.Intn(i+1) + 1) + itemsCount := 2 * (r.Intn(i+1) + 1) for j := 0; j < itemsCount/2; j++ { - s := getRandomBytes() + s := getRandomBytes(r) s = []byte("prefix " + string(s)) if !ib.Add(s) { // ib is full. @@ -132,7 +135,7 @@ func TestInmemoryBlockMarshalUnmarshal(t *testing.T) { items = append(items, string(s)) totalLen += len(s) - s = getRandomBytes() + s = getRandomBytes(r) if !ib.Add(s) { // ib is full break @@ -186,10 +189,8 @@ func TestInmemoryBlockMarshalUnmarshal(t *testing.T) { } } -func getRandomBytes() []byte { - rndLock.Lock() - iv, ok := quick.Value(bytesType, rnd) - rndLock.Unlock() +func getRandomBytes(r *rand.Rand) []byte { + iv, ok := quick.Value(bytesType, r) if !ok { logger.Panicf("error in quick.Value when generating random string") } @@ -197,8 +198,3 @@ func getRandomBytes() []byte { } var bytesType = reflect.TypeOf([]byte(nil)) - -var ( - rnd = rand.New(rand.NewSource(1)) - rndLock sync.Mutex -) diff --git a/lib/mergeset/merge_test.go b/lib/mergeset/merge_test.go index 6ba874a67..2298a1991 100644 --- a/lib/mergeset/merge_test.go +++ b/lib/mergeset/merge_test.go @@ -23,8 +23,10 @@ func TestMergeBlockStreams(t *testing.T) { } func TestMultilevelMerge(t *testing.T) { + r := rand.New(rand.NewSource(1)) + // Prepare blocks to merge. - bsrs, items := newTestInmemoryBlockStreamReaders(10, 4000) + bsrs, items := newTestInmemoryBlockStreamReaders(r, 10, 4000) var itemsMerged uint64 // First level merge @@ -70,7 +72,8 @@ func TestMultilevelMerge(t *testing.T) { } func TestMergeForciblyStop(t *testing.T) { - bsrs, _ := newTestInmemoryBlockStreamReaders(20, 4000) + r := rand.New(rand.NewSource(1)) + bsrs, _ := newTestInmemoryBlockStreamReaders(r, 20, 4000) var dstIP inmemoryPart var bsw blockStreamWriter bsw.InitFromInmemoryPart(&dstIP, 1) @@ -88,16 +91,18 @@ func TestMergeForciblyStop(t *testing.T) { func testMergeBlockStreams(t *testing.T, blocksToMerge, maxItemsPerBlock int) { t.Helper() - if err := testMergeBlockStreamsSerial(blocksToMerge, maxItemsPerBlock); err != nil { + r := rand.New(rand.NewSource(1)) + if err := testMergeBlockStreamsSerial(r, blocksToMerge, maxItemsPerBlock); err != nil { t.Fatalf("unexpected error in serial test: %s", err) } const concurrency = 3 ch := make(chan error, concurrency) for i := 0; i < concurrency; i++ { - go func() { - ch <- testMergeBlockStreamsSerial(blocksToMerge, maxItemsPerBlock) - }() + go func(n int) { + rLocal := rand.New(rand.NewSource(int64(n))) + ch <- testMergeBlockStreamsSerial(rLocal, blocksToMerge, maxItemsPerBlock) + }(i) } for i := 0; i < concurrency; i++ { @@ -112,9 +117,9 @@ func testMergeBlockStreams(t *testing.T, blocksToMerge, maxItemsPerBlock int) { } } -func testMergeBlockStreamsSerial(blocksToMerge, maxItemsPerBlock int) error { +func testMergeBlockStreamsSerial(r *rand.Rand, blocksToMerge, maxItemsPerBlock int) error { // Prepare blocks to merge. - bsrs, items := newTestInmemoryBlockStreamReaders(blocksToMerge, maxItemsPerBlock) + bsrs, items := newTestInmemoryBlockStreamReaders(r, blocksToMerge, maxItemsPerBlock) // Merge blocks. var itemsMerged uint64 @@ -175,14 +180,14 @@ func testCheckItems(dstIP *inmemoryPart, items []string) error { return nil } -func newTestInmemoryBlockStreamReaders(blocksCount, maxItemsPerBlock int) ([]*blockStreamReader, []string) { +func newTestInmemoryBlockStreamReaders(r *rand.Rand, blocksCount, maxItemsPerBlock int) ([]*blockStreamReader, []string) { var items []string var bsrs []*blockStreamReader for i := 0; i < blocksCount; i++ { var ib inmemoryBlock - itemsPerBlock := rand.Intn(maxItemsPerBlock) + 1 + itemsPerBlock := r.Intn(maxItemsPerBlock) + 1 for j := 0; j < itemsPerBlock; j++ { - item := getRandomBytes() + item := getRandomBytes(r) if !ib.Add(item) { break } diff --git a/lib/mergeset/part_search_test.go b/lib/mergeset/part_search_test.go index c042d44ac..72434a402 100644 --- a/lib/mergeset/part_search_test.go +++ b/lib/mergeset/part_search_test.go @@ -9,13 +9,14 @@ import ( ) func TestPartSearch(t *testing.T) { - p, items, err := newTestPart(10, 4000) + r := rand.New(rand.NewSource(1)) + p, items, err := newTestPart(r, 10, 4000) if err != nil { t.Fatalf("cannot create test part: %s", err) } t.Run("serial", func(t *testing.T) { - if err := testPartSearchSerial(p, items); err != nil { + if err := testPartSearchSerial(r, p, items); err != nil { t.Fatalf("error in serial part search test: %s", err) } }) @@ -31,9 +32,10 @@ func testPartSearchConcurrent(p *part, items []string) error { const goroutinesCount = 5 ch := make(chan error, goroutinesCount) for i := 0; i < goroutinesCount; i++ { - go func() { - ch <- testPartSearchSerial(p, items) - }() + go func(n int) { + rLocal := rand.New(rand.NewSource(int64(n))) + ch <- testPartSearchSerial(rLocal, p, items) + }(i) } for i := 0; i < goroutinesCount; i++ { select { @@ -48,7 +50,7 @@ func testPartSearchConcurrent(p *part, items []string) error { return nil } -func testPartSearchSerial(p *part, items []string) error { +func testPartSearchSerial(r *rand.Rand, p *part, items []string) error { var ps partSearch ps.Init(p) @@ -88,7 +90,7 @@ func testPartSearchSerial(p *part, items []string) error { // Search for inner items for loop := 0; loop < 100; loop++ { - idx := rand.Intn(len(items)) + idx := r.Intn(len(items)) k = append(k[:0], items[idx]...) ps.Seek(k) n := sort.Search(len(items), func(i int) bool { @@ -143,8 +145,8 @@ func testPartSearchSerial(p *part, items []string) error { return nil } -func newTestPart(blocksCount, maxItemsPerBlock int) (*part, []string, error) { - bsrs, items := newTestInmemoryBlockStreamReaders(blocksCount, maxItemsPerBlock) +func newTestPart(r *rand.Rand, blocksCount, maxItemsPerBlock int) (*part, []string, error) { + bsrs, items := newTestInmemoryBlockStreamReaders(r, blocksCount, maxItemsPerBlock) var itemsMerged uint64 var ip inmemoryPart diff --git a/lib/mergeset/table_search_test.go b/lib/mergeset/table_search_test.go index f0ec1f888..181daa38b 100644 --- a/lib/mergeset/table_search_test.go +++ b/lib/mergeset/table_search_test.go @@ -27,7 +27,8 @@ func TestTableSearchSerial(t *testing.T) { const itemsCount = 1e5 items := func() []string { - tb, items, err := newTestTable(path, itemsCount) + r := rand.New(rand.NewSource(1)) + tb, items, err := newTestTable(r, path, itemsCount) if err != nil { t.Fatalf("cannot create test table: %s", err) } @@ -63,7 +64,8 @@ func TestTableSearchConcurrent(t *testing.T) { const itemsCount = 1e5 items := func() []string { - tb, items, err := newTestTable(path, itemsCount) + r := rand.New(rand.NewSource(2)) + tb, items, err := newTestTable(r, path, itemsCount) if err != nil { t.Fatalf("cannot create test table: %s", err) } @@ -148,7 +150,7 @@ func testTableSearchSerial(tb *Table, items []string) error { return nil } -func newTestTable(path string, itemsCount int) (*Table, []string, error) { +func newTestTable(r *rand.Rand, path string, itemsCount int) (*Table, []string, error) { var flushes uint64 flushCallback := func() { atomic.AddUint64(&flushes, 1) @@ -160,7 +162,7 @@ func newTestTable(path string, itemsCount int) (*Table, []string, error) { } items := make([]string, itemsCount) for i := 0; i < itemsCount; i++ { - item := fmt.Sprintf("%d:%d", rand.Intn(1e9), i) + item := fmt.Sprintf("%d:%d", r.Intn(1e9), i) tb.AddItems([][]byte{[]byte(item)}) items[i] = item } diff --git a/lib/mergeset/table_search_timing_test.go b/lib/mergeset/table_search_timing_test.go index 555fd8578..12fbcb7ee 100644 --- a/lib/mergeset/table_search_timing_test.go +++ b/lib/mergeset/table_search_timing_test.go @@ -17,6 +17,8 @@ func BenchmarkTableSearch(b *testing.B) { } func benchmarkTableSearch(b *testing.B, itemsCount int) { + r := rand.New(rand.NewSource(1)) + path := fmt.Sprintf("BenchmarkTableSearch-%d", itemsCount) if err := os.RemoveAll(path); err != nil { b.Fatalf("cannot remove %q: %s", path, err) @@ -25,7 +27,7 @@ func benchmarkTableSearch(b *testing.B, itemsCount int) { _ = os.RemoveAll(path) }() - tb, items, err := newTestTable(path, itemsCount) + tb, items, err := newTestTable(r, path, itemsCount) if err != nil { panic(fmt.Errorf("cannot create test table at %q with %d items: %w", path, itemsCount, err)) } @@ -52,7 +54,7 @@ func benchmarkTableSearch(b *testing.B, itemsCount int) { }) randKeys := append([][]byte{}, keys...) - rand.Shuffle(len(randKeys), func(i, j int) { + r.Shuffle(len(randKeys), func(i, j int) { randKeys[i], randKeys[j] = randKeys[j], randKeys[i] }) b.Run("random-keys-exact", func(b *testing.B) { @@ -81,11 +83,12 @@ func benchmarkTableSearchKeysExt(b *testing.B, tb *Table, keys [][]byte, stripSu b.ReportAllocs() b.SetBytes(int64(searchKeysCount * rowsToScan)) b.RunParallel(func(pb *testing.PB) { + r := rand.New(rand.NewSource(1)) var ts TableSearch ts.Init(tb) defer ts.MustClose() for pb.Next() { - startIdx := rand.Intn(len(keys) - searchKeysCount) + startIdx := r.Intn(len(keys) - searchKeysCount) searchKeys := keys[startIdx : startIdx+searchKeysCount] for i, key := range searchKeys { searchKey := key diff --git a/lib/mergeset/table_test.go b/lib/mergeset/table_test.go index eff20bcb5..6f3de5dee 100644 --- a/lib/mergeset/table_test.go +++ b/lib/mergeset/table_test.go @@ -3,6 +3,7 @@ package mergeset import ( "bytes" "fmt" + "math/rand" "os" "sync" "sync/atomic" @@ -61,6 +62,7 @@ func TestTableOpenMultipleTimes(t *testing.T) { } func TestTableAddItemsSerial(t *testing.T) { + r := rand.New(rand.NewSource(1)) const path = "TestTableAddItemsSerial" if err := os.RemoveAll(path); err != nil { t.Fatalf("cannot remove %q: %s", path, err) @@ -80,7 +82,7 @@ func TestTableAddItemsSerial(t *testing.T) { } const itemsCount = 10e3 - testAddItemsSerial(tb, itemsCount) + testAddItemsSerial(r, tb, itemsCount) // Verify items count after pending items flush. tb.DebugFlush() @@ -105,16 +107,16 @@ func TestTableAddItemsSerial(t *testing.T) { t.Fatalf("cannot open %q: %s", path, err) } const moreItemsCount = itemsCount * 3 - testAddItemsSerial(tb, moreItemsCount) + testAddItemsSerial(r, tb, moreItemsCount) tb.MustClose() // Re-open the table and verify itemsCount again. testReopenTable(t, path, itemsCount+moreItemsCount) } -func testAddItemsSerial(tb *Table, itemsCount int) { +func testAddItemsSerial(r *rand.Rand, tb *Table, itemsCount int) { for i := 0; i < itemsCount; i++ { - item := getRandomBytes() + item := getRandomBytes(r) if len(item) > maxInmemoryBlockSize { item = item[:maxInmemoryBlockSize] } @@ -263,16 +265,17 @@ func testAddItemsConcurrent(tb *Table, itemsCount int) { var wg sync.WaitGroup for i := 0; i < goroutinesCount; i++ { wg.Add(1) - go func() { + go func(n int) { defer wg.Done() + r := rand.New(rand.NewSource(int64(n))) for range workCh { - item := getRandomBytes() + item := getRandomBytes(r) if len(item) > maxInmemoryBlockSize { item = item[:maxInmemoryBlockSize] } tb.AddItems([][]byte{item}) } - }() + }(i) } for i := 0; i < itemsCount; i++ { workCh <- i