diff --git a/lib/workingsetcache/cache.go b/lib/workingsetcache/cache.go
index c56dccd71b..47e247ae06 100644
--- a/lib/workingsetcache/cache.go
+++ b/lib/workingsetcache/cache.go
@@ -42,7 +42,7 @@ type Cache struct {
 	maxBytes int
 
 	// mu serializes access to curr, prev and mode
-	// in expirationWatcher and cacheSizeWatcher.
+	// in expirationWatcher, prevCacheWatcher and cacheSizeWatcher.
 	mu sync.Mutex
 
 	wg sync.WaitGroup
@@ -122,6 +122,11 @@ func (c *Cache) runWatchers(expireDuration time.Duration) {
 		c.expirationWatcher(expireDuration)
 	}()
 	c.wg.Add(1)
+	go func() {
+		defer c.wg.Done()
+		c.prevCacheWatcher()
+	}()
+	c.wg.Add(1)
 	go func() {
 		defer c.wg.Done()
 		c.cacheSizeWatcher()
@@ -132,7 +137,6 @@ func (c *Cache) expirationWatcher(expireDuration time.Duration) {
 	expireDuration += timeJitter(expireDuration / 10)
 	t := time.NewTicker(expireDuration)
 	defer t.Stop()
-	var csCurr, csPrev fastcache.Stats
 	for {
 		select {
 		case <-c.stopCh:
@@ -148,52 +152,61 @@
 		// Reset prev cache and swap it with the curr cache.
 		prev := c.prev.Load().(*fastcache.Cache)
 		curr := c.curr.Load().(*fastcache.Cache)
-		csCurr.Reset()
-		curr.UpdateStats(&csCurr)
-		csPrev.Reset()
-		prev.UpdateStats(&csPrev)
-
 		c.prev.Store(curr)
-
-		prevGetCalls := csCurr.GetCalls
-		updateCacheStatsHistory(&c.csHistory, &csPrev)
-
+		var cs fastcache.Stats
+		prev.UpdateStats(&cs)
+		updateCacheStatsHistory(&c.csHistory, &cs)
 		prev.Reset()
 		c.curr.Store(prev)
 		c.mu.Unlock()
+	}
+}
 
-		// Watch for the usage of the prev cache and drop it whenever it receives
-		// less than 5% of get calls comparing to the curr cache.
-		// This allows saving memory.
-		prev, curr = curr, prev
-		checkInterval := 10 * time.Second
-		checkerT := time.NewTicker(checkInterval)
-		checkerDeadline := time.Now().Add(expireDuration - checkInterval)
-		for time.Now().Before(checkerDeadline) {
-			select {
-			case <-c.stopCh:
-				break
-			case <-checkerT.C:
-			}
-			c.mu.Lock()
-			if atomic.LoadUint32(&c.mode) != split {
-				// Do nothing in non-split mode.
-				c.mu.Unlock()
-				break
-			}
-			csCurr.Reset()
-			curr.UpdateStats(&csCurr)
-			csPrev.Reset()
-			prev.UpdateStats(&csPrev)
-			getCalls := csPrev.GetCalls - prevGetCalls
-			if float64(getCalls) < 0.05*float64(csCurr.GetCalls) {
-				// The majority of requests are served from the curr cache,
-				// so the prev cache can be deleted.
+func (c *Cache) prevCacheWatcher() {
+	// Watch for the usage of the prev cache and drop it whenever it receives
+	// less than 5% of requests comparing to the curr cache during the last 10 seconds.
+	checkInterval := 10 * time.Second
+	checkInterval += timeJitter(checkInterval / 10)
+	t := time.NewTicker(checkInterval)
+	defer t.Stop()
+	prevGetCalls := uint64(0)
+	currGetCalls := uint64(0)
+	for {
+		select {
+		case <-c.stopCh:
+			return
+		case <-t.C:
+		}
+		c.mu.Lock()
+		if atomic.LoadUint32(&c.mode) != split {
+			// Do nothing in non-split mode.
+			c.mu.Unlock()
+			return
+		}
+		prev := c.prev.Load().(*fastcache.Cache)
+		curr := c.curr.Load().(*fastcache.Cache)
+		var csCurr, csPrev fastcache.Stats
+		curr.UpdateStats(&csCurr)
+		prev.UpdateStats(&csPrev)
+		currRequests := csCurr.GetCalls
+		if currRequests >= currGetCalls {
+			currRequests -= currGetCalls
+		}
+		prevRequests := csPrev.GetCalls
+		if prevRequests >= prevGetCalls {
+			prevRequests -= prevGetCalls
+		}
+		currGetCalls = csCurr.GetCalls
+		prevGetCalls = csPrev.GetCalls
+		if currRequests >= 20 && float64(prevRequests)/float64(currRequests) < 0.05 {
+			// The majority of requests are served from the curr cache,
+			// so the prev cache can be deleted in order to free up memory.
+			if csPrev.EntriesCount > 0 {
+				updateCacheStatsHistory(&c.csHistory, &csPrev)
 				prev.Reset()
-				break
 			}
 		}
-		checkerT.Stop()
+		c.mu.Unlock()
 	}
 }
 
@@ -239,11 +252,9 @@ func (c *Cache) cacheSizeWatcher() {
 	prev := c.prev.Load().(*fastcache.Cache)
 	curr := c.curr.Load().(*fastcache.Cache)
 	c.prev.Store(curr)
-
 	var cs fastcache.Stats
 	prev.UpdateStats(&cs)
 	updateCacheStatsHistory(&c.csHistory, &cs)
-
 	prev.Reset()
 	// use c.maxBytes instead of maxBytesSize*2 for creating new cache, since otherwise the created cache
 	// couldn't be loaded from file with c.maxBytes limit after saving with maxBytesSize*2 limit.
@@ -268,11 +279,9 @@
 	c.setMode(whole)
 	prev = c.prev.Load().(*fastcache.Cache)
 	c.prev.Store(fastcache.New(1024))
-
 	cs.Reset()
 	prev.UpdateStats(&cs)
 	updateCacheStatsHistory(&c.csHistory, &cs)
-
 	prev.Reset()
 	c.mu.Unlock()
 }
@@ -296,9 +305,13 @@ func (c *Cache) Stop() {
 
 // Reset resets the cache.
 func (c *Cache) Reset() {
+	var cs fastcache.Stats
 	prev := c.prev.Load().(*fastcache.Cache)
+	prev.UpdateStats(&cs)
 	prev.Reset()
 	curr := c.curr.Load().(*fastcache.Cache)
+	curr.UpdateStats(&cs)
+	updateCacheStatsHistory(&c.csHistory, &cs)
 	curr.Reset()
 	// Reset the mode to `split` in the hope the working set size becomes smaller after the reset.
 	c.setMode(split)
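
For reference, the heuristic introduced in prevCacheWatcher compares per-interval deltas of the cumulative GetCalls counters of the two caches rather than their absolute values, and only acts once the curr cache has seen a meaningful amount of traffic in the interval. A minimal standalone sketch of that decision logic follows; the function name shouldDropPrev and the plain uint64 counter snapshots are illustrative assumptions, not part of the patch or of the fastcache API.

package main

import "fmt"

// shouldDropPrev restates the drop condition from prevCacheWatcher in
// isolation: compute how many get calls curr and prev served since the
// previous check (deltas of cumulative counters, with the subtraction
// skipped if a counter went backwards), and report that prev can be
// dropped only if curr saw at least 20 requests during the interval
// and prev served less than 5% of that traffic.
func shouldDropPrev(currTotal, prevTotal, lastCurrTotal, lastPrevTotal uint64) bool {
	currRequests := currTotal
	if currRequests >= lastCurrTotal {
		currRequests -= lastCurrTotal
	}
	prevRequests := prevTotal
	if prevRequests >= lastPrevTotal {
		prevRequests -= lastPrevTotal
	}
	return currRequests >= 20 && float64(prevRequests)/float64(currRequests) < 0.05
}

func main() {
	// prev served 3% of curr's traffic during the interval -> it can be dropped.
	fmt.Println(shouldDropPrev(1000, 30, 0, 0)) // true
	// prev still served 10% of curr's traffic -> keep it warm.
	fmt.Println(shouldDropPrev(1000, 100, 0, 0)) // false
}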