VictoriaMetrics/lib/logstorage/partition_test.go
Aliaksandr Valialkin 62e4baf556
lib/logstorage: use simpler in-memory cache instead of workingsetcache for caching recently ingested _stream values and recently queried set of streams
These caches aren't expected to grow big, so it is OK to use the simplest possible cache based on sync.Map.
The benefit of this cache compared to workingsetcache is better scalability on systems with many CPU cores,
since it doesn't use mutexes on the fast path.
An additional benefit is lower memory usage on average, since the size of the in-memory cache equals
the working set for the last 3 minutes.

The downside is that there is no upper bound for the cache size, so it may grow big during workload spikes.
But this is very unlikely for typical workloads.

(cherry picked from commit 0f24078146)
2024-10-18 11:42:16 +02:00

package logstorage

import (
	"sync/atomic"
	"testing"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
)
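
// TestPartitionLifecycle verifies that a partition can be created, opened,
// closed and deleted repeatedly, and that a freshly opened empty partition
// reports zero rows, zero parts and zero compressed sizes.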
func TestPartitionLifecycle(t *testing.T) {
	t.Parallel()

	path := t.Name()
	var ddbStats DatadbStats
	s := newTestStorage()

	for i := 0; i < 3; i++ {
		mustCreatePartition(path)
		for j := 0; j < 2; j++ {
			pt := mustOpenPartition(s, path)
			ddbStats.reset()
			pt.ddb.updateStats(&ddbStats)
			if n := ddbStats.RowsCount(); n != 0 {
				t.Fatalf("unexpected non-zero number of entries in empty partition: %d", n)
			}
			if ddbStats.InmemoryParts != 0 {
				t.Fatalf("unexpected non-zero number of in-memory parts in empty partition: %d", ddbStats.InmemoryParts)
			}
			if ddbStats.SmallParts != 0 {
				t.Fatalf("unexpected non-zero number of small file parts in empty partition: %d", ddbStats.SmallParts)
			}
			if ddbStats.BigParts != 0 {
				t.Fatalf("unexpected non-zero number of big file parts in empty partition: %d", ddbStats.BigParts)
			}
			if ddbStats.CompressedInmemorySize != 0 {
				t.Fatalf("unexpected non-zero size of inmemory parts for empty partition")
			}
			if ddbStats.CompressedSmallPartSize != 0 {
				t.Fatalf("unexpected non-zero size of small file parts for empty partition")
			}
			if ddbStats.CompressedBigPartSize != 0 {
				t.Fatalf("unexpected non-zero size of big file parts for empty partition")
			}
			time.Sleep(10 * time.Millisecond)
			mustClosePartition(pt)
		}
		mustDeletePartition(path)
	}

	closeTestStorage(s)
}
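
// TestPartitionMustAddRowsSerial adds rows to a partition from a single
// goroutine and verifies that DatadbStats reports the expected row count,
// including after the partition is closed and re-opened.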
func TestPartitionMustAddRowsSerial(t *testing.T) {
	t.Parallel()

	path := t.Name()
	var ddbStats DatadbStats
	s := newTestStorage()

	mustCreatePartition(path)
	pt := mustOpenPartition(s, path)

	// Try adding the same entry repeatedly, one row at a time.
	totalRowsCount := uint64(0)
	for i := 0; i < 100; i++ {
		lr := newTestLogRows(1, 1, 0)
		totalRowsCount += uint64(len(lr.timestamps))
		pt.mustAddRows(lr)
		ddbStats.reset()
		pt.ddb.updateStats(&ddbStats)
		if n := ddbStats.RowsCount(); n != totalRowsCount {
			t.Fatalf("unexpected number of entries in partition; got %d; want %d", n, totalRowsCount)
		}
	}

	// Try adding a different entry on each iteration, one row at a time.
	for i := 0; i < 100; i++ {
		lr := newTestLogRows(1, 1, int64(i))
		totalRowsCount += uint64(len(lr.timestamps))
		pt.mustAddRows(lr)
		ddbStats.reset()
		pt.ddb.updateStats(&ddbStats)
		if n := ddbStats.RowsCount(); n != totalRowsCount {
			t.Fatalf("unexpected number of entries in partition; got %d; want %d", n, totalRowsCount)
		}
	}

	// Re-open the partition and verify the number of entries remains the same
	mustClosePartition(pt)
	pt = mustOpenPartition(s, path)
	ddbStats.reset()
	pt.ddb.updateStats(&ddbStats)
	if n := ddbStats.RowsCount(); n != totalRowsCount {
		t.Fatalf("unexpected number of entries after re-opening the partition; got %d; want %d", n, totalRowsCount)
	}
	if ddbStats.InmemoryParts != 0 {
		t.Fatalf("unexpected non-zero number of in-memory parts after re-opening the partition: %d", ddbStats.InmemoryParts)
	}
	if ddbStats.SmallParts+ddbStats.BigParts == 0 {
t.Fatalf("the number of small parts must be greater than 0 after re-opening the partition")
	}

	// Try adding entries for multiple streams at a time
	for i := 0; i < 5; i++ {
		lr := newTestLogRows(3, 7, 0)
		totalRowsCount += uint64(len(lr.timestamps))
		pt.mustAddRows(lr)
		ddbStats.reset()
		pt.ddb.updateStats(&ddbStats)
		if n := ddbStats.RowsCount(); n != totalRowsCount {
			t.Fatalf("unexpected number of entries in partition; got %d; want %d", n, totalRowsCount)
		}
		time.Sleep(time.Millisecond)
	}

	// Re-open the partition and verify the number of entries remains the same
	mustClosePartition(pt)
	pt = mustOpenPartition(s, path)
	ddbStats.reset()
	pt.ddb.updateStats(&ddbStats)
	if n := ddbStats.RowsCount(); n != totalRowsCount {
		t.Fatalf("unexpected number of entries after re-opening the partition; got %d; want %d", n, totalRowsCount)
	}
	if ddbStats.InmemoryParts != 0 {
		t.Fatalf("unexpected non-zero number of in-memory parts after re-opening the partition: %d", ddbStats.InmemoryParts)
	}
	if ddbStats.SmallParts+ddbStats.BigParts == 0 {
		t.Fatalf("the number of file parts must be greater than 0 after re-opening the partition")
	}

	mustClosePartition(pt)
	mustDeletePartition(path)
	closeTestStorage(s)
}
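
// TestPartitionMustAddRowsConcurrent adds rows to a partition from multiple
// goroutines concurrently and verifies that no rows are lost.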
func TestPartitionMustAddRowsConcurrent(t *testing.T) {
	t.Parallel()

	path := t.Name()
	s := newTestStorage()

	mustCreatePartition(path)
	pt := mustOpenPartition(s, path)

	const workersCount = 3
	var totalRowsCount atomic.Uint64
	doneCh := make(chan struct{}, workersCount)
	for i := 0; i < cap(doneCh); i++ {
		go func() {
			for j := 0; j < 7; j++ {
				lr := newTestLogRows(5, 10, int64(j))
				pt.mustAddRows(lr)
				totalRowsCount.Add(uint64(len(lr.timestamps)))
			}
			doneCh <- struct{}{}
		}()
	}
	timer := timerpool.Get(time.Second)
	defer timerpool.Put(timer)
	for i := 0; i < cap(doneCh); i++ {
		select {
		case <-doneCh:
		case <-timer.C:
			t.Fatalf("timeout")
		}
	}

	var ddbStats DatadbStats
	pt.ddb.updateStats(&ddbStats)
	if n := ddbStats.RowsCount(); n != totalRowsCount.Load() {
		t.Fatalf("unexpected number of entries; got %d; want %d", n, totalRowsCount.Load())
	}

	mustClosePartition(pt)
	mustDeletePartition(path)
	closeTestStorage(s)
}

// newTestStorage creates new storage for tests.
//
// When the storage is no longer needed, closeTestStorage() must be called.
func newTestStorage() *Storage {
	streamIDCache := newCache()
	filterStreamCache := newCache()
	return &Storage{
		flushInterval:     time.Second,
		streamIDCache:     streamIDCache,
		filterStreamCache: filterStreamCache,
	}
}

// closeTestStorage closes storage created via newTestStorage().
func closeTestStorage(s *Storage) {
	s.streamIDCache.MustStop()
	s.filterStreamCache.MustStop()
}
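
The streamIDCache and filterStreamCache above are created via newCache(), the simple sync.Map-based cache
described in the commit message. For illustration only, here is a minimal sketch of such a working-set cache,
assuming a two-generation design that rotates on a fixed interval, so entries not accessed for roughly two
intervals are dropped. The names (workingSetCache, newWorkingSetCache, Get, Set) and the rotation details are
illustrative assumptions, not the actual lib/logstorage API; only the MustStop-style shutdown mirrors the
method used by closeTestStorage().

package cachesketch

import (
	"sync"
	"sync/atomic"
	"time"
)

// workingSetCache is a hypothetical sync.Map-based cache. It keeps two
// generations of entries and rotates them periodically, so its size tracks
// the working set of the last couple of rotation intervals. Lookups and
// inserts go through sync.Map, which avoids mutexes on the fast path.
type workingSetCache struct {
	curr atomic.Pointer[sync.Map] // recently written or promoted entries
	prev atomic.Pointer[sync.Map] // entries from the previous interval

	wg     sync.WaitGroup
	stopCh chan struct{}
}

func newWorkingSetCache(rotationInterval time.Duration) *workingSetCache {
	c := &workingSetCache{
		stopCh: make(chan struct{}),
	}
	c.curr.Store(&sync.Map{})
	c.prev.Store(&sync.Map{})

	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		t := time.NewTicker(rotationInterval)
		defer t.Stop()
		for {
			select {
			case <-c.stopCh:
				return
			case <-t.C:
				// Entries that weren't touched during the last interval
				// are dropped together with the old prev generation.
				c.prev.Store(c.curr.Load())
				c.curr.Store(&sync.Map{})
			}
		}
	}()
	return c
}

// Get returns the value for k. Entries found only in the previous
// generation are promoted so they survive the next rotation.
func (c *workingSetCache) Get(k string) (any, bool) {
	if v, ok := c.curr.Load().Load(k); ok {
		return v, true
	}
	if v, ok := c.prev.Load().Load(k); ok {
		c.curr.Load().Store(k, v)
		return v, true
	}
	return nil, false
}

// Set stores v under k in the current generation.
func (c *workingSetCache) Set(k string, v any) {
	c.curr.Load().Store(k, v)
}

// MustStop stops the background rotation goroutine.
func (c *workingSetCache) MustStop() {
	close(c.stopCh)
	c.wg.Wait()
}

As the commit message notes for the real cache, nothing bounds the size of a generation between rotations,
so such a cache can grow during ingestion spikes, but it shrinks back once the spike is over.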