Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git
lib/workingsetcache: adjust switching from mode=split to mode=whole smoothly and load cachefile successfully
This commit is contained in:
parent 4d636c244d
commit e2274714b1

1 changed file with 85 additions and 28 deletions
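For orientation before the diff, here is a hedged usage sketch of the workingsetcache API touched by this commit. The import path, the Set method and the Save signature are assumptions (they do not appear in this diff); Load, Get, Has, Save and Stop are referenced in the changed file.

package main

import (
	"fmt"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" // import path assumed
)

func main() {
	// Load the cache from disk or create a new one with a 64 MiB budget.
	// After this commit, a file written by a mode=whole cache loads at full
	// size; otherwise Load falls back to maxBytes/2 and mode=split.
	c := workingsetcache.Load("/tmp/cache.bin", 64*1024*1024, time.Hour)
	defer c.Stop()

	c.Set([]byte("key"), []byte("value")) // Set is assumed; it is not shown in this diff
	fmt.Printf("got %q, has=%v\n", c.Get(nil, []byte("key")), c.Has([]byte("key")))

	// Persist the working set before shutdown (Save is referenced in the diff;
	// the error return is an assumption).
	if err := c.Save("/tmp/cache.bin"); err != nil {
		fmt.Printf("cannot save cache: %s\n", err)
	}
}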
@@ -9,6 +9,13 @@ import (
 	"github.com/VictoriaMetrics/fastcache"
 )
 
+// Cache modes.
+const (
+	split     = 0
+	switching = 1
+	whole     = 2
+)
+
 // Cache is a cache for working set entries.
 //
 // The cache evicts inactive entries after the given expireDuration.
@@ -20,14 +27,15 @@ type Cache struct {
 	curr atomic.Value
 	prev atomic.Value
 
-	// skipPrev indicates whether to use only curr and skip prev.
+	// mode indicates whether to use only curr and skip prev.
 	//
-	// This flag is set if curr is filled for more than 50% space.
+	// This flag is set to switching if curr is filled for more than 50% space.
 	// In this case using prev would result in RAM waste,
 	// it is better to use only curr cache with doubled size.
-	skipPrev uint64
+	// After the process of switching, this flag will be set to whole.
+	mode uint64
 
-	// mu serializes access to curr, prev and skipPrev
+	// mu serializes access to curr, prev and mode
 	// in expirationWorker and cacheSizeWatcher.
 	mu sync.Mutex
 
@@ -42,10 +50,28 @@ type Cache struct {
 //
 // Stop must be called on the returned cache when it is no longer needed.
 func Load(filePath string, maxBytes int, expireDuration time.Duration) *Cache {
-	// Split maxBytes between curr and prev caches.
-	maxBytes /= 2
 	curr := fastcache.LoadFromFileOrNew(filePath, maxBytes)
-	return newWorkingSetCache(curr, maxBytes, expireDuration)
+	var cs fastcache.Stats
+	curr.UpdateStats(&cs)
+	if cs.EntriesCount == 0 {
+		curr.Reset()
+		// The cache couldn't be loaded with maxBytes size.
+		// This may mean that the cache is split into curr and prev caches.
+		// Try loading it again with maxBytes / 2 size.
+		maxBytes /= 2
+		curr = fastcache.LoadFromFileOrNew(filePath, maxBytes)
+		return newWorkingSetCache(curr, maxBytes, expireDuration)
+	}
+
+	// The cache has been successfully loaded in full.
+	// Set its mode to `whole`.
+	// There is no need in starting expirationWorker and cacheSizeWatcher.
+	var c Cache
+	c.curr.Store(curr)
+	c.prev.Store(fastcache.New(1024))
+	c.stopCh = make(chan struct{})
+	atomic.StoreUint64(&c.mode, whole)
+	return &c
 }
 
 // New creates new cache with the given maxBytes size and the given expireDuration
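The new Load logic above first tries to load the file at the full maxBytes size and only falls back to the split layout when nothing could be loaded. Below is a standalone sketch of that fallback using only the fastcache calls that appear in the hunk; loadWithFallback is a hypothetical helper, and the "empty cache means the load failed" assumption follows the comment in the diff rather than a documented fastcache guarantee.

package main

import (
	"fmt"

	"github.com/VictoriaMetrics/fastcache"
)

// loadWithFallback mirrors the idea in Load: if the file cannot be loaded at
// the requested capacity, LoadFromFileOrNew returns a fresh empty cache, so an
// entry count of zero is treated as "retry with half the size" (split layout).
func loadWithFallback(filePath string, maxBytes int) (*fastcache.Cache, int) {
	c := fastcache.LoadFromFileOrNew(filePath, maxBytes)
	var cs fastcache.Stats
	c.UpdateStats(&cs)
	if cs.EntriesCount > 0 {
		return c, maxBytes // loaded in full -> the caller can run in mode=whole
	}
	c.Reset()
	maxBytes /= 2
	return fastcache.LoadFromFileOrNew(filePath, maxBytes), maxBytes // -> mode=split
}

func main() {
	c, capBytes := loadWithFallback("/tmp/cache.bin", 64*1024*1024)
	fmt.Printf("loaded cache with %d bytes capacity\n", capBytes)
	c.Reset()
}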
@@ -65,6 +91,7 @@ func newWorkingSetCache(curr *fastcache.Cache, maxBytes int, expireDuration time
 	c.curr.Store(curr)
 	c.prev.Store(prev)
 	c.stopCh = make(chan struct{})
+	atomic.StoreUint64(&c.mode, split)
 
 	c.wg.Add(1)
 	go func() {
@@ -90,7 +117,7 @@ func (c *Cache) expirationWorker(maxBytes int, expireDuration time.Duration) {
 		}
 
 		c.mu.Lock()
-		if atomic.LoadUint64(&c.skipPrev) != 0 {
+		if atomic.LoadUint64(&c.mode) == split {
 			// Expire prev cache and create fresh curr cache.
 			// Do not reuse prev cache, since it can have too big capacity.
 			prev := c.prev.Load().(*fastcache.Cache)
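expirationWorker keeps the split-mode behaviour: on every tick it drops prev and demotes curr to prev, so entries survive only while they keep being accessed. A minimal sketch of that two-generation rotation with plain maps follows; it is an illustration of the idea only, not the library's code (the real Get path and locking appear in the hunks below).

package main

import "fmt"

type twoGen struct {
	curr map[string]string
	prev map[string]string
}

func (c *twoGen) get(k string) (string, bool) {
	if v, ok := c.curr[k]; ok {
		return v, true
	}
	if v, ok := c.prev[k]; ok {
		c.curr[k] = v // promote recently used entries back into curr
		return v, true
	}
	return "", false
}

func (c *twoGen) set(k, v string) { c.curr[k] = v }

// rotate is the analogue of one expirationWorker tick in mode=split:
// prev is dropped and curr becomes the new prev.
func (c *twoGen) rotate() {
	c.prev = c.curr
	c.curr = map[string]string{}
}

func main() {
	c := &twoGen{curr: map[string]string{}, prev: map[string]string{}}
	c.set("hot", "1")
	c.set("cold", "2")
	c.rotate()
	c.get("hot") // touching "hot" keeps it in the working set
	c.rotate()
	_, hot := c.get("hot")
	_, cold := c.get("cold")
	fmt.Printf("hot survived: %v, cold survived: %v\n", hot, cold)
}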
@@ -106,33 +133,63 @@ func (c *Cache) expirationWorker(maxBytes int, expireDuration time.Duration) {
 
 func (c *Cache) cacheSizeWatcher(maxBytes int) {
 	t := time.NewTicker(time.Minute)
+	defer t.Stop()
+
 	for {
 		select {
 		case <-c.stopCh:
-			t.Stop()
 			return
 		case <-t.C:
 		}
 		var cs fastcache.Stats
 		curr := c.curr.Load().(*fastcache.Cache)
 		curr.UpdateStats(&cs)
-		if cs.BytesSize < uint64(maxBytes)/2 {
-			continue
+		if cs.BytesSize >= uint64(maxBytes)/2 {
+			break
 		}
-
-		// curr cache size exceeds 50% of its capacity. It is better
-		// to double the size of curr cache and stop using prev cache,
-		// since this will result in higher summary cache capacity.
-		c.mu.Lock()
-		curr.Reset()
-		prev := c.prev.Load().(*fastcache.Cache)
-		prev.Reset()
-		curr = fastcache.New(maxBytes * 2)
-		c.curr.Store(curr)
-		atomic.StoreUint64(&c.skipPrev, 1)
-		c.mu.Unlock()
-		return
 	}
+
+	// curr cache size exceeds 50% of its capacity. It is better
+	// to double the size of curr cache and stop using prev cache,
+	// since this will result in higher summary cache capacity.
+	//
+	// Do this in the following steps:
+	// 1) switch to mode=switching
+	// 2) move curr cache to prev
+	// 3) create curr with the double size
+	// 4) wait until curr size exceeds maxBytes/2, i.e. it is populated with new data
+	// 5) switch to mode=whole
+	// 6) drop prev
+
+	c.mu.Lock()
+	atomic.StoreUint64(&c.mode, switching)
+	prev := c.prev.Load().(*fastcache.Cache)
+	prev.Reset()
+	curr := c.curr.Load().(*fastcache.Cache)
+	c.prev.Store(curr)
+	c.curr.Store(fastcache.New(maxBytes * 2))
+	c.mu.Unlock()
+
+	for {
+		select {
+		case <-c.stopCh:
+			return
+		case <-t.C:
+		}
+		var cs fastcache.Stats
+		curr := c.curr.Load().(*fastcache.Cache)
+		curr.UpdateStats(&cs)
+		if cs.BytesSize >= uint64(maxBytes)/2 {
+			break
+		}
+	}
+
+	c.mu.Lock()
+	atomic.StoreUint64(&c.mode, whole)
+	prev = c.prev.Load().(*fastcache.Cache)
+	prev.Reset()
+	c.prev.Store(fastcache.New(1024))
+	c.mu.Unlock()
 }
 
 // Save safes the cache to filePath.
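The six numbered steps in the comment above describe a small state machine: split, then switching while the doubled curr warms up, then whole. Here is a condensed sketch of just those transitions, with cache sizes faked as plain integers and the mutex/atomics of the real code omitted; onTick is a hypothetical stand-in for one cacheSizeWatcher iteration.

package main

import "fmt"

const (
	split     = 0
	switching = 1
	whole     = 2
)

type cache struct {
	mode      uint64
	currBytes int // pretend size of curr
	maxBytes  int
}

// onTick mirrors one cacheSizeWatcher check.
func (c *cache) onTick() {
	switch c.mode {
	case split:
		if c.currBytes >= c.maxBytes/2 {
			// Steps 1-3: enter switching mode; the real code moves curr
			// to prev and recreates curr with doubled capacity here.
			c.mode = switching
			c.currBytes = 0 // the fresh curr starts empty
		}
	case switching:
		if c.currBytes >= c.maxBytes/2 {
			// Steps 5-6: the fresh curr is warm enough, drop prev and
			// serve everything from curr only.
			c.mode = whole
		}
	}
}

func main() {
	c := &cache{mode: split, maxBytes: 100}
	names := []string{"split", "switching", "whole"}
	for _, fill := range []int{10, 60, 30, 70} {
		c.currBytes = fill
		c.onTick()
		fmt.Printf("curr=%d bytes -> mode=%s\n", fill, names[c.mode])
	}
}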
@@ -167,7 +224,7 @@ func (c *Cache) UpdateStats(fcs *fastcache.Stats) {
 	curr := c.curr.Load().(*fastcache.Cache)
 	fcsOrig := *fcs
 	curr.UpdateStats(fcs)
-	if atomic.LoadUint64(&c.skipPrev) != 0 {
+	if atomic.LoadUint64(&c.mode) == whole {
 		return
 	}
 
@@ -187,7 +244,7 @@ func (c *Cache) Get(dst, key []byte) []byte {
 		// Fast path - the entry is found in the current cache.
 		return result
 	}
-	if atomic.LoadUint64(&c.skipPrev) != 0 {
+	if atomic.LoadUint64(&c.mode) == whole {
 		return result
 	}
 
@@ -210,7 +267,7 @@ func (c *Cache) Has(key []byte) bool {
 	if curr.Has(key) {
 		return true
 	}
-	if atomic.LoadUint64(&c.skipPrev) != 0 {
+	if atomic.LoadUint64(&c.mode) == whole {
 		return false
 	}
 	prev := c.prev.Load().(*fastcache.Cache)
@@ -231,7 +288,7 @@ func (c *Cache) GetBig(dst, key []byte) []byte {
 		// Fast path - the entry is found in the current cache.
 		return result
 	}
-	if atomic.LoadUint64(&c.skipPrev) != 0 {
+	if atomic.LoadUint64(&c.mode) == whole {
 		return result
 	}
 