mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/blockcache: optimize blockcache a bit
- Optimize Cache.RemoveBlocksFromPart(), so it doesn't need to iterate over all the cached blocks. - Cache blocks if there were no cache misses during the last 2 minutes. This may be the case when new blocks are added simultaneously to the storage and to the cache. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2007
This commit is contained in:
parent
ceb1376267
commit
0d7374ad2f
1 changed files with 47 additions and 47 deletions
|
@ -19,7 +19,7 @@ type Cache struct {
|
|||
misses uint64
|
||||
|
||||
// sizeBytes contains an approximate size for all the blocks stored in the cache.
|
||||
sizeBytes uint64
|
||||
sizeBytes int64
|
||||
|
||||
// getMaxSizeBytes() is a callback, which returns the maximum allowed cache size in bytes.
|
||||
getMaxSizeBytes func() int
|
||||
|
@ -27,13 +27,8 @@ type Cache struct {
|
|||
// mu protects all the fields below.
|
||||
mu sync.RWMutex
|
||||
|
||||
// m contains cached blocks.
|
||||
m map[Key]*cacheEntry
|
||||
|
||||
// perPartEntries contains all the blocks for the given part.
|
||||
//
|
||||
// It is needed for fast deletion of blocks belonging to the given part.
|
||||
perPartEntries map[interface{}]map[uint64]*cacheEntry
|
||||
// m contains cached blocks keyed by Key.Part and then by Key.Offset
|
||||
m map[interface{}]map[uint64]*cacheEntry
|
||||
|
||||
// perKeyMisses contains per-block cache misses.
|
||||
//
|
||||
|
@ -62,9 +57,6 @@ type cacheEntry struct {
|
|||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
|
||||
lastAccessTime uint64
|
||||
|
||||
// key contains the key for the block.
|
||||
key Key
|
||||
|
||||
// block contains the cached block.
|
||||
block Block
|
||||
}
|
||||
|
@ -75,8 +67,7 @@ type cacheEntry struct {
|
|||
func NewCache(getMaxSizeBytes func() int) *Cache {
|
||||
var c Cache
|
||||
c.getMaxSizeBytes = getMaxSizeBytes
|
||||
c.m = make(map[Key]*cacheEntry)
|
||||
c.perPartEntries = make(map[interface{}]map[uint64]*cacheEntry)
|
||||
c.m = make(map[interface{}]map[uint64]*cacheEntry)
|
||||
c.perKeyMisses = make(map[Key]int)
|
||||
go c.cleaner()
|
||||
return &c
|
||||
|
@ -85,22 +76,18 @@ func NewCache(getMaxSizeBytes func() int) *Cache {
|
|||
// RemoveBlocksForPart removes all the blocks for the given part from the cache.
|
||||
func (c *Cache) RemoveBlocksForPart(p interface{}) {
|
||||
c.mu.Lock()
|
||||
for _, e := range c.perPartEntries[p] {
|
||||
c.deleteEntryLocked(e, false)
|
||||
sizeBytes := 0
|
||||
for _, e := range c.m[p] {
|
||||
sizeBytes += e.block.SizeBytes()
|
||||
// do not delete the entry from c.perKeyMisses, since it is removed by Cache.cleaner later.
|
||||
}
|
||||
delete(c.perPartEntries, p)
|
||||
c.updateSizeBytes(-sizeBytes)
|
||||
delete(c.m, p)
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
func (c *Cache) deleteEntryLocked(e *cacheEntry, removePartEntry bool) {
|
||||
n := uint64(e.block.SizeBytes())
|
||||
atomic.AddUint64(&c.sizeBytes, (^n)+1)
|
||||
key := e.key
|
||||
delete(c.m, key)
|
||||
delete(c.perKeyMisses, key)
|
||||
if removePartEntry {
|
||||
delete(c.perPartEntries[key.Part], key.Offset)
|
||||
}
|
||||
func (c *Cache) updateSizeBytes(n int) {
|
||||
atomic.AddInt64(&c.sizeBytes, int64(n))
|
||||
}
|
||||
|
||||
// cleaner periodically cleans least recently used entries in c.
|
||||
|
@ -124,11 +111,15 @@ func (c *Cache) cleaner() {
|
|||
func (c *Cache) cleanByTimeout() {
|
||||
currentTime := fasttime.UnixTimestamp()
|
||||
c.mu.Lock()
|
||||
for _, e := range c.m {
|
||||
// Delete items accessed more than two minutes ago.
|
||||
// This time should be enough for repeated queries.
|
||||
if currentTime-atomic.LoadUint64(&e.lastAccessTime) > 2*60 {
|
||||
c.deleteEntryLocked(e, true)
|
||||
for _, pes := range c.m {
|
||||
for offset, e := range pes {
|
||||
// Delete items accessed more than two minutes ago.
|
||||
// This time should be enough for repeated queries.
|
||||
if currentTime-atomic.LoadUint64(&e.lastAccessTime) > 2*60 {
|
||||
c.updateSizeBytes(-e.block.SizeBytes())
|
||||
delete(pes, offset)
|
||||
// do not delete the entry from c.perKeyMisses, since it is removed by Cache.cleaner later.
|
||||
}
|
||||
}
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
@ -137,8 +128,12 @@ func (c *Cache) cleanByTimeout() {
|
|||
// GetBlock returns a block for the given key k from c.
|
||||
func (c *Cache) GetBlock(k Key) Block {
|
||||
atomic.AddUint64(&c.requests, 1)
|
||||
var e *cacheEntry
|
||||
c.mu.RLock()
|
||||
e := c.m[k]
|
||||
pes := c.m[k.Part]
|
||||
if pes != nil {
|
||||
e = pes[k.Offset]
|
||||
}
|
||||
c.mu.RUnlock()
|
||||
if e != nil {
|
||||
// Fast path - the block already exists in the cache, so return it to the caller.
|
||||
|
@ -156,10 +151,13 @@ func (c *Cache) GetBlock(k Key) Block {
|
|||
return nil
|
||||
}
|
||||
|
||||
// PutBlock puts the given block under the given key into c.
|
||||
func (c *Cache) PutBlock(key Key, block Block) {
|
||||
// PutBlock puts the given block b under the given key k into c.
|
||||
func (c *Cache) PutBlock(k Key, b Block) {
|
||||
c.mu.RLock()
|
||||
doNotCache := c.perKeyMisses[key] < 2
|
||||
// If the entry wasn't accessed yet (e.g. c.perKeyMisses[k] == 0), then cache it, since it is likely it will be accessed soon.
|
||||
// Do not cache the entry only if there was only a single unsuccessful attempt to access it.
|
||||
// This may be one-time-wonders entry, which won't be accessed more, so there is no need in caching it.
|
||||
doNotCache := c.perKeyMisses[k] == 1
|
||||
c.mu.RUnlock()
|
||||
if doNotCache {
|
||||
// Do not cache b if it has been requested only once (aka one-time-wonders items).
|
||||
|
@ -171,28 +169,30 @@ func (c *Cache) PutBlock(key Key, block Block) {
|
|||
c.mu.Lock()
|
||||
e := &cacheEntry{
|
||||
lastAccessTime: fasttime.UnixTimestamp(),
|
||||
key: key,
|
||||
block: block,
|
||||
block: b,
|
||||
}
|
||||
c.m[key] = e
|
||||
pes := c.perPartEntries[key.Part]
|
||||
pes := c.m[k.Part]
|
||||
if pes == nil {
|
||||
pes = make(map[uint64]*cacheEntry)
|
||||
c.perPartEntries[key.Part] = pes
|
||||
c.m[k.Part] = pes
|
||||
}
|
||||
pes[key.Offset] = e
|
||||
n := uint64(e.block.SizeBytes())
|
||||
atomic.AddUint64(&c.sizeBytes, n)
|
||||
pes[k.Offset] = e
|
||||
c.updateSizeBytes(e.block.SizeBytes())
|
||||
maxSizeBytes := c.getMaxSizeBytes()
|
||||
if c.SizeBytes() > maxSizeBytes {
|
||||
// Entries in the cache occupy too much space. Free up space by deleting some entries.
|
||||
for _, e := range c.m {
|
||||
c.deleteEntryLocked(e, true)
|
||||
if c.SizeBytes() < maxSizeBytes {
|
||||
break
|
||||
for _, pes := range c.m {
|
||||
for offset, e := range pes {
|
||||
c.updateSizeBytes(-e.block.SizeBytes())
|
||||
delete(pes, offset)
|
||||
// do not delete the entry from c.perKeyMisses, since it is removed by Cache.cleaner later.
|
||||
if c.SizeBytes() < maxSizeBytes {
|
||||
goto end
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
end:
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
|
@ -206,7 +206,7 @@ func (c *Cache) Len() int {
|
|||
|
||||
// SizeBytes returns an approximate size in bytes of all the blocks stored in the cache c.
|
||||
func (c *Cache) SizeBytes() int {
|
||||
return int(atomic.LoadUint64(&c.sizeBytes))
|
||||
return int(atomic.LoadInt64(&c.sizeBytes))
|
||||
}
|
||||
|
||||
// SizeMaxBytes returns the max allowed size in bytes for c.
|
||||
|
|
Loading…
Reference in a new issue