diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 0ab11e46b..8b10b12ec 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -7,7 +7,7 @@ sort: 15 ## tip * FEATURE: vmagent: dynamically reload client TLS certificates from disk on every [mTLS connection](https://developers.cloudflare.com/cloudflare-one/identity/devices/mutual-tls-authentication). This should allow using `vmagent` with [Istio service mesh](https://istio.io/latest/about/service-mesh/). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1420). -* FEATURE: reduce memory usage when performing heavy queries over high number of time series. +* FEATURE: reduce memory usage when performing heavy queries over big number of time series. * BUGFIX: vmagent: remove `{ %space %}` typo in `/targets` output. The typo has been introduced in v1.62.0. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1408). * BUGFIX: vmagent: fix CSS styles on `/targets` page. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1422). diff --git a/lib/workingsetcache/cache.go b/lib/workingsetcache/cache.go index 03d1ce1d8..f73f500de 100644 --- a/lib/workingsetcache/cache.go +++ b/lib/workingsetcache/cache.go @@ -33,10 +33,13 @@ type Cache struct { // In this case using prev would result in RAM waste, // it is better to use only curr cache with doubled size. // After the process of switching, this flag will be set to whole. - mode uint64 + mode uint32 + + // The maximum cache size in bytes. + maxBytes int // mu serializes access to curr, prev and mode - // in expirationWorker and cacheSizeWatcher. + // in expirationWatcher and cacheSizeWatcher. mu sync.Mutex wg sync.WaitGroup @@ -59,23 +62,19 @@ func Load(filePath string, maxBytes int, expireDuration time.Duration) *Cache { // The cache couldn't be loaded with maxBytes size. // This may mean that the cache is split into curr and prev caches. // Try loading it again with maxBytes / 2 size. - maxBytes /= 2 - curr = fastcache.LoadFromFileOrNew(filePath, maxBytes) - return newWorkingSetCache(curr, maxBytes, expireDuration) + prev := fastcache.LoadFromFileOrNew(filePath, maxBytes/2) + c := newCacheInternal(fastcache.New(1024), prev, maxBytes, split) + c.runWatchers(expireDuration) + return c } // The cache has been successfully loaded in full. // Set its' mode to `whole`. - // There is no need in starting expirationWorker and cacheSizeWatcher. - var c Cache - c.curr.Store(curr) - c.prev.Store(fastcache.New(1024)) - c.stopCh = make(chan struct{}) - atomic.StoreUint64(&c.mode, whole) - return &c + // There is no need in runWatchers call. + return newCacheInternal(curr, fastcache.New(1024), maxBytes, whole) } -// New creates new cache with the given maxBytes size and the given expireDuration +// New creates new cache with the given maxBytes capcity and the given expireDuration // for inactive entries. // // Stop must be called on the returned cache when it is no longer needed. @@ -83,31 +82,34 @@ func New(maxBytes int, expireDuration time.Duration) *Cache { // Split maxBytes between curr and prev caches. maxBytes /= 2 curr := fastcache.New(maxBytes) - return newWorkingSetCache(curr, maxBytes, expireDuration) + c := newCacheInternal(curr, fastcache.New(1024), maxBytes, split) + c.runWatchers(expireDuration) + return c } -func newWorkingSetCache(curr *fastcache.Cache, maxBytes int, expireDuration time.Duration) *Cache { - prev := fastcache.New(1024) +func newCacheInternal(curr, prev *fastcache.Cache, maxBytes, mode int) *Cache { var c Cache c.curr.Store(curr) c.prev.Store(prev) c.stopCh = make(chan struct{}) - atomic.StoreUint64(&c.mode, split) - - c.wg.Add(1) - go func() { - defer c.wg.Done() - c.expirationWorker(maxBytes, expireDuration) - }() - c.wg.Add(1) - go func() { - defer c.wg.Done() - c.cacheSizeWatcher(maxBytes) - }() + c.setMode(mode) return &c } -func (c *Cache) expirationWorker(maxBytes int, expireDuration time.Duration) { +func (c *Cache) runWatchers(expireDuration time.Duration) { + c.wg.Add(1) + go func() { + defer c.wg.Done() + c.expirationWatcher(expireDuration) + }() + c.wg.Add(1) + go func() { + defer c.wg.Done() + c.cacheSizeWatcher() + }() +} + +func (c *Cache) expirationWatcher(expireDuration time.Duration) { t := time.NewTicker(expireDuration / 2) for { select { @@ -118,22 +120,25 @@ func (c *Cache) expirationWorker(maxBytes int, expireDuration time.Duration) { } c.mu.Lock() - if atomic.LoadUint64(&c.mode) == split { - // Expire prev cache and create fresh curr cache. - // Do not reuse prev cache, since it can have too big capacity. - prev := c.prev.Load().(*fastcache.Cache) - prev.Reset() - curr := c.curr.Load().(*fastcache.Cache) - curr.UpdateStats(&c.historicalStats) - c.prev.Store(curr) - curr = fastcache.New(maxBytes) - c.curr.Store(curr) + if atomic.LoadUint32(&c.mode) != split { + // Stop the expirationWatcher on non-split mode. + c.mu.Unlock() + return } + // Expire prev cache and create fresh curr cache with c.maxBytes/2 capacity. + // Do not reuse prev cache, since it can have too big capacity. + prev := c.prev.Load().(*fastcache.Cache) + prev.Reset() + curr := c.curr.Load().(*fastcache.Cache) + curr.UpdateStats(&c.historicalStats) + c.prev.Store(curr) + curr = fastcache.New(c.maxBytes / 2) + c.curr.Store(curr) c.mu.Unlock() } } -func (c *Cache) cacheSizeWatcher(maxBytes int) { +func (c *Cache) cacheSizeWatcher() { t := time.NewTicker(time.Minute) defer t.Stop() @@ -146,7 +151,7 @@ func (c *Cache) cacheSizeWatcher(maxBytes int) { var cs fastcache.Stats curr := c.curr.Load().(*fastcache.Cache) curr.UpdateStats(&cs) - if cs.BytesSize >= uint64(maxBytes)/2 { + if cs.BytesSize >= uint64(c.maxBytes)/2 { break } } @@ -159,18 +164,18 @@ func (c *Cache) cacheSizeWatcher(maxBytes int) { // 1) switch to mode=switching // 2) move curr cache to prev // 3) create curr with the double size - // 4) wait until curr size exceeds maxBytes/2, i.e. it is populated with new data + // 4) wait until curr size exceeds c.maxBytes/2, i.e. it is populated with new data // 5) switch to mode=whole // 6) drop prev c.mu.Lock() - atomic.StoreUint64(&c.mode, switching) + c.setMode(switching) prev := c.prev.Load().(*fastcache.Cache) prev.Reset() curr := c.curr.Load().(*fastcache.Cache) curr.UpdateStats(&c.historicalStats) c.prev.Store(curr) - c.curr.Store(fastcache.New(maxBytes * 2)) + c.curr.Store(fastcache.New(c.maxBytes)) c.mu.Unlock() for { @@ -182,20 +187,20 @@ func (c *Cache) cacheSizeWatcher(maxBytes int) { var cs fastcache.Stats curr := c.curr.Load().(*fastcache.Cache) curr.UpdateStats(&cs) - if cs.BytesSize >= uint64(maxBytes)/2 { + if cs.BytesSize >= uint64(c.maxBytes)/2 { break } } c.mu.Lock() - atomic.StoreUint64(&c.mode, whole) + c.setMode(whole) prev = c.prev.Load().(*fastcache.Cache) prev.Reset() c.prev.Store(fastcache.New(1024)) c.mu.Unlock() } -// Save safes the cache to filePath. +// Save saves the cache to filePath. func (c *Cache) Save(filePath string) error { curr := c.curr.Load().(*fastcache.Cache) concurrency := cgroup.AvailableCPUs() @@ -219,7 +224,15 @@ func (c *Cache) Reset() { curr := c.curr.Load().(*fastcache.Cache) curr.Reset() // Reset the mode to `split` in the hope the working set size becomes smaller after the reset. - atomic.StoreUint64(&c.mode, split) + c.setMode(split) +} + +func (c *Cache) setMode(mode int) { + atomic.StoreUint32(&c.mode, uint32(mode)) +} + +func (c *Cache) loadMode() int { + return int(atomic.LoadUint32(&c.mode)) } // UpdateStats updates fcs with cache stats. @@ -235,7 +248,7 @@ func (c *Cache) UpdateStats(fcs *fastcache.Stats) { fcs.Collisions += atomic.LoadUint64(&hs.Collisions) fcs.Corruptions += atomic.LoadUint64(&hs.Corruptions) - if atomic.LoadUint64(&c.mode) == whole { + if c.loadMode() == whole { return } @@ -257,7 +270,7 @@ func (c *Cache) Get(dst, key []byte) []byte { // Fast path - the entry is found in the current cache. return result } - if atomic.LoadUint64(&c.mode) == whole { + if c.loadMode() == whole { return result } @@ -279,7 +292,7 @@ func (c *Cache) Has(key []byte) bool { if curr.Has(key) { return true } - if atomic.LoadUint64(&c.mode) == whole { + if c.loadMode() == whole { return false } prev := c.prev.Load().(*fastcache.Cache) @@ -300,7 +313,7 @@ func (c *Cache) GetBig(dst, key []byte) []byte { // Fast path - the entry is found in the current cache. return result } - if atomic.LoadUint64(&c.mode) == whole { + if c.loadMode() == whole { return result }