app/vmselect/promql: store compressed results in the cache

This should increase rollup results cache capacity.
This commit is contained in:
Aliaksandr Valialkin 2019-08-14 01:39:41 +03:00
parent 9d41e0dcae
commit 9ecb994671
3 changed files with 28 additions and 12 deletions

View file

@ -8,6 +8,7 @@ import (
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
@ -153,15 +154,23 @@ func (rrc *rollupResultCache) Get(funcName string, ec *EvalConfig, me *metricExp
return nil, ec.Start
}
bb.B = key.Marshal(bb.B[:0])
resultBuf := rrc.c.GetBig(nil, bb.B)
if len(resultBuf) == 0 {
compressedResultBuf := resultBufPool.Get()
defer resultBufPool.Put(compressedResultBuf)
compressedResultBuf.B = rrc.c.GetBig(compressedResultBuf.B[:0], bb.B)
if len(compressedResultBuf.B) == 0 {
mi.RemoveKey(key)
metainfoBuf = mi.Marshal(metainfoBuf[:0])
bb.B = marshalRollupResultCacheKey(bb.B[:0], funcName, me, iafc, window, ec.Step)
rrc.c.Set(bb.B, metainfoBuf)
return nil, ec.Start
}
tss, err := unmarshalTimeseriesFast(resultBuf)
// Decompress into newly allocated byte slice, since tss returned from unmarshalTimeseriesFast
// refers to the byte slice, so it cannot be returned to the resultBufPool.
resultBuf, err := encoding.DecompressZSTD(nil, compressedResultBuf.B)
if err != nil {
logger.Panicf("BUG: cannot decompress resultBuf from rollupResultCache: %s; it looks like it was improperly saved", err)
}
tss, err = unmarshalTimeseriesFast(resultBuf)
if err != nil {
logger.Panicf("BUG: cannot unmarshal timeseries from rollupResultCache: %s; it looks like it was improperly saved", err)
}
@ -201,6 +210,8 @@ func (rrc *rollupResultCache) Get(funcName string, ec *EvalConfig, me *metricExp
return tss, newStart
}
var resultBufPool bytesutil.ByteBufferPool
func (rrc *rollupResultCache) Put(funcName string, ec *EvalConfig, me *metricExpr, iafc *incrementalAggrFuncContext, window int64, tss []*timeseries) {
if *disableCache || len(tss) == 0 || !ec.mayCache() {
return
@ -232,11 +243,16 @@ func (rrc *rollupResultCache) Put(funcName string, ec *EvalConfig, me *metricExp
// Store tss in the cache.
maxMarshaledSize := getRollupResultCacheSize() / 4
tssMarshaled := marshalTimeseriesFast(tss, maxMarshaledSize, ec.Step)
if tssMarshaled == nil {
resultBuf := resultBufPool.Get()
defer resultBufPool.Put(resultBuf)
resultBuf.B = marshalTimeseriesFast(resultBuf.B[:0], tss, maxMarshaledSize, ec.Step)
if len(resultBuf.B) == 0 {
tooBigRollupResults.Inc()
return
}
compressedResultBuf := resultBufPool.Get()
defer resultBufPool.Put(compressedResultBuf)
compressedResultBuf.B = encoding.CompressZSTDLevel(compressedResultBuf.B[:0], resultBuf.B, 1)
bb := bbPool.Get()
defer bbPool.Put(bb)
@ -245,7 +261,7 @@ func (rrc *rollupResultCache) Put(funcName string, ec *EvalConfig, me *metricExp
key.prefix = rollupResultCacheKeyPrefix
key.suffix = atomic.AddUint64(&rollupResultCacheKeySuffix, 1)
bb.B = key.Marshal(bb.B[:0])
rrc.c.SetBig(bb.B, tssMarshaled)
rrc.c.SetBig(bb.B, compressedResultBuf.B)
bb.B = marshalRollupResultCacheKey(bb.B[:0], funcName, me, iafc, window, ec.Step)
metainfoBuf := rrc.c.Get(nil, bb.B)
@ -275,7 +291,7 @@ var (
var tooBigRollupResults = metrics.NewCounter("vm_too_big_rollup_results_total")
// Increment this value every time the format of the cache changes.
const rollupResultCacheVersion = 5
const rollupResultCacheVersion = 6
func marshalRollupResultCacheKey(dst []byte, funcName string, me *metricExpr, iafc *incrementalAggrFuncContext, window, step int64) []byte {
dst = append(dst, rollupResultCacheVersion)

View file

@ -76,7 +76,7 @@ func putTimeseries(ts *timeseries) {
var timeseriesPool sync.Pool
func marshalTimeseriesFast(tss []*timeseries, maxSize int, step int64) []byte {
func marshalTimeseriesFast(dst []byte, tss []*timeseries, maxSize int, step int64) []byte {
if len(tss) == 0 {
logger.Panicf("BUG: tss cannot be empty")
}
@ -92,13 +92,13 @@ func marshalTimeseriesFast(tss []*timeseries, maxSize int, step int64) []byte {
if size > maxSize {
// Do not marshal tss, since it would occupy too much space
return nil
return dst
}
// Allocate the buffer for the marshaled tss before its' marshaling.
// This should reduce memory fragmentation and memory usage.
dst := make([]byte, 0, size)
dst = marshalFastTimestamps(dst, tss[0].Timestamps)
dst = bytesutil.Resize(dst, size)
dst = marshalFastTimestamps(dst[:0], tss[0].Timestamps)
for _, ts := range tss {
dst = ts.marshalFastNoTimestamps(dst)
}

View file

@ -74,7 +74,7 @@ func TestTimeseriesMarshalUnmarshalFast(t *testing.T) {
tssOrig = append(tssOrig, &ts)
}
buf := marshalTimeseriesFast(tssOrig, 1e6, 123)
buf := marshalTimeseriesFast(nil, tssOrig, 1e6, 123)
tssGot, err := unmarshalTimeseriesFast(buf)
if err != nil {
t.Fatalf("error in unmarshalTimeseriesFast: %s", err)