mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/{storage,regexpcache}: replaces regexpCacheMap with LRU cache (#2293)
* lib/{storage,regexpcache}: replaces regexpCacheMap with LRU cache It should decrease memory usage for regexp caching with storing cacheEntry by pointer - golang map should be able to effectivly shrink it's size original issue with this case - unexpected map grows and storage OOM Apply suggestions from code review Co-authored-by: Roman Khavronenko <roman@victoriametrics.com> Adds missing metrics for regexp cache and regexpPrefixes cache * wip * wip Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
parent
6e364e19ef
commit
9a88c1a91e
5 changed files with 551 additions and 74 deletions
|
@ -680,6 +680,10 @@ func registerStorageMetrics() {
|
|||
metrics.NewGauge(`vm_cache_entries{type="storage/regexps"}`, func() float64 {
|
||||
return float64(storage.RegexpCacheSize())
|
||||
})
|
||||
metrics.NewGauge(`vm_cache_entries{type="storage/regexpPrefixes"}`, func() float64 {
|
||||
return float64(storage.RegexpPrefixesCacheSize())
|
||||
})
|
||||
|
||||
metrics.NewGauge(`vm_cache_entries{type="storage/prefetchedMetricIDs"}`, func() float64 {
|
||||
return float64(m().PrefetchedMetricIDsSize)
|
||||
})
|
||||
|
@ -714,6 +718,12 @@ func registerStorageMetrics() {
|
|||
metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/tagFilters"}`, func() float64 {
|
||||
return float64(idbm().TagFiltersCacheSizeBytes)
|
||||
})
|
||||
metrics.NewGauge(`vm_cache_size_bytes{type="storage/regexps"}`, func() float64 {
|
||||
return float64(storage.RegexpCacheSizeBytes())
|
||||
})
|
||||
metrics.NewGauge(`vm_cache_size_bytes{type="storage/regexpPrefixes"}`, func() float64 {
|
||||
return float64(storage.RegexpPrefixesCacheSizeBytes())
|
||||
})
|
||||
metrics.NewGauge(`vm_cache_size_bytes{type="storage/prefetchedMetricIDs"}`, func() float64 {
|
||||
return float64(m().PrefetchedMetricIDsSizeBytes)
|
||||
})
|
||||
|
@ -739,6 +749,12 @@ func registerStorageMetrics() {
|
|||
metrics.NewGauge(`vm_cache_size_max_bytes{type="indexdb/tagFilters"}`, func() float64 {
|
||||
return float64(idbm().TagFiltersCacheSizeMaxBytes)
|
||||
})
|
||||
metrics.NewGauge(`vm_cache_size_max_bytes{type="storage/regexps"}`, func() float64 {
|
||||
return float64(storage.RegexpCacheMaxSizeBytes())
|
||||
})
|
||||
metrics.NewGauge(`vm_cache_size_max_bytes{type="storage/regexpPrefixes"}`, func() float64 {
|
||||
return float64(storage.RegexpPrefixesCacheMaxSizeBytes())
|
||||
})
|
||||
|
||||
metrics.NewGauge(`vm_cache_requests_total{type="storage/tsid"}`, func() float64 {
|
||||
return float64(m().TSIDCacheRequests)
|
||||
|
@ -764,6 +780,9 @@ func registerStorageMetrics() {
|
|||
metrics.NewGauge(`vm_cache_requests_total{type="storage/regexps"}`, func() float64 {
|
||||
return float64(storage.RegexpCacheRequests())
|
||||
})
|
||||
metrics.NewGauge(`vm_cache_requests_total{type="storage/regexpPrefixes"}`, func() float64 {
|
||||
return float64(storage.RegexpPrefixesCacheRequests())
|
||||
})
|
||||
|
||||
metrics.NewGauge(`vm_cache_misses_total{type="storage/tsid"}`, func() float64 {
|
||||
return float64(m().TSIDCacheMisses)
|
||||
|
@ -789,6 +808,9 @@ func registerStorageMetrics() {
|
|||
metrics.NewGauge(`vm_cache_misses_total{type="storage/regexps"}`, func() float64 {
|
||||
return float64(storage.RegexpCacheMisses())
|
||||
})
|
||||
metrics.NewGauge(`vm_cache_misses_total{type="storage/regexpPrefixes"}`, func() float64 {
|
||||
return float64(storage.RegexpPrefixesCacheMisses())
|
||||
})
|
||||
|
||||
metrics.NewGauge(`vm_deleted_metrics_total{type="indexdb"}`, func() float64 {
|
||||
return float64(idbm().DeletedMetricsCount)
|
||||
|
|
|
@ -133,20 +133,20 @@ func TestCacheConcurrentAccess(t *testing.T) {
|
|||
var wg sync.WaitGroup
|
||||
wg.Add(workers)
|
||||
for i := 0; i < workers; i++ {
|
||||
go func() {
|
||||
go func(worker int) {
|
||||
defer wg.Done()
|
||||
testCacheSetGet(c)
|
||||
}()
|
||||
testCacheSetGet(c, worker)
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func testCacheSetGet(c *Cache) {
|
||||
func testCacheSetGet(c *Cache, worker int) {
|
||||
for i := 0; i < 1000; i++ {
|
||||
part := (interface{})(i)
|
||||
b := testBlock{}
|
||||
k := Key{
|
||||
Offset: uint64(i),
|
||||
Offset: uint64(worker*1000 + i),
|
||||
Part: part,
|
||||
}
|
||||
c.PutBlock(k, &b)
|
||||
|
|
327
lib/lrucache/lrucache.go
Normal file
327
lib/lrucache/lrucache.go
Normal file
|
@ -0,0 +1,327 @@
|
|||
package lrucache
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
xxhash "github.com/cespare/xxhash/v2"
|
||||
)
|
||||
|
||||
// Cache caches Entry entries.
|
||||
//
|
||||
// Call NewCache() for creating new Cache.
|
||||
type Cache struct {
|
||||
shards []*cache
|
||||
|
||||
cleanerMustStopCh chan struct{}
|
||||
cleanerStoppedCh chan struct{}
|
||||
}
|
||||
|
||||
// NewCache creates new cache.
|
||||
//
|
||||
// Cache size in bytes is limited by the value returned by getMaxSizeBytes() callback.
|
||||
// Call MustStop() in order to free up resources occupied by Cache.
|
||||
func NewCache(getMaxSizeBytes func() int) *Cache {
|
||||
cpusCount := cgroup.AvailableCPUs()
|
||||
shardsCount := cgroup.AvailableCPUs()
|
||||
// Increase the number of shards with the increased number of available CPU cores.
|
||||
// This should reduce contention on per-shard mutexes.
|
||||
multiplier := cpusCount
|
||||
if multiplier > 16 {
|
||||
multiplier = 16
|
||||
}
|
||||
shardsCount *= multiplier
|
||||
shards := make([]*cache, shardsCount)
|
||||
getMaxShardBytes := func() int {
|
||||
n := getMaxSizeBytes()
|
||||
return n / shardsCount
|
||||
}
|
||||
for i := range shards {
|
||||
shards[i] = newCache(getMaxShardBytes)
|
||||
}
|
||||
c := &Cache{
|
||||
shards: shards,
|
||||
cleanerMustStopCh: make(chan struct{}),
|
||||
cleanerStoppedCh: make(chan struct{}),
|
||||
}
|
||||
go c.cleaner()
|
||||
return c
|
||||
}
|
||||
|
||||
// MustStop frees up resources occupied by c.
|
||||
func (c *Cache) MustStop() {
|
||||
close(c.cleanerMustStopCh)
|
||||
<-c.cleanerStoppedCh
|
||||
}
|
||||
|
||||
// GetEntry returns an Entry for the given key k from c.
|
||||
func (c *Cache) GetEntry(k string) Entry {
|
||||
idx := uint64(0)
|
||||
if len(c.shards) > 1 {
|
||||
h := hashUint64(k)
|
||||
idx = h % uint64(len(c.shards))
|
||||
}
|
||||
shard := c.shards[idx]
|
||||
return shard.GetEntry(k)
|
||||
}
|
||||
|
||||
// PutEntry puts the given Entry e under the given key k into c.
|
||||
func (c *Cache) PutEntry(k string, e Entry) {
|
||||
idx := uint64(0)
|
||||
if len(c.shards) > 1 {
|
||||
h := hashUint64(k)
|
||||
idx = h % uint64(len(c.shards))
|
||||
}
|
||||
shard := c.shards[idx]
|
||||
shard.PutEntry(k, e)
|
||||
}
|
||||
|
||||
// Len returns the number of blocks in the cache c.
|
||||
func (c *Cache) Len() int {
|
||||
n := 0
|
||||
for _, shard := range c.shards {
|
||||
n += shard.Len()
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
// SizeBytes returns an approximate size in bytes of all the blocks stored in the cache c.
|
||||
func (c *Cache) SizeBytes() int {
|
||||
n := 0
|
||||
for _, shard := range c.shards {
|
||||
n += shard.SizeBytes()
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
// SizeMaxBytes returns the max allowed size in bytes for c.
|
||||
func (c *Cache) SizeMaxBytes() int {
|
||||
n := 0
|
||||
for _, shard := range c.shards {
|
||||
n += shard.SizeMaxBytes()
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
// Requests returns the number of requests served by c.
|
||||
func (c *Cache) Requests() uint64 {
|
||||
n := uint64(0)
|
||||
for _, shard := range c.shards {
|
||||
n += shard.Requests()
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
// Misses returns the number of cache misses for c.
|
||||
func (c *Cache) Misses() uint64 {
|
||||
n := uint64(0)
|
||||
for _, shard := range c.shards {
|
||||
n += shard.Misses()
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
func (c *Cache) cleaner() {
|
||||
ticker := time.NewTicker(53 * time.Second)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-c.cleanerMustStopCh:
|
||||
close(c.cleanerStoppedCh)
|
||||
return
|
||||
case <-ticker.C:
|
||||
c.cleanByTimeout()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Cache) cleanByTimeout() {
|
||||
for _, shard := range c.shards {
|
||||
shard.cleanByTimeout()
|
||||
}
|
||||
}
|
||||
|
||||
type cache struct {
|
||||
// Atomically updated fields must go first in the struct, so they are properly
|
||||
// aligned to 8 bytes on 32-bit architectures.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
|
||||
requests uint64
|
||||
misses uint64
|
||||
|
||||
// sizeBytes contains an approximate size for all the blocks stored in the cache.
|
||||
sizeBytes int64
|
||||
|
||||
// getMaxSizeBytes() is a callback, which returns the maximum allowed cache size in bytes.
|
||||
getMaxSizeBytes func() int
|
||||
|
||||
// mu protects all the fields below.
|
||||
mu sync.Mutex
|
||||
|
||||
// m contains cached entries
|
||||
m map[string]*cacheEntry
|
||||
|
||||
// The heap for removing the least recently used entries from m.
|
||||
lah lastAccessHeap
|
||||
}
|
||||
|
||||
func hashUint64(s string) uint64 {
|
||||
b := bytesutil.ToUnsafeBytes(s)
|
||||
return xxhash.Sum64(b)
|
||||
}
|
||||
|
||||
// Entry is an item, which may be cached in the Cache.
|
||||
type Entry interface {
|
||||
// SizeBytes must return the approximate size of the given entry in bytes
|
||||
SizeBytes() int
|
||||
}
|
||||
|
||||
type cacheEntry struct {
|
||||
// The timestamp in seconds for the last access to the given entry.
|
||||
lastAccessTime uint64
|
||||
|
||||
// heapIdx is the index for the entry in lastAccessHeap.
|
||||
heapIdx int
|
||||
|
||||
// k contains the associated key for the given entry.
|
||||
k string
|
||||
|
||||
// e contains the cached entry.
|
||||
e Entry
|
||||
}
|
||||
|
||||
func newCache(getMaxSizeBytes func() int) *cache {
|
||||
var c cache
|
||||
c.getMaxSizeBytes = getMaxSizeBytes
|
||||
c.m = make(map[string]*cacheEntry)
|
||||
return &c
|
||||
}
|
||||
|
||||
func (c *cache) updateSizeBytes(n int) {
|
||||
atomic.AddInt64(&c.sizeBytes, int64(n))
|
||||
}
|
||||
|
||||
func (c *cache) cleanByTimeout() {
|
||||
// Delete items accessed more than three minutes ago.
|
||||
// This time should be enough for repeated queries.
|
||||
lastAccessTime := fasttime.UnixTimestamp() - 3*60
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
for len(c.lah) > 0 {
|
||||
if lastAccessTime < c.lah[0].lastAccessTime {
|
||||
break
|
||||
}
|
||||
c.removeLeastRecentlyAccessedItem()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cache) GetEntry(k string) Entry {
|
||||
atomic.AddUint64(&c.requests, 1)
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
ce := c.m[k]
|
||||
if ce == nil {
|
||||
atomic.AddUint64(&c.misses, 1)
|
||||
return nil
|
||||
}
|
||||
currentTime := fasttime.UnixTimestamp()
|
||||
if ce.lastAccessTime != currentTime {
|
||||
ce.lastAccessTime = currentTime
|
||||
heap.Fix(&c.lah, ce.heapIdx)
|
||||
}
|
||||
return ce.e
|
||||
}
|
||||
|
||||
func (c *cache) PutEntry(k string, e Entry) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
ce := c.m[k]
|
||||
if ce != nil {
|
||||
// The entry has been already registered by concurrent goroutine.
|
||||
return
|
||||
}
|
||||
ce = &cacheEntry{
|
||||
lastAccessTime: fasttime.UnixTimestamp(),
|
||||
k: k,
|
||||
e: e,
|
||||
}
|
||||
heap.Push(&c.lah, ce)
|
||||
c.m[k] = ce
|
||||
c.updateSizeBytes(e.SizeBytes())
|
||||
maxSizeBytes := c.getMaxSizeBytes()
|
||||
for c.SizeBytes() > maxSizeBytes && len(c.lah) > 0 {
|
||||
c.removeLeastRecentlyAccessedItem()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cache) removeLeastRecentlyAccessedItem() {
|
||||
ce := c.lah[0]
|
||||
c.updateSizeBytes(-ce.e.SizeBytes())
|
||||
delete(c.m, ce.k)
|
||||
heap.Pop(&c.lah)
|
||||
}
|
||||
|
||||
func (c *cache) Len() int {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
return len(c.m)
|
||||
}
|
||||
|
||||
func (c *cache) SizeBytes() int {
|
||||
return int(atomic.LoadInt64(&c.sizeBytes))
|
||||
}
|
||||
|
||||
func (c *cache) SizeMaxBytes() int {
|
||||
return c.getMaxSizeBytes()
|
||||
}
|
||||
|
||||
func (c *cache) Requests() uint64 {
|
||||
return atomic.LoadUint64(&c.requests)
|
||||
}
|
||||
|
||||
func (c *cache) Misses() uint64 {
|
||||
return atomic.LoadUint64(&c.misses)
|
||||
}
|
||||
|
||||
// lastAccessHeap implements heap.Interface
|
||||
type lastAccessHeap []*cacheEntry
|
||||
|
||||
func (lah *lastAccessHeap) Len() int {
|
||||
return len(*lah)
|
||||
}
|
||||
func (lah *lastAccessHeap) Swap(i, j int) {
|
||||
h := *lah
|
||||
a := h[i]
|
||||
b := h[j]
|
||||
a.heapIdx = j
|
||||
b.heapIdx = i
|
||||
h[i] = b
|
||||
h[j] = a
|
||||
}
|
||||
func (lah *lastAccessHeap) Less(i, j int) bool {
|
||||
h := *lah
|
||||
return h[i].lastAccessTime < h[j].lastAccessTime
|
||||
}
|
||||
func (lah *lastAccessHeap) Push(x interface{}) {
|
||||
e := x.(*cacheEntry)
|
||||
h := *lah
|
||||
e.heapIdx = len(h)
|
||||
*lah = append(h, e)
|
||||
}
|
||||
func (lah *lastAccessHeap) Pop() interface{} {
|
||||
h := *lah
|
||||
e := h[len(h)-1]
|
||||
|
||||
// Remove the reference to deleted entry, so Go GC could free up memory occupied by the deleted entry.
|
||||
h[len(h)-1] = nil
|
||||
|
||||
*lah = h[:len(h)-1]
|
||||
return e
|
||||
}
|
126
lib/lrucache/lrucache_test.go
Normal file
126
lib/lrucache/lrucache_test.go
Normal file
|
@ -0,0 +1,126 @@
|
|||
package lrucache
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||
)
|
||||
|
||||
func TestCache(t *testing.T) {
|
||||
sizeMaxBytes := 64 * 1024
|
||||
// Multiply sizeMaxBytes by the square of available CPU cores
|
||||
// in order to get proper distribution of sizes between cache shards.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2204
|
||||
cpus := cgroup.AvailableCPUs()
|
||||
sizeMaxBytes *= cpus * cpus
|
||||
getMaxSize := func() int {
|
||||
return sizeMaxBytes
|
||||
}
|
||||
c := NewCache(getMaxSize)
|
||||
defer c.MustStop()
|
||||
if n := c.SizeBytes(); n != 0 {
|
||||
t.Fatalf("unexpected SizeBytes(); got %d; want %d", n, 0)
|
||||
}
|
||||
if n := c.SizeMaxBytes(); n != sizeMaxBytes {
|
||||
t.Fatalf("unexpected SizeMaxBytes(); got %d; want %d", n, sizeMaxBytes)
|
||||
}
|
||||
k := "foobar"
|
||||
var e testEntry
|
||||
entrySize := e.SizeBytes()
|
||||
// Put a single entry into cache
|
||||
c.PutEntry(k, &e)
|
||||
if n := c.Len(); n != 1 {
|
||||
t.Fatalf("unexpected number of items in the cache; got %d; want %d", n, 1)
|
||||
}
|
||||
if n := c.SizeBytes(); n != entrySize {
|
||||
t.Fatalf("unexpected SizeBytes(); got %d; want %d", n, entrySize)
|
||||
}
|
||||
if n := c.Requests(); n != 0 {
|
||||
t.Fatalf("unexpected number of requests; got %d; want %d", n, 0)
|
||||
}
|
||||
if n := c.Misses(); n != 0 {
|
||||
t.Fatalf("unexpected number of misses; got %d; want %d", n, 0)
|
||||
}
|
||||
// Obtain this entry from the cache
|
||||
if e1 := c.GetEntry(k); e1 != &e {
|
||||
t.Fatalf("unexpected entry obtained; got %v; want %v", e1, &e)
|
||||
}
|
||||
if n := c.Requests(); n != 1 {
|
||||
t.Fatalf("unexpected number of requests; got %d; want %d", n, 1)
|
||||
}
|
||||
if n := c.Misses(); n != 0 {
|
||||
t.Fatalf("unexpected number of misses; got %d; want %d", n, 0)
|
||||
}
|
||||
// Obtain non-existing entry from the cache
|
||||
if e1 := c.GetEntry("non-existing-key"); e1 != nil {
|
||||
t.Fatalf("unexpected non-nil block obtained for non-existing key: %v", e1)
|
||||
}
|
||||
if n := c.Requests(); n != 2 {
|
||||
t.Fatalf("unexpected number of requests; got %d; want %d", n, 2)
|
||||
}
|
||||
if n := c.Misses(); n != 1 {
|
||||
t.Fatalf("unexpected number of misses; got %d; want %d", n, 1)
|
||||
}
|
||||
// Store the entry again.
|
||||
c.PutEntry(k, &e)
|
||||
if n := c.SizeBytes(); n != entrySize {
|
||||
t.Fatalf("unexpected SizeBytes(); got %d; want %d", n, entrySize)
|
||||
}
|
||||
if e1 := c.GetEntry(k); e1 != &e {
|
||||
t.Fatalf("unexpected entry obtained; got %v; want %v", e1, &e)
|
||||
}
|
||||
if n := c.Requests(); n != 3 {
|
||||
t.Fatalf("unexpected number of requests; got %d; want %d", n, 3)
|
||||
}
|
||||
if n := c.Misses(); n != 1 {
|
||||
t.Fatalf("unexpected number of misses; got %d; want %d", n, 1)
|
||||
}
|
||||
|
||||
// Manually clean the cache. The entry shouldn't be deleted because it was recently accessed.
|
||||
c.cleanByTimeout()
|
||||
if n := c.SizeBytes(); n != entrySize {
|
||||
t.Fatalf("unexpected SizeBytes(); got %d; want %d", n, entrySize)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCacheConcurrentAccess(t *testing.T) {
|
||||
const sizeMaxBytes = 16 * 1024 * 1024
|
||||
getMaxSize := func() int {
|
||||
return sizeMaxBytes
|
||||
}
|
||||
c := NewCache(getMaxSize)
|
||||
defer c.MustStop()
|
||||
|
||||
workers := 5
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(workers)
|
||||
for i := 0; i < workers; i++ {
|
||||
go func(worker int) {
|
||||
defer wg.Done()
|
||||
testCacheSetGet(c, worker)
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func testCacheSetGet(c *Cache, worker int) {
|
||||
for i := 0; i < 1000; i++ {
|
||||
e := testEntry{}
|
||||
k := fmt.Sprintf("key_%d_%d", worker, i)
|
||||
c.PutEntry(k, &e)
|
||||
if e1 := c.GetEntry(k); e1 != &e {
|
||||
panic(fmt.Errorf("unexpected entry obtained; got %v; want %v", e1, &e))
|
||||
}
|
||||
if e1 := c.GetEntry("non-existing-key"); e1 != nil {
|
||||
panic(fmt.Errorf("unexpected non-nil entry obtained: %v", e1))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type testEntry struct{}
|
||||
|
||||
func (tb *testEntry) SizeBytes() int {
|
||||
return 42
|
||||
}
|
|
@ -9,8 +9,11 @@ import (
|
|||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"unsafe"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/lrucache"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
||||
)
|
||||
|
||||
|
@ -471,42 +474,42 @@ func (tf *tagFilter) matchSuffix(b []byte) (bool, error) {
|
|||
|
||||
// RegexpCacheSize returns the number of cached regexps for tag filters.
|
||||
func RegexpCacheSize() int {
|
||||
regexpCacheLock.RLock()
|
||||
n := len(regexpCacheMap)
|
||||
regexpCacheLock.RUnlock()
|
||||
return n
|
||||
return regexpCache.Len()
|
||||
}
|
||||
|
||||
// RegexpCacheRequests returns the number of requests to regexp cache.
|
||||
// RegexpCacheSizeBytes returns an approximate size in bytes for the cached regexps for tag filters.
|
||||
func RegexpCacheSizeBytes() int {
|
||||
return regexpCache.SizeBytes()
|
||||
}
|
||||
|
||||
// RegexpCacheMaxSizeBytes returns the maximum size in bytes for the cached regexps for tag filters.
|
||||
func RegexpCacheMaxSizeBytes() int {
|
||||
return regexpCache.SizeMaxBytes()
|
||||
}
|
||||
|
||||
// RegexpCacheRequests returns the number of requests to regexp cache for tag filters.
|
||||
func RegexpCacheRequests() uint64 {
|
||||
return atomic.LoadUint64(®expCacheRequests)
|
||||
return regexpCache.Requests()
|
||||
}
|
||||
|
||||
// RegexpCacheMisses returns the number of cache misses for regexp cache.
|
||||
// RegexpCacheMisses returns the number of cache misses for regexp cache for tag filters.
|
||||
func RegexpCacheMisses() uint64 {
|
||||
return atomic.LoadUint64(®expCacheMisses)
|
||||
return regexpCache.Misses()
|
||||
}
|
||||
|
||||
func getRegexpFromCache(expr []byte) (regexpCacheValue, error) {
|
||||
atomic.AddUint64(®expCacheRequests, 1)
|
||||
|
||||
regexpCacheLock.RLock()
|
||||
rcv, ok := regexpCacheMap[string(expr)]
|
||||
regexpCacheLock.RUnlock()
|
||||
if ok {
|
||||
func getRegexpFromCache(expr []byte) (*regexpCacheValue, error) {
|
||||
if rcv := regexpCache.GetEntry(bytesutil.ToUnsafeString(expr)); rcv != nil {
|
||||
// Fast path - the regexp found in the cache.
|
||||
return rcv, nil
|
||||
return rcv.(*regexpCacheValue), nil
|
||||
}
|
||||
|
||||
// Slow path - build the regexp.
|
||||
atomic.AddUint64(®expCacheMisses, 1)
|
||||
exprOrig := string(expr)
|
||||
|
||||
expr = []byte(tagCharsRegexpEscaper.Replace(exprOrig))
|
||||
exprStr := fmt.Sprintf("^(%s)$", expr)
|
||||
re, err := regexp.Compile(exprStr)
|
||||
if err != nil {
|
||||
return rcv, fmt.Errorf("invalid regexp %q: %w", exprStr, err)
|
||||
return nil, fmt.Errorf("invalid regexp %q: %w", exprStr, err)
|
||||
}
|
||||
|
||||
sExpr := string(expr)
|
||||
|
@ -521,26 +524,16 @@ func getRegexpFromCache(expr []byte) (regexpCacheValue, error) {
|
|||
}
|
||||
|
||||
// Put the reMatch in the cache.
|
||||
var rcv regexpCacheValue
|
||||
rcv.orValues = orValues
|
||||
rcv.reMatch = reMatch
|
||||
rcv.reCost = reCost
|
||||
rcv.literalSuffix = literalSuffix
|
||||
// heuristic for rcv in-memory size
|
||||
rcv.sizeBytes = 8*len(exprOrig) + len(literalSuffix)
|
||||
regexpCache.PutEntry(exprOrig, &rcv)
|
||||
|
||||
regexpCacheLock.Lock()
|
||||
if overflow := len(regexpCacheMap) - getMaxRegexpCacheSize(); overflow > 0 {
|
||||
overflow = int(float64(len(regexpCacheMap)) * 0.1)
|
||||
for k := range regexpCacheMap {
|
||||
delete(regexpCacheMap, k)
|
||||
overflow--
|
||||
if overflow <= 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
regexpCacheMap[exprOrig] = rcv
|
||||
regexpCacheLock.Unlock()
|
||||
|
||||
return rcv, nil
|
||||
return &rcv, nil
|
||||
}
|
||||
|
||||
func newMatchFuncForOrSuffixes(orValues []string) (reMatch func(b []byte) bool, reCost uint64) {
|
||||
|
@ -888,11 +881,7 @@ var tagCharsReverseRegexpEscaper = strings.NewReplacer(
|
|||
|
||||
func getMaxRegexpCacheSize() int {
|
||||
maxRegexpCacheSizeOnce.Do(func() {
|
||||
n := memory.Allowed() / 1024 / 1024
|
||||
if n < 100 {
|
||||
n = 100
|
||||
}
|
||||
maxRegexpCacheSize = n
|
||||
maxRegexpCacheSize = int(0.05 * float64(memory.Allowed()))
|
||||
})
|
||||
return maxRegexpCacheSize
|
||||
}
|
||||
|
@ -903,11 +892,7 @@ var (
|
|||
)
|
||||
|
||||
var (
|
||||
regexpCacheMap = make(map[string]regexpCacheValue)
|
||||
regexpCacheLock sync.RWMutex
|
||||
|
||||
regexpCacheRequests uint64
|
||||
regexpCacheMisses uint64
|
||||
regexpCache = lrucache.NewCache(getMaxRegexpCacheSize)
|
||||
)
|
||||
|
||||
type regexpCacheValue struct {
|
||||
|
@ -915,15 +900,18 @@ type regexpCacheValue struct {
|
|||
reMatch func(b []byte) bool
|
||||
reCost uint64
|
||||
literalSuffix string
|
||||
sizeBytes int
|
||||
}
|
||||
|
||||
// SizeBytes implements lrucache.Entry interface
|
||||
func (rcv *regexpCacheValue) SizeBytes() int {
|
||||
return rcv.sizeBytes
|
||||
}
|
||||
|
||||
func getRegexpPrefix(b []byte) ([]byte, []byte) {
|
||||
// Fast path - search the prefix in the cache.
|
||||
prefixesCacheLock.RLock()
|
||||
ps, ok := prefixesCacheMap[string(b)]
|
||||
prefixesCacheLock.RUnlock()
|
||||
|
||||
if ok {
|
||||
if ps := prefixesCache.GetEntry(bytesutil.ToUnsafeString(b)); ps != nil {
|
||||
ps := ps.(*prefixSuffix)
|
||||
return ps.prefix, ps.suffix
|
||||
}
|
||||
|
||||
|
@ -931,33 +919,18 @@ func getRegexpPrefix(b []byte) ([]byte, []byte) {
|
|||
prefix, suffix := extractRegexpPrefix(b)
|
||||
|
||||
// Put the prefix and the suffix to the cache.
|
||||
prefixesCacheLock.Lock()
|
||||
if overflow := len(prefixesCacheMap) - getMaxPrefixesCacheSize(); overflow > 0 {
|
||||
overflow = int(float64(len(prefixesCacheMap)) * 0.1)
|
||||
for k := range prefixesCacheMap {
|
||||
delete(prefixesCacheMap, k)
|
||||
overflow--
|
||||
if overflow <= 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
prefixesCacheMap[string(b)] = prefixSuffix{
|
||||
ps := &prefixSuffix{
|
||||
prefix: prefix,
|
||||
suffix: suffix,
|
||||
}
|
||||
prefixesCacheLock.Unlock()
|
||||
prefixesCache.PutEntry(string(b), ps)
|
||||
|
||||
return prefix, suffix
|
||||
}
|
||||
|
||||
func getMaxPrefixesCacheSize() int {
|
||||
maxPrefixesCacheSizeOnce.Do(func() {
|
||||
n := memory.Allowed() / 1024 / 1024
|
||||
if n < 100 {
|
||||
n = 100
|
||||
}
|
||||
maxPrefixesCacheSize = n
|
||||
maxPrefixesCacheSize = int(0.05 * float64(memory.Allowed()))
|
||||
})
|
||||
return maxPrefixesCacheSize
|
||||
}
|
||||
|
@ -968,15 +941,44 @@ var (
|
|||
)
|
||||
|
||||
var (
|
||||
prefixesCacheMap = make(map[string]prefixSuffix)
|
||||
prefixesCacheLock sync.RWMutex
|
||||
prefixesCache = lrucache.NewCache(getMaxPrefixesCacheSize)
|
||||
)
|
||||
|
||||
// RegexpPrefixesCacheSize returns the number of cached regexp prefixes for tag filters.
|
||||
func RegexpPrefixesCacheSize() int {
|
||||
return prefixesCache.Len()
|
||||
}
|
||||
|
||||
// RegexpPrefixesCacheSizeBytes returns an approximate size in bytes for cached regexp prefixes for tag filters.
|
||||
func RegexpPrefixesCacheSizeBytes() int {
|
||||
return prefixesCache.SizeBytes()
|
||||
}
|
||||
|
||||
// RegexpPrefixesCacheMaxSizeBytes returns the maximum size in bytes for cached regexp prefixes for tag filters in bytes.
|
||||
func RegexpPrefixesCacheMaxSizeBytes() int {
|
||||
return prefixesCache.SizeMaxBytes()
|
||||
}
|
||||
|
||||
// RegexpPrefixesCacheRequests returns the number of requests to regexp prefixes cache.
|
||||
func RegexpPrefixesCacheRequests() uint64 {
|
||||
return prefixesCache.Requests()
|
||||
}
|
||||
|
||||
// RegexpPrefixesCacheMisses returns the number of cache misses for regexp prefixes cache.
|
||||
func RegexpPrefixesCacheMisses() uint64 {
|
||||
return prefixesCache.Misses()
|
||||
}
|
||||
|
||||
type prefixSuffix struct {
|
||||
prefix []byte
|
||||
suffix []byte
|
||||
}
|
||||
|
||||
// SizeBytes implements lrucache.Entry interface
|
||||
func (ps *prefixSuffix) SizeBytes() int {
|
||||
return cap(ps.prefix) + cap(ps.suffix) + int(unsafe.Sizeof(*ps))
|
||||
}
|
||||
|
||||
func extractRegexpPrefix(b []byte) ([]byte, []byte) {
|
||||
sre, err := syntax.Parse(string(b), syntax.Perl)
|
||||
if err != nil {
|
||||
|
|
Loading…
Reference in a new issue