From 0f240781461d85cc1b3b69c01dcb7cd37f5640cd Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 18 Oct 2024 02:15:03 +0200 Subject: [PATCH] lib/logstorage: use simpler in-memory cache instead of workingsetcache for caching recently ingested _stream values and recently queried set of streams These caches aren't expected to grow big, so it is OK to use the simplest cache based on sync.Map. The benefit of this cache compared to workingsetcache is better scalability on systems with many CPU cores, since it doesn't use mutexes on the fast path. An additional benefit is lower memory usage on average, since the size of the in-memory cache equals the working set for the last 3 minutes. The downside is that there is no upper bound for the cache size, so it may grow big during workload spikes. But this is very unlikely for typical workloads. --- lib/logstorage/cache.go | 83 ++++++++++++++++++++++++++++++++ lib/logstorage/cache_test.go | 59 +++++++++++++++++++++++ lib/logstorage/indexdb.go | 7 +-- lib/logstorage/partition.go | 11 ++--- lib/logstorage/partition_test.go | 9 ++-- lib/logstorage/storage.go | 16 +++--- 6 files changed, 159 insertions(+), 26 deletions(-) create mode 100644 lib/logstorage/cache.go create mode 100644 lib/logstorage/cache_test.go diff --git a/lib/logstorage/cache.go b/lib/logstorage/cache.go new file mode 100644 index 000000000..1fde7dd1e --- /dev/null +++ b/lib/logstorage/cache.go @@ -0,0 +1,83 @@ +package logstorage + +import ( + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" +) + +type cache struct { + curr atomic.Pointer[sync.Map] + prev atomic.Pointer[sync.Map] + + stopCh chan struct{} + wg sync.WaitGroup +} + +func newCache() *cache { + var c cache + c.curr.Store(&sync.Map{}) + c.prev.Store(&sync.Map{}) + + c.stopCh = make(chan struct{}) + c.wg.Add(1) + go func() { + defer c.wg.Done() + c.runCleaner() + }() + return &c +} 
+ +func (c *cache) MustStop() { + close(c.stopCh) + c.wg.Wait() +} + +func (c *cache) runCleaner() { + d := timeutil.AddJitterToDuration(3 * time.Minute) + t := time.NewTicker(d) + defer t.Stop() + for { + select { + case <-t.C: + c.clean() + case <-c.stopCh: + return + } + } +} + +func (c *cache) clean() { + curr := c.curr.Load() + c.prev.Store(curr) + c.curr.Store(&sync.Map{}) +} + +func (c *cache) Get(k []byte) (any, bool) { + kStr := bytesutil.ToUnsafeString(k) + + curr := c.curr.Load() + v, ok := curr.Load(kStr) + if ok { + return v, true + } + + prev := c.prev.Load() + v, ok = prev.Load(kStr) + if ok { + kStr = strings.Clone(kStr) + curr.Store(kStr, v) + return v, true + } + return nil, false +} + +func (c *cache) Set(k []byte, v any) { + kStr := string(k) + curr := c.curr.Load() + curr.Store(kStr, v) +} diff --git a/lib/logstorage/cache_test.go b/lib/logstorage/cache_test.go new file mode 100644 index 000000000..dd34d6e39 --- /dev/null +++ b/lib/logstorage/cache_test.go @@ -0,0 +1,59 @@ +package logstorage + +import ( + "fmt" + "testing" +) + +func TestCache(t *testing.T) { + m := make(map[string]int) + for i := 0; i < 10; i++ { + k := fmt.Sprintf("key_%d", i) + m[k] = i + } + + c := newCache() + defer c.MustStop() + + for kStr := range m { + k := []byte(kStr) + + if v, ok := c.Get(k); ok { + t.Fatalf("unexpected value obtained from the cache for key %q: %v", k, v) + } + c.Set(k, m[kStr]) + v, ok := c.Get(k) + if !ok { + t.Fatalf("cannot obtain value for key %q", k) + } + if n := v.(int); n != m[kStr] { + t.Fatalf("unexpected value obtained for key %q; got %d; want %d", k, n, m[kStr]) + } + } + + // The cached entries should be still visible after a single clean() call. 
+ c.clean() + for kStr := range m { + k := []byte(kStr) + + v, ok := c.Get(k) + if !ok { + t.Fatalf("cannot obtain value for key %q", k) + } + if n := v.(int); n != m[kStr] { + t.Fatalf("unexpected value obtained for key %q; got %d; want %d", k, n, m[kStr]) + } + } + + // The cached entries must be dropped after two clean() calls. + c.clean() + c.clean() + + for kStr := range m { + k := []byte(kStr) + + if v, ok := c.Get(k); ok { + t.Fatalf("unexpected value obtained from the cache for key %q: %v", k, v) + } + } +} diff --git a/lib/logstorage/indexdb.go b/lib/logstorage/indexdb.go index fd90ac108..811d2b536 100644 --- a/lib/logstorage/indexdb.go +++ b/lib/logstorage/indexdb.go @@ -500,13 +500,14 @@ func (idb *indexdb) marshalStreamFilterCacheKey(dst []byte, tenantIDs []TenantID func (idb *indexdb) loadStreamIDsFromCache(tenantIDs []TenantID, sf *StreamFilter) ([]streamID, bool) { bb := bbPool.Get() bb.B = idb.marshalStreamFilterCacheKey(bb.B[:0], tenantIDs, sf) - data := idb.s.filterStreamCache.GetBig(nil, bb.B) + v, ok := idb.s.filterStreamCache.Get(bb.B) bbPool.Put(bb) - if len(data) == 0 { + if !ok { // Cache miss return nil, false } // Cache hit - unpack streamIDs from data. + data := *(v.(*[]byte)) n, nSize := encoding.UnmarshalVarUint64(data) if nSize <= 0 { logger.Panicf("BUG: unexpected error when unmarshaling the number of streamIDs from cache") @@ -537,7 +538,7 @@ func (idb *indexdb) storeStreamIDsToCache(tenantIDs []TenantID, sf *StreamFilter // Store marshaled streamIDs to cache. 
bb := bbPool.Get() bb.B = idb.marshalStreamFilterCacheKey(bb.B[:0], tenantIDs, sf) - idb.s.filterStreamCache.SetBig(bb.B, b) + idb.s.filterStreamCache.Set(bb.B, &b) bbPool.Put(bb) } diff --git a/lib/logstorage/partition.go b/lib/logstorage/partition.go index 439ca002f..cf3fa9b23 100644 --- a/lib/logstorage/partition.go +++ b/lib/logstorage/partition.go @@ -1,7 +1,6 @@ package logstorage import ( - "bytes" "path/filepath" "sort" @@ -160,20 +159,18 @@ func (pt *partition) logIngestedRows(lr *LogRows) { } func (pt *partition) hasStreamIDInCache(sid *streamID) bool { - var result [1]byte - bb := bbPool.Get() bb.B = pt.marshalStreamIDCacheKey(bb.B, sid) - value := pt.s.streamIDCache.Get(result[:0], bb.B) + _, ok := pt.s.streamIDCache.Get(bb.B) bbPool.Put(bb) - return bytes.Equal(value, okValue) + return ok } func (pt *partition) putStreamIDToCache(sid *streamID) { bb := bbPool.Get() bb.B = pt.marshalStreamIDCacheKey(bb.B, sid) - pt.s.streamIDCache.Set(bb.B, okValue) + pt.s.streamIDCache.Set(bb.B, nil) bbPool.Put(bb) } @@ -183,8 +180,6 @@ func (pt *partition) marshalStreamIDCacheKey(dst []byte, sid *streamID) []byte { return dst } -var okValue = []byte("1") - // debugFlush makes sure that all the recently ingested data data becomes searchable func (pt *partition) debugFlush() { pt.ddb.debugFlush() diff --git a/lib/logstorage/partition_test.go b/lib/logstorage/partition_test.go index 7f67ffb45..de297a85c 100644 --- a/lib/logstorage/partition_test.go +++ b/lib/logstorage/partition_test.go @@ -6,7 +6,6 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" ) func TestPartitionLifecycle(t *testing.T) { @@ -183,8 +182,8 @@ func TestPartitionMustAddRowsConcurrent(t *testing.T) { // // When the storage is no longer needed, closeTestStorage() must be called. 
func newTestStorage() *Storage { - streamIDCache := workingsetcache.New(1024 * 1024) - filterStreamCache := workingsetcache.New(1024 * 1024) + streamIDCache := newCache() + filterStreamCache := newCache() return &Storage{ flushInterval: time.Second, streamIDCache: streamIDCache, @@ -194,6 +193,6 @@ func newTestStorage() *Storage { // closeTestStorage closes storage created via newTestStorage(). func closeTestStorage(s *Storage) { - s.streamIDCache.Stop() - s.filterStreamCache.Stop() + s.streamIDCache.MustStop() + s.filterStreamCache.MustStop() } diff --git a/lib/logstorage/storage.go b/lib/logstorage/storage.go index 093658f4f..a5de2b88e 100644 --- a/lib/logstorage/storage.go +++ b/lib/logstorage/storage.go @@ -11,9 +11,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" ) // StorageStats represents stats for the storage. It may be obtained by calling Storage.UpdateStats(). @@ -136,12 +134,12 @@ type Storage struct { // // It reduces the load on persistent storage during data ingestion by skipping // the check whether the given stream is already registered in the persistent storage. - streamIDCache *workingsetcache.Cache + streamIDCache *cache // filterStreamCache caches streamIDs keyed by (partition, []TenanID, StreamFilter). // // It reduces the load on persistent storage during querying by _stream:{...} filter. 
- filterStreamCache *workingsetcache.Cache + filterStreamCache *cache } type partitionWrapper struct { @@ -243,10 +241,8 @@ func MustOpenStorage(path string, cfg *StorageConfig) *Storage { flockF := fs.MustCreateFlockFile(path) // Load caches - mem := memory.Allowed() - - streamIDCache := workingsetcache.New(mem / 16) - filterStreamCache := workingsetcache.New(mem / 10) + streamIDCache := newCache() + filterStreamCache := newCache() s := &Storage{ path: path, @@ -463,10 +459,10 @@ func (s *Storage) MustClose() { // between VictoriaLogs restarts. This may result in various issues // during data ingestion and querying. - s.streamIDCache.Stop() + s.streamIDCache.MustStop() s.streamIDCache = nil - s.filterStreamCache.Stop() + s.filterStreamCache.MustStop() s.filterStreamCache = nil // release lock file