diff --git a/lib/blockcache/blockcache.go b/lib/blockcache/blockcache.go index 4e829cdec..8c8cafc56 100644 --- a/lib/blockcache/blockcache.go +++ b/lib/blockcache/blockcache.go @@ -5,6 +5,7 @@ import ( "sync/atomic" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) @@ -12,6 +13,103 @@ import ( // // Call NewCache() for creating new Cache. type Cache struct { + shards []*cache +} + +// NewCache creates new cache. +// +// Cache size in bytes is limited by the value returned by getMaxSizeBytes() callback. +func NewCache(getMaxSizeBytes func() int) *Cache { + shardsCount := cgroup.AvailableCPUs() + shards := make([]*cache, shardsCount) + getMaxShardBytes := func() int { + n := getMaxSizeBytes() + return n / shardsCount + } + for i := range shards { + shards[i] = newCache(getMaxShardBytes) + } + return &Cache{ + shards: shards, + } +} + +// RemoveBlocksForPart removes all the blocks for the given part from the cache. +func (c *Cache) RemoveBlocksForPart(p interface{}) { + for _, shard := range c.shards { + shard.RemoveBlocksForPart(p) + } +} + +// GetBlock returns a block for the given key k from c. +func (c *Cache) GetBlock(k Key) Block { + h := fastHashUint64(k.Offset) + idx := h % uint64(len(c.shards)) + shard := c.shards[idx] + return shard.GetBlock(k) +} + +// PutBlock puts the given block b under the given key k into c. +func (c *Cache) PutBlock(k Key, b Block) { + h := fastHashUint64(k.Offset) + idx := h % uint64(len(c.shards)) + shard := c.shards[idx] + shard.PutBlock(k, b) +} + +// Len returns the number of blocks in the cache c. +func (c *Cache) Len() int { + n := 0 + for _, shard := range c.shards { + n += shard.Len() + } + return n +} + +// SizeBytes returns an approximate size in bytes of all the blocks stored in the cache c. +func (c *Cache) SizeBytes() int { + n := 0 + for _, shard := range c.shards { + n += shard.SizeBytes() + } + return n +} + +// SizeMaxBytes returns the max allowed size in bytes for c. +func (c *Cache) SizeMaxBytes() int { + n := 0 + for _, shard := range c.shards { + n += shard.SizeMaxBytes() + } + return n +} + +// Requests returns the number of requests served by c. +func (c *Cache) Requests() uint64 { + n := uint64(0) + for _, shard := range c.shards { + n += shard.Requests() + } + return n +} + +// Misses returns the number of cache misses for c. +func (c *Cache) Misses() uint64 { + n := uint64(0) + for _, shard := range c.shards { + n += shard.Misses() + } + return n +} + +func fastHashUint64(x uint64) uint64 { + x ^= x >> 12 // a + x ^= x << 25 // b + x ^= x >> 27 // c + return x * 2685821657736338717 +} + +type cache struct { // Atomically updated fields must go first in the struct, so they are properly // aligned to 8 bytes on 32-bit architectures. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 @@ -61,11 +159,8 @@ type cacheEntry struct { block Block } -// NewCache creates new cache. -// -// Cache size in bytes is limited by the value returned by getMaxSizeBytes() callback. -func NewCache(getMaxSizeBytes func() int) *Cache { - var c Cache +func newCache(getMaxSizeBytes func() int) *cache { + var c cache c.getMaxSizeBytes = getMaxSizeBytes c.m = make(map[interface{}]map[uint64]*cacheEntry) c.perKeyMisses = make(map[Key]int) @@ -73,25 +168,23 @@ func NewCache(getMaxSizeBytes func() int) *Cache { return &c } -// RemoveBlocksForPart removes all the blocks for the given part from the cache. -func (c *Cache) RemoveBlocksForPart(p interface{}) { +func (c *cache) RemoveBlocksForPart(p interface{}) { c.mu.Lock() sizeBytes := 0 for _, e := range c.m[p] { sizeBytes += e.block.SizeBytes() - // do not delete the entry from c.perKeyMisses, since it is removed by Cache.cleaner later. + // do not delete the entry from c.perKeyMisses, since it is removed by cache.cleaner later. } c.updateSizeBytes(-sizeBytes) delete(c.m, p) c.mu.Unlock() } -func (c *Cache) updateSizeBytes(n int) { +func (c *cache) updateSizeBytes(n int) { atomic.AddInt64(&c.sizeBytes, int64(n)) } -// cleaner periodically cleans least recently used entries in c. -func (c *Cache) cleaner() { +func (c *cache) cleaner() { ticker := time.NewTicker(57 * time.Second) defer ticker.Stop() perKeyMissesTicker := time.NewTicker(2 * time.Minute) @@ -108,7 +201,7 @@ func (c *Cache) cleaner() { } } -func (c *Cache) cleanByTimeout() { +func (c *cache) cleanByTimeout() { // Delete items accessed more than five minutes ago. // This time should be enough for repeated queries. lastAccessTime := fasttime.UnixTimestamp() - 5*60 @@ -120,14 +213,13 @@ func (c *Cache) cleanByTimeout() { if lastAccessTime > atomic.LoadUint64(&e.lastAccessTime) { c.updateSizeBytes(-e.block.SizeBytes()) delete(pes, offset) - // do not delete the entry from c.perKeyMisses, since it is removed by Cache.cleaner later. + // do not delete the entry from c.perKeyMisses, since it is removed by cache.cleaner later. } } } } -// GetBlock returns a block for the given key k from c. -func (c *Cache) GetBlock(k Key) Block { +func (c *cache) GetBlock(k Key) Block { atomic.AddUint64(&c.requests, 1) var e *cacheEntry c.mu.RLock() @@ -152,8 +244,7 @@ func (c *Cache) GetBlock(k Key) Block { return nil } -// PutBlock puts the given block b under the given key k into c. -func (c *Cache) PutBlock(k Key, b Block) { +func (c *cache) PutBlock(k Key, b Block) { c.mu.RLock() // If the entry wasn't accessed yet (e.g. c.perKeyMisses[k] == 0), then cache it, since it is likely it will be accessed soon. // Do not cache the entry only if there was only a single unsuccessful attempt to access it. @@ -191,7 +282,7 @@ func (c *Cache) PutBlock(k Key, b Block) { for offset, e := range pes { c.updateSizeBytes(-e.block.SizeBytes()) delete(pes, offset) - // do not delete the entry from c.perKeyMisses, since it is removed by Cache.cleaner later. + // do not delete the entry from c.perKeyMisses, since it is removed by cache.cleaner later. if c.SizeBytes() < maxSizeBytes { return } @@ -200,8 +291,7 @@ func (c *Cache) PutBlock(k Key, b Block) { } } -// Len returns the number of blocks in the cache c. -func (c *Cache) Len() int { +func (c *cache) Len() int { c.mu.RLock() n := 0 for _, m := range c.m { @@ -211,22 +301,18 @@ func (c *Cache) Len() int { return n } -// SizeBytes returns an approximate size in bytes of all the blocks stored in the cache c. -func (c *Cache) SizeBytes() int { +func (c *cache) SizeBytes() int { return int(atomic.LoadInt64(&c.sizeBytes)) } -// SizeMaxBytes returns the max allowed size in bytes for c. -func (c *Cache) SizeMaxBytes() int { +func (c *cache) SizeMaxBytes() int { return c.getMaxSizeBytes() } -// Requests returns the number of requests served by c. -func (c *Cache) Requests() uint64 { +func (c *cache) Requests() uint64 { return atomic.LoadUint64(&c.requests) } -// Misses returns the number of cache misses for c. -func (c *Cache) Misses() uint64 { +func (c *cache) Misses() uint64 { return atomic.LoadUint64(&c.misses) }