mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/blockcache: evict entries from the cache in LRU order
This should improve hit rate for smaller caches
This commit is contained in:
parent
cdd632985f
commit
d4df32cc6b
1 changed files with 87 additions and 50 deletions
|
@ -1,6 +1,7 @@
|
||||||
package blockcache
|
package blockcache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"container/heap"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
@ -135,7 +136,7 @@ func (c *Cache) Misses() uint64 {
|
||||||
func (c *Cache) cleaner() {
|
func (c *Cache) cleaner() {
|
||||||
ticker := time.NewTicker(57 * time.Second)
|
ticker := time.NewTicker(57 * time.Second)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
perKeyMissesTicker := time.NewTicker(7 * time.Minute)
|
perKeyMissesTicker := time.NewTicker(3 * time.Minute)
|
||||||
defer perKeyMissesTicker.Stop()
|
defer perKeyMissesTicker.Stop()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
@ -176,7 +177,7 @@ type cache struct {
|
||||||
getMaxSizeBytes func() int
|
getMaxSizeBytes func() int
|
||||||
|
|
||||||
// mu protects all the fields below.
|
// mu protects all the fields below.
|
||||||
mu sync.RWMutex
|
mu sync.Mutex
|
||||||
|
|
||||||
// m contains cached blocks keyed by Key.Part and then by Key.Offset
|
// m contains cached blocks keyed by Key.Part and then by Key.Offset
|
||||||
m map[interface{}]map[uint64]*cacheEntry
|
m map[interface{}]map[uint64]*cacheEntry
|
||||||
|
@ -185,6 +186,9 @@ type cache struct {
|
||||||
//
|
//
|
||||||
// Blocks with less than 2 cache misses aren't stored in the cache in order to prevent from eviction for frequently accessed items.
|
// Blocks with less than 2 cache misses aren't stored in the cache in order to prevent from eviction for frequently accessed items.
|
||||||
perKeyMisses map[Key]int
|
perKeyMisses map[Key]int
|
||||||
|
|
||||||
|
// The heap for removing the least recently used entries from m.
|
||||||
|
lah lastAccessHeap
|
||||||
}
|
}
|
||||||
|
|
||||||
// Key represents a key, which uniquely identifies the Block.
|
// Key represents a key, which uniquely identifies the Block.
|
||||||
|
@ -208,13 +212,17 @@ type Block interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
type cacheEntry struct {
|
type cacheEntry struct {
|
||||||
// Atomically updated fields must go first in the struct, so they are properly
|
// The timestamp in seconds for the last access to the given entry.
|
||||||
// aligned to 8 bytes on 32-bit architectures.
|
|
||||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
|
|
||||||
lastAccessTime uint64
|
lastAccessTime uint64
|
||||||
|
|
||||||
// block contains the cached block.
|
// heapIdx is the index for the entry in lastAccessHeap.
|
||||||
block Block
|
heapIdx int
|
||||||
|
|
||||||
|
// k contains the associated key for the given block.
|
||||||
|
k Key
|
||||||
|
|
||||||
|
// b contains the cached block.
|
||||||
|
b Block
|
||||||
}
|
}
|
||||||
|
|
||||||
func newCache(getMaxSizeBytes func() int) *cache {
|
func newCache(getMaxSizeBytes func() int) *cache {
|
||||||
|
@ -227,14 +235,16 @@ func newCache(getMaxSizeBytes func() int) *cache {
|
||||||
|
|
||||||
func (c *cache) RemoveBlocksForPart(p interface{}) {
|
func (c *cache) RemoveBlocksForPart(p interface{}) {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
sizeBytes := 0
|
sizeBytes := 0
|
||||||
for _, e := range c.m[p] {
|
for _, e := range c.m[p] {
|
||||||
sizeBytes += e.block.SizeBytes()
|
sizeBytes += e.b.SizeBytes()
|
||||||
|
heap.Remove(&c.lah, e.heapIdx)
|
||||||
// do not delete the entry from c.perKeyMisses, since it is removed by cache.cleaner later.
|
// do not delete the entry from c.perKeyMisses, since it is removed by cache.cleaner later.
|
||||||
}
|
}
|
||||||
c.updateSizeBytes(-sizeBytes)
|
c.updateSizeBytes(-sizeBytes)
|
||||||
delete(c.m, p)
|
delete(c.m, p)
|
||||||
c.mu.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) updateSizeBytes(n int) {
|
func (c *cache) updateSizeBytes(n int) {
|
||||||
|
@ -248,55 +258,56 @@ func (c *cache) cleanPerKeyMisses() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) cleanByTimeout() {
|
func (c *cache) cleanByTimeout() {
|
||||||
// Delete items accessed more than five minutes ago.
|
// Delete items accessed more than three minutes ago.
|
||||||
// This time should be enough for repeated queries.
|
// This time should be enough for repeated queries.
|
||||||
lastAccessTime := fasttime.UnixTimestamp() - 5*60
|
lastAccessTime := fasttime.UnixTimestamp() - 3*60
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
for _, pes := range c.m {
|
for len(c.lah) > 0 {
|
||||||
for offset, e := range pes {
|
e := c.lah[0]
|
||||||
if lastAccessTime > atomic.LoadUint64(&e.lastAccessTime) {
|
if lastAccessTime < e.lastAccessTime {
|
||||||
c.updateSizeBytes(-e.block.SizeBytes())
|
break
|
||||||
delete(pes, offset)
|
|
||||||
// do not delete the entry from c.perKeyMisses, since it is removed by cache.cleaner later.
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
c.updateSizeBytes(-e.b.SizeBytes())
|
||||||
|
pes := c.m[e.k.Part]
|
||||||
|
delete(pes, e.k.Offset)
|
||||||
|
heap.Pop(&c.lah)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) GetBlock(k Key) Block {
|
func (c *cache) GetBlock(k Key) Block {
|
||||||
atomic.AddUint64(&c.requests, 1)
|
atomic.AddUint64(&c.requests, 1)
|
||||||
var e *cacheEntry
|
var e *cacheEntry
|
||||||
c.mu.RLock()
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
pes := c.m[k.Part]
|
pes := c.m[k.Part]
|
||||||
if pes != nil {
|
if pes != nil {
|
||||||
e = pes[k.Offset]
|
e = pes[k.Offset]
|
||||||
}
|
if e != nil {
|
||||||
c.mu.RUnlock()
|
// Fast path - the block already exists in the cache, so return it to the caller.
|
||||||
if e != nil {
|
currentTime := fasttime.UnixTimestamp()
|
||||||
// Fast path - the block already exists in the cache, so return it to the caller.
|
if e.lastAccessTime != currentTime {
|
||||||
currentTime := fasttime.UnixTimestamp()
|
e.lastAccessTime = currentTime
|
||||||
if atomic.LoadUint64(&e.lastAccessTime) != currentTime {
|
heap.Fix(&c.lah, e.heapIdx)
|
||||||
atomic.StoreUint64(&e.lastAccessTime, currentTime)
|
}
|
||||||
|
return e.b
|
||||||
}
|
}
|
||||||
return e.block
|
|
||||||
}
|
}
|
||||||
// Slow path - the entry is missing in the cache.
|
// Slow path - the entry is missing in the cache.
|
||||||
c.mu.Lock()
|
|
||||||
c.perKeyMisses[k]++
|
c.perKeyMisses[k]++
|
||||||
c.mu.Unlock()
|
|
||||||
atomic.AddUint64(&c.misses, 1)
|
atomic.AddUint64(&c.misses, 1)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) PutBlock(k Key, b Block) {
|
func (c *cache) PutBlock(k Key, b Block) {
|
||||||
c.mu.RLock()
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
// If the entry wasn't accessed yet (e.g. c.perKeyMisses[k] == 0), then cache it, since it is likely it will be accessed soon.
|
// If the entry wasn't accessed yet (e.g. c.perKeyMisses[k] == 0), then cache it, since it is likely it will be accessed soon.
|
||||||
// Do not cache the entry only if there was only a single unsuccessful attempt to access it.
|
// Do not cache the entry only if there was only a single unsuccessful attempt to access it.
|
||||||
// This may be one-time-wonders entry, which won't be accessed more, so there is no need in caching it.
|
// This may be one-time-wonders entry, which won't be accessed more, so there is no need in caching it.
|
||||||
doNotCache := c.perKeyMisses[k] == 1
|
doNotCache := c.perKeyMisses[k] == 1
|
||||||
c.mu.RUnlock()
|
|
||||||
if doNotCache {
|
if doNotCache {
|
||||||
// Do not cache b if it has been requested only once (aka one-time-wonders items).
|
// Do not cache b if it has been requested only once (aka one-time-wonders items).
|
||||||
// This should reduce memory usage for the cache.
|
// This should reduce memory usage for the cache.
|
||||||
|
@ -304,9 +315,6 @@ func (c *cache) PutBlock(k Key, b Block) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store b in the cache.
|
// Store b in the cache.
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
|
|
||||||
pes := c.m[k.Part]
|
pes := c.m[k.Part]
|
||||||
if pes == nil {
|
if pes == nil {
|
||||||
pes = make(map[uint64]*cacheEntry)
|
pes = make(map[uint64]*cacheEntry)
|
||||||
|
@ -317,33 +325,30 @@ func (c *cache) PutBlock(k Key, b Block) {
|
||||||
}
|
}
|
||||||
e := &cacheEntry{
|
e := &cacheEntry{
|
||||||
lastAccessTime: fasttime.UnixTimestamp(),
|
lastAccessTime: fasttime.UnixTimestamp(),
|
||||||
block: b,
|
k: k,
|
||||||
|
b: b,
|
||||||
}
|
}
|
||||||
|
heap.Push(&c.lah, e)
|
||||||
pes[k.Offset] = e
|
pes[k.Offset] = e
|
||||||
c.updateSizeBytes(e.block.SizeBytes())
|
c.updateSizeBytes(e.b.SizeBytes())
|
||||||
maxSizeBytes := c.getMaxSizeBytes()
|
maxSizeBytes := c.getMaxSizeBytes()
|
||||||
if c.SizeBytes() > maxSizeBytes {
|
for c.SizeBytes() > maxSizeBytes && len(c.lah) > 0 {
|
||||||
// Entries in the cache occupy too much space. Free up space by deleting some entries.
|
e := c.lah[0]
|
||||||
for _, pes := range c.m {
|
c.updateSizeBytes(-e.b.SizeBytes())
|
||||||
for offset, e := range pes {
|
pes := c.m[e.k.Part]
|
||||||
c.updateSizeBytes(-e.block.SizeBytes())
|
delete(pes, e.k.Offset)
|
||||||
delete(pes, offset)
|
heap.Pop(&c.lah)
|
||||||
// do not delete the entry from c.perKeyMisses, since it is removed by cache.cleaner later.
|
|
||||||
if c.SizeBytes() < maxSizeBytes {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) Len() int {
|
func (c *cache) Len() int {
|
||||||
c.mu.RLock()
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
n := 0
|
n := 0
|
||||||
for _, m := range c.m {
|
for _, m := range c.m {
|
||||||
n += len(m)
|
n += len(m)
|
||||||
}
|
}
|
||||||
c.mu.RUnlock()
|
|
||||||
return n
|
return n
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -362,3 +367,35 @@ func (c *cache) Requests() uint64 {
|
||||||
func (c *cache) Misses() uint64 {
|
func (c *cache) Misses() uint64 {
|
||||||
return atomic.LoadUint64(&c.misses)
|
return atomic.LoadUint64(&c.misses)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// lastAccessHeap implements heap.Interface
|
||||||
|
type lastAccessHeap []*cacheEntry
|
||||||
|
|
||||||
|
func (lah *lastAccessHeap) Len() int {
|
||||||
|
return len(*lah)
|
||||||
|
}
|
||||||
|
func (lah *lastAccessHeap) Swap(i, j int) {
|
||||||
|
h := *lah
|
||||||
|
a := h[i]
|
||||||
|
b := h[j]
|
||||||
|
a.heapIdx = j
|
||||||
|
b.heapIdx = i
|
||||||
|
h[i] = b
|
||||||
|
h[j] = a
|
||||||
|
}
|
||||||
|
func (lah *lastAccessHeap) Less(i, j int) bool {
|
||||||
|
h := *lah
|
||||||
|
return h[i].lastAccessTime < h[j].lastAccessTime
|
||||||
|
}
|
||||||
|
func (lah *lastAccessHeap) Push(x interface{}) {
|
||||||
|
e := x.(*cacheEntry)
|
||||||
|
h := *lah
|
||||||
|
e.heapIdx = len(h)
|
||||||
|
*lah = append(h, e)
|
||||||
|
}
|
||||||
|
func (lah *lastAccessHeap) Pop() interface{} {
|
||||||
|
h := *lah
|
||||||
|
e := h[len(h)-1]
|
||||||
|
*lah = h[:len(h)-1]
|
||||||
|
return e
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue