lib/workingsetcache: automatically detect when it is better to double cache capacity

commit 483de1cc06
parent 9e0896055d
Author: Aliaksandr Valialkin
Date:   2019-08-15 22:57:43 +03:00
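
The cache now starts in split mode: Load and New below give curr and prev half of the caller's maxBytes each, and cacheSizeWatcher abandons the split once curr holds at least 50% of its half. With a 1 GiB budget, for example, curr and prev get 512 MiB each; once curr's BytesSize reaches 256 MiB, curr is rebuilt with the full 1 GiB and skipPrev is set. A minimal sketch of that detection check, using the same fastcache.Stats fields as the diff (the helper name and the example budget are illustrative, not part of the commit):

package main

import (
	"fmt"

	"github.com/VictoriaMetrics/fastcache"
)

// shouldSkipPrev reports whether curr holds at least 50% of its capacity,
// i.e. whether the curr/prev split should be abandoned in favor of a single
// cache with doubled capacity. maxBytes is the per-cache half of the budget,
// mirroring the halving done in Load and New.
func shouldSkipPrev(curr *fastcache.Cache, maxBytes int) bool {
	var cs fastcache.Stats
	curr.UpdateStats(&cs)
	return cs.BytesSize >= uint64(maxBytes)/2
}

func main() {
	const maxBytes = 512 * 1024 * 1024 // half of a 1 GiB budget
	curr := fastcache.New(maxBytes)
	fmt.Println(shouldSkipPrev(curr, maxBytes)) // false while curr is nearly empty
}

Since Load and New halve maxBytes before passing it on, the doubled curr ends up using exactly the budget the caller asked for.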


@@ -1,7 +1,6 @@
 package workingsetcache
 import (
-	"flag"
 	"runtime"
 	"sync"
 	"sync/atomic"
@@ -10,9 +9,6 @@ import (
 	"github.com/VictoriaMetrics/fastcache"
 )
-var oldBehavior = flag.Bool("cache.oldBehavior", false, "Whether to use old behaviour for caches. Old behavior can give better resuts "+
-	"for low-RAM systems serving big number of time series. Systems with enough RAM would consume more RAM when `-cache.oldBehavior` is enabled")
 // Cache is a cache for working set entries.
 //
 // The cache evicts inactive entries after the given expireDuration.
@@ -24,6 +20,17 @@ type Cache struct {
 	curr atomic.Value
 	prev atomic.Value
+
+	// skipPrev indicates whether to use only curr and skip prev.
+	//
+	// This flag is set if curr is filled for more than 50% space.
+	// In this case using prev would result in RAM waste,
+	// it is better to use only curr cache with doubled size.
+	skipPrev uint64
+
+	// mu serializes access to curr, prev and skipPrev
+	// in expirationWorker and cacheSizeWatcher.
+	mu sync.Mutex
+
 	wg sync.WaitGroup
 	stopCh chan struct{}
@@ -35,10 +42,8 @@ type Cache struct {
 //
 // Stop must be called on the returned cache when it is no longer needed.
 func Load(filePath string, maxBytes int, expireDuration time.Duration) *Cache {
-	if !*oldBehavior {
-		// Split maxBytes between curr and prev caches.
-		maxBytes /= 2
-	}
+	// Split maxBytes between curr and prev caches.
+	maxBytes /= 2
 	curr := fastcache.LoadFromFileOrNew(filePath, maxBytes)
 	return newWorkingSetCache(curr, maxBytes, expireDuration)
 }
@@ -48,10 +53,8 @@ func Load(filePath string, maxBytes int, expireDuration time.Duration) *Cache {
 //
 // Stop must be called on the returned cache when it is no longer needed.
 func New(maxBytes int, expireDuration time.Duration) *Cache {
-	if !*oldBehavior {
-		// Split maxBytes between curr and prev caches.
-		maxBytes /= 2
-	}
+	// Split maxBytes between curr and prev caches.
+	maxBytes /= 2
 	curr := fastcache.New(maxBytes)
 	return newWorkingSetCache(curr, maxBytes, expireDuration)
 }
@@ -62,21 +65,33 @@ func newWorkingSetCache(curr *fastcache.Cache, maxBytes int, expireDuration time
 	c.curr.Store(curr)
 	c.prev.Store(prev)
 	c.stopCh = make(chan struct{})
 	c.wg.Add(1)
 	go func() {
 		defer c.wg.Done()
-		t := time.NewTicker(expireDuration / 2)
-		for {
-			select {
-			case <-c.stopCh:
-				return
-			case <-t.C:
-			}
-			if *oldBehavior {
-				// Keep the curr cache for old behavior.
-				continue
-			}
+		c.expirationWorker(maxBytes, expireDuration)
+	}()
+	c.wg.Add(1)
+	go func() {
+		defer c.wg.Done()
+		c.cacheSizeWatcher(maxBytes)
+	}()
+	return &c
+}
+
+func (c *Cache) expirationWorker(maxBytes int, expireDuration time.Duration) {
+	t := time.NewTicker(expireDuration / 2)
+	for {
+		select {
+		case <-c.stopCh:
+			t.Stop()
+			return
+		case <-t.C:
+		}
+		c.mu.Lock()
+		if atomic.LoadUint64(&c.skipPrev) == 0 {
 			// Expire prev cache and create fresh curr cache.
+			// Do not reuse prev cache, since it can have too big capacity.
 			prev := c.prev.Load().(*fastcache.Cache)
 			prev.Reset()
@@ -85,8 +100,39 @@ func newWorkingSetCache(curr *fastcache.Cache, maxBytes int, expireDuration time
 			curr = fastcache.New(maxBytes)
 			c.curr.Store(curr)
 		}
-	}()
-	return &c
+		c.mu.Unlock()
+	}
 }
+
+func (c *Cache) cacheSizeWatcher(maxBytes int) {
+	t := time.NewTicker(time.Minute)
+	for {
+		select {
+		case <-c.stopCh:
+			t.Stop()
+			return
+		case <-t.C:
+		}
+		var cs fastcache.Stats
+		curr := c.curr.Load().(*fastcache.Cache)
+		curr.UpdateStats(&cs)
+		if cs.BytesSize < uint64(maxBytes)/2 {
+			continue
+		}
+		// curr cache size exceeds 50% of its capacity. It is better
+		// to double the size of curr cache and stop using prev cache,
+		// since this will result in higher summary cache capacity.
+		c.mu.Lock()
+		curr.Reset()
+		prev := c.prev.Load().(*fastcache.Cache)
+		prev.Reset()
+		curr = fastcache.New(maxBytes * 2)
+		c.curr.Store(curr)
+		atomic.StoreUint64(&c.skipPrev, 1)
+		c.mu.Unlock()
+		return
+	}
+}

 // Save safes the cache to filePath.
@@ -121,11 +167,11 @@ func (c *Cache) UpdateStats(fcs *fastcache.Stats) {
 	curr := c.curr.Load().(*fastcache.Cache)
 	fcsOrig := *fcs
 	curr.UpdateStats(fcs)
-	if *oldBehavior {
+	if atomic.LoadUint64(&c.skipPrev) != 0 {
 		return
 	}
-	fcs.Misses = fcsOrig.Misses + atomic.LoadUint64(&c.misses)
+	fcs.Misses = fcsOrig.Misses + atomic.LoadUint64(&c.misses)
 	fcsOrig.Reset()
 	prev := c.prev.Load().(*fastcache.Cache)
 	prev.UpdateStats(&fcsOrig)
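
The worker changes above pair with the read path in the hunks below: Get, Has and GetBig consult curr first and fall back to prev only while skipPrev is unset. A rough model of that two-generation lookup follows; the twoGenCache type is invented for the sketch, and the promotion of prev hits back into curr is assumed rather than shown in these hunks:

package example

import (
	"sync/atomic"

	"github.com/VictoriaMetrics/fastcache"
)

// twoGenCache is a stripped-down model of the curr/prev working set cache.
type twoGenCache struct {
	curr, prev *fastcache.Cache
	skipPrev   uint64
	misses     uint64
}

// get consults curr first and falls back to prev unless skipPrev is set,
// mirroring the checks added in the hunks below.
func (c *twoGenCache) get(dst, key []byte) []byte {
	result := c.curr.Get(dst, key)
	if len(result) > len(dst) {
		// Fast path - the entry is found in the current cache.
		return result
	}
	if atomic.LoadUint64(&c.skipPrev) != 0 {
		// prev is unused once curr has been switched to doubled capacity.
		return result
	}
	result = c.prev.Get(dst, key)
	if len(result) <= len(dst) {
		atomic.AddUint64(&c.misses, 1)
		return result
	}
	// Assumed: refresh the entry in curr so it survives the next rotation.
	c.curr.Set(key, result[len(dst):])
	return result
}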
@@ -141,7 +187,7 @@ func (c *Cache) Get(dst, key []byte) []byte {
 		// Fast path - the entry is found in the current cache.
 		return result
 	}
-	if *oldBehavior {
+	if atomic.LoadUint64(&c.skipPrev) != 0 {
 		return result
 	}
@@ -164,7 +210,7 @@ func (c *Cache) Has(key []byte) bool {
 	if curr.Has(key) {
 		return true
 	}
-	if *oldBehavior {
+	if atomic.LoadUint64(&c.skipPrev) != 0 {
 		return false
 	}
 	prev := c.prev.Load().(*fastcache.Cache)
@@ -185,7 +231,7 @@ func (c *Cache) GetBig(dst, key []byte) []byte {
 		// Fast path - the entry is found in the current cache.
 		return result
 	}
-	if *oldBehavior {
+	if atomic.LoadUint64(&c.skipPrev) != 0 {
 		return result
 	}
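
Taken together, the behaviour after this change can be exercised roughly as below. New, Get, Has, UpdateStats and Stop appear in the hunks above; Set is assumed to exist and to write into curr, since it is not touched by this diff:

package main

import (
	"fmt"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
	"github.com/VictoriaMetrics/fastcache"
)

func main() {
	// 64 MiB total budget, split between curr and prev until the size
	// watcher decides a single doubled curr is the better use of RAM.
	c := workingsetcache.New(64*1024*1024, time.Hour)
	defer c.Stop()

	// Assumed helper: Set stores the entry in curr, mirroring fastcache.
	c.Set([]byte("key"), []byte("value"))

	buf := c.Get(nil, []byte("key"))
	fmt.Printf("value=%q has=%v\n", buf, c.Has([]byte("key")))

	var fcs fastcache.Stats
	c.UpdateStats(&fcs)
	fmt.Printf("entries=%d bytesSize=%d\n", fcs.EntriesCount, fcs.BytesSize)
}

Load (together with the Save function whose comment appears above) persists the cache across restarts, so the same pattern works with Load(filePath, maxBytes, expireDuration) in place of New.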