diff --git a/lib/logstorage/cache.go b/lib/logstorage/cache.go new file mode 100644 index 000000000..1fde7dd1e --- /dev/null +++ b/lib/logstorage/cache.go @@ -0,0 +1,83 @@ +package logstorage + +import ( + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" +) + +type cache struct { + curr atomic.Pointer[sync.Map] + prev atomic.Pointer[sync.Map] + + stopCh chan struct{} + wg sync.WaitGroup +} + +func newCache() *cache { + var c cache + c.curr.Store(&sync.Map{}) + c.prev.Store(&sync.Map{}) + + c.stopCh = make(chan struct{}) + c.wg.Add(1) + go func() { + defer c.wg.Done() + c.runCleaner() + }() + return &c +} + +func (c *cache) MustStop() { + close(c.stopCh) + c.wg.Wait() +} + +func (c *cache) runCleaner() { + d := timeutil.AddJitterToDuration(3 * time.Minute) + t := time.NewTicker(d) + defer t.Stop() + for { + select { + case <-t.C: + c.clean() + case <-c.stopCh: + return + } + } +} + +func (c *cache) clean() { + curr := c.curr.Load() + c.prev.Store(curr) + c.curr.Store(&sync.Map{}) +} + +func (c *cache) Get(k []byte) (any, bool) { + kStr := bytesutil.ToUnsafeString(k) + + curr := c.curr.Load() + v, ok := curr.Load(kStr) + if ok { + return v, true + } + + prev := c.prev.Load() + v, ok = prev.Load(kStr) + if ok { + kStr = strings.Clone(kStr) + curr.Store(kStr, v) + return v, true + } + return nil, false +} + +func (c *cache) Set(k []byte, v any) { + kStr := string(k) + curr := c.curr.Load() + curr.Store(kStr, v) +} diff --git a/lib/logstorage/cache_test.go b/lib/logstorage/cache_test.go new file mode 100644 index 000000000..dd34d6e39 --- /dev/null +++ b/lib/logstorage/cache_test.go @@ -0,0 +1,59 @@ +package logstorage + +import ( + "fmt" + "testing" +) + +func TestCache(t *testing.T) { + m := make(map[string]int) + for i := 0; i < 10; i++ { + k := fmt.Sprintf("key_%d", i) + m[k] = i + } + + c := newCache() + defer c.MustStop() + + for kStr := range m { + k := []byte(kStr) + + if v, ok := c.Get(k); ok { + t.Fatalf("unexpected value obtained from the cache for key %q: %v", k, v) + } + c.Set(k, m[kStr]) + v, ok := c.Get(k) + if !ok { + t.Fatalf("cannot obtain value for key %q", k) + } + if n := v.(int); n != m[kStr] { + t.Fatalf("unexpected value obtained for key %q; got %d; want %d", k, n, m[kStr]) + } + } + + // The cached entries should be still visible after a single clean() call. + c.clean() + for kStr := range m { + k := []byte(kStr) + + v, ok := c.Get(k) + if !ok { + t.Fatalf("cannot obtain value for key %q", k) + } + if n := v.(int); n != m[kStr] { + t.Fatalf("unexpected value obtained for key %q; got %d; want %d", k, n, m[kStr]) + } + } + + // The cached entries must be dropped after two clean() calls. + c.clean() + c.clean() + + for kStr := range m { + k := []byte(kStr) + + if v, ok := c.Get(k); ok { + t.Fatalf("unexpected value obtained from the cache for key %q: %v", k, v) + } + } +} diff --git a/lib/logstorage/indexdb.go b/lib/logstorage/indexdb.go index fd90ac108..811d2b536 100644 --- a/lib/logstorage/indexdb.go +++ b/lib/logstorage/indexdb.go @@ -500,13 +500,14 @@ func (idb *indexdb) marshalStreamFilterCacheKey(dst []byte, tenantIDs []TenantID func (idb *indexdb) loadStreamIDsFromCache(tenantIDs []TenantID, sf *StreamFilter) ([]streamID, bool) { bb := bbPool.Get() bb.B = idb.marshalStreamFilterCacheKey(bb.B[:0], tenantIDs, sf) - data := idb.s.filterStreamCache.GetBig(nil, bb.B) + v, ok := idb.s.filterStreamCache.Get(bb.B) bbPool.Put(bb) - if len(data) == 0 { + if !ok { // Cache miss return nil, false } // Cache hit - unpack streamIDs from data. + data := *(v.(*[]byte)) n, nSize := encoding.UnmarshalVarUint64(data) if nSize <= 0 { logger.Panicf("BUG: unexpected error when unmarshaling the number of streamIDs from cache") @@ -537,7 +538,7 @@ func (idb *indexdb) storeStreamIDsToCache(tenantIDs []TenantID, sf *StreamFilter // Store marshaled streamIDs to cache. bb := bbPool.Get() bb.B = idb.marshalStreamFilterCacheKey(bb.B[:0], tenantIDs, sf) - idb.s.filterStreamCache.SetBig(bb.B, b) + idb.s.filterStreamCache.Set(bb.B, &b) bbPool.Put(bb) } diff --git a/lib/logstorage/partition.go b/lib/logstorage/partition.go index 439ca002f..cf3fa9b23 100644 --- a/lib/logstorage/partition.go +++ b/lib/logstorage/partition.go @@ -1,7 +1,6 @@ package logstorage import ( - "bytes" "path/filepath" "sort" @@ -160,20 +159,18 @@ func (pt *partition) logIngestedRows(lr *LogRows) { } func (pt *partition) hasStreamIDInCache(sid *streamID) bool { - var result [1]byte - bb := bbPool.Get() bb.B = pt.marshalStreamIDCacheKey(bb.B, sid) - value := pt.s.streamIDCache.Get(result[:0], bb.B) + _, ok := pt.s.streamIDCache.Get(bb.B) bbPool.Put(bb) - return bytes.Equal(value, okValue) + return ok } func (pt *partition) putStreamIDToCache(sid *streamID) { bb := bbPool.Get() bb.B = pt.marshalStreamIDCacheKey(bb.B, sid) - pt.s.streamIDCache.Set(bb.B, okValue) + pt.s.streamIDCache.Set(bb.B, nil) bbPool.Put(bb) } @@ -183,8 +180,6 @@ func (pt *partition) marshalStreamIDCacheKey(dst []byte, sid *streamID) []byte { return dst } -var okValue = []byte("1") - // debugFlush makes sure that all the recently ingested data data becomes searchable func (pt *partition) debugFlush() { pt.ddb.debugFlush() diff --git a/lib/logstorage/partition_test.go b/lib/logstorage/partition_test.go index 7f67ffb45..de297a85c 100644 --- a/lib/logstorage/partition_test.go +++ b/lib/logstorage/partition_test.go @@ -6,7 +6,6 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" ) func TestPartitionLifecycle(t *testing.T) { @@ -183,8 +182,8 @@ func TestPartitionMustAddRowsConcurrent(t *testing.T) { // // When the storage is no longer needed, closeTestStorage() must be called. func newTestStorage() *Storage { - streamIDCache := workingsetcache.New(1024 * 1024) - filterStreamCache := workingsetcache.New(1024 * 1024) + streamIDCache := newCache() + filterStreamCache := newCache() return &Storage{ flushInterval: time.Second, streamIDCache: streamIDCache, @@ -194,6 +193,6 @@ func newTestStorage() *Storage { // closeTestStorage closes storage created via newTestStorage(). func closeTestStorage(s *Storage) { - s.streamIDCache.Stop() - s.filterStreamCache.Stop() + s.streamIDCache.MustStop() + s.filterStreamCache.MustStop() } diff --git a/lib/logstorage/storage.go b/lib/logstorage/storage.go index 093658f4f..a5de2b88e 100644 --- a/lib/logstorage/storage.go +++ b/lib/logstorage/storage.go @@ -11,9 +11,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" ) // StorageStats represents stats for the storage. It may be obtained by calling Storage.UpdateStats(). @@ -136,12 +134,12 @@ type Storage struct { // // It reduces the load on persistent storage during data ingestion by skipping // the check whether the given stream is already registered in the persistent storage. - streamIDCache *workingsetcache.Cache + streamIDCache *cache // filterStreamCache caches streamIDs keyed by (partition, []TenanID, StreamFilter). // // It reduces the load on persistent storage during querying by _stream:{...} filter. - filterStreamCache *workingsetcache.Cache + filterStreamCache *cache } type partitionWrapper struct { @@ -243,10 +241,8 @@ func MustOpenStorage(path string, cfg *StorageConfig) *Storage { flockF := fs.MustCreateFlockFile(path) // Load caches - mem := memory.Allowed() - - streamIDCache := workingsetcache.New(mem / 16) - filterStreamCache := workingsetcache.New(mem / 10) + streamIDCache := newCache() + filterStreamCache := newCache() s := &Storage{ path: path, @@ -463,10 +459,10 @@ func (s *Storage) MustClose() { // between VictoriaLogs restarts. This may result in various issues // during data ingestion and querying. - s.streamIDCache.Stop() + s.streamIDCache.MustStop() s.streamIDCache = nil - s.filterStreamCache.Stop() + s.filterStreamCache.MustStop() s.filterStreamCache = nil // release lock file