From fae304086881d56a43d5b373dd5116c7eff28c3e Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 8 Feb 2022 19:44:26 +0200 Subject: [PATCH] lib/blockcache: split the cache into multiple shards This should reduce contention on cache mutex on hosts with many CPU cores, which, in turn, should increase overall throughput for the cache. This should help https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2007 --- lib/blockcache/blockcache.go | 142 ++++++++++++++++++++++++++++------- 1 file changed, 114 insertions(+), 28 deletions(-) diff --git a/lib/blockcache/blockcache.go b/lib/blockcache/blockcache.go index 4e829cdec..8c8cafc56 100644 --- a/lib/blockcache/blockcache.go +++ b/lib/blockcache/blockcache.go @@ -5,6 +5,7 @@ import ( "sync/atomic" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) @@ -12,6 +13,103 @@ import ( // // Call NewCache() for creating new Cache. type Cache struct { + shards []*cache +} + +// NewCache creates new cache. +// +// Cache size in bytes is limited by the value returned by getMaxSizeBytes() callback. +func NewCache(getMaxSizeBytes func() int) *Cache { + shardsCount := cgroup.AvailableCPUs() + shards := make([]*cache, shardsCount) + getMaxShardBytes := func() int { + n := getMaxSizeBytes() + return n / shardsCount + } + for i := range shards { + shards[i] = newCache(getMaxShardBytes) + } + return &Cache{ + shards: shards, + } +} + +// RemoveBlocksForPart removes all the blocks for the given part from the cache. +func (c *Cache) RemoveBlocksForPart(p interface{}) { + for _, shard := range c.shards { + shard.RemoveBlocksForPart(p) + } +} + +// GetBlock returns a block for the given key k from c. +func (c *Cache) GetBlock(k Key) Block { + h := fastHashUint64(k.Offset) + idx := h % uint64(len(c.shards)) + shard := c.shards[idx] + return shard.GetBlock(k) +} + +// PutBlock puts the given block b under the given key k into c. +func (c *Cache) PutBlock(k Key, b Block) { + h := fastHashUint64(k.Offset) + idx := h % uint64(len(c.shards)) + shard := c.shards[idx] + shard.PutBlock(k, b) +} + +// Len returns the number of blocks in the cache c. +func (c *Cache) Len() int { + n := 0 + for _, shard := range c.shards { + n += shard.Len() + } + return n +} + +// SizeBytes returns an approximate size in bytes of all the blocks stored in the cache c. +func (c *Cache) SizeBytes() int { + n := 0 + for _, shard := range c.shards { + n += shard.SizeBytes() + } + return n +} + +// SizeMaxBytes returns the max allowed size in bytes for c. +func (c *Cache) SizeMaxBytes() int { + n := 0 + for _, shard := range c.shards { + n += shard.SizeMaxBytes() + } + return n +} + +// Requests returns the number of requests served by c. +func (c *Cache) Requests() uint64 { + n := uint64(0) + for _, shard := range c.shards { + n += shard.Requests() + } + return n +} + +// Misses returns the number of cache misses for c. +func (c *Cache) Misses() uint64 { + n := uint64(0) + for _, shard := range c.shards { + n += shard.Misses() + } + return n +} + +func fastHashUint64(x uint64) uint64 { + x ^= x >> 12 // a + x ^= x << 25 // b + x ^= x >> 27 // c + return x * 2685821657736338717 +} + +type cache struct { // Atomically updated fields must go first in the struct, so they are properly // aligned to 8 bytes on 32-bit architectures. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 @@ -61,11 +159,8 @@ type cacheEntry struct { block Block } -// NewCache creates new cache. -// -// Cache size in bytes is limited by the value returned by getMaxSizeBytes() callback. -func NewCache(getMaxSizeBytes func() int) *Cache { - var c Cache +func newCache(getMaxSizeBytes func() int) *cache { + var c cache c.getMaxSizeBytes = getMaxSizeBytes c.m = make(map[interface{}]map[uint64]*cacheEntry) c.perKeyMisses = make(map[Key]int) @@ -73,25 +168,23 @@ func NewCache(getMaxSizeBytes func() int) *Cache { return &c } -// RemoveBlocksForPart removes all the blocks for the given part from the cache. -func (c *Cache) RemoveBlocksForPart(p interface{}) { +func (c *cache) RemoveBlocksForPart(p interface{}) { c.mu.Lock() sizeBytes := 0 for _, e := range c.m[p] { sizeBytes += e.block.SizeBytes() - // do not delete the entry from c.perKeyMisses, since it is removed by Cache.cleaner later. + // do not delete the entry from c.perKeyMisses, since it is removed by cache.cleaner later. } c.updateSizeBytes(-sizeBytes) delete(c.m, p) c.mu.Unlock() } -func (c *Cache) updateSizeBytes(n int) { +func (c *cache) updateSizeBytes(n int) { atomic.AddInt64(&c.sizeBytes, int64(n)) } -// cleaner periodically cleans least recently used entries in c. -func (c *Cache) cleaner() { +func (c *cache) cleaner() { ticker := time.NewTicker(57 * time.Second) defer ticker.Stop() perKeyMissesTicker := time.NewTicker(2 * time.Minute) @@ -108,7 +201,7 @@ func (c *Cache) cleaner() { } } -func (c *Cache) cleanByTimeout() { +func (c *cache) cleanByTimeout() { // Delete items accessed more than five minutes ago. // This time should be enough for repeated queries. lastAccessTime := fasttime.UnixTimestamp() - 5*60 @@ -120,14 +213,13 @@ func (c *Cache) cleanByTimeout() { if lastAccessTime > atomic.LoadUint64(&e.lastAccessTime) { c.updateSizeBytes(-e.block.SizeBytes()) delete(pes, offset) - // do not delete the entry from c.perKeyMisses, since it is removed by Cache.cleaner later. + // do not delete the entry from c.perKeyMisses, since it is removed by cache.cleaner later. } } } } -// GetBlock returns a block for the given key k from c. -func (c *Cache) GetBlock(k Key) Block { +func (c *cache) GetBlock(k Key) Block { atomic.AddUint64(&c.requests, 1) var e *cacheEntry c.mu.RLock() @@ -152,8 +244,7 @@ func (c *Cache) GetBlock(k Key) Block { return nil } -// PutBlock puts the given block b under the given key k into c. -func (c *Cache) PutBlock(k Key, b Block) { +func (c *cache) PutBlock(k Key, b Block) { c.mu.RLock() // If the entry wasn't accessed yet (e.g. c.perKeyMisses[k] == 0), then cache it, since it is likely it will be accessed soon. // Do not cache the entry only if there was only a single unsuccessful attempt to access it. @@ -191,7 +282,7 @@ func (c *Cache) PutBlock(k Key, b Block) { for offset, e := range pes { c.updateSizeBytes(-e.block.SizeBytes()) delete(pes, offset) - // do not delete the entry from c.perKeyMisses, since it is removed by Cache.cleaner later. + // do not delete the entry from c.perKeyMisses, since it is removed by cache.cleaner later. if c.SizeBytes() < maxSizeBytes { return } @@ -200,8 +291,7 @@ func (c *Cache) PutBlock(k Key, b Block) { } } -// Len returns the number of blocks in the cache c. -func (c *Cache) Len() int { +func (c *cache) Len() int { c.mu.RLock() n := 0 for _, m := range c.m { @@ -211,22 +301,18 @@ func (c *Cache) Len() int { return n } -// SizeBytes returns an approximate size in bytes of all the blocks stored in the cache c. -func (c *Cache) SizeBytes() int { +func (c *cache) SizeBytes() int { return int(atomic.LoadInt64(&c.sizeBytes)) } -// SizeMaxBytes returns the max allowed size in bytes for c. -func (c *Cache) SizeMaxBytes() int { +func (c *cache) SizeMaxBytes() int { return c.getMaxSizeBytes() } -// Requests returns the number of requests served by c. -func (c *Cache) Requests() uint64 { +func (c *cache) Requests() uint64 { return atomic.LoadUint64(&c.requests) } -// Misses returns the number of cache misses for c. -func (c *Cache) Misses() uint64 { +func (c *cache) Misses() uint64 { return atomic.LoadUint64(&c.misses) }