adds enforced tag filters into cache key (#1095)

This commit is contained in:
Nikolay 2021-02-27 01:15:53 +03:00 committed by GitHub
parent a78948ae8b
commit 186c078fac
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -178,7 +178,7 @@ func (rrc *rollupResultCache) Get(ec *EvalConfig, expr metricsql.Expr, window in
bb := bbPool.Get() bb := bbPool.Get()
defer bbPool.Put(bb) defer bbPool.Put(bb)
bb.B = marshalRollupResultCacheKey(bb.B[:0], expr, window, ec.Step) bb.B = marshalRollupResultCacheKey(bb.B[:0], expr, window, ec.Step, ec.EnforcedTagFilters)
metainfoBuf := rrc.c.Get(nil, bb.B) metainfoBuf := rrc.c.Get(nil, bb.B)
if len(metainfoBuf) == 0 { if len(metainfoBuf) == 0 {
return nil, ec.Start return nil, ec.Start
@ -198,7 +198,7 @@ func (rrc *rollupResultCache) Get(ec *EvalConfig, expr metricsql.Expr, window in
if len(compressedResultBuf.B) == 0 { if len(compressedResultBuf.B) == 0 {
mi.RemoveKey(key) mi.RemoveKey(key)
metainfoBuf = mi.Marshal(metainfoBuf[:0]) metainfoBuf = mi.Marshal(metainfoBuf[:0])
bb.B = marshalRollupResultCacheKey(bb.B[:0], expr, window, ec.Step) bb.B = marshalRollupResultCacheKey(bb.B[:0], expr, window, ec.Step, ec.EnforcedTagFilters)
rrc.c.Set(bb.B, metainfoBuf) rrc.c.Set(bb.B, metainfoBuf)
return nil, ec.Start return nil, ec.Start
} }
@ -301,7 +301,7 @@ func (rrc *rollupResultCache) Put(ec *EvalConfig, expr metricsql.Expr, window in
bb.B = key.Marshal(bb.B[:0]) bb.B = key.Marshal(bb.B[:0])
rrc.c.SetBig(bb.B, compressedResultBuf.B) rrc.c.SetBig(bb.B, compressedResultBuf.B)
bb.B = marshalRollupResultCacheKey(bb.B[:0], expr, window, ec.Step) bb.B = marshalRollupResultCacheKey(bb.B[:0], expr, window, ec.Step, ec.EnforcedTagFilters)
metainfoBuf := rrc.c.Get(nil, bb.B) metainfoBuf := rrc.c.Get(nil, bb.B)
var mi rollupResultCacheMetainfo var mi rollupResultCacheMetainfo
if len(metainfoBuf) > 0 { if len(metainfoBuf) > 0 {
@ -331,11 +331,14 @@ var tooBigRollupResults = metrics.NewCounter("vm_too_big_rollup_results_total")
// Increment this value every time the format of the cache changes. // Increment this value every time the format of the cache changes.
const rollupResultCacheVersion = 7 const rollupResultCacheVersion = 7
func marshalRollupResultCacheKey(dst []byte, expr metricsql.Expr, window, step int64) []byte { func marshalRollupResultCacheKey(dst []byte, expr metricsql.Expr, window, step int64, filters []storage.TagFilter) []byte {
dst = append(dst, rollupResultCacheVersion) dst = append(dst, rollupResultCacheVersion)
dst = encoding.MarshalInt64(dst, window) dst = encoding.MarshalInt64(dst, window)
dst = encoding.MarshalInt64(dst, step) dst = encoding.MarshalInt64(dst, step)
dst = expr.AppendString(dst) dst = expr.AppendString(dst)
for _, f := range filters {
dst = f.Marshal(dst)
}
return dst return dst
} }