From 5159a9451f963904b2e3d1891cfb0218ac052141 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin <valyala@victoriametrics.com>
Date: Thu, 20 Jan 2022 18:49:58 +0200
Subject: [PATCH] lib/blockcache: add missing dependency after
 145337792d6c8131cb64b728ab62624e54208290

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2007
---
 lib/blockcache/blockcache.go | 225 +++++++++++++++++++++++++++++++++++
 1 file changed, 225 insertions(+)
 create mode 100644 lib/blockcache/blockcache.go

diff --git a/lib/blockcache/blockcache.go b/lib/blockcache/blockcache.go
new file mode 100644
index 0000000000..92f03f348c
--- /dev/null
+++ b/lib/blockcache/blockcache.go
@@ -0,0 +1,225 @@
+package blockcache
+
+import (
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
+)
+
+// Cache caches Block entries.
+//
+// Call NewCache() for creating new Cache.
+type Cache struct {
+	// Atomically updated fields must go first in the struct, so they are properly
+	// aligned to 8 bytes on 32-bit architectures.
+	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
+	requests uint64
+	misses   uint64
+
+	// sizeBytes contains an approximate size for all the blocks stored in the cache.
+	sizeBytes uint64
+
+	// getMaxSizeBytes() is a callback, which returns the maximum allowed cache size in bytes.
+	getMaxSizeBytes func() int
+
+	// mu protects all the fields below.
+	mu sync.RWMutex
+
+	// m contains cached blocks.
+	m map[Key]*cacheEntry
+
+	// perPartEntries contains all the blocks for the given part.
+	//
+	// It is needed for fast deletion of blocks belonging to the given part.
+	perPartEntries map[interface{}]map[uint64]*cacheEntry
+
+	// perKeyMisses contains per-block cache misses.
+	//
+	// Blocks with less than 2 cache misses aren't stored in the cache in order to prevent from eviction for frequently accessed items.
+	perKeyMisses map[Key]int
+}
+
+// Key represents a key, which uniquely identifies the Block.
+type Key struct {
+	// Part must contain a pointer to part structure where the block belongs to.
+	Part interface{}
+
+	// Offset is the offset of the block in the part.
+	Offset uint64
+}
+
+// Block is an item, which may be cached in the Cache.
+type Block interface {
+	// SizeBytes must return the approximate size of the given block in bytes
+	SizeBytes() int
+}
+
+type cacheEntry struct {
+	// Atomically updated fields must go first in the struct, so they are properly
+	// aligned to 8 bytes on 32-bit architectures.
+	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
+	lastAccessTime uint64
+
+	// key contains the key for the block.
+	key Key
+
+	// block contains the cached block.
+	block Block
+}
+
+// NewCache creates new cache.
+//
+// Cache size in bytes is limited by the value returned by getMaxSizeBytes() callback.
+func NewCache(getMaxSizeBytes func() int) *Cache {
+	var c Cache
+	c.getMaxSizeBytes = getMaxSizeBytes
+	c.m = make(map[Key]*cacheEntry)
+	c.perPartEntries = make(map[interface{}]map[uint64]*cacheEntry)
+	c.perKeyMisses = make(map[Key]int)
+	go c.cleaner()
+	return &c
+}
+
+// RemoveBlocksForPart removes all the blocks for the given part from the cache.
+func (c *Cache) RemoveBlocksForPart(p interface{}) {
+	c.mu.Lock()
+	for _, e := range c.perPartEntries[p] {
+		c.deleteEntryLocked(e, false)
+	}
+	delete(c.perPartEntries, p)
+	c.mu.Unlock()
+}
+
+func (c *Cache) deleteEntryLocked(e *cacheEntry, removePartEntry bool) {
+	n := uint64(e.block.SizeBytes())
+	atomic.AddUint64(&c.sizeBytes, (^n)+1)
+	key := e.key
+	delete(c.m, key)
+	delete(c.perKeyMisses, key)
+	if removePartEntry {
+		delete(c.perPartEntries[key.Part], key.Offset)
+	}
+}
+
+// cleaner periodically cleans least recently used entries in c.
+func (c *Cache) cleaner() {
+	ticker := time.NewTicker(30 * time.Second)
+	defer ticker.Stop()
+	perKeyMissesTicker := time.NewTicker(2 * time.Minute)
+	defer perKeyMissesTicker.Stop()
+	for {
+		select {
+		case <-ticker.C:
+			c.cleanByTimeout()
+		case <-perKeyMissesTicker.C:
+			c.mu.Lock()
+			c.perKeyMisses = make(map[Key]int, len(c.perKeyMisses))
+			c.mu.Unlock()
+		}
+	}
+}
+
+func (c *Cache) cleanByTimeout() {
+	currentTime := fasttime.UnixTimestamp()
+	c.mu.Lock()
+	for _, e := range c.m {
+		// Delete items accessed more than two minutes ago.
+		// This time should be enough for repeated queries.
+		if currentTime-atomic.LoadUint64(&e.lastAccessTime) > 2*60 {
+			c.deleteEntryLocked(e, true)
+		}
+	}
+	c.mu.Unlock()
+}
+
+// GetBlock returns a block for the given key k from c.
+func (c *Cache) GetBlock(k Key) Block {
+	atomic.AddUint64(&c.requests, 1)
+	c.mu.RLock()
+	e := c.m[k]
+	c.mu.RUnlock()
+	if e != nil {
+		// Fast path - the block already exists in the cache, so return it to the caller.
+		currentTime := fasttime.UnixTimestamp()
+		if atomic.LoadUint64(&e.lastAccessTime) != currentTime {
+			atomic.StoreUint64(&e.lastAccessTime, currentTime)
+		}
+		return e.block
+	}
+	// Slow path - the entry is missing in the cache.
+	c.mu.Lock()
+	c.perKeyMisses[k]++
+	c.mu.Unlock()
+	atomic.AddUint64(&c.misses, 1)
+	return nil
+}
+
+// PutBlock puts the given block under the given key into c.
+func (c *Cache) PutBlock(key Key, block Block) {
+	c.mu.RLock()
+	doNotCache := c.perKeyMisses[key] < 2
+	c.mu.RUnlock()
+	if doNotCache {
+		// Do not cache b if it has been requested only once (aka one-time-wonders items).
+		// This should reduce memory usage for the cache.
+		return
+	}
+
+	// Store b in the cache.
+	c.mu.Lock()
+	e := &cacheEntry{
+		lastAccessTime: fasttime.UnixTimestamp(),
+		key:            key,
+		block:          block,
+	}
+	c.m[key] = e
+	pes := c.perPartEntries[key.Part]
+	if pes == nil {
+		pes = make(map[uint64]*cacheEntry)
+		c.perPartEntries[key.Part] = pes
+	}
+	pes[key.Offset] = e
+	n := uint64(e.block.SizeBytes())
+	atomic.AddUint64(&c.sizeBytes, n)
+	maxSizeBytes := c.getMaxSizeBytes()
+	if c.SizeBytes() > maxSizeBytes {
+		// Entries in the cache occupy too much space. Free up space by deleting some entries.
+		for _, e := range c.m {
+			c.deleteEntryLocked(e, true)
+			if c.SizeBytes() < maxSizeBytes {
+				break
+			}
+		}
+	}
+	c.mu.Unlock()
+}
+
+// Len returns the number of blocks in the cache c.
+func (c *Cache) Len() int {
+	c.mu.RLock()
+	n := len(c.m)
+	c.mu.RUnlock()
+	return n
+}
+
+// SizeBytes returns an approximate size in bytes of all the blocks stored in the cache c.
+func (c *Cache) SizeBytes() int {
+	return int(atomic.LoadUint64(&c.sizeBytes))
+}
+
+// SizeMaxBytes returns the max allowed size in bytes for c.
+func (c *Cache) SizeMaxBytes() int {
+	return c.getMaxSizeBytes()
+}
+
+// Requests returns the number of requests served by c.
+func (c *Cache) Requests() uint64 {
+	return atomic.LoadUint64(&c.requests)
+}
+
+// Misses returns the number of cache misses for c.
+func (c *Cache) Misses() uint64 {
+	return atomic.LoadUint64(&c.misses)
+}