diff --git a/lib/workingsetcache/cache.go b/lib/workingsetcache/cache.go index 7883bd4493..00d2159cf4 100644 --- a/lib/workingsetcache/cache.go +++ b/lib/workingsetcache/cache.go @@ -9,6 +9,13 @@ import ( "github.com/VictoriaMetrics/fastcache" ) +// Cache modes. +const ( + split = 0 + switching = 1 + whole = 2 +) + // Cache is a cache for working set entries. // // The cache evicts inactive entries after the given expireDuration. @@ -20,14 +27,15 @@ type Cache struct { curr atomic.Value prev atomic.Value - // skipPrev indicates whether to use only curr and skip prev. + // mode indicates whether to use only curr and skip prev. // - // This flag is set if curr is filled for more than 50% space. + // This flag is set to switching if curr is filled for more than 50% space. // In this case using prev would result in RAM waste, // it is better to use only curr cache with doubled size. - skipPrev uint64 + // After the process of switching, this flag will be set to whole. + mode uint64 - // mu serializes access to curr, prev and skipPrev + // mu serializes access to curr, prev and mode // in expirationWorker and cacheSizeWatcher. mu sync.Mutex @@ -42,10 +50,28 @@ type Cache struct { // // Stop must be called on the returned cache when it is no longer needed. func Load(filePath string, maxBytes int, expireDuration time.Duration) *Cache { - // Split maxBytes between curr and prev caches. - maxBytes /= 2 curr := fastcache.LoadFromFileOrNew(filePath, maxBytes) - return newWorkingSetCache(curr, maxBytes, expireDuration) + var cs fastcache.Stats + curr.UpdateStats(&cs) + if cs.EntriesCount == 0 { + curr.Reset() + // The cache couldn't be loaded with maxBytes size. + // This may mean that the cache is split into curr and prev caches. + // Try loading it again with maxBytes / 2 size. + maxBytes /= 2 + curr = fastcache.LoadFromFileOrNew(filePath, maxBytes) + return newWorkingSetCache(curr, maxBytes, expireDuration) + } + + // The cache has been successfully loaded in full. + // Set its' mode to `whole`. + // There is no need in starting expirationWorker and cacheSizeWatcher. + var c Cache + c.curr.Store(curr) + c.prev.Store(fastcache.New(1024)) + c.stopCh = make(chan struct{}) + atomic.StoreUint64(&c.mode, whole) + return &c } // New creates new cache with the given maxBytes size and the given expireDuration @@ -65,6 +91,7 @@ func newWorkingSetCache(curr *fastcache.Cache, maxBytes int, expireDuration time c.curr.Store(curr) c.prev.Store(prev) c.stopCh = make(chan struct{}) + atomic.StoreUint64(&c.mode, split) c.wg.Add(1) go func() { @@ -90,7 +117,7 @@ func (c *Cache) expirationWorker(maxBytes int, expireDuration time.Duration) { } c.mu.Lock() - if atomic.LoadUint64(&c.skipPrev) != 0 { + if atomic.LoadUint64(&c.mode) == split { // Expire prev cache and create fresh curr cache. // Do not reuse prev cache, since it can have too big capacity. prev := c.prev.Load().(*fastcache.Cache) @@ -106,33 +133,63 @@ func (c *Cache) expirationWorker(maxBytes int, expireDuration time.Duration) { func (c *Cache) cacheSizeWatcher(maxBytes int) { t := time.NewTicker(time.Minute) + defer t.Stop() + for { select { case <-c.stopCh: - t.Stop() return case <-t.C: } var cs fastcache.Stats curr := c.curr.Load().(*fastcache.Cache) curr.UpdateStats(&cs) - if cs.BytesSize < uint64(maxBytes)/2 { - continue + if cs.BytesSize >= uint64(maxBytes)/2 { + break } - - // curr cache size exceeds 50% of its capacity. It is better - // to double the size of curr cache and stop using prev cache, - // since this will result in higher summary cache capacity. - c.mu.Lock() - curr.Reset() - prev := c.prev.Load().(*fastcache.Cache) - prev.Reset() - curr = fastcache.New(maxBytes * 2) - c.curr.Store(curr) - atomic.StoreUint64(&c.skipPrev, 1) - c.mu.Unlock() - return } + + // curr cache size exceeds 50% of its capacity. It is better + // to double the size of curr cache and stop using prev cache, + // since this will result in higher summary cache capacity. + // + // Do this in the following steps: + // 1) switch to mode=switching + // 2) move curr cache to prev + // 3) create curr with the double size + // 4) wait until curr size exceeds maxBytes/2, i.e. it is populated with new data + // 5) switch to mode=whole + // 6) drop prev + + c.mu.Lock() + atomic.StoreUint64(&c.mode, switching) + prev := c.prev.Load().(*fastcache.Cache) + prev.Reset() + curr := c.curr.Load().(*fastcache.Cache) + c.prev.Store(curr) + c.curr.Store(fastcache.New(maxBytes * 2)) + c.mu.Unlock() + + for { + select { + case <-c.stopCh: + return + case <-t.C: + } + var cs fastcache.Stats + curr := c.curr.Load().(*fastcache.Cache) + curr.UpdateStats(&cs) + if cs.BytesSize >= uint64(maxBytes)/2 { + break + } + } + + c.mu.Lock() + atomic.StoreUint64(&c.mode, whole) + prev = c.prev.Load().(*fastcache.Cache) + prev.Reset() + c.prev.Store(fastcache.New(1024)) + c.mu.Unlock() } // Save safes the cache to filePath. @@ -167,7 +224,7 @@ func (c *Cache) UpdateStats(fcs *fastcache.Stats) { curr := c.curr.Load().(*fastcache.Cache) fcsOrig := *fcs curr.UpdateStats(fcs) - if atomic.LoadUint64(&c.skipPrev) != 0 { + if atomic.LoadUint64(&c.mode) == whole { return } @@ -187,7 +244,7 @@ func (c *Cache) Get(dst, key []byte) []byte { // Fast path - the entry is found in the current cache. return result } - if atomic.LoadUint64(&c.skipPrev) != 0 { + if atomic.LoadUint64(&c.mode) == whole { return result } @@ -210,7 +267,7 @@ func (c *Cache) Has(key []byte) bool { if curr.Has(key) { return true } - if atomic.LoadUint64(&c.skipPrev) != 0 { + if atomic.LoadUint64(&c.mode) == whole { return false } prev := c.prev.Load().(*fastcache.Cache) @@ -231,7 +288,7 @@ func (c *Cache) GetBig(dst, key []byte) []byte { // Fast path - the entry is found in the current cache. return result } - if atomic.LoadUint64(&c.skipPrev) != 0 { + if atomic.LoadUint64(&c.mode) == whole { return result }