Allow using quantile_over_time in aggr_over_time

This commit is contained in:
Anton Tykhyy 2023-08-02 13:45:39 +03:00
parent 5aed369132
commit b5f8997687
No known key found for this signature in database
GPG key ID: EAFEDE5E9F0F3665

View file

@ -4,6 +4,7 @@ import (
"flag" "flag"
"fmt" "fmt"
"math" "math"
"strconv"
"strings" "strings"
"sync" "sync"
@ -148,6 +149,11 @@ var rollupAggrFuncs = map[string]rollupFunc{
"zscore_over_time": rollupZScoreOverTime, "zscore_over_time": rollupZScoreOverTime,
} }
// rollupAggrFuncsWithParam are functions with a single additional parameter that can be passed to `aggr_over_time()`
var rollupAggrFuncsWithParam = map[string]func(float64) rollupFunc{
"quantile_over_time": newRollupQuantileFromPhi,
}
// VictoriaMetrics can extends lookbehind window for these functions // VictoriaMetrics can extends lookbehind window for these functions
// in order to make sure it contains enough points for returning non-empty results. // in order to make sure it contains enough points for returning non-empty results.
// //
@ -234,7 +240,41 @@ var rollupFuncsKeepMetricName = map[string]bool{
"timestamp_with_name": true, "timestamp_with_name": true,
} }
func getRollupAggrFuncNames(expr metricsql.Expr) ([]string, error) { type rollupAggrFunc struct {
// The function.
rf rollupFunc
// The name of the function as given in syntax and above maps.
name string
// The rollup tag.
tag string
}
func getRollupAggrFunc(s string) (rollupAggrFunc, error) {
if rf, ok := rollupAggrFuncs[s]; ok {
return rollupAggrFunc{
rf: rf,
name: s,
tag: s,
}, nil
}
if i := strings.IndexByte(s, '('); i > 0 && s[len(s)-1] == ')' {
name := s[0:i]
if pf, ok := rollupAggrFuncsWithParam[name]; ok {
if param, err := strconv.ParseFloat(s[i+1:len(s)-1], 64); err == nil {
return rollupAggrFunc{
rf: pf(param),
name: name,
tag: s,
}, nil
}
}
}
return rollupAggrFunc{}, fmt.Errorf("%q cannot be used in `aggr_over_time` function; expecting quoted aggregate function name", s)
}
func getRollupAggrFuncs(expr metricsql.Expr) ([]rollupAggrFunc, error) {
afe, ok := expr.(*metricsql.AggrFuncExpr) afe, ok := expr.(*metricsql.AggrFuncExpr)
if ok { if ok {
// This is for incremental aggregate function case: // This is for incremental aggregate function case:
@ -275,12 +315,14 @@ func getRollupAggrFuncNames(expr metricsql.Expr) ([]string, error) {
if len(aggrFuncNames) == 0 { if len(aggrFuncNames) == 0 {
return nil, fmt.Errorf("aggr_over_time() must contain at least a single aggregate function name") return nil, fmt.Errorf("aggr_over_time() must contain at least a single aggregate function name")
} }
for _, s := range aggrFuncNames { aggrFuncs := make([]rollupAggrFunc, len(aggrFuncNames))
if rollupAggrFuncs[s] == nil { for i, s := range aggrFuncNames {
return nil, fmt.Errorf("%q cannot be used in `aggr_over_time` function; expecting quoted aggregate function name", s) var err error
if aggrFuncs[i], err = getRollupAggrFunc(s); err != nil {
return nil, err
} }
} }
return aggrFuncNames, nil return aggrFuncs, nil
} }
// getRollupTag returns the possible second arg from the expr. // getRollupTag returns the possible second arg from the expr.
@ -430,19 +472,18 @@ func getRollupConfigs(funcName string, rf rollupFunc, expr metricsql.Expr, start
} }
rcs, err = appendRollupConfigs(rcs, expr) rcs, err = appendRollupConfigs(rcs, expr)
case "aggr_over_time": case "aggr_over_time":
aggrFuncNames, err := getRollupAggrFuncNames(expr) aggrFuncs, err := getRollupAggrFuncs(expr)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("invalid args to %s: %w", expr.AppendString(nil), err) return nil, nil, fmt.Errorf("invalid args to %s: %w", expr.AppendString(nil), err)
} }
for _, aggrFuncName := range aggrFuncNames { for _, aggrFunc := range aggrFuncs {
if rollupFuncsRemoveCounterResets[aggrFuncName] { if rollupFuncsRemoveCounterResets[aggrFunc.name] {
// There is no need to save the previous preFunc, since it is either empty or the same. // There is no need to save the previous preFunc, since it is either empty or the same.
preFunc = func(values []float64, timestamps []int64) { preFunc = func(values []float64, timestamps []int64) {
removeCounterResets(values) removeCounterResets(values)
} }
} }
rf := rollupAggrFuncs[aggrFuncName] rcs = append(rcs, newRollupConfig(aggrFunc.rf, aggrFunc.tag))
rcs = append(rcs, newRollupConfig(rf, aggrFuncName))
} }
default: default:
rcs = append(rcs, newRollupConfig(rf, "")) rcs = append(rcs, newRollupConfig(rf, ""))
@ -1291,15 +1332,17 @@ func newRollupQuantile(args []interface{}) (rollupFunc, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
rf := func(rfa *rollupFuncArg) float64 { return newRollupQuantileFromPhi(phis[0]), nil
}
func newRollupQuantileFromPhi(phi float64) rollupFunc {
return func(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up // There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs. // before calling rollup funcs.
values := rfa.values values := rfa.values
phi := phis[rfa.idx]
qv := quantile(phi, values) qv := quantile(phi, values)
return qv return qv
} }
return rf, nil
} }
func rollupMAD(rfa *rollupFuncArg) float64 { func rollupMAD(rfa *rollupFuncArg) float64 {