lib/blockcache: evict entries from the cache in LRU order

This should improve hit rate for smaller caches
This commit is contained in:
Aliaksandr Valialkin 2022-02-21 17:44:19 +02:00
parent 5808774e06
commit 5d45ea1003
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1

View file

@ -1,6 +1,7 @@
package blockcache
import (
"container/heap"
"sync"
"sync/atomic"
"time"
@ -135,7 +136,7 @@ func (c *Cache) Misses() uint64 {
func (c *Cache) cleaner() {
ticker := time.NewTicker(57 * time.Second)
defer ticker.Stop()
perKeyMissesTicker := time.NewTicker(7 * time.Minute)
perKeyMissesTicker := time.NewTicker(3 * time.Minute)
defer perKeyMissesTicker.Stop()
for {
select {
@ -176,7 +177,7 @@ type cache struct {
getMaxSizeBytes func() int
// mu protects all the fields below.
mu sync.RWMutex
mu sync.Mutex
// m contains cached blocks keyed by Key.Part and then by Key.Offset
m map[interface{}]map[uint64]*cacheEntry
@ -185,6 +186,9 @@ type cache struct {
//
// Blocks with less than 2 cache misses aren't stored in the cache in order to prevent from eviction for frequently accessed items.
perKeyMisses map[Key]int
// The heap for removing the least recently used entries from m.
lah lastAccessHeap
}
// Key represents a key, which uniquely identifies the Block.
@ -208,13 +212,17 @@ type Block interface {
}
type cacheEntry struct {
// Atomically updated fields must go first in the struct, so they are properly
// aligned to 8 bytes on 32-bit architectures.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
// The timestamp in seconds for the last access to the given entry.
lastAccessTime uint64
// block contains the cached block.
block Block
// heapIdx is the index for the entry in lastAccessHeap.
heapIdx int
// k contains the associated key for the given block.
k Key
// b contains the cached block.
b Block
}
func newCache(getMaxSizeBytes func() int) *cache {
@ -227,14 +235,16 @@ func newCache(getMaxSizeBytes func() int) *cache {
func (c *cache) RemoveBlocksForPart(p interface{}) {
c.mu.Lock()
defer c.mu.Unlock()
sizeBytes := 0
for _, e := range c.m[p] {
sizeBytes += e.block.SizeBytes()
sizeBytes += e.b.SizeBytes()
heap.Remove(&c.lah, e.heapIdx)
// do not delete the entry from c.perKeyMisses, since it is removed by cache.cleaner later.
}
c.updateSizeBytes(-sizeBytes)
delete(c.m, p)
c.mu.Unlock()
}
func (c *cache) updateSizeBytes(n int) {
@ -248,55 +258,56 @@ func (c *cache) cleanPerKeyMisses() {
}
func (c *cache) cleanByTimeout() {
// Delete items accessed more than five minutes ago.
// Delete items accessed more than three minutes ago.
// This time should be enough for repeated queries.
lastAccessTime := fasttime.UnixTimestamp() - 5*60
lastAccessTime := fasttime.UnixTimestamp() - 3*60
c.mu.Lock()
defer c.mu.Unlock()
for _, pes := range c.m {
for offset, e := range pes {
if lastAccessTime > atomic.LoadUint64(&e.lastAccessTime) {
c.updateSizeBytes(-e.block.SizeBytes())
delete(pes, offset)
// do not delete the entry from c.perKeyMisses, since it is removed by cache.cleaner later.
}
for len(c.lah) > 0 {
e := c.lah[0]
if lastAccessTime < e.lastAccessTime {
break
}
c.updateSizeBytes(-e.b.SizeBytes())
pes := c.m[e.k.Part]
delete(pes, e.k.Offset)
heap.Pop(&c.lah)
}
}
func (c *cache) GetBlock(k Key) Block {
atomic.AddUint64(&c.requests, 1)
var e *cacheEntry
c.mu.RLock()
c.mu.Lock()
defer c.mu.Unlock()
pes := c.m[k.Part]
if pes != nil {
e = pes[k.Offset]
}
c.mu.RUnlock()
if e != nil {
// Fast path - the block already exists in the cache, so return it to the caller.
currentTime := fasttime.UnixTimestamp()
if atomic.LoadUint64(&e.lastAccessTime) != currentTime {
atomic.StoreUint64(&e.lastAccessTime, currentTime)
if e != nil {
// Fast path - the block already exists in the cache, so return it to the caller.
currentTime := fasttime.UnixTimestamp()
if e.lastAccessTime != currentTime {
e.lastAccessTime = currentTime
heap.Fix(&c.lah, e.heapIdx)
}
return e.b
}
return e.block
}
// Slow path - the entry is missing in the cache.
c.mu.Lock()
c.perKeyMisses[k]++
c.mu.Unlock()
atomic.AddUint64(&c.misses, 1)
return nil
}
func (c *cache) PutBlock(k Key, b Block) {
c.mu.RLock()
c.mu.Lock()
defer c.mu.Unlock()
// If the entry wasn't accessed yet (e.g. c.perKeyMisses[k] == 0), then cache it, since it is likely it will be accessed soon.
// Do not cache the entry only if there was only a single unsuccessful attempt to access it.
// This may be one-time-wonders entry, which won't be accessed more, so there is no need in caching it.
doNotCache := c.perKeyMisses[k] == 1
c.mu.RUnlock()
if doNotCache {
// Do not cache b if it has been requested only once (aka one-time-wonders items).
// This should reduce memory usage for the cache.
@ -304,9 +315,6 @@ func (c *cache) PutBlock(k Key, b Block) {
}
// Store b in the cache.
c.mu.Lock()
defer c.mu.Unlock()
pes := c.m[k.Part]
if pes == nil {
pes = make(map[uint64]*cacheEntry)
@ -317,33 +325,30 @@ func (c *cache) PutBlock(k Key, b Block) {
}
e := &cacheEntry{
lastAccessTime: fasttime.UnixTimestamp(),
block: b,
k: k,
b: b,
}
heap.Push(&c.lah, e)
pes[k.Offset] = e
c.updateSizeBytes(e.block.SizeBytes())
c.updateSizeBytes(e.b.SizeBytes())
maxSizeBytes := c.getMaxSizeBytes()
if c.SizeBytes() > maxSizeBytes {
// Entries in the cache occupy too much space. Free up space by deleting some entries.
for _, pes := range c.m {
for offset, e := range pes {
c.updateSizeBytes(-e.block.SizeBytes())
delete(pes, offset)
// do not delete the entry from c.perKeyMisses, since it is removed by cache.cleaner later.
if c.SizeBytes() < maxSizeBytes {
return
}
}
}
for c.SizeBytes() > maxSizeBytes && len(c.lah) > 0 {
e := c.lah[0]
c.updateSizeBytes(-e.b.SizeBytes())
pes := c.m[e.k.Part]
delete(pes, e.k.Offset)
heap.Pop(&c.lah)
}
}
func (c *cache) Len() int {
c.mu.RLock()
c.mu.Lock()
defer c.mu.Unlock()
n := 0
for _, m := range c.m {
n += len(m)
}
c.mu.RUnlock()
return n
}
@ -362,3 +367,35 @@ func (c *cache) Requests() uint64 {
func (c *cache) Misses() uint64 {
return atomic.LoadUint64(&c.misses)
}
// lastAccessHeap implements heap.Interface
type lastAccessHeap []*cacheEntry
func (lah *lastAccessHeap) Len() int {
return len(*lah)
}
func (lah *lastAccessHeap) Swap(i, j int) {
h := *lah
a := h[i]
b := h[j]
a.heapIdx = j
b.heapIdx = i
h[i] = b
h[j] = a
}
func (lah *lastAccessHeap) Less(i, j int) bool {
h := *lah
return h[i].lastAccessTime < h[j].lastAccessTime
}
func (lah *lastAccessHeap) Push(x interface{}) {
e := x.(*cacheEntry)
h := *lah
e.heapIdx = len(h)
*lah = append(h, e)
}
func (lah *lastAccessHeap) Pop() interface{} {
h := *lah
e := h[len(h)-1]
*lah = h[:len(h)-1]
return e
}