diff --git a/app/vmselect/promql/rollup_result_cache.go b/app/vmselect/promql/rollup_result_cache.go index f33ed3e304..472004f343 100644 --- a/app/vmselect/promql/rollup_result_cache.go +++ b/app/vmselect/promql/rollup_result_cache.go @@ -8,6 +8,7 @@ import ( "sync/atomic" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" @@ -153,15 +154,23 @@ func (rrc *rollupResultCache) Get(funcName string, ec *EvalConfig, me *metricExp return nil, ec.Start } bb.B = key.Marshal(bb.B[:0]) - resultBuf := rrc.c.GetBig(nil, bb.B) - if len(resultBuf) == 0 { + compressedResultBuf := resultBufPool.Get() + defer resultBufPool.Put(compressedResultBuf) + compressedResultBuf.B = rrc.c.GetBig(compressedResultBuf.B[:0], bb.B) + if len(compressedResultBuf.B) == 0 { mi.RemoveKey(key) metainfoBuf = mi.Marshal(metainfoBuf[:0]) bb.B = marshalRollupResultCacheKey(bb.B[:0], funcName, me, iafc, window, ec.Step) rrc.c.Set(bb.B, metainfoBuf) return nil, ec.Start } - tss, err := unmarshalTimeseriesFast(resultBuf) + // Decompress into newly allocated byte slice, since tss returned from unmarshalTimeseriesFast + // refers to the byte slice, so it cannot be returned to the resultBufPool. + resultBuf, err := encoding.DecompressZSTD(nil, compressedResultBuf.B) + if err != nil { + logger.Panicf("BUG: cannot decompress resultBuf from rollupResultCache: %s; it looks like it was improperly saved", err) + } + tss, err = unmarshalTimeseriesFast(resultBuf) if err != nil { logger.Panicf("BUG: cannot unmarshal timeseries from rollupResultCache: %s; it looks like it was improperly saved", err) } @@ -201,6 +210,8 @@ func (rrc *rollupResultCache) Get(funcName string, ec *EvalConfig, me *metricExp return tss, newStart } +var resultBufPool bytesutil.ByteBufferPool + func (rrc *rollupResultCache) Put(funcName string, ec *EvalConfig, me *metricExpr, iafc *incrementalAggrFuncContext, window int64, tss []*timeseries) { if *disableCache || len(tss) == 0 || !ec.mayCache() { return @@ -232,11 +243,16 @@ func (rrc *rollupResultCache) Put(funcName string, ec *EvalConfig, me *metricExp // Store tss in the cache. maxMarshaledSize := getRollupResultCacheSize() / 4 - tssMarshaled := marshalTimeseriesFast(tss, maxMarshaledSize, ec.Step) - if tssMarshaled == nil { + resultBuf := resultBufPool.Get() + defer resultBufPool.Put(resultBuf) + resultBuf.B = marshalTimeseriesFast(resultBuf.B[:0], tss, maxMarshaledSize, ec.Step) + if len(resultBuf.B) == 0 { tooBigRollupResults.Inc() return } + compressedResultBuf := resultBufPool.Get() + defer resultBufPool.Put(compressedResultBuf) + compressedResultBuf.B = encoding.CompressZSTDLevel(compressedResultBuf.B[:0], resultBuf.B, 1) bb := bbPool.Get() defer bbPool.Put(bb) @@ -245,7 +261,7 @@ func (rrc *rollupResultCache) Put(funcName string, ec *EvalConfig, me *metricExp key.prefix = rollupResultCacheKeyPrefix key.suffix = atomic.AddUint64(&rollupResultCacheKeySuffix, 1) bb.B = key.Marshal(bb.B[:0]) - rrc.c.SetBig(bb.B, tssMarshaled) + rrc.c.SetBig(bb.B, compressedResultBuf.B) bb.B = marshalRollupResultCacheKey(bb.B[:0], funcName, me, iafc, window, ec.Step) metainfoBuf := rrc.c.Get(nil, bb.B) @@ -275,7 +291,7 @@ var ( var tooBigRollupResults = metrics.NewCounter("vm_too_big_rollup_results_total") // Increment this value every time the format of the cache changes. -const rollupResultCacheVersion = 5 +const rollupResultCacheVersion = 6 func marshalRollupResultCacheKey(dst []byte, funcName string, me *metricExpr, iafc *incrementalAggrFuncContext, window, step int64) []byte { dst = append(dst, rollupResultCacheVersion) diff --git a/app/vmselect/promql/timeseries.go b/app/vmselect/promql/timeseries.go index 95bdfa000a..f40907cc13 100644 --- a/app/vmselect/promql/timeseries.go +++ b/app/vmselect/promql/timeseries.go @@ -76,7 +76,7 @@ func putTimeseries(ts *timeseries) { var timeseriesPool sync.Pool -func marshalTimeseriesFast(tss []*timeseries, maxSize int, step int64) []byte { +func marshalTimeseriesFast(dst []byte, tss []*timeseries, maxSize int, step int64) []byte { if len(tss) == 0 { logger.Panicf("BUG: tss cannot be empty") } @@ -92,13 +92,13 @@ func marshalTimeseriesFast(tss []*timeseries, maxSize int, step int64) []byte { if size > maxSize { // Do not marshal tss, since it would occupy too much space - return nil + return dst } // Allocate the buffer for the marshaled tss before its' marshaling. // This should reduce memory fragmentation and memory usage. - dst := make([]byte, 0, size) - dst = marshalFastTimestamps(dst, tss[0].Timestamps) + dst = bytesutil.Resize(dst, size) + dst = marshalFastTimestamps(dst[:0], tss[0].Timestamps) for _, ts := range tss { dst = ts.marshalFastNoTimestamps(dst) } diff --git a/app/vmselect/promql/timeseries_test.go b/app/vmselect/promql/timeseries_test.go index fb1c6ad541..87a9618a7a 100644 --- a/app/vmselect/promql/timeseries_test.go +++ b/app/vmselect/promql/timeseries_test.go @@ -74,7 +74,7 @@ func TestTimeseriesMarshalUnmarshalFast(t *testing.T) { tssOrig = append(tssOrig, &ts) } - buf := marshalTimeseriesFast(tssOrig, 1e6, 123) + buf := marshalTimeseriesFast(nil, tssOrig, 1e6, 123) tssGot, err := unmarshalTimeseriesFast(buf) if err != nil { t.Fatalf("error in unmarshalTimeseriesFast: %s", err)