diff --git a/lib/workingsetcache/cache.go b/lib/workingsetcache/cache.go index 8e6145563..7883bd449 100644 --- a/lib/workingsetcache/cache.go +++ b/lib/workingsetcache/cache.go @@ -1,7 +1,6 @@ package workingsetcache import ( - "flag" "runtime" "sync" "sync/atomic" @@ -10,9 +9,6 @@ import ( "github.com/VictoriaMetrics/fastcache" ) -var oldBehavior = flag.Bool("cache.oldBehavior", false, "Whether to use old behaviour for caches. Old behavior can give better resuts "+ - "for low-RAM systems serving big number of time series. Systems with enough RAM would consume more RAM when `-cache.oldBehavior` is enabled") - // Cache is a cache for working set entries. // // The cache evicts inactive entries after the given expireDuration. @@ -24,6 +20,17 @@ type Cache struct { curr atomic.Value prev atomic.Value + // skipPrev indicates whether to use only curr and skip prev. + // + // This flag is set if curr is filled for more than 50% space. + // In this case using prev would result in RAM waste, + // it is better to use only curr cache with doubled size. + skipPrev uint64 + + // mu serializes access to curr, prev and skipPrev + // in expirationWorker and cacheSizeWatcher. + mu sync.Mutex + wg sync.WaitGroup stopCh chan struct{} @@ -35,10 +42,8 @@ type Cache struct { // // Stop must be called on the returned cache when it is no longer needed. func Load(filePath string, maxBytes int, expireDuration time.Duration) *Cache { - if !*oldBehavior { - // Split maxBytes between curr and prev caches. - maxBytes /= 2 - } + // Split maxBytes between curr and prev caches. + maxBytes /= 2 curr := fastcache.LoadFromFileOrNew(filePath, maxBytes) return newWorkingSetCache(curr, maxBytes, expireDuration) } @@ -48,10 +53,8 @@ func Load(filePath string, maxBytes int, expireDuration time.Duration) *Cache { // // Stop must be called on the returned cache when it is no longer needed. func New(maxBytes int, expireDuration time.Duration) *Cache { - if !*oldBehavior { - // Split maxBytes between curr and prev caches. - maxBytes /= 2 - } + // Split maxBytes between curr and prev caches. + maxBytes /= 2 curr := fastcache.New(maxBytes) return newWorkingSetCache(curr, maxBytes, expireDuration) } @@ -62,21 +65,33 @@ func newWorkingSetCache(curr *fastcache.Cache, maxBytes int, expireDuration time c.curr.Store(curr) c.prev.Store(prev) c.stopCh = make(chan struct{}) + c.wg.Add(1) go func() { defer c.wg.Done() - t := time.NewTicker(expireDuration / 2) - for { - select { - case <-c.stopCh: - return - case <-t.C: - } - if *oldBehavior { - // Keep the curr cache for old behavior. - continue - } + c.expirationWorker(maxBytes, expireDuration) + }() + c.wg.Add(1) + go func() { + defer c.wg.Done() + c.cacheSizeWatcher(maxBytes) + }() + return &c +} +func (c *Cache) expirationWorker(maxBytes int, expireDuration time.Duration) { + t := time.NewTicker(expireDuration / 2) + for { + select { + case <-c.stopCh: + t.Stop() + return + case <-t.C: + } + + c.mu.Lock() + if atomic.LoadUint64(&c.skipPrev) != 0 { + // Expire prev cache and create fresh curr cache. // Do not reuse prev cache, since it can have too big capacity. prev := c.prev.Load().(*fastcache.Cache) prev.Reset() @@ -85,8 +100,39 @@ func newWorkingSetCache(curr *fastcache.Cache, maxBytes int, expireDuration time curr = fastcache.New(maxBytes) c.curr.Store(curr) } - }() - return &c + c.mu.Unlock() + } +} + +func (c *Cache) cacheSizeWatcher(maxBytes int) { + t := time.NewTicker(time.Minute) + for { + select { + case <-c.stopCh: + t.Stop() + return + case <-t.C: + } + var cs fastcache.Stats + curr := c.curr.Load().(*fastcache.Cache) + curr.UpdateStats(&cs) + if cs.BytesSize < uint64(maxBytes)/2 { + continue + } + + // curr cache size exceeds 50% of its capacity. It is better + // to double the size of curr cache and stop using prev cache, + // since this will result in higher summary cache capacity. + c.mu.Lock() + curr.Reset() + prev := c.prev.Load().(*fastcache.Cache) + prev.Reset() + curr = fastcache.New(maxBytes * 2) + c.curr.Store(curr) + atomic.StoreUint64(&c.skipPrev, 1) + c.mu.Unlock() + return + } } // Save safes the cache to filePath. @@ -121,11 +167,11 @@ func (c *Cache) UpdateStats(fcs *fastcache.Stats) { curr := c.curr.Load().(*fastcache.Cache) fcsOrig := *fcs curr.UpdateStats(fcs) - if *oldBehavior { + if atomic.LoadUint64(&c.skipPrev) != 0 { return } - fcs.Misses = fcsOrig.Misses + atomic.LoadUint64(&c.misses) + fcs.Misses = fcsOrig.Misses + atomic.LoadUint64(&c.misses) fcsOrig.Reset() prev := c.prev.Load().(*fastcache.Cache) prev.UpdateStats(&fcsOrig) @@ -141,7 +187,7 @@ func (c *Cache) Get(dst, key []byte) []byte { // Fast path - the entry is found in the current cache. return result } - if *oldBehavior { + if atomic.LoadUint64(&c.skipPrev) != 0 { return result } @@ -164,7 +210,7 @@ func (c *Cache) Has(key []byte) bool { if curr.Has(key) { return true } - if *oldBehavior { + if atomic.LoadUint64(&c.skipPrev) != 0 { return false } prev := c.prev.Load().(*fastcache.Cache) @@ -185,7 +231,7 @@ func (c *Cache) GetBig(dst, key []byte) []byte { // Fast path - the entry is found in the current cache. return result } - if *oldBehavior { + if atomic.LoadUint64(&c.skipPrev) != 0 { return result }