lib/blockcache: use higher number of shards for higher number of CPU cores

This should reduce mutex contention and increase performance.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2007
parent 6cb2954612
commit 102c9a4bf9

3 changed files with 282 additions and 27 deletions
```diff
@@ -4,9 +4,11 @@ import (
 	"sync"
 	"sync/atomic"
 	"time"
+	"unsafe"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
+	xxhash "github.com/cespare/xxhash/v2"
 )
 
 // Cache caches Block entries.
```
```diff
@@ -14,13 +16,25 @@
 // Call NewCache() for creating new Cache.
 type Cache struct {
 	shards []*cache
+
+	cleanerMustStopCh chan struct{}
+	cleanerStoppedCh  chan struct{}
 }
 
 // NewCache creates new cache.
 //
 // Cache size in bytes is limited by the value returned by getMaxSizeBytes() callback.
 // Call MustStop() in order to free up resources occupied by Cache.
 func NewCache(getMaxSizeBytes func() int) *Cache {
+	cpusCount := cgroup.AvailableCPUs()
 	shardsCount := cgroup.AvailableCPUs()
+	// Increase the number of shards with the increased number of available CPU cores.
+	// This should reduce contention on per-shard mutexes.
+	multiplier := cpusCount
+	if multiplier > 16 {
+		multiplier = 16
+	}
+	shardsCount *= multiplier
 	shards := make([]*cache, shardsCount)
 	getMaxShardBytes := func() int {
 		n := getMaxSizeBytes()
```
```diff
@@ -29,9 +43,19 @@ func NewCache(getMaxSizeBytes func() int) *Cache {
 	for i := range shards {
 		shards[i] = newCache(getMaxShardBytes)
 	}
-	return &Cache{
-		shards: shards,
+	c := &Cache{
+		shards:            shards,
+		cleanerMustStopCh: make(chan struct{}),
+		cleanerStoppedCh:  make(chan struct{}),
 	}
+	go c.cleaner()
+	return c
 }
 
+// MustStop frees up resources occupied by c.
+func (c *Cache) MustStop() {
+	close(c.cleanerMustStopCh)
+	<-c.cleanerStoppedCh
+}
+
 // RemoveBlocksForPart removes all the blocks for the given part from the cache.
```
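For a sense of scale: with the multiplier capped at 16, a host with 8 available CPU cores gets 8 × 8 = 64 shards, while a 64-core host gets 64 × 16 = 1024 shards. The getMaxShardBytes() callback then splits the total returned by getMaxSizeBytes() across those shards, so the overall cache size limit is unchanged.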
```diff
@@ -43,16 +67,22 @@ func (c *Cache) RemoveBlocksForPart(p interface{}) {
 
 // GetBlock returns a block for the given key k from c.
 func (c *Cache) GetBlock(k Key) Block {
-	h := fastHashUint64(k.Offset)
-	idx := h % uint64(len(c.shards))
+	idx := uint64(0)
+	if len(c.shards) > 1 {
+		h := k.hashUint64()
+		idx = h % uint64(len(c.shards))
+	}
 	shard := c.shards[idx]
 	return shard.GetBlock(k)
 }
 
 // PutBlock puts the given block b under the given key k into c.
 func (c *Cache) PutBlock(k Key, b Block) {
-	h := fastHashUint64(k.Offset)
-	idx := h % uint64(len(c.shards))
+	idx := uint64(0)
+	if len(c.shards) > 1 {
+		h := k.hashUint64()
+		idx = h % uint64(len(c.shards))
+	}
 	shard := c.shards[idx]
 	shard.PutBlock(k, b)
 }
```
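Two things change here besides the shard count. First, the shard hash now covers the whole Key via hashUint64(), where the old fastHashUint64() hashed only k.Offset, so blocks from different parts at the same offset no longer pile onto the same shard. Second, the single-shard case skips hashing entirely. A minimal standalone sketch of the selection logic (illustrative names, not VictoriaMetrics code):

```go
package main

import "fmt"

// shardIndex mirrors the index computation in GetBlock/PutBlock:
// a single shard needs no hashing; otherwise take hash % shardsCount.
func shardIndex(keyHash uint64, shardsCount int) uint64 {
	if shardsCount <= 1 {
		return 0
	}
	return keyHash % uint64(shardsCount)
}

func main() {
	fmt.Println(shardIndex(12345, 64)) // 12345 % 64 = 57
}
```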
```diff
@@ -102,11 +132,34 @@ func (c *Cache) Misses() uint64 {
 	return n
 }
 
-func fastHashUint64(x uint64) uint64 {
-	x ^= x >> 12 // a
-	x ^= x << 25 // b
-	x ^= x >> 27 // c
-	return x * 2685821657736338717
+func (c *Cache) cleaner() {
+	ticker := time.NewTicker(57 * time.Second)
+	defer ticker.Stop()
+	perKeyMissesTicker := time.NewTicker(7 * time.Minute)
+	defer perKeyMissesTicker.Stop()
+	for {
+		select {
+		case <-c.cleanerMustStopCh:
+			close(c.cleanerStoppedCh)
+			return
+		case <-ticker.C:
+			c.cleanByTimeout()
+		case <-perKeyMissesTicker.C:
+			c.cleanPerKeyMisses()
+		}
+	}
+}
+
+func (c *Cache) cleanByTimeout() {
+	for _, shard := range c.shards {
+		shard.cleanByTimeout()
+	}
+}
+
+func (c *Cache) cleanPerKeyMisses() {
+	for _, shard := range c.shards {
+		shard.cleanPerKeyMisses()
+	}
 }
 
 type cache struct {
```
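The cleaner goroutine and MustStop() together form a common Go shutdown handshake: closing cleanerMustStopCh asks the goroutine to exit, and the goroutine closes cleanerStoppedCh so MustStop() can block until the exit has actually happened. A minimal standalone sketch of the pattern (illustrative, not the library's code):

```go
package main

import "time"

func main() {
	mustStopCh := make(chan struct{})
	stoppedCh := make(chan struct{})
	go func() {
		ticker := time.NewTicker(50 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-mustStopCh:
				close(stoppedCh) // confirm the goroutine has finished
				return
			case <-ticker.C:
				// periodic cleanup work would go here
			}
		}
	}()

	close(mustStopCh) // ask the goroutine to stop (what MustStop does)
	<-stoppedCh       // wait for confirmation before releasing resources
}
```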
```diff
@@ -143,6 +196,11 @@ type Key struct {
 	Offset uint64
 }
 
+func (k *Key) hashUint64() uint64 {
+	buf := (*[unsafe.Sizeof(*k)]byte)(unsafe.Pointer(k))
+	return xxhash.Sum64(buf[:])
+}
+
 // Block is an item, which may be cached in the Cache.
 type Block interface {
 	// SizeBytes must return the approximate size of the given block in bytes
```
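hashUint64() reinterprets the Key struct's memory as a fixed-size byte array and feeds it to xxhash, avoiding any per-call allocation or encoding step; unsafe.Sizeof is a compile-time constant, so it is legal as an array length. Note that this hashes the raw in-memory representation, which is only appropriate for structs without padding bytes. A standalone sketch of the same trick on a hypothetical padding-free struct:

```go
package main

import (
	"fmt"
	"unsafe"

	xxhash "github.com/cespare/xxhash/v2"
)

// pair is a hypothetical padding-free struct, standing in for Key.
type pair struct {
	a uint64
	b uint64
}

func (p *pair) hashUint64() uint64 {
	// Reinterpret the struct's memory as a [16]byte without copying,
	// then hash those bytes directly.
	buf := (*[unsafe.Sizeof(*p)]byte)(unsafe.Pointer(p))
	return xxhash.Sum64(buf[:])
}

func main() {
	p := pair{a: 1234, b: 5678}
	fmt.Printf("%016x\n", p.hashUint64())
}
```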
```diff
@@ -164,7 +222,6 @@ func newCache(getMaxSizeBytes func() int) *cache {
 	c.getMaxSizeBytes = getMaxSizeBytes
 	c.m = make(map[interface{}]map[uint64]*cacheEntry)
 	c.perKeyMisses = make(map[Key]int)
-	go c.cleaner()
 	return &c
 }
 
```
```diff
@@ -184,21 +241,10 @@ func (c *cache) updateSizeBytes(n int) {
 	atomic.AddInt64(&c.sizeBytes, int64(n))
 }
 
-func (c *cache) cleaner() {
-	ticker := time.NewTicker(57 * time.Second)
-	defer ticker.Stop()
-	perKeyMissesTicker := time.NewTicker(2 * time.Minute)
-	defer perKeyMissesTicker.Stop()
-	for {
-		select {
-		case <-ticker.C:
-			c.cleanByTimeout()
-		case <-perKeyMissesTicker.C:
-			c.mu.Lock()
-			c.perKeyMisses = make(map[Key]int, len(c.perKeyMisses))
-			c.mu.Unlock()
-		}
-	}
+func (c *cache) cleanPerKeyMisses() {
+	c.mu.Lock()
+	c.perKeyMisses = make(map[Key]int, len(c.perKeyMisses))
+	c.mu.Unlock()
 }
 
 func (c *cache) cleanByTimeout() {
```
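Net effect of the last three hunks: cleaning moves from one goroutine per shard to a single goroutine on the top-level Cache, which matters now that a machine can have hundreds of shards, and the per-key miss counters are reset every 7 minutes instead of every 2. The explicit MustStop() handshake also makes the cleaner's lifetime deterministic; the old per-shard cleaners had no stop mechanism and ran for the life of the process.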
lib/blockcache/blockcache_test.go (new file, 159 lines):

```go
package blockcache

import (
	"fmt"
	"sync"
	"testing"
)

func TestCache(t *testing.T) {
	const sizeMaxBytes = 1024 * 1024
	getMaxSize := func() int {
		return sizeMaxBytes
	}
	c := NewCache(getMaxSize)
	defer c.MustStop()
	if n := c.SizeBytes(); n != 0 {
		t.Fatalf("unexpected SizeBytes(); got %d; want %d", n, 0)
	}
	if n := c.SizeMaxBytes(); n != sizeMaxBytes {
		t.Fatalf("unexpected SizeMaxBytes(); got %d; want %d", n, sizeMaxBytes)
	}
	offset := uint64(1234)
	part := (interface{})("foobar")
	k := Key{
		Offset: offset,
		Part:   part,
	}
	var b testBlock
	blockSize := b.SizeBytes()

	// Put a single entry into cache
	c.PutBlock(k, &b)
	if n := c.Len(); n != 1 {
		t.Fatalf("unexpected number of items in the cache; got %d; want %d", n, 1)
	}
	if n := c.SizeBytes(); n != blockSize {
		t.Fatalf("unexpected SizeBytes(); got %d; want %d", n, blockSize)
	}
	if n := c.Requests(); n != 0 {
		t.Fatalf("unexpected number of requests; got %d; want %d", n, 0)
	}
	if n := c.Misses(); n != 0 {
		t.Fatalf("unexpected number of misses; got %d; want %d", n, 0)
	}

	// Obtain this entry from the cache
	if b1 := c.GetBlock(k); b1 != &b {
		t.Fatalf("unexpected block obtained; got %v; want %v", b1, &b)
	}
	if n := c.Requests(); n != 1 {
		t.Fatalf("unexpected number of requests; got %d; want %d", n, 1)
	}
	if n := c.Misses(); n != 0 {
		t.Fatalf("unexpected number of misses; got %d; want %d", n, 0)
	}

	// Obtain non-existing entry from the cache
	if b1 := c.GetBlock(Key{Offset: offset + 1}); b1 != nil {
		t.Fatalf("unexpected non-nil block obtained for non-existing key: %v", b1)
	}
	if n := c.Requests(); n != 2 {
		t.Fatalf("unexpected number of requests; got %d; want %d", n, 2)
	}
	if n := c.Misses(); n != 1 {
		t.Fatalf("unexpected number of misses; got %d; want %d", n, 1)
	}

	// Remove entries for the given part from the cache
	c.RemoveBlocksForPart(part)
	if n := c.SizeBytes(); n != 0 {
		t.Fatalf("unexpected SizeBytes(); got %d; want %d", n, 0)
	}

	// Verify that the entry has been removed from the cache
	if b1 := c.GetBlock(k); b1 != nil {
		t.Fatalf("unexpected non-nil block obtained after removing all the blocks for the part; got %v", b1)
	}
	if n := c.Requests(); n != 3 {
		t.Fatalf("unexpected number of requests; got %d; want %d", n, 3)
	}
	if n := c.Misses(); n != 2 {
		t.Fatalf("unexpected number of misses; got %d; want %d", n, 2)
	}

	// Store the missed entry to the cache. It shouldn't be stored because of the previous cache miss
	c.PutBlock(k, &b)
	if n := c.SizeBytes(); n != 0 {
		t.Fatalf("unexpected SizeBytes(); got %d; want %d", n, 0)
	}

	// Verify that the entry wasn't stored to the cache.
	if b1 := c.GetBlock(k); b1 != nil {
		t.Fatalf("unexpected non-nil block obtained after removing all the blocks for the part; got %v", b1)
	}
	if n := c.Requests(); n != 4 {
		t.Fatalf("unexpected number of requests; got %d; want %d", n, 4)
	}
	if n := c.Misses(); n != 3 {
		t.Fatalf("unexpected number of misses; got %d; want %d", n, 3)
	}

	// Store the entry again. Now it must be stored because of the second cache miss.
	c.PutBlock(k, &b)
	if n := c.SizeBytes(); n != blockSize {
		t.Fatalf("unexpected SizeBytes(); got %d; want %d", n, blockSize)
	}
	if b1 := c.GetBlock(k); b1 != &b {
		t.Fatalf("unexpected block obtained; got %v; want %v", b1, &b)
	}
	if n := c.Requests(); n != 5 {
		t.Fatalf("unexpected number of requests; got %d; want %d", n, 5)
	}
	if n := c.Misses(); n != 3 {
		t.Fatalf("unexpected number of misses; got %d; want %d", n, 3)
	}

	// Manually clean the cache. The entry shouldn't be deleted because it was recently accessed.
	c.cleanPerKeyMisses()
	c.cleanByTimeout()
	if n := c.SizeBytes(); n != blockSize {
		t.Fatalf("unexpected SizeBytes(); got %d; want %d", n, blockSize)
	}
}

func TestCacheConcurrentAccess(t *testing.T) {
	const sizeMaxBytes = 16 * 1024 * 1024
	getMaxSize := func() int {
		return sizeMaxBytes
	}
	c := NewCache(getMaxSize)
	defer c.MustStop()

	workers := 5
	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			testCacheSetGet(c)
		}()
	}
	wg.Wait()
}

func testCacheSetGet(c *Cache) {
	for i := 0; i < 1000; i++ {
		part := (interface{})(i)
		b := testBlock{}
		k := Key{
			Offset: uint64(i),
			Part:   part,
		}
		c.PutBlock(k, &b)
		if b1 := c.GetBlock(k); b1 != &b {
			panic(fmt.Errorf("unexpected block obtained; got %v; want %v", b1, &b))
		}
		if b1 := c.GetBlock(Key{}); b1 != nil {
			panic(fmt.Errorf("unexpected non-nil block obtained: %v", b1))
		}
	}
}

type testBlock struct{}

func (tb *testBlock) SizeBytes() int {
	return 42
}
```
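Note what the middle of TestCache pins down: a block is not admitted to the cache by the first PutBlock after a miss for its key, only after a second miss. This is the perKeyMisses map at work, which the cleaner resets periodically; the apparent intent is to keep one-off reads from displacing hotter entries, though the test only asserts the observable behavior.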
lib/blockcache/blockcache_timing_test.go (new file, 50 lines):

```go
package blockcache

import (
	"fmt"
	"sync/atomic"
	"testing"
)

func BenchmarkKeyHashUint64(b *testing.B) {
	b.ReportAllocs()
	b.RunParallel(func(pb *testing.PB) {
		var hSum uint64
		var k Key
		for pb.Next() {
			k.Offset++
			h := k.hashUint64()
			hSum += h
		}
		atomic.AddUint64(&BenchSink, hSum)
	})
}

var BenchSink uint64

func BenchmarkCacheGet(b *testing.B) {
	c := NewCache(func() int {
		return 1024 * 1024 * 16
	})
	defer c.MustStop()
	const blocksCount = 10000
	blocks := make([]*testBlock, blocksCount)
	for i := 0; i < blocksCount; i++ {
		blocks[i] = &testBlock{}
		c.PutBlock(Key{Offset: uint64(i)}, blocks[i])
	}
	b.ReportAllocs()
	b.SetBytes(int64(len(blocks)))
	b.RunParallel(func(pb *testing.PB) {
		var k Key
		for pb.Next() {
			for i := 0; i < blocksCount; i++ {
				k.Offset = uint64(i)
				b := c.GetBlock(k)
				if b != blocks[i] {
					panic(fmt.Errorf("unexpected block obtained from the cache; got %v; want %v", b, blocks[i]))
				}
			}
		}
	})
}
```
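To reproduce the numbers behind this change, the standard Go tooling applies: `go test -bench=. ./lib/blockcache/` runs both benchmarks, and adding `-cpu=4,16,64` shows how shard contention scales with parallelism.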