VictoriaMetrics/lib/workingsetcache/cache.go
Aliaksandr Valialkin 8f973e34fb lib/workingsetcache: properly switch to whole mode
Previously the switch from `split` to `whole` mode had been performed too early,
e.g. when the current cache size became bigger than 1/4 of the allowed cache size.

Now it is performed when the current cache size becomes bigger than 1/2 of the allowed cache size.

This change can reduce memory usage for data ingestion path when big number of active time series are ingested.
2021-07-05 15:16:11 +03:00

336 lines
8.5 KiB
Go

package workingsetcache
import (
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/fastcache"
)
// Cache modes.
const (
split = 0
switching = 1
whole = 2
)
// Cache is a cache for working set entries.
//
// The cache evicts inactive entries after the given expireDuration.
// Recently accessed entries survive expireDuration.
//
// Comparing to fastcache, this cache minimizes the required RAM size
// to values smaller than maxBytes.
type Cache struct {
curr atomic.Value
prev atomic.Value
// mode indicates whether to use only curr and skip prev.
//
// This flag is set to switching if curr is filled for more than 50% space.
// In this case using prev would result in RAM waste,
// it is better to use only curr cache with doubled size.
// After the process of switching, this flag will be set to whole.
mode uint32
// The maximum cache size in bytes.
maxBytes int
// mu serializes access to curr, prev and mode
// in expirationWatcher and cacheSizeWatcher.
mu sync.Mutex
wg sync.WaitGroup
stopCh chan struct{}
// historicalStats keeps historical counters from fastcache.Stats
historicalStats fastcache.Stats
}
// Load loads the cache from filePath and limits its size to maxBytes
// and evicts inactive entires after expireDuration.
//
// Stop must be called on the returned cache when it is no longer needed.
func Load(filePath string, maxBytes int, expireDuration time.Duration) *Cache {
curr := fastcache.LoadFromFileOrNew(filePath, maxBytes)
var cs fastcache.Stats
curr.UpdateStats(&cs)
if cs.EntriesCount == 0 {
curr.Reset()
// The cache couldn't be loaded with maxBytes size.
// This may mean that the cache is split into curr and prev caches.
// Try loading it again with maxBytes / 2 size.
prev := fastcache.LoadFromFileOrNew(filePath, maxBytes/2)
c := newCacheInternal(fastcache.New(1024), prev, maxBytes, split)
c.runWatchers(expireDuration)
return c
}
// The cache has been successfully loaded in full.
// Set its' mode to `whole`.
// There is no need in runWatchers call.
return newCacheInternal(curr, fastcache.New(1024), maxBytes, whole)
}
// New creates new cache with the given maxBytes capcity and the given expireDuration
// for inactive entries.
//
// Stop must be called on the returned cache when it is no longer needed.
func New(maxBytes int, expireDuration time.Duration) *Cache {
// Split maxBytes between curr and prev caches.
maxBytes /= 2
curr := fastcache.New(maxBytes)
c := newCacheInternal(curr, fastcache.New(1024), maxBytes, split)
c.runWatchers(expireDuration)
return c
}
func newCacheInternal(curr, prev *fastcache.Cache, maxBytes, mode int) *Cache {
var c Cache
c.curr.Store(curr)
c.prev.Store(prev)
c.stopCh = make(chan struct{})
c.setMode(mode)
return &c
}
func (c *Cache) runWatchers(expireDuration time.Duration) {
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.expirationWatcher(expireDuration)
}()
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.cacheSizeWatcher()
}()
}
func (c *Cache) expirationWatcher(expireDuration time.Duration) {
t := time.NewTicker(expireDuration / 2)
for {
select {
case <-c.stopCh:
t.Stop()
return
case <-t.C:
}
c.mu.Lock()
if atomic.LoadUint32(&c.mode) != split {
// Stop the expirationWatcher on non-split mode.
c.mu.Unlock()
return
}
// Expire prev cache and create fresh curr cache with c.maxBytes/2 capacity.
// Do not reuse prev cache, since it can have too big capacity.
prev := c.prev.Load().(*fastcache.Cache)
prev.Reset()
curr := c.curr.Load().(*fastcache.Cache)
curr.UpdateStats(&c.historicalStats)
c.prev.Store(curr)
curr = fastcache.New(c.maxBytes / 2)
c.curr.Store(curr)
c.mu.Unlock()
}
}
func (c *Cache) cacheSizeWatcher() {
t := time.NewTicker(time.Minute)
defer t.Stop()
for {
select {
case <-c.stopCh:
return
case <-t.C:
}
var cs fastcache.Stats
curr := c.curr.Load().(*fastcache.Cache)
curr.UpdateStats(&cs)
if cs.BytesSize >= uint64(c.maxBytes)/2 {
break
}
}
// curr cache size exceeds 50% of its capacity. It is better
// to double the size of curr cache and stop using prev cache,
// since this will result in higher summary cache capacity.
//
// Do this in the following steps:
// 1) switch to mode=switching
// 2) move curr cache to prev
// 3) create curr with the double size
// 4) wait until curr size exceeds c.maxBytes/2, i.e. it is populated with new data
// 5) switch to mode=whole
// 6) drop prev
c.mu.Lock()
c.setMode(switching)
prev := c.prev.Load().(*fastcache.Cache)
prev.Reset()
curr := c.curr.Load().(*fastcache.Cache)
curr.UpdateStats(&c.historicalStats)
c.prev.Store(curr)
c.curr.Store(fastcache.New(c.maxBytes))
c.mu.Unlock()
for {
select {
case <-c.stopCh:
return
case <-t.C:
}
var cs fastcache.Stats
curr := c.curr.Load().(*fastcache.Cache)
curr.UpdateStats(&cs)
if cs.BytesSize >= uint64(c.maxBytes)/2 {
break
}
}
c.mu.Lock()
c.setMode(whole)
prev = c.prev.Load().(*fastcache.Cache)
prev.Reset()
c.prev.Store(fastcache.New(1024))
c.mu.Unlock()
}
// Save saves the cache to filePath.
func (c *Cache) Save(filePath string) error {
curr := c.curr.Load().(*fastcache.Cache)
concurrency := cgroup.AvailableCPUs()
return curr.SaveToFileConcurrent(filePath, concurrency)
}
// Stop stops the cache.
//
// The cache cannot be used after the Stop call.
func (c *Cache) Stop() {
close(c.stopCh)
c.wg.Wait()
c.Reset()
}
// Reset resets the cache.
func (c *Cache) Reset() {
prev := c.prev.Load().(*fastcache.Cache)
prev.Reset()
curr := c.curr.Load().(*fastcache.Cache)
curr.Reset()
// Reset the mode to `split` in the hope the working set size becomes smaller after the reset.
c.setMode(split)
}
func (c *Cache) setMode(mode int) {
atomic.StoreUint32(&c.mode, uint32(mode))
}
func (c *Cache) loadMode() int {
return int(atomic.LoadUint32(&c.mode))
}
// UpdateStats updates fcs with cache stats.
func (c *Cache) UpdateStats(fcs *fastcache.Stats) {
curr := c.curr.Load().(*fastcache.Cache)
curr.UpdateStats(fcs)
// Add counters from historical stats
hs := &c.historicalStats
fcs.GetCalls += atomic.LoadUint64(&hs.GetCalls)
fcs.SetCalls += atomic.LoadUint64(&hs.SetCalls)
fcs.Misses += atomic.LoadUint64(&hs.Misses)
fcs.Collisions += atomic.LoadUint64(&hs.Collisions)
fcs.Corruptions += atomic.LoadUint64(&hs.Corruptions)
if c.loadMode() == whole {
return
}
// Add stats for entries from the previous cache
// Do not add counters from the previous cache, since they are already
// taken into account via c.historicalStats.
prev := c.prev.Load().(*fastcache.Cache)
var fcsTmp fastcache.Stats
prev.UpdateStats(&fcsTmp)
fcs.EntriesCount += fcsTmp.EntriesCount
fcs.BytesSize += fcsTmp.BytesSize
}
// Get appends the found value for the given key to dst and returns the result.
func (c *Cache) Get(dst, key []byte) []byte {
curr := c.curr.Load().(*fastcache.Cache)
result := curr.Get(dst, key)
if len(result) > len(dst) {
// Fast path - the entry is found in the current cache.
return result
}
if c.loadMode() == whole {
return result
}
// Search for the entry in the previous cache.
prev := c.prev.Load().(*fastcache.Cache)
result = prev.Get(dst, key)
if len(result) <= len(dst) {
// Nothing found.
return result
}
// Cache the found entry in the current cache.
curr.Set(key, result[len(dst):])
return result
}
// Has verifies whether the cahce contains the given key.
func (c *Cache) Has(key []byte) bool {
curr := c.curr.Load().(*fastcache.Cache)
if curr.Has(key) {
return true
}
if c.loadMode() == whole {
return false
}
prev := c.prev.Load().(*fastcache.Cache)
return prev.Has(key)
}
// Set sets the given value for the given key.
func (c *Cache) Set(key, value []byte) {
curr := c.curr.Load().(*fastcache.Cache)
curr.Set(key, value)
}
// GetBig appends the found value for the given key to dst and returns the result.
func (c *Cache) GetBig(dst, key []byte) []byte {
curr := c.curr.Load().(*fastcache.Cache)
result := curr.GetBig(dst, key)
if len(result) > len(dst) {
// Fast path - the entry is found in the current cache.
return result
}
if c.loadMode() == whole {
return result
}
// Search for the entry in the previous cache.
prev := c.prev.Load().(*fastcache.Cache)
result = prev.GetBig(dst, key)
if len(result) <= len(dst) {
// Nothing found.
return result
}
// Cache the found entry in the current cache.
curr.SetBig(key, result[len(dst):])
return result
}
// SetBig sets the given value for the given key.
func (c *Cache) SetBig(key, value []byte) {
curr := c.curr.Load().(*fastcache.Cache)
curr.SetBig(key, value)
}