app/vmselect/promql: add aggr_over_time(("aggr_func1", "aggr_func2", ...), m[d]) function
This function can be used for simultaneously calculating multiple `aggr_func*` functions that accept a range vector. For example, `aggr_over_time(("min_over_time", "max_over_time"), m[d])` would calculate both `min_over_time` and `max_over_time` for `m[d]`.
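To make the new semantics concrete, here is a small self-contained Go sketch. It is not code from this commit (the real logic lives in `getRollupConfigs` and `getRollupAggrFuncNames` in the diff below); it only illustrates the contract exercised by the new tests: every listed aggregate is evaluated over the same window of samples, and each result is distinguished by an extra `rollup` tag. All helper names and sample values here are hypothetical.

```go
// Hypothetical sketch of the aggr_over_time() contract: several aggregations
// over one window of samples, one labeled result per aggregation.
package main

import "fmt"

// aggrOverTime applies every requested aggregation to the same window and
// keys each result by the aggregation name, mirroring the `rollup` tag seen
// in the expected test results below.
func aggrOverTime(window []float64, aggrs map[string]func([]float64) float64) map[string]float64 {
	results := make(map[string]float64, len(aggrs))
	for name, aggr := range aggrs {
		results[name] = aggr(window)
	}
	return results
}

func main() {
	window := []float64{0.2, 0.9, 0.4, 0.7} // samples of m within d

	minOverTime := func(vs []float64) float64 {
		m := vs[0]
		for _, v := range vs[1:] {
			if v < m {
				m = v
			}
		}
		return m
	}
	countOverTime := func(vs []float64) float64 { return float64(len(vs)) }

	for name, v := range aggrOverTime(window, map[string]func([]float64) float64{
		"min_over_time":   minOverTime,
		"count_over_time": countOverTime,
	}) {
		fmt.Printf("{rollup=%q} => %v\n", name, v)
	}
}
```

In the actual implementation each listed aggregate gets its own `rollupConfig` with `TagValue` set to the aggregate name, which is how the `rollup` tag ends up on the returned series.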
This commit is contained in:
  parent c4632faa9d
  commit 164278151f

5 changed files with 235 additions and 62 deletions
@@ -359,6 +359,9 @@ func evalExprs(ec *EvalConfig, es []metricsql.Expr) ([][]*timeseries, error) {
 func evalRollupFuncArgs(ec *EvalConfig, fe *metricsql.FuncExpr) ([]interface{}, *metricsql.RollupExpr, error) {
     var re *metricsql.RollupExpr
     rollupArgIdx := getRollupArgIdx(fe.Name)
+    if len(fe.Args) <= rollupArgIdx {
+        return nil, nil, fmt.Errorf("expecting at least %d args to %q; got %d args; expr: %q", rollupArgIdx, fe.Name, len(fe.Args), fe.AppendString(nil))
+    }
     args := make([]interface{}, len(fe.Args))
     for i, arg := range fe.Args {
         if i == rollupArgIdx {
@@ -430,7 +433,7 @@ func evalRollupFunc(ec *EvalConfig, name string, rf rollupFunc, expr metricsql.E
         if iafc != nil {
             logger.Panicf("BUG: iafc must be nil for rollup %q over subquery %q", name, re.AppendString(nil))
         }
-        rvs, err = evalRollupFuncWithSubquery(ecNew, name, rf, re)
+        rvs, err = evalRollupFuncWithSubquery(ecNew, name, rf, expr, re)
     }
     if err != nil {
         return nil, err
@@ -449,7 +452,7 @@ func evalRollupFunc(ec *EvalConfig, name string, rf rollupFunc, expr metricsql.E
     return rvs, nil
 }

-func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, re *metricsql.RollupExpr) ([]*timeseries, error) {
+func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, expr metricsql.Expr, re *metricsql.RollupExpr) ([]*timeseries, error) {
     // TODO: determine whether to use rollupResultCacheV here.
     var step int64
     if len(re.Step) > 0 {
@@ -490,7 +493,10 @@ func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, re *
     }

     sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step)
-    preFunc, rcs := getRollupConfigs(name, rf, ec.Start, ec.End, ec.Step, window, ec.LookbackDelta, sharedTimestamps)
+    preFunc, rcs, err := getRollupConfigs(name, rf, expr, ec.Start, ec.End, ec.Step, window, ec.LookbackDelta, sharedTimestamps)
+    if err != nil {
+        return nil, err
+    }
     tss := make([]*timeseries, 0, len(tssSQ)*len(rcs))
     var tssLock sync.Mutex
     removeMetricGroup := !rollupFuncsKeepMetricGroup[name]
@@ -624,7 +630,11 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc,
         return tss, nil
     }
     sharedTimestamps := getTimestamps(start, ec.End, ec.Step)
-    preFunc, rcs := getRollupConfigs(name, rf, start, ec.End, ec.Step, window, ec.LookbackDelta, sharedTimestamps)
+    preFunc, rcs, err := getRollupConfigs(name, rf, expr, start, ec.End, ec.Step, window, ec.LookbackDelta, sharedTimestamps)
+    if err != nil {
+        rss.Cancel()
+        return nil, err
+    }

     // Verify timeseries fit available memory after the rollup.
     // Take into account points from tssCached.
@@ -751,62 +761,6 @@ func doRollupForTimeseries(rc *rollupConfig, tsDst *timeseries, mnSrc *storage.M
     tsDst.denyReuse = true
 }

-func getRollupConfigs(name string, rf rollupFunc, start, end, step, window int64, lookbackDelta int64, sharedTimestamps []int64) (
-    func(values []float64, timestamps []int64), []*rollupConfig) {
-    preFunc := func(values []float64, timestamps []int64) {}
-    if rollupFuncsRemoveCounterResets[name] {
-        preFunc = func(values []float64, timestamps []int64) {
-            removeCounterResets(values)
-        }
-    }
-    newRollupConfig := func(rf rollupFunc, tagValue string) *rollupConfig {
-        return &rollupConfig{
-            TagValue: tagValue,
-            Func: rf,
-            Start: start,
-            End: end,
-            Step: step,
-            Window: window,
-            MayAdjustWindow: rollupFuncsMayAdjustWindow[name],
-            LookbackDelta: lookbackDelta,
-            Timestamps: sharedTimestamps,
-        }
-    }
-    appendRollupConfigs := func(dst []*rollupConfig) []*rollupConfig {
-        dst = append(dst, newRollupConfig(rollupMin, "min"))
-        dst = append(dst, newRollupConfig(rollupMax, "max"))
-        dst = append(dst, newRollupConfig(rollupAvg, "avg"))
-        return dst
-    }
-    var rcs []*rollupConfig
-    switch name {
-    case "rollup":
-        rcs = appendRollupConfigs(rcs)
-    case "rollup_rate", "rollup_deriv":
-        preFuncPrev := preFunc
-        preFunc = func(values []float64, timestamps []int64) {
-            preFuncPrev(values, timestamps)
-            derivValues(values, timestamps)
-        }
-        rcs = appendRollupConfigs(rcs)
-    case "rollup_increase", "rollup_delta":
-        preFuncPrev := preFunc
-        preFunc = func(values []float64, timestamps []int64) {
-            preFuncPrev(values, timestamps)
-            deltaValues(values)
-        }
-        rcs = appendRollupConfigs(rcs)
-    case "rollup_candlestick":
-        rcs = append(rcs, newRollupConfig(rollupFirst, "open"))
-        rcs = append(rcs, newRollupConfig(rollupLast, "close"))
-        rcs = append(rcs, newRollupConfig(rollupMin, "low"))
-        rcs = append(rcs, newRollupConfig(rollupMax, "high"))
-    default:
-        rcs = append(rcs, newRollupConfig(rf, ""))
-    }
-    return preFunc, rcs
-}
-
 var bbPool bytesutil.ByteBufferPool

 func evalNumber(ec *EvalConfig, n float64) []*timeseries {
@@ -4480,6 +4480,54 @@ func TestExecSuccess(t *testing.T) {
         resultExpected := []netstorage.Result{r}
         f(q, resultExpected)
     })
+    t.Run(`aggr_over_time(single-func)`, func(t *testing.T) {
+        t.Parallel()
+        q := `aggr_over_time("increase", rand(0)[:10s])`
+        r1 := netstorage.Result{
+            MetricName: metricNameExpected,
+            Values: []float64{5.465672601448873, 6.642207999066246, 6.8400051805114295, 7.182425481980655, 5.1677922402706, 6.594060518641982},
+            Timestamps: timestampsExpected,
+        }
+        r1.MetricName.Tags = []storage.Tag{{
+            Key: []byte("rollup"),
+            Value: []byte("increase"),
+        }}
+        resultExpected := []netstorage.Result{r1}
+        f(q, resultExpected)
+    })
+    t.Run(`aggr_over_time(multi-func)`, func(t *testing.T) {
+        t.Parallel()
+        q := `sort(aggr_over_time(("min_over_time", "count_over_time", "max_over_time"), round(rand(0),0.1)[:10s]))`
+        r1 := netstorage.Result{
+            MetricName: metricNameExpected,
+            Values: []float64{0, 0, 0, 0, 0, 0},
+            Timestamps: timestampsExpected,
+        }
+        r1.MetricName.Tags = []storage.Tag{{
+            Key: []byte("rollup"),
+            Value: []byte("min_over_time"),
+        }}
+        r2 := netstorage.Result{
+            MetricName: metricNameExpected,
+            Values: []float64{0.9, 0.9, 1, 0.9, 1, 0.9},
+            Timestamps: timestampsExpected,
+        }
+        r2.MetricName.Tags = []storage.Tag{{
+            Key: []byte("rollup"),
+            Value: []byte("max_over_time"),
+        }}
+        r3 := netstorage.Result{
+            MetricName: metricNameExpected,
+            Values: []float64{20, 20, 20, 20, 20, 20},
+            Timestamps: timestampsExpected,
+        }
+        r3.MetricName.Tags = []storage.Tag{{
+            Key: []byte("rollup"),
+            Value: []byte("count_over_time"),
+        }}
+        resultExpected := []netstorage.Result{r1, r2, r3}
+        f(q, resultExpected)
+    })
     t.Run(`rollup_candlestick()`, func(t *testing.T) {
         t.Parallel()
         q := `sort(rollup_candlestick(round(rand(0),0.01)[:10s]))`
@@ -5181,6 +5229,9 @@ func TestExecError(t *testing.T) {
     f(`alias(1, "foo", "bar")`)
     f(`lifetime()`)
     f(`lag()`)
+    f(`aggr_over_time()`)
+    f(`aggr_over_time(foo)`)
+    f(`aggr_over_time("foo", bar, 1)`)

     // Invalid argument type
     f(`median_over_time({}, 2)`)
@@ -5216,6 +5267,8 @@ func TestExecError(t *testing.T) {
     f(`label_transform(1, "foo", "bar", 4)`)
     f(`label_transform(1, "foo", "invalid(regexp", "baz`)
     f(`alias(1, 2)`)
+    f(`aggr_over_time(1, 2)`)
+    f(`aggr_over_time(("foo", "bar"), 3)`)

     // Duplicate timeseries
     f(`(label_set(1, "foo", "bar") or label_set(2, "foo", "baz"))
@@ -8,6 +8,7 @@ import (

     "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
     "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
+    "github.com/VictoriaMetrics/VictoriaMetrics/lib/metricsql"
     "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
     "github.com/VictoriaMetrics/metrics"
     "github.com/valyala/histogram"
@@ -63,6 +64,45 @@ var rollupFuncs = map[string]newRollupFunc{
     "rollup_delta": newRollupFuncOneArg(rollupFake),
     "rollup_increase": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets
     "rollup_candlestick": newRollupFuncOneArg(rollupFake),
+    "aggr_over_time": newRollupFuncTwoArgs(rollupFake),
 }

+// rollupAggrFuncs are functions that can be passed to `aggr_over_time()`
+var rollupAggrFuncs = map[string]rollupFunc{
+    // Standard rollup funcs from PromQL.
+    "changes": rollupChanges,
+    "delta": rollupDelta,
+    "deriv": rollupDerivSlow,
+    "deriv_fast": rollupDerivFast,
+    "idelta": rollupIdelta,
+    "increase": rollupIncrease, // + rollupFuncsRemoveCounterResets
+    "irate": rollupIderiv, // + rollupFuncsRemoveCounterResets
+    "rate": rollupDerivFast, // + rollupFuncsRemoveCounterResets
+    "resets": rollupResets,
+    "avg_over_time": rollupAvg,
+    "min_over_time": rollupMin,
+    "max_over_time": rollupMax,
+    "sum_over_time": rollupSum,
+    "count_over_time": rollupCount,
+    "stddev_over_time": rollupStddev,
+    "stdvar_over_time": rollupStdvar,
+    "absent_over_time": rollupAbsent,
+
+    // Additional rollup funcs.
+    "sum2_over_time": rollupSum2,
+    "geomean_over_time": rollupGeomean,
+    "first_over_time": rollupFirst,
+    "last_over_time": rollupLast,
+    "distinct_over_time": rollupDistinct,
+    "increases_over_time": rollupIncreases,
+    "decreases_over_time": rollupDecreases,
+    "integrate": rollupIntegrate,
+    "ideriv": rollupIderiv,
+    "lifetime": rollupLifetime,
+    "lag": rollupLag,
+    "scrape_interval": rollupScrapeInterval,
+    "tmin_over_time": rollupTmin,
+    "tmax_over_time": rollupTmax,
+}
+
 var rollupFuncsMayAdjustWindow = map[string]bool{
@@ -95,15 +135,128 @@ var rollupFuncsKeepMetricGroup = map[string]bool{
     "geomean_over_time": true,
 }

+func getRollupAggrFuncNames(expr metricsql.Expr) ([]string, error) {
+    fe, ok := expr.(*metricsql.FuncExpr)
+    if !ok {
+        logger.Panicf("BUG: unexpected expression; want metricsql.FuncExpr; got %T; value: %s", expr, expr.AppendString(nil))
+    }
+    if fe.Name != "aggr_over_time" {
+        logger.Panicf("BUG: unexpected function name: %q; want `aggr_over_time`", fe.Name)
+    }
+    if len(fe.Args) != 2 {
+        return nil, fmt.Errorf("unexpected number of args to aggr_over_time(); got %d; want %d", len(fe.Args), 2)
+    }
+    arg := fe.Args[0]
+    var aggrFuncNames []string
+    if se, ok := arg.(*metricsql.StringExpr); ok {
+        aggrFuncNames = append(aggrFuncNames, se.S)
+    } else {
+        fe, ok := arg.(*metricsql.FuncExpr)
+        if !ok || fe.Name != "" {
+            return nil, fmt.Errorf("%s cannot be passed to aggr_over_time(); expecting quoted aggregate function name or a list of quoted aggregate function names",
+                arg.AppendString(nil))
+        }
+        for _, e := range fe.Args {
+            se, ok := e.(*metricsql.StringExpr)
+            if !ok {
+                return nil, fmt.Errorf("%s cannot be passed here; expecting quoted aggregate function name", e.AppendString(nil))
+            }
+            aggrFuncNames = append(aggrFuncNames, se.S)
+        }
+    }
+    if len(aggrFuncNames) == 0 {
+        return nil, fmt.Errorf("aggr_over_time() must contain at least a single aggregate function name")
+    }
+    for _, s := range aggrFuncNames {
+        if rollupAggrFuncs[s] == nil {
+            return nil, fmt.Errorf("%q cannot be used in `aggr_over_time` function; expecting quoted aggregate function name", s)
+        }
+    }
+    return aggrFuncNames, nil
+}
+
 func getRollupArgIdx(funcName string) int {
     funcName = strings.ToLower(funcName)
     if rollupFuncs[funcName] == nil {
         logger.Panicf("BUG: getRollupArgIdx is called for non-rollup func %q", funcName)
     }
-    if funcName == "quantile_over_time" {
+    switch funcName {
+    case "quantile_over_time", "aggr_over_time":
         return 1
+    default:
+        return 0
     }
-    return 0
 }

+func getRollupConfigs(name string, rf rollupFunc, expr metricsql.Expr, start, end, step, window int64, lookbackDelta int64, sharedTimestamps []int64) (
+    func(values []float64, timestamps []int64), []*rollupConfig, error) {
+    preFunc := func(values []float64, timestamps []int64) {}
+    if rollupFuncsRemoveCounterResets[name] {
+        preFunc = func(values []float64, timestamps []int64) {
+            removeCounterResets(values)
+        }
+    }
+    newRollupConfig := func(rf rollupFunc, tagValue string) *rollupConfig {
+        return &rollupConfig{
+            TagValue: tagValue,
+            Func: rf,
+            Start: start,
+            End: end,
+            Step: step,
+            Window: window,
+            MayAdjustWindow: rollupFuncsMayAdjustWindow[name],
+            LookbackDelta: lookbackDelta,
+            Timestamps: sharedTimestamps,
+        }
+    }
+    appendRollupConfigs := func(dst []*rollupConfig) []*rollupConfig {
+        dst = append(dst, newRollupConfig(rollupMin, "min"))
+        dst = append(dst, newRollupConfig(rollupMax, "max"))
+        dst = append(dst, newRollupConfig(rollupAvg, "avg"))
+        return dst
+    }
+    var rcs []*rollupConfig
+    switch name {
+    case "rollup":
+        rcs = appendRollupConfigs(rcs)
+    case "rollup_rate", "rollup_deriv":
+        preFuncPrev := preFunc
+        preFunc = func(values []float64, timestamps []int64) {
+            preFuncPrev(values, timestamps)
+            derivValues(values, timestamps)
+        }
+        rcs = appendRollupConfigs(rcs)
+    case "rollup_increase", "rollup_delta":
+        preFuncPrev := preFunc
+        preFunc = func(values []float64, timestamps []int64) {
+            preFuncPrev(values, timestamps)
+            deltaValues(values)
+        }
+        rcs = appendRollupConfigs(rcs)
+    case "rollup_candlestick":
+        rcs = append(rcs, newRollupConfig(rollupFirst, "open"))
+        rcs = append(rcs, newRollupConfig(rollupLast, "close"))
+        rcs = append(rcs, newRollupConfig(rollupMin, "low"))
+        rcs = append(rcs, newRollupConfig(rollupMax, "high"))
+    case "aggr_over_time":
+        aggrFuncNames, err := getRollupAggrFuncNames(expr)
+        if err != nil {
+            return nil, nil, fmt.Errorf("invalid args to %s: %s", expr.AppendString(nil), err)
+        }
+        for _, aggrFuncName := range aggrFuncNames {
+            if rollupFuncsRemoveCounterResets[aggrFuncName] {
+                // There is no need to save the previous preFunc, since it is either empty or the same.
+                preFunc = func(values []float64, timestamps []int64) {
+                    removeCounterResets(values)
+                }
+            }
+            rf := rollupAggrFuncs[aggrFuncName]
+            rcs = append(rcs, newRollupConfig(rf, aggrFuncName))
+        }
+    default:
+        rcs = append(rcs, newRollupConfig(rf, ""))
+    }
+    return preFunc, rcs, nil
+}
+
 func getRollupFunc(funcName string) newRollupFunc {
@@ -489,6 +642,15 @@ func newRollupFuncOneArg(rf rollupFunc) newRollupFunc {
     }
 }

+func newRollupFuncTwoArgs(rf rollupFunc) newRollupFunc {
+    return func(args []interface{}) (rollupFunc, error) {
+        if err := expectRollupArgsNum(args, 2); err != nil {
+            return nil, err
+        }
+        return rf, nil
+    }
+}
+
 func newRollupHoltWinters(args []interface{}) (rollupFunc, error) {
     if err := expectRollupArgsNum(args, 3); err != nil {
         return nil, err
@@ -100,3 +100,6 @@ This functionality can be tried at [an editable Grafana dashboard](http://play-g
   Example: `share_gt_over_time(up[24h], 0)` - returns service availability for the last 24 hours.
 - `tmin_over_time(m[d])` - returns timestamp for the minimum value for `m` over `d` time range.
 - `tmax_over_time(m[d])` - returns timestamp for the maximum value for `m` over `d` time range.
+- `aggr_over_time(("aggr_func1", "aggr_func2", ...), m[d])` - simultaneously calculates all the listed `aggr_func*` for `m` over `d` time range.
+  `aggr_func*` can contain any functions that accept range vector. For instance, `aggr_over_time(("min_over_time", "max_over_time", "rate"), m[d])`
+  would calculate `min_over_time`, `max_over_time` and `rate` for `m[d]`.
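As a usage illustration for the documentation entry added above, the sketch below sends an `aggr_over_time()` query through the Prometheus-compatible HTTP query API that VictoriaMetrics exposes. The listen address, metric name and window are assumptions for the example, not something defined by this commit.

```go
// Hedged usage sketch: querying aggr_over_time() over the Prometheus-compatible
// HTTP API. Address and metric name below are illustrative assumptions.
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

func main() {
	// One query returns several series for the same metric, one per listed
	// aggregate function; they differ only in the extra `rollup` label.
	q := `aggr_over_time(("min_over_time", "max_over_time"), node_cpu_seconds_total[5m])`

	params := url.Values{}
	params.Set("query", q)

	// Assumed single-node VictoriaMetrics address; adjust to your deployment.
	resp, err := http.Get("http://localhost:8428/api/v1/query?" + params.Encode())
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	// Each element of data.result carries a `rollup` label such as
	// "min_over_time" or "max_over_time" identifying the aggregate.
	fmt.Println(string(body))
}
```

Each returned series carries a `rollup` label (for example `rollup="min_over_time"`), so a single query can be split back into the individual aggregates on the client side or filtered further in MetricsQL.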
@@ -53,6 +53,7 @@ var rollupFuncs = map[string]bool{
     "rollup_delta": true,
     "rollup_increase": true,
     "rollup_candlestick": true,
+    "aggr_over_time": true,
 }

 // IsRollupFunc returns whether funcName is known rollup function.