diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 52cc74ea8..628634c3d 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -167,7 +167,7 @@ var rawItemsShardsPerTable = func() int { return cpus * multiplier }() -const maxBlocksPerShard = 256 +var maxBlocksPerShard = 256 func (riss *rawItemsShards) init() { riss.shards = make([]rawItemsShard, rawItemsShardsPerTable) diff --git a/lib/mergeset/table_test.go b/lib/mergeset/table_test.go index e044ab34a..810dd5a04 100644 --- a/lib/mergeset/table_test.go +++ b/lib/mergeset/table_test.go @@ -176,6 +176,66 @@ func TestTableCreateSnapshotAt(t *testing.T) { _ = os.RemoveAll(path) } +func TestTableAddItemsConcurrentStress(t *testing.T) { + const path = "TestTableAddItemsConcurrentStress" + if err := os.RemoveAll(path); err != nil { + t.Fatalf("cannot remove %q: %s", path, err) + } + defer func() { + _ = os.RemoveAll(path) + }() + + rawItemsShardsPerTableOrig := rawItemsShardsPerTable + maxBlocksPerShardOrig := maxBlocksPerShard + rawItemsShardsPerTable = 10 + maxBlocksPerShard = 3 + defer func() { + rawItemsShardsPerTable = rawItemsShardsPerTableOrig + maxBlocksPerShard = maxBlocksPerShardOrig + }() + + var flushes atomic.Uint64 + flushCallback := func() { + flushes.Add(1) + } + prepareBlock := func(data []byte, items []Item) ([]byte, []Item) { + return data, items + } + + blocksNeeded := rawItemsShardsPerTable * maxBlocksPerShard * 10 + testAddItems := func(tb *Table) { + itemsBatch := make([][]byte, 0) + + for j := 0; j < blocksNeeded; j++ { + item := bytes.Repeat([]byte{byte(j)}, maxInmemoryBlockSize-10) + itemsBatch = append(itemsBatch, item) + } + tb.AddItems(itemsBatch) + } + + var isReadOnly atomic.Bool + tb := MustOpenTable(path, flushCallback, prepareBlock, &isReadOnly) + + testAddItems(tb) + + // Verify items count after pending items flush. + tb.DebugFlush() + if flushes.Load() == 0 { + t.Fatalf("unexpected zero flushes") + } + + var m TableMetrics + tb.UpdateMetrics(&m) + if n := m.TotalItemsCount(); n != uint64(blocksNeeded) { + t.Fatalf("unexpected itemsCount; got %d; want %v", n, blocksNeeded) + } + + tb.MustClose() + + // Re-open the table and make sure itemsCount remains the same. + testReopenTable(t, path, blocksNeeded) +} + func TestTableAddItemsConcurrent(t *testing.T) { const path = "TestTableAddItemsConcurrent" if err := os.RemoveAll(path); err != nil {