mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-02-09 15:27:11 +00:00
app/vmselect/promql: reduce RAM usage for aggregates over big number of time series
Calculate incremental aggregates for `aggr(metric_selector)` function instead of keeping all the time series matching the given `metric_selector` in memory.
This commit is contained in:
parent
1fe6d784d8
commit
a336bb4e22
5 changed files with 511 additions and 70 deletions
270
app/vmselect/promql/aggr_incremental.go
Normal file
270
app/vmselect/promql/aggr_incremental.go
Normal file
|
@ -0,0 +1,270 @@
|
|||
package promql
|
||||
|
||||
import (
|
||||
"math"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// callbacks for optimized incremental calculations for aggregate functions
|
||||
// over rollups over metricExpr.
|
||||
//
|
||||
// These calculations save RAM for aggregates over big number of time series.
|
||||
var incrementalAggrFuncCallbacksMap = map[string]*incrementalAggrFuncCallbacks{
|
||||
"sum": {
|
||||
updateAggrFunc: updateAggrSum,
|
||||
finalizeAggrFunc: finalizeAggrCommon,
|
||||
},
|
||||
"min": {
|
||||
updateAggrFunc: updateAggrMin,
|
||||
finalizeAggrFunc: finalizeAggrCommon,
|
||||
},
|
||||
"max": {
|
||||
updateAggrFunc: updateAggrMax,
|
||||
finalizeAggrFunc: finalizeAggrCommon,
|
||||
},
|
||||
"avg": {
|
||||
updateAggrFunc: updateAggrAvg,
|
||||
finalizeAggrFunc: finalizeAggrAvg,
|
||||
},
|
||||
"count": {
|
||||
updateAggrFunc: updateAggrCount,
|
||||
finalizeAggrFunc: finalizeAggrCount,
|
||||
},
|
||||
"sum2": {
|
||||
updateAggrFunc: updateAggrSum2,
|
||||
finalizeAggrFunc: finalizeAggrCommon,
|
||||
},
|
||||
"geomean": {
|
||||
updateAggrFunc: updateAggrGeomean,
|
||||
finalizeAggrFunc: finalizeAggrGeomean,
|
||||
},
|
||||
}
|
||||
|
||||
type incrementalAggrFuncContext struct {
|
||||
ae *aggrFuncExpr
|
||||
|
||||
mu sync.Mutex
|
||||
m map[string]*incrementalAggrContext
|
||||
|
||||
callbacks *incrementalAggrFuncCallbacks
|
||||
}
|
||||
|
||||
func newIncrementalAggrFuncContext(ae *aggrFuncExpr, callbacks *incrementalAggrFuncCallbacks) *incrementalAggrFuncContext {
|
||||
return &incrementalAggrFuncContext{
|
||||
ae: ae,
|
||||
m: make(map[string]*incrementalAggrContext, 1),
|
||||
callbacks: callbacks,
|
||||
}
|
||||
}
|
||||
|
||||
func (iafc *incrementalAggrFuncContext) updateTimeseries(ts *timeseries) {
|
||||
removeGroupTags(&ts.MetricName, &iafc.ae.Modifier)
|
||||
bb := bbPool.Get()
|
||||
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
|
||||
iafc.mu.Lock()
|
||||
iac := iafc.m[string(bb.B)]
|
||||
if iac == nil {
|
||||
tsAggr := ×eries{
|
||||
Values: make([]float64, len(ts.Values)),
|
||||
Timestamps: ts.Timestamps,
|
||||
denyReuse: true,
|
||||
}
|
||||
tsAggr.MetricName.CopyFrom(&ts.MetricName)
|
||||
iac = &incrementalAggrContext{
|
||||
ts: tsAggr,
|
||||
values: make([]float64, len(ts.Values)),
|
||||
}
|
||||
iafc.m[string(bb.B)] = iac
|
||||
}
|
||||
iafc.callbacks.updateAggrFunc(iac, ts.Values)
|
||||
iafc.mu.Unlock()
|
||||
bbPool.Put(bb)
|
||||
}
|
||||
|
||||
func (iafc *incrementalAggrFuncContext) finalizeTimeseries() []*timeseries {
|
||||
// There is no need in iafc.mu.Lock here, since getTimeseries must be called
|
||||
// without concurrent goroutines touching iafc.
|
||||
tss := make([]*timeseries, 0, len(iafc.m))
|
||||
finalizeAggrFunc := iafc.callbacks.finalizeAggrFunc
|
||||
for _, iac := range iafc.m {
|
||||
finalizeAggrFunc(iac)
|
||||
tss = append(tss, iac.ts)
|
||||
}
|
||||
return tss
|
||||
}
|
||||
|
||||
type incrementalAggrFuncCallbacks struct {
|
||||
updateAggrFunc func(iac *incrementalAggrContext, values []float64)
|
||||
finalizeAggrFunc func(iac *incrementalAggrContext)
|
||||
}
|
||||
|
||||
func getIncrementalAggrFuncCallbacks(name string) *incrementalAggrFuncCallbacks {
|
||||
name = strings.ToLower(name)
|
||||
return incrementalAggrFuncCallbacksMap[name]
|
||||
}
|
||||
|
||||
type incrementalAggrContext struct {
|
||||
ts *timeseries
|
||||
values []float64
|
||||
}
|
||||
|
||||
func finalizeAggrCommon(iac *incrementalAggrContext) {
|
||||
counts := iac.values
|
||||
dstValues := iac.ts.Values
|
||||
_ = dstValues[len(counts)-1]
|
||||
for i, v := range counts {
|
||||
if v == 0 {
|
||||
dstValues[i] = nan
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func updateAggrSum(iac *incrementalAggrContext, values []float64) {
|
||||
dstValues := iac.ts.Values
|
||||
dstCounts := iac.values
|
||||
_ = dstValues[len(values)-1]
|
||||
_ = dstCounts[len(values)-1]
|
||||
for i, v := range values {
|
||||
if math.IsNaN(v) {
|
||||
continue
|
||||
}
|
||||
dstValues[i] += v
|
||||
dstCounts[i] = 1
|
||||
}
|
||||
}
|
||||
|
||||
func updateAggrMin(iac *incrementalAggrContext, values []float64) {
|
||||
dstValues := iac.ts.Values
|
||||
dstCounts := iac.values
|
||||
_ = dstValues[len(values)-1]
|
||||
_ = dstCounts[len(values)-1]
|
||||
for i, v := range values {
|
||||
if math.IsNaN(v) {
|
||||
continue
|
||||
}
|
||||
if dstCounts[i] == 0 {
|
||||
dstValues[i] = v
|
||||
dstCounts[i] = 1
|
||||
continue
|
||||
}
|
||||
if v < dstValues[i] {
|
||||
dstValues[i] = v
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func updateAggrMax(iac *incrementalAggrContext, values []float64) {
|
||||
dstValues := iac.ts.Values
|
||||
dstCounts := iac.values
|
||||
_ = dstValues[len(values)-1]
|
||||
_ = dstCounts[len(values)-1]
|
||||
for i, v := range values {
|
||||
if math.IsNaN(v) {
|
||||
continue
|
||||
}
|
||||
if dstCounts[i] == 0 {
|
||||
dstValues[i] = v
|
||||
dstCounts[i] = 1
|
||||
continue
|
||||
}
|
||||
if v > dstValues[i] {
|
||||
dstValues[i] = v
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func updateAggrAvg(iac *incrementalAggrContext, values []float64) {
|
||||
// Do not use `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation,
|
||||
// since it is slower and has no obvious benefits in increased precision.
|
||||
dstValues := iac.ts.Values
|
||||
dstCounts := iac.values
|
||||
_ = dstValues[len(values)-1]
|
||||
_ = dstCounts[len(values)-1]
|
||||
for i, v := range values {
|
||||
if math.IsNaN(v) {
|
||||
continue
|
||||
}
|
||||
if dstCounts[i] == 0 {
|
||||
dstValues[i] = v
|
||||
dstCounts[i] = 1
|
||||
continue
|
||||
}
|
||||
dstValues[i] += v
|
||||
dstCounts[i]++
|
||||
}
|
||||
}
|
||||
|
||||
func finalizeAggrAvg(iac *incrementalAggrContext) {
|
||||
dstValues := iac.ts.Values
|
||||
counts := iac.values
|
||||
_ = dstValues[len(counts)-1]
|
||||
for i, v := range counts {
|
||||
if v == 0 {
|
||||
dstValues[i] = nan
|
||||
continue
|
||||
}
|
||||
dstValues[i] /= v
|
||||
}
|
||||
}
|
||||
|
||||
func updateAggrCount(iac *incrementalAggrContext, values []float64) {
|
||||
dstValues := iac.ts.Values
|
||||
_ = dstValues[len(values)-1]
|
||||
for i, v := range values {
|
||||
if math.IsNaN(v) {
|
||||
continue
|
||||
}
|
||||
dstValues[i]++
|
||||
}
|
||||
}
|
||||
|
||||
func finalizeAggrCount(iac *incrementalAggrContext) {
|
||||
// Nothing to do
|
||||
}
|
||||
|
||||
func updateAggrSum2(iac *incrementalAggrContext, values []float64) {
|
||||
dstValues := iac.ts.Values
|
||||
dstCounts := iac.values
|
||||
_ = dstValues[len(values)-1]
|
||||
_ = dstCounts[len(values)-1]
|
||||
for i, v := range values {
|
||||
if math.IsNaN(v) {
|
||||
continue
|
||||
}
|
||||
dstValues[i] += v * v
|
||||
dstCounts[i] = 1
|
||||
}
|
||||
}
|
||||
|
||||
func updateAggrGeomean(iac *incrementalAggrContext, values []float64) {
|
||||
dstValues := iac.ts.Values
|
||||
dstCounts := iac.values
|
||||
_ = dstValues[len(values)-1]
|
||||
_ = dstCounts[len(values)-1]
|
||||
for i, v := range values {
|
||||
if math.IsNaN(v) {
|
||||
continue
|
||||
}
|
||||
if dstCounts[i] == 0 {
|
||||
dstValues[i] = v
|
||||
dstCounts[i] = 1
|
||||
continue
|
||||
}
|
||||
dstValues[i] *= v
|
||||
dstCounts[i]++
|
||||
}
|
||||
}
|
||||
|
||||
func finalizeAggrGeomean(iac *incrementalAggrContext) {
|
||||
dstValues := iac.ts.Values
|
||||
counts := iac.values
|
||||
_ = dstValues[len(counts)-1]
|
||||
for i, v := range counts {
|
||||
if v == 0 {
|
||||
dstValues[i] = nan
|
||||
continue
|
||||
}
|
||||
dstValues[i] = math.Pow(dstValues[i], 1/v)
|
||||
}
|
||||
}
|
|
@ -145,14 +145,14 @@ func evalExpr(ec *EvalConfig, e expr) ([]*timeseries, error) {
|
|||
re := &rollupExpr{
|
||||
Expr: me,
|
||||
}
|
||||
rv, err := evalRollupFunc(ec, "default_rollup", rollupDefault, re)
|
||||
rv, err := evalRollupFunc(ec, "default_rollup", rollupDefault, re, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(`cannot evaluate %q: %s`, me.AppendString(nil), err)
|
||||
}
|
||||
return rv, nil
|
||||
}
|
||||
if re, ok := e.(*rollupExpr); ok {
|
||||
rv, err := evalRollupFunc(ec, "default_rollup", rollupDefault, re)
|
||||
rv, err := evalRollupFunc(ec, "default_rollup", rollupDefault, re, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(`cannot evaluate %q: %s`, re.AppendString(nil), err)
|
||||
}
|
||||
|
@ -188,13 +188,30 @@ func evalExpr(ec *EvalConfig, e expr) ([]*timeseries, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rv, err := evalRollupFunc(ec, fe.Name, rf, re)
|
||||
rv, err := evalRollupFunc(ec, fe.Name, rf, re, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(`cannot evaluate %q: %s`, fe.AppendString(nil), err)
|
||||
}
|
||||
return rv, nil
|
||||
}
|
||||
if ae, ok := e.(*aggrFuncExpr); ok {
|
||||
if callbacks := getIncrementalAggrFuncCallbacks(ae.Name); callbacks != nil {
|
||||
fe, nrf := tryGetArgRollupFuncWithMetricExpr(ae)
|
||||
if fe != nil {
|
||||
// There is an optimized path for calculating aggrFuncExpr over rollupFunc over metricExpr.
|
||||
// The optimized path saves RAM for aggregates over big number of time series.
|
||||
args, re, err := evalRollupFuncArgs(ec, fe)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rf, err := nrf(args)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
iafc := newIncrementalAggrFuncContext(ae, callbacks)
|
||||
return evalRollupFunc(ec, fe.Name, rf, re, iafc)
|
||||
}
|
||||
}
|
||||
args, err := evalExprs(ec, ae.Args)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -249,6 +266,69 @@ func evalExpr(ec *EvalConfig, e expr) ([]*timeseries, error) {
|
|||
return nil, fmt.Errorf("unexpected expression %q", e.AppendString(nil))
|
||||
}
|
||||
|
||||
func tryGetArgRollupFuncWithMetricExpr(ae *aggrFuncExpr) (*funcExpr, newRollupFunc) {
|
||||
if len(ae.Args) != 1 {
|
||||
return nil, nil
|
||||
}
|
||||
e := ae.Args[0]
|
||||
// Make sure e contains one of the following:
|
||||
// - metricExpr
|
||||
// - metricExpr[d]
|
||||
// - rollupFunc(metricExpr)
|
||||
// - rollupFunc(metricExpr[d])
|
||||
|
||||
if me, ok := e.(*metricExpr); ok {
|
||||
// e = metricExpr
|
||||
if me.IsEmpty() {
|
||||
return nil, nil
|
||||
}
|
||||
fe := &funcExpr{
|
||||
Name: "default_rollup",
|
||||
Args: []expr{me},
|
||||
}
|
||||
nrf := getRollupFunc(fe.Name)
|
||||
return fe, nrf
|
||||
}
|
||||
if re, ok := e.(*rollupExpr); ok {
|
||||
if me, ok := re.Expr.(*metricExpr); !ok || me.IsEmpty() {
|
||||
return nil, nil
|
||||
}
|
||||
// e = rollupExpr(metricExpr)
|
||||
fe := &funcExpr{
|
||||
Name: "default_rollup",
|
||||
Args: []expr{re},
|
||||
}
|
||||
nrf := getRollupFunc(fe.Name)
|
||||
return fe, nrf
|
||||
}
|
||||
fe, ok := e.(*funcExpr)
|
||||
if !ok {
|
||||
return nil, nil
|
||||
}
|
||||
nrf := getRollupFunc(fe.Name)
|
||||
if nrf == nil {
|
||||
return nil, nil
|
||||
}
|
||||
rollupArgIdx := getRollupArgIdx(fe.Name)
|
||||
arg := fe.Args[rollupArgIdx]
|
||||
if me, ok := arg.(*metricExpr); ok {
|
||||
if me.IsEmpty() {
|
||||
return nil, nil
|
||||
}
|
||||
return &funcExpr{
|
||||
Name: fe.Name,
|
||||
Args: []expr{me},
|
||||
}, nrf
|
||||
}
|
||||
if re, ok := arg.(*rollupExpr); ok {
|
||||
if me, ok := re.Expr.(*metricExpr); !ok || me.IsEmpty() {
|
||||
return nil, nil
|
||||
}
|
||||
return fe, nrf
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func evalExprs(ec *EvalConfig, es []expr) ([][]*timeseries, error) {
|
||||
var rvs [][]*timeseries
|
||||
for _, e := range es {
|
||||
|
@ -308,7 +388,7 @@ func getRollupExprArg(arg expr) *rollupExpr {
|
|||
return &reNew
|
||||
}
|
||||
|
||||
func evalRollupFunc(ec *EvalConfig, name string, rf rollupFunc, re *rollupExpr) ([]*timeseries, error) {
|
||||
func evalRollupFunc(ec *EvalConfig, name string, rf rollupFunc, re *rollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) {
|
||||
ecNew := ec
|
||||
var offset int64
|
||||
if len(re.Offset) > 0 {
|
||||
|
@ -325,19 +405,11 @@ func evalRollupFunc(ec *EvalConfig, name string, rf rollupFunc, re *rollupExpr)
|
|||
var rvs []*timeseries
|
||||
var err error
|
||||
if me, ok := re.Expr.(*metricExpr); ok {
|
||||
if me.IsEmpty() {
|
||||
rvs = evalNumber(ecNew, nan)
|
||||
} else {
|
||||
var window int64
|
||||
if len(re.Window) > 0 {
|
||||
window, err = DurationValue(re.Window, ec.Step)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
rvs, err = evalRollupFuncWithMetricExpr(ecNew, name, rf, me, window)
|
||||
}
|
||||
rvs, err = evalRollupFuncWithMetricExpr(ecNew, name, rf, me, iafc, re.Window)
|
||||
} else {
|
||||
if iafc != nil {
|
||||
logger.Panicf("BUG: iafc must be nil for rollup %q over subquery %q", name, re.AppendString(nil))
|
||||
}
|
||||
rvs, err = evalRollupFuncWithSubquery(ecNew, name, rf, re)
|
||||
}
|
||||
if err != nil {
|
||||
|
@ -478,9 +550,21 @@ var (
|
|||
rollupResultCacheMiss = metrics.NewCounter(`vm_rollup_result_cache_miss_total`)
|
||||
)
|
||||
|
||||
func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, me *metricExpr, window int64) ([]*timeseries, error) {
|
||||
func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, me *metricExpr, iafc *incrementalAggrFuncContext, windowStr string) ([]*timeseries, error) {
|
||||
if me.IsEmpty() {
|
||||
return evalNumber(ec, nan), nil
|
||||
}
|
||||
var window int64
|
||||
if len(windowStr) > 0 {
|
||||
var err error
|
||||
window, err = DurationValue(windowStr, ec.Step)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Search for partial results in cache.
|
||||
tssCached, start := rollupResultCacheV.Get(name, ec, me, window)
|
||||
tssCached, start := rollupResultCacheV.Get(name, ec, me, iafc, window)
|
||||
if start > ec.End {
|
||||
// The result is fully cached.
|
||||
rollupResultCacheFullHits.Inc()
|
||||
|
@ -530,9 +614,70 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, me
|
|||
defer rml.Put(uint64(rollupMemorySize))
|
||||
|
||||
// Evaluate rollup
|
||||
tss := make([]*timeseries, 0, rssLen*len(rcs))
|
||||
var tss []*timeseries
|
||||
if iafc != nil {
|
||||
tss, err = evalRollupWithIncrementalAggregate(iafc, rss, rcs, preFunc, sharedTimestamps)
|
||||
} else {
|
||||
tss, err = evalRollupNoIncrementalAggregate(rss, rcs, preFunc, sharedTimestamps)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !rollupFuncsKeepMetricGroup[name] {
|
||||
tss = copyTimeseriesMetricNames(tss)
|
||||
for _, ts := range tss {
|
||||
ts.MetricName.ResetMetricGroup()
|
||||
}
|
||||
}
|
||||
tss = mergeTimeseries(tssCached, tss, start, ec)
|
||||
rollupResultCacheV.Put(name, ec, me, iafc, window, tss)
|
||||
|
||||
return tss, nil
|
||||
}
|
||||
|
||||
var (
|
||||
rollupMemoryLimiter memoryLimiter
|
||||
rollupMemoryLimiterOnce sync.Once
|
||||
)
|
||||
|
||||
func getRollupMemoryLimiter() *memoryLimiter {
|
||||
rollupMemoryLimiterOnce.Do(func() {
|
||||
rollupMemoryLimiter.MaxSize = uint64(memory.Allowed()) / 4
|
||||
})
|
||||
return &rollupMemoryLimiter
|
||||
}
|
||||
|
||||
func evalRollupWithIncrementalAggregate(iafc *incrementalAggrFuncContext, rss *netstorage.Results, rcs []*rollupConfig,
|
||||
preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) {
|
||||
err := rss.RunParallel(func(rs *netstorage.Result) {
|
||||
preFunc(rs.Values, rs.Timestamps)
|
||||
ts := getTimeseries()
|
||||
defer putTimeseries(ts)
|
||||
for _, rc := range rcs {
|
||||
ts.Reset()
|
||||
ts.MetricName.CopyFrom(&rs.MetricName)
|
||||
if len(rc.TagValue) > 0 {
|
||||
ts.MetricName.AddTag("rollup", rc.TagValue)
|
||||
}
|
||||
ts.Values = rc.Do(ts.Values[:0], rs.Values, rs.Timestamps)
|
||||
ts.Timestamps = sharedTimestamps
|
||||
iafc.updateTimeseries(ts)
|
||||
ts.Timestamps = nil
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tss := iafc.finalizeTimeseries()
|
||||
return tss, nil
|
||||
}
|
||||
|
||||
func evalRollupNoIncrementalAggregate(rss *netstorage.Results, rcs []*rollupConfig,
|
||||
preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) {
|
||||
tss := make([]*timeseries, 0, rss.Len()*len(rcs))
|
||||
var tssLock sync.Mutex
|
||||
err = rss.RunParallel(func(rs *netstorage.Result) {
|
||||
err := rss.RunParallel(func(rs *netstorage.Result) {
|
||||
preFunc(rs.Values, rs.Timestamps)
|
||||
for _, rc := range rcs {
|
||||
var ts timeseries
|
||||
|
@ -552,30 +697,9 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, me
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !rollupFuncsKeepMetricGroup[name] {
|
||||
tss = copyTimeseriesMetricNames(tss)
|
||||
for _, ts := range tss {
|
||||
ts.MetricName.ResetMetricGroup()
|
||||
}
|
||||
}
|
||||
tss = mergeTimeseries(tssCached, tss, start, ec)
|
||||
rollupResultCacheV.Put(name, ec, me, window, tss)
|
||||
|
||||
return tss, nil
|
||||
}
|
||||
|
||||
var (
|
||||
rollupMemoryLimiter memoryLimiter
|
||||
rollupMemoryLimiterOnce sync.Once
|
||||
)
|
||||
|
||||
func getRollupMemoryLimiter() *memoryLimiter {
|
||||
rollupMemoryLimiterOnce.Do(func() {
|
||||
rollupMemoryLimiter.MaxSize = uint64(memory.Allowed()) / 4
|
||||
})
|
||||
return &rollupMemoryLimiter
|
||||
}
|
||||
|
||||
func getRollupConfigs(name string, rf rollupFunc, start, end, step, window int64, sharedTimestamps []int64) (func(values []float64, timestamps []int64), []*rollupConfig) {
|
||||
preFunc := func(values []float64, timestamps []int64) {}
|
||||
if rollupFuncsRemoveCounterResets[name] {
|
||||
|
|
|
@ -125,7 +125,7 @@ func ResetRollupResultCache() {
|
|||
rollupResultCacheV.c.Reset()
|
||||
}
|
||||
|
||||
func (rrc *rollupResultCache) Get(funcName string, ec *EvalConfig, me *metricExpr, window int64) (tss []*timeseries, newStart int64) {
|
||||
func (rrc *rollupResultCache) Get(funcName string, ec *EvalConfig, me *metricExpr, iafc *incrementalAggrFuncContext, window int64) (tss []*timeseries, newStart int64) {
|
||||
if *disableCache || !ec.mayCache() {
|
||||
return nil, ec.Start
|
||||
}
|
||||
|
@ -134,7 +134,7 @@ func (rrc *rollupResultCache) Get(funcName string, ec *EvalConfig, me *metricExp
|
|||
bb := bbPool.Get()
|
||||
defer bbPool.Put(bb)
|
||||
|
||||
bb.B = marshalRollupResultCacheKey(bb.B[:0], funcName, me, window, ec.Step)
|
||||
bb.B = marshalRollupResultCacheKey(bb.B[:0], funcName, me, iafc, window, ec.Step)
|
||||
metainfoBuf := rrc.c.Get(nil, bb.B)
|
||||
if len(metainfoBuf) == 0 {
|
||||
return nil, ec.Start
|
||||
|
@ -152,7 +152,7 @@ func (rrc *rollupResultCache) Get(funcName string, ec *EvalConfig, me *metricExp
|
|||
if len(resultBuf) == 0 {
|
||||
mi.RemoveKey(key)
|
||||
metainfoBuf = mi.Marshal(metainfoBuf[:0])
|
||||
bb.B = marshalRollupResultCacheKey(bb.B[:0], funcName, me, window, ec.Step)
|
||||
bb.B = marshalRollupResultCacheKey(bb.B[:0], funcName, me, iafc, window, ec.Step)
|
||||
rrc.c.Set(bb.B, metainfoBuf)
|
||||
return nil, ec.Start
|
||||
}
|
||||
|
@ -196,7 +196,7 @@ func (rrc *rollupResultCache) Get(funcName string, ec *EvalConfig, me *metricExp
|
|||
return tss, newStart
|
||||
}
|
||||
|
||||
func (rrc *rollupResultCache) Put(funcName string, ec *EvalConfig, me *metricExpr, window int64, tss []*timeseries) {
|
||||
func (rrc *rollupResultCache) Put(funcName string, ec *EvalConfig, me *metricExpr, iafc *incrementalAggrFuncContext, window int64, tss []*timeseries) {
|
||||
if *disableCache || len(tss) == 0 || !ec.mayCache() {
|
||||
return
|
||||
}
|
||||
|
@ -242,7 +242,7 @@ func (rrc *rollupResultCache) Put(funcName string, ec *EvalConfig, me *metricExp
|
|||
bb.B = key.Marshal(bb.B[:0])
|
||||
rrc.c.SetBig(bb.B, tssMarshaled)
|
||||
|
||||
bb.B = marshalRollupResultCacheKey(bb.B[:0], funcName, me, window, ec.Step)
|
||||
bb.B = marshalRollupResultCacheKey(bb.B[:0], funcName, me, iafc, window, ec.Step)
|
||||
metainfoBuf := rrc.c.Get(nil, bb.B)
|
||||
var mi rollupResultCacheMetainfo
|
||||
if len(metainfoBuf) > 0 {
|
||||
|
@ -270,10 +270,16 @@ var (
|
|||
var tooBigRollupResults = metrics.NewCounter("vm_too_big_rollup_results_total")
|
||||
|
||||
// Increment this value every time the format of the cache changes.
|
||||
const rollupResultCacheVersion = 4
|
||||
const rollupResultCacheVersion = 5
|
||||
|
||||
func marshalRollupResultCacheKey(dst []byte, funcName string, me *metricExpr, window, step int64) []byte {
|
||||
func marshalRollupResultCacheKey(dst []byte, funcName string, me *metricExpr, iafc *incrementalAggrFuncContext, window, step int64) []byte {
|
||||
dst = append(dst, rollupResultCacheVersion)
|
||||
if iafc == nil {
|
||||
dst = append(dst, 0)
|
||||
} else {
|
||||
dst = append(dst, 1)
|
||||
dst = iafc.ae.AppendString(dst)
|
||||
}
|
||||
dst = encoding.MarshalUint64(dst, uint64(len(funcName)))
|
||||
dst = append(dst, funcName...)
|
||||
dst = encoding.MarshalInt64(dst, window)
|
||||
|
|
|
@ -23,10 +23,15 @@ func TestRollupResultCache(t *testing.T) {
|
|||
Value: []byte("xxx"),
|
||||
}},
|
||||
}
|
||||
iafc := &incrementalAggrFuncContext{
|
||||
ae: &aggrFuncExpr{
|
||||
Name: "foobar",
|
||||
},
|
||||
}
|
||||
|
||||
// Try obtaining an empty value.
|
||||
t.Run("empty", func(t *testing.T) {
|
||||
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, window)
|
||||
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window)
|
||||
if newStart != ec.Start {
|
||||
t.Fatalf("unexpected newStart; got %d; want %d", newStart, ec.Start)
|
||||
}
|
||||
|
@ -36,7 +41,7 @@ func TestRollupResultCache(t *testing.T) {
|
|||
})
|
||||
|
||||
// Store timeseries overlapping with start
|
||||
t.Run("start-overlap", func(t *testing.T) {
|
||||
t.Run("start-overlap-no-iafc", func(t *testing.T) {
|
||||
ResetRollupResultCache()
|
||||
tss := []*timeseries{
|
||||
{
|
||||
|
@ -44,8 +49,29 @@ func TestRollupResultCache(t *testing.T) {
|
|||
Values: []float64{0, 1, 2},
|
||||
},
|
||||
}
|
||||
rollupResultCacheV.Put(funcName, ec, me, window, tss)
|
||||
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, window)
|
||||
rollupResultCacheV.Put(funcName, ec, me, nil, window, tss)
|
||||
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window)
|
||||
if newStart != 1400 {
|
||||
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1400)
|
||||
}
|
||||
tssExpected := []*timeseries{
|
||||
{
|
||||
Timestamps: []int64{1000, 1200},
|
||||
Values: []float64{1, 2},
|
||||
},
|
||||
}
|
||||
testTimeseriesEqual(t, tss, tssExpected)
|
||||
})
|
||||
t.Run("start-overlap-with-iafc", func(t *testing.T) {
|
||||
ResetRollupResultCache()
|
||||
tss := []*timeseries{
|
||||
{
|
||||
Timestamps: []int64{800, 1000, 1200},
|
||||
Values: []float64{0, 1, 2},
|
||||
},
|
||||
}
|
||||
rollupResultCacheV.Put(funcName, ec, me, iafc, window, tss)
|
||||
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, iafc, window)
|
||||
if newStart != 1400 {
|
||||
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1400)
|
||||
}
|
||||
|
@ -67,8 +93,8 @@ func TestRollupResultCache(t *testing.T) {
|
|||
Values: []float64{333, 0, 1, 2},
|
||||
},
|
||||
}
|
||||
rollupResultCacheV.Put(funcName, ec, me, window, tss)
|
||||
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, window)
|
||||
rollupResultCacheV.Put(funcName, ec, me, nil, window, tss)
|
||||
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window)
|
||||
if newStart != 1000 {
|
||||
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000)
|
||||
}
|
||||
|
@ -86,8 +112,8 @@ func TestRollupResultCache(t *testing.T) {
|
|||
Values: []float64{0, 1, 2},
|
||||
},
|
||||
}
|
||||
rollupResultCacheV.Put(funcName, ec, me, window, tss)
|
||||
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, window)
|
||||
rollupResultCacheV.Put(funcName, ec, me, nil, window, tss)
|
||||
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window)
|
||||
if newStart != 1000 {
|
||||
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000)
|
||||
}
|
||||
|
@ -105,8 +131,8 @@ func TestRollupResultCache(t *testing.T) {
|
|||
Values: []float64{0, 1, 2},
|
||||
},
|
||||
}
|
||||
rollupResultCacheV.Put(funcName, ec, me, window, tss)
|
||||
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, window)
|
||||
rollupResultCacheV.Put(funcName, ec, me, nil, window, tss)
|
||||
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window)
|
||||
if newStart != 1000 {
|
||||
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000)
|
||||
}
|
||||
|
@ -124,8 +150,8 @@ func TestRollupResultCache(t *testing.T) {
|
|||
Values: []float64{0, 1, 2},
|
||||
},
|
||||
}
|
||||
rollupResultCacheV.Put(funcName, ec, me, window, tss)
|
||||
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, window)
|
||||
rollupResultCacheV.Put(funcName, ec, me, nil, window, tss)
|
||||
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window)
|
||||
if newStart != 1000 {
|
||||
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1000)
|
||||
}
|
||||
|
@ -143,8 +169,8 @@ func TestRollupResultCache(t *testing.T) {
|
|||
Values: []float64{0, 1, 2, 3, 4, 5, 6, 7},
|
||||
},
|
||||
}
|
||||
rollupResultCacheV.Put(funcName, ec, me, window, tss)
|
||||
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, window)
|
||||
rollupResultCacheV.Put(funcName, ec, me, nil, window, tss)
|
||||
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window)
|
||||
if newStart != 2200 {
|
||||
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 2200)
|
||||
}
|
||||
|
@ -166,8 +192,8 @@ func TestRollupResultCache(t *testing.T) {
|
|||
Values: []float64{1, 2, 3, 4, 5, 6},
|
||||
},
|
||||
}
|
||||
rollupResultCacheV.Put(funcName, ec, me, window, tss)
|
||||
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, window)
|
||||
rollupResultCacheV.Put(funcName, ec, me, nil, window, tss)
|
||||
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window)
|
||||
if newStart != 2200 {
|
||||
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 2200)
|
||||
}
|
||||
|
@ -191,8 +217,8 @@ func TestRollupResultCache(t *testing.T) {
|
|||
}
|
||||
tss = append(tss, ts)
|
||||
}
|
||||
rollupResultCacheV.Put(funcName, ec, me, window, tss)
|
||||
tssResult, newStart := rollupResultCacheV.Get(funcName, ec, me, window)
|
||||
rollupResultCacheV.Put(funcName, ec, me, nil, window, tss)
|
||||
tssResult, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window)
|
||||
if newStart != 2200 {
|
||||
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 2200)
|
||||
}
|
||||
|
@ -220,10 +246,10 @@ func TestRollupResultCache(t *testing.T) {
|
|||
Values: []float64{0, 1, 2},
|
||||
},
|
||||
}
|
||||
rollupResultCacheV.Put(funcName, ec, me, window, tss1)
|
||||
rollupResultCacheV.Put(funcName, ec, me, window, tss2)
|
||||
rollupResultCacheV.Put(funcName, ec, me, window, tss3)
|
||||
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, window)
|
||||
rollupResultCacheV.Put(funcName, ec, me, nil, window, tss1)
|
||||
rollupResultCacheV.Put(funcName, ec, me, nil, window, tss2)
|
||||
rollupResultCacheV.Put(funcName, ec, me, nil, window, tss3)
|
||||
tss, newStart := rollupResultCacheV.Get(funcName, ec, me, nil, window)
|
||||
if newStart != 1400 {
|
||||
t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1400)
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"fmt"
|
||||
"sort"
|
||||
"strconv"
|
||||
"sync"
|
||||
"unsafe"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
|
@ -61,6 +62,20 @@ func (ts *timeseries) CopyShallow(src *timeseries) {
|
|||
ts.denyReuse = true
|
||||
}
|
||||
|
||||
func getTimeseries() *timeseries {
|
||||
if v := timeseriesPool.Get(); v != nil {
|
||||
return v.(*timeseries)
|
||||
}
|
||||
return ×eries{}
|
||||
}
|
||||
|
||||
func putTimeseries(ts *timeseries) {
|
||||
ts.Reset()
|
||||
timeseriesPool.Put(ts)
|
||||
}
|
||||
|
||||
var timeseriesPool sync.Pool
|
||||
|
||||
func marshalTimeseriesFast(tss []*timeseries, maxSize int, step int64) []byte {
|
||||
if len(tss) == 0 {
|
||||
logger.Panicf("BUG: tss cannot be empty")
|
||||
|
|
Loading…
Reference in a new issue