mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-21 15:45:01 +00:00
lib/mergeset: improve test coverage (#6118)
Add test to cover the code path with overflowing shards buffers and
triggering merge to partition.
This test covers the code path which leaded to
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5959
Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
(cherry picked from commit 329c3cbdf0
)
This commit is contained in:
parent
f22b2a0563
commit
f6eb8912b0
2 changed files with 61 additions and 1 deletions
|
@ -167,7 +167,7 @@ var rawItemsShardsPerTable = func() int {
|
||||||
return cpus * multiplier
|
return cpus * multiplier
|
||||||
}()
|
}()
|
||||||
|
|
||||||
const maxBlocksPerShard = 256
|
var maxBlocksPerShard = 256
|
||||||
|
|
||||||
func (riss *rawItemsShards) init() {
|
func (riss *rawItemsShards) init() {
|
||||||
riss.shards = make([]rawItemsShard, rawItemsShardsPerTable)
|
riss.shards = make([]rawItemsShard, rawItemsShardsPerTable)
|
||||||
|
|
|
@ -176,6 +176,66 @@ func TestTableCreateSnapshotAt(t *testing.T) {
|
||||||
_ = os.RemoveAll(path)
|
_ = os.RemoveAll(path)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestTableAddItemsConcurrentStress(t *testing.T) {
|
||||||
|
const path = "TestTableAddItemsConcurrentStress"
|
||||||
|
if err := os.RemoveAll(path); err != nil {
|
||||||
|
t.Fatalf("cannot remove %q: %s", path, err)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
_ = os.RemoveAll(path)
|
||||||
|
}()
|
||||||
|
|
||||||
|
rawItemsShardsPerTableOrig := rawItemsShardsPerTable
|
||||||
|
maxBlocksPerShardOrig := maxBlocksPerShard
|
||||||
|
rawItemsShardsPerTable = 10
|
||||||
|
maxBlocksPerShard = 3
|
||||||
|
defer func() {
|
||||||
|
rawItemsShardsPerTable = rawItemsShardsPerTableOrig
|
||||||
|
maxBlocksPerShard = maxBlocksPerShardOrig
|
||||||
|
}()
|
||||||
|
|
||||||
|
var flushes atomic.Uint64
|
||||||
|
flushCallback := func() {
|
||||||
|
flushes.Add(1)
|
||||||
|
}
|
||||||
|
prepareBlock := func(data []byte, items []Item) ([]byte, []Item) {
|
||||||
|
return data, items
|
||||||
|
}
|
||||||
|
|
||||||
|
blocksNeeded := rawItemsShardsPerTable * maxBlocksPerShard * 10
|
||||||
|
testAddItems := func(tb *Table) {
|
||||||
|
itemsBatch := make([][]byte, 0)
|
||||||
|
|
||||||
|
for j := 0; j < blocksNeeded; j++ {
|
||||||
|
item := bytes.Repeat([]byte{byte(j)}, maxInmemoryBlockSize-10)
|
||||||
|
itemsBatch = append(itemsBatch, item)
|
||||||
|
}
|
||||||
|
tb.AddItems(itemsBatch)
|
||||||
|
}
|
||||||
|
|
||||||
|
var isReadOnly atomic.Bool
|
||||||
|
tb := MustOpenTable(path, flushCallback, prepareBlock, &isReadOnly)
|
||||||
|
|
||||||
|
testAddItems(tb)
|
||||||
|
|
||||||
|
// Verify items count after pending items flush.
|
||||||
|
tb.DebugFlush()
|
||||||
|
if flushes.Load() == 0 {
|
||||||
|
t.Fatalf("unexpected zero flushes")
|
||||||
|
}
|
||||||
|
|
||||||
|
var m TableMetrics
|
||||||
|
tb.UpdateMetrics(&m)
|
||||||
|
if n := m.TotalItemsCount(); n != uint64(blocksNeeded) {
|
||||||
|
t.Fatalf("unexpected itemsCount; got %d; want %v", n, blocksNeeded)
|
||||||
|
}
|
||||||
|
|
||||||
|
tb.MustClose()
|
||||||
|
|
||||||
|
// Re-open the table and make sure itemsCount remains the same.
|
||||||
|
testReopenTable(t, path, blocksNeeded)
|
||||||
|
}
|
||||||
|
|
||||||
func TestTableAddItemsConcurrent(t *testing.T) {
|
func TestTableAddItemsConcurrent(t *testing.T) {
|
||||||
const path = "TestTableAddItemsConcurrent"
|
const path = "TestTableAddItemsConcurrent"
|
||||||
if err := os.RemoveAll(path); err != nil {
|
if err := os.RemoveAll(path); err != nil {
|
||||||
|
|
Loading…
Reference in a new issue