mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vmselect/promql: improve performance of parseCache on systems with many CPU cores
This commit is contained in:
parent
b3e2db5647
commit
5a0d0bb83d
4 changed files with 405 additions and 80 deletions
|
@ -6,8 +6,6 @@ import (
|
|||
"math"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
|
||||
|
@ -16,7 +14,6 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
"github.com/VictoriaMetrics/metricsql"
|
||||
)
|
||||
|
||||
|
@ -328,80 +325,3 @@ func escapeDots(s string) string {
|
|||
}
|
||||
return string(result)
|
||||
}
|
||||
|
||||
var parseCacheV = func() *parseCache {
|
||||
pc := &parseCache{
|
||||
m: make(map[string]*parseCacheValue),
|
||||
}
|
||||
metrics.NewGauge(`vm_cache_requests_total{type="promql/parse"}`, func() float64 {
|
||||
return float64(pc.Requests())
|
||||
})
|
||||
metrics.NewGauge(`vm_cache_misses_total{type="promql/parse"}`, func() float64 {
|
||||
return float64(pc.Misses())
|
||||
})
|
||||
metrics.NewGauge(`vm_cache_entries{type="promql/parse"}`, func() float64 {
|
||||
return float64(pc.Len())
|
||||
})
|
||||
return pc
|
||||
}()
|
||||
|
||||
// parseCacheMaxLen caps the number of cached parse results.
const parseCacheMaxLen = 10e3

// parseCacheValue holds the outcome of parsing a single query string:
// either the parsed expression or the parse error.
type parseCacheValue struct {
	e metricsql.Expr
	err error
}

// parseCache is a mutex-guarded map from query string to its parse result,
// with atomic hit/miss counters.
type parseCache struct {
	// requests counts Get calls; misses counts Get calls that found nothing.
	requests atomic.Uint64
	misses atomic.Uint64

	// m holds the cached entries; mu guards m.
	m map[string]*parseCacheValue
	mu sync.RWMutex
}
|
||||
|
||||
// Requests returns the total number of cache lookups performed so far.
func (pc *parseCache) Requests() uint64 {
	return pc.requests.Load()
}

// Misses returns the number of lookups that found no cached entry.
func (pc *parseCache) Misses() uint64 {
	return pc.misses.Load()
}
|
||||
|
||||
func (pc *parseCache) Len() uint64 {
|
||||
pc.mu.RLock()
|
||||
n := len(pc.m)
|
||||
pc.mu.RUnlock()
|
||||
return uint64(n)
|
||||
}
|
||||
|
||||
func (pc *parseCache) Get(q string) *parseCacheValue {
|
||||
pc.requests.Add(1)
|
||||
|
||||
pc.mu.RLock()
|
||||
pcv := pc.m[q]
|
||||
pc.mu.RUnlock()
|
||||
|
||||
if pcv == nil {
|
||||
pc.misses.Add(1)
|
||||
}
|
||||
return pcv
|
||||
}
|
||||
|
||||
func (pc *parseCache) Put(q string, pcv *parseCacheValue) {
|
||||
pc.mu.Lock()
|
||||
overflow := len(pc.m) - parseCacheMaxLen
|
||||
if overflow > 0 {
|
||||
// Remove 10% of items from the cache.
|
||||
overflow = int(float64(len(pc.m)) * 0.1)
|
||||
for k := range pc.m {
|
||||
delete(pc.m, k)
|
||||
overflow--
|
||||
if overflow <= 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
pc.m[q] = pcv
|
||||
pc.mu.Unlock()
|
||||
}
|
||||
|
|
150
app/vmselect/promql/parse_cache.go
Normal file
150
app/vmselect/promql/parse_cache.go
Normal file
|
@ -0,0 +1,150 @@
|
|||
// Cache for metricsql expressions
|
||||
// Based on the fastcache idea of locking buckets in order to avoid whole cache locks.
|
||||
// See: https://github.com/VictoriaMetrics/fastcache
|
||||
package promql
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
"github.com/VictoriaMetrics/metricsql"
|
||||
|
||||
xxhash "github.com/cespare/xxhash/v2"
|
||||
)
|
||||
|
||||
var parseCacheV = func() *parseCache {
|
||||
pc := NewParseCache()
|
||||
metrics.NewGauge(`vm_cache_requests_total{type="promql/parse"}`, func() float64 {
|
||||
return float64(pc.Requests())
|
||||
})
|
||||
metrics.NewGauge(`vm_cache_misses_total{type="promql/parse"}`, func() float64 {
|
||||
return float64(pc.Misses())
|
||||
})
|
||||
metrics.NewGauge(`vm_cache_entries{type="promql/parse"}`, func() float64 {
|
||||
return float64(pc.Len())
|
||||
})
|
||||
return pc
|
||||
}()
|
||||
|
||||
const (
	// parseBucketCount is the number of shards; spreading entries across
	// independently-locked buckets reduces contention on many-core systems.
	parseBucketCount = 128

	// parseCacheMaxLen caps the total number of cached entries across all buckets.
	parseCacheMaxLen int = 10e3

	// parseBucketMaxLen is the per-bucket length threshold that triggers eviction.
	parseBucketMaxLen int = parseCacheMaxLen / parseBucketCount

	// parseBucketFreePercent is the fraction of a bucket's entries evicted on overflow.
	parseBucketFreePercent float64 = 0.1
)

// parseCacheValue holds the outcome of parsing a single query string:
// either the parsed expression or the parse error.
type parseCacheValue struct {
	e metricsql.Expr
	err error
}

// parseBucket is a single shard of parseCache: a query->result map guarded
// by its own RWMutex, with per-bucket hit/miss counters.
type parseBucket struct {
	// m holds the cached entries of this shard; mu guards m.
	m map[string]*parseCacheValue
	mu sync.RWMutex
	// requests counts Get calls; misses counts Get calls that found nothing.
	requests atomic.Uint64
	misses atomic.Uint64
}

// parseCache shards cached parse results across parseBucketCount buckets,
// keyed by xxhash of the query string.
type parseCache struct {
	buckets [parseBucketCount]parseBucket
}
|
||||
|
||||
func NewParseCache() *parseCache {
|
||||
pc := new(parseCache)
|
||||
for i := 0; i < parseBucketCount; i++ {
|
||||
pc.buckets[i] = newParseBucket()
|
||||
}
|
||||
return pc
|
||||
}
|
||||
|
||||
func (pc *parseCache) Put(q string, pcv *parseCacheValue) {
|
||||
h := xxhash.Sum64String(q)
|
||||
idx := h % parseBucketCount
|
||||
pc.buckets[idx].Put(q, pcv)
|
||||
}
|
||||
|
||||
func (pc *parseCache) Get(q string) *parseCacheValue {
|
||||
h := xxhash.Sum64String(q)
|
||||
idx := h % parseBucketCount
|
||||
return pc.buckets[idx].Get(q)
|
||||
}
|
||||
|
||||
func (pc *parseCache) Requests() uint64 {
|
||||
var n uint64
|
||||
for i := 0; i < parseBucketCount; i++ {
|
||||
n += pc.buckets[i].Requests()
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
func (pc *parseCache) Misses() uint64 {
|
||||
var n uint64
|
||||
for i := 0; i < parseBucketCount; i++ {
|
||||
n += pc.buckets[i].Misses()
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
func (pc *parseCache) Len() uint64 {
|
||||
var n uint64
|
||||
for i := 0; i < parseBucketCount; i++ {
|
||||
n += pc.buckets[i].Len()
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
// newParseBucket returns an empty bucket with its map pre-sized to the
// per-bucket capacity.
// NOTE(review): the bucket is returned by value even though it embeds a
// sync.RWMutex; this is safe only while no lock is held yet, but `go vet`
// (copylocks) may flag the copy — consider initializing buckets in place.
func newParseBucket() parseBucket {
	return parseBucket{
		m: make(map[string]*parseCacheValue, parseBucketMaxLen),
	}
}
|
||||
|
||||
// Requests returns the number of lookups performed on this bucket.
func (pb *parseBucket) Requests() uint64 {
	return pb.requests.Load()
}

// Misses returns the number of lookups on this bucket that found no entry.
func (pb *parseBucket) Misses() uint64 {
	return pb.misses.Load()
}
|
||||
|
||||
func (pb *parseBucket) Len() uint64 {
|
||||
pb.mu.RLock()
|
||||
n := len(pb.m)
|
||||
pb.mu.RUnlock()
|
||||
return uint64(n)
|
||||
}
|
||||
|
||||
func (pb *parseBucket) Get(q string) *parseCacheValue {
|
||||
pb.requests.Add(1)
|
||||
|
||||
pb.mu.RLock()
|
||||
pcv := pb.m[q]
|
||||
pb.mu.RUnlock()
|
||||
|
||||
if pcv == nil {
|
||||
pb.misses.Add(1)
|
||||
}
|
||||
return pcv
|
||||
}
|
||||
|
||||
// Put stores pcv under key q. When the bucket has grown beyond
// parseBucketMaxLen, it first evicts parseBucketFreePercent of the entries
// (in arbitrary map-iteration order) to make room.
func (pb *parseBucket) Put(q string, pcv *parseCacheValue) {
	pb.mu.Lock()
	overflow := len(pb.m) - parseBucketMaxLen
	if overflow > 0 {
		// Remove parseBucketFreePercent*100 % of items from the bucket.
		overflow = int(float64(len(pb.m)) * parseBucketFreePercent)
		for k := range pb.m {
			delete(pb.m, k)
			overflow--
			if overflow <= 0 {
				break
			}
		}
	}
	pb.m[q] = pcv
	pb.mu.Unlock()
}
|
129
app/vmselect/promql/parse_cache_test.go
Normal file
129
app/vmselect/promql/parse_cache_test.go
Normal file
|
@ -0,0 +1,129 @@
|
|||
package promql
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/VictoriaMetrics/metricsql"
|
||||
)
|
||||
|
||||
func testGetParseCacheValue(q string) *parseCacheValue {
|
||||
e, err := metricsql.Parse(q)
|
||||
return &parseCacheValue{
|
||||
e: e,
|
||||
err: err,
|
||||
}
|
||||
}
|
||||
|
||||
// testGenerateQueries returns `items` distinct series-selector queries.
func testGenerateQueries(items int) []string {
	qs := make([]string, 0, items)
	for i := 0; i < items; i++ {
		qs = append(qs, fmt.Sprintf(`node_time_seconds{instance="node%d", job="job%d"}`, i, i))
	}
	return qs
}
|
||||
|
||||
func TestParseCache(t *testing.T) {
|
||||
pc := NewParseCache()
|
||||
if pc.Len() != 0 || pc.Misses() != 0 || pc.Requests() != 0 {
|
||||
t.Errorf("unexpected pc.Len()=%d, pc.Misses()=%d, pc.Requests()=%d; expected all to be zero.", pc.Len(), pc.Misses(), pc.Requests())
|
||||
}
|
||||
|
||||
q1 := `foo{bar="baz"}`
|
||||
v1 := testGetParseCacheValue(q1)
|
||||
|
||||
q2 := `foo1{bar1="baz1"}`
|
||||
v2 := testGetParseCacheValue(q2)
|
||||
|
||||
pc.Put(q1, v1)
|
||||
if len := pc.Len(); len != 1 {
|
||||
t.Errorf("unexpected value obtained; got %d; want %d", len, 1)
|
||||
}
|
||||
|
||||
if res := pc.Get(q2); res != nil {
|
||||
t.Errorf("unexpected non-empty value obtained from cache: %d ", res)
|
||||
}
|
||||
if len := pc.Len(); len != 1 {
|
||||
t.Errorf("unexpected value obtained; got %d; want %d", len, 1)
|
||||
}
|
||||
if miss := pc.Misses(); miss != 1 {
|
||||
t.Errorf("unexpected value obtained; got %d; want %d", miss, 1)
|
||||
}
|
||||
if req := pc.Requests(); req != 1 {
|
||||
t.Errorf("unexpected value obtained; got %d; want %d", req, 1)
|
||||
}
|
||||
|
||||
pc.Put(q2, v2)
|
||||
if len := pc.Len(); len != 2 {
|
||||
t.Errorf("unexpected value obtained; got %d; want %d", len, 2)
|
||||
}
|
||||
|
||||
if res := pc.Get(q1); res != v1 {
|
||||
t.Errorf("unexpected value obtained; got %v; want %v", res, v1)
|
||||
}
|
||||
|
||||
if res := pc.Get(q2); res != v2 {
|
||||
t.Errorf("unexpected value obtained; got %v; want %v", res, v2)
|
||||
}
|
||||
|
||||
pc.Put(q2, v2)
|
||||
if len := pc.Len(); len != 2 {
|
||||
t.Errorf("unexpected value obtained; got %d; want %d", len, 2)
|
||||
}
|
||||
if miss := pc.Misses(); miss != 1 {
|
||||
t.Errorf("unexpected value obtained; got %d; want %d", miss, 1)
|
||||
}
|
||||
if req := pc.Requests(); req != 3 {
|
||||
t.Errorf("unexpected value obtained; got %d; want %d", req, 3)
|
||||
}
|
||||
|
||||
if res := pc.Get(q2); res != v2 {
|
||||
t.Errorf("unexpected value obtained; got %v; want %v", res, v2)
|
||||
}
|
||||
if len := pc.Len(); len != 2 {
|
||||
t.Errorf("unexpected value obtained; got %d; want %d", len, 2)
|
||||
}
|
||||
if miss := pc.Misses(); miss != 1 {
|
||||
t.Errorf("unexpected value obtained; got %d; want %d", miss, 1)
|
||||
}
|
||||
if req := pc.Requests(); req != 4 {
|
||||
t.Errorf("unexpected value obtained; got %d; want %d", req, 4)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseCacheBucketOverflow(t *testing.T) {
|
||||
b := newParseBucket()
|
||||
var expectedLen uint64
|
||||
|
||||
// +2 for overflow and clean up
|
||||
queries := testGenerateQueries(parseBucketMaxLen + 2)
|
||||
|
||||
// Same value for all keys
|
||||
v := testGetParseCacheValue(queries[0])
|
||||
|
||||
// Fill bucket
|
||||
for i := 0; i < parseBucketMaxLen; i++ {
|
||||
b.Put(queries[i], v)
|
||||
}
|
||||
expectedLen = uint64(parseBucketMaxLen)
|
||||
if len := b.Len(); len != expectedLen {
|
||||
t.Errorf("unexpected value obtained; got %v; want %v", len, expectedLen)
|
||||
}
|
||||
|
||||
// Overflow bucket
|
||||
expectedLen = uint64(parseBucketMaxLen + 1)
|
||||
b.Put(queries[parseBucketMaxLen], v)
|
||||
if len := b.Len(); len != uint64(expectedLen) {
|
||||
t.Errorf("unexpected value obtained; got %v; want %v", len, expectedLen)
|
||||
}
|
||||
|
||||
// Clean up;
|
||||
oldLen := b.Len()
|
||||
overflow := int(float64(oldLen) * parseBucketFreePercent)
|
||||
expectedLen = oldLen - uint64(overflow) + 1 // +1 for new entry
|
||||
|
||||
b.Put(queries[parseBucketMaxLen+1], v)
|
||||
if len := b.Len(); len != expectedLen {
|
||||
t.Errorf("unexpected value obtained; got %v; want %v", len, expectedLen)
|
||||
}
|
||||
}
|
126
app/vmselect/promql/parse_cache_timing_test.go
Normal file
126
app/vmselect/promql/parse_cache_timing_test.go
Normal file
|
@ -0,0 +1,126 @@
|
|||
package promql
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func BenchmarkCachePutNoOverFlow(b *testing.B) {
|
||||
const items int = (parseCacheMaxLen / 2)
|
||||
pc := NewParseCache()
|
||||
|
||||
queries := testGenerateQueries(items)
|
||||
v := testGetParseCacheValue(queries[0])
|
||||
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
for i := 0; i < items; i++ {
|
||||
pc.Put(queries[i], v)
|
||||
}
|
||||
}
|
||||
})
|
||||
if len := pc.Len(); len != uint64(items) {
|
||||
b.Errorf("unexpected value obtained; got %d; want %d", len, items)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkCacheGetNoOverflow(b *testing.B) {
|
||||
const items int = parseCacheMaxLen / 2
|
||||
pc := NewParseCache()
|
||||
|
||||
queries := testGenerateQueries(items)
|
||||
v := testGetParseCacheValue(queries[0])
|
||||
|
||||
for i := 0; i < len(queries); i++ {
|
||||
pc.Put(queries[i], v)
|
||||
}
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
for i := 0; i < items; i++ {
|
||||
if v := pc.Get(queries[i]); v == nil {
|
||||
b.Errorf("unexpected nil value obtained from cache for query: %s ", queries[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkCachePutGetNoOverflow(b *testing.B) {
|
||||
const items int = parseCacheMaxLen / 2
|
||||
pc := NewParseCache()
|
||||
|
||||
queries := testGenerateQueries(items)
|
||||
v := testGetParseCacheValue(queries[0])
|
||||
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
for i := 0; i < items; i++ {
|
||||
pc.Put(queries[i], v)
|
||||
if res := pc.Get(queries[i]); res == nil {
|
||||
b.Errorf("unexpected nil value obtained from cache for query: %s ", queries[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
if len := pc.Len(); len != uint64(items) {
|
||||
b.Errorf("unexpected value obtained; got %d; want %d", len, items)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkCachePutOverflow(b *testing.B) {
|
||||
const items int = parseCacheMaxLen + (parseCacheMaxLen / 2)
|
||||
c := NewParseCache()
|
||||
|
||||
queries := testGenerateQueries(items)
|
||||
v := testGetParseCacheValue(queries[0])
|
||||
|
||||
for i := 0; i < parseCacheMaxLen; i++ {
|
||||
c.Put(queries[i], v)
|
||||
}
|
||||
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
for i := parseCacheMaxLen; i < items; i++ {
|
||||
c.Put(queries[i], v)
|
||||
}
|
||||
}
|
||||
})
|
||||
maxElemnts := uint64(parseCacheMaxLen + parseBucketCount)
|
||||
if len := c.Len(); len > maxElemnts {
|
||||
b.Errorf("cache length is more than expected; got %d, expected %d", len, maxElemnts)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkCachePutGetOverflow(b *testing.B) {
|
||||
const items int = parseCacheMaxLen + (parseCacheMaxLen / 2)
|
||||
c := NewParseCache()
|
||||
|
||||
queries := testGenerateQueries(items)
|
||||
v := testGetParseCacheValue(queries[0])
|
||||
|
||||
for i := 0; i < parseCacheMaxLen; i++ {
|
||||
c.Put(queries[i], v)
|
||||
}
|
||||
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
for i := parseCacheMaxLen; i < items; i++ {
|
||||
c.Put(queries[i], v)
|
||||
c.Get(queries[i])
|
||||
}
|
||||
}
|
||||
})
|
||||
maxElemnts := uint64(parseCacheMaxLen + parseBucketCount)
|
||||
if len := c.Len(); len > maxElemnts {
|
||||
b.Errorf("cache length is more than expected; got %d, expected %d", len, maxElemnts)
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue