2019-08-13 18:35:19 +00:00
package workingsetcache
import (
2022-11-16 10:37:55 +00:00
"flag"
2019-08-13 18:35:19 +00:00
"sync"
"sync/atomic"
"time"
2021-07-06 07:39:56 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2020-12-08 18:49:32 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
2019-08-13 18:35:19 +00:00
"github.com/VictoriaMetrics/fastcache"
)
2022-11-16 10:37:55 +00:00
// prevCacheRemovalPercent is the -prevCacheRemovalPercent command-line flag.
//
// prevCacheWatcher divides the value by 100 and compares the result with
// the ratio of requests served by the prev cache to requests served by the curr cache.
// Non-positive values disable prevCacheWatcher.
var prevCacheRemovalPercent = flag.Float64(
	"prevCacheRemovalPercent",
	0.2,
	"The previous cache is removed when the percent of requests it serves becomes lower than this value. "+
		"Higher values reduce average memory usage at the cost of higher CPU usage",
)
2019-09-08 20:21:13 +00:00
// Cache modes.
//
// The current mode is kept in Cache.mode and is accessed via setMode and loadMode.
const (
	// split means that both curr and prev caches are consulted on lookups.
	split = iota

	// switching is set by cacheSizeWatcher while the enlarged curr cache is being populated.
	switching

	// whole means that lookups consult only the curr cache.
	whole
)
2022-10-20 18:48:23 +00:00
// defaultExpireDuration is the expireDuration used by Load and New.
const defaultExpireDuration = time.Minute * 20
2022-02-23 11:39:11 +00:00
2019-08-13 18:35:19 +00:00
// Cache is a cache for working set entries.
//
// The cache evicts inactive entries after the given expireDuration.
// Recently accessed entries survive expireDuration.
type Cache struct {
curr atomic . Value
prev atomic . Value
2022-10-20 06:12:14 +00:00
// csHistory holds cache stats history
csHistory fastcache . Stats
2021-12-02 23:20:04 +00:00
2019-09-08 20:21:13 +00:00
// mode indicates whether to use only curr and skip prev.
2019-08-15 19:57:43 +00:00
//
2019-09-08 20:21:13 +00:00
// This flag is set to switching if curr is filled for more than 50% space.
2019-08-15 19:57:43 +00:00
// In this case using prev would result in RAM waste,
// it is better to use only curr cache with doubled size.
2019-09-08 20:21:13 +00:00
// After the process of switching, this flag will be set to whole.
2021-07-05 12:07:38 +00:00
mode uint32
2022-02-07 22:10:10 +00:00
// The maxBytes value passed to New() or to Load().
maxBytes int
2019-09-08 20:21:13 +00:00
// mu serializes access to curr, prev and mode
2022-10-20 18:44:53 +00:00
// in expirationWatcher, prevCacheWatcher and cacheSizeWatcher.
2019-08-15 19:57:43 +00:00
mu sync . Mutex
2019-08-13 18:35:19 +00:00
wg sync . WaitGroup
stopCh chan struct { }
}
// Load loads the cache from filePath and limits its size to maxBytes
2022-02-23 11:39:11 +00:00
// and evicts inactive entries in 20 minutes.
//
// Stop must be called on the returned cache when it is no longer needed.
func Load ( filePath string , maxBytes int ) * Cache {
return LoadWithExpire ( filePath , maxBytes , defaultExpireDuration )
}
// LoadWithExpire loads the cache from filePath and limits its size to maxBytes
2019-08-13 18:35:19 +00:00
// and evicts inactive entires after expireDuration.
//
// Stop must be called on the returned cache when it is no longer needed.
2022-02-23 11:39:11 +00:00
func LoadWithExpire ( filePath string , maxBytes int , expireDuration time . Duration ) * Cache {
2019-08-13 18:35:19 +00:00
curr := fastcache . LoadFromFileOrNew ( filePath , maxBytes )
2019-09-08 20:21:13 +00:00
var cs fastcache . Stats
curr . UpdateStats ( & cs )
if cs . EntriesCount == 0 {
curr . Reset ( )
// The cache couldn't be loaded with maxBytes size.
// This may mean that the cache is split into curr and prev caches.
// Try loading it again with maxBytes / 2 size.
2022-02-23 11:39:11 +00:00
// Put the loaded cache into `prev` instead of `curr`
// in order to limit the growth of the cache for the current period of time.
prev := fastcache . LoadFromFileOrNew ( filePath , maxBytes / 2 )
curr := fastcache . New ( maxBytes / 2 )
2022-02-07 22:10:10 +00:00
c := newCacheInternal ( curr , prev , split , maxBytes )
2021-07-05 12:07:38 +00:00
c . runWatchers ( expireDuration )
return c
2019-09-08 20:21:13 +00:00
}
// The cache has been successfully loaded in full.
// Set its' mode to `whole`.
2021-07-05 12:07:38 +00:00
// There is no need in runWatchers call.
2021-07-05 14:11:57 +00:00
prev := fastcache . New ( 1024 )
2022-02-07 22:10:10 +00:00
return newCacheInternal ( curr , prev , whole , maxBytes )
2019-08-13 18:35:19 +00:00
}
2022-02-23 11:39:11 +00:00
// New creates a cache with the given maxBytes capacity.
// Inactive entries are evicted after defaultExpireDuration.
//
// Stop must be called on the returned cache when it is no longer needed.
func New(maxBytes int) *Cache {
	c := NewWithExpire(maxBytes, defaultExpireDuration)
	return c
}
// NewWithExpire creates new cache with the given maxBytes capacity and the given expireDuration
2019-08-13 18:35:19 +00:00
// for inactive entries.
//
// Stop must be called on the returned cache when it is no longer needed.
2022-02-23 11:39:11 +00:00
func NewWithExpire ( maxBytes int , expireDuration time . Duration ) * Cache {
2021-07-05 14:11:57 +00:00
curr := fastcache . New ( maxBytes / 2 )
prev := fastcache . New ( 1024 )
2022-02-07 22:10:10 +00:00
c := newCacheInternal ( curr , prev , split , maxBytes )
2021-07-05 12:07:38 +00:00
c . runWatchers ( expireDuration )
return c
2019-08-13 18:35:19 +00:00
}
2022-02-07 22:10:10 +00:00
func newCacheInternal ( curr , prev * fastcache . Cache , mode , maxBytes int ) * Cache {
2019-08-13 18:35:19 +00:00
var c Cache
2022-02-07 22:10:10 +00:00
c . maxBytes = maxBytes
2019-08-13 18:35:19 +00:00
c . curr . Store ( curr )
c . prev . Store ( prev )
c . stopCh = make ( chan struct { } )
2021-07-05 12:07:38 +00:00
c . setMode ( mode )
return & c
}
2019-08-15 19:57:43 +00:00
2021-07-05 12:07:38 +00:00
func ( c * Cache ) runWatchers ( expireDuration time . Duration ) {
2019-08-15 19:57:43 +00:00
c . wg . Add ( 1 )
go func ( ) {
defer c . wg . Done ( )
2021-07-05 12:07:38 +00:00
c . expirationWatcher ( expireDuration )
2019-08-15 19:57:43 +00:00
} ( )
2019-08-13 18:35:19 +00:00
c . wg . Add ( 1 )
2022-10-20 18:44:53 +00:00
go func ( ) {
defer c . wg . Done ( )
c . prevCacheWatcher ( )
} ( )
c . wg . Add ( 1 )
2019-08-13 18:35:19 +00:00
go func ( ) {
defer c . wg . Done ( )
2021-07-05 12:07:38 +00:00
c . cacheSizeWatcher ( )
2019-08-15 19:57:43 +00:00
} ( )
}
2019-08-13 18:35:19 +00:00
2021-07-05 12:07:38 +00:00
func ( c * Cache ) expirationWatcher ( expireDuration time . Duration ) {
2022-10-20 05:42:41 +00:00
expireDuration += timeJitter ( expireDuration / 10 )
t := time . NewTicker ( expireDuration )
2022-10-20 07:33:56 +00:00
defer t . Stop ( )
2019-08-15 19:57:43 +00:00
for {
select {
case <- c . stopCh :
return
case <- t . C :
}
c . mu . Lock ( )
2021-07-05 12:07:38 +00:00
if atomic . LoadUint32 ( & c . mode ) != split {
// Stop the expirationWatcher on non-split mode.
c . mu . Unlock ( )
return
2019-08-13 18:35:19 +00:00
}
2022-04-05 17:37:40 +00:00
// Reset prev cache and swap it with the curr cache.
2021-07-05 12:07:38 +00:00
prev := c . prev . Load ( ) . ( * fastcache . Cache )
curr := c . curr . Load ( ) . ( * fastcache . Cache )
c . prev . Store ( curr )
2022-10-20 18:44:53 +00:00
var cs fastcache . Stats
prev . UpdateStats ( & cs )
updateCacheStatsHistory ( & c . csHistory , & cs )
2022-04-05 17:37:40 +00:00
prev . Reset ( )
c . curr . Store ( prev )
2019-08-15 19:57:43 +00:00
c . mu . Unlock ( )
2022-10-20 18:44:53 +00:00
}
}
2022-10-20 07:33:56 +00:00
2022-10-20 18:44:53 +00:00
func ( c * Cache ) prevCacheWatcher ( ) {
2022-11-16 10:37:55 +00:00
p := * prevCacheRemovalPercent / 100
if p <= 0 {
// There is no need in removing the previous cache.
return
}
minCurrRequests := uint64 ( 1 / p )
2022-10-20 18:44:53 +00:00
// Watch for the usage of the prev cache and drop it whenever it receives
2022-11-16 10:37:55 +00:00
// less than prevCacheRemovalPercent requests comparing to the curr cache during the last 30 seconds.
checkInterval := 30 * time . Second
2022-10-20 18:44:53 +00:00
checkInterval += timeJitter ( checkInterval / 10 )
t := time . NewTicker ( checkInterval )
defer t . Stop ( )
prevGetCalls := uint64 ( 0 )
currGetCalls := uint64 ( 0 )
for {
select {
case <- c . stopCh :
return
case <- t . C :
}
c . mu . Lock ( )
if atomic . LoadUint32 ( & c . mode ) != split {
// Do nothing in non-split mode.
c . mu . Unlock ( )
return
}
prev := c . prev . Load ( ) . ( * fastcache . Cache )
curr := c . curr . Load ( ) . ( * fastcache . Cache )
var csCurr , csPrev fastcache . Stats
curr . UpdateStats ( & csCurr )
prev . UpdateStats ( & csPrev )
currRequests := csCurr . GetCalls
if currRequests >= currGetCalls {
currRequests -= currGetCalls
}
prevRequests := csPrev . GetCalls
if prevRequests >= prevGetCalls {
prevRequests -= prevGetCalls
}
currGetCalls = csCurr . GetCalls
prevGetCalls = csPrev . GetCalls
2022-11-16 10:37:55 +00:00
if currRequests >= minCurrRequests && float64 ( prevRequests ) / float64 ( currRequests ) < p {
2022-10-20 18:44:53 +00:00
// The majority of requests are served from the curr cache,
// so the prev cache can be deleted in order to free up memory.
if csPrev . EntriesCount > 0 {
updateCacheStatsHistory ( & c . csHistory , & csPrev )
2022-10-20 07:33:56 +00:00
prev . Reset ( )
}
}
2022-10-20 18:44:53 +00:00
c . mu . Unlock ( )
2019-08-15 19:57:43 +00:00
}
}
2021-07-05 12:07:38 +00:00
func ( c * Cache ) cacheSizeWatcher ( ) {
2022-10-20 05:42:41 +00:00
checkInterval := 1500 * time . Millisecond
checkInterval += timeJitter ( checkInterval / 10 )
t := time . NewTicker ( checkInterval )
2019-09-08 20:21:13 +00:00
defer t . Stop ( )
2021-12-02 08:28:45 +00:00
var maxBytesSize uint64
2019-08-15 19:57:43 +00:00
for {
select {
case <- c . stopCh :
return
case <- t . C :
}
2022-02-23 13:59:21 +00:00
if c . loadMode ( ) != split {
continue
}
2019-08-15 19:57:43 +00:00
var cs fastcache . Stats
curr := c . curr . Load ( ) . ( * fastcache . Cache )
curr . UpdateStats ( & cs )
2021-12-02 08:28:45 +00:00
if cs . BytesSize >= uint64 ( 0.9 * float64 ( cs . MaxBytesSize ) ) {
maxBytesSize = cs . MaxBytesSize
2019-09-08 20:21:13 +00:00
break
2019-08-15 19:57:43 +00:00
}
2019-09-08 20:21:13 +00:00
}
2019-08-15 19:57:43 +00:00
2021-12-02 08:28:45 +00:00
// curr cache size exceeds 90% of its capacity. It is better
2019-09-08 20:21:13 +00:00
// to double the size of curr cache and stop using prev cache,
// since this will result in higher summary cache capacity.
//
// Do this in the following steps:
// 1) switch to mode=switching
// 2) move curr cache to prev
2022-02-23 13:59:21 +00:00
// 3) create curr cache with doubled size
// 4) wait until curr cache size exceeds maxBytesSize, i.e. it is populated with new data
2019-09-08 20:21:13 +00:00
// 5) switch to mode=whole
2022-02-23 13:59:21 +00:00
// 6) drop prev cache
2019-09-08 20:21:13 +00:00
c . mu . Lock ( )
2021-07-05 12:07:38 +00:00
c . setMode ( switching )
2019-09-08 20:21:13 +00:00
prev := c . prev . Load ( ) . ( * fastcache . Cache )
curr := c . curr . Load ( ) . ( * fastcache . Cache )
c . prev . Store ( curr )
2022-10-20 06:12:14 +00:00
var cs fastcache . Stats
prev . UpdateStats ( & cs )
updateCacheStatsHistory ( & c . csHistory , & cs )
2022-04-05 17:37:40 +00:00
prev . Reset ( )
2022-02-07 22:10:10 +00:00
// use c.maxBytes instead of maxBytesSize*2 for creating new cache, since otherwise the created cache
// couldn't be loaded from file with c.maxBytes limit after saving with maxBytesSize*2 limit.
c . curr . Store ( fastcache . New ( c . maxBytes ) )
2019-09-08 20:21:13 +00:00
c . mu . Unlock ( )
for {
select {
case <- c . stopCh :
return
case <- t . C :
}
var cs fastcache . Stats
curr := c . curr . Load ( ) . ( * fastcache . Cache )
curr . UpdateStats ( & cs )
2021-12-02 08:28:45 +00:00
if cs . BytesSize >= maxBytesSize {
2019-09-08 20:21:13 +00:00
break
}
2019-08-15 19:57:43 +00:00
}
2019-09-08 20:21:13 +00:00
c . mu . Lock ( )
2021-07-05 12:07:38 +00:00
c . setMode ( whole )
2019-09-08 20:21:13 +00:00
prev = c . prev . Load ( ) . ( * fastcache . Cache )
c . prev . Store ( fastcache . New ( 1024 ) )
2022-10-20 06:12:14 +00:00
cs . Reset ( )
prev . UpdateStats ( & cs )
updateCacheStatsHistory ( & c . csHistory , & cs )
2022-04-05 17:37:40 +00:00
prev . Reset ( )
2019-09-08 20:21:13 +00:00
c . mu . Unlock ( )
2019-08-13 18:35:19 +00:00
}
2021-07-05 12:07:38 +00:00
// Save saves the cache to filePath.
2019-08-13 18:35:19 +00:00
func ( c * Cache ) Save ( filePath string ) error {
curr := c . curr . Load ( ) . ( * fastcache . Cache )
2020-12-08 18:49:32 +00:00
concurrency := cgroup . AvailableCPUs ( )
2019-08-13 18:35:19 +00:00
return curr . SaveToFileConcurrent ( filePath , concurrency )
}
// Stop stops the cache.
//
// It signals the watchers to exit by closing stopCh, waits until they return
// and then resets the cache contents.
//
// The cache cannot be used after the Stop call.
func (c *Cache) Stop() {
	// Closing the channel wakes up every watcher blocked in select.
	close(c.stopCh)
	c.wg.Wait()

	c.Reset()
}
// Reset resets the cache.
func ( c * Cache ) Reset ( ) {
2022-10-20 18:44:53 +00:00
var cs fastcache . Stats
2019-08-13 18:35:19 +00:00
prev := c . prev . Load ( ) . ( * fastcache . Cache )
2022-10-20 18:44:53 +00:00
prev . UpdateStats ( & cs )
2019-08-13 18:35:19 +00:00
prev . Reset ( )
curr := c . curr . Load ( ) . ( * fastcache . Cache )
2022-10-20 18:44:53 +00:00
curr . UpdateStats ( & cs )
updateCacheStatsHistory ( & c . csHistory , & cs )
2019-08-13 18:35:19 +00:00
curr . Reset ( )
2021-07-01 08:49:16 +00:00
// Reset the mode to `split` in the hope the working set size becomes smaller after the reset.
2021-07-05 12:07:38 +00:00
c . setMode ( split )
}
// setMode atomically stores the given mode into c.mode.
func (c *Cache) setMode(mode int) {
	m := uint32(mode)
	atomic.StoreUint32(&c.mode, m)
}
func ( c * Cache ) loadMode ( ) int {
return int ( atomic . LoadUint32 ( & c . mode ) )
2019-08-13 18:35:19 +00:00
}
// UpdateStats updates fcs with cache stats.
func ( c * Cache ) UpdateStats ( fcs * fastcache . Stats ) {
2022-10-20 06:12:14 +00:00
updateCacheStatsHistory ( fcs , & c . csHistory )
2021-07-06 07:39:56 +00:00
var cs fastcache . Stats
2019-08-13 18:35:19 +00:00
curr := c . curr . Load ( ) . ( * fastcache . Cache )
2021-07-06 07:39:56 +00:00
curr . UpdateStats ( & cs )
2022-10-20 06:12:14 +00:00
updateCacheStats ( fcs , & cs )
2019-08-13 18:35:19 +00:00
prev := c . prev . Load ( ) . ( * fastcache . Cache )
2021-07-06 07:39:56 +00:00
cs . Reset ( )
prev . UpdateStats ( & cs )
2022-10-20 06:12:14 +00:00
updateCacheStats ( fcs , & cs )
}
// updateCacheStats adds all the stats from src to dst.
//
// Plain additions are used, so neither dst nor src may be modified concurrently.
func updateCacheStats(dst, src *fastcache.Stats) {
	// Call counters.
	dst.GetCalls += src.GetCalls
	dst.SetCalls += src.SetCalls
	dst.Misses += src.Misses
	dst.Collisions += src.Collisions
	dst.Corruptions += src.Corruptions

	// Size-related stats.
	dst.EntriesCount += src.EntriesCount
	dst.BytesSize += src.BytesSize
	dst.MaxBytesSize += src.MaxBytesSize
}
func updateCacheStatsHistory ( dst , src * fastcache . Stats ) {
atomic . AddUint64 ( & dst . GetCalls , atomic . LoadUint64 ( & src . GetCalls ) )
atomic . AddUint64 ( & dst . SetCalls , atomic . LoadUint64 ( & src . SetCalls ) )
atomic . AddUint64 ( & dst . Misses , atomic . LoadUint64 ( & src . Misses ) )
atomic . AddUint64 ( & dst . Collisions , atomic . LoadUint64 ( & src . Collisions ) )
atomic . AddUint64 ( & dst . Corruptions , atomic . LoadUint64 ( & src . Corruptions ) )
// Do not add EntriesCount, BytesSize and MaxBytesSize, since these metrics
// are calculated from c.curr and c.prev caches.
2019-08-13 18:35:19 +00:00
}
// Get appends the found value for the given key to dst and returns the result.
func ( c * Cache ) Get ( dst , key [ ] byte ) [ ] byte {
curr := c . curr . Load ( ) . ( * fastcache . Cache )
result := curr . Get ( dst , key )
if len ( result ) > len ( dst ) {
// Fast path - the entry is found in the current cache.
return result
}
2021-07-05 12:07:38 +00:00
if c . loadMode ( ) == whole {
2021-07-06 07:39:56 +00:00
// Nothing found.
2019-08-13 18:35:19 +00:00
return result
}
// Search for the entry in the previous cache.
prev := c . prev . Load ( ) . ( * fastcache . Cache )
result = prev . Get ( dst , key )
if len ( result ) <= len ( dst ) {
// Nothing found.
return result
}
// Cache the found entry in the current cache.
curr . Set ( key , result [ len ( dst ) : ] )
return result
}
2021-07-06 07:39:56 +00:00
// Has verifies whether the cache contains the given key.
2019-08-13 18:35:19 +00:00
func ( c * Cache ) Has ( key [ ] byte ) bool {
curr := c . curr . Load ( ) . ( * fastcache . Cache )
if curr . Has ( key ) {
return true
}
2021-07-05 12:07:38 +00:00
if c . loadMode ( ) == whole {
2019-08-13 18:35:19 +00:00
return false
}
prev := c . prev . Load ( ) . ( * fastcache . Cache )
2021-07-06 07:39:56 +00:00
if ! prev . Has ( key ) {
return false
}
// Cache the found entry in the current cache.
tmpBuf := tmpBufPool . Get ( )
tmpBuf . B = prev . Get ( tmpBuf . B , key )
curr . Set ( key , tmpBuf . B )
tmpBufPool . Put ( tmpBuf )
return true
2019-08-13 18:35:19 +00:00
}
2021-07-06 07:39:56 +00:00
// tmpBufPool provides temporary buffers for Cache.Has,
// which uses them for copying values from the prev cache to the curr cache.
var tmpBufPool bytesutil.ByteBufferPool
2019-08-13 18:35:19 +00:00
// Set sets the given value for the given key.
//
// The entry is stored in the curr cache only.
func (c *Cache) Set(key, value []byte) {
	c.curr.Load().(*fastcache.Cache).Set(key, value)
}
// GetBig appends the found value for the given key to dst and returns the result.
func ( c * Cache ) GetBig ( dst , key [ ] byte ) [ ] byte {
curr := c . curr . Load ( ) . ( * fastcache . Cache )
result := curr . GetBig ( dst , key )
if len ( result ) > len ( dst ) {
// Fast path - the entry is found in the current cache.
return result
}
2021-07-05 12:07:38 +00:00
if c . loadMode ( ) == whole {
2021-07-06 07:39:56 +00:00
// Nothing found.
2019-08-13 18:35:19 +00:00
return result
}
// Search for the entry in the previous cache.
prev := c . prev . Load ( ) . ( * fastcache . Cache )
result = prev . GetBig ( dst , key )
if len ( result ) <= len ( dst ) {
// Nothing found.
return result
}
// Cache the found entry in the current cache.
curr . SetBig ( key , result [ len ( dst ) : ] )
return result
}
// SetBig sets the given value for the given key.
//
// The entry is stored in the curr cache only.
func (c *Cache) SetBig(key, value []byte) {
	c.curr.Load().(*fastcache.Cache).SetBig(key, value)
}
2022-10-20 05:42:41 +00:00
// timeJitter returns d scaled by a pseudo-random fraction in the range [0, 1).
//
// The fraction is derived from the sub-second part of the current wall-clock time.
func timeJitter(d time.Duration) time.Duration {
	const nanosPerSecond = 1e9

	subsecond := time.Now().UnixNano() % nanosPerSecond
	fraction := float64(subsecond) / nanosPerSecond
	scaled := float64(d) * fraction
	return time.Duration(scaled)
}