adds enforced tag filters into cache key (#1095)

This commit is contained in:
Nikolay 2021-02-27 01:15:53 +03:00 committed by Aliaksandr Valialkin
parent 44975e28fe
commit 673b10dd7f

View file

@ -14,6 +14,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
"github.com/VictoriaMetrics/fastcache" "github.com/VictoriaMetrics/fastcache"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
@ -149,7 +150,7 @@ func (rrc *rollupResultCache) Get(ec *EvalConfig, expr metricsql.Expr, window in
bb := bbPool.Get() bb := bbPool.Get()
defer bbPool.Put(bb) defer bbPool.Put(bb)
bb.B = marshalRollupResultCacheKey(bb.B[:0], ec.AuthToken, expr, window, ec.Step) bb.B = marshalRollupResultCacheKey(bb.B[:0], ec.AuthToken, expr, window, ec.Step, ec.EnforcedTagFilters)
metainfoBuf := rrc.c.Get(nil, bb.B) metainfoBuf := rrc.c.Get(nil, bb.B)
if len(metainfoBuf) == 0 { if len(metainfoBuf) == 0 {
return nil, ec.Start return nil, ec.Start
@ -169,7 +170,7 @@ func (rrc *rollupResultCache) Get(ec *EvalConfig, expr metricsql.Expr, window in
if len(compressedResultBuf.B) == 0 { if len(compressedResultBuf.B) == 0 {
mi.RemoveKey(key) mi.RemoveKey(key)
metainfoBuf = mi.Marshal(metainfoBuf[:0]) metainfoBuf = mi.Marshal(metainfoBuf[:0])
bb.B = marshalRollupResultCacheKey(bb.B[:0], ec.AuthToken, expr, window, ec.Step) bb.B = marshalRollupResultCacheKey(bb.B[:0], ec.AuthToken, expr, window, ec.Step, ec.EnforcedTagFilters)
rrc.c.Set(bb.B, metainfoBuf) rrc.c.Set(bb.B, metainfoBuf)
return nil, ec.Start return nil, ec.Start
} }
@ -272,7 +273,7 @@ func (rrc *rollupResultCache) Put(ec *EvalConfig, expr metricsql.Expr, window in
bb.B = key.Marshal(bb.B[:0]) bb.B = key.Marshal(bb.B[:0])
rrc.c.SetBig(bb.B, compressedResultBuf.B) rrc.c.SetBig(bb.B, compressedResultBuf.B)
bb.B = marshalRollupResultCacheKey(bb.B[:0], ec.AuthToken, expr, window, ec.Step) bb.B = marshalRollupResultCacheKey(bb.B[:0], ec.AuthToken, expr, window, ec.Step, ec.EnforcedTagFilters)
metainfoBuf := rrc.c.Get(nil, bb.B) metainfoBuf := rrc.c.Get(nil, bb.B)
var mi rollupResultCacheMetainfo var mi rollupResultCacheMetainfo
if len(metainfoBuf) > 0 { if len(metainfoBuf) > 0 {
@ -302,13 +303,16 @@ var tooBigRollupResults = metrics.NewCounter("vm_too_big_rollup_results_total")
// Increment this value every time the format of the cache changes. // Increment this value every time the format of the cache changes.
const rollupResultCacheVersion = 7 const rollupResultCacheVersion = 7
func marshalRollupResultCacheKey(dst []byte, at *auth.Token, expr metricsql.Expr, window, step int64) []byte { func marshalRollupResultCacheKey(dst []byte, at *auth.Token, expr metricsql.Expr, window, step int64, filters []storage.TagFilter) []byte {
dst = append(dst, rollupResultCacheVersion) dst = append(dst, rollupResultCacheVersion)
dst = encoding.MarshalUint32(dst, at.AccountID) dst = encoding.MarshalUint32(dst, at.AccountID)
dst = encoding.MarshalUint32(dst, at.ProjectID) dst = encoding.MarshalUint32(dst, at.ProjectID)
dst = encoding.MarshalInt64(dst, window) dst = encoding.MarshalInt64(dst, window)
dst = encoding.MarshalInt64(dst, step) dst = encoding.MarshalInt64(dst, step)
dst = expr.AppendString(dst) dst = expr.AppendString(dst)
for _, f := range filters {
dst = f.Marshal(dst)
}
return dst return dst
} }