From 0d7374ad2f68d22de96c1731d83db1343584cb52 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sun, 23 Jan 2022 13:04:57 +0200 Subject: [PATCH] lib/blockcache: optimize blockcache a bit - Optimize Cache.RemoveBlocksFromPart(), so it doesn't need to iterate over all the cached blocks. - Cache blocks if there were no cache misses during the last 2 minutes. This may be the case when new blocks are added simultaneously to the storage and to the cache. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2007 --- lib/blockcache/blockcache.go | 94 ++++++++++++++++++------------------ 1 file changed, 47 insertions(+), 47 deletions(-) diff --git a/lib/blockcache/blockcache.go b/lib/blockcache/blockcache.go index 92f03f348c..62920e50c5 100644 --- a/lib/blockcache/blockcache.go +++ b/lib/blockcache/blockcache.go @@ -19,7 +19,7 @@ type Cache struct { misses uint64 // sizeBytes contains an approximate size for all the blocks stored in the cache. - sizeBytes uint64 + sizeBytes int64 // getMaxSizeBytes() is a callback, which returns the maximum allowed cache size in bytes. getMaxSizeBytes func() int @@ -27,13 +27,8 @@ type Cache struct { // mu protects all the fields below. mu sync.RWMutex - // m contains cached blocks. - m map[Key]*cacheEntry - - // perPartEntries contains all the blocks for the given part. - // - // It is needed for fast deletion of blocks belonging to the given part. - perPartEntries map[interface{}]map[uint64]*cacheEntry + // m contains cached blocks keyed by Key.Part and then by Key.Offset + m map[interface{}]map[uint64]*cacheEntry // perKeyMisses contains per-block cache misses. // @@ -62,9 +57,6 @@ type cacheEntry struct { // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 lastAccessTime uint64 - // key contains the key for the block. - key Key - // block contains the cached block. block Block } @@ -75,8 +67,7 @@ type cacheEntry struct { func NewCache(getMaxSizeBytes func() int) *Cache { var c Cache c.getMaxSizeBytes = getMaxSizeBytes - c.m = make(map[Key]*cacheEntry) - c.perPartEntries = make(map[interface{}]map[uint64]*cacheEntry) + c.m = make(map[interface{}]map[uint64]*cacheEntry) c.perKeyMisses = make(map[Key]int) go c.cleaner() return &c @@ -85,22 +76,18 @@ func NewCache(getMaxSizeBytes func() int) *Cache { // RemoveBlocksForPart removes all the blocks for the given part from the cache. func (c *Cache) RemoveBlocksForPart(p interface{}) { c.mu.Lock() - for _, e := range c.perPartEntries[p] { - c.deleteEntryLocked(e, false) + sizeBytes := 0 + for _, e := range c.m[p] { + sizeBytes += e.block.SizeBytes() + // do not delete the entry from c.perKeyMisses, since it is removed by Cache.cleaner later. } - delete(c.perPartEntries, p) + c.updateSizeBytes(-sizeBytes) + delete(c.m, p) c.mu.Unlock() } -func (c *Cache) deleteEntryLocked(e *cacheEntry, removePartEntry bool) { - n := uint64(e.block.SizeBytes()) - atomic.AddUint64(&c.sizeBytes, (^n)+1) - key := e.key - delete(c.m, key) - delete(c.perKeyMisses, key) - if removePartEntry { - delete(c.perPartEntries[key.Part], key.Offset) - } +func (c *Cache) updateSizeBytes(n int) { + atomic.AddInt64(&c.sizeBytes, int64(n)) } // cleaner periodically cleans least recently used entries in c. @@ -124,11 +111,15 @@ func (c *Cache) cleaner() { func (c *Cache) cleanByTimeout() { currentTime := fasttime.UnixTimestamp() c.mu.Lock() - for _, e := range c.m { - // Delete items accessed more than two minutes ago. - // This time should be enough for repeated queries. - if currentTime-atomic.LoadUint64(&e.lastAccessTime) > 2*60 { - c.deleteEntryLocked(e, true) + for _, pes := range c.m { + for offset, e := range pes { + // Delete items accessed more than two minutes ago. + // This time should be enough for repeated queries. + if currentTime-atomic.LoadUint64(&e.lastAccessTime) > 2*60 { + c.updateSizeBytes(-e.block.SizeBytes()) + delete(pes, offset) + // do not delete the entry from c.perKeyMisses, since it is removed by Cache.cleaner later. + } } } c.mu.Unlock() @@ -137,8 +128,12 @@ func (c *Cache) cleanByTimeout() { // GetBlock returns a block for the given key k from c. func (c *Cache) GetBlock(k Key) Block { atomic.AddUint64(&c.requests, 1) + var e *cacheEntry c.mu.RLock() - e := c.m[k] + pes := c.m[k.Part] + if pes != nil { + e = pes[k.Offset] + } c.mu.RUnlock() if e != nil { // Fast path - the block already exists in the cache, so return it to the caller. @@ -156,10 +151,13 @@ func (c *Cache) GetBlock(k Key) Block { return nil } -// PutBlock puts the given block under the given key into c. -func (c *Cache) PutBlock(key Key, block Block) { +// PutBlock puts the given block b under the given key k into c. +func (c *Cache) PutBlock(k Key, b Block) { c.mu.RLock() - doNotCache := c.perKeyMisses[key] < 2 + // If the entry wasn't accessed yet (e.g. c.perKeyMisses[k] == 0), then cache it, since it is likely it will be accessed soon. + // Do not cache the entry only if there was only a single unsuccessful attempt to access it. + // This may be one-time-wonders entry, which won't be accessed more, so there is no need in caching it. + doNotCache := c.perKeyMisses[k] == 1 c.mu.RUnlock() if doNotCache { // Do not cache b if it has been requested only once (aka one-time-wonders items). @@ -171,28 +169,30 @@ func (c *Cache) PutBlock(key Key, block Block) { c.mu.Lock() e := &cacheEntry{ lastAccessTime: fasttime.UnixTimestamp(), - key: key, - block: block, + block: b, } - c.m[key] = e - pes := c.perPartEntries[key.Part] + pes := c.m[k.Part] if pes == nil { pes = make(map[uint64]*cacheEntry) - c.perPartEntries[key.Part] = pes + c.m[k.Part] = pes } - pes[key.Offset] = e - n := uint64(e.block.SizeBytes()) - atomic.AddUint64(&c.sizeBytes, n) + pes[k.Offset] = e + c.updateSizeBytes(e.block.SizeBytes()) maxSizeBytes := c.getMaxSizeBytes() if c.SizeBytes() > maxSizeBytes { // Entries in the cache occupy too much space. Free up space by deleting some entries. - for _, e := range c.m { - c.deleteEntryLocked(e, true) - if c.SizeBytes() < maxSizeBytes { - break + for _, pes := range c.m { + for offset, e := range pes { + c.updateSizeBytes(-e.block.SizeBytes()) + delete(pes, offset) + // do not delete the entry from c.perKeyMisses, since it is removed by Cache.cleaner later. + if c.SizeBytes() < maxSizeBytes { + goto end + } } } } +end: c.mu.Unlock() } @@ -206,7 +206,7 @@ func (c *Cache) Len() int { // SizeBytes returns an approximate size in bytes of all the blocks stored in the cache c. func (c *Cache) SizeBytes() int { - return int(atomic.LoadUint64(&c.sizeBytes)) + return int(atomic.LoadInt64(&c.sizeBytes)) } // SizeMaxBytes returns the max allowed size in bytes for c.