app/vmselect/promql: allow passing multiple args to aggregate functions such as avg(q1, q2, q3)

This commit is contained in:
Aliaksandr Valialkin 2020-08-15 01:15:01 +03:00
parent cd96248480
commit 7d89fafe1a
3 changed files with 46 additions and 14 deletions

View file

@ -65,14 +65,25 @@ func getAggrFunc(s string) aggrFunc {
func newAggrFunc(afe func(tss []*timeseries) []*timeseries) aggrFunc {
return func(afa *aggrFuncArg) ([]*timeseries, error) {
args := afa.args
if err := expectTransformArgsNum(args, 1); err != nil {
tss, err := getAggrTimeseries(afa.args)
if err != nil {
return nil, err
}
return aggrFuncExt(afe, args[0], &afa.ae.Modifier, afa.ae.Limit, false)
return aggrFuncExt(afe, tss, &afa.ae.Modifier, afa.ae.Limit, false)
}
}
func getAggrTimeseries(args [][]*timeseries) ([]*timeseries, error) {
if len(args) == 0 {
return nil, fmt.Errorf("expecting at least one arg")
}
tss := args[0]
for _, arg := range args[1:] {
tss = append(tss, arg...)
}
return tss, nil
}
func removeGroupTags(metricName *storage.MetricName, modifier *metricsql.ModifierExpr) {
groupOp := strings.ToLower(modifier.Op)
switch groupOp {
@ -126,8 +137,8 @@ func aggrFuncExt(afe func(tss []*timeseries) []*timeseries, argOrig []*timeserie
}
func aggrFuncAny(afa *aggrFuncArg) ([]*timeseries, error) {
args := afa.args
if err := expectTransformArgsNum(args, 1); err != nil {
tss, err := getAggrTimeseries(afa.args)
if err != nil {
return nil, err
}
afe := func(tss []*timeseries) []*timeseries {
@ -138,7 +149,7 @@ func aggrFuncAny(afa *aggrFuncArg) ([]*timeseries, error) {
// Only a single time series per group must be returned
limit = 1
}
return aggrFuncExt(afe, args[0], &afa.ae.Modifier, limit, true)
return aggrFuncExt(afe, tss, &afa.ae.Modifier, limit, true)
}
func aggrFuncGroup(tss []*timeseries) []*timeseries {
@ -434,8 +445,8 @@ func aggrFuncMode(tss []*timeseries) []*timeseries {
}
func aggrFuncZScore(afa *aggrFuncArg) ([]*timeseries, error) {
args := afa.args
if err := expectTransformArgsNum(args, 1); err != nil {
tss, err := getAggrTimeseries(afa.args)
if err != nil {
return nil, err
}
afe := func(tss []*timeseries) []*timeseries {
@ -476,7 +487,7 @@ func aggrFuncZScore(afa *aggrFuncArg) ([]*timeseries, error) {
}
return tss
}
return aggrFuncExt(afe, args[0], &afa.ae.Modifier, afa.ae.Limit, true)
return aggrFuncExt(afe, tss, &afa.ae.Modifier, afa.ae.Limit, true)
}
// modeNoNaNs returns mode for a.
@ -811,13 +822,13 @@ func aggrFuncQuantile(afa *aggrFuncArg) ([]*timeseries, error) {
}
func aggrFuncMedian(afa *aggrFuncArg) ([]*timeseries, error) {
args := afa.args
if err := expectTransformArgsNum(args, 1); err != nil {
tss, err := getAggrTimeseries(afa.args)
if err != nil {
return nil, err
}
phis := evalNumber(afa.ec, 0.5)[0].Values
afe := newAggrQuantileFunc(phis)
return aggrFuncExt(afe, args[0], &afa.ae.Modifier, afa.ae.Limit, false)
return aggrFuncExt(afe, tss, &afa.ae.Modifier, afa.ae.Limit, false)
}
func newAggrQuantileFunc(phis []float64) func(tss []*timeseries) []*timeseries {

View file

@ -3370,6 +3370,28 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`sum(multi-args)`, func(t *testing.T) {
t.Parallel()
q := `sum(1, 2, 3)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{6, 6, 6, 6, 6, 6},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`sum(union-args)`, func(t *testing.T) {
t.Parallel()
q := `sum((1, 2, 3))`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1, 1, 1, 1, 1, 1},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`sum(scalar) by ()`, func(t *testing.T) {
t.Parallel()
q := `sum(123) by ()`
@ -5952,7 +5974,6 @@ func TestExecError(t *testing.T) {
f(`label_move()`)
f(`median_over_time()`)
f(`median()`)
f(`median("foo", "bar")`)
f(`keep_last_value()`)
f(`keep_next_value()`)
f(`interpolate()`)
@ -6054,7 +6075,6 @@ func TestExecError(t *testing.T) {
) + 10`)
// Invalid aggregates
f(`sum(1, 2)`)
f(`sum(1) foo (bar)`)
f(`sum foo () (bar)`)
f(`sum(foo) by (1)`)

View file

@ -37,6 +37,7 @@ This functionality can be tried at [an editable Grafana dashboard](http://play-g
- `offset` may be negative. For example, `q offset -1h`.
- [Range duration](https://prometheus.io/docs/prometheus/latest/querying/basics/#range-vector-selectors) and [offset](https://prometheus.io/docs/prometheus/latest/querying/basics/#offset-modifier) may be fractional. For instance, `rate(node_network_receive_bytes_total[1.5m] offset 0.5d)`.
- `default` binary operator. `q1 default q2` fills gaps in `q1` with the corresponding values from `q2`.
- Most aggregate functions accept arbitrary number of args. For example, `avg(q1, q2, q3)` would return the average values for every point across `q1`, `q2` and `q3`.
- `histogram_quantile` accepts optional third arg - `boundsLabel`. In this case it returns `lower` and `upper` bounds for the estimated percentile. See [this issue for details](https://github.com/prometheus/prometheus/issues/5706).
- `if` binary operator. `q1 if q2` removes values from `q1` for missing values from `q2`.
- `ifnot` binary operator. `q1 ifnot q2` removes values from `q1` for existing values from `q2`.