From 5a0d0bb83dbd1ab2fed76fd3a58c24cecf270c18 Mon Sep 17 00:00:00 2001
From: kiriklo
Date: Fri, 1 Nov 2024 21:34:42 +0300
Subject: [PATCH] app/vmselect/promql: improve performance of parseCache on systems with many CPU cores

---
 app/vmselect/promql/exec.go             |  80 ----------
 app/vmselect/promql/parse_cache.go      | 150 ++++++++++++++++++
 app/vmselect/promql/parse_cache_test.go | 129 +++++++++++++++
 .../promql/parse_cache_timing_test.go   | 126 +++++++++++++++
 4 files changed, 405 insertions(+), 80 deletions(-)
 create mode 100644 app/vmselect/promql/parse_cache.go
 create mode 100644 app/vmselect/promql/parse_cache_test.go
 create mode 100644 app/vmselect/promql/parse_cache_timing_test.go

diff --git a/app/vmselect/promql/exec.go b/app/vmselect/promql/exec.go
index 5f2a798e0..6f207a399 100644
--- a/app/vmselect/promql/exec.go
+++ b/app/vmselect/promql/exec.go
@@ -6,8 +6,6 @@ import (
 	"math"
 	"sort"
 	"strings"
-	"sync"
-	"sync/atomic"
 	"time"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
@@ -16,7 +14,6 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
-	"github.com/VictoriaMetrics/metrics"
 	"github.com/VictoriaMetrics/metricsql"
 )
 
@@ -328,80 +325,3 @@ func escapeDots(s string) string {
 	}
 	return string(result)
 }
-
-var parseCacheV = func() *parseCache {
-	pc := &parseCache{
-		m: make(map[string]*parseCacheValue),
-	}
-	metrics.NewGauge(`vm_cache_requests_total{type="promql/parse"}`, func() float64 {
-		return float64(pc.Requests())
-	})
-	metrics.NewGauge(`vm_cache_misses_total{type="promql/parse"}`, func() float64 {
-		return float64(pc.Misses())
-	})
-	metrics.NewGauge(`vm_cache_entries{type="promql/parse"}`, func() float64 {
-		return float64(pc.Len())
-	})
-	return pc
-}()
-
-const parseCacheMaxLen = 10e3
-
-type parseCacheValue struct {
-	e   metricsql.Expr
-	err error
-}
-
-type parseCache struct {
-	requests atomic.Uint64
-	misses   atomic.Uint64
-
-	m  map[string]*parseCacheValue
-	mu sync.RWMutex
-}
-
-func (pc *parseCache) Requests() uint64 {
-	return pc.requests.Load()
-}
-
-func (pc *parseCache) Misses() uint64 {
-	return pc.misses.Load()
-}
-
-func (pc *parseCache) Len() uint64 {
-	pc.mu.RLock()
-	n := len(pc.m)
-	pc.mu.RUnlock()
-	return uint64(n)
-}
-
-func (pc *parseCache) Get(q string) *parseCacheValue {
-	pc.requests.Add(1)
-
-	pc.mu.RLock()
-	pcv := pc.m[q]
-	pc.mu.RUnlock()
-
-	if pcv == nil {
-		pc.misses.Add(1)
-	}
-	return pcv
-}
-
-func (pc *parseCache) Put(q string, pcv *parseCacheValue) {
-	pc.mu.Lock()
-	overflow := len(pc.m) - parseCacheMaxLen
-	if overflow > 0 {
-		// Remove 10% of items from the cache.
-		overflow = int(float64(len(pc.m)) * 0.1)
-		for k := range pc.m {
-			delete(pc.m, k)
-			overflow--
-			if overflow <= 0 {
-				break
-			}
-		}
-	}
-	pc.m[q] = pcv
-	pc.mu.Unlock()
-}
diff --git a/app/vmselect/promql/parse_cache.go b/app/vmselect/promql/parse_cache.go
new file mode 100644
index 000000000..65e92e4b2
--- /dev/null
+++ b/app/vmselect/promql/parse_cache.go
@@ -0,0 +1,150 @@
+// Cache for metricsql expressions
+// Based on the fastcache idea of locking buckets in order to avoid whole cache locks.
+// See: https://github.com/VictoriaMetrics/fastcache
+package promql
+
+import (
+	"sync"
+	"sync/atomic"
+
+	"github.com/VictoriaMetrics/metrics"
+	"github.com/VictoriaMetrics/metricsql"
+
+	xxhash "github.com/cespare/xxhash/v2"
+)
+
+var parseCacheV = func() *parseCache {
+	pc := NewParseCache()
+	metrics.NewGauge(`vm_cache_requests_total{type="promql/parse"}`, func() float64 {
+		return float64(pc.Requests())
+	})
+	metrics.NewGauge(`vm_cache_misses_total{type="promql/parse"}`, func() float64 {
+		return float64(pc.Misses())
+	})
+	metrics.NewGauge(`vm_cache_entries{type="promql/parse"}`, func() float64 {
+		return float64(pc.Len())
+	})
+	return pc
+}()
+
+const (
+	parseBucketCount = 128
+
+	parseCacheMaxLen int = 10e3
+
+	parseBucketMaxLen int = parseCacheMaxLen / parseBucketCount
+
+	parseBucketFreePercent float64 = 0.1
+)
+
+type parseCacheValue struct {
+	e   metricsql.Expr
+	err error
+}
+
+type parseBucket struct {
+	m        map[string]*parseCacheValue
+	mu       sync.RWMutex
+	requests atomic.Uint64
+	misses   atomic.Uint64
+}
+
+type parseCache struct {
+	buckets [parseBucketCount]parseBucket
+}
+
+func NewParseCache() *parseCache {
+	pc := new(parseCache)
+	for i := 0; i < parseBucketCount; i++ {
+		pc.buckets[i] = newParseBucket()
+	}
+	return pc
+}
+
+func (pc *parseCache) Put(q string, pcv *parseCacheValue) {
+	h := xxhash.Sum64String(q)
+	idx := h % parseBucketCount
+	pc.buckets[idx].Put(q, pcv)
+}
+
+func (pc *parseCache) Get(q string) *parseCacheValue {
+	h := xxhash.Sum64String(q)
+	idx := h % parseBucketCount
+	return pc.buckets[idx].Get(q)
+}
+
+func (pc *parseCache) Requests() uint64 {
+	var n uint64
+	for i := 0; i < parseBucketCount; i++ {
+		n += pc.buckets[i].Requests()
+	}
+	return n
+}
+
+func (pc *parseCache) Misses() uint64 {
+	var n uint64
+	for i := 0; i < parseBucketCount; i++ {
+		n += pc.buckets[i].Misses()
+	}
+	return n
+}
+
+func (pc *parseCache) Len() uint64 {
+	var n uint64
+	for i := 0; i < parseBucketCount; i++ {
+		n += pc.buckets[i].Len()
+	}
+	return n
+}
+
+func newParseBucket() parseBucket {
+	return parseBucket{
+		m: make(map[string]*parseCacheValue, parseBucketMaxLen),
+	}
+}
+
+func (pb *parseBucket) Requests() uint64 {
+	return pb.requests.Load()
+}
+
+func (pb *parseBucket) Misses() uint64 {
+	return pb.misses.Load()
+}
+
+func (pb *parseBucket) Len() uint64 {
+	pb.mu.RLock()
+	n := len(pb.m)
+	pb.mu.RUnlock()
+	return uint64(n)
+}
+
+func (pb *parseBucket) Get(q string) *parseCacheValue {
+	pb.requests.Add(1)
+
+	pb.mu.RLock()
+	pcv := pb.m[q]
+	pb.mu.RUnlock()
+
+	if pcv == nil {
+		pb.misses.Add(1)
+	}
+	return pcv
+}
+
+func (pb *parseBucket) Put(q string, pcv *parseCacheValue) {
+	pb.mu.Lock()
+	overflow := len(pb.m) - parseBucketMaxLen
+	if overflow > 0 {
+		// Remove parseBucketFreePercent*100 % of items from the bucket.
+		overflow = int(float64(len(pb.m)) * parseBucketFreePercent)
+		for k := range pb.m {
+			delete(pb.m, k)
+			overflow--
+			if overflow <= 0 {
+				break
+			}
+		}
+	}
+	pb.m[q] = pcv
+	pb.mu.Unlock()
+}
diff --git a/app/vmselect/promql/parse_cache_test.go b/app/vmselect/promql/parse_cache_test.go
new file mode 100644
index 000000000..77bd964c2
--- /dev/null
+++ b/app/vmselect/promql/parse_cache_test.go
@@ -0,0 +1,129 @@
+package promql
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/VictoriaMetrics/metricsql"
+)
+
+func testGetParseCacheValue(q string) *parseCacheValue {
+	e, err := metricsql.Parse(q)
+	return &parseCacheValue{
+		e:   e,
+		err: err,
+	}
+}
+
+func testGenerateQueries(items int) []string {
+	queries := make([]string, items)
+	for i := 0; i < items; i++ {
+		queries[i] = fmt.Sprintf(`node_time_seconds{instance="node%d", job="job%d"}`, i, i)
+	}
+	return queries
+}
+
+func TestParseCache(t *testing.T) {
+	pc := NewParseCache()
+	if pc.Len() != 0 || pc.Misses() != 0 || pc.Requests() != 0 {
+		t.Errorf("unexpected pc.Len()=%d, pc.Misses()=%d, pc.Requests()=%d; expected all to be zero.", pc.Len(), pc.Misses(), pc.Requests())
+	}
+
+	q1 := `foo{bar="baz"}`
+	v1 := testGetParseCacheValue(q1)
+
+	q2 := `foo1{bar1="baz1"}`
+	v2 := testGetParseCacheValue(q2)
+
+	pc.Put(q1, v1)
+	if len := pc.Len(); len != 1 {
+		t.Errorf("unexpected value obtained; got %d; want %d", len, 1)
+	}
+
+	if res := pc.Get(q2); res != nil {
+		t.Errorf("unexpected non-empty value obtained from cache: %v", res)
+	}
+	if len := pc.Len(); len != 1 {
+		t.Errorf("unexpected value obtained; got %d; want %d", len, 1)
+	}
+	if miss := pc.Misses(); miss != 1 {
+		t.Errorf("unexpected value obtained; got %d; want %d", miss, 1)
+	}
+	if req := pc.Requests(); req != 1 {
+		t.Errorf("unexpected value obtained; got %d; want %d", req, 1)
+	}
+
+	pc.Put(q2, v2)
+	if len := pc.Len(); len != 2 {
+		t.Errorf("unexpected value obtained; got %d; want %d", len, 2)
+	}
+
+	if res := pc.Get(q1); res != v1 {
+		t.Errorf("unexpected value obtained; got %v; want %v", res, v1)
+	}
+
+	if res := pc.Get(q2); res != v2 {
+		t.Errorf("unexpected value obtained; got %v; want %v", res, v2)
+	}
+
+	pc.Put(q2, v2)
+	if len := pc.Len(); len != 2 {
+		t.Errorf("unexpected value obtained; got %d; want %d", len, 2)
+	}
+	if miss := pc.Misses(); miss != 1 {
+		t.Errorf("unexpected value obtained; got %d; want %d", miss, 1)
+	}
+	if req := pc.Requests(); req != 3 {
+		t.Errorf("unexpected value obtained; got %d; want %d", req, 3)
+	}
+
+	if res := pc.Get(q2); res != v2 {
+		t.Errorf("unexpected value obtained; got %v; want %v", res, v2)
+	}
+	if len := pc.Len(); len != 2 {
+		t.Errorf("unexpected value obtained; got %d; want %d", len, 2)
+	}
+	if miss := pc.Misses(); miss != 1 {
+		t.Errorf("unexpected value obtained; got %d; want %d", miss, 1)
+	}
+	if req := pc.Requests(); req != 4 {
+		t.Errorf("unexpected value obtained; got %d; want %d", req, 4)
+	}
+}
+
+func TestParseCacheBucketOverflow(t *testing.T) {
+	b := newParseBucket()
+	var expectedLen uint64
+
+	// +2 for overflow and clean up
+	queries := testGenerateQueries(parseBucketMaxLen + 2)
+
+	// Same value for all keys
+	v := testGetParseCacheValue(queries[0])
+
+	// Fill bucket
+	for i := 0; i < parseBucketMaxLen; i++ {
+		b.Put(queries[i], v)
+	}
+	expectedLen = uint64(parseBucketMaxLen)
+	if len := b.Len(); len != expectedLen {
+		t.Errorf("unexpected value obtained; got %v; want %v", len, expectedLen)
+	}
+
+	// Overflow bucket
+	expectedLen = uint64(parseBucketMaxLen + 1)
+	b.Put(queries[parseBucketMaxLen], v)
+	if len := b.Len(); len != expectedLen {
+		t.Errorf("unexpected value obtained; got %v; want %v", len, expectedLen)
+	}
+
+	// Clean up
+	oldLen := b.Len()
+	overflow := int(float64(oldLen) * parseBucketFreePercent)
+	expectedLen = oldLen - uint64(overflow) + 1 // +1 for new entry
+
+	b.Put(queries[parseBucketMaxLen+1], v)
+	if len := b.Len(); len != expectedLen {
+		t.Errorf("unexpected value obtained; got %v; want %v", len, expectedLen)
+	}
+}
diff --git a/app/vmselect/promql/parse_cache_timing_test.go b/app/vmselect/promql/parse_cache_timing_test.go
new file mode 100644
index 000000000..a27f2c93a
--- /dev/null
+++ b/app/vmselect/promql/parse_cache_timing_test.go
@@ -0,0 +1,126 @@
+package promql
+
+import (
+	"testing"
+)
+
+func BenchmarkCachePutNoOverflow(b *testing.B) {
+	const items int = (parseCacheMaxLen / 2)
+	pc := NewParseCache()
+
+	queries := testGenerateQueries(items)
+	v := testGetParseCacheValue(queries[0])
+
+	b.ResetTimer()
+	b.ReportAllocs()
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			for i := 0; i < items; i++ {
+				pc.Put(queries[i], v)
+			}
+		}
+	})
+	if len := pc.Len(); len != uint64(items) {
+		b.Errorf("unexpected value obtained; got %d; want %d", len, items)
+	}
+}
+
+func BenchmarkCacheGetNoOverflow(b *testing.B) {
+	const items int = parseCacheMaxLen / 2
+	pc := NewParseCache()
+
+	queries := testGenerateQueries(items)
+	v := testGetParseCacheValue(queries[0])
+
+	for i := 0; i < len(queries); i++ {
+		pc.Put(queries[i], v)
+	}
+	b.ResetTimer()
+	b.ReportAllocs()
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			for i := 0; i < items; i++ {
+				if v := pc.Get(queries[i]); v == nil {
+					b.Errorf("unexpected nil value obtained from cache for query: %s", queries[i])
+				}
+			}
+		}
+	})
+}
+
+func BenchmarkCachePutGetNoOverflow(b *testing.B) {
+	const items int = parseCacheMaxLen / 2
+	pc := NewParseCache()
+
+	queries := testGenerateQueries(items)
+	v := testGetParseCacheValue(queries[0])
+
+	b.ResetTimer()
+	b.ReportAllocs()
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			for i := 0; i < items; i++ {
+				pc.Put(queries[i], v)
+				if res := pc.Get(queries[i]); res == nil {
+					b.Errorf("unexpected nil value obtained from cache for query: %s", queries[i])
+				}
+			}
+		}
+	})
+	if len := pc.Len(); len != uint64(items) {
+		b.Errorf("unexpected value obtained; got %d; want %d", len, items)
+	}
+}
+
+func BenchmarkCachePutOverflow(b *testing.B) {
+	const items int = parseCacheMaxLen + (parseCacheMaxLen / 2)
+	c := NewParseCache()
+
+	queries := testGenerateQueries(items)
+	v := testGetParseCacheValue(queries[0])
+
+	for i := 0; i < parseCacheMaxLen; i++ {
+		c.Put(queries[i], v)
+	}
+
+	b.ReportAllocs()
+	b.ResetTimer()
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			for i := parseCacheMaxLen; i < items; i++ {
+				c.Put(queries[i], v)
+			}
+		}
+	})
+	maxElements := uint64(parseCacheMaxLen + parseBucketCount)
+	if len := c.Len(); len > maxElements {
+		b.Errorf("cache length is more than expected; got %d, expected %d", len, maxElements)
+	}
+}
+
+func BenchmarkCachePutGetOverflow(b *testing.B) {
+	const items int = parseCacheMaxLen + (parseCacheMaxLen / 2)
+	c := NewParseCache()
+
+	queries := testGenerateQueries(items)
+	v := testGetParseCacheValue(queries[0])
+
+	for i := 0; i < parseCacheMaxLen; i++ {
+		c.Put(queries[i], v)
+	}
+
+	b.ReportAllocs()
+	b.ResetTimer()
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			for i := parseCacheMaxLen; i < items; i++ {
+				c.Put(queries[i], v)
+				c.Get(queries[i])
+			}
+		}
+	})
+	maxElements := uint64(parseCacheMaxLen + parseBucketCount)
+	if len := c.Len(); len > maxElements {
+		b.Errorf("cache length is more than expected; got %d, expected %d", len, maxElements)
+	}
+}