mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/workingsetcache: properly switch to whole
mode
Previously the switch from `split` to `whole` mode had been performed too early, e.g. when the current cache size became bigger than 1/4 of the allowed cache size. Now it is performed when the current cache size becomes bigger than 1/2 of the allowed cache size. This change can reduce memory usage for data ingestion path when big number of active time series are ingested.
This commit is contained in:
parent
43103be011
commit
8f973e34fb
2 changed files with 66 additions and 53 deletions
|
@ -7,7 +7,7 @@ sort: 15
|
|||
## tip
|
||||
|
||||
* FEATURE: vmagent: dynamically reload client TLS certificates from disk on every [mTLS connection](https://developers.cloudflare.com/cloudflare-one/identity/devices/mutual-tls-authentication). This should allow using `vmagent` with [Istio service mesh](https://istio.io/latest/about/service-mesh/). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1420).
|
||||
* FEATURE: reduce memory usage when performing heavy queries over high number of time series.
|
||||
* FEATURE: reduce memory usage when performing heavy queries over big number of time series.
|
||||
|
||||
* BUGFIX: vmagent: remove `{ %space %}` typo in `/targets` output. The typo has been introduced in v1.62.0. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1408).
|
||||
* BUGFIX: vmagent: fix CSS styles on `/targets` page. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1422).
|
||||
|
|
|
@ -33,10 +33,13 @@ type Cache struct {
|
|||
// In this case using prev would result in RAM waste,
|
||||
// it is better to use only curr cache with doubled size.
|
||||
// After the process of switching, this flag will be set to whole.
|
||||
mode uint64
|
||||
mode uint32
|
||||
|
||||
// The maximum cache size in bytes.
|
||||
maxBytes int
|
||||
|
||||
// mu serializes access to curr, prev and mode
|
||||
// in expirationWorker and cacheSizeWatcher.
|
||||
// in expirationWatcher and cacheSizeWatcher.
|
||||
mu sync.Mutex
|
||||
|
||||
wg sync.WaitGroup
|
||||
|
@ -59,23 +62,19 @@ func Load(filePath string, maxBytes int, expireDuration time.Duration) *Cache {
|
|||
// The cache couldn't be loaded with maxBytes size.
|
||||
// This may mean that the cache is split into curr and prev caches.
|
||||
// Try loading it again with maxBytes / 2 size.
|
||||
maxBytes /= 2
|
||||
curr = fastcache.LoadFromFileOrNew(filePath, maxBytes)
|
||||
return newWorkingSetCache(curr, maxBytes, expireDuration)
|
||||
prev := fastcache.LoadFromFileOrNew(filePath, maxBytes/2)
|
||||
c := newCacheInternal(fastcache.New(1024), prev, maxBytes, split)
|
||||
c.runWatchers(expireDuration)
|
||||
return c
|
||||
}
|
||||
|
||||
// The cache has been successfully loaded in full.
|
||||
// Set its' mode to `whole`.
|
||||
// There is no need in starting expirationWorker and cacheSizeWatcher.
|
||||
var c Cache
|
||||
c.curr.Store(curr)
|
||||
c.prev.Store(fastcache.New(1024))
|
||||
c.stopCh = make(chan struct{})
|
||||
atomic.StoreUint64(&c.mode, whole)
|
||||
return &c
|
||||
// There is no need in runWatchers call.
|
||||
return newCacheInternal(curr, fastcache.New(1024), maxBytes, whole)
|
||||
}
|
||||
|
||||
// New creates new cache with the given maxBytes size and the given expireDuration
|
||||
// New creates new cache with the given maxBytes capcity and the given expireDuration
|
||||
// for inactive entries.
|
||||
//
|
||||
// Stop must be called on the returned cache when it is no longer needed.
|
||||
|
@ -83,31 +82,34 @@ func New(maxBytes int, expireDuration time.Duration) *Cache {
|
|||
// Split maxBytes between curr and prev caches.
|
||||
maxBytes /= 2
|
||||
curr := fastcache.New(maxBytes)
|
||||
return newWorkingSetCache(curr, maxBytes, expireDuration)
|
||||
c := newCacheInternal(curr, fastcache.New(1024), maxBytes, split)
|
||||
c.runWatchers(expireDuration)
|
||||
return c
|
||||
}
|
||||
|
||||
func newWorkingSetCache(curr *fastcache.Cache, maxBytes int, expireDuration time.Duration) *Cache {
|
||||
prev := fastcache.New(1024)
|
||||
func newCacheInternal(curr, prev *fastcache.Cache, maxBytes, mode int) *Cache {
|
||||
var c Cache
|
||||
c.curr.Store(curr)
|
||||
c.prev.Store(prev)
|
||||
c.stopCh = make(chan struct{})
|
||||
atomic.StoreUint64(&c.mode, split)
|
||||
|
||||
c.wg.Add(1)
|
||||
go func() {
|
||||
defer c.wg.Done()
|
||||
c.expirationWorker(maxBytes, expireDuration)
|
||||
}()
|
||||
c.wg.Add(1)
|
||||
go func() {
|
||||
defer c.wg.Done()
|
||||
c.cacheSizeWatcher(maxBytes)
|
||||
}()
|
||||
c.setMode(mode)
|
||||
return &c
|
||||
}
|
||||
|
||||
func (c *Cache) expirationWorker(maxBytes int, expireDuration time.Duration) {
|
||||
func (c *Cache) runWatchers(expireDuration time.Duration) {
|
||||
c.wg.Add(1)
|
||||
go func() {
|
||||
defer c.wg.Done()
|
||||
c.expirationWatcher(expireDuration)
|
||||
}()
|
||||
c.wg.Add(1)
|
||||
go func() {
|
||||
defer c.wg.Done()
|
||||
c.cacheSizeWatcher()
|
||||
}()
|
||||
}
|
||||
|
||||
func (c *Cache) expirationWatcher(expireDuration time.Duration) {
|
||||
t := time.NewTicker(expireDuration / 2)
|
||||
for {
|
||||
select {
|
||||
|
@ -118,22 +120,25 @@ func (c *Cache) expirationWorker(maxBytes int, expireDuration time.Duration) {
|
|||
}
|
||||
|
||||
c.mu.Lock()
|
||||
if atomic.LoadUint64(&c.mode) == split {
|
||||
// Expire prev cache and create fresh curr cache.
|
||||
// Do not reuse prev cache, since it can have too big capacity.
|
||||
prev := c.prev.Load().(*fastcache.Cache)
|
||||
prev.Reset()
|
||||
curr := c.curr.Load().(*fastcache.Cache)
|
||||
curr.UpdateStats(&c.historicalStats)
|
||||
c.prev.Store(curr)
|
||||
curr = fastcache.New(maxBytes)
|
||||
c.curr.Store(curr)
|
||||
if atomic.LoadUint32(&c.mode) != split {
|
||||
// Stop the expirationWatcher on non-split mode.
|
||||
c.mu.Unlock()
|
||||
return
|
||||
}
|
||||
// Expire prev cache and create fresh curr cache with c.maxBytes/2 capacity.
|
||||
// Do not reuse prev cache, since it can have too big capacity.
|
||||
prev := c.prev.Load().(*fastcache.Cache)
|
||||
prev.Reset()
|
||||
curr := c.curr.Load().(*fastcache.Cache)
|
||||
curr.UpdateStats(&c.historicalStats)
|
||||
c.prev.Store(curr)
|
||||
curr = fastcache.New(c.maxBytes / 2)
|
||||
c.curr.Store(curr)
|
||||
c.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Cache) cacheSizeWatcher(maxBytes int) {
|
||||
func (c *Cache) cacheSizeWatcher() {
|
||||
t := time.NewTicker(time.Minute)
|
||||
defer t.Stop()
|
||||
|
||||
|
@ -146,7 +151,7 @@ func (c *Cache) cacheSizeWatcher(maxBytes int) {
|
|||
var cs fastcache.Stats
|
||||
curr := c.curr.Load().(*fastcache.Cache)
|
||||
curr.UpdateStats(&cs)
|
||||
if cs.BytesSize >= uint64(maxBytes)/2 {
|
||||
if cs.BytesSize >= uint64(c.maxBytes)/2 {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
@ -159,18 +164,18 @@ func (c *Cache) cacheSizeWatcher(maxBytes int) {
|
|||
// 1) switch to mode=switching
|
||||
// 2) move curr cache to prev
|
||||
// 3) create curr with the double size
|
||||
// 4) wait until curr size exceeds maxBytes/2, i.e. it is populated with new data
|
||||
// 4) wait until curr size exceeds c.maxBytes/2, i.e. it is populated with new data
|
||||
// 5) switch to mode=whole
|
||||
// 6) drop prev
|
||||
|
||||
c.mu.Lock()
|
||||
atomic.StoreUint64(&c.mode, switching)
|
||||
c.setMode(switching)
|
||||
prev := c.prev.Load().(*fastcache.Cache)
|
||||
prev.Reset()
|
||||
curr := c.curr.Load().(*fastcache.Cache)
|
||||
curr.UpdateStats(&c.historicalStats)
|
||||
c.prev.Store(curr)
|
||||
c.curr.Store(fastcache.New(maxBytes * 2))
|
||||
c.curr.Store(fastcache.New(c.maxBytes))
|
||||
c.mu.Unlock()
|
||||
|
||||
for {
|
||||
|
@ -182,20 +187,20 @@ func (c *Cache) cacheSizeWatcher(maxBytes int) {
|
|||
var cs fastcache.Stats
|
||||
curr := c.curr.Load().(*fastcache.Cache)
|
||||
curr.UpdateStats(&cs)
|
||||
if cs.BytesSize >= uint64(maxBytes)/2 {
|
||||
if cs.BytesSize >= uint64(c.maxBytes)/2 {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
atomic.StoreUint64(&c.mode, whole)
|
||||
c.setMode(whole)
|
||||
prev = c.prev.Load().(*fastcache.Cache)
|
||||
prev.Reset()
|
||||
c.prev.Store(fastcache.New(1024))
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// Save safes the cache to filePath.
|
||||
// Save saves the cache to filePath.
|
||||
func (c *Cache) Save(filePath string) error {
|
||||
curr := c.curr.Load().(*fastcache.Cache)
|
||||
concurrency := cgroup.AvailableCPUs()
|
||||
|
@ -219,7 +224,15 @@ func (c *Cache) Reset() {
|
|||
curr := c.curr.Load().(*fastcache.Cache)
|
||||
curr.Reset()
|
||||
// Reset the mode to `split` in the hope the working set size becomes smaller after the reset.
|
||||
atomic.StoreUint64(&c.mode, split)
|
||||
c.setMode(split)
|
||||
}
|
||||
|
||||
func (c *Cache) setMode(mode int) {
|
||||
atomic.StoreUint32(&c.mode, uint32(mode))
|
||||
}
|
||||
|
||||
func (c *Cache) loadMode() int {
|
||||
return int(atomic.LoadUint32(&c.mode))
|
||||
}
|
||||
|
||||
// UpdateStats updates fcs with cache stats.
|
||||
|
@ -235,7 +248,7 @@ func (c *Cache) UpdateStats(fcs *fastcache.Stats) {
|
|||
fcs.Collisions += atomic.LoadUint64(&hs.Collisions)
|
||||
fcs.Corruptions += atomic.LoadUint64(&hs.Corruptions)
|
||||
|
||||
if atomic.LoadUint64(&c.mode) == whole {
|
||||
if c.loadMode() == whole {
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -257,7 +270,7 @@ func (c *Cache) Get(dst, key []byte) []byte {
|
|||
// Fast path - the entry is found in the current cache.
|
||||
return result
|
||||
}
|
||||
if atomic.LoadUint64(&c.mode) == whole {
|
||||
if c.loadMode() == whole {
|
||||
return result
|
||||
}
|
||||
|
||||
|
@ -279,7 +292,7 @@ func (c *Cache) Has(key []byte) bool {
|
|||
if curr.Has(key) {
|
||||
return true
|
||||
}
|
||||
if atomic.LoadUint64(&c.mode) == whole {
|
||||
if c.loadMode() == whole {
|
||||
return false
|
||||
}
|
||||
prev := c.prev.Load().(*fastcache.Cache)
|
||||
|
@ -300,7 +313,7 @@ func (c *Cache) GetBig(dst, key []byte) []byte {
|
|||
// Fast path - the entry is found in the current cache.
|
||||
return result
|
||||
}
|
||||
if atomic.LoadUint64(&c.mode) == whole {
|
||||
if c.loadMode() == whole {
|
||||
return result
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue