diff --git a/app/vmselect/promql/rollup.go b/app/vmselect/promql/rollup.go index d7c742bec..9348c54e0 100644 --- a/app/vmselect/promql/rollup.go +++ b/app/vmselect/promql/rollup.go @@ -4,6 +4,7 @@ import ( "flag" "fmt" "math" + "strconv" "strings" "sync" @@ -148,6 +149,11 @@ var rollupAggrFuncs = map[string]rollupFunc{ "zscore_over_time": rollupZScoreOverTime, } +// rollupAggrFuncsWithParam are functions with a single additional parameter that can be passed to `aggr_over_time()` +var rollupAggrFuncsWithParam = map[string]func(float64) rollupFunc{ + "quantile_over_time": newRollupQuantileFromPhi, +} + // VictoriaMetrics can extends lookbehind window for these functions // in order to make sure it contains enough points for returning non-empty results. // @@ -234,7 +240,41 @@ var rollupFuncsKeepMetricName = map[string]bool{ "timestamp_with_name": true, } -func getRollupAggrFuncNames(expr metricsql.Expr) ([]string, error) { +type rollupAggrFunc struct { + // The function. + rf rollupFunc + + // The name of the function as given in syntax and above maps. + name string + + // The rollup tag. + tag string +} + +func getRollupAggrFunc(s string) (rollupAggrFunc, error) { + if rf, ok := rollupAggrFuncs[s]; ok { + return rollupAggrFunc{ + rf: rf, + name: s, + tag: s, + }, nil + } + if i := strings.IndexByte(s, '('); i > 0 && s[len(s)-1] == ')' { + name := s[0:i] + if pf, ok := rollupAggrFuncsWithParam[name]; ok { + if param, err := strconv.ParseFloat(s[i+1:len(s)-1], 64); err == nil { + return rollupAggrFunc{ + rf: pf(param), + name: name, + tag: s, + }, nil + } + } + } + return rollupAggrFunc{}, fmt.Errorf("%q cannot be used in `aggr_over_time` function; expecting quoted aggregate function name", s) +} + +func getRollupAggrFuncs(expr metricsql.Expr) ([]rollupAggrFunc, error) { afe, ok := expr.(*metricsql.AggrFuncExpr) if ok { // This is for incremental aggregate function case: @@ -275,12 +315,14 @@ func getRollupAggrFuncNames(expr metricsql.Expr) ([]string, error) { if len(aggrFuncNames) == 0 { return nil, fmt.Errorf("aggr_over_time() must contain at least a single aggregate function name") } - for _, s := range aggrFuncNames { - if rollupAggrFuncs[s] == nil { - return nil, fmt.Errorf("%q cannot be used in `aggr_over_time` function; expecting quoted aggregate function name", s) + aggrFuncs := make([]rollupAggrFunc, len(aggrFuncNames)) + for i, s := range aggrFuncNames { + var err error + if aggrFuncs[i], err = getRollupAggrFunc(s); err != nil { + return nil, err } } - return aggrFuncNames, nil + return aggrFuncs, nil } // getRollupTag returns the possible second arg from the expr. @@ -430,19 +472,18 @@ func getRollupConfigs(funcName string, rf rollupFunc, expr metricsql.Expr, start } rcs, err = appendRollupConfigs(rcs, expr) case "aggr_over_time": - aggrFuncNames, err := getRollupAggrFuncNames(expr) + aggrFuncs, err := getRollupAggrFuncs(expr) if err != nil { return nil, nil, fmt.Errorf("invalid args to %s: %w", expr.AppendString(nil), err) } - for _, aggrFuncName := range aggrFuncNames { - if rollupFuncsRemoveCounterResets[aggrFuncName] { + for _, aggrFunc := range aggrFuncs { + if rollupFuncsRemoveCounterResets[aggrFunc.name] { // There is no need to save the previous preFunc, since it is either empty or the same. preFunc = func(values []float64, timestamps []int64) { removeCounterResets(values) } } - rf := rollupAggrFuncs[aggrFuncName] - rcs = append(rcs, newRollupConfig(rf, aggrFuncName)) + rcs = append(rcs, newRollupConfig(aggrFunc.rf, aggrFunc.tag)) } default: rcs = append(rcs, newRollupConfig(rf, "")) @@ -1291,15 +1332,17 @@ func newRollupQuantile(args []interface{}) (rollupFunc, error) { if err != nil { return nil, err } - rf := func(rfa *rollupFuncArg) float64 { + return newRollupQuantileFromPhi(phis[0]), nil +} + +func newRollupQuantileFromPhi(phi float64) rollupFunc { + return func(rfa *rollupFuncArg) float64 { // There is no need in handling NaNs here, since they must be cleaned up // before calling rollup funcs. values := rfa.values - phi := phis[rfa.idx] qv := quantile(phi, values) return qv } - return rf, nil } func rollupMAD(rfa *rollupFuncArg) float64 {